• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

agronholm / anyio / 22005847248

13 Feb 2026 11:04PM UTC coverage: 92.852% (+0.2%) from 92.678%
22005847248

Pull #1071

github

web-flow
Merge 3fe37125f into c33395a5d
Pull Request #1071: Fix some `TaskGroup` exit issues on asyncio

23 of 24 new or added lines in 1 file covered. (95.83%)

3 existing lines in 2 files now uncovered.

6053 of 6519 relevant lines covered (92.85%)

9.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.63
/src/anyio/pytest_plugin.py
1
from __future__ import annotations
12✔
2

3
import socket
12✔
4
import sys
12✔
5
from collections.abc import Callable, Generator, Iterator
12✔
6
from contextlib import ExitStack, contextmanager
12✔
7
from inspect import isasyncgenfunction, iscoroutinefunction, ismethod
12✔
8
from typing import Any, cast
12✔
9

10
import pytest
12✔
11
from _pytest.fixtures import SubRequest
12✔
12
from _pytest.outcomes import Exit
12✔
13

14
from . import get_available_backends
12✔
15
from ._core._eventloop import (
12✔
16
    current_async_library,
17
    get_async_backend,
18
    reset_current_async_library,
19
    set_current_async_library,
20
)
21
from ._core._exceptions import iterate_exceptions
12✔
22
from .abc import TestRunner
12✔
23

24
if sys.version_info < (3, 11):
12✔
25
    from exceptiongroup import ExceptionGroup
4✔
26

27
_current_runner: TestRunner | None = None
12✔
28
_runner_stack: ExitStack | None = None
12✔
29
_runner_leases = 0
12✔
30

31

32
def extract_backend_and_options(backend: object) -> tuple[str, dict[str, Any]]:
12✔
33
    if isinstance(backend, str):
12✔
34
        return backend, {}
12✔
35
    elif isinstance(backend, tuple) and len(backend) == 2:
12✔
36
        if isinstance(backend[0], str) and isinstance(backend[1], dict):
12✔
37
            return cast(tuple[str, dict[str, Any]], backend)
12✔
38

39
    raise TypeError("anyio_backend must be either a string or tuple of (string, dict)")
×
40

41

42
@contextmanager
12✔
43
def get_runner(
12✔
44
    backend_name: str, backend_options: dict[str, Any]
45
) -> Iterator[TestRunner]:
46
    global _current_runner, _runner_leases, _runner_stack
47
    if _current_runner is None:
12✔
48
        asynclib = get_async_backend(backend_name)
12✔
49
        _runner_stack = ExitStack()
12✔
50
        if current_async_library() is None:
12✔
51
            # Since we're in control of the event loop, we can cache the name of the
52
            # async library
53
            token = set_current_async_library(backend_name)
12✔
54
            _runner_stack.callback(reset_current_async_library, token)
12✔
55

56
        backend_options = backend_options or {}
12✔
57
        _current_runner = _runner_stack.enter_context(
12✔
58
            asynclib.create_test_runner(backend_options)
59
        )
60

61
    _runner_leases += 1
12✔
62
    try:
12✔
63
        yield _current_runner
12✔
64
    finally:
65
        _runner_leases -= 1
12✔
66
        if not _runner_leases:
12✔
67
            assert _runner_stack is not None
12✔
68
            _runner_stack.close()
12✔
69
            _runner_stack = _current_runner = None
12✔
70

71

72
def pytest_addoption(parser: pytest.Parser) -> None:
12✔
73
    parser.addini(
12✔
74
        "anyio_mode",
75
        default="strict",
76
        help='AnyIO plugin mode (either "strict" or "auto")',
77
    )
78

79

80
def pytest_configure(config: pytest.Config) -> None:
12✔
81
    config.addinivalue_line(
12✔
82
        "markers",
83
        "anyio: mark the (coroutine function) test to be run asynchronously via anyio.",
84
    )
85
    if (
12✔
86
        config.getini("anyio_mode") == "auto"
87
        and config.pluginmanager.has_plugin("asyncio")
88
        and config.getini("asyncio_mode") == "auto"
89
    ):
90
        config.issue_config_time_warning(
12✔
91
            pytest.PytestConfigWarning(
92
                "AnyIO auto mode has been enabled together with pytest-asyncio auto "
93
                "mode. This may cause unexpected behavior."
94
            ),
95
            1,
96
        )
97

98

99
@pytest.hookimpl(hookwrapper=True)
12✔
100
def pytest_fixture_setup(fixturedef: Any, request: Any) -> Generator[Any]:
12✔
101
    def wrapper(anyio_backend: Any, request: SubRequest, **kwargs: Any) -> Any:
12✔
102
        # Rebind any fixture methods to the request instance
103
        if (
12✔
104
            request.instance
105
            and ismethod(func)
106
            and type(func.__self__) is type(request.instance)
107
        ):
108
            local_func = func.__func__.__get__(request.instance)
12✔
109
        else:
110
            local_func = func
12✔
111

112
        backend_name, backend_options = extract_backend_and_options(anyio_backend)
12✔
113
        if has_backend_arg:
12✔
114
            kwargs["anyio_backend"] = anyio_backend
12✔
115

116
        if has_request_arg:
12✔
117
            kwargs["request"] = request
12✔
118

119
        with get_runner(backend_name, backend_options) as runner:
12✔
120
            if isasyncgenfunction(local_func):
12✔
121
                yield from runner.run_asyncgen_fixture(local_func, kwargs)
12✔
122
            else:
123
                yield runner.run_fixture(local_func, kwargs)
12✔
124

125
    # Only apply this to coroutine functions and async generator functions in requests
126
    # that involve the anyio_backend fixture
127
    func = fixturedef.func
12✔
128
    if isasyncgenfunction(func) or iscoroutinefunction(func):
12✔
129
        if "anyio_backend" in request.fixturenames:
12✔
130
            fixturedef.func = wrapper
12✔
131
            original_argname = fixturedef.argnames
12✔
132

133
            if not (has_backend_arg := "anyio_backend" in fixturedef.argnames):
12✔
134
                fixturedef.argnames += ("anyio_backend",)
12✔
135

136
            if not (has_request_arg := "request" in fixturedef.argnames):
12✔
137
                fixturedef.argnames += ("request",)
12✔
138

139
            try:
12✔
140
                return (yield)
12✔
141
            finally:
142
                fixturedef.func = func
12✔
143
                fixturedef.argnames = original_argname
12✔
144

145
    return (yield)
12✔
146

147

148
@pytest.hookimpl(tryfirst=True)
12✔
149
def pytest_pycollect_makeitem(
12✔
150
    collector: pytest.Module | pytest.Class, name: str, obj: object
151
) -> None:
152
    if collector.istestfunction(obj, name):
12✔
153
        inner_func = obj.hypothesis.inner_test if hasattr(obj, "hypothesis") else obj
12✔
154
        if iscoroutinefunction(inner_func):
12✔
155
            anyio_auto_mode = collector.config.getini("anyio_mode") == "auto"
12✔
156
            marker = collector.get_closest_marker("anyio")
12✔
157
            own_markers = getattr(obj, "pytestmark", ())
12✔
158
            if (
12✔
159
                anyio_auto_mode
160
                or marker
161
                or any(marker.name == "anyio" for marker in own_markers)
162
            ):
163
                pytest.mark.usefixtures("anyio_backend")(obj)
12✔
164

165

166
@pytest.hookimpl(tryfirst=True)
12✔
167
def pytest_pyfunc_call(pyfuncitem: Any) -> bool | None:
12✔
168
    def run_with_hypothesis(**kwargs: Any) -> None:
12✔
169
        with get_runner(backend_name, backend_options) as runner:
12✔
170
            runner.run_test(original_func, kwargs)
12✔
171

172
    backend = pyfuncitem.funcargs.get("anyio_backend")
12✔
173
    if backend:
12✔
174
        backend_name, backend_options = extract_backend_and_options(backend)
12✔
175

176
        if hasattr(pyfuncitem.obj, "hypothesis"):
12✔
177
            # Wrap the inner test function unless it's already wrapped
178
            original_func = pyfuncitem.obj.hypothesis.inner_test
12✔
179
            if original_func.__qualname__ != run_with_hypothesis.__qualname__:
12✔
180
                if iscoroutinefunction(original_func):
12✔
181
                    pyfuncitem.obj.hypothesis.inner_test = run_with_hypothesis
12✔
182

183
            return None
12✔
184

185
        if iscoroutinefunction(pyfuncitem.obj):
12✔
186
            funcargs = pyfuncitem.funcargs
12✔
187
            testargs = {arg: funcargs[arg] for arg in pyfuncitem._fixtureinfo.argnames}
12✔
188
            with get_runner(backend_name, backend_options) as runner:
12✔
189
                try:
12✔
190
                    runner.run_test(pyfuncitem.obj, testargs)
12✔
191
                except ExceptionGroup as excgrp:
12✔
192
                    for exc in iterate_exceptions(excgrp):
12✔
193
                        if isinstance(exc, (Exit, KeyboardInterrupt, SystemExit)):
12✔
194
                            raise exc from excgrp
12✔
195

196
                    raise
×
197

198
            return True
12✔
199

200
    return None
12✔
201

202

203
@pytest.fixture(scope="module", params=get_available_backends())
12✔
204
def anyio_backend(request: Any) -> Any:
12✔
205
    return request.param
12✔
206

207

208
@pytest.fixture
12✔
209
def anyio_backend_name(anyio_backend: Any) -> str:
12✔
210
    if isinstance(anyio_backend, str):
12✔
211
        return anyio_backend
12✔
212
    else:
213
        return anyio_backend[0]
12✔
214

215

216
@pytest.fixture
12✔
217
def anyio_backend_options(anyio_backend: Any) -> dict[str, Any]:
12✔
218
    if isinstance(anyio_backend, str):
12✔
219
        return {}
11✔
220
    else:
221
        return anyio_backend[1]
12✔
222

223

224
class FreePortFactory:
12✔
225
    """
226
    Manages port generation based on specified socket kind, ensuring no duplicate
227
    ports are generated.
228

229
    This class provides functionality for generating available free ports on the
230
    system. It is initialized with a specific socket kind and can generate ports
231
    for given address families while avoiding reuse of previously generated ports.
232

233
    Users should not instantiate this class directly, but use the
234
    ``free_tcp_port_factory`` and ``free_udp_port_factory`` fixtures instead. For simple
235
    uses cases, ``free_tcp_port`` and ``free_udp_port`` can be used instead.
236
    """
237

238
    def __init__(self, kind: socket.SocketKind) -> None:
12✔
239
        self._kind = kind
12✔
240
        self._generated = set[int]()
12✔
241

242
    @property
12✔
243
    def kind(self) -> socket.SocketKind:
12✔
244
        """
245
        The type of socket connection (e.g., :data:`~socket.SOCK_STREAM` or
246
        :data:`~socket.SOCK_DGRAM`) used to bind for checking port availability
247

248
        """
UNCOV
249
        return self._kind
×
250

251
    def __call__(self, family: socket.AddressFamily | None = None) -> int:
12✔
252
        """
253
        Return an unbound port for the given address family.
254

255
        :param family: if omitted, both IPv4 and IPv6 addresses will be tried
256
        :return: a port number
257

258
        """
259
        if family is not None:
12✔
260
            families = [family]
1✔
261
        else:
262
            families = [socket.AF_INET]
12✔
263
            if socket.has_ipv6:
12✔
264
                families.append(socket.AF_INET6)
12✔
265

266
        while True:
12✔
267
            port = 0
12✔
268
            with ExitStack() as stack:
12✔
269
                for family in families:
12✔
270
                    sock = stack.enter_context(socket.socket(family, self._kind))
12✔
271
                    addr = "::1" if family == socket.AF_INET6 else "127.0.0.1"
12✔
272
                    try:
12✔
273
                        sock.bind((addr, port))
12✔
274
                    except OSError:
×
275
                        break
1✔
276

277
                    if not port:
12✔
278
                        port = sock.getsockname()[1]
12✔
279
                else:
280
                    if port not in self._generated:
12✔
281
                        self._generated.add(port)
12✔
282
                        return port
12✔
283

284

285
@pytest.fixture(scope="session")
12✔
286
def free_tcp_port_factory() -> FreePortFactory:
12✔
287
    return FreePortFactory(socket.SOCK_STREAM)
12✔
288

289

290
@pytest.fixture(scope="session")
12✔
291
def free_udp_port_factory() -> FreePortFactory:
12✔
292
    return FreePortFactory(socket.SOCK_DGRAM)
12✔
293

294

295
@pytest.fixture
12✔
296
def free_tcp_port(free_tcp_port_factory: Callable[[], int]) -> int:
12✔
297
    return free_tcp_port_factory()
12✔
298

299

300
@pytest.fixture
12✔
301
def free_udp_port(free_udp_port_factory: Callable[[], int]) -> int:
12✔
302
    return free_udp_port_factory()
12✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc