• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

agronholm / anyio / 17710582260

14 Sep 2025 11:32AM UTC coverage: 92.235% (-0.03%) from 92.261%
17710582260

push

github

web-flow
Added support for cancellation reasons (#975)

This forces a bump of the minimum Trio version as only v0.31.0 supports cancel reasons.

15 of 15 new or added lines in 4 files covered. (100.0%)

3 existing lines in 2 files now uncovered.

5500 of 5963 relevant lines covered (92.24%)

7.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.45
/src/anyio/pytest_plugin.py
1
from __future__ import annotations
9✔
2

3
import socket
9✔
4
import sys
9✔
5
from collections.abc import Callable, Generator, Iterator
9✔
6
from contextlib import ExitStack, contextmanager
9✔
7
from inspect import isasyncgenfunction, iscoroutinefunction, ismethod
9✔
8
from typing import Any, cast
9✔
9

10
import pytest
9✔
11
import sniffio
9✔
12
from _pytest.fixtures import SubRequest
9✔
13
from _pytest.outcomes import Exit
9✔
14

15
from ._core._eventloop import get_all_backends, get_async_backend
9✔
16
from ._core._exceptions import iterate_exceptions
9✔
17
from .abc import TestRunner
9✔
18

19
if sys.version_info < (3, 11):
9✔
20
    from exceptiongroup import ExceptionGroup
3✔
21

22
_current_runner: TestRunner | None = None
9✔
23
_runner_stack: ExitStack | None = None
9✔
24
_runner_leases = 0
9✔
25

26

27
def extract_backend_and_options(backend: object) -> tuple[str, dict[str, Any]]:
9✔
28
    if isinstance(backend, str):
9✔
29
        return backend, {}
9✔
30
    elif isinstance(backend, tuple) and len(backend) == 2:
9✔
31
        if isinstance(backend[0], str) and isinstance(backend[1], dict):
9✔
32
            return cast(tuple[str, dict[str, Any]], backend)
9✔
33

34
    raise TypeError("anyio_backend must be either a string or tuple of (string, dict)")
×
35

36

37
@contextmanager
9✔
38
def get_runner(
9✔
39
    backend_name: str, backend_options: dict[str, Any]
40
) -> Iterator[TestRunner]:
41
    global _current_runner, _runner_leases, _runner_stack
42
    if _current_runner is None:
9✔
43
        asynclib = get_async_backend(backend_name)
9✔
44
        _runner_stack = ExitStack()
9✔
45
        if sniffio.current_async_library_cvar.get(None) is None:
9✔
46
            # Since we're in control of the event loop, we can cache the name of the
47
            # async library
48
            token = sniffio.current_async_library_cvar.set(backend_name)
9✔
49
            _runner_stack.callback(sniffio.current_async_library_cvar.reset, token)
9✔
50

51
        backend_options = backend_options or {}
9✔
52
        _current_runner = _runner_stack.enter_context(
9✔
53
            asynclib.create_test_runner(backend_options)
54
        )
55

56
    _runner_leases += 1
9✔
57
    try:
9✔
58
        yield _current_runner
9✔
59
    finally:
60
        _runner_leases -= 1
9✔
61
        if not _runner_leases:
9✔
62
            assert _runner_stack is not None
9✔
63
            _runner_stack.close()
9✔
64
            _runner_stack = _current_runner = None
9✔
65

66

67
def pytest_addoption(parser: pytest.Parser) -> None:
9✔
68
    parser.addini(
9✔
69
        "anyio_mode",
70
        default="strict",
71
        help='AnyIO plugin mode (either "strict" or "auto")',
72
        type="string",
73
    )
74

75

76
def pytest_configure(config: pytest.Config) -> None:
9✔
77
    config.addinivalue_line(
9✔
78
        "markers",
79
        "anyio: mark the (coroutine function) test to be run asynchronously via anyio.",
80
    )
81
    if (
9✔
82
        config.getini("anyio_mode") == "auto"
83
        and config.pluginmanager.has_plugin("asyncio")
84
        and config.getini("asyncio_mode") == "auto"
85
    ):
86
        config.issue_config_time_warning(
9✔
87
            pytest.PytestConfigWarning(
88
                "AnyIO auto mode has been enabled together with pytest-asyncio auto "
89
                "mode. This may cause unexpected behavior."
90
            ),
91
            1,
92
        )
93

94

95
@pytest.hookimpl(hookwrapper=True)
9✔
96
def pytest_fixture_setup(fixturedef: Any, request: Any) -> Generator[Any]:
9✔
97
    def wrapper(anyio_backend: Any, request: SubRequest, **kwargs: Any) -> Any:
9✔
98
        # Rebind any fixture methods to the request instance
99
        if (
9✔
100
            request.instance
101
            and ismethod(func)
102
            and type(func.__self__) is type(request.instance)
103
        ):
104
            local_func = func.__func__.__get__(request.instance)
×
105
        else:
106
            local_func = func
9✔
107

108
        backend_name, backend_options = extract_backend_and_options(anyio_backend)
9✔
109
        if has_backend_arg:
9✔
110
            kwargs["anyio_backend"] = anyio_backend
9✔
111

112
        if has_request_arg:
9✔
113
            kwargs["request"] = request
9✔
114

115
        with get_runner(backend_name, backend_options) as runner:
9✔
116
            if isasyncgenfunction(local_func):
9✔
117
                yield from runner.run_asyncgen_fixture(local_func, kwargs)
9✔
118
            else:
119
                yield runner.run_fixture(local_func, kwargs)
9✔
120

121
    # Only apply this to coroutine functions and async generator functions in requests
122
    # that involve the anyio_backend fixture
123
    func = fixturedef.func
9✔
124
    if isasyncgenfunction(func) or iscoroutinefunction(func):
9✔
125
        if "anyio_backend" in request.fixturenames:
9✔
126
            fixturedef.func = wrapper
9✔
127
            original_argname = fixturedef.argnames
9✔
128

129
            if not (has_backend_arg := "anyio_backend" in fixturedef.argnames):
9✔
130
                fixturedef.argnames += ("anyio_backend",)
9✔
131

132
            if not (has_request_arg := "request" in fixturedef.argnames):
9✔
133
                fixturedef.argnames += ("request",)
9✔
134

135
            try:
9✔
136
                return (yield)
9✔
137
            finally:
138
                fixturedef.func = func
9✔
139
                fixturedef.argnames = original_argname
9✔
140

141
    return (yield)
9✔
142

143

144
@pytest.hookimpl(tryfirst=True)
9✔
145
def pytest_pycollect_makeitem(
9✔
146
    collector: pytest.Module | pytest.Class, name: str, obj: object
147
) -> None:
148
    if collector.istestfunction(obj, name):
9✔
149
        inner_func = obj.hypothesis.inner_test if hasattr(obj, "hypothesis") else obj
9✔
150
        if iscoroutinefunction(inner_func):
9✔
151
            anyio_auto_mode = collector.config.getini("anyio_mode") == "auto"
9✔
152
            marker = collector.get_closest_marker("anyio")
9✔
153
            own_markers = getattr(obj, "pytestmark", ())
9✔
154
            if (
9✔
155
                anyio_auto_mode
156
                or marker
157
                or any(marker.name == "anyio" for marker in own_markers)
158
            ):
159
                pytest.mark.usefixtures("anyio_backend")(obj)
9✔
160

161

162
@pytest.hookimpl(tryfirst=True)
9✔
163
def pytest_pyfunc_call(pyfuncitem: Any) -> bool | None:
9✔
164
    def run_with_hypothesis(**kwargs: Any) -> None:
9✔
165
        with get_runner(backend_name, backend_options) as runner:
9✔
166
            runner.run_test(original_func, kwargs)
9✔
167

168
    backend = pyfuncitem.funcargs.get("anyio_backend")
9✔
169
    if backend:
9✔
170
        backend_name, backend_options = extract_backend_and_options(backend)
9✔
171

172
        if hasattr(pyfuncitem.obj, "hypothesis"):
9✔
173
            # Wrap the inner test function unless it's already wrapped
174
            original_func = pyfuncitem.obj.hypothesis.inner_test
9✔
175
            if original_func.__qualname__ != run_with_hypothesis.__qualname__:
9✔
176
                if iscoroutinefunction(original_func):
9✔
177
                    pyfuncitem.obj.hypothesis.inner_test = run_with_hypothesis
9✔
178

179
            return None
9✔
180

181
        if iscoroutinefunction(pyfuncitem.obj):
9✔
182
            funcargs = pyfuncitem.funcargs
9✔
183
            testargs = {arg: funcargs[arg] for arg in pyfuncitem._fixtureinfo.argnames}
9✔
184
            with get_runner(backend_name, backend_options) as runner:
9✔
185
                try:
9✔
186
                    runner.run_test(pyfuncitem.obj, testargs)
9✔
187
                except ExceptionGroup as excgrp:
9✔
188
                    for exc in iterate_exceptions(excgrp):
9✔
189
                        if isinstance(exc, (Exit, KeyboardInterrupt, SystemExit)):
9✔
190
                            raise exc from excgrp
9✔
191

192
                    raise
2✔
193

194
            return True
9✔
195

196
    return None
9✔
197

198

199
@pytest.fixture(scope="module", params=get_all_backends())
9✔
200
def anyio_backend(request: Any) -> Any:
9✔
201
    return request.param
9✔
202

203

204
@pytest.fixture
9✔
205
def anyio_backend_name(anyio_backend: Any) -> str:
9✔
206
    if isinstance(anyio_backend, str):
9✔
207
        return anyio_backend
9✔
208
    else:
209
        return anyio_backend[0]
9✔
210

211

212
@pytest.fixture
9✔
213
def anyio_backend_options(anyio_backend: Any) -> dict[str, Any]:
9✔
214
    if isinstance(anyio_backend, str):
9✔
215
        return {}
9✔
216
    else:
217
        return anyio_backend[1]
9✔
218

219

220
class FreePortFactory:
9✔
221
    """
222
    Manages port generation based on specified socket kind, ensuring no duplicate
223
    ports are generated.
224

225
    This class provides functionality for generating available free ports on the
226
    system. It is initialized with a specific socket kind and can generate ports
227
    for given address families while avoiding reuse of previously generated ports.
228

229
    Users should not instantiate this class directly, but use the
230
    ``free_tcp_port_factory`` and ``free_udp_port_factory`` fixtures instead. For simple
231
    uses cases, ``free_tcp_port`` and ``free_udp_port`` can be used instead.
232
    """
233

234
    def __init__(self, kind: socket.SocketKind) -> None:
9✔
235
        self._kind = kind
9✔
236
        self._generated = set[int]()
9✔
237

238
    @property
9✔
239
    def kind(self) -> socket.SocketKind:
9✔
240
        """
241
        The type of socket connection (e.g., :data:`~socket.SOCK_STREAM` or
242
        :data:`~socket.SOCK_DGRAM`) used to bind for checking port availability
243

244
        """
245
        return self._kind
×
246

247
    def __call__(self, family: socket.AddressFamily | None = None) -> int:
9✔
248
        """
249
        Return an unbound port for the given address family.
250

251
        :param family: if omitted, both IPv4 and IPv6 addresses will be tried
252
        :return: a port number
253

254
        """
255
        if family is not None:
9✔
256
            families = [family]
×
257
        else:
258
            families = [socket.AF_INET]
9✔
259
            if socket.has_ipv6:
9✔
260
                families.append(socket.AF_INET6)
9✔
261

262
        while True:
7✔
263
            port = 0
9✔
264
            with ExitStack() as stack:
9✔
265
                for family in families:
9✔
266
                    sock = stack.enter_context(socket.socket(family, self._kind))
9✔
267
                    addr = "::1" if family == socket.AF_INET6 else "127.0.0.1"
9✔
268
                    try:
9✔
269
                        sock.bind((addr, port))
9✔
UNCOV
270
                    except OSError:
×
UNCOV
271
                        break
×
272

273
                    if not port:
9✔
274
                        port = sock.getsockname()[1]
9✔
275
                else:
276
                    if port not in self._generated:
9✔
277
                        self._generated.add(port)
9✔
278
                        return port
9✔
279

280

281
@pytest.fixture(scope="session")
9✔
282
def free_tcp_port_factory() -> FreePortFactory:
9✔
283
    return FreePortFactory(socket.SOCK_STREAM)
9✔
284

285

286
@pytest.fixture(scope="session")
9✔
287
def free_udp_port_factory() -> FreePortFactory:
9✔
288
    return FreePortFactory(socket.SOCK_DGRAM)
9✔
289

290

291
@pytest.fixture
9✔
292
def free_tcp_port(free_tcp_port_factory: Callable[[], int]) -> int:
9✔
293
    return free_tcp_port_factory()
9✔
294

295

296
@pytest.fixture
9✔
297
def free_udp_port(free_udp_port_factory: Callable[[], int]) -> int:
9✔
298
    return free_udp_port_factory()
9✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc