cjrh / deadpool — build 20208580546

14 Dec 2025 01:19PM UTC · coverage: 94.698% (-1.3%) from 95.978%

Event: push · Service: github · Committer: web-flow
Bump github/codeql-action from 4.31.7 to 4.31.8 (#344)

Bumps [github/codeql-action](https://github.com/github/codeql-action) from 4.31.7 to 4.31.8.
- [Release notes](https://github.com/github/codeql-action/releases)
- [Changelog](https://github.com/github/codeql-action/blob/main/CHANGELOG.md)
- [Commits](https://github.com/github/codeql-action/compare/cf1bb45a2...1b168cd39)

---
updated-dependencies:
- dependency-name: github/codeql-action
  dependency-version: 4.31.8
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

97 of 107 branches covered (90.65%)

Branch coverage is included in the aggregate percentage.

421 of 440 relevant lines covered (95.68%)

4.77 hits per line
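For reference, the aggregate percentage combines line and branch hits: (421 + 97) / (440 + 107) = 518 / 547 ≈ 94.698%, which matches the coverage figure reported above.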

Source File

/deadpool.py — 94.7% covered
"""
Deadpool
========

Important design considerations:

Backpressure
------------

To allow backpressure when submitting work to the pool, we make
the ``submit`` method block when the number of pending tasks is
greater than the ``max_workers`` parameter. This has consequences:
it means the main thread is blocked, and nothing else can happen
until it unblocks by getting space in the queue.

Deadpool itself needs to do actions around job management, which
is why we have a separate "supervisor" thread for each worker
process.

"""
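
# Illustrative sketch (not part of the original module): how the backpressure
# described above shows up for a caller. ``do_work`` is a hypothetical picklable
# function; the only API assumed is the ``Deadpool``/``submit`` interface defined
# below, where ``submit`` blocks once the backlog queue (``max_backlog``) is full.
#
#     with Deadpool(max_workers=2, max_backlog=2) as pool:
#         futs = [pool.submit(do_work, i) for i in range(100)]  # blocks intermittently
#         results = [f.result() for f in futs]
#
# The producing loop is throttled to roughly the rate at which worker processes
# drain tasks, rather than building an unbounded backlog in memory.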

import concurrent.futures
import ctypes
import logging
import multiprocessing as mp
import os
import pickle
import signal
import sys
import threading
import traceback
import typing
import weakref
import atexit
import json
from concurrent.futures import CancelledError, Executor, InvalidStateError, as_completed
from dataclasses import dataclass, field
from multiprocessing.connection import Connection
from queue import Empty, PriorityQueue, Queue, SimpleQueue
from typing import Callable, Optional, Tuple
from collections.abc import Mapping
from functools import partial

import psutil
from setproctitle import setproctitle

__version__ = "2025.2.2"
__all__ = [
    "Deadpool",
    "Future",
    "CancelledError",
    "TimeoutError",
    "ProcessError",
    "PoolClosed",
    "as_completed",
]
logger = logging.getLogger("deadpool")


# Does not work. Hangs the process on exit.
# There currently isn't an official way to clean up the
# resource tracker process. It is an open issue on the
# Python issue tracker.
# @atexit.register
# def stop_resource_tracker():
#     from multiprocessing import resource_tracker
#     tracker = resource_tracker._resource_tracker
#     try:
#         import time
#         time.sleep(5)
#         tracker._stop()
#     except Exception:
#         logger.info("Error stopping the multiprocessing resource tracker")


@dataclass
class Stat:
    lock: threading.Lock
    value: int = 0

    def increment(self, value: int = 1):
        with self.lock:
            self.value += value

    def set(self, value: int = 0):
        self.value = value


class Statistics:
    def __init__(self):
        self._lock = threading.Lock()

        self.tasks_received = Stat(self._lock, 0)
        self.tasks_launched = Stat(self._lock, 0)
        self.tasks_failed = Stat(self._lock, 0)
        self.worker_processes_created = Stat(self._lock, 0)
        self.max_workers_busy_concurrently = Stat(self._lock, 0)

    def reset_counters(self):
        self.tasks_received.set()
        self.tasks_launched.set()
        self.tasks_failed.set()
        self.worker_processes_created.set()
        self.max_workers_busy_concurrently.set()

    def to_dict(self) -> dict[str, typing.Any]:
        return {
            "tasks_received": self.tasks_received.value,
            "tasks_launched": self.tasks_launched.value,
            "tasks_failed": self.tasks_failed.value,
            "worker_processes_created": self.worker_processes_created.value,
            "max_workers_busy_concurrently": self.max_workers_busy_concurrently.value,
        }

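# Illustrative sketch (not part of the original module): these counters are read
# back at runtime through ``Deadpool.get_statistics()`` (defined further down),
# which merges them with point-in-time gauges about worker state. ``pool`` is a
# hypothetical Deadpool instance; every value is a plain int, so the dict
# serializes cleanly with the ``json`` module imported above.
#
#     stats = pool.get_statistics()
#     print(json.dumps(stats, indent=2))
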

@dataclass(order=True)
class PrioritizedItem:
    priority: int
    item: typing.Any = field(compare=False)


@dataclass(init=False)
class WorkerProcess:
    process: mp.Process
    connection_receive_msgs_from_process: Connection
    connection_send_msgs_to_process: Connection
    # Stats
    tasks_ran_counter: int
    # Controls
    # If the subprocess RSS memory is above this threshold,
    # ask the system allocator to release unused memory back
    # to the OS.
    malloc_trim_rss_memory_threshold_bytes: Optional[int] = None
    ok: bool = True

    def __init__(
        self,
        initializer=None,
        initargs=(),
        finalizer=None,
        finargs=(),
        daemon=True,
        mp_context="forkserver",
        malloc_trim_rss_memory_threshold_bytes=None,
    ):
        # For the process to send info OUT OF the process
        conn_receiver, conn_sender = mp.Pipe(duplex=False)
        # For sending work INTO the process
        conn_receiver2, conn_sender2 = mp.Pipe(duplex=False)
        p = mp_context.Process(
            daemon=daemon,
            target=raw_runner2,
            args=(
                conn_sender,
                conn_receiver2,
                os.getpid(),
                initializer,
                initargs,
                finalizer,
                finargs,
                malloc_trim_rss_memory_threshold_bytes,
            ),
        )

        p.start()
        self.process = p
        self.connection_receive_msgs_from_process = conn_receiver
        self.connection_send_msgs_to_process = conn_sender2
        self.tasks_ran_counter = 0
        self.ok = True

    def __hash__(self):
        return hash(self.process.pid)

    @property
    def pid(self):
        return self.process.pid

    def get_rss_bytes(self) -> int:
        return psutil.Process(pid=self.pid).memory_info().rss

    def submit_job(self, job):
        self.tasks_ran_counter += 1
        self.connection_send_msgs_to_process.send(job)

    def shutdown(self, wait=True):
        if not self.process.is_alive():
            return

        self.connection_receive_msgs_from_process.close()

        if self.connection_send_msgs_to_process.writable:  # pragma: no branch
            try:
                self.connection_send_msgs_to_process.send(None)
            except BrokenPipeError:  # pragma: no cover
                pass
            else:
                self.connection_send_msgs_to_process.close()

        if wait:
            self.process.join()

    def is_alive(self):
        return self.process.is_alive()

    def results_are_available(self, block_for: float = 0.2):
        return self.connection_receive_msgs_from_process.poll(timeout=block_for)

    def get_results(self):
        return self.connection_receive_msgs_from_process.recv()


class Future(concurrent.futures.Future):
    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self._pid: Optional[int] = None
        self.pid_callback = None

    @property
    def pid(self):
        return self._pid

    @pid.setter
    def pid(self, value):
        self._pid = value
        if self.pid_callback:
            try:
                self.pid_callback(self)
            except Exception:  # pragma: no cover
                logger.exception("Error calling pid_callback")

    def add_pid_callback(self, fn):
        self.pid_callback = fn

    def cancel_and_kill_if_running(self, sig=signal.SIGKILL):
        self.cancel()
        if self.pid:
            try:
                kill_proc_tree(self.pid, sig=sig)
            except Exception as e:  # pragma: no cover
                logger.warning(f"Got error killing pid {self.pid}: {e}")

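# Illustrative sketch (not part of the original module): the ``pid`` property on
# the custom Future above is set as soon as a worker process picks the task up,
# and ``add_pid_callback`` lets a caller observe that moment. ``fut`` is a
# hypothetical future returned by ``Deadpool.submit``.
#
#     fut.add_pid_callback(lambda f: print(f"running in worker pid {f.pid}"))
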

class TimeoutError(concurrent.futures.TimeoutError): ...


class ProcessError(mp.ProcessError): ...


class PoolClosed(Exception): ...


class Deadpool(Executor):
    def __init__(
        self,
        max_workers: Optional[int] = None,
        min_workers: Optional[int] = None,
        max_tasks_per_child: Optional[int] = None,
        max_worker_memory_bytes: Optional[int] = None,
        mp_context=None,
        initializer=None,
        initargs=(),
        finalizer=None,
        finalargs=(),
        max_backlog=5,
        shutdown_wait: Optional[bool] = None,
        shutdown_cancel_futures: Optional[bool] = None,
        daemon=True,
        malloc_trim_rss_memory_threshold_bytes: Optional[int] = None,
        propagate_environ: Optional[Mapping] = None,
    ) -> None:
        """The pool.

        :param propagate_environ: A mapping of environment variables to
            propagate to the worker processes. This is useful for
            setting up the environment in the worker processes. Subprocesses
            will inherit the environment of the parent process, but crucially,
            they will not inherit any changes made to the environment after
            the subprocess is created (via `os.environ`). The worker processes
            will receive these environment variables at the time they are
            created. There are two important points: firstly, these env vars
            will be set before the initializer is run, so the initializer can
            use them. Secondly, these are applied only when the worker process
            is created, which means that you can dynamically change the values
            of the dict supplied here, and they will be used in new worker
            processes as they are created. (The new values will not be seen
            by existing worker processes.)

        """
        super().__init__()

        if not mp_context:
            mp_context = "forkserver"

        if isinstance(mp_context, str):
            mp_context = mp.get_context(mp_context)

        # This is stored (instead of immediately currying the `initializer`)
        # for a very important reason, which you can read about in the
        # `add_worker_to_pool` method.
        self.propagate_environ = propagate_environ
        self.ctx = mp_context
        self.initializer = initializer
        self.initargs = initargs
        self.finitializer = finalizer
        self.finitargs = finalargs
        self.pool_size = max_workers or len(os.sched_getaffinity(0))
        if min_workers is None:
            self.min_workers = self.pool_size
        else:
            self.min_workers = min_workers

        self.max_tasks_per_child = max_tasks_per_child
        self.max_worker_memory_bytes = max_worker_memory_bytes
        self.submitted_jobs: PriorityQueue[PrioritizedItem] = PriorityQueue(
            maxsize=max_backlog
        )
        self.running_jobs = Queue(maxsize=self.pool_size)
        self.running_futs = weakref.WeakSet()
        self.existing_workers = weakref.WeakSet()
        self.closed = False
        self.shutdown_wait = shutdown_wait
        self.shutdown_cancel_futures = shutdown_cancel_futures
        self.daemon = daemon
        self.malloc_trim_rss_memory_threshold_bytes = (
            malloc_trim_rss_memory_threshold_bytes
        )
        self._statistics = Statistics()

        # TODO: overcommit
        self.workers: SimpleQueue[WorkerProcess] = SimpleQueue()
        for _ in range(self.pool_size):
            self.add_worker_to_pool()
        # When a worker is running a job, it will be removed from
        # the workers queue, and added to the busy_workers set.
        # When a worker successfully completes a job, it will be
        # added back to the workers queue, and removed from the
        # busy_workers set.
        self.busy_workers = set()  #  weakref.WeakSet()

        # THE ONLY ACTIVE, PERSISTENT STATE IN DEADPOOL IS THIS THREAD
        # BELOW. PROTECT IT AT ALL COSTS.
        self.runner_thread = threading.Thread(
            target=self.runner, name="deadpool.runner", daemon=True
        )
        self.runner_thread.start()

    def get_statistics(self) -> dict[str, typing.Any]:
        stats = self._statistics.to_dict()

        # These are not counters; they are determined at the time of the
        # call based on the state of the worker processes.
        stats["worker_processes_still_alive"] = len(self.existing_workers)
        stats["worker_processes_idle"] = self.workers.qsize()
        stats["worker_processes_busy"] = len(self.busy_workers)

        return stats

    def add_worker_to_pool(self):
        if self.propagate_environ:
            # By constructing here, late, we allow the user to make
            # changes dynamically to the configured env vars and these
            # will be reflected in the worker processes as they are
            # added to the pool. This has a large number of interesting
            # applications, such as dynamically changing the logging
            # level of the worker processes, or changing the location
            # of a file that the worker processes need to read, or
            # changing timeouts and so on. All the user needs to do
            # is update the value on the Deadpool instance itself.
            initializer = partial(
                initializer_environ_propagator,
                dict(self.propagate_environ),
                original_initializer=self.initializer,
            )
        else:
            initializer = self.initializer

        worker = WorkerProcess(
            initializer=initializer,
            initargs=self.initargs,
            finalizer=self.finitializer,
            finargs=self.finitargs,
            mp_context=self.ctx,
            daemon=self.daemon,
            malloc_trim_rss_memory_threshold_bytes=self.malloc_trim_rss_memory_threshold_bytes,
        )
        self.workers.put(worker)
        self._statistics.worker_processes_created.increment()
        self.existing_workers.add(worker)

    def clear_workers(self):
        """Clear all workers from the pool.

        Typically they will all get added back according to the
        rules for `max_workers` and so on. One neat reason to do
        this is to have new settings take effect, such as a new
        environment variable that needs to be set in the workers.
        """
        while not self.workers.empty():
            worker = self.workers.get()
            worker.shutdown(wait=False)
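
    # Illustrative sketch (not part of the original module): combining
    # ``propagate_environ`` with ``clear_workers`` to push new settings into the
    # pool at runtime. The variable name LOG_LEVEL is hypothetical; the mechanism
    # is the late construction described in ``add_worker_to_pool`` above.
    #
    #     pool = Deadpool(propagate_environ={"LOG_LEVEL": "INFO"})
    #     ...
    #     pool.propagate_environ["LOG_LEVEL"] = "DEBUG"
    #     pool.clear_workers()  # replacement workers will see the new value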

    def runner(self):
        while True:
            # This will block if the queue of running jobs is full.
            self.running_jobs.put(None)

            priority_job = self.submitted_jobs.get()
            job = priority_job.item
            if job is None:
                # This is for the `None` that terminates the while loop.
                self.submitted_jobs.task_done()
                self.running_jobs.get()
                # TODO: this probably isn't necessary, since cleanup is happening
                # in the shutdown method anyway.
                cancel_all_futures_on_queue(self.submitted_jobs)
                logger.debug("Got shutdown event, leaving runner.")
                return

            *_, fut = job
            if fut.done():
                # This shouldn't really be possible, but if the associated future
                # for this job has somehow already been marked as done (e.g. if
                # the caller decided to cancel it themselves) then just skip the
                # whole job.
                self.submitted_jobs.task_done()
                self.running_jobs.get()
                continue

            t = threading.Thread(target=self.run_task, args=job, daemon=True)
            self._statistics.tasks_launched.increment()
            t.start()

    def get_process(self) -> WorkerProcess:
        bw = len(self.busy_workers)
        mw = self.pool_size
        qs = self.workers.qsize()

        total_workers = bw + qs
        if total_workers < mw and qs == 0:
            self.add_worker_to_pool()

        wp = self.workers.get()
        self.busy_workers.add(wp)
        if (
            len(self.busy_workers)
            > self._statistics.max_workers_busy_concurrently.value
        ):
            self._statistics.max_workers_busy_concurrently.value = len(
                self.busy_workers
            )

        return wp

    def done_with_process(self, wp: WorkerProcess):
        # This worker is done with its job and is no longer busy.
        self.busy_workers.remove(wp)

        count_workers_busy = len(self.busy_workers)
        count_workers_idle = self.workers.qsize()
        backlog_size = self.submitted_jobs.qsize()

        # The `1` is for `wp` itself.
        total_workers = count_workers_busy + count_workers_idle + 1
        there_are_more_workers_than_min = total_workers > self.min_workers
        task_backlog_is_empty = backlog_size == 0

        # if there_are_more_workers_than_min and (there_are_idle_workers or task_backlog_is_empty):
        if there_are_more_workers_than_min and task_backlog_is_empty:
            # We have more workers than the minimum, and there is no backlog of
            # tasks. This implies any tasks currently in play have already been picked
            # up by workers in the pool, or the pool is idle. We can safely remove
            # this worker from the pool.
            wp.shutdown(wait=False)
            return

        if not wp.is_alive():
            self.add_worker_to_pool()
            return

        if not wp.ok:
            self.add_worker_to_pool()
            return

        if self.max_tasks_per_child is not None:
            if wp.tasks_ran_counter >= self.max_tasks_per_child:
                logger.debug(f"Worker {wp.pid} hit max tasks per child.")
                wp.shutdown(wait=False)
                self.add_worker_to_pool()
                return

        if self.max_worker_memory_bytes is not None:
            mem = wp.get_rss_bytes()
            logger.debug(f"Worker {wp.pid} has {mem} bytes of RSS memory.")
            if mem >= self.max_worker_memory_bytes:
                logger.debug(f"Worker {wp.pid} hit max memory threshold.")
                wp.shutdown(wait=False)
                self.add_worker_to_pool()
                return

        self.workers.put(wp)

    def run_task(self, fn, args, kwargs, timeout, fut: Future):
        try:
            retry_count = 10
            while retry_count > 0:
                retry_count -= 1
                worker: WorkerProcess = self.get_process()
                try:
                    worker.submit_job((fn, args, kwargs, timeout))
                    break
                except (pickle.PicklingError, AttributeError) as e:
                    # If the user passed in a function or params that can't
                    # be pickled, use the future to communicate the error.
                    # Note that in this scenario, there is nothing wrong
                    # with the worker process itself, so we don't need to
                    # shut it down.
                    fut.set_exception(e)
                    self.done_with_process(worker)
                    return
                except BrokenPipeError:
                    # This likely comes from trying to send a job over a pipe
                    # that has been closed. This is a serious problem, and
                    # we should shut down the worker process and get rid of
                    # it. We're going to loop back around and try again with
                    # a new worker.
                    # TODO: it seems that this might be expected in situations
                    #  where the worker process often OOMs. As such, not sure
                    #  whether logging at warning level is appropriate.
                    logger.warning(f"BrokenPipeError on {worker.pid}, retrying.")
                    worker.ok = False
                    self.done_with_process(worker)
                    # TODO: probably this should be moved into the `done_with_process`
                    #  and can act on the `worker.ok` flag.
                    kill_proc_tree(worker.pid, sig=signal.SIGKILL)
            else:  # pragma: no cover
                # If we get here, we've tried to submit the job to a worker
                # process multiple times and failed each time. We're giving
                # up.
                logger.error("Failed to submit job to worker")
                fut.set_exception(ProcessError("Failed to submit job to worker"))
                return

            fut.pid = worker.pid
            self.running_futs.add(fut)

            while True:
                if worker.results_are_available():
                    try:
                        results = worker.get_results()
                    except EOFError:
                        self._statistics.tasks_failed.increment()
                        fut.set_exception(
                            ProcessError("Worker process died unexpectedly")
                        )
                    except BaseException as e:
                        self._statistics.tasks_failed.increment()
                        logger.debug(f"Unexpected exception from worker: {e}")
                        fut.set_exception(e)
                    else:
                        if isinstance(results, BaseException):
                            self._statistics.tasks_failed.increment()
                            fut.set_exception(results)
                        else:
                            fut.set_result(results)

                        if isinstance(results, TimeoutError):
                            self._statistics.tasks_failed.increment()
                            logger.debug(
                                f"TimeoutError on {worker.pid}, setting ok=False"
                            )
                            worker.ok = False
                    finally:
                        break
                elif not worker.is_alive():
                    self._statistics.tasks_failed.increment()
                    logger.debug(f"p is no longer alive: {worker.process}")
                    try:
                        signame = signal.strsignal(-worker.process.exitcode)
                    except (ValueError, TypeError):  # pragma: no cover
                        signame = "Unknown"

                    if not fut.done():
                        # It is possible that fut has already had a result set on
                        # it. If that's the case we'll do nothing. Otherwise, put
                        # an exception reporting the unexpected situation.
                        msg = (
                            f"Subprocess {worker.pid} completed unexpectedly with "
                            f"exitcode {worker.process.exitcode} ({signame})"
                        )
                        try:
                            fut.set_exception(ProcessError(msg))
                        except InvalidStateError:  # pragma: no cover
                            # We still have to catch this even though there is a
                            # check for `fut.done()`, simply due to a possible
                            # race between the done check and the set_exception call.
                            pass

                    break
                else:
                    pass  # pragma: no cover

            self.done_with_process(worker)
        finally:
            self.submitted_jobs.task_done()

            if not fut.done():  # pragma: no cover
                fut.set_exception(ProcessError("Somehow no result got set on fut."))

            try:
                self.running_jobs.get_nowait()
            except Empty:  # pragma: no cover
                logger.warning("Weird error, did not expect running jobs to be empty")

    def submit(
        self,
        fn: Callable,
        /,
        *args,
        deadpool_timeout=None,
        deadpool_priority=0,
        **kwargs,
    ) -> Future:
        if deadpool_priority < 0:  # pragma: no cover
            raise ValueError(
                f"Parameter deadpool_priority must be >= 0, but was {deadpool_priority}"
            )

        if self.closed:
            raise PoolClosed("The pool is closed. No more tasks can be submitted.")

        fut = Future()
        self.submitted_jobs.put(
            PrioritizedItem(
                priority=deadpool_priority,
                item=(fn, args, kwargs, deadpool_timeout, fut),
            )
        )
        self._statistics.tasks_received.increment()
        return fut
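
    # Illustrative sketch (not part of the original module): the Deadpool-specific
    # keyword arguments accepted by ``submit`` above. ``slow_fn`` is a hypothetical
    # picklable callable; ``pool`` is a Deadpool instance.
    #
    #     fut = pool.submit(slow_fn, 42, deadpool_timeout=10.0, deadpool_priority=5)
    #     try:
    #         print(fut.result())
    #     except TimeoutError:
    #         print("worker was killed after exceeding the 10 second timeout")
    #
    # Lower ``deadpool_priority`` values are scheduled first (the backlog is a
    # ``PriorityQueue`` of ``PrioritizedItem``), and on timeout the worker process
    # is killed and the future receives this module's ``TimeoutError``.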

    def shutdown(self, wait: bool = True, *, cancel_futures: bool = False) -> None:
        logger.debug(f"shutdown: {wait=} {cancel_futures=}")

        # No more new tasks can be submitted
        self.closed = True

        if cancel_futures:
            cancel_all_futures_on_queue(self.submitted_jobs)

        if wait:
            # The None sentinel will pop last
            shutdown_priority = sys.maxsize
        else:
            # The None sentinel will pop first
            shutdown_priority = -1

        try:
            self.submitted_jobs.put(
                PrioritizedItem(priority=shutdown_priority, item=None),
                timeout=2.0,
            )
        except TimeoutError:  # pragma: no cover
            logger.warning(
                "Timed out putting None on the submit queue. This "
                "should not be possible "
                "and might be a bug in deadpool."
            )

        # Up till this point, all the pending work that has been
        # submitted, but not yet started, has been cancelled. The
        # runner loop has also been stopped (with the None sentinel).
        # The only thing left to do is decide whether or not to
        # actively kill processes that are still running. We presume
        # that if the user is asking for cancellation and doesn't
        # want to wait, she probably wants us to also stop
        # running processes.
        if (not wait) and cancel_futures:
            running_futs = list(self.running_futs)
            for fut in running_futs:
                fut.cancel_and_kill_if_running()

        logger.debug("waiting for submitted_jobs to join...")
        self.submitted_jobs.join()

        super().shutdown(wait, cancel_futures=cancel_futures)

        # We can now remove all other processes hanging around
        # in the background.
        while not self.workers.empty():
            try:
                worker = self.workers.get_nowait()
                worker.shutdown()
            except Empty:  # pragma: no cover
                break

        # There may be a few processes left in the
        # `busy_workers` set. Shut them down too.
        while self.busy_workers:
            worker = self.busy_workers.pop()
            worker.shutdown()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if not self.closed:
            kwargs = {}
            if self.shutdown_wait is not None:
                kwargs["wait"] = self.shutdown_wait

            if self.shutdown_cancel_futures is not None:
                kwargs["cancel_futures"] = self.shutdown_cancel_futures

            self.shutdown(**kwargs)

        self.runner_thread.join()
        return False

723
def cancel_all_futures_on_queue(q: Queue):
5✔
724
    while True:
4✔
725
        try:
5✔
726
            priority_item = q.get_nowait()
5✔
727
            q.task_done()
5✔
728
            job = priority_item.item
5✔
729
            *_, fut = job
5✔
730
            fut.cancel()
5✔
731
        except Empty:
5✔
732
            break
5✔
733

734

735
# Taken from
736
# https://psutil.readthedocs.io/en/latest/index.html?highlight=children#kill-process-tree
737
def kill_proc_tree(
5✔
738
    pid,
739
    sig=signal.SIGTERM,
740
    include_parent=True,
741
    timeout=None,
742
    on_terminate=None,
743
    allow_kill_self=False,
744
):
745
    """Kill a process tree (including grandchildren) with signal
746
    "sig" and return a (gone, still_alive) tuple.
747
    "on_terminate", if specified, is a callback function which is
748
    called as soon as a child terminates.
749
    """
750
    if not allow_kill_self and pid == os.getpid():
5!
751
        raise ValueError("Won't kill myself")
×
752

753
    try:
5✔
754
        parent = psutil.Process(pid)
5✔
755
    except psutil.NoSuchProcess:
×
756
        return
×
757

758
    children = parent.children(recursive=True)
5✔
759
    if include_parent:
5!
760
        children.append(parent)
5✔
761

762
    for p in children:
5✔
763
        try:
5✔
764
            p.send_signal(sig)
5✔
765
        except psutil.NoSuchProcess:  # pragma: no cover
766
            pass
767

768
    gone, alive = psutil.wait_procs(children, timeout=timeout, callback=on_terminate)
5✔
769
    return (gone, alive)
5✔
770

771

772
def raw_runner2(
5✔
773
    conn: Connection,
774
    conn_receiver: Connection,
775
    parent_pid,
776
    initializer,
777
    initargs,
778
    finitializer: Optional[Callable] = None,
779
    finitargs: Optional[Tuple] = None,
780
    mem_clear_threshold_bytes: Optional[int] = None,
781
    kill_proc_tree=kill_proc_tree,
782
):
783
    setproctitle("deadpool.worker")
5✔
784
    # This event is used to signal that the "parent"
785
    # monitor thread should be deactivated.
786
    evt = threading.Event()
5✔
787

788
    def self_destruct_if_parent_disappers():
5✔
789
        """Poll every 5 seconds to see whether the parent is still
790
        alive.
791
        """
792
        while True:
4✔
793
            if evt.wait(2.0):
5✔
794
                return
5✔
795

796
            if not psutil.pid_exists(parent_pid):
5✔
797
                logger.warning(f"Parent {parent_pid} is gone, self-destructing.")
5✔
798
                evt.set()
5✔
799
                atexit._run_exitfuncs()
5✔
800
                kill_proc_tree(
801
                    pid, sig=signal.SIGKILL, allow_kill_self=True
802
                )  # pragma: no cover
803
                return  # pragma: no cover
804

805
    tparent = threading.Thread(target=self_destruct_if_parent_disappers, daemon=True)
5✔
806
    tparent.start()
5✔
807

808
    def deactivate_parentless_self_destruct():
5✔
809
        evt.set()
5✔
810

811
    proc = psutil.Process()
5✔
812
    pid = proc.pid
5✔
813

814
    def conn_send_safe(obj):
5✔
815
        try:
5✔
816
            conn.send(obj)
5✔
817
        except BrokenPipeError:  # pragma: no cover
818
            logger.debug("Pipe not usable")
819
        except BaseException:  # pragma: no cover
820
            logger.exception("Unexpected pipe error")
821

822
    def timed_out():
5✔
823
        """Action to fire when the timeout given to ``threading.Timer``
824
        is reached. It kills this process with SIGKILL."""
825
        # First things first. Set a self-destruct timer for ourselves.
826
        # If we don't finish up in time, boom.
827
        deactivate_parentless_self_destruct()
5✔
828
        conn_send_safe(TimeoutError(f"Process {pid} timed out, self-destructing."))
5✔
829
        # kill_proc_tree_in_process_daemon(pid, signal.SIGKILL)
830
        atexit._run_exitfuncs()
5✔
831
        kill_proc_tree(
832
            pid, sig=signal.SIGKILL, allow_kill_self=True
833
        )  # pragma: no cover
834

835
    if initializer:
5✔
836
        initargs = initargs or ()
5✔
837
        try:
5✔
838
            initializer(*initargs)
5✔
839
        except Exception:
5✔
840
            logger.exception("Initializer failed")
5✔
841

842
    while True:
4✔
843
        # Wait for some work.
844
        try:
5✔
845
            logger.debug("Waiting for work...")
5✔
846
            job = conn_receiver.recv()
5✔
847
            logger.debug("Got a job")
5✔
848
        except EOFError:
5✔
849
            logger.debug("Received EOF, exiting.")
5✔
850
            break
5✔
851
        except KeyboardInterrupt:  # pragma: no cover
852
            logger.debug("Received KeyboardInterrupt, exiting.")
853
            break
854
        except BaseException:  # pragma: no cover
855
            logger.exception("Received unexpected exception, exiting.")
856
            break
857

858
        if job is None:
5✔
859
            logger.debug("Received None, exiting.")
5✔
860
            break
5✔
861

862
        # Real work, unpack.
863
        fn, args, kwargs, timeout = job
5✔
864

865
        if timeout:
5✔
866
            t = threading.Timer(timeout, timed_out)
5✔
867
            t.start()
5✔
868
            deactivate_timer = lambda: t.cancel()  # noqa: E731
5✔
869
        else:
870
            deactivate_timer = lambda: None  # noqa: E731
5✔
871

872
        try:
5✔
873
            results = fn(*args, **kwargs)
5✔
874
        except BaseException as e:
5✔
875
            # Check whether the exception can be pickled. If not we're going
876
            # to wrap it. Why do this? It turns out that mp.Connection.send
877
            # will try to pickle the exception, and if it can't, it will
878
            # lose its mind. I've gotten segfaults in Python with this.
879
            try:  # pragma: no cover
880
                pickle.dumps(e)
881
            except Exception as pickle_error:
4✔
882
                msg = (
4✔
883
                    f"An exception occurred but pickling it failed. "
884
                    f"The original exception is presented here as a string with "
885
                    f"traceback.\n{e}\n{traceback.format_exception(e)}\n\n"
886
                    f"The reason for the pickling failure is the following:\n"
887
                    f"{traceback.format_exception(pickle_error)}"
888
                )
889
                e = ProcessError(msg)
4✔
890

891
            # Because we can't retain the traceback (can't be pickled by default,
892
            # an external library like "tblib" would be needed), we're going to
893
            # render the traceback to a string and add that to the exception
894
            # text. This approach also works for when deadpool can be distributed
895
            # across multiple machines, since the traceback is a string.
896
            traceback_str = "".join(
5✔
897
                traceback.format_exception(type(e), e, e.__traceback__)
898
            )
899
            # Modify the exception's args to include the traceback
900
            # This changes the string representation of the exception
901
            e.args = (f"{e}\n{traceback_str}",) + e.args[1:]
5✔
902
            conn_send_safe(e)
5✔
903
        else:
904
            conn_send_safe(results)
5✔
905
        finally:
906
            deactivate_timer()
5✔
907

908
            if mem_clear_threshold_bytes is not None:
5✔
909
                mem = proc.memory_info().rss
5✔
910
                if mem > mem_clear_threshold_bytes:
5!
911
                    trim_memory()
5✔
912

913
    if finitializer:
5✔
914
        finitargs = finitargs or ()
5✔
915
        try:
5✔
916
            finitializer(*finitargs)
5✔
917
        except BaseException:
5✔
918
            logger.exception("finitializer failed")
5✔
919

920
    # We've reached the end of this function which means this
921
    # process must exit. However, we started a couple threads
922
    # in here and they don't magically exit. Additional
923
    # synchronization controls are needed to tell the threads
924
    # to exit, which we don't have. However, we do have a kill
925
    # switch. Since this process worker will process no more
926
    # work, and since we've already fun the finalizer, we may
927
    # as well just nuke it. That will remove its memory space
928
    # and all its threads too.
929
    deactivate_parentless_self_destruct()
5✔
930
    logger.debug(f"Deleting worker {pid=}")
5✔
931
    atexit._run_exitfuncs()
5✔
932
    kill_proc_tree(pid, sig=signal.SIGKILL, allow_kill_self=True)  # pragma: no cover
933

934
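# Illustrative sketch (not part of the original module): why ``raw_runner2`` above
# checks ``pickle.dumps(e)`` before sending an exception back to the parent. An
# exception carrying an unpicklable payload would otherwise break the
# ``Connection.send`` call:
#
#     import pickle, threading
#     err = ValueError(threading.Lock())  # exception args hold an unpicklable object
#     pickle.dumps(err)                   # TypeError: cannot pickle '_thread.lock' object
#
# In that case the worker substitutes a ``ProcessError`` whose message embeds the
# original exception text and traceback as plain strings.
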

def kill_proc_tree_in_process_daemon(pid, sig):  # pragma: no cover
    mp.Process(target=kill_proc_tree, args=(pid, sig), daemon=True).start()


def trim_memory() -> None:
    """Tell malloc to give all the unused memory back to the OS."""
    if sys.platform == "linux":
        libc = ctypes.CDLL("libc.so.6")
        libc.malloc_trim(0)


def initializer_environ_propagator(
    environ: dict,
    original_initializer: Optional[Callable] = None,
    initargs=(),
):
    """Wrap the original initializer with one that sets the
    environment variables in the given dict."""

    # Quite important that we run this first, so that the
    # environment variables are set before the original
    # initializer runs. This allows the original initializer
    # to use the environment variables.
    os.environ.update(environ or {})
    if original_initializer:
        original_initializer(*(initargs or ()))