• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

qubit-ltd / rs-thread-pool / 4870a184-0519-4961-b7df-b299af549396

03 May 2026 05:33PM UTC coverage: 98.375% (+0.2%) from 98.225%
4870a184-0519-4961-b7df-b299af549396

push

circleci

Haixing-Hu
test(thread-pool): move steal retry coverage to regular tests

Replace coverage-only helper exports with direct integration tests for queue stealing and worker queue draining behavior. This keeps retry/drain defensive paths verified without relying on cfg(coverage) branches or style allowlist exceptions.

16 of 16 new or added lines in 2 files covered. (100.0%)

18 existing lines in 2 files now uncovered.

1513 of 1538 relevant lines covered (98.37%)

37.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.7
/src/fixed_thread_pool.rs
1
/*******************************************************************************
2
 *
3
 *    Copyright (c) 2025 - 2026 Haixing Hu.
4
 *
5
 *    SPDX-License-Identifier: Apache-2.0
6
 *
7
 *    Licensed under the Apache License, Version 2.0.
8
 *
9
 ******************************************************************************/
10
// qubit-style: allow inline-tests
11
// qubit-style: allow explicit-imports
12
use std::{
13
    future::Future,
14
    pin::Pin,
15
    sync::{
16
        Arc,
17
        atomic::{
18
            AtomicBool,
19
            AtomicUsize,
20
            Ordering,
21
        },
22
    },
23
};
24

25
use crossbeam_deque::Injector;
26
use qubit_function::Callable;
27

28
use qubit_executor::{
29
    TaskCompletionPair,
30
    TaskHandle,
31
};
32
use qubit_lock::Monitor;
33

34
use super::fixed_thread_pool_builder::FixedThreadPoolBuilder;
35
use super::queue_steal_source::{
36
    steal_batch_and_pop,
37
    steal_one,
38
};
39
use super::thread_pool::{
40
    ThreadPoolBuildError,
41
    ThreadPoolStats,
42
};
43
use super::worker_queue::WorkerQueue;
44
use super::worker_runtime::WorkerRuntime;
45
use crate::thread_pool::PoolJob;
46
use qubit_executor::service::{
47
    ExecutorService,
48
    RejectedExecution,
49
    ShutdownReport,
50
};
51

52
/// Maximum number of worker-local queues probed by one submit call.
///
/// Bounds the cost of the submit fast path: after this many inactive-queue
/// probes the job falls back to the global injector.
const LOCAL_ENQUEUE_MAX_PROBES: usize = 4;
/// Maximum worker count that uses worker-local batch queues.
///
/// Pools larger than this route all submissions through the global injector
/// (see `use_worker_local_queues`).
const LOCAL_QUEUE_WORKER_LIMIT: usize = 4;
56

57
/// Lifecycle state for a fixed-size thread pool.
///
/// Transitions are one-way: `Running` -> `Shutdown` (graceful) or
/// `Running`/`Shutdown` -> `Stopping` (immediate).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FixedThreadPoolLifecycle {
    /// The pool accepts new tasks and workers wait for queued work.
    Running,

    /// The pool rejects new tasks but drains queued work.
    Shutdown,

    /// The pool rejects new tasks and cancels queued work.
    Stopping,
}
69

70
impl FixedThreadPoolLifecycle {
71
    /// Returns whether this lifecycle still accepts submissions.
72
    ///
73
    /// # Returns
74
    ///
75
    /// `true` only while the pool is running.
76
    const fn is_running(self) -> bool {
52✔
77
        matches!(self, Self::Running)
52✔
78
    }
52✔
79
}
80

81
/// Mutable state protected by the fixed pool monitor.
///
/// All fields are read and written only while holding the `Monitor` lock in
/// `FixedThreadPoolInner::state`.
struct FixedThreadPoolState {
    /// Current lifecycle state.
    lifecycle: FixedThreadPoolLifecycle,
    /// Number of worker loops that have not exited.
    live_workers: usize,
    /// Number of workers currently blocked waiting for work.
    idle_workers: usize,
}
90

91
impl FixedThreadPoolState {
92
    /// Creates an empty running state.
93
    ///
94
    /// # Returns
95
    ///
96
    /// A running state before any worker has been reserved.
97
    fn new() -> Self {
21✔
98
        Self {
21✔
99
            lifecycle: FixedThreadPoolLifecycle::Running,
21✔
100
            live_workers: 0,
21✔
101
            idle_workers: 0,
21✔
102
        }
21✔
103
    }
21✔
104
}
105

106
/// Shared state for a fixed-size thread pool.
///
/// Shared between the pool handle and every worker thread via `Arc`. Counters
/// are atomics read outside the monitor lock; lifecycle and worker counts
/// live inside the `Monitor`-protected state.
struct FixedThreadPoolInner {
    /// Number of workers in this fixed pool.
    pool_size: usize,
    /// Mutable lifecycle and worker counters.
    state: Monitor<FixedThreadPoolState>,
    /// Admission gate used by submitters.
    accepting: AtomicBool,
    /// Whether immediate shutdown has requested workers to stop taking jobs.
    stop_now: AtomicBool,
    /// Submit calls that have passed the first admission check.
    inflight_submissions: AtomicUsize,
    /// Number of workers currently blocked or about to block waiting for work.
    idle_worker_count: AtomicUsize,
    /// Number of idle-worker wakeups already requested but not yet consumed.
    pending_worker_wakes: AtomicUsize,
    /// Lock-free queue for externally submitted jobs.
    global_queue: Injector<PoolJob>,
    /// Worker-local queues used for submit routing and work stealing.
    worker_queues: Vec<Arc<WorkerQueue>>,
    /// Round-robin cursor used for submit-path local queue selection.
    next_enqueue_worker: AtomicUsize,
    /// Optional maximum number of queued jobs.
    queue_capacity: Option<usize>,
    /// Number of queued jobs not yet started or cancelled.
    queued_task_count: AtomicUsize,
    /// Number of jobs currently running.
    running_task_count: AtomicUsize,
    /// Total number of accepted jobs.
    submitted_task_count: AtomicUsize,
    /// Total number of finished worker-held jobs.
    completed_task_count: AtomicUsize,
    /// Total number of queued jobs cancelled by immediate shutdown.
    cancelled_task_count: AtomicUsize,
}
141

142
impl FixedThreadPoolInner {
    /// Creates shared state for a fixed-size pool.
    ///
    /// # Parameters
    ///
    /// * `pool_size` - Number of workers that will be prestarted.
    /// * `queue_capacity` - Optional queue capacity.
    /// * `worker_queues` - Worker-local queues, one per prestarted worker.
    ///
    /// # Returns
    ///
    /// A shared state object ready for worker startup.
    fn new(
        pool_size: usize,
        queue_capacity: Option<usize>,
        worker_queues: Vec<Arc<WorkerQueue>>,
    ) -> Self {
        Self {
            pool_size,
            state: Monitor::new(FixedThreadPoolState::new()),
            accepting: AtomicBool::new(true),
            stop_now: AtomicBool::new(false),
            inflight_submissions: AtomicUsize::new(0),
            idle_worker_count: AtomicUsize::new(0),
            pending_worker_wakes: AtomicUsize::new(0),
            global_queue: Injector::new(),
            worker_queues,
            next_enqueue_worker: AtomicUsize::new(0),
            queue_capacity,
            queued_task_count: AtomicUsize::new(0),
            running_task_count: AtomicUsize::new(0),
            submitted_task_count: AtomicUsize::new(0),
            completed_task_count: AtomicUsize::new(0),
            cancelled_task_count: AtomicUsize::new(0),
        }
    }

    /// Returns the fixed worker count.
    ///
    /// # Returns
    ///
    /// Number of workers owned by this pool.
    #[inline]
    fn pool_size(&self) -> usize {
        self.pool_size
    }

    /// Returns the queued task count.
    ///
    /// # Returns
    ///
    /// Number of accepted tasks waiting to run.
    #[inline]
    fn queued_count(&self) -> usize {
        self.queued_task_count.load(Ordering::Acquire)
    }

    /// Returns the running task count.
    ///
    /// # Returns
    ///
    /// Number of tasks currently held by workers.
    #[inline]
    fn running_count(&self) -> usize {
        self.running_task_count.load(Ordering::Acquire)
    }

    /// Returns the number of in-flight submit calls.
    ///
    /// # Returns
    ///
    /// Number of submit calls that may still publish or roll back a queued job.
    #[inline]
    fn inflight_count(&self) -> usize {
        self.inflight_submissions.load(Ordering::Acquire)
    }

    /// Attempts to enter submit admission.
    ///
    /// # Returns
    ///
    /// A guard that leaves admission on drop.
    ///
    /// # Errors
    ///
    /// Returns [`RejectedExecution::Shutdown`] when admission is closed.
    fn begin_submit(&self) -> Result<FixedSubmitGuard<'_>, RejectedExecution> {
        if !self.accepting.load(Ordering::Acquire) {
            return Err(RejectedExecution::Shutdown);
        }
        self.inflight_submissions.fetch_add(1, Ordering::AcqRel);
        // Re-check admission after publishing the in-flight increment: a
        // shutdown racing with this submit either sees the counter and waits,
        // or has already closed admission and this call rolls back here.
        if self.accepting.load(Ordering::Acquire) {
            Ok(FixedSubmitGuard { inner: self })
        } else {
            let previous = self.inflight_submissions.fetch_sub(1, Ordering::AcqRel);
            debug_assert!(previous > 0, "fixed pool submit counter underflow");
            // Last in-flight submitter wakes shutdown waiters blocked on the
            // in-flight count reaching zero.
            if previous == 1 {
                self.state.notify_all();
            }
            Err(RejectedExecution::Shutdown)
        }
    }

    /// Attempts to reserve one queue slot.
    ///
    /// # Returns
    ///
    /// `true` if one queued slot was reserved, otherwise `false`.
    fn reserve_queue_slot(&self) -> bool {
        if let Some(capacity) = self.queue_capacity {
            // Bounded queue: CAS loop so concurrent submitters cannot
            // overshoot the configured capacity.
            loop {
                let current = self.queued_count();
                if current >= capacity {
                    return false;
                }
                if self
                    .queued_task_count
                    .compare_exchange(current, current + 1, Ordering::AcqRel, Ordering::Acquire)
                    .is_ok()
                {
                    return true;
                }
            }
        }
        // Unbounded queue: a plain increment always succeeds.
        self.queued_task_count.fetch_add(1, Ordering::AcqRel);
        true
    }

    /// Submits one job to this fixed pool.
    ///
    /// # Parameters
    ///
    /// * `job` - Type-erased job accepted by the pool.
    ///
    /// # Returns
    ///
    /// `Ok(())` when the job is accepted.
    ///
    /// # Errors
    ///
    /// Returns [`RejectedExecution::Shutdown`] after shutdown or
    /// [`RejectedExecution::Saturated`] when the bounded queue is full.
    fn submit(&self, job: PoolJob) -> Result<(), RejectedExecution> {
        let _guard = self.begin_submit()?;
        if !self.reserve_queue_slot() {
            return Err(RejectedExecution::Saturated);
        }
        // Admission may have closed between the guard check and the slot
        // reservation; roll the reserved slot back rather than enqueue.
        if !self.accepting.load(Ordering::Acquire) {
            let previous = self.queued_task_count.fetch_sub(1, Ordering::AcqRel);
            debug_assert!(previous > 0, "fixed pool queued counter underflow");
            return Err(RejectedExecution::Shutdown);
        }
        self.submitted_task_count.fetch_add(1, Ordering::Relaxed);
        self.enqueue_job(job);
        Ok(())
    }

    /// Enqueues one accepted job to a worker inbox or the global fallback.
    ///
    /// # Parameters
    ///
    /// * `job` - Job whose queued slot has already been reserved.
    fn enqueue_job(&self, job: PoolJob) {
        if self.use_worker_local_queues() {
            match self.try_enqueue_to_worker(job) {
                Ok(()) => {}
                // No active worker queue accepted the job; fall back to the
                // global injector so it is never lost.
                Err(job) => self.global_queue.push(job),
            }
        } else {
            self.global_queue.push(job);
        }
        self.wake_one_idle_worker();
    }

    /// Wakes one idle worker if no already-requested wakeup covers it.
    ///
    /// Pending wake tokens close the lost-notification window: a worker that
    /// has marked itself idle but has not yet parked will observe the token and
    /// retry work without relying on the condition-variable notification.
    fn wake_one_idle_worker(&self) {
        loop {
            let idle_workers = self.idle_worker_count.load(Ordering::Acquire);
            if idle_workers == 0 {
                return;
            }
            let pending_wakes = self.pending_worker_wakes.load(Ordering::Acquire);
            // Enough wakeups are already outstanding to cover every idle
            // worker; another token would be redundant.
            if pending_wakes >= idle_workers {
                return;
            }
            if self
                .pending_worker_wakes
                .compare_exchange_weak(
                    pending_wakes,
                    pending_wakes + 1,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                )
                .is_ok()
            {
                self.state.notify_one();
                return;
            }
            // CAS lost to a concurrent waker; re-read the counters and retry.
        }
    }

    /// Returns whether an idle-worker wakeup has been requested.
    ///
    /// # Returns
    ///
    /// `true` when at least one idle worker should leave the wait path and
    /// retry taking work.
    fn has_pending_worker_wake(&self) -> bool {
        self.pending_worker_wakes.load(Ordering::Acquire) > 0
    }

    /// Consumes one requested idle-worker wakeup if one exists.
    fn consume_pending_worker_wake(&self) {
        let mut current = self.pending_worker_wakes.load(Ordering::Acquire);
        while current > 0 {
            match self.pending_worker_wakes.compare_exchange_weak(
                current,
                current - 1,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => return,
                Err(actual) => current = actual,
            }
        }
    }

    /// Attempts to route one job directly to an active worker queue.
    ///
    /// # Parameters
    ///
    /// * `job` - Job to route.
    ///
    /// # Returns
    ///
    /// `Ok(())` when the job was published to a worker inbox; otherwise the
    /// original job is returned for global fallback.
    fn try_enqueue_to_worker(&self, job: PoolJob) -> Result<(), PoolJob> {
        let queue_count = self.worker_queues.len();
        debug_assert!(queue_count > 0, "fixed pool must have worker queues");
        // Bound the probe count so a submit call never scans a large pool.
        let probe_count = queue_count.min(LOCAL_ENQUEUE_MAX_PROBES);
        for _ in 0..probe_count {
            // Relaxed is sufficient: the cursor is only a load-spreading hint.
            let index = self.next_enqueue_worker.fetch_add(1, Ordering::Relaxed) % queue_count;
            let queue = &self.worker_queues[index];
            if queue.is_active() {
                queue.push_back(job);
                return Ok(());
            }
        }
        Err(job)
    }

    /// Attempts to claim one queued job for a worker.
    ///
    /// The worker first checks its local queue, then its cross-thread inbox,
    /// then the global fallback queue, and finally steals from other workers.
    /// This matches the dynamic pool's hot path and avoids forcing all fixed
    /// workers through one global injector under skewed workloads.
    ///
    /// # Parameters
    ///
    /// * `worker_runtime` - Queue runtime owned by the current worker.
    ///
    /// # Returns
    ///
    /// `Some(job)` when a job was claimed, otherwise `None`.
    fn try_take_job(&self, worker_runtime: &WorkerRuntime) -> Option<PoolJob> {
        if self.stop_now.load(Ordering::Acquire) {
            self.cancel_worker_jobs(worker_runtime);
            return None;
        }
        if !self.use_worker_local_queues() {
            return self.steal_single_global_job(worker_runtime);
        }
        if let Some(job) = worker_runtime.local.pop() {
            return self.accept_claimed_job(job, worker_runtime);
        }
        if let Some(job) = worker_runtime.queue.pop_inbox_into(&worker_runtime.local) {
            return self.accept_claimed_job(job, worker_runtime);
        }
        if let Some(job) = self.steal_global_job(worker_runtime) {
            return Some(job);
        }
        self.steal_worker_job(worker_runtime)
    }

    /// Attempts to batch-steal one job from the global injector.
    ///
    /// # Parameters
    ///
    /// * `worker_runtime` - Queue runtime receiving any stolen batch remainder.
    ///
    /// # Returns
    ///
    /// `Some(job)` when a job was claimed, otherwise `None`.
    fn steal_global_job(&self, worker_runtime: &WorkerRuntime) -> Option<PoolJob> {
        if let Some(job) = steal_batch_and_pop(&self.global_queue, &worker_runtime.local) {
            // The batch may have left surplus jobs in this worker's local
            // queue; notify a peer so the surplus can be stolen in turn.
            if !worker_runtime.local.is_empty() {
                self.state.notify_one();
            }
            return self.accept_claimed_job(job, worker_runtime);
        }
        self.steal_single_global_job(worker_runtime)
    }

    /// Attempts to steal exactly one job from the global injector.
    ///
    /// # Parameters
    ///
    /// * `worker_runtime` - Queue runtime owned by the current worker.
    ///
    /// # Returns
    ///
    /// `Some(job)` when a job was claimed, otherwise `None`.
    fn steal_single_global_job(&self, worker_runtime: &WorkerRuntime) -> Option<PoolJob> {
        steal_one(&self.global_queue).and_then(|job| self.accept_claimed_job(job, worker_runtime))
    }

    /// Attempts to steal one job from another worker's local queue.
    ///
    /// # Parameters
    ///
    /// * `worker_runtime` - Queue runtime owned by the current worker.
    ///
    /// # Returns
    ///
    /// `Some(job)` when a job was claimed, otherwise `None`.
    fn steal_worker_job(&self, worker_runtime: &WorkerRuntime) -> Option<PoolJob> {
        let queue_count = self.worker_queues.len();
        // A single-worker pool has no victim to steal from.
        if queue_count <= 1 {
            return None;
        }
        let worker_index = worker_runtime.worker_index();
        // Rotate the starting victim so steals spread across the pool.
        let start = worker_runtime.next_steal_start(queue_count);
        for offset in 0..queue_count {
            let victim = &self.worker_queues[(start + offset) % queue_count];
            if victim.worker_index() == worker_index {
                continue;
            }
            if !victim.is_active() {
                continue;
            }
            if let Some(job) = victim.steal_into(&worker_runtime.local) {
                // Surplus stolen jobs landed in this worker's local queue;
                // wake a peer to share the load.
                if !worker_runtime.local.is_empty() {
                    self.state.notify_one();
                }
                return self.accept_claimed_job(job, worker_runtime);
            }
        }
        None
    }

    /// Returns whether this pool should use worker-local queues.
    ///
    /// # Returns
    ///
    /// `true` for small fixed pools where local batching reduces global queue
    /// contention; `false` for larger pools where inbox routing and victim
    /// scans cost more than they save.
    fn use_worker_local_queues(&self) -> bool {
        self.pool_size <= LOCAL_QUEUE_WORKER_LIMIT
    }

    /// Accepts a claimed queued job or cancels it after immediate shutdown.
    ///
    /// # Parameters
    ///
    /// * `job` - Job claimed from a queue.
    /// * `worker_runtime` - Queue runtime drained if stopping.
    ///
    /// # Returns
    ///
    /// `Some(job)` when the job may run, otherwise `None`.
    fn accept_claimed_job(&self, job: PoolJob, worker_runtime: &WorkerRuntime) -> Option<PoolJob> {
        // Immediate shutdown may have started after the claim; cancel the
        // claimed job and everything still held by this worker.
        if self.stop_now.load(Ordering::Acquire) {
            self.cancel_claimed_job(job);
            self.cancel_worker_jobs(worker_runtime);
            return None;
        }
        self.mark_queued_job_running();
        Some(job)
    }

    /// Cancels all jobs remaining in one worker runtime.
    ///
    /// # Parameters
    ///
    /// * `worker_runtime` - Worker-owned runtime to drain.
    fn cancel_worker_jobs(&self, worker_runtime: &WorkerRuntime) {
        while let Some(job) = worker_runtime.local.pop() {
            self.cancel_claimed_job(job);
        }
        for job in worker_runtime.queue.drain() {
            self.cancel_claimed_job(job);
        }
    }

    /// Marks one claimed queued job as running.
    fn mark_queued_job_running(&self) {
        let previous = self.queued_task_count.fetch_sub(1, Ordering::AcqRel);
        debug_assert!(previous > 0, "fixed pool queued counter underflow");
        self.running_task_count.fetch_add(1, Ordering::AcqRel);
    }

    /// Cancels one job claimed after immediate shutdown started.
    ///
    /// # Parameters
    ///
    /// * `job` - Queued job that must not be run.
    fn cancel_claimed_job(&self, job: PoolJob) {
        let previous = self.queued_task_count.fetch_sub(1, Ordering::AcqRel);
        debug_assert!(previous > 0, "fixed pool queued counter underflow");
        self.cancelled_task_count.fetch_add(1, Ordering::Relaxed);
        job.cancel();
        // Termination waiters check the queued count; wake them after it drops.
        self.state.notify_all();
    }

    /// Marks one running job as finished.
    fn finish_running_job(&self) {
        let previous = self.running_task_count.fetch_sub(1, Ordering::AcqRel);
        debug_assert!(previous > 0, "fixed pool running counter underflow");
        self.completed_task_count.fetch_add(1, Ordering::Relaxed);
        // Last running job with an empty queue may complete termination.
        if previous == 1 && self.queued_count() == 0 {
            self.state.notify_all();
        }
    }

    /// Reserves one worker slot before spawning a worker thread.
    pub(crate) fn reserve_worker_slot(&self) {
        self.state.write(|state| {
            state.live_workers += 1;
        });
    }

    /// Rolls back one worker slot after spawn failure.
    pub(crate) fn rollback_worker_slot(&self) {
        self.state.write(|state| {
            state.live_workers = state
                .live_workers
                .checked_sub(1)
                .expect("fixed pool live worker counter underflow");
        });
    }

    /// Stops the pool after a build-time worker spawn failure.
    pub(crate) fn stop_after_failed_build(&self) {
        self.accepting.store(false, Ordering::Release);
        self.stop_now.store(true, Ordering::Release);
        self.state.write(|state| {
            state.lifecycle = FixedThreadPoolLifecycle::Stopping;
        });
        self.state.notify_all();
    }

    /// Blocks until the pool is fully terminated.
    fn wait_for_termination(&self) {
        self.state
            .wait_until(|state| self.is_terminated_locked(state), |_| ());
    }

    /// Requests graceful shutdown.
    fn shutdown(&self) {
        self.accepting.store(false, Ordering::Release);
        self.state.write(|state| {
            // Only transition from Running; never downgrade Stopping back to
            // Shutdown on a repeated call.
            if state.lifecycle.is_running() {
                state.lifecycle = FixedThreadPoolLifecycle::Shutdown;
            }
        });
        self.state.notify_all();
    }

    /// Requests immediate shutdown and cancels visible queued jobs.
    ///
    /// # Returns
    ///
    /// Count-based shutdown report.
    fn shutdown_now(&self) -> ShutdownReport {
        self.accepting.store(false, Ordering::Release);
        self.stop_now.store(true, Ordering::Release);
        let running = self.running_count();
        let mut state = self.state.lock();
        state.lifecycle = FixedThreadPoolLifecycle::Stopping;
        // Wait out in-flight submits so their jobs are either published (and
        // drained below) or rolled back before we report.
        while self.inflight_count() > 0 {
            state = state.wait();
        }
        drop(state);
        let jobs = self.drain_visible_queued_jobs();
        let cancelled = jobs.len();
        for job in jobs {
            self.cancel_claimed_job(job);
        }
        self.state.notify_all();
        // NOTE(review): first and third arguments are both the cancelled
        // count — confirm ShutdownReport::new's parameter order expects this.
        ShutdownReport::new(cancelled, running, cancelled)
    }

    /// Drains all jobs currently visible in global and worker-local queues.
    ///
    /// # Returns
    ///
    /// Drained queued jobs.
    fn drain_visible_queued_jobs(&self) -> Vec<PoolJob> {
        let mut jobs = Vec::new();
        // Repeat until a full pass finds nothing: draining one queue can race
        // with jobs moving between queues, so a single sweep is not enough.
        loop {
            let previous_count = jobs.len();
            self.drain_global_queue(&mut jobs);
            self.drain_worker_queues(&mut jobs);
            if jobs.len() == previous_count {
                return jobs;
            }
        }
    }

    /// Drains visible jobs from the global injector.
    ///
    /// # Parameters
    ///
    /// * `jobs` - Destination for drained jobs.
    fn drain_global_queue(&self, jobs: &mut Vec<PoolJob>) {
        while let Some(job) = steal_one(&self.global_queue) {
            jobs.push(job);
        }
    }

    /// Drains visible jobs from all worker-local queues.
    ///
    /// # Parameters
    ///
    /// * `jobs` - Destination for drained jobs.
    fn drain_worker_queues(&self, jobs: &mut Vec<PoolJob>) {
        for queue in &self.worker_queues {
            jobs.extend(queue.drain());
        }
    }

    /// Returns whether shutdown has started.
    ///
    /// # Returns
    ///
    /// `true` when lifecycle is not running.
    fn is_shutdown(&self) -> bool {
        self.state.read(|state| !state.lifecycle.is_running())
    }

    /// Returns whether the pool is terminated.
    ///
    /// # Returns
    ///
    /// `true` after shutdown and after all workers and jobs are gone.
    fn is_terminated(&self) -> bool {
        self.state.read(|state| self.is_terminated_locked(state))
    }

    /// Checks termination against one locked state snapshot.
    ///
    /// # Parameters
    ///
    /// * `state` - Locked state snapshot.
    ///
    /// # Returns
    ///
    /// `true` when the pool is terminal.
    fn is_terminated_locked(&self, state: &FixedThreadPoolState) -> bool {
        !state.lifecycle.is_running()
            && state.live_workers == 0
            && self.queued_count() == 0
            && self.running_count() == 0
            && self.inflight_count() == 0
    }

    /// Returns a point-in-time stats snapshot.
    ///
    /// # Returns
    ///
    /// Snapshot using fixed pool size for both core and maximum sizes.
    fn stats(&self) -> ThreadPoolStats {
        // Read atomics before taking the lock; the snapshot is best-effort
        // and fields may be mutually slightly stale.
        let queued_tasks = self.queued_count();
        let running_tasks = self.running_count();
        let submitted_tasks = self.submitted_task_count.load(Ordering::Relaxed);
        let completed_tasks = self.completed_task_count.load(Ordering::Relaxed);
        let cancelled_tasks = self.cancelled_task_count.load(Ordering::Relaxed);
        self.state.read(|state| ThreadPoolStats {
            core_pool_size: self.pool_size,
            maximum_pool_size: self.pool_size,
            live_workers: state.live_workers,
            idle_workers: state.idle_workers,
            queued_tasks,
            running_tasks,
            submitted_tasks,
            completed_tasks,
            cancelled_tasks,
            shutdown: !state.lifecycle.is_running(),
            terminated: self.is_terminated_locked(state),
        })
    }
}
740

741
/// Submit guard that leaves in-flight accounting on drop.
///
/// Returned by `FixedThreadPoolInner::begin_submit`; its `Drop` impl is what
/// decrements the in-flight counter, so the rollback happens on every exit
/// path of `submit`, including early returns.
struct FixedSubmitGuard<'a> {
    /// Pool whose in-flight counter was entered.
    inner: &'a FixedThreadPoolInner,
}
746

747
impl Drop for FixedSubmitGuard<'_> {
    /// Leaves submit accounting and wakes shutdown waiters if needed.
    fn drop(&mut self) {
        let previous = self
            .inner
            .inflight_submissions
            .fetch_sub(1, Ordering::AcqRel);
        debug_assert!(previous > 0, "fixed pool submit counter underflow");
        // The last in-flight submitter after admission closed wakes
        // shutdown_now, which blocks until the in-flight count reaches zero.
        if previous == 1 && !self.inner.accepting.load(Ordering::Acquire) {
            self.inner.state.notify_all();
        }
    }
}
760

761
/// Fixed-size thread pool implementing [`ExecutorService`].
///
/// `FixedThreadPool` prestarts a fixed number of worker threads and does not
/// support runtime pool-size changes. Use [`super::ThreadPool`] when dynamic
/// core/maximum sizes or keep-alive policies are required.
pub struct FixedThreadPool {
    /// Shared fixed pool state (also held by every worker thread).
    inner: Arc<FixedThreadPoolInner>,
}
770

771
impl FixedThreadPool {
772
    /// Builds a fixed pool from validated builder options.
773
    ///
774
    /// # Parameters
775
    ///
776
    /// * `pool_size` - Number of workers to prestart.
777
    /// * `queue_capacity` - Optional maximum queued task count.
778
    /// * `thread_name_prefix` - Prefix used for worker thread names.
779
    /// * `stack_size` - Optional worker stack size.
780
    ///
781
    /// # Returns
782
    ///
783
    /// A fixed thread-pool handle with workers already started.
784
    ///
785
    /// # Errors
786
    ///
787
    /// Returns [`ThreadPoolBuildError`] when a worker thread cannot be spawned.
788
    pub(crate) fn build_with_options(
15✔
789
        pool_size: usize,
15✔
790
        queue_capacity: Option<usize>,
15✔
791
        thread_name_prefix: String,
15✔
792
        stack_size: Option<usize>,
15✔
793
    ) -> Result<Self, ThreadPoolBuildError> {
15✔
794
        let mut worker_runtimes = Vec::with_capacity(pool_size);
15✔
795
        let mut worker_queues = Vec::with_capacity(pool_size);
15✔
796
        for index in 0..pool_size {
27✔
797
            let worker_runtime = WorkerRuntime::new(index);
27✔
798
            worker_queues.push(Arc::clone(&worker_runtime.queue));
27✔
799
            worker_runtimes.push(worker_runtime);
27✔
800
        }
27✔
801
        let inner = Arc::new(FixedThreadPoolInner::new(
15✔
802
            pool_size,
15✔
803
            queue_capacity,
15✔
804
            worker_queues,
15✔
805
        ));
806
        for (index, worker_runtime) in worker_runtimes.into_iter().enumerate() {
27✔
807
            inner.reserve_worker_slot();
27✔
808
            let worker_inner = Arc::clone(&inner);
27✔
809
            let mut builder =
27✔
810
                std::thread::Builder::new().name(format!("{}-{}", thread_name_prefix, index));
27✔
811
            if let Some(stack_size) = stack_size {
27✔
812
                builder = builder.stack_size(stack_size);
2✔
813
            }
25✔
814
            if let Err(source) =
1✔
815
                builder.spawn(move || run_fixed_worker(worker_inner, worker_runtime))
27✔
816
            {
817
                inner.rollback_worker_slot();
1✔
818
                inner.stop_after_failed_build();
1✔
819
                return Err(ThreadPoolBuildError::SpawnWorker { index, source });
1✔
820
            }
26✔
821
        }
822
        Ok(Self { inner })
14✔
823
    }
15✔
824

825
    /// Creates a fixed thread pool with `pool_size` prestarted workers.
826
    ///
827
    /// # Parameters
828
    ///
829
    /// * `pool_size` - Number of worker threads.
830
    ///
831
    /// # Returns
832
    ///
833
    /// A fixed thread pool.
834
    ///
835
    /// # Errors
836
    ///
837
    /// Returns [`ThreadPoolBuildError`] if the worker count is zero or a worker
838
    /// cannot be spawned.
839
    pub fn new(pool_size: usize) -> Result<Self, ThreadPoolBuildError> {
12✔
840
        Self::builder().pool_size(pool_size).build()
12✔
841
    }
12✔
842

843
    /// Creates a fixed pool builder.
844
    ///
845
    /// # Returns
846
    ///
847
    /// Builder with CPU parallelism defaults.
848
    pub fn builder() -> FixedThreadPoolBuilder {
18✔
849
        FixedThreadPoolBuilder::new()
18✔
850
    }
18✔
851

852
    /// Returns the fixed worker count.
853
    ///
854
    /// # Returns
855
    ///
856
    /// Number of workers in this pool.
857
    pub fn pool_size(&self) -> usize {
1✔
858
        self.inner.pool_size()
1✔
859
    }
1✔
860

861
    /// Returns the queued task count.
862
    ///
863
    /// # Returns
864
    ///
865
    /// Number of accepted tasks waiting to run.
866
    pub fn queued_count(&self) -> usize {
3✔
867
        self.inner.queued_count()
3✔
868
    }
3✔
869

870
    /// Returns the running task count.
871
    ///
872
    /// # Returns
873
    ///
874
    /// Number of tasks currently held by workers.
875
    pub fn running_count(&self) -> usize {
1✔
876
        self.inner.running_count()
1✔
877
    }
1✔
878

879
    /// Returns the live worker count.
880
    ///
881
    /// # Returns
882
    ///
883
    /// Number of worker loops that have not exited.
884
    pub fn live_worker_count(&self) -> usize {
1✔
885
        self.inner.state.read(|state| state.live_workers)
1✔
886
    }
1✔
887

888
    /// Returns a point-in-time stats snapshot.
889
    ///
890
    /// # Returns
891
    ///
892
    /// Snapshot containing queue, worker, and lifecycle counters.
893
    pub fn stats(&self) -> ThreadPoolStats {
1✔
894
        self.inner.stats()
1✔
895
    }
1✔
896
}
897

898
impl Drop for FixedThreadPool {
899
    /// Requests graceful shutdown when the pool handle is dropped.
900
    fn drop(&mut self) {
14✔
901
        self.inner.shutdown();
14✔
902
    }
14✔
903
}
904

905
impl ExecutorService for FixedThreadPool {
906
    type Handle<R, E>
907
        = TaskHandle<R, E>
908
    where
909
        R: Send + 'static,
910
        E: Send + 'static;
911

912
    type Termination<'a>
913
        = Pin<Box<dyn Future<Output = ()> + Send + 'a>>
914
    where
915
        Self: 'a;
916

917
    /// Accepts a callable and queues it for fixed pool workers.
918
    ///
919
    /// # Parameters
920
    ///
921
    /// * `task` - Callable to execute on a fixed pool worker.
922
    ///
923
    /// # Returns
924
    ///
925
    /// A [`TaskHandle`] for the accepted task.
926
    ///
927
    /// # Errors
928
    ///
929
    /// Returns [`RejectedExecution::Shutdown`] after shutdown or
930
    /// [`RejectedExecution::Saturated`] when a bounded queue is full.
931
    fn submit_callable<C, R, E>(&self, task: C) -> Result<Self::Handle<R, E>, RejectedExecution>
305✔
932
    where
305✔
933
        C: Callable<R, E> + Send + 'static,
305✔
934
        R: Send + 'static,
305✔
935
        E: Send + 'static,
305✔
936
    {
937
        let (handle, completion) = TaskCompletionPair::new().into_parts();
305✔
938
        let job = PoolJob::from_task(task, completion);
305✔
939
        self.inner.submit(job)?;
305✔
940
        Ok(handle)
302✔
941
    }
305✔
942

943
    /// Stops accepting new work and drains accepted queued tasks.
944
    fn shutdown(&self) {
11✔
945
        self.inner.shutdown();
11✔
946
    }
11✔
947

948
    /// Stops accepting work and cancels queued tasks.
949
    ///
950
    /// # Returns
951
    ///
952
    /// A count-based shutdown report.
953
    fn shutdown_now(&self) -> ShutdownReport {
3✔
954
        self.inner.shutdown_now()
3✔
955
    }
3✔
956

957
    /// Returns whether shutdown has been requested.
958
    ///
959
    /// # Returns
960
    ///
961
    /// `true` when this pool no longer accepts new work.
962
    fn is_shutdown(&self) -> bool {
1✔
963
        self.inner.is_shutdown()
1✔
964
    }
1✔
965

966
    /// Returns whether this pool is fully terminated.
967
    ///
968
    /// # Returns
969
    ///
970
    /// `true` after shutdown and after all workers have exited.
971
    fn is_terminated(&self) -> bool {
4✔
972
        self.inner.is_terminated()
4✔
973
    }
4✔
974

975
    /// Waits until this fixed pool has terminated.
976
    ///
977
    /// # Returns
978
    ///
979
    /// A future that blocks the polling thread until termination.
980
    fn await_termination(&self) -> Self::Termination<'_> {
14✔
981
        Box::pin(async move {
14✔
982
            self.inner.wait_for_termination();
14✔
983
        })
14✔
984
    }
14✔
985
}
986

987
/// Runs one fixed-pool worker loop.
988
///
989
/// # Parameters
990
///
991
/// * `inner` - Shared fixed-pool state.
992
/// * `worker_runtime` - Queue runtime owned by this worker.
993
fn run_fixed_worker(inner: Arc<FixedThreadPoolInner>, worker_runtime: WorkerRuntime) {
26✔
994
    worker_runtime.queue.activate();
26✔
995
    loop {
996
        if let Some(job) = inner.try_take_job(&worker_runtime) {
84✔
997
            job.run();
45✔
998
            inner.finish_running_job();
45✔
999
            continue;
45✔
1000
        }
39✔
1001
        if !wait_for_fixed_pool_work(&inner) {
39✔
1002
            break;
26✔
1003
        }
13✔
1004
    }
1005
    worker_exited(&inner, &worker_runtime.queue);
26✔
1006
}
26✔
1007

1008
/// Waits until visible work exists or the worker should exit.
///
/// # Parameters
///
/// * `inner` - Shared fixed-pool state.
///
/// # Returns
///
/// `true` when the worker should try to take work again, or `false` when it
/// should exit.
fn wait_for_fixed_pool_work(inner: &FixedThreadPoolInner) -> bool {
    let mut state = inner.state.lock();
    loop {
        match state.lifecycle {
            FixedThreadPoolLifecycle::Running => {
                // Fast path: work is already visible, no need to go idle.
                if inner.queued_count() > 0 {
                    return true;
                }
                mark_fixed_worker_idle(inner, &mut state);
                // Re-check after publishing idleness: a submitter may have
                // enqueued work or issued a wake between the check above and
                // the idle mark, and this worker is not yet waiting, so that
                // notification would otherwise go unobserved.
                if inner.queued_count() > 0 || inner.has_pending_worker_wake() {
                    unmark_fixed_worker_idle(inner, &mut state);
                    return true;
                }
                state = state.wait();
                unmark_fixed_worker_idle(inner, &mut state);
            }
            FixedThreadPoolLifecycle::Shutdown => {
                // Drain remaining queued work before considering exit.
                if inner.queued_count() > 0 {
                    return true;
                }
                // Nothing queued and no in-flight submissions that could
                // still add work: this worker may exit.
                if inner.queued_count() == 0 && inner.inflight_count() == 0 {
                    return false;
                }
                mark_fixed_worker_idle(inner, &mut state);
                // Same re-check-after-idle pattern as the Running branch;
                // `continue` re-runs the lifecycle match so the exit
                // conditions above are evaluated again.
                if inner.queued_count() > 0
                    || inner.inflight_count() == 0
                    || inner.has_pending_worker_wake()
                {
                    unmark_fixed_worker_idle(inner, &mut state);
                    continue;
                }
                state = state.wait();
                unmark_fixed_worker_idle(inner, &mut state);
            }
            // Immediate stop requested: exit without draining the queue.
            FixedThreadPoolLifecycle::Stopping => return false,
        }
    }
}
1056

1057
/// Marks a fixed-pool worker as idle in locked and lock-free state.
1058
///
1059
/// # Parameters
1060
///
1061
/// * `inner` - Fixed pool whose idle counter is updated.
1062
/// * `state` - Locked mutable state containing authoritative idle workers.
1063
fn mark_fixed_worker_idle(inner: &FixedThreadPoolInner, state: &mut FixedThreadPoolState) {
22✔
1064
    state.idle_workers += 1;
22✔
1065
    inner.idle_worker_count.fetch_add(1, Ordering::AcqRel);
22✔
1066
}
22✔
1067

1068
/// Marks a fixed-pool worker as no longer idle.
1069
///
1070
/// # Parameters
1071
///
1072
/// * `inner` - Fixed pool whose idle counter is updated.
1073
/// * `state` - Locked mutable state containing authoritative idle workers.
1074
fn unmark_fixed_worker_idle(inner: &FixedThreadPoolInner, state: &mut FixedThreadPoolState) {
22✔
1075
    state.idle_workers = state
22✔
1076
        .idle_workers
22✔
1077
        .checked_sub(1)
22✔
1078
        .expect("fixed pool idle worker counter underflow");
22✔
1079
    let previous = inner.idle_worker_count.fetch_sub(1, Ordering::AcqRel);
22✔
1080
    debug_assert!(previous > 0, "fixed pool idle worker counter underflow");
22✔
1081
    inner.consume_pending_worker_wake();
22✔
1082
}
22✔
1083

1084
/// Marks one fixed-pool worker as exited.
1085
///
1086
/// # Parameters
1087
///
1088
/// * `inner` - Shared fixed-pool state.
1089
/// * `worker_queue` - Queue owned by the exiting worker.
1090
fn worker_exited(inner: &FixedThreadPoolInner, worker_queue: &WorkerQueue) {
26✔
1091
    worker_queue.deactivate();
26✔
1092
    inner.state.write(|state| {
26✔
1093
        state.live_workers = state
26✔
1094
            .live_workers
26✔
1095
            .checked_sub(1)
26✔
1096
            .expect("fixed pool live worker counter underflow");
26✔
1097
    });
26✔
1098
    inner.state.notify_all();
26✔
1099
}
26✔
1100

1101
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{
        Arc,
        atomic::{
            AtomicUsize,
            Ordering,
        },
    };
    use std::thread;
    use std::time::Duration;

    /// Builds a job that bumps `ran` when executed and `cancelled` when
    /// cancelled, so tests can observe which path each job took.
    fn counted_job(cancelled: Arc<AtomicUsize>, ran: Arc<AtomicUsize>) -> PoolJob {
        PoolJob::new(
            Box::new(move || {
                ran.fetch_add(1, Ordering::AcqRel);
            }),
            Box::new(move || {
                cancelled.fetch_add(1, Ordering::AcqRel);
            }),
        )
    }

    // With stop_now set, accepting a claimed job must cancel it together
    // with everything still sitting in the local and worker queues, and
    // must drain the queued-task counter to zero.
    #[test]
    fn test_accept_claimed_job_stop_now_cancels_claimed_and_worker_queues() {
        let runtime = WorkerRuntime::new(0);
        runtime.queue.activate();
        let inner = FixedThreadPoolInner::new(1, None, vec![Arc::clone(&runtime.queue)]);
        inner.stop_now.store(true, Ordering::Release);

        let cancelled = Arc::new(AtomicUsize::new(0));
        let ran = Arc::new(AtomicUsize::new(0));
        runtime
            .local
            .push(counted_job(Arc::clone(&cancelled), Arc::clone(&ran)));
        runtime
            .queue
            .push_back(counted_job(Arc::clone(&cancelled), Arc::clone(&ran)));
        inner.queued_task_count.store(3, Ordering::Release);

        let accepted =
            inner.accept_claimed_job(counted_job(cancelled.clone(), ran.clone()), &runtime);
        assert!(accepted.is_none());
        assert_eq!(inner.queued_count(), 0);
        assert_eq!(inner.cancelled_task_count.load(Ordering::Acquire), 3);
        assert_eq!(cancelled.load(Ordering::Acquire), 3);
        assert_eq!(ran.load(Ordering::Acquire), 0);
    }

    // Stealing from the global queue yields one claimed job; a job left in
    // the thief's local queue afterwards can still be drained and cancelled,
    // and all counters end balanced.
    #[test]
    fn test_steal_global_job_notifies_when_batch_leaves_local_jobs() {
        let runtime = WorkerRuntime::new(0);
        let inner = FixedThreadPoolInner::new(1, None, vec![Arc::clone(&runtime.queue)]);
        let cancelled = Arc::new(AtomicUsize::new(0));
        let ran = Arc::new(AtomicUsize::new(0));
        runtime
            .local
            .push(counted_job(Arc::clone(&cancelled), Arc::clone(&ran)));
        inner
            .global_queue
            .push(counted_job(Arc::clone(&cancelled), Arc::clone(&ran)));
        inner.queued_task_count.store(2, Ordering::Release);

        let claimed = inner
            .steal_global_job(&runtime)
            .expect("global queue should provide one claimed job");
        claimed.run();
        inner.finish_running_job();
        let remaining = runtime
            .local
            .pop()
            .expect("preloaded local job should remain queued");
        inner.cancel_claimed_job(remaining);

        assert_eq!(inner.queued_count(), 0);
        assert_eq!(inner.running_count(), 0);
        assert_eq!(inner.completed_task_count.load(Ordering::Acquire), 1);
        assert_eq!(inner.cancelled_task_count.load(Ordering::Acquire), 1);
        assert_eq!(ran.load(Ordering::Acquire), 1);
        assert_eq!(cancelled.load(Ordering::Acquire), 1);
    }

    // Stealing from a sibling worker's queue behaves like the global-queue
    // case: one claimed job runs, the leftover local job is cancelled, and
    // the completed/cancelled counters each end at one.
    #[test]
    fn test_steal_worker_job_notifies_when_batch_leaves_local_jobs() {
        let thief = WorkerRuntime::new(0);
        let victim = WorkerRuntime::new(1);
        thief.queue.activate();
        victim.queue.activate();
        let inner = FixedThreadPoolInner::new(
            2,
            None,
            vec![Arc::clone(&thief.queue), Arc::clone(&victim.queue)],
        );
        let cancelled = Arc::new(AtomicUsize::new(0));
        let ran = Arc::new(AtomicUsize::new(0));
        thief
            .local
            .push(counted_job(Arc::clone(&cancelled), Arc::clone(&ran)));
        victim
            .queue
            .push_back(counted_job(Arc::clone(&cancelled), Arc::clone(&ran)));
        inner.queued_task_count.store(2, Ordering::Release);

        let claimed = inner
            .steal_worker_job(&thief)
            .expect("victim queue should provide one claimed job");
        claimed.run();
        inner.finish_running_job();
        let remaining = thief
            .local
            .pop()
            .expect("batch steal should leave one local job");
        inner.cancel_claimed_job(remaining);

        assert_eq!(inner.queued_count(), 0);
        assert_eq!(inner.running_count(), 0);
        assert_eq!(inner.completed_task_count.load(Ordering::Acquire), 1);
        assert_eq!(inner.cancelled_task_count.load(Ordering::Acquire), 1);
        assert_eq!(ran.load(Ordering::Acquire), 1);
        assert_eq!(cancelled.load(Ordering::Acquire), 1);
    }

    // Dropping the guard for the last in-flight submission while admission
    // is closed must zero the in-flight counter (and exercises the
    // notify-on-last-leaver branch of Drop).
    #[test]
    fn test_fixed_submit_guard_drop_notifies_when_shutdown_closes_admission() {
        let runtime = WorkerRuntime::new(0);
        let inner = FixedThreadPoolInner::new(1, None, vec![Arc::clone(&runtime.queue)]);
        inner.inflight_submissions.store(1, Ordering::Release);
        inner.accepting.store(false, Ordering::Release);

        {
            let guard = FixedSubmitGuard { inner: &inner };
            drop(guard);
        }

        assert_eq!(inner.inflight_count(), 0);
    }

    // In Shutdown lifecycle with an in-flight submission outstanding, the
    // wait loop must block until a background thread clears the counter and
    // notifies, then report `false` (worker should exit).
    #[test]
    fn test_wait_for_fixed_pool_work_shutdown_waits_for_inflight_submissions() {
        let runtime = WorkerRuntime::new(0);
        let inner = Arc::new(FixedThreadPoolInner::new(
            1,
            None,
            vec![Arc::clone(&runtime.queue)],
        ));
        inner.state.write(|state| {
            state.lifecycle = FixedThreadPoolLifecycle::Shutdown;
        });
        inner.inflight_submissions.store(1, Ordering::Release);
        inner.pending_worker_wakes.store(1, Ordering::Release);

        let inner_for_release = Arc::clone(&inner);
        let releaser = thread::spawn(move || {
            thread::sleep(Duration::from_millis(10));
            inner_for_release
                .inflight_submissions
                .store(0, Ordering::Release);
            inner_for_release.state.notify_all();
        });

        assert!(!wait_for_fixed_pool_work(&inner));
        releaser.join().expect("releaser thread should finish");
    }

    // shutdown_now must not report until outstanding in-flight submissions
    // finish; once released, the report shows nothing running, queued, or
    // cancelled.
    #[test]
    fn test_shutdown_now_waits_for_inflight_submissions() {
        let runtime = WorkerRuntime::new(0);
        let inner = Arc::new(FixedThreadPoolInner::new(
            1,
            None,
            vec![Arc::clone(&runtime.queue)],
        ));
        inner.inflight_submissions.store(1, Ordering::Release);

        let inner_for_release = Arc::clone(&inner);
        let releaser = thread::spawn(move || {
            thread::sleep(Duration::from_millis(10));
            inner_for_release
                .inflight_submissions
                .store(0, Ordering::Release);
            inner_for_release.state.notify_all();
        });

        let report = inner.shutdown_now();
        releaser.join().expect("releaser thread should finish");
        assert_eq!(report.running, 0);
        assert_eq!(report.queued, 0);
        assert_eq!(report.cancelled, 0);
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc