• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

agronholm / anyio / 10716642356

05 Sep 2024 08:22AM UTC coverage: 91.7% (+0.005%) from 91.695%
10716642356

Pull #782

github

web-flow
Merge 8854bb22b into 0c8ad519e
Pull Request #782: Accept abstract namespace paths for unix domain sockets

9 of 10 new or added lines in 1 file covered. (90.0%)

5 existing lines in 1 file now uncovered.

4795 of 5229 relevant lines covered (91.7%)

9.5 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.15
/src/anyio/to_process.py
1
from __future__ import annotations
11✔
2

3
import os
11✔
4
import pickle
11✔
5
import subprocess
11✔
6
import sys
11✔
7
from collections import deque
11✔
8
from collections.abc import Callable
11✔
9
from importlib.util import module_from_spec, spec_from_file_location
11✔
10
from typing import TypeVar, cast
11✔
11

12
from ._core._eventloop import current_time, get_async_backend, get_cancelled_exc_class
11✔
13
from ._core._exceptions import BrokenWorkerProcess
11✔
14
from ._core._subprocesses import open_process
11✔
15
from ._core._synchronization import CapacityLimiter
11✔
16
from ._core._tasks import CancelScope, fail_after
11✔
17
from .abc import ByteReceiveStream, ByteSendStream, Process
11✔
18
from .lowlevel import RunVar, checkpoint_if_cancelled
11✔
19
from .streams.buffered import BufferedByteReceiveStream
11✔
20

21
if sys.version_info >= (3, 11):
11✔
22
    from typing import TypeVarTuple, Unpack
6✔
23
else:
24
    from typing_extensions import TypeVarTuple, Unpack
6✔
25

26
WORKER_MAX_IDLE_TIME = 300  # 5 minutes
11✔
27

28
T_Retval = TypeVar("T_Retval")
11✔
29
PosArgsT = TypeVarTuple("PosArgsT")
11✔
30

31
_process_pool_workers: RunVar[set[Process]] = RunVar("_process_pool_workers")
11✔
32
_process_pool_idle_workers: RunVar[deque[tuple[Process, float]]] = RunVar(
11✔
33
    "_process_pool_idle_workers"
34
)
35
_default_process_limiter: RunVar[CapacityLimiter] = RunVar("_default_process_limiter")
11✔
36

37

38
async def run_sync(
11✔
39
    func: Callable[[Unpack[PosArgsT]], T_Retval],
40
    *args: Unpack[PosArgsT],
41
    cancellable: bool = False,
42
    limiter: CapacityLimiter | None = None,
43
) -> T_Retval:
44
    """
45
    Call the given function with the given arguments in a worker process.
46

47
    If the ``cancellable`` option is enabled and the task waiting for its completion is
48
    cancelled, the worker process running it will be abruptly terminated using SIGKILL
49
    (or ``terminateProcess()`` on Windows).
50

51
    :param func: a callable
52
    :param args: positional arguments for the callable
53
    :param cancellable: ``True`` to allow cancellation of the operation while it's
54
        running
55
    :param limiter: capacity limiter to use to limit the total amount of processes
56
        running (if omitted, the default limiter is used)
57
    :return: an awaitable that yields the return value of the function.
58

59
    """
60

61
    async def send_raw_command(pickled_cmd: bytes) -> object:
10✔
62
        try:
10✔
63
            await stdin.send(pickled_cmd)
10✔
64
            response = await buffered.receive_until(b"\n", 50)
10✔
65
            status, length = response.split(b" ")
10✔
66
            if status not in (b"RETURN", b"EXCEPTION"):
10✔
67
                raise RuntimeError(
×
68
                    f"Worker process returned unexpected response: {response!r}"
69
                )
70

71
            pickled_response = await buffered.receive_exactly(int(length))
10✔
72
        except BaseException as exc:
10✔
73
            workers.discard(process)
10✔
74
            try:
10✔
75
                process.kill()
10✔
76
                with CancelScope(shield=True):
10✔
77
                    await process.aclose()
10✔
78
            except ProcessLookupError:
×
79
                pass
×
80

81
            if isinstance(exc, get_cancelled_exc_class()):
10✔
82
                raise
10✔
83
            else:
84
                raise BrokenWorkerProcess from exc
×
85

86
        retval = pickle.loads(pickled_response)
10✔
87
        if status == b"EXCEPTION":
10✔
88
            assert isinstance(retval, BaseException)
10✔
89
            raise retval
10✔
90
        else:
91
            return retval
10✔
92

93
    # First pickle the request before trying to reserve a worker process
94
    await checkpoint_if_cancelled()
10✔
95
    request = pickle.dumps(("run", func, args), protocol=pickle.HIGHEST_PROTOCOL)
10✔
96

97
    # If this is the first run in this event loop thread, set up the necessary variables
98
    try:
10✔
99
        workers = _process_pool_workers.get()
10✔
100
        idle_workers = _process_pool_idle_workers.get()
10✔
101
    except LookupError:
10✔
102
        workers = set()
10✔
103
        idle_workers = deque()
10✔
104
        _process_pool_workers.set(workers)
10✔
105
        _process_pool_idle_workers.set(idle_workers)
10✔
106
        get_async_backend().setup_process_pool_exit_at_shutdown(workers)
10✔
107

108
    async with limiter or current_default_process_limiter():
10✔
109
        # Pop processes from the pool (starting from the most recently used) until we
110
        # find one that hasn't exited yet
111
        process: Process
112
        while idle_workers:
10✔
113
            process, idle_since = idle_workers.pop()
10✔
114
            if process.returncode is None:
10✔
115
                stdin = cast(ByteSendStream, process.stdin)
10✔
116
                buffered = BufferedByteReceiveStream(
10✔
117
                    cast(ByteReceiveStream, process.stdout)
118
                )
119

120
                # Prune any other workers that have been idle for WORKER_MAX_IDLE_TIME
121
                # seconds or longer
122
                now = current_time()
10✔
123
                killed_processes: list[Process] = []
10✔
124
                while idle_workers:
10✔
125
                    if now - idle_workers[0][1] < WORKER_MAX_IDLE_TIME:
10✔
126
                        break
×
127

128
                    process_to_kill, idle_since = idle_workers.popleft()
10✔
129
                    process_to_kill.kill()
10✔
130
                    workers.remove(process_to_kill)
10✔
131
                    killed_processes.append(process_to_kill)
10✔
132

133
                with CancelScope(shield=True):
10✔
134
                    for killed_process in killed_processes:
10✔
135
                        await killed_process.aclose()
10✔
136

137
                break
10✔
138

139
            workers.remove(process)
×
140
        else:
141
            command = [sys.executable, "-u", "-m", __name__]
10✔
142
            process = await open_process(
10✔
143
                command, stdin=subprocess.PIPE, stdout=subprocess.PIPE
144
            )
145
            try:
10✔
146
                stdin = cast(ByteSendStream, process.stdin)
10✔
147
                buffered = BufferedByteReceiveStream(
10✔
148
                    cast(ByteReceiveStream, process.stdout)
149
                )
150
                with fail_after(20):
10✔
151
                    message = await buffered.receive(6)
10✔
152

153
                if message != b"READY\n":
10✔
154
                    raise BrokenWorkerProcess(
×
155
                        f"Worker process returned unexpected response: {message!r}"
156
                    )
157

158
                main_module_path = getattr(sys.modules["__main__"], "__file__", None)
10✔
159
                pickled = pickle.dumps(
10✔
160
                    ("init", sys.path, main_module_path),
161
                    protocol=pickle.HIGHEST_PROTOCOL,
162
                )
163
                await send_raw_command(pickled)
10✔
164
            except (BrokenWorkerProcess, get_cancelled_exc_class()):
×
165
                raise
×
166
            except BaseException as exc:
×
167
                process.kill()
×
168
                raise BrokenWorkerProcess(
×
169
                    "Error during worker process initialization"
170
                ) from exc
171

172
            workers.add(process)
10✔
173

174
        with CancelScope(shield=not cancellable):
10✔
175
            try:
10✔
176
                return cast(T_Retval, await send_raw_command(request))
10✔
177
            finally:
178
                if process in workers:
10✔
179
                    idle_workers.append((process, current_time()))
10✔
180

181

182
def current_default_process_limiter() -> CapacityLimiter:
    """
    Return the capacity limiter that is used by default to limit the number of worker
    processes.

    :return: a capacity limiter object

    """
    # Lazily create one limiter per event loop, sized to the CPU count (falling back
    # to 2 when the count cannot be determined)
    try:
        limiter = _default_process_limiter.get()
    except LookupError:
        limiter = CapacityLimiter(os.cpu_count() or 2)
        _default_process_limiter.set(limiter)

    return limiter
196

197

198
def process_worker() -> None:
    """
    Run the worker-process side of the protocol: read pickled commands from the real
    stdin, execute them, and write pickled replies to the real stdout until EOF.
    """
    # Redirect standard streams to os.devnull so that user code won't interfere with the
    # parent-worker communication
    stdin = sys.stdin
    stdout = sys.stdout
    sys.stdin = open(os.devnull)
    sys.stdout = open(os.devnull, "w")

    # Tell the parent we're ready to receive commands (it waits for these 6 bytes)
    stdout.buffer.write(b"READY\n")
    while True:
        retval = exception = None
        try:
            # Each command is a pickled tuple: ("run", func, args) or
            # ("init", sys.path, main_module_path)
            command, *args = pickle.load(stdin.buffer)
        except EOFError:
            # Parent closed our stdin: orderly shutdown
            return
        except BaseException as exc:
            exception = exc
        else:
            if command == "run":
                func, args = args
                try:
                    retval = func(*args)
                except BaseException as exc:
                    # Captured and pickled back to the parent rather than crashing
                    exception = exc
            elif command == "init":
                main_module_path: str | None
                # Adopt the parent's sys.path so later unpickling can import `func`
                sys.path, main_module_path = args
                del sys.modules["__main__"]
                if main_module_path and os.path.isfile(main_module_path):
                    # Load the parent's main module but as __mp_main__ instead of
                    # __main__ (like multiprocessing does) to avoid infinite recursion
                    try:
                        spec = spec_from_file_location("__mp_main__", main_module_path)
                        if spec and spec.loader:
                            main = module_from_spec(spec)
                            spec.loader.exec_module(main)
                            sys.modules["__main__"] = main
                    except BaseException as exc:
                        exception = exc
        try:
            if exception is not None:
                status = b"EXCEPTION"
                pickled = pickle.dumps(exception, pickle.HIGHEST_PROTOCOL)
            else:
                status = b"RETURN"
                pickled = pickle.dumps(retval, pickle.HIGHEST_PROTOCOL)
        except BaseException as exc:
            # The return value (or the original exception) wasn't picklable; report
            # the pickling error instead
            exception = exc
            status = b"EXCEPTION"
            pickled = pickle.dumps(exc, pickle.HIGHEST_PROTOCOL)

        # Reply framing: b"STATUS LENGTH\n" header, then LENGTH bytes of payload
        stdout.buffer.write(b"%s %d\n" % (status, len(pickled)))
        stdout.buffer.write(pickled)

        # Respect SIGTERM
        if isinstance(exception, SystemExit):
            raise exception
255

256

257
# Entry point used when run_sync() launches this module in a subprocess via
# `python -u -m anyio.to_process`
if __name__ == "__main__":
    process_worker()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc