• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

qubit-ltd / rs-thread-pool / 4870a184-0519-4961-b7df-b299af549396

03 May 2026 05:33PM UTC coverage: 98.375% (+0.2%) from 98.225%
4870a184-0519-4961-b7df-b299af549396

push

circleci

Haixing-Hu
test(thread-pool): move steal retry coverage to regular tests

Replace coverage-only helper exports with direct integration tests for queue stealing and worker queue draining behavior. This keeps retry/drain defensive paths verified without relying on cfg(coverage) branches or style allowlist exceptions.

16 of 16 new or added lines in 2 files covered. (100.0%)

18 existing lines in 2 files now uncovered.

1513 of 1538 relevant lines covered (98.37%)

37.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.76
/src/thread_pool/thread_pool_inner.rs
1
/*******************************************************************************
2
 *
3
 *    Copyright (c) 2025 - 2026 Haixing Hu.
4
 *
5
 *    SPDX-License-Identifier: Apache-2.0
6
 *
7
 *    Licensed under the Apache License, Version 2.0.
8
 *
9
 ******************************************************************************/
10
use std::{
11
    sync::{
12
        Arc,
13
        Mutex,
14
        atomic::{
15
            AtomicUsize,
16
            Ordering,
17
        },
18
    },
19
    thread,
20
    time::Duration,
21
};
22

23
use crossbeam_deque::{
24
    Injector,
25
    Steal,
26
};
27

28
use qubit_lock::{
29
    Monitor,
30
    MonitorGuard,
31
    WaitTimeoutStatus,
32
};
33

34
use super::pool_job::PoolJob;
35
use super::thread_pool_build_error::ThreadPoolBuildError;
36
use super::thread_pool_config::ThreadPoolConfig;
37
use super::thread_pool_lifecycle::ThreadPoolLifecycle;
38
use super::thread_pool_state::ThreadPoolState;
39
use super::thread_pool_stats::ThreadPoolStats;
40
use qubit_executor::service::{
41
    RejectedExecution,
42
    ShutdownReport,
43
};
44

45
/// Queue owned by one worker and used for local dispatch plus stealing.
46
struct WorkerQueue {
47
    /// Logical worker index used as a stable identity key.
48
    worker_index: usize,
49
    /// Lock-free deque of queued jobs assigned to this worker.
50
    jobs: Injector<PoolJob>,
51
}
52

53
impl WorkerQueue {
54
    /// Creates an empty local queue for one worker.
55
    ///
56
    /// # Parameters
57
    ///
58
    /// * `worker_index` - Stable index of the worker owning this queue.
59
    ///
60
    /// # Returns
61
    ///
62
    /// A local queue with no jobs.
63
    fn new(worker_index: usize) -> Self {
23✔
64
        Self {
23✔
65
            worker_index,
23✔
66
            jobs: Injector::new(),
23✔
67
        }
23✔
68
    }
23✔
69

70
    /// Returns the owning worker index.
71
    ///
72
    /// # Returns
73
    ///
74
    /// The worker index associated with this queue.
75
    #[inline]
76
    fn worker_index(&self) -> usize {
71✔
77
        self.worker_index
71✔
78
    }
71✔
79

80
    /// Appends a job to the back of this queue.
81
    ///
82
    /// # Parameters
83
    ///
84
    /// * `job` - Job to enqueue.
85
    fn push_back(&self, job: PoolJob) {
3✔
86
        self.jobs.push(job);
3✔
87
    }
3✔
88

89
    /// Pops one job from the front of this queue.
90
    ///
91
    /// # Returns
92
    ///
93
    /// `Some(job)` when this queue is non-empty, otherwise `None`.
94
    ///
95
    /// # Implementation notes
96
    ///
97
    /// [`Injector`] does not expose an owner-only pop operation. Both "local
98
    /// pop" and "remote steal" therefore share the same `steal()` primitive.
99
    fn pop_front(&self) -> Option<PoolJob> {
11✔
100
        self.steal_one()
11✔
101
    }
11✔
102

103
    /// Steals one job from the back of this queue.
104
    ///
105
    /// # Returns
106
    ///
107
    /// `Some(job)` when this queue is non-empty, otherwise `None`.
108
    ///
109
    /// # Implementation notes
110
    ///
111
    /// This method intentionally reuses [`Self::steal_one`] so all queue
112
    /// consumers handle transient `Steal::Retry` states consistently.
113
    fn steal_back(&self) -> Option<PoolJob> {
4✔
114
        self.steal_one()
4✔
115
    }
4✔
116

117
    /// Drains all queued jobs from this queue.
118
    ///
119
    /// # Returns
120
    ///
121
    /// A vector containing all queued jobs in FIFO order.
122
    fn drain(&self) -> Vec<PoolJob> {
24✔
123
        let mut jobs = Vec::new();
24✔
124
        while let Some(job) = self.steal_one() {
24✔
125
            jobs.push(job);
×
UNCOV
126
        }
×
127
        jobs
24✔
128
    }
24✔
129

130
    /// Steals one job from this queue and transparently retries transient
131
    /// contention states.
132
    ///
133
    /// # Returns
134
    ///
135
    /// `Some(job)` when a job is available, otherwise `None`.
136
    fn steal_one(&self) -> Option<PoolJob> {
39✔
137
        loop {
138
            match self.jobs.steal() {
39✔
139
                Steal::Success(job) => return Some(job),
3✔
140
                Steal::Empty => return None,
36✔
141
                // Another thread raced us while mutating queue internals.
142
                // Retry immediately to mask this transient state from callers.
UNCOV
143
                Steal::Retry => continue,
×
144
            }
145
        }
146
    }
39✔
147
}
148

149
/// Shared state for a thread pool.
150
pub(crate) struct ThreadPoolInner {
151
    /// Mutable pool state protected by a monitor.
152
    state_monitor: Monitor<ThreadPoolState>,
153
    /// Registered worker-local queues used for local dispatch and stealing.
154
    worker_queues: Mutex<Vec<Arc<WorkerQueue>>>,
155
    /// Round-robin cursor used for queue selection and steal start offsets.
156
    next_enqueue_worker: AtomicUsize,
157
    /// Prefix used for naming newly spawned workers.
158
    thread_name_prefix: String,
159
    /// Optional stack size in bytes for newly spawned workers.
160
    stack_size: Option<usize>,
161
}
162

163
impl ThreadPoolInner {
164
    /// Creates shared state for a thread pool.
165
    ///
166
    /// # Parameters
167
    ///
168
    /// * `config` - Initial immutable and mutable pool configuration.
169
    ///
170
    /// # Returns
171
    ///
172
    /// A shared-state object ready to accept worker and queue operations.
173
    pub(crate) fn new(config: ThreadPoolConfig) -> Self {
23✔
174
        let mut config = config;
23✔
175
        let thread_name_prefix = std::mem::take(&mut config.thread_name_prefix);
23✔
176
        let stack_size = config.stack_size;
23✔
177
        Self {
23✔
178
            state_monitor: Monitor::new(ThreadPoolState::new(config)),
23✔
179
            worker_queues: Mutex::new(Vec::new()),
23✔
180
            next_enqueue_worker: AtomicUsize::new(0),
23✔
181
            thread_name_prefix,
23✔
182
            stack_size,
23✔
183
        }
23✔
184
    }
23✔
185

186
    /// Acquires the pool state monitor while tolerating poisoned locks.
187
    ///
188
    /// # Returns
189
    ///
190
    /// A monitor guard for the mutable pool state.
191
    #[inline]
192
    pub(crate) fn lock_state(&self) -> MonitorGuard<'_, ThreadPoolState> {
131✔
193
        self.state_monitor.lock()
131✔
194
    }
131✔
195

196
    /// Acquires the pool state and reads it while holding the monitor lock.
197
    ///
198
    /// # Arguments
199
    ///
200
    /// * `f` - Closure that reads the state.
201
    ///
202
    /// # Returns
203
    ///
204
    /// The value returned by the closure.
205
    #[inline]
206
    pub(crate) fn read_state<R, F>(&self, f: F) -> R
41✔
207
    where
41✔
208
        F: FnOnce(&ThreadPoolState) -> R,
41✔
209
    {
210
        self.state_monitor.read(f)
41✔
211
    }
41✔
212

213
    /// Acquires the pool state and mutates it while holding the monitor lock.
214
    ///
215
    /// # Arguments
216
    ///
217
    /// * `f` - Closure that mutates the state.
218
    ///
219
    /// # Returns
220
    ///
221
    /// The value returned by the closure.
222
    #[inline]
223
    pub(crate) fn write_state<R, F>(&self, f: F) -> R
8✔
224
    where
8✔
225
        F: FnOnce(&mut ThreadPoolState) -> R,
8✔
226
    {
227
        self.state_monitor.write(f)
8✔
228
    }
8✔
229

230
    /// Submits a job into the queue.
231
    ///
232
    /// # Overall logic
233
    ///
234
    /// This method follows a staged admission strategy while holding the pool
235
    /// monitor lock:
236
    ///
237
    /// 1. Reject immediately if the lifecycle is not running.
238
    /// 2. If live workers are below the core size, spawn a worker and hand the
239
    ///    job to it directly (no queue hop).
240
    /// 3. Otherwise, try enqueuing the job if the queue is not saturated.
241
    /// 4. If the queue is saturated but live workers are still below maximum,
242
    ///    spawn a non-core worker with the job as its first task.
243
    /// 5. Otherwise reject as saturated.
244
    ///
245
    /// For queued submissions we use a targeted wake-up strategy: wake exactly
246
    /// one idle worker only when idle workers exist. This avoids the
247
    /// `notify_all` "thundering herd" effect under high submission rates.
248
    ///
249
    /// # Parameters
250
    ///
251
    /// * `job` - Type-erased job to execute or cancel later.
252
    ///
253
    /// # Returns
254
    ///
255
    /// `Ok(())` when the job is accepted.
256
    ///
257
    /// # Errors
258
    ///
259
    /// Returns [`RejectedExecution::Shutdown`] after shutdown, returns
260
    /// [`RejectedExecution::Saturated`] when the queue and worker capacity are
261
    /// full, or returns [`RejectedExecution::WorkerSpawnFailed`] if a required
262
    /// worker cannot be created.
263
    pub(crate) fn submit(self: &Arc<Self>, job: PoolJob) -> Result<(), RejectedExecution> {
26✔
264
        let mut state = self.lock_state();
26✔
265
        if !state.lifecycle.is_running() {
26✔
266
            return Err(RejectedExecution::Shutdown);
2✔
267
        }
24✔
268
        if state.live_workers < state.core_pool_size {
24✔
269
            self.spawn_worker_locked(&mut state, Some(job))?;
12✔
270
            state.submitted_tasks += 1;
11✔
271
            return Ok(());
11✔
272
        }
12✔
273
        if !state.is_saturated() {
12✔
274
            state.submitted_tasks += 1;
8✔
275
            state.queued_tasks += 1;
8✔
276
            // Only wake a waiter when at least one worker is currently idle.
277
            // Busy workers will eventually pull from the queue after finishing
278
            // their current task, so a broadcast wake-up is unnecessary.
279
            let should_wake_one_idle_worker = state.idle_workers > 0;
8✔
280
            // Keep unbounded-queue pools on the legacy global-queue fast path.
281
            // For bounded pools under sustained load and without idle workers,
282
            // local queues can reduce submit-path contention on the shared
283
            // global queue.
284
            let use_local_queue =
8✔
285
                state.queue_capacity.is_some() && state.idle_workers == 0 && state.live_workers > 0;
8✔
286
            if !use_local_queue {
8✔
287
                state.queue.push_back(job);
5✔
288
            } else {
5✔
289
                let mut pending_job = Some(job);
3✔
290
                self.try_enqueue_worker_job_locked(&mut pending_job);
3✔
291
                if let Some(job) = pending_job {
3✔
UNCOV
292
                    state.queue.push_back(job);
×
293
                }
3✔
294
            }
295
            if state.live_workers == 0
8✔
296
                && let Err(error) = self.spawn_worker_locked(&mut state, None)
2✔
297
            {
298
                if let Some(job) = state.queue.pop_back() {
1✔
299
                    state.submitted_tasks = state
1✔
300
                        .submitted_tasks
1✔
301
                        .checked_sub(1)
1✔
302
                        .expect("thread pool submitted task counter underflow");
1✔
303
                    state.queued_tasks = state
1✔
304
                        .queued_tasks
1✔
305
                        .checked_sub(1)
1✔
306
                        .expect("thread pool queued task counter underflow");
1✔
307
                    drop(state);
1✔
308
                    job.cancel();
1✔
309
                }
1✔
310
                return Err(error);
1✔
311
            }
7✔
312
            // Release the monitor before notifying to keep the critical section
313
            // short and reduce lock handoff contention.
314
            drop(state);
7✔
315
            if should_wake_one_idle_worker {
7✔
UNCOV
316
                self.state_monitor.notify_one();
×
317
            }
7✔
318
            return Ok(());
7✔
319
        }
4✔
320
        if state.live_workers < state.maximum_pool_size {
4✔
321
            self.spawn_worker_locked(&mut state, Some(job))?;
2✔
322
            state.submitted_tasks += 1;
2✔
323
            Ok(())
2✔
324
        } else {
325
            Err(RejectedExecution::Saturated)
2✔
326
        }
327
    }
26✔
328

329
    /// Tries to enqueue a job into one worker-local queue.
330
    ///
331
    /// # Parameters
332
    ///
333
    /// * `job` - Slot containing one pending job. This method moves the job
334
    ///   out of the slot on success.
335
    ///
336
    /// # Returns
337
    ///
338
    /// `true` when a worker-local queue accepted the job, otherwise `false`.
339
    ///
340
    /// # Overall logic
341
    ///
342
    /// The submit path keeps this operation O(1): choose exactly one local
343
    /// queue with a round-robin cursor and push once, then fallback to the
344
    /// global queue when local enqueue is unavailable.
345
    fn try_enqueue_worker_job_locked(&self, job: &mut Option<PoolJob>) -> bool {
3✔
346
        let queues = self
3✔
347
            .worker_queues
3✔
348
            .lock()
3✔
349
            .unwrap_or_else(std::sync::PoisonError::into_inner);
3✔
350
        if queues.is_empty() {
3✔
UNCOV
351
            return false;
×
352
        }
3✔
353
        let start = self.next_enqueue_worker.fetch_add(1, Ordering::Relaxed);
3✔
354
        let slot = start % queues.len();
3✔
355
        if let Some(job) = job.take() {
3✔
356
            queues[slot].push_back(job);
3✔
357
            true
3✔
358
        } else {
UNCOV
359
            false
×
360
        }
361
    }
3✔
362

363
    /// Starts one missing core worker.
364
    ///
365
    /// # Returns
366
    ///
367
    /// `Ok(true)` when a worker was spawned, or `Ok(false)` when the core
368
    /// pool size is already satisfied.
369
    ///
370
    /// # Errors
371
    ///
372
    /// Returns [`RejectedExecution::Shutdown`] after shutdown or
373
    /// [`RejectedExecution::WorkerSpawnFailed`] if the worker cannot be
374
    /// created.
375
    pub(crate) fn prestart_core_thread(self: &Arc<Self>) -> Result<bool, RejectedExecution> {
14✔
376
        let mut state = self.lock_state();
14✔
377
        if !state.lifecycle.is_running() {
14✔
378
            return Err(RejectedExecution::Shutdown);
2✔
379
        }
12✔
380
        if state.live_workers >= state.core_pool_size {
12✔
381
            return Ok(false);
5✔
382
        }
7✔
383
        self.spawn_worker_locked(&mut state, None)?;
7✔
384
        Ok(true)
6✔
385
    }
14✔
386

387
    /// Starts all missing core workers.
388
    ///
389
    /// # Returns
390
    ///
391
    /// The number of workers started.
392
    ///
393
    /// # Errors
394
    ///
395
    /// Returns [`RejectedExecution`] if shutdown is observed or a worker cannot
396
    /// be created.
397
    pub(crate) fn prestart_all_core_threads(self: &Arc<Self>) -> Result<usize, RejectedExecution> {
6✔
398
        let mut started = 0;
6✔
399
        while self.prestart_core_thread()? {
11✔
400
            started += 1;
5✔
401
        }
5✔
402
        Ok(started)
4✔
403
    }
6✔
404

405
    /// Spawns a worker while the caller holds the pool state lock.
406
    ///
407
    /// # Parameters
408
    ///
409
    /// * `state` - Locked mutable pool state to update while spawning.
410
    /// * `first_task` - Optional first job assigned directly to the new worker.
411
    ///
412
    /// # Returns
413
    ///
414
    /// `Ok(())` when the worker thread is spawned.
415
    ///
416
    /// # Errors
417
    ///
418
    /// Returns [`RejectedExecution::WorkerSpawnFailed`] if
419
    /// [`thread::Builder::spawn`] fails.
420
    fn spawn_worker_locked(
23✔
421
        self: &Arc<Self>,
23✔
422
        state: &mut ThreadPoolState,
23✔
423
        first_task: Option<PoolJob>,
23✔
424
    ) -> Result<(), RejectedExecution> {
23✔
425
        let index = state.next_worker_index;
23✔
426
        state.next_worker_index += 1;
23✔
427
        state.live_workers += 1;
23✔
428
        if first_task.is_some() {
23✔
429
            state.running_tasks += 1;
14✔
430
        }
14✔
431
        let worker_queue = self.register_worker_queue_locked(index);
23✔
432

433
        let worker_inner = Arc::clone(self);
23✔
434
        let mut builder =
23✔
435
            thread::Builder::new().name(format!("{}-{index}", self.thread_name_prefix));
23✔
436
        if let Some(stack_size) = self.stack_size {
23✔
437
            builder = builder.stack_size(stack_size);
4✔
438
        }
19✔
439
        match builder.spawn(move || run_worker(worker_inner, worker_queue, first_task)) {
23✔
440
            Ok(_) => Ok(()),
20✔
441
            Err(source) => {
3✔
442
                // Roll back the queue registration because this worker never
443
                // reached the run loop.
444
                self.remove_worker_queue_locked(index);
3✔
445
                state.live_workers = state
3✔
446
                    .live_workers
3✔
447
                    .checked_sub(1)
3✔
448
                    .expect("thread pool live worker counter underflow");
3✔
449
                if state.running_tasks > 0 {
3✔
450
                    state.running_tasks -= 1;
1✔
451
                }
2✔
452
                self.notify_if_terminated(state);
3✔
453
                Err(RejectedExecution::WorkerSpawnFailed {
3✔
454
                    source: Arc::new(source),
3✔
455
                })
3✔
456
            }
457
        }
458
    }
23✔
459

460
    /// Registers an empty worker-local queue for a newly spawned worker.
461
    ///
462
    /// # Parameters
463
    ///
464
    /// * `worker_index` - Stable index of the new worker.
465
    fn register_worker_queue_locked(&self, worker_index: usize) -> Arc<WorkerQueue> {
23✔
466
        let queue = Arc::new(WorkerQueue::new(worker_index));
23✔
467
        self.worker_queues
23✔
468
            .lock()
23✔
469
            .unwrap_or_else(std::sync::PoisonError::into_inner)
23✔
470
            .push(Arc::clone(&queue));
23✔
471
        queue
23✔
472
    }
23✔
473

474
    /// Removes one worker-local queue and returns all jobs still queued in it.
475
    ///
476
    /// # Parameters
477
    ///
478
    /// * `worker_index` - Stable index of the retiring worker.
479
    ///
480
    /// # Returns
481
    ///
482
    /// Remaining queued jobs from the removed queue, if any.
483
    fn remove_worker_queue_locked(&self, worker_index: usize) -> Vec<PoolJob> {
23✔
484
        let queue = {
23✔
485
            let mut queues = self
23✔
486
                .worker_queues
23✔
487
                .lock()
23✔
488
                .unwrap_or_else(std::sync::PoisonError::into_inner);
23✔
489
            queues
23✔
490
                .iter()
23✔
491
                .position(|queue| queue.worker_index() == worker_index)
26✔
492
                .map(|position| queues.remove(position))
23✔
493
        };
494
        queue.map_or_else(Vec::new, |queue| queue.drain())
23✔
495
    }
23✔
496

497
    /// Attempts to take one queued job for the specified worker.
498
    ///
499
    /// # Overall logic
500
    ///
501
    /// The lookup order favors locality first and balance second:
502
    ///
503
    /// 1. Pop from the worker's own local queue.
504
    /// 2. Steal from other workers' local queues (bounded pools only).
505
    /// 3. Pop from the global fallback queue.
506
    ///
507
    /// This method mutates queue-related counters only after a job is
508
    /// successfully claimed.
509
    ///
510
    /// # Parameters
511
    ///
512
    /// * `state` - Locked mutable pool state.
513
    /// * `worker_queue` - Local queue owned by the worker requesting work.
514
    ///
515
    /// # Returns
516
    ///
517
    /// `Some(job)` when any queue has work, otherwise `None`.
518
    fn try_take_queued_job_locked(
33✔
519
        &self,
33✔
520
        state: &mut ThreadPoolState,
33✔
521
        worker_queue: &WorkerQueue,
33✔
522
    ) -> Option<PoolJob> {
33✔
523
        if state.queue_capacity.is_some() {
33✔
524
            let worker_index = worker_queue.worker_index();
11✔
525
            let own_job = worker_queue.pop_front();
11✔
526
            if let Some(job) = own_job {
11✔
527
                state.queued_tasks = state
1✔
528
                    .queued_tasks
1✔
529
                    .checked_sub(1)
1✔
530
                    .expect("thread pool queued task counter underflow");
1✔
531
                state.running_tasks += 1;
1✔
532
                return Some(job);
1✔
533
            }
10✔
534

535
            if let Some(job) = self.try_steal_job_locked(worker_index) {
10✔
536
                state.queued_tasks = state
2✔
537
                    .queued_tasks
2✔
538
                    .checked_sub(1)
2✔
539
                    .expect("thread pool queued task counter underflow");
2✔
540
                state.running_tasks += 1;
2✔
541
                return Some(job);
2✔
542
            }
8✔
543
        }
22✔
544

545
        if let Some(job) = state.queue.pop_front() {
30✔
546
            state.queued_tasks = state
3✔
547
                .queued_tasks
3✔
548
                .checked_sub(1)
3✔
549
                .expect("thread pool queued task counter underflow");
3✔
550
            state.running_tasks += 1;
3✔
551
            return Some(job);
3✔
552
        }
27✔
553

554
        None
27✔
555
    }
33✔
556

557
    /// Attempts to steal one queued job from another worker queue.
558
    ///
559
    /// # Parameters
560
    ///
561
    /// * `worker_index` - Worker that is requesting stolen work.
562
    ///
563
    /// # Returns
564
    ///
565
    /// `Some(job)` when any other worker queue can provide one job.
566
    fn try_steal_job_locked(&self, worker_index: usize) -> Option<PoolJob> {
10✔
567
        let queues = self
10✔
568
            .worker_queues
10✔
569
            .lock()
10✔
570
            .unwrap_or_else(std::sync::PoisonError::into_inner);
10✔
571
        let queue_count = queues.len();
10✔
572
        if queue_count <= 1 {
10✔
573
            return None;
6✔
574
        }
4✔
575
        // Rotate victim probing start index to avoid repeatedly hammering the
576
        // same queue under contention.
577
        let start = self.next_enqueue_worker.fetch_add(1, Ordering::Relaxed) % queue_count;
4✔
578
        for offset in 0..queue_count {
8✔
579
            let victim = &queues[(start + offset) % queue_count];
8✔
580
            if victim.worker_index() == worker_index {
8✔
581
                continue;
4✔
582
            }
4✔
583
            if let Some(job) = victim.steal_back() {
4✔
584
                return Some(job);
2✔
585
            }
2✔
586
        }
587
        None
2✔
588
    }
10✔
589

590
    /// Drains all jobs from all worker-local queues.
591
    ///
592
    /// # Returns
593
    ///
594
    /// A vector containing every job drained from worker-local queues.
595
    fn drain_all_worker_queued_jobs_locked(&self) -> Vec<PoolJob> {
4✔
596
        let queues = self
4✔
597
            .worker_queues
4✔
598
            .lock()
4✔
599
            .unwrap_or_else(std::sync::PoisonError::into_inner)
4✔
600
            .iter()
4✔
601
            .cloned()
4✔
602
            .collect::<Vec<_>>();
4✔
603
        let mut jobs = Vec::new();
4✔
604
        for queue in queues {
4✔
605
            jobs.extend(queue.drain());
1✔
606
        }
1✔
607
        jobs
4✔
608
    }
4✔
609

610
    /// Requests graceful shutdown.
611
    ///
612
    /// The pool rejects later submissions but lets queued work drain.
613
    pub(crate) fn shutdown(&self) {
42✔
614
        let mut state = self.lock_state();
42✔
615
        if state.lifecycle.is_running() {
42✔
616
            state.lifecycle = ThreadPoolLifecycle::Shutdown;
20✔
617
        }
22✔
618
        self.state_monitor.notify_all();
42✔
619
        self.notify_if_terminated(&state);
42✔
620
    }
42✔
621

622
    /// Requests abrupt shutdown and cancels queued jobs.
623
    ///
624
    /// # Returns
625
    ///
626
    /// A report containing queued jobs cancelled and jobs running at the time
627
    /// of the request.
628
    pub(crate) fn shutdown_now(&self) -> ShutdownReport {
4✔
629
        let (jobs, report) = {
4✔
630
            let mut state = self.lock_state();
4✔
631
            if state.lifecycle.is_running() || state.lifecycle.is_shutdown() {
4✔
632
                state.lifecycle = ThreadPoolLifecycle::Stopping;
3✔
633
            }
3✔
634
            let queued = state.queued_tasks;
4✔
635
            let running = state.running_tasks;
4✔
636
            let mut jobs = state.queue.drain(..).collect::<Vec<_>>();
4✔
637
            jobs.extend(self.drain_all_worker_queued_jobs_locked());
4✔
638
            debug_assert_eq!(jobs.len(), queued);
4✔
639
            state.queued_tasks = 0;
4✔
640
            state.cancelled_tasks += queued;
4✔
641
            self.state_monitor.notify_all();
4✔
642
            self.notify_if_terminated(&state);
4✔
643
            (jobs, ShutdownReport::new(queued, running, queued))
4✔
644
        };
645
        for job in jobs {
4✔
646
            job.cancel();
1✔
647
        }
1✔
648
        report
4✔
649
    }
4✔
650

651
    /// Returns whether shutdown has been requested.
652
    ///
653
    /// # Returns
654
    ///
655
    /// `true` if the pool is no longer in the running lifecycle state.
656
    pub(crate) fn is_shutdown(&self) -> bool {
1✔
657
        self.read_state(|state| !state.lifecycle.is_running())
1✔
658
    }
1✔
659

660
    /// Returns whether the pool is fully terminated.
661
    ///
662
    /// # Returns
663
    ///
664
    /// `true` if shutdown has started and no queued, running, or live worker
665
    /// state remains.
666
    pub(crate) fn is_terminated(&self) -> bool {
6✔
667
        self.read_state(ThreadPoolState::is_terminated)
6✔
668
    }
6✔
669

670
    /// Blocks the current thread until this pool is terminated.
671
    ///
672
    /// This method waits on a condition variable and therefore blocks the
673
    /// calling thread.
674
    pub(crate) fn wait_for_termination(&self) {
22✔
675
        self.state_monitor
22✔
676
            .wait_until(|state| state.is_terminated(), |_| ());
23✔
677
    }
22✔
678

679
    /// Returns a point-in-time pool snapshot.
680
    ///
681
    /// # Returns
682
    ///
683
    /// A snapshot built while holding the pool state lock.
684
    pub(crate) fn stats(&self) -> ThreadPoolStats {
10✔
685
        self.read_state(ThreadPoolStats::new)
10✔
686
    }
10✔
687

688
    /// Updates the core pool size.
689
    ///
690
    /// # Parameters
691
    ///
692
    /// * `core_pool_size` - New core pool size.
693
    ///
694
    /// # Returns
695
    ///
696
    /// `Ok(())` when the value is accepted.
697
    ///
698
    /// # Errors
699
    ///
700
    /// Returns [`ThreadPoolBuildError::CorePoolSizeExceedsMaximum`] when the
701
    /// new core size is greater than the current maximum size.
702
    pub(crate) fn set_core_pool_size(
3✔
703
        self: &Arc<Self>,
3✔
704
        core_pool_size: usize,
3✔
705
    ) -> Result<(), ThreadPoolBuildError> {
3✔
706
        let err = self.write_state(|state| {
3✔
707
            if core_pool_size > state.maximum_pool_size {
3✔
708
                Some(state.maximum_pool_size)
1✔
709
            } else {
710
                state.core_pool_size = core_pool_size;
2✔
711
                None
2✔
712
            }
713
        });
3✔
714
        if let Some(maximum_pool_size) = err {
3✔
715
            return Err(ThreadPoolBuildError::CorePoolSizeExceedsMaximum {
1✔
716
                core_pool_size,
1✔
717
                maximum_pool_size,
1✔
718
            });
1✔
719
        }
2✔
720
        self.state_monitor.notify_all();
2✔
721
        Ok(())
2✔
722
    }
3✔
723

724
    /// Updates the maximum pool size.
725
    ///
726
    /// # Parameters
727
    ///
728
    /// * `maximum_pool_size` - New maximum pool size.
729
    ///
730
    /// # Returns
731
    ///
732
    /// `Ok(())` when the value is accepted.
733
    ///
734
    /// # Errors
735
    ///
736
    /// Returns [`ThreadPoolBuildError::ZeroMaximumPoolSize`] for zero, or
737
    /// [`ThreadPoolBuildError::CorePoolSizeExceedsMaximum`] when the current
738
    /// core size is greater than the new maximum size.
739
    pub(crate) fn set_maximum_pool_size(
4✔
740
        self: &Arc<Self>,
4✔
741
        maximum_pool_size: usize,
4✔
742
    ) -> Result<(), ThreadPoolBuildError> {
4✔
743
        if maximum_pool_size == 0 {
4✔
744
            return Err(ThreadPoolBuildError::ZeroMaximumPoolSize);
1✔
745
        }
3✔
746
        let exceeds = self.write_state(|state| {
3✔
747
            if state.core_pool_size > maximum_pool_size {
3✔
748
                Some(state.core_pool_size)
1✔
749
            } else {
750
                state.maximum_pool_size = maximum_pool_size;
2✔
751
                None
2✔
752
            }
753
        });
3✔
754
        if let Some(core_pool_size) = exceeds {
3✔
755
            return Err(ThreadPoolBuildError::CorePoolSizeExceedsMaximum {
1✔
756
                core_pool_size,
1✔
757
                maximum_pool_size,
1✔
758
            });
1✔
759
        }
2✔
760
        self.state_monitor.notify_all();
2✔
761
        Ok(())
2✔
762
    }
4✔
763

764
    /// Updates the worker keep-alive timeout.
765
    ///
766
    /// # Parameters
767
    ///
768
    /// * `keep_alive` - New idle timeout.
769
    ///
770
    /// # Returns
771
    ///
772
    /// `Ok(())` when the timeout is accepted.
773
    ///
774
    /// # Errors
775
    ///
776
    /// Returns [`ThreadPoolBuildError::ZeroKeepAlive`] when the duration is
777
    /// zero.
778
    pub(crate) fn set_keep_alive(&self, keep_alive: Duration) -> Result<(), ThreadPoolBuildError> {
2✔
779
        if keep_alive.is_zero() {
2✔
780
            return Err(ThreadPoolBuildError::ZeroKeepAlive);
1✔
781
        }
1✔
782
        self.write_state(|state| state.keep_alive = keep_alive);
1✔
783
        self.state_monitor.notify_all();
1✔
784
        Ok(())
1✔
785
    }
2✔
786

787
    /// Updates whether idle core workers may time out.
788
    ///
789
    /// # Parameters
790
    ///
791
    /// * `allow` - Whether idle core workers may retire after keep-alive.
792
    pub(crate) fn allow_core_thread_timeout(&self, allow: bool) {
1✔
793
        self.write_state(|state| state.allow_core_thread_timeout = allow);
1✔
794
        self.state_monitor.notify_all();
1✔
795
    }
1✔
796

797
    /// Notifies termination waiters when the state is terminal.
798
    ///
799
    /// # Parameters
800
    ///
801
    /// * `state` - Current pool state observed while holding the state lock.
802
    fn notify_if_terminated(&self, state: &ThreadPoolState) {
88✔
803
        if state.is_terminated() {
88✔
804
            self.state_monitor.notify_all();
46✔
805
        }
46✔
806
    }
88✔
807
}
808

809
/// Runs a single worker loop until the pool asks it to exit.
810
///
811
/// # Parameters
812
///
813
/// * `inner` - Shared pool state used for queue access and counters.
814
/// * `worker_queue` - Local queue owned by this worker.
815
/// * `first_task` - Optional job assigned directly when the worker is spawned.
816
fn run_worker(
20✔
817
    inner: Arc<ThreadPoolInner>,
20✔
818
    worker_queue: Arc<WorkerQueue>,
20✔
819
    first_task: Option<PoolJob>,
20✔
820
) {
20✔
821
    if let Some(job) = first_task {
20✔
822
        job.run();
13✔
823
        finish_running_job(&inner);
13✔
824
    }
13✔
825
    loop {
826
        let job = wait_for_job(&inner, &worker_queue);
26✔
827
        match job {
26✔
828
            Some(job) => {
6✔
829
                job.run();
6✔
830
                finish_running_job(&inner);
6✔
831
            }
6✔
832
            None => return,
20✔
833
        }
834
    }
835
}
20✔
836

837
/// Waits until a worker can take a job or should exit.
///
/// All counter updates below happen while the state lock is held; each wait
/// call hands the guard back re-acquired, so the lock is never released
/// between a decision and the update it implies.
///
/// # Parameters
///
/// * `inner` - Shared pool state and monitor wait queue.
/// * `worker_queue` - Local queue owned by the worker requesting a job.
///
/// # Returns
///
/// `Some(job)` when work is available, or `None` when the worker should exit.
fn wait_for_job(inner: &ThreadPoolInner, worker_queue: &WorkerQueue) -> Option<PoolJob> {
    let worker_index = worker_queue.worker_index();
    let mut state = inner.lock_state();
    loop {
        match state.lifecycle {
            ThreadPoolLifecycle::Running => {
                // Prefer handing out real work over any retire decision.
                if let Some(job) = inner.try_take_queued_job_locked(&mut state, worker_queue) {
                    return Some(job);
                }
                // The pool was shrunk below the current worker count, so this
                // worker retires now. (The `> 0` check is defensive only:
                // `live_workers > maximum_pool_size` already implies it.)
                if state.live_workers > state.maximum_pool_size && state.live_workers > 0 {
                    unregister_exiting_worker(inner, &mut state, worker_index);
                    return None;
                }
                if state.worker_wait_is_timed() {
                    let keep_alive = state.keep_alive;
                    // Count this worker as idle only for the span of the wait;
                    // the counter is restored before any further decision.
                    state.idle_workers += 1;
                    // `wait_timeout` consumes the guard and returns the
                    // re-acquired guard together with the wait outcome.
                    let (next_state, status) = state.wait_timeout(keep_alive);
                    state = next_state;
                    state.idle_workers = state
                        .idle_workers
                        .checked_sub(1)
                        .expect("thread pool idle worker counter underflow");
                    // Retire only when the keep-alive elapsed with nothing
                    // queued and the policy allows this idle worker to exit.
                    if status == WaitTimeoutStatus::TimedOut
                        && state.queued_tasks == 0
                        && state.idle_worker_can_retire()
                    {
                        unregister_exiting_worker(inner, &mut state, worker_index);
                        return None;
                    }
                } else {
                    // Untimed wait: block until some notify_all on the state
                    // monitor (new work, config change, or shutdown).
                    state.idle_workers += 1;
                    state = state.wait();
                    state.idle_workers = state
                        .idle_workers
                        .checked_sub(1)
                        .expect("thread pool idle worker counter underflow");
                }
            }
            ThreadPoolLifecycle::Shutdown => {
                // Graceful shutdown: keep draining queued jobs, exit only once
                // the queues are empty.
                if let Some(job) = inner.try_take_queued_job_locked(&mut state, worker_queue) {
                    return Some(job);
                }
                unregister_exiting_worker(inner, &mut state, worker_index);
                return None;
            }
            ThreadPoolLifecycle::Stopping => {
                // Immediate stop: exit without draining remaining work.
                unregister_exiting_worker(inner, &mut state, worker_index);
                return None;
            }
        }
    }
}
899

900
/// Marks a worker-held job as finished.
901
///
902
/// # Parameters
903
///
904
/// * `inner` - Shared pool state whose running and completed counters are
905
///   updated.
906
fn finish_running_job(inner: &ThreadPoolInner) {
19✔
907
    let mut state = inner.lock_state();
19✔
908
    state.running_tasks = state
19✔
909
        .running_tasks
19✔
910
        .checked_sub(1)
19✔
911
        .expect("thread pool running task counter underflow");
19✔
912
    state.completed_tasks += 1;
19✔
913
    inner.notify_if_terminated(&state);
19✔
914
}
19✔
915

916
/// Marks a worker as exited.
917
///
918
/// # Parameters
919
///
920
/// * `inner` - Shared pool coordination state used for termination
921
///   notification.
922
/// * `state` - Locked mutable state whose live worker count is decremented.
923
/// * `worker_index` - Stable index of the exiting worker.
924
fn unregister_exiting_worker(
20✔
925
    inner: &ThreadPoolInner,
20✔
926
    state: &mut ThreadPoolState,
20✔
927
    worker_index: usize,
20✔
928
) {
20✔
929
    // Migrate leftover local jobs back to the global queue before removing the
930
    // worker registration so queued work is not lost while this worker retires.
931
    let requeued_jobs = inner.remove_worker_queue_locked(worker_index);
20✔
932
    for job in requeued_jobs {
20✔
933
        state.queue.push_back(job);
×
UNCOV
934
    }
×
935
    state.live_workers = state
20✔
936
        .live_workers
20✔
937
        .checked_sub(1)
20✔
938
        .expect("thread pool live worker counter underflow");
20✔
939
    inner.notify_if_terminated(state);
20✔
940
}
20✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc