• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

agronholm / anyio / 18649753262

20 Oct 2025 10:52AM UTC coverage: 92.428% (-0.03%) from 92.462%
18649753262

push

github

web-flow
Add changelog entry for BufferedByteStream (#1004)

5542 of 5996 relevant lines covered (92.43%)

9.49 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.45
/src/anyio/pytest_plugin.py
1
from __future__ import annotations
11✔
2

3
import socket
11✔
4
import sys
11✔
5
from collections.abc import Callable, Generator, Iterator
11✔
6
from contextlib import ExitStack, contextmanager
11✔
7
from inspect import isasyncgenfunction, iscoroutinefunction, ismethod
11✔
8
from typing import Any, cast
11✔
9

10
import pytest
11✔
11
import sniffio
11✔
12
from _pytest.fixtures import SubRequest
11✔
13
from _pytest.outcomes import Exit
11✔
14

15
from ._core._eventloop import get_all_backends, get_async_backend
11✔
16
from ._core._exceptions import iterate_exceptions
11✔
17
from .abc import TestRunner
11✔
18

19
if sys.version_info < (3, 11):
11✔
20
    from exceptiongroup import ExceptionGroup
4✔
21

22
_current_runner: TestRunner | None = None
11✔
23
_runner_stack: ExitStack | None = None
11✔
24
_runner_leases = 0
11✔
25

26

27
def extract_backend_and_options(backend: object) -> tuple[str, dict[str, Any]]:
11✔
28
    if isinstance(backend, str):
11✔
29
        return backend, {}
11✔
30
    elif isinstance(backend, tuple) and len(backend) == 2:
11✔
31
        if isinstance(backend[0], str) and isinstance(backend[1], dict):
11✔
32
            return cast(tuple[str, dict[str, Any]], backend)
11✔
33

34
    raise TypeError("anyio_backend must be either a string or tuple of (string, dict)")
×
35

36

37
@contextmanager
11✔
38
def get_runner(
11✔
39
    backend_name: str, backend_options: dict[str, Any]
40
) -> Iterator[TestRunner]:
41
    global _current_runner, _runner_leases, _runner_stack
42
    if _current_runner is None:
11✔
43
        asynclib = get_async_backend(backend_name)
11✔
44
        _runner_stack = ExitStack()
11✔
45
        if sniffio.current_async_library_cvar.get(None) is None:
11✔
46
            # Since we're in control of the event loop, we can cache the name of the
47
            # async library
48
            token = sniffio.current_async_library_cvar.set(backend_name)
11✔
49
            _runner_stack.callback(sniffio.current_async_library_cvar.reset, token)
11✔
50

51
        backend_options = backend_options or {}
11✔
52
        _current_runner = _runner_stack.enter_context(
11✔
53
            asynclib.create_test_runner(backend_options)
54
        )
55

56
    _runner_leases += 1
11✔
57
    try:
11✔
58
        yield _current_runner
11✔
59
    finally:
60
        _runner_leases -= 1
11✔
61
        if not _runner_leases:
11✔
62
            assert _runner_stack is not None
11✔
63
            _runner_stack.close()
11✔
64
            _runner_stack = _current_runner = None
11✔
65

66

67
def pytest_addoption(parser: pytest.Parser) -> None:
11✔
68
    parser.addini(
11✔
69
        "anyio_mode",
70
        default="strict",
71
        help='AnyIO plugin mode (either "strict" or "auto")',
72
        type="string",
73
    )
74

75

76
def pytest_configure(config: pytest.Config) -> None:
11✔
77
    config.addinivalue_line(
11✔
78
        "markers",
79
        "anyio: mark the (coroutine function) test to be run asynchronously via anyio.",
80
    )
81
    if (
11✔
82
        config.getini("anyio_mode") == "auto"
83
        and config.pluginmanager.has_plugin("asyncio")
84
        and config.getini("asyncio_mode") == "auto"
85
    ):
86
        config.issue_config_time_warning(
11✔
87
            pytest.PytestConfigWarning(
88
                "AnyIO auto mode has been enabled together with pytest-asyncio auto "
89
                "mode. This may cause unexpected behavior."
90
            ),
91
            1,
92
        )
93

94

95
@pytest.hookimpl(hookwrapper=True)
11✔
96
def pytest_fixture_setup(fixturedef: Any, request: Any) -> Generator[Any]:
11✔
97
    def wrapper(anyio_backend: Any, request: SubRequest, **kwargs: Any) -> Any:
11✔
98
        # Rebind any fixture methods to the request instance
99
        if (
11✔
100
            request.instance
101
            and ismethod(func)
102
            and type(func.__self__) is type(request.instance)
103
        ):
104
            local_func = func.__func__.__get__(request.instance)
×
105
        else:
106
            local_func = func
11✔
107

108
        backend_name, backend_options = extract_backend_and_options(anyio_backend)
11✔
109
        if has_backend_arg:
11✔
110
            kwargs["anyio_backend"] = anyio_backend
11✔
111

112
        if has_request_arg:
11✔
113
            kwargs["request"] = request
11✔
114

115
        with get_runner(backend_name, backend_options) as runner:
11✔
116
            if isasyncgenfunction(local_func):
11✔
117
                yield from runner.run_asyncgen_fixture(local_func, kwargs)
11✔
118
            else:
119
                yield runner.run_fixture(local_func, kwargs)
11✔
120

121
    # Only apply this to coroutine functions and async generator functions in requests
122
    # that involve the anyio_backend fixture
123
    func = fixturedef.func
11✔
124
    if isasyncgenfunction(func) or iscoroutinefunction(func):
11✔
125
        if "anyio_backend" in request.fixturenames:
11✔
126
            fixturedef.func = wrapper
11✔
127
            original_argname = fixturedef.argnames
11✔
128

129
            if not (has_backend_arg := "anyio_backend" in fixturedef.argnames):
11✔
130
                fixturedef.argnames += ("anyio_backend",)
11✔
131

132
            if not (has_request_arg := "request" in fixturedef.argnames):
11✔
133
                fixturedef.argnames += ("request",)
11✔
134

135
            try:
11✔
136
                return (yield)
11✔
137
            finally:
138
                fixturedef.func = func
11✔
139
                fixturedef.argnames = original_argname
11✔
140

141
    return (yield)
11✔
142

143

144
@pytest.hookimpl(tryfirst=True)
11✔
145
def pytest_pycollect_makeitem(
11✔
146
    collector: pytest.Module | pytest.Class, name: str, obj: object
147
) -> None:
148
    if collector.istestfunction(obj, name):
11✔
149
        inner_func = obj.hypothesis.inner_test if hasattr(obj, "hypothesis") else obj
11✔
150
        if iscoroutinefunction(inner_func):
11✔
151
            anyio_auto_mode = collector.config.getini("anyio_mode") == "auto"
11✔
152
            marker = collector.get_closest_marker("anyio")
11✔
153
            own_markers = getattr(obj, "pytestmark", ())
11✔
154
            if (
11✔
155
                anyio_auto_mode
156
                or marker
157
                or any(marker.name == "anyio" for marker in own_markers)
158
            ):
159
                pytest.mark.usefixtures("anyio_backend")(obj)
11✔
160

161

162
@pytest.hookimpl(tryfirst=True)
11✔
163
def pytest_pyfunc_call(pyfuncitem: Any) -> bool | None:
11✔
164
    def run_with_hypothesis(**kwargs: Any) -> None:
11✔
165
        with get_runner(backend_name, backend_options) as runner:
11✔
166
            runner.run_test(original_func, kwargs)
11✔
167

168
    backend = pyfuncitem.funcargs.get("anyio_backend")
11✔
169
    if backend:
11✔
170
        backend_name, backend_options = extract_backend_and_options(backend)
11✔
171

172
        if hasattr(pyfuncitem.obj, "hypothesis"):
11✔
173
            # Wrap the inner test function unless it's already wrapped
174
            original_func = pyfuncitem.obj.hypothesis.inner_test
11✔
175
            if original_func.__qualname__ != run_with_hypothesis.__qualname__:
11✔
176
                if iscoroutinefunction(original_func):
11✔
177
                    pyfuncitem.obj.hypothesis.inner_test = run_with_hypothesis
11✔
178

179
            return None
11✔
180

181
        if iscoroutinefunction(pyfuncitem.obj):
11✔
182
            funcargs = pyfuncitem.funcargs
11✔
183
            testargs = {arg: funcargs[arg] for arg in pyfuncitem._fixtureinfo.argnames}
11✔
184
            with get_runner(backend_name, backend_options) as runner:
11✔
185
                try:
11✔
186
                    runner.run_test(pyfuncitem.obj, testargs)
11✔
187
                except ExceptionGroup as excgrp:
11✔
188
                    for exc in iterate_exceptions(excgrp):
11✔
189
                        if isinstance(exc, (Exit, KeyboardInterrupt, SystemExit)):
11✔
190
                            raise exc from excgrp
11✔
191

192
                    raise
3✔
193

194
            return True
11✔
195

196
    return None
11✔
197

198

199
@pytest.fixture(scope="module", params=get_all_backends())
11✔
200
def anyio_backend(request: Any) -> Any:
11✔
201
    return request.param
11✔
202

203

204
@pytest.fixture
11✔
205
def anyio_backend_name(anyio_backend: Any) -> str:
11✔
206
    if isinstance(anyio_backend, str):
11✔
207
        return anyio_backend
11✔
208
    else:
209
        return anyio_backend[0]
11✔
210

211

212
@pytest.fixture
11✔
213
def anyio_backend_options(anyio_backend: Any) -> dict[str, Any]:
11✔
214
    if isinstance(anyio_backend, str):
11✔
215
        return {}
11✔
216
    else:
217
        return anyio_backend[1]
11✔
218

219

220
class FreePortFactory:
11✔
221
    """
222
    Manages port generation based on specified socket kind, ensuring no duplicate
223
    ports are generated.
224

225
    This class provides functionality for generating available free ports on the
226
    system. It is initialized with a specific socket kind and can generate ports
227
    for given address families while avoiding reuse of previously generated ports.
228

229
    Users should not instantiate this class directly, but use the
230
    ``free_tcp_port_factory`` and ``free_udp_port_factory`` fixtures instead. For simple
231
    uses cases, ``free_tcp_port`` and ``free_udp_port`` can be used instead.
232
    """
233

234
    def __init__(self, kind: socket.SocketKind) -> None:
11✔
235
        self._kind = kind
11✔
236
        self._generated = set[int]()
11✔
237

238
    @property
11✔
239
    def kind(self) -> socket.SocketKind:
11✔
240
        """
241
        The type of socket connection (e.g., :data:`~socket.SOCK_STREAM` or
242
        :data:`~socket.SOCK_DGRAM`) used to bind for checking port availability
243

244
        """
245
        return self._kind
×
246

247
    def __call__(self, family: socket.AddressFamily | None = None) -> int:
11✔
248
        """
249
        Return an unbound port for the given address family.
250

251
        :param family: if omitted, both IPv4 and IPv6 addresses will be tried
252
        :return: a port number
253

254
        """
255
        if family is not None:
11✔
256
            families = [family]
×
257
        else:
258
            families = [socket.AF_INET]
11✔
259
            if socket.has_ipv6:
11✔
260
                families.append(socket.AF_INET6)
11✔
261

262
        while True:
8✔
263
            port = 0
11✔
264
            with ExitStack() as stack:
11✔
265
                for family in families:
11✔
266
                    sock = stack.enter_context(socket.socket(family, self._kind))
11✔
267
                    addr = "::1" if family == socket.AF_INET6 else "127.0.0.1"
11✔
268
                    try:
11✔
269
                        sock.bind((addr, port))
11✔
270
                    except OSError:
×
271
                        break
×
272

273
                    if not port:
11✔
274
                        port = sock.getsockname()[1]
11✔
275
                else:
276
                    if port not in self._generated:
11✔
277
                        self._generated.add(port)
11✔
278
                        return port
11✔
279

280

281
@pytest.fixture(scope="session")
11✔
282
def free_tcp_port_factory() -> FreePortFactory:
11✔
283
    return FreePortFactory(socket.SOCK_STREAM)
11✔
284

285

286
@pytest.fixture(scope="session")
11✔
287
def free_udp_port_factory() -> FreePortFactory:
11✔
288
    return FreePortFactory(socket.SOCK_DGRAM)
11✔
289

290

291
@pytest.fixture
11✔
292
def free_tcp_port(free_tcp_port_factory: Callable[[], int]) -> int:
11✔
293
    return free_tcp_port_factory()
11✔
294

295

296
@pytest.fixture
11✔
297
def free_udp_port(free_udp_port_factory: Callable[[], int]) -> int:
11✔
298
    return free_udp_port_factory()
11✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc