• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

agronholm / anyio / 16490461315

24 Jul 2025 07:21AM UTC coverage: 92.354% (-0.03%) from 92.385%
16490461315

push

github

web-flow
Added the BufferedByteReceiveStream.feed_data() method (#945)

4 of 4 new or added lines in 1 file covered. (100.0%)

2 existing lines in 1 file now uncovered.

5532 of 5990 relevant lines covered (92.35%)

9.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.34
/src/anyio/pytest_plugin.py
1
from __future__ import annotations
11✔
2

3
import socket
11✔
4
import sys
11✔
5
from collections.abc import Callable, Generator, Iterator
11✔
6
from contextlib import ExitStack, contextmanager
11✔
7
from inspect import isasyncgenfunction, iscoroutinefunction, ismethod
11✔
8
from typing import Any, cast
11✔
9

10
import pytest
11✔
11
import sniffio
11✔
12
from _pytest.fixtures import SubRequest
11✔
13
from _pytest.outcomes import Exit
11✔
14

15
from ._core._eventloop import get_all_backends, get_async_backend
11✔
16
from ._core._exceptions import iterate_exceptions
11✔
17
from .abc import TestRunner
11✔
18

19
if sys.version_info < (3, 11):
11✔
20
    from exceptiongroup import ExceptionGroup
4✔
21

22
# Module-level state shared by every get_runner() context in this process.
# The runner handed out by get_runner(); None while no runner is active.
_current_runner: TestRunner | None = None
# Exit stack that get_runner() enters the runner's context on (along with the
# sniffio contextvar reset, when one was registered); closed on the last release.
_runner_stack: ExitStack | None = None
# Number of get_runner() contexts currently open; the stack is closed when this
# drops back to zero.
_runner_leases = 0
11✔
25

26

27
def extract_backend_and_options(backend: object) -> tuple[str, dict[str, Any]]:
    """
    Normalize an ``anyio_backend`` value into a ``(name, options)`` pair.

    :param backend: either a backend name, or a 2-tuple of (backend name, options
        dict)
    :return: the backend name and its options; a bare name yields an empty options
        dict, while a valid tuple is returned as is
    :raises TypeError: if the value has any other shape

    """
    if isinstance(backend, str):
        return backend, {}

    if isinstance(backend, tuple) and len(backend) == 2:
        name, options = backend
        if isinstance(name, str) and isinstance(options, dict):
            return cast(tuple[str, dict[str, Any]], backend)

    raise TypeError("anyio_backend must be either a string or tuple of (string, dict)")
×
35

36

37
@contextmanager
def get_runner(
    backend_name: str, backend_options: dict[str, Any]
) -> Iterator[TestRunner]:
    """
    Lease the shared test runner, creating it if no runner is currently active.

    The first (outermost) caller creates the runner for ``backend_name`` and, if
    sniffio does not report a current async library, sets it to ``backend_name`` for
    the runner's lifetime. Nested callers receive the runner that is already
    active; their arguments are not consulted. The runner is closed when the last
    lease is released.

    :param backend_name: name of the backend to create the runner for
    :param backend_options: options passed to the backend's ``create_test_runner()``
        (a falsy value is replaced with an empty dict)
    :return: a context manager yielding the active test runner

    """
    global _current_runner, _runner_leases, _runner_stack
    if _current_runner is None:
        asynclib = get_async_backend(backend_name)
        # Build on a local stack first: if creating or entering the runner fails,
        # leaving the ``with`` block unwinds the sniffio override instead of leaking
        # it along with a half-initialized global stack
        with ExitStack() as stack:
            if sniffio.current_async_library_cvar.get(None) is None:
                # Since we're in control of the event loop, we can cache the name of
                # the async library
                token = sniffio.current_async_library_cvar.set(backend_name)
                stack.callback(sniffio.current_async_library_cvar.reset, token)

            backend_options = backend_options or {}
            runner = stack.enter_context(asynclib.create_test_runner(backend_options))

            # Success: hand the registered callbacks over to the module-level stack
            _runner_stack = stack.pop_all()
            _current_runner = runner

    _runner_leases += 1
    try:
        yield _current_runner
    finally:
        _runner_leases -= 1
        if not _runner_leases:
            assert _runner_stack is not None
            try:
                _runner_stack.close()
            finally:
                # Reset the state even if closing the runner raised, so that the
                # next caller creates a fresh runner rather than reusing a dead one
                _runner_stack = _current_runner = None
11✔
65

66

67
def pytest_configure(config: Any) -> None:
    """
    Add the ``anyio`` marker description to the ``markers`` ini value.

    :param config: the object whose ``addinivalue_line()`` method is called

    """
    marker_description = (
        "anyio: mark the (coroutine function) test to be run asynchronously via anyio."
    )
    config.addinivalue_line("markers", marker_description)
72

73

74
@pytest.hookimpl(hookwrapper=True)
def pytest_fixture_setup(fixturedef: Any, request: Any) -> Generator[Any]:
    """
    Run coroutine and async generator fixtures through the shared test runner.

    For the duration of the wrapped hook call, ``fixturedef.func`` is replaced with
    a synchronous generator function and ``fixturedef.argnames`` is extended so the
    replacement receives ``anyio_backend`` and ``request``. Both attributes are
    restored afterwards. Fixtures that are not async, or whose request does not
    involve the ``anyio_backend`` fixture, are left untouched.

    :param fixturedef: the fixture definition being set up
    :param request: the request the fixture is being set up for

    """

    def wrapper(
        *args: Any, anyio_backend: Any, request: SubRequest, **kwargs: Any
    ) -> Any:
        # NOTE: ``func``, ``has_backend_arg`` and ``has_request_arg`` are closure
        # variables assigned further down in the enclosing function, before this
        # wrapper is installed on the fixture definition

        # Rebind any fixture methods to the request instance
        if (
            request.instance
            and ismethod(func)
            and type(func.__self__) is type(request.instance)
        ):
            local_func = func.__func__.__get__(request.instance)
        else:
            local_func = func

        backend_name, backend_options = extract_backend_and_options(anyio_backend)
        # ``anyio_backend`` and ``request`` are always received here (they are
        # keyword-only parameters), but are only forwarded to the original fixture
        # function if it declared them itself
        if has_backend_arg:
            kwargs["anyio_backend"] = anyio_backend

        if has_request_arg:
            kwargs["request"] = request

        with get_runner(backend_name, backend_options) as runner:
            if isasyncgenfunction(local_func):
                yield from runner.run_asyncgen_fixture(local_func, kwargs)
            else:
                yield runner.run_fixture(local_func, kwargs)

    # Only apply this to coroutine functions and async generator functions in requests
    # that involve the anyio_backend fixture
    func = fixturedef.func
    if isasyncgenfunction(func) or iscoroutinefunction(func):
        if "anyio_backend" in request.fixturenames:
            fixturedef.func = wrapper
            # Saved so the argument names can be restored in the ``finally`` below
            original_argname = fixturedef.argnames

            # Record whether the original fixture asked for each argument, then make
            # sure the wrapper gets it either way
            if not (has_backend_arg := "anyio_backend" in fixturedef.argnames):
                fixturedef.argnames += ("anyio_backend",)

            if not (has_request_arg := "request" in fixturedef.argnames):
                fixturedef.argnames += ("request",)

            try:
                return (yield)
            finally:
                # Undo the temporary patching of the fixture definition
                fixturedef.func = func
                fixturedef.argnames = original_argname

    return (yield)
11✔
123

124

125
@pytest.hookimpl(tryfirst=True)
def pytest_pycollect_makeitem(collector: Any, name: Any, obj: Any) -> None:
    """
    Apply ``usefixtures("anyio_backend")`` to coroutine tests marked with ``anyio``.

    The object is only touched if the collector considers it a test function, it
    (or its Hypothesis inner test) is a coroutine function, and an ``anyio`` marker
    is found either through the collector or in the object's own ``pytestmark``.

    :param collector: the collector examining the object
    :param name: the name of the object within the collector
    :param obj: the candidate test object

    """
    if not collector.istestfunction(obj, name):
        return

    # For Hypothesis tests, the actual test function is stored on the wrapper
    if hasattr(obj, "hypothesis"):
        target = obj.hypothesis.inner_test
    else:
        target = obj

    if not iscoroutinefunction(target):
        return

    inherited_marker = collector.get_closest_marker("anyio")
    own_markers = getattr(obj, "pytestmark", ())
    if inherited_marker or any(mark.name == "anyio" for mark in own_markers):
        pytest.mark.usefixtures("anyio_backend")(obj)
11✔
134

135

136
@pytest.hookimpl(tryfirst=True)
def pytest_pyfunc_call(pyfuncitem: Any) -> bool | None:
    """
    Run coroutine test functions through the shared test runner.

    Nothing is done unless the test item has a truthy ``anyio_backend`` fixture
    value. For Hypothesis tests, a coroutine inner test is replaced with a
    synchronous function that runs it via the runner, and ``None`` is returned.
    Plain coroutine tests are run directly via the runner.

    :param pyfuncitem: the test item about to be called
    :return: ``True`` if the test was run here, ``None`` otherwise

    """

    def run_with_hypothesis(**kwargs: Any) -> None:
        with get_runner(backend_name, backend_options) as runner:
            runner.run_test(original_func, kwargs)

    backend = pyfuncitem.funcargs.get("anyio_backend")
    if not backend:
        return None

    backend_name, backend_options = extract_backend_and_options(backend)

    if hasattr(pyfuncitem.obj, "hypothesis"):
        # Wrap the inner test function unless it's already wrapped
        original_func = pyfuncitem.obj.hypothesis.inner_test
        already_wrapped = (
            original_func.__qualname__ == run_with_hypothesis.__qualname__
        )
        if not already_wrapped and iscoroutinefunction(original_func):
            pyfuncitem.obj.hypothesis.inner_test = run_with_hypothesis

        return None

    if not iscoroutinefunction(pyfuncitem.obj):
        return None

    # Collect the fixture values matching the names in the item's fixture info
    fixture_values = pyfuncitem.funcargs
    call_kwargs = {}
    for argname in pyfuncitem._fixtureinfo.argnames:
        call_kwargs[argname] = fixture_values[argname]

    with get_runner(backend_name, backend_options) as runner:
        try:
            runner.run_test(pyfuncitem.obj, call_kwargs)
        except ExceptionGroup as excgrp:
            # Raise the first exit/interrupt exception found in the group on its
            # own, with the group as its cause
            for exc in iterate_exceptions(excgrp):
                if isinstance(exc, (Exit, KeyboardInterrupt, SystemExit)):
                    raise exc from excgrp

            raise

    return True
11✔
171

172

173
@pytest.fixture(scope="module", params=get_all_backends())
def anyio_backend(request: Any) -> Any:
    """
    Module scoped fixture parametrized with the return value of
    ``get_all_backends()``.

    :return: the parameter of the current request

    """
    backend = request.param
    return backend
11✔
176

177

178
@pytest.fixture
def anyio_backend_name(anyio_backend: Any) -> str:
    """
    Return the name part of the ``anyio_backend`` fixture value.

    :param anyio_backend: either a string, or a sequence whose first item is
        returned
    :return: the string itself, or the first item of the non-string value

    """
    return anyio_backend if isinstance(anyio_backend, str) else anyio_backend[0]
11✔
184

185

186
@pytest.fixture
def anyio_backend_options(anyio_backend: Any) -> dict[str, Any]:
    """
    Return the options part of the ``anyio_backend`` fixture value.

    :param anyio_backend: either a string, or a sequence whose second item is
        returned
    :return: an empty dict for a string value, otherwise the second item of the
        value

    """
    return {} if isinstance(anyio_backend, str) else anyio_backend[1]
11✔
192

193

194
class FreePortFactory:
    """
    Manages port generation based on specified socket kind, ensuring no duplicate
    ports are generated.

    This class provides functionality for generating available free ports on the
    system. It is initialized with a specific socket kind and can generate ports
    for given address families while avoiding reuse of previously generated ports.

    Users should not instantiate this class directly, but use the
    ``free_tcp_port_factory`` and ``free_udp_port_factory`` fixtures instead. For simple
    use cases, ``free_tcp_port`` and ``free_udp_port`` can be used instead.
    """

    def __init__(self, kind: socket.SocketKind) -> None:
        self._kind = kind
        # Ports already handed out by this factory; they are never returned again
        self._generated = set[int]()

    @property
    def kind(self) -> socket.SocketKind:
        """
        The type of socket connection (e.g., :data:`~socket.SOCK_STREAM` or
        :data:`~socket.SOCK_DGRAM`) used to bind for checking port availability

        """
        return self._kind

    @staticmethod
    def _loopback_address(family: socket.AddressFamily) -> str:
        """Return the loopback address to bind to for the given address family."""
        return "::1" if family == socket.AF_INET6 else "127.0.0.1"

    def _bindable_families(
        self, candidates: list[socket.AddressFamily]
    ) -> list[socket.AddressFamily]:
        """
        Return the candidates on which the loopback address can actually be bound.

        Each candidate is probed by binding a throwaway socket to port 0.

        :raises OSError: if none of the candidates can be bound (the error from the
            last failed probe is re-raised)

        """
        bindable: list[socket.AddressFamily] = []
        last_error: OSError | None = None
        for candidate in candidates:
            try:
                with socket.socket(candidate, self._kind) as probe:
                    probe.bind((self._loopback_address(candidate), 0))
            except OSError as exc:
                last_error = exc
            else:
                bindable.append(candidate)

        if not bindable:
            if last_error is None:
                raise OSError("no address families to try")

            raise last_error

        return bindable

    def __call__(self, family: socket.AddressFamily | None = None) -> int:
        """
        Return an unbound port for the given address family.

        :param family: if omitted, both IPv4 and IPv6 addresses will be tried
            (families on which the loopback address cannot be bound are skipped)
        :return: a port number
        :raises OSError: if the loopback address cannot be bound on any of the
            address families to be tried

        """
        if family is not None:
            candidates = [family]
        else:
            candidates = [socket.AF_INET]
            if socket.has_ipv6:
                candidates.append(socket.AF_INET6)

        # Weed out unusable families up front. Otherwise a family that can never be
        # bound (e.g. IPv6 support compiled in but no ::1 address configured) would
        # make every attempt below fail, and the loop would spin forever.
        families = self._bindable_families(candidates)

        while True:
            port = 0
            with ExitStack() as stack:
                for current_family in families:
                    sock = stack.enter_context(
                        socket.socket(current_family, self._kind)
                    )
                    addr = self._loopback_address(current_family)
                    try:
                        sock.bind((addr, port))
                    except OSError:
                        if not port:
                            # A port 0 bind lets the system choose the port, so this
                            # failure is not a port collision; retrying won't help
                            raise

                        # The port picked on an earlier family is taken on this
                        # one; start over with a new port
                        break

                    if not port:
                        port = sock.getsockname()[1]
                else:
                    # The port was bound on every family; only hand it out once
                    if port not in self._generated:
                        self._generated.add(port)
                        return port
11✔
253

254

255
@pytest.fixture(scope="session")
def free_tcp_port_factory() -> FreePortFactory:
    """
    Session scoped fixture providing a port factory that uses
    :data:`~socket.SOCK_STREAM` sockets.

    """
    factory = FreePortFactory(socket.SOCK_STREAM)
    return factory
11✔
258

259

260
@pytest.fixture(scope="session")
def free_udp_port_factory() -> FreePortFactory:
    """
    Session scoped fixture providing a port factory that uses
    :data:`~socket.SOCK_DGRAM` sockets.

    """
    factory = FreePortFactory(socket.SOCK_DGRAM)
    return factory
11✔
263

264

265
@pytest.fixture
def free_tcp_port(free_tcp_port_factory: Callable[[], int]) -> int:
    """
    Return the result of calling the ``free_tcp_port_factory`` fixture value with
    no arguments.

    """
    port = free_tcp_port_factory()
    return port
11✔
268

269

270
@pytest.fixture
def free_udp_port(free_udp_port_factory: Callable[[], int]) -> int:
    """
    Return the result of calling the ``free_udp_port_factory`` fixture value with
    no arguments.

    """
    port = free_udp_port_factory()
    return port
11✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc