• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

agronholm / anyio / 4946493218

pending completion
4946493218

Pull #567

github

GitHub
Merge 6b66f35af into e1ba31f1c
Pull Request #567: Fix broken support for `Callable[..., Awaitable]`

48 of 48 new or added lines in 3 files covered. (100.0%)

4033 of 4459 relevant lines covered (90.45%)

8.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.35
/src/anyio/to_process.py
1
from __future__ import annotations
10✔
2

3
import os
10✔
4
import pickle
10✔
5
import subprocess
10✔
6
import sys
10✔
7
from collections import deque
10✔
8
from collections.abc import Callable
10✔
9
from importlib.util import module_from_spec, spec_from_file_location
10✔
10
from typing import TypeVar, cast
10✔
11

12
from ._core._eventloop import current_time, get_async_backend, get_cancelled_exc_class
10✔
13
from ._core._exceptions import BrokenWorkerProcess
10✔
14
from ._core._subprocesses import open_process
10✔
15
from ._core._synchronization import CapacityLimiter
10✔
16
from ._core._tasks import CancelScope, fail_after
10✔
17
from .abc import ByteReceiveStream, ByteSendStream, Process
10✔
18
from .lowlevel import RunVar, checkpoint_if_cancelled
10✔
19
from .streams.buffered import BufferedByteReceiveStream
10✔
20

21
# Maximum time (in seconds) a pooled worker may sit idle before it is pruned
# the next time a worker is requested
WORKER_MAX_IDLE_TIME = 300  # 5 minutes

# Type variable for the return value of the function executed in the worker
T_Retval = TypeVar("T_Retval")

# Per-event-loop set of all live worker processes (both busy and idle)
_process_pool_workers: RunVar[set[Process]] = RunVar("_process_pool_workers")
# Per-event-loop deque of (process, idle-since timestamp) pairs; the most
# recently used worker is at the right end
_process_pool_idle_workers: RunVar[deque[tuple[Process, float]]] = RunVar(
    "_process_pool_idle_workers"
)
# Lazily created default limiter bounding the number of concurrent workers
_default_process_limiter: RunVar[CapacityLimiter] = RunVar("_default_process_limiter")
29

30

31
async def run_sync(
    func: Callable[..., T_Retval],
    *args: object,
    cancellable: bool = False,
    limiter: CapacityLimiter | None = None,
) -> T_Retval:
    """
    Call the given function with the given arguments in a worker process.

    If the ``cancellable`` option is enabled and the task waiting for its completion is
    cancelled, the worker process running it will be abruptly terminated using SIGKILL
    (or ``terminateProcess()`` on Windows).

    :param func: a callable
    :param args: positional arguments for the callable
    :param cancellable: ``True`` to allow cancellation of the operation while it's
        running
    :param limiter: capacity limiter to use to limit the total amount of processes
        running (if omitted, the default limiter is used)
    :return: an awaitable that yields the return value of the function.

    """

    async def send_raw_command(pickled_cmd: bytes) -> object:
        # Send a pre-pickled command to the reserved worker and read back its
        # "<STATUS> <length>\n" + payload response.  Any failure (broken pipe,
        # malformed response) kills the worker and drops it from the pool.
        try:
            await stdin.send(pickled_cmd)
            response = await buffered.receive_until(b"\n", 50)
            status, length = response.split(b" ")
            if status not in (b"RETURN", b"EXCEPTION"):
                raise RuntimeError(
                    f"Worker process returned unexpected response: {response!r}"
                )

            pickled_response = await buffered.receive_exactly(int(length))
        except BaseException as exc:
            workers.discard(process)
            try:
                process.kill()
                # Shield the close so a pending cancellation can't leave a
                # zombie process behind
                with CancelScope(shield=True):
                    await process.aclose()
            except ProcessLookupError:
                pass

            if isinstance(exc, get_cancelled_exc_class()):
                raise
            else:
                raise BrokenWorkerProcess from exc

        retval = pickle.loads(pickled_response)
        if status == b"EXCEPTION":
            assert isinstance(retval, BaseException)
            raise retval
        else:
            return retval

    # First pickle the request before trying to reserve a worker process
    await checkpoint_if_cancelled()
    request = pickle.dumps(("run", func, args), protocol=pickle.HIGHEST_PROTOCOL)

    # If this is the first run in this event loop thread, set up the necessary variables
    try:
        workers = _process_pool_workers.get()
        idle_workers = _process_pool_idle_workers.get()
    except LookupError:
        workers = set()
        idle_workers = deque()
        _process_pool_workers.set(workers)
        _process_pool_idle_workers.set(idle_workers)
        get_async_backend().setup_process_pool_exit_at_shutdown(workers)

    async with limiter or current_default_process_limiter():
        # Pop processes from the pool (starting from the most recently used) until we
        # find one that hasn't exited yet
        process: Process
        while idle_workers:
            process, idle_since = idle_workers.pop()
            if process.returncode is None:
                stdin = cast(ByteSendStream, process.stdin)
                buffered = BufferedByteReceiveStream(
                    cast(ByteReceiveStream, process.stdout)
                )

                # Prune any other workers that have been idle for WORKER_MAX_IDLE_TIME
                # seconds or longer
                now = current_time()
                killed_processes: list[Process] = []
                while idle_workers:
                    if now - idle_workers[0][1] < WORKER_MAX_IDLE_TIME:
                        break

                    # BUGFIX: use a distinct name here instead of reusing
                    # ``process``; the original code clobbered the reference to
                    # the worker reserved above, so the idle-pool bookkeeping
                    # at the bottom of this function tracked a dead process
                    process_to_kill, idle_since = idle_workers.popleft()
                    process_to_kill.kill()
                    workers.remove(process_to_kill)
                    killed_processes.append(process_to_kill)

                with CancelScope(shield=True):
                    for killed_process in killed_processes:
                        await killed_process.aclose()

                break

            # The popped worker had already exited; forget it and keep looking
            workers.remove(process)
        else:
            # No live idle worker was found: spawn a fresh worker that runs
            # this module as __main__ (see process_worker())
            command = [sys.executable, "-u", "-m", __name__]
            process = await open_process(
                command, stdin=subprocess.PIPE, stdout=subprocess.PIPE
            )
            try:
                stdin = cast(ByteSendStream, process.stdin)
                buffered = BufferedByteReceiveStream(
                    cast(ByteReceiveStream, process.stdout)
                )
                # The worker announces itself with b"READY\n" before accepting
                # any commands
                with fail_after(20):
                    message = await buffered.receive(6)

                if message != b"READY\n":
                    raise BrokenWorkerProcess(
                        f"Worker process returned unexpected response: {message!r}"
                    )

                # Hand the worker our sys.path and main module location so it
                # can unpickle callables defined in __main__
                main_module_path = getattr(sys.modules["__main__"], "__file__", None)
                pickled = pickle.dumps(
                    ("init", sys.path, main_module_path),
                    protocol=pickle.HIGHEST_PROTOCOL,
                )
                await send_raw_command(pickled)
            except (BrokenWorkerProcess, get_cancelled_exc_class()):
                raise
            except BaseException as exc:
                process.kill()
                raise BrokenWorkerProcess(
                    "Error during worker process initialization"
                ) from exc

            workers.add(process)

        # Shield against cancellation unless the caller explicitly allowed it
        with CancelScope(shield=not cancellable):
            try:
                return cast(T_Retval, await send_raw_command(request))
            finally:
                # Return the worker to the idle pool (unless it was discarded
                # by send_raw_command() on failure)
                if process in workers:
                    idle_workers.append((process, current_time()))
173

174

175
def current_default_process_limiter() -> CapacityLimiter:
    """
    Return the capacity limiter that is used by default to limit the number of worker
    processes.

    :return: a capacity limiter object

    """
    try:
        return _default_process_limiter.get()
    except LookupError:
        pass

    # First call on this event loop: create and memoize a limiter sized to the
    # CPU count, falling back to 2 when it cannot be determined
    limiter = CapacityLimiter(os.cpu_count() or 2)
    _default_process_limiter.set(limiter)
    return limiter
189

190

191
def process_worker() -> None:
    """
    Entry point of the worker subprocess (launched by ``run_sync()`` via
    ``python -u -m`` on this module).

    Speaks a simple pickle-based protocol with the parent over the original
    stdin/stdout pipes: after announcing readiness with a ``READY`` line, it
    repeatedly reads a pickled ``("run", func, args)`` or
    ``("init", sys.path, main_module_path)`` command and replies with a status
    line (``RETURN`` or ``EXCEPTION`` plus payload length) followed by the
    pickled result.
    """
    # Redirect standard streams to os.devnull so that user code won't interfere with the
    # parent-worker communication
    stdin = sys.stdin
    stdout = sys.stdout
    sys.stdin = open(os.devnull)
    sys.stdout = open(os.devnull, "w")

    # Tell the parent we're ready to accept commands (the process runs with
    # ``-u``, so stdout is unbuffered and this is sent immediately)
    stdout.buffer.write(b"READY\n")
    while True:
        retval = exception = None
        try:
            command, *args = pickle.load(stdin.buffer)
        except EOFError:
            # Parent closed the pipe; shut down quietly
            return
        except BaseException as exc:
            exception = exc
        else:
            if command == "run":
                func, args = args
                try:
                    retval = func(*args)
                except BaseException as exc:
                    exception = exc
            elif command == "init":
                # Adopt the parent's sys.path and re-import its main module so
                # that pickled callables defined in __main__ can be resolved
                main_module_path: str | None
                sys.path, main_module_path = args
                del sys.modules["__main__"]
                if main_module_path:
                    # Load the parent's main module but as __mp_main__ instead of
                    # __main__ (like multiprocessing does) to avoid infinite recursion
                    try:
                        spec = spec_from_file_location("__mp_main__", main_module_path)
                        if spec and spec.loader:
                            main = module_from_spec(spec)
                            spec.loader.exec_module(main)
                            sys.modules["__main__"] = main
                    except BaseException as exc:
                        exception = exc

        try:
            if exception is not None:
                status = b"EXCEPTION"
                pickled = pickle.dumps(exception, pickle.HIGHEST_PROTOCOL)
            else:
                status = b"RETURN"
                pickled = pickle.dumps(retval, pickle.HIGHEST_PROTOCOL)
        except BaseException as exc:
            # The result (or exception) itself was unpicklable; report the
            # pickling error to the parent instead
            exception = exc
            status = b"EXCEPTION"
            pickled = pickle.dumps(exc, pickle.HIGHEST_PROTOCOL)

        # Status line ("<STATUS> <length>\n") followed by the pickled payload
        stdout.buffer.write(b"%s %d\n" % (status, len(pickled)))
        stdout.buffer.write(pickled)

        # Respect SIGTERM
        if isinstance(exception, SystemExit):
            raise exception
249

250

251
# When executed as a module (``python -m``), this process is a pool worker
# spawned by run_sync() in the parent
if __name__ == "__main__":
    process_worker()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc