• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

qubit-ltd / rs-concurrent / 5c5edb16-d6df-4ce4-9650-1c77ea61b219

21 Apr 2026 11:36PM UTC coverage: 99.401% (+0.4%) from 98.978%
5c5edb16-d6df-4ce4-9650-1c77ea61b219

push

circleci

Haixing-Hu
test(concurrent): expand lock, task, and double-checked coverage

1659 of 1669 relevant lines covered (99.4%)

43.47 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.35
/src/task/service/thread_pool.rs
1
/*******************************************************************************
2
 *
3
 *    Copyright (c) 2025 - 2026.
4
 *    Haixing Hu, Qubit Co. Ltd.
5
 *
6
 *    All rights reserved.
7
 *
8
 ******************************************************************************/
9
use std::{
10
    collections::VecDeque,
11
    future::Future,
12
    io,
13
    pin::Pin,
14
    sync::{
15
        Arc,
16
        Condvar,
17
        Mutex,
18
        MutexGuard,
19
    },
20
    thread,
21
    time::Duration,
22
};
23

24
use qubit_function::Callable;
25
use thiserror::Error;
26

27
use crate::task::{
28
    TaskCompletion,
29
    TaskHandle,
30
    task_runner::run_callable,
31
};
32

33
use super::{
34
    ExecutorService,
35
    RejectedExecution,
36
    ShutdownReport,
37
};
38

39
/// Default thread name prefix used by [`ThreadPoolBuilder`].
///
/// Worker thread names are formed by appending `-<worker index>` to this
/// prefix when the pool spawns a worker.
const DEFAULT_THREAD_NAME_PREFIX: &str = "qubit-thread-pool";

/// Default idle lifetime for workers above the core pool size.
///
/// Applies to core workers too when core-thread timeout is enabled.
const DEFAULT_KEEP_ALIVE: Duration = Duration::from_secs(60);
44

45
/// OS thread pool implementing [`ExecutorService`].
46
///
47
/// `ThreadPool` accepts fallible tasks, stores them in an internal FIFO queue,
48
/// and executes them on worker threads. Workers are created lazily up to the
49
/// configured core size, queued after that, and may grow up to the maximum size
50
/// when a bounded queue is full. Submitted tasks return [`TaskHandle`], which
51
/// supports both blocking [`TaskHandle::get`] and async `.await` result
52
/// retrieval.
53
///
54
/// `shutdown` is graceful: already accepted queued tasks are allowed to run.
55
/// `shutdown_now` is abrupt: queued tasks that have not started are completed
56
/// with [`TaskExecutionError::Cancelled`](crate::task::TaskExecutionError::Cancelled).
57
///
58
/// # Author
59
///
60
/// Haixing Hu
61
pub struct ThreadPool {
    // Shared state (lifecycle, queue, counters, condvars) referenced by every
    // worker thread; the pool handle itself is just an `Arc` over it.
    inner: Arc<ThreadPoolInner>,
}
64

65
impl ThreadPool {
    // Every public method below delegates to the shared `ThreadPoolInner`,
    // which owns the state lock; the handle itself holds no mutable state.

    /// Creates a thread pool with equal core and maximum worker counts.
    ///
    /// # Parameters
    ///
    /// * `worker_count` - Core and maximum worker count for this pool.
    ///
    /// # Returns
    ///
    /// `Ok(ThreadPool)` if all workers are spawned successfully. Returns
    /// [`ThreadPoolBuildError`] if the configuration is invalid or a worker
    /// thread cannot be spawned.
    #[inline]
    pub fn new(worker_count: usize) -> Result<Self, ThreadPoolBuildError> {
        Self::builder().worker_count(worker_count).build()
    }

    /// Creates a builder for configuring a thread pool.
    ///
    /// # Returns
    ///
    /// A builder with default worker count and an unbounded queue.
    #[inline]
    pub fn builder() -> ThreadPoolBuilder {
        ThreadPoolBuilder::default()
    }

    /// Returns the number of queued tasks waiting for a worker.
    ///
    /// # Returns
    ///
    /// The number of accepted tasks that have not started yet.
    #[inline]
    pub fn queued_count(&self) -> usize {
        self.inner.lock_state().queue.len()
    }

    /// Returns the number of tasks currently held by workers.
    ///
    /// # Returns
    ///
    /// The number of tasks that workers have taken from the queue and have not
    /// yet finished processing.
    #[inline]
    pub fn running_count(&self) -> usize {
        self.inner.lock_state().running_tasks
    }

    /// Returns the number of worker threads that have not exited.
    ///
    /// # Returns
    ///
    /// The number of live worker loops still owned by this pool.
    #[inline]
    pub fn worker_count(&self) -> usize {
        self.inner.lock_state().live_workers
    }

    /// Returns the configured core pool size.
    ///
    /// # Returns
    ///
    /// The number of workers kept for normal load before tasks are queued.
    #[inline]
    pub fn core_pool_size(&self) -> usize {
        self.inner.lock_state().core_pool_size
    }

    /// Returns the configured maximum pool size.
    ///
    /// # Returns
    ///
    /// The maximum number of worker threads this pool may create.
    #[inline]
    pub fn maximum_pool_size(&self) -> usize {
        self.inner.lock_state().maximum_pool_size
    }

    /// Returns a point-in-time snapshot of pool counters.
    ///
    /// # Returns
    ///
    /// A snapshot containing worker, queue, and task counters observed under
    /// the pool state lock.
    #[inline]
    pub fn stats(&self) -> ThreadPoolStats {
        self.inner.stats()
    }

    /// Starts one core worker if the pool has fewer live workers than its
    /// configured core size.
    ///
    /// # Returns
    ///
    /// `Ok(true)` if a worker was started, `Ok(false)` if no core worker was
    /// needed, or `Err(RejectedExecution)` if the pool is shut down or worker
    /// creation fails.
    #[inline]
    pub fn prestart_core_thread(&self) -> Result<bool, RejectedExecution> {
        self.inner.prestart_core_thread()
    }

    /// Starts all missing core workers.
    ///
    /// # Returns
    ///
    /// The number of workers started, or `Err(RejectedExecution)` if the pool
    /// is shut down or worker creation fails.
    #[inline]
    pub fn prestart_all_core_threads(&self) -> Result<usize, RejectedExecution> {
        self.inner.prestart_all_core_threads()
    }

    /// Updates the core pool size.
    ///
    /// Increasing the core size does not eagerly create new workers unless
    /// queued work is waiting. Call [`Self::prestart_all_core_threads`] when
    /// eager creation is desired. Decreasing the core size lets excess idle
    /// workers retire according to the keep-alive policy.
    ///
    /// # Parameters
    ///
    /// * `core_pool_size` - New core pool size.
    ///
    /// # Returns
    ///
    /// `Ok(())` if the size is accepted. Returns [`ThreadPoolBuildError`] when
    /// the new core size would exceed the maximum size.
    pub fn set_core_pool_size(&self, core_pool_size: usize) -> Result<(), ThreadPoolBuildError> {
        self.inner.set_core_pool_size(core_pool_size)
    }

    /// Updates the maximum pool size.
    ///
    /// Excess workers are not interrupted. They retire after finishing current
    /// work or timing out while idle.
    ///
    /// # Parameters
    ///
    /// * `maximum_pool_size` - New maximum pool size.
    ///
    /// # Returns
    ///
    /// `Ok(())` if the size is accepted. Returns [`ThreadPoolBuildError`] when
    /// the maximum size is zero or smaller than the core size.
    pub fn set_maximum_pool_size(
        &self,
        maximum_pool_size: usize,
    ) -> Result<(), ThreadPoolBuildError> {
        self.inner.set_maximum_pool_size(maximum_pool_size)
    }

    /// Updates how long excess idle workers may wait before exiting.
    ///
    /// # Parameters
    ///
    /// * `keep_alive` - New idle timeout for workers above the core size.
    ///
    /// # Returns
    ///
    /// `Ok(())` if the timeout is accepted. Returns [`ThreadPoolBuildError`]
    /// when the duration is zero.
    pub fn set_keep_alive(&self, keep_alive: Duration) -> Result<(), ThreadPoolBuildError> {
        self.inner.set_keep_alive(keep_alive)
    }

    /// Updates whether core workers may also retire after keep-alive timeout.
    ///
    /// # Parameters
    ///
    /// * `allow` - Whether core workers are subject to idle timeout.
    pub fn allow_core_thread_timeout(&self, allow: bool) {
        self.inner.allow_core_thread_timeout(allow);
    }

    /// Submits an already type-erased pool job.
    ///
    /// This low-level hook is intended for higher-level service crates that
    /// need to attach their own lifecycle callbacks while still using this
    /// pool's queueing, cancellation, and shutdown behavior.
    ///
    /// # Parameters
    ///
    /// * `job` - Type-erased run/cancel callback pair to enqueue.
    ///
    /// # Returns
    ///
    /// `Ok(())` if the job is accepted, or `Err(RejectedExecution)` when the
    /// pool is shut down, saturated, or a needed worker cannot be spawned.
    pub fn submit_job(&self, job: PoolJob) -> Result<(), RejectedExecution> {
        self.inner.submit(job)
    }
}
249

250
impl Drop for ThreadPool {
    /// Requests graceful shutdown when the pool value is dropped.
    ///
    /// This invokes the same inner `shutdown` used by the explicit
    /// [`ExecutorService::shutdown`] path: already accepted queued tasks are
    /// still allowed to run; the drop itself does not wait for termination.
    fn drop(&mut self) {
        self.inner.shutdown();
    }
}
256

257
impl ExecutorService for ThreadPool {
    // Results are surfaced through `TaskHandle`, which per the type docs above
    // supports both blocking `get` and async `.await`.
    type Handle<R, E>
        = TaskHandle<R, E>
    where
        R: Send + 'static,
        E: Send + 'static;

    type Termination<'a>
        = Pin<Box<dyn Future<Output = ()> + Send + 'a>>
    where
        Self: 'a;

    /// Accepts a callable and queues it for pool workers.
    ///
    /// Builds a handle/completion pair, wraps the callable in a type-erased
    /// [`PoolJob`], and hands it to the inner queueing logic. On rejection the
    /// handle is dropped and the error is returned to the caller.
    fn submit_callable<C, R, E>(&self, task: C) -> Result<Self::Handle<R, E>, RejectedExecution>
    where
        C: Callable<R, E> + Send + 'static,
        R: Send + 'static,
        E: Send + 'static,
    {
        let (handle, completion) = TaskHandle::completion_pair();
        // One completion clone feeds the run path; the original moves into the
        // cancel callback the pool invokes for jobs that never start.
        let completion_for_run = completion.clone();
        let job = PoolJob::new(
            Box::new(move || run_task(task, completion_for_run)),
            Box::new(move || {
                completion.cancel();
            }),
        );
        self.inner.submit(job)?;
        Ok(handle)
    }

    /// Stops accepting new tasks after already queued work is drained.
    #[inline]
    fn shutdown(&self) {
        self.inner.shutdown();
    }

    /// Stops accepting tasks and cancels queued tasks that have not started.
    #[inline]
    fn shutdown_now(&self) -> ShutdownReport {
        self.inner.shutdown_now()
    }

    /// Returns whether shutdown has been requested.
    #[inline]
    fn is_shutdown(&self) -> bool {
        self.inner.is_shutdown()
    }

    /// Returns whether shutdown was requested and all workers have exited.
    #[inline]
    fn is_terminated(&self) -> bool {
        self.inner.is_terminated()
    }

    /// Waits until the pool has terminated.
    ///
    /// This future blocks the polling thread while waiting on a condition
    /// variable; it is not a cooperative async wait.
    fn await_termination(&self) -> Self::Termination<'_> {
        Box::pin(async move {
            self.inner.wait_for_termination();
        })
    }
}
322

323
/// Builder for [`ThreadPool`].
324
///
325
/// The default builder uses the available CPU parallelism as worker count and
326
/// an unbounded FIFO queue.
327
///
328
/// # Author
329
///
330
/// Haixing Hu
331
#[derive(Debug, Clone)]
pub struct ThreadPoolBuilder {
    // Workers kept for normal load before tasks are queued.
    core_pool_size: usize,
    // Upper bound on live workers when a bounded queue fills up.
    maximum_pool_size: usize,
    // `None` selects an unbounded FIFO queue.
    queue_capacity: Option<usize>,
    // Prefix for worker thread names; the worker index is appended.
    thread_name_prefix: String,
    // Worker stack size in bytes; `None` uses the platform default.
    stack_size: Option<usize>,
    // Idle timeout for workers above the core pool size.
    keep_alive: Duration,
    // Whether idle core workers may also retire after `keep_alive`.
    allow_core_thread_timeout: bool,
    // Whether `build` eagerly starts all core workers.
    prestart_core_threads: bool,
}

343
impl ThreadPoolBuilder {
344
    /// Sets the number of worker threads.
345
    ///
346
    /// # Parameters
347
    ///
348
    /// * `worker_count` - Core and maximum worker count for this pool.
349
    ///
350
    /// # Returns
351
    ///
352
    /// This builder for fluent configuration.
353
    #[inline]
354
    pub fn worker_count(mut self, worker_count: usize) -> Self {
17✔
355
        self.core_pool_size = worker_count;
17✔
356
        self.maximum_pool_size = worker_count;
17✔
357
        self
17✔
358
    }
17✔
359

360
    /// Sets the core pool size.
361
    ///
362
    /// A submitted task creates a new worker while the live worker count is
363
    /// below this value. Once the core size is reached, tasks are queued before
364
    /// the pool considers growing toward the maximum size.
365
    ///
366
    /// # Parameters
367
    ///
368
    /// * `core_pool_size` - Number of core workers.
369
    ///
370
    /// # Returns
371
    ///
372
    /// This builder for fluent configuration.
373
    #[inline]
374
    pub fn core_pool_size(mut self, core_pool_size: usize) -> Self {
6✔
375
        self.core_pool_size = core_pool_size;
6✔
376
        self
6✔
377
    }
6✔
378

379
    /// Sets the maximum pool size.
380
    ///
381
    /// The pool grows above the core size only when the queue cannot accept a
382
    /// submitted task.
383
    ///
384
    /// # Parameters
385
    ///
386
    /// * `maximum_pool_size` - Maximum number of live workers.
387
    ///
388
    /// # Returns
389
    ///
390
    /// This builder for fluent configuration.
391
    #[inline]
392
    pub fn maximum_pool_size(mut self, maximum_pool_size: usize) -> Self {
6✔
393
        self.maximum_pool_size = maximum_pool_size;
6✔
394
        self
6✔
395
    }
6✔
396

397
    /// Sets a bounded queue capacity.
398
    ///
399
    /// The capacity counts only tasks waiting in the queue. Tasks already held
400
    /// by worker threads are not included.
401
    ///
402
    /// # Parameters
403
    ///
404
    /// * `capacity` - Maximum number of queued tasks.
405
    ///
406
    /// # Returns
407
    ///
408
    /// This builder for fluent configuration.
409
    #[inline]
410
    pub fn queue_capacity(mut self, capacity: usize) -> Self {
7✔
411
        self.queue_capacity = Some(capacity);
7✔
412
        self
7✔
413
    }
7✔
414

415
    /// Uses an unbounded queue.
416
    ///
417
    /// # Returns
418
    ///
419
    /// This builder for fluent configuration.
420
    #[inline]
421
    pub fn unbounded_queue(mut self) -> Self {
1✔
422
        self.queue_capacity = None;
1✔
423
        self
1✔
424
    }
1✔
425

426
    /// Sets the worker thread name prefix.
427
    ///
428
    /// Worker names are created by appending the worker index to this prefix.
429
    ///
430
    /// # Parameters
431
    ///
432
    /// * `prefix` - Prefix for worker thread names.
433
    ///
434
    /// # Returns
435
    ///
436
    /// This builder for fluent configuration.
437
    #[inline]
438
    pub fn thread_name_prefix(mut self, prefix: &str) -> Self {
1✔
439
        self.thread_name_prefix = prefix.to_owned();
1✔
440
        self
1✔
441
    }
1✔
442

443
    /// Sets the worker thread stack size.
444
    ///
445
    /// # Parameters
446
    ///
447
    /// * `stack_size` - Stack size in bytes for each worker thread.
448
    ///
449
    /// # Returns
450
    ///
451
    /// This builder for fluent configuration.
452
    #[inline]
453
    pub fn stack_size(mut self, stack_size: usize) -> Self {
5✔
454
        self.stack_size = Some(stack_size);
5✔
455
        self
5✔
456
    }
5✔
457

458
    /// Sets the idle timeout for workers above the core pool size.
459
    ///
460
    /// # Parameters
461
    ///
462
    /// * `keep_alive` - Duration an excess worker may stay idle.
463
    ///
464
    /// # Returns
465
    ///
466
    /// This builder for fluent configuration.
467
    #[inline]
468
    pub fn keep_alive(mut self, keep_alive: Duration) -> Self {
5✔
469
        self.keep_alive = keep_alive;
5✔
470
        self
5✔
471
    }
5✔
472

473
    /// Allows core workers to retire after the keep-alive timeout.
474
    ///
475
    /// # Parameters
476
    ///
477
    /// * `allow` - Whether idle core workers may time out.
478
    ///
479
    /// # Returns
480
    ///
481
    /// This builder for fluent configuration.
482
    #[inline]
483
    pub fn allow_core_thread_timeout(mut self, allow: bool) -> Self {
2✔
484
        self.allow_core_thread_timeout = allow;
2✔
485
        self
2✔
486
    }
2✔
487

488
    /// Starts all core workers during [`Self::build`].
489
    ///
490
    /// Without this option, workers are created lazily as tasks are submitted,
491
    /// matching the default JDK `ThreadPoolExecutor` behavior.
492
    ///
493
    /// # Returns
494
    ///
495
    /// This builder for fluent configuration.
496
    #[inline]
497
    pub fn prestart_core_threads(mut self) -> Self {
3✔
498
        self.prestart_core_threads = true;
3✔
499
        self
3✔
500
    }
3✔
501

502
    /// Builds the configured thread pool.
503
    ///
504
    /// # Returns
505
    ///
506
    /// `Ok(ThreadPool)` if all workers are spawned successfully. Returns
507
    /// [`ThreadPoolBuildError`] if the configuration is invalid or a worker
508
    /// thread cannot be spawned.
509
    pub fn build(self) -> Result<ThreadPool, ThreadPoolBuildError> {
26✔
510
        self.validate()?;
26✔
511
        let prestart_core_threads = self.prestart_core_threads;
21✔
512
        let inner = Arc::new(ThreadPoolInner::new(ThreadPoolConfig {
21✔
513
            core_pool_size: self.core_pool_size,
21✔
514
            maximum_pool_size: self.maximum_pool_size,
21✔
515
            queue_capacity: self.queue_capacity,
21✔
516
            thread_name_prefix: self.thread_name_prefix,
21✔
517
            stack_size: self.stack_size,
21✔
518
            keep_alive: self.keep_alive,
21✔
519
            allow_core_thread_timeout: self.allow_core_thread_timeout,
21✔
520
        }));
21✔
521
        if prestart_core_threads {
21✔
522
            inner
3✔
523
                .prestart_all_core_threads()
3✔
524
                .map_err(ThreadPoolBuildError::from_rejected_execution)?;
3✔
525
        }
18✔
526
        Ok(ThreadPool { inner })
20✔
527
    }
26✔
528

529
    /// Validates this builder configuration.
530
    fn validate(&self) -> Result<(), ThreadPoolBuildError> {
26✔
531
        if self.maximum_pool_size == 0 {
26✔
532
            return Err(ThreadPoolBuildError::ZeroMaximumPoolSize);
1✔
533
        }
25✔
534
        if self.core_pool_size > self.maximum_pool_size {
25✔
535
            return Err(ThreadPoolBuildError::CorePoolSizeExceedsMaximum {
1✔
536
                core_pool_size: self.core_pool_size,
1✔
537
                maximum_pool_size: self.maximum_pool_size,
1✔
538
            });
1✔
539
        }
24✔
540
        if self.queue_capacity == Some(0) {
24✔
541
            return Err(ThreadPoolBuildError::ZeroQueueCapacity);
1✔
542
        }
23✔
543
        if self.stack_size == Some(0) {
23✔
544
            return Err(ThreadPoolBuildError::ZeroStackSize);
1✔
545
        }
22✔
546
        if self.keep_alive.is_zero() {
22✔
547
            return Err(ThreadPoolBuildError::ZeroKeepAlive);
1✔
548
        }
21✔
549
        Ok(())
21✔
550
    }
26✔
551
}
552

553
impl Default for ThreadPoolBuilder {
554
    /// Creates a builder with CPU parallelism defaults.
555
    fn default() -> Self {
26✔
556
        let worker_count = default_worker_count();
26✔
557
        Self {
26✔
558
            core_pool_size: worker_count,
26✔
559
            maximum_pool_size: worker_count,
26✔
560
            queue_capacity: None,
26✔
561
            thread_name_prefix: DEFAULT_THREAD_NAME_PREFIX.to_owned(),
26✔
562
            stack_size: None,
26✔
563
            keep_alive: DEFAULT_KEEP_ALIVE,
26✔
564
            allow_core_thread_timeout: false,
26✔
565
            prestart_core_threads: false,
26✔
566
        }
26✔
567
    }
26✔
568
}
569

570
/// Error returned when a [`ThreadPool`] cannot be built.
571
///
572
/// # Author
573
///
574
/// Haixing Hu
575
#[derive(Debug, Error)]
pub enum ThreadPoolBuildError {
    /// The configured maximum pool size is zero.
    #[error("thread pool maximum pool size must be greater than zero")]
    ZeroMaximumPoolSize,

    /// The configured core pool size is greater than the maximum pool size.
    #[error(
        "thread pool core pool size {core_pool_size} exceeds maximum pool size {maximum_pool_size}"
    )]
    CorePoolSizeExceedsMaximum {
        /// Configured core pool size.
        core_pool_size: usize,

        /// Configured maximum pool size.
        maximum_pool_size: usize,
    },

    /// The configured bounded queue capacity is zero.
    #[error("thread pool queue capacity must be greater than zero")]
    ZeroQueueCapacity,

    /// The configured worker stack size is zero.
    #[error("thread pool stack size must be greater than zero")]
    ZeroStackSize,

    /// The configured keep-alive timeout is zero.
    #[error("thread pool keep-alive timeout must be greater than zero")]
    ZeroKeepAlive,

    /// A worker thread could not be spawned.
    ///
    /// Also produced when prestarting workers during `build` is rejected; see
    /// [`ThreadPoolBuildError::from_rejected_execution`].
    #[error("failed to spawn thread pool worker {index}: {source}")]
    SpawnWorker {
        /// Index of the worker that failed to spawn.
        index: usize,

        /// I/O error reported by [`std::thread::Builder::spawn`].
        source: io::Error,
    },
}
615

616
impl ThreadPoolBuildError {
617
    /// Converts a runtime worker-spawn rejection into a build error.
618
    fn from_rejected_execution(error: RejectedExecution) -> Self {
1✔
619
        match error {
1✔
620
            RejectedExecution::WorkerSpawnFailed { source } => Self::SpawnWorker {
1✔
621
                index: 0,
1✔
622
                source: io::Error::new(source.kind(), source.to_string()),
1✔
623
            },
1✔
624
            RejectedExecution::Shutdown => Self::SpawnWorker {
×
625
                index: 0,
×
626
                source: io::Error::other("thread pool shut down during prestart"),
×
627
            },
×
628
            RejectedExecution::Saturated => Self::SpawnWorker {
×
629
                index: 0,
×
630
                source: io::Error::other("thread pool saturated during prestart"),
×
631
            },
×
632
        }
633
    }
1✔
634
}
635

636
/// Point-in-time counters reported by [`ThreadPool`].
637
///
638
/// The snapshot is intended for monitoring and tests. It is not a stable
639
/// synchronization primitive; concurrent submissions and completions may make
640
/// the next snapshot different immediately after this one is returned.
641
///
642
/// # Author
643
///
644
/// Haixing Hu
645
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct ThreadPoolStats {
    /// Configured core pool size.
    pub core_pool_size: usize,

    /// Configured maximum pool size.
    pub maximum_pool_size: usize,

    /// Number of live worker loops.
    pub live_workers: usize,

    /// Number of workers currently waiting for work.
    pub idle_workers: usize,

    /// Number of queued tasks waiting for a worker.
    pub queued_tasks: usize,

    /// Number of tasks currently held by workers.
    pub running_tasks: usize,

    /// Number of tasks accepted since pool creation.
    ///
    /// Counts only successful submissions; rejected tasks are excluded.
    pub submitted_tasks: usize,

    /// Number of worker-held jobs finished since pool creation.
    pub completed_tasks: usize,

    /// Number of queued jobs cancelled by immediate shutdown.
    pub cancelled_tasks: usize,

    /// Whether shutdown has been requested (graceful or abrupt).
    pub shutdown: bool,

    /// Whether the pool has fully terminated.
    pub terminated: bool,
}
680

681
/// Immutable and initial mutable configuration used by a thread pool.
682
struct ThreadPoolConfig {
    // Initial core pool size (adjustable later via `set_core_pool_size`).
    core_pool_size: usize,
    // Initial maximum pool size (adjustable later via `set_maximum_pool_size`).
    maximum_pool_size: usize,
    // `None` means an unbounded FIFO queue.
    queue_capacity: Option<usize>,
    // Prefix for worker thread names; fixed for the pool lifetime.
    thread_name_prefix: String,
    // Worker stack size in bytes; `None` uses the platform default.
    stack_size: Option<usize>,
    // Initial idle timeout for excess workers.
    keep_alive: Duration,
    // Initial core-worker timeout policy.
    allow_core_thread_timeout: bool,
}
691

692
/// Shared state for a thread pool.
693
struct ThreadPoolInner {
    // All mutable pool state (queue, counters, lifecycle) behind one lock.
    state: Mutex<ThreadPoolState>,
    // Notified on submission, shutdown, and configuration changes; presumably
    // the worker loop waits on it for queue items — the worker side lives in
    // `run_worker`, outside this section.
    available: Condvar,
    // Notified whenever the pool reaches the fully-terminated state; waited on
    // by `wait_for_termination`.
    terminated: Condvar,
    // Prefix for worker thread names; immutable after construction.
    thread_name_prefix: String,
    // Worker stack size in bytes; `None` uses the platform default.
    stack_size: Option<usize>,
}
700

701
impl ThreadPoolInner {
    /// Creates shared state for a thread pool.
    fn new(config: ThreadPoolConfig) -> Self {
        Self {
            state: Mutex::new(ThreadPoolState {
                lifecycle: ThreadPoolLifecycle::Running,
                queue: VecDeque::new(),
                queue_capacity: config.queue_capacity,
                running_tasks: 0,
                live_workers: 0,
                idle_workers: 0,
                submitted_tasks: 0,
                completed_tasks: 0,
                cancelled_tasks: 0,
                core_pool_size: config.core_pool_size,
                maximum_pool_size: config.maximum_pool_size,
                keep_alive: config.keep_alive,
                allow_core_thread_timeout: config.allow_core_thread_timeout,
                next_worker_index: 0,
            }),
            available: Condvar::new(),
            terminated: Condvar::new(),
            thread_name_prefix: config.thread_name_prefix,
            stack_size: config.stack_size,
        }
    }

    /// Acquires the pool state while tolerating poisoned locks.
    ///
    /// A panicking worker must not wedge the whole pool, so poisoning is
    /// deliberately ignored and the guard is recovered.
    fn lock_state(&self) -> MutexGuard<'_, ThreadPoolState> {
        self.state
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// Submits a job into the queue.
    ///
    /// Follows a three-stage admission policy (same shape as the JDK
    /// `ThreadPoolExecutor`): grow to the core size first, then queue, then
    /// grow toward the maximum size, and finally reject when saturated.
    fn submit(self: &Arc<Self>, job: PoolJob) -> Result<(), RejectedExecution> {
        let mut state = self.lock_state();
        if !state.lifecycle.is_running() {
            return Err(RejectedExecution::Shutdown);
        }
        // Stage 1: below core size — hand the job directly to a new worker.
        if state.live_workers < state.core_pool_size {
            self.spawn_worker_locked(&mut state, Some(job))?;
            state.submitted_tasks += 1;
            return Ok(());
        }
        // Stage 2: queue the job while the queue has room.
        if !state.is_saturated() {
            state.queue.push_back(job);
            state.submitted_tasks += 1;
            // With zero live workers nothing would ever drain the queue, so a
            // worker must be spawned; on failure the just-queued job is rolled
            // back and cancelled OUTSIDE the lock (cancel callbacks may run
            // arbitrary user code).
            if state.live_workers == 0
                && let Err(error) = self.spawn_worker_locked(&mut state, None)
            {
                if let Some(job) = state.queue.pop_back() {
                    state.submitted_tasks = state
                        .submitted_tasks
                        .checked_sub(1)
                        .expect("thread pool submitted task counter underflow");
                    drop(state);
                    job.cancel();
                }
                return Err(error);
            }
            self.available.notify_one();
            return Ok(());
        }
        // Stage 3: queue full — grow toward the maximum size, else reject.
        if state.live_workers < state.maximum_pool_size {
            self.spawn_worker_locked(&mut state, Some(job))?;
            state.submitted_tasks += 1;
            Ok(())
        } else {
            Err(RejectedExecution::Saturated)
        }
    }

    /// Starts one missing core worker.
    fn prestart_core_thread(self: &Arc<Self>) -> Result<bool, RejectedExecution> {
        let mut state = self.lock_state();
        if !state.lifecycle.is_running() {
            return Err(RejectedExecution::Shutdown);
        }
        if state.live_workers >= state.core_pool_size {
            return Ok(false);
        }
        self.spawn_worker_locked(&mut state, None)?;
        Ok(true)
    }

    /// Starts all missing core workers.
    ///
    /// Re-acquires the lock per worker via `prestart_core_thread`, so workers
    /// created concurrently are counted and not duplicated.
    fn prestart_all_core_threads(self: &Arc<Self>) -> Result<usize, RejectedExecution> {
        let mut started = 0;
        while self.prestart_core_thread()? {
            started += 1;
        }
        Ok(started)
    }

    /// Spawns a worker while the caller holds the pool state lock.
    ///
    /// Counters are incremented optimistically before the spawn attempt and
    /// rolled back if `thread::Builder::spawn` fails, so other threads never
    /// observe a worker that can no longer appear.
    fn spawn_worker_locked(
        self: &Arc<Self>,
        state: &mut ThreadPoolState,
        first_task: Option<PoolJob>,
    ) -> Result<(), RejectedExecution> {
        let index = state.next_worker_index;
        state.next_worker_index += 1;
        state.live_workers += 1;
        // A directly handed-off job counts as running from the moment of
        // hand-off, not from when the worker thread actually starts.
        if first_task.is_some() {
            state.running_tasks += 1;
        }

        let worker_inner = Arc::clone(self);
        let mut builder =
            thread::Builder::new().name(format!("{}-{index}", self.thread_name_prefix));
        if let Some(stack_size) = self.stack_size {
            builder = builder.stack_size(stack_size);
        }
        match builder.spawn(move || run_worker(worker_inner, first_task)) {
            Ok(_) => Ok(()),
            Err(source) => {
                // Roll back the optimistic increments; termination may now be
                // reachable (e.g. shutdown raced with this failed spawn).
                state.live_workers = state
                    .live_workers
                    .checked_sub(1)
                    .expect("thread pool live worker counter underflow");
                if state.running_tasks > 0 {
                    state.running_tasks -= 1;
                }
                self.notify_if_terminated(state);
                Err(RejectedExecution::WorkerSpawnFailed {
                    source: Arc::new(source),
                })
            }
        }
    }

    /// Requests graceful shutdown.
    ///
    /// Idempotent: repeat calls only re-notify waiters. Workers are woken via
    /// `available` so they can observe the lifecycle change.
    fn shutdown(&self) {
        let mut state = self.lock_state();
        if state.lifecycle.is_running() {
            state.lifecycle = ThreadPoolLifecycle::Shutdown;
        }
        self.available.notify_all();
        self.notify_if_terminated(&state);
    }

    /// Requests abrupt shutdown and cancels queued jobs.
    ///
    /// Queued jobs are drained under the lock, but their cancel callbacks run
    /// after the lock is released, since they may execute arbitrary user code.
    fn shutdown_now(&self) -> ShutdownReport {
        let (jobs, report) = {
            let mut state = self.lock_state();
            // Only escalate; never downgrade a state that is already Stopping
            // or beyond.
            if state.lifecycle.is_running() || state.lifecycle.is_shutdown() {
                state.lifecycle = ThreadPoolLifecycle::Stopping;
            }
            let queued = state.queue.len();
            let running = state.running_tasks;
            let jobs = state.queue.drain(..).collect::<Vec<_>>();
            state.cancelled_tasks += queued;
            self.available.notify_all();
            self.notify_if_terminated(&state);
            (jobs, ShutdownReport::new(queued, running, queued))
        };
        for job in jobs {
            job.cancel();
        }
        report
    }

    /// Returns whether shutdown has been requested.
    fn is_shutdown(&self) -> bool {
        !self.lock_state().lifecycle.is_running()
    }

    /// Returns whether the pool is fully terminated.
    fn is_terminated(&self) -> bool {
        self.lock_state().is_terminated()
    }

    /// Blocks the current thread until this pool is terminated.
    ///
    /// The predicate is re-checked in a loop, which also absorbs spurious
    /// condvar wakeups.
    fn wait_for_termination(&self) {
        let mut state = self.lock_state();
        while !state.is_terminated() {
            state = self
                .terminated
                .wait(state)
                .unwrap_or_else(std::sync::PoisonError::into_inner);
        }
    }

    /// Returns a point-in-time pool snapshot.
    ///
    /// All counters are read under one lock acquisition, so the snapshot is
    /// internally consistent even though it may be stale immediately after.
    fn stats(&self) -> ThreadPoolStats {
        let state = self.lock_state();
        ThreadPoolStats {
            core_pool_size: state.core_pool_size,
            maximum_pool_size: state.maximum_pool_size,
            live_workers: state.live_workers,
            idle_workers: state.idle_workers,
            queued_tasks: state.queue.len(),
            running_tasks: state.running_tasks,
            submitted_tasks: state.submitted_tasks,
            completed_tasks: state.completed_tasks,
            cancelled_tasks: state.cancelled_tasks,
            shutdown: !state.lifecycle.is_running(),
            terminated: state.is_terminated(),
        }
    }

    /// Updates the core pool size.
    fn set_core_pool_size(
        self: &Arc<Self>,
        core_pool_size: usize,
    ) -> Result<(), ThreadPoolBuildError> {
        let mut state = self.lock_state();
        if core_pool_size > state.maximum_pool_size {
            return Err(ThreadPoolBuildError::CorePoolSizeExceedsMaximum {
                core_pool_size,
                maximum_pool_size: state.maximum_pool_size,
            });
        }
        state.core_pool_size = core_pool_size;
        // Wake idle workers so they re-evaluate the retire policy under the
        // new size.
        self.available.notify_all();
        Ok(())
    }

    /// Updates the maximum pool size.
    fn set_maximum_pool_size(
        self: &Arc<Self>,
        maximum_pool_size: usize,
    ) -> Result<(), ThreadPoolBuildError> {
        let mut state = self.lock_state();
        if maximum_pool_size == 0 {
            return Err(ThreadPoolBuildError::ZeroMaximumPoolSize);
        }
        if state.core_pool_size > maximum_pool_size {
            return Err(ThreadPoolBuildError::CorePoolSizeExceedsMaximum {
                core_pool_size: state.core_pool_size,
                maximum_pool_size,
            });
        }
        state.maximum_pool_size = maximum_pool_size;
        self.available.notify_all();
        Ok(())
    }

    /// Updates the worker keep-alive timeout.
    fn set_keep_alive(&self, keep_alive: Duration) -> Result<(), ThreadPoolBuildError> {
        if keep_alive.is_zero() {
            return Err(ThreadPoolBuildError::ZeroKeepAlive);
        }
        let mut state = self.lock_state();
        state.keep_alive = keep_alive;
        self.available.notify_all();
        Ok(())
    }

    /// Updates whether idle core workers may time out.
    fn allow_core_thread_timeout(&self, allow: bool) {
        let mut state = self.lock_state();
        state.allow_core_thread_timeout = allow;
        self.available.notify_all();
    }

    /// Notifies termination waiters when the state is terminal.
    ///
    /// Called from every path that can transition the pool toward
    /// termination (shutdown requests and failed spawns).
    fn notify_if_terminated(&self, state: &ThreadPoolState) {
        if state.is_terminated() {
            self.terminated.notify_all();
        }
    }
}
965

966
/// Mutable pool state protected by [`ThreadPoolInner::state`].
struct ThreadPoolState {
    // Current lifecycle phase; drives worker behavior in `wait_for_job`.
    lifecycle: ThreadPoolLifecycle,
    // FIFO queue of jobs waiting for a worker.
    queue: VecDeque<PoolJob>,
    // Optional upper bound on `queue` length; `None` means unbounded.
    queue_capacity: Option<usize>,
    // Number of jobs currently being executed by workers.
    running_tasks: usize,
    // Number of worker threads that have not yet exited.
    live_workers: usize,
    // Number of workers currently blocked waiting for a job.
    idle_workers: usize,
    // Counter of accepted tasks; maintained by the submit path, which is
    // outside this chunk.
    submitted_tasks: usize,
    // Counter of tasks that finished running (see `finish_running_job`).
    completed_tasks: usize,
    // Counter of queued tasks cancelled during shutdown; maintained by the
    // shutdown path, which is outside this chunk.
    cancelled_tasks: usize,
    // Workers up to this count use untimed waits unless
    // `allow_core_thread_timeout` is set.
    core_pool_size: usize,
    // Hard upper bound on live workers; excess workers retire.
    maximum_pool_size: usize,
    // Idle lifetime before a timed-wait worker may retire.
    keep_alive: Duration,
    // Whether idle core workers also use timed waits and may retire.
    allow_core_thread_timeout: bool,
    // Monotonic index for newly spawned workers; presumably used to build
    // worker thread names — confirm against the spawn path.
    next_worker_index: usize,
}
983

984
impl ThreadPoolState {
985
    /// Returns whether the queue is currently full.
986
    fn is_saturated(&self) -> bool {
12✔
987
        self.queue_capacity
12✔
988
            .is_some_and(|capacity| self.queue.len() >= capacity)
12✔
989
    }
12✔
990

991
    /// Returns whether the service lifecycle is fully terminated.
992
    fn is_terminated(&self) -> bool {
117✔
993
        !self.lifecycle.is_running()
117✔
994
            && self.queue.is_empty()
88✔
995
            && self.running_tasks == 0
84✔
996
            && self.live_workers == 0
82✔
997
    }
117✔
998

999
    /// Returns whether an idle worker should use a timed wait.
1000
    fn worker_wait_is_timed(&self) -> bool {
13✔
1001
        self.allow_core_thread_timeout || self.live_workers > self.core_pool_size
13✔
1002
    }
13✔
1003

1004
    /// Returns whether an idle worker may retire now.
1005
    fn idle_worker_can_retire(&self) -> bool {
3✔
1006
        self.live_workers > self.maximum_pool_size
3✔
1007
            || (self.worker_wait_is_timed()
3✔
1008
                && (self.live_workers > self.core_pool_size || self.allow_core_thread_timeout))
3✔
1009
    }
3✔
1010
}
1011

1012
/// Lifecycle state for a thread pool.
///
/// Workers inspect this in `wait_for_job` to decide whether to wait for new
/// work, drain the remaining queue, or exit immediately.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ThreadPoolLifecycle {
    /// The pool accepts new tasks and workers wait for queue items.
    Running,

    /// The pool rejects new tasks but drains queued work.
    Shutdown,

    /// The pool rejects new tasks, cancels queued work, and stops workers.
    Stopping,
}
1024

1025
impl ThreadPoolLifecycle {
1026
    /// Returns whether this lifecycle still accepts new work.
1027
    const fn is_running(self) -> bool {
207✔
1028
        matches!(self, Self::Running)
207✔
1029
    }
207✔
1030

1031
    /// Returns whether this lifecycle represents graceful shutdown.
1032
    const fn is_shutdown(self) -> bool {
1✔
1033
        matches!(self, Self::Shutdown)
1✔
1034
    }
1✔
1035
}
1036

1037
/// Type-erased pool job with a cancellation path for queued work.
///
/// `PoolJob` is a low-level extension point for building custom services on
/// top of [`ThreadPool`]. The pool calls the run callback after a worker takes
/// the job, or the cancel callback if the job is still queued during immediate
/// shutdown.
pub struct PoolJob {
    // Callback invoked when a worker executes the job; `None` once consumed.
    run: Option<Box<dyn FnOnce() + Send + 'static>>,
    // Callback invoked if the queued job is cancelled; `None` once consumed.
    cancel: Option<Box<dyn FnOnce() + Send + 'static>>,
}
1047

1048
impl PoolJob {
1049
    /// Creates a pool job from run and cancel callbacks.
1050
    ///
1051
    /// # Parameters
1052
    ///
1053
    /// * `run` - Callback executed once a worker starts this job.
1054
    /// * `cancel` - Callback executed if this job is cancelled while queued.
1055
    ///
1056
    /// # Returns
1057
    ///
1058
    /// A type-erased job accepted by [`ThreadPool::submit_job`].
1059
    pub fn new(
26✔
1060
        run: Box<dyn FnOnce() + Send + 'static>,
26✔
1061
        cancel: Box<dyn FnOnce() + Send + 'static>,
26✔
1062
    ) -> Self {
26✔
1063
        Self {
26✔
1064
            run: Some(run),
26✔
1065
            cancel: Some(cancel),
26✔
1066
        }
26✔
1067
    }
26✔
1068

1069
    /// Runs this job if it has not been cancelled first.
1070
    fn run(mut self) {
19✔
1071
        if let Some(run) = self.run.take() {
19✔
1072
            run();
19✔
1073
        }
19✔
1074
    }
19✔
1075

1076
    /// Cancels this queued job if it has not been run first.
1077
    fn cancel(mut self) {
2✔
1078
        if let Some(cancel) = self.cancel.take() {
2✔
1079
            cancel();
2✔
1080
        }
2✔
1081
    }
2✔
1082
}
1083

1084
/// Runs a callable task through a task completion endpoint.
///
/// Delegates to `TaskCompletion::start_and_complete`, supplying a closure
/// that executes the callable via `run_callable` and yields its outcome.
fn run_task<C, R, E>(task: C, completion: TaskCompletion<R, E>)
where
    C: Callable<R, E>,
{
    completion.start_and_complete(|| run_callable(task));
}
18✔
1091

1092
/// Runs a single worker loop until the pool asks it to exit.
1093
fn run_worker(inner: Arc<ThreadPoolInner>, first_task: Option<PoolJob>) {
20✔
1094
    if let Some(job) = first_task {
20✔
1095
        job.run();
13✔
1096
        finish_running_job(&inner);
13✔
1097
    }
13✔
1098
    loop {
1099
        let job = wait_for_job(&inner);
26✔
1100
        match job {
26✔
1101
            Some(job) => {
6✔
1102
                job.run();
6✔
1103
                finish_running_job(&inner);
6✔
1104
            }
6✔
1105
            None => return,
20✔
1106
        }
1107
    }
1108
}
20✔
1109

1110
/// Waits until a worker can take a job or should exit.
///
/// Holds the state lock across the whole decision loop, handing the guard to
/// the `available` condition variable while waiting. Returns `Some(job)` with
/// `running_tasks` already incremented, or `None` after unregistering the
/// worker (the caller must then exit its loop).
fn wait_for_job(inner: &ThreadPoolInner) -> Option<PoolJob> {
    let mut state = inner.lock_state();
    loop {
        match state.lifecycle {
            ThreadPoolLifecycle::Running => {
                // Prefer queued work; claim it while still holding the lock.
                if let Some(job) = state.queue.pop_front() {
                    state.running_tasks += 1;
                    return Some(job);
                }
                // The maximum pool size may have been lowered at runtime;
                // retire surplus workers immediately.
                if state.live_workers > state.maximum_pool_size && state.live_workers > 0 {
                    unregister_exiting_worker(inner, &mut state);
                    return None;
                }
                if state.worker_wait_is_timed() {
                    // Timed wait: worker is above the core size or core
                    // timeout is allowed, so it may retire after keep-alive.
                    let keep_alive = state.keep_alive;
                    state.idle_workers += 1;
                    // A poisoned lock is tolerated: recover the guard rather
                    // than propagate the panic into every worker.
                    let (next_state, timeout) = inner
                        .available
                        .wait_timeout(state, keep_alive)
                        .unwrap_or_else(std::sync::PoisonError::into_inner);
                    state = next_state;
                    state.idle_workers = state
                        .idle_workers
                        .checked_sub(1)
                        .expect("thread pool idle worker counter underflow");
                    // Retire only on a genuine timeout with nothing queued
                    // and retirement still permitted (settings may have
                    // changed while waiting).
                    if timeout.timed_out()
                        && state.queue.is_empty()
                        && state.idle_worker_can_retire()
                    {
                        unregister_exiting_worker(inner, &mut state);
                        return None;
                    }
                } else {
                    // Untimed wait for core workers: block until notified,
                    // then loop to re-check the queue and lifecycle
                    // (spurious wakeups are handled by the loop).
                    state.idle_workers += 1;
                    state = inner
                        .available
                        .wait(state)
                        .unwrap_or_else(std::sync::PoisonError::into_inner);
                    state.idle_workers = state
                        .idle_workers
                        .checked_sub(1)
                        .expect("thread pool idle worker counter underflow");
                }
            }
            ThreadPoolLifecycle::Shutdown => {
                // Graceful shutdown: keep draining queued work; exit once
                // the queue is empty.
                if let Some(job) = state.queue.pop_front() {
                    state.running_tasks += 1;
                    return Some(job);
                }
                unregister_exiting_worker(inner, &mut state);
                return None;
            }
            ThreadPoolLifecycle::Stopping => {
                // Immediate shutdown: exit without touching the queue
                // (queued jobs are cancelled elsewhere).
                unregister_exiting_worker(inner, &mut state);
                return None;
            }
        }
    }
}
26✔
1170

1171
/// Marks a worker-held job as finished.
1172
fn finish_running_job(inner: &ThreadPoolInner) {
19✔
1173
    let mut state = inner.lock_state();
19✔
1174
    state.running_tasks = state
19✔
1175
        .running_tasks
19✔
1176
        .checked_sub(1)
19✔
1177
        .expect("thread pool running task counter underflow");
19✔
1178
    state.completed_tasks += 1;
19✔
1179
    inner.notify_if_terminated(&state);
19✔
1180
}
19✔
1181

1182
/// Marks a worker as exited.
1183
fn unregister_exiting_worker(inner: &ThreadPoolInner, state: &mut ThreadPoolState) {
20✔
1184
    state.live_workers = state
20✔
1185
        .live_workers
20✔
1186
        .checked_sub(1)
20✔
1187
        .expect("thread pool live worker counter underflow");
20✔
1188
    inner.notify_if_terminated(state);
20✔
1189
}
20✔
1190

1191
/// Returns the default worker count for new builders.
///
/// Uses the detected hardware parallelism, falling back to a single worker
/// when the platform cannot report it.
fn default_worker_count() -> usize {
    match thread::available_parallelism() {
        Ok(parallelism) => parallelism.get(),
        Err(_) => 1,
    }
}
26✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc