• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

agronholm / anyio / 23486144650

24 Mar 2026 11:03AM UTC coverage: 92.601% (-0.1%) from 92.701%
23486144650

push

github

web-flow
Fixed cancellation exceptions leaking from a `CancelScope` on asyncio when they are in a group with non-cancellation exceptions (#1092)

Fixes #1091.

16 of 16 new or added lines in 1 file covered. (100.0%)

9 existing lines in 2 files now uncovered.

5882 of 6352 relevant lines covered (92.6%)

10.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.31
/src/anyio/pytest_plugin.py
1
from __future__ import annotations
12✔
2

3
import dataclasses
12✔
4
import socket
12✔
5
import sys
12✔
6
from collections.abc import Callable, Generator, Iterator
12✔
7
from contextlib import ExitStack, contextmanager
12✔
8
from inspect import isasyncgenfunction, iscoroutinefunction, ismethod
12✔
9
from typing import Any, cast
12✔
10

11
import pytest
12✔
12
from _pytest.fixtures import FuncFixtureInfo, SubRequest
12✔
13
from _pytest.outcomes import Exit
12✔
14
from _pytest.python import CallSpec2
12✔
15
from _pytest.scope import Scope
12✔
16

17
from . import get_available_backends
12✔
18
from ._core._eventloop import (
12✔
19
    current_async_library,
20
    get_async_backend,
21
    reset_current_async_library,
22
    set_current_async_library,
23
)
24
from ._core._exceptions import iterate_exceptions
12✔
25
from .abc import TestRunner
12✔
26

27
if sys.version_info < (3, 11):
12✔
28
    from exceptiongroup import ExceptionGroup
4✔
29

30
_current_runner: TestRunner | None = None
12✔
31
_runner_stack: ExitStack | None = None
12✔
32
_runner_leases = 0
12✔
33

34

35
def extract_backend_and_options(backend: object) -> tuple[str, dict[str, Any]]:
12✔
36
    if isinstance(backend, str):
12✔
37
        return backend, {}
12✔
38
    elif isinstance(backend, tuple) and len(backend) == 2:
12✔
39
        if isinstance(backend[0], str) and isinstance(backend[1], dict):
12✔
40
            return cast(tuple[str, dict[str, Any]], backend)
12✔
41

42
    raise TypeError("anyio_backend must be either a string or tuple of (string, dict)")
×
43

44

45
@contextmanager
12✔
46
def get_runner(
12✔
47
    backend_name: str, backend_options: dict[str, Any]
48
) -> Iterator[TestRunner]:
49
    global _current_runner, _runner_leases, _runner_stack
50
    if _current_runner is None:
12✔
51
        asynclib = get_async_backend(backend_name)
12✔
52
        _runner_stack = ExitStack()
12✔
53
        if current_async_library() is None:
12✔
54
            # Since we're in control of the event loop, we can cache the name of the
55
            # async library
56
            token = set_current_async_library(backend_name)
12✔
57
            _runner_stack.callback(reset_current_async_library, token)
12✔
58

59
        backend_options = backend_options or {}
12✔
60
        _current_runner = _runner_stack.enter_context(
12✔
61
            asynclib.create_test_runner(backend_options)
62
        )
63

64
    _runner_leases += 1
12✔
65
    try:
12✔
66
        yield _current_runner
12✔
67
    finally:
68
        _runner_leases -= 1
12✔
69
        if not _runner_leases:
12✔
70
            assert _runner_stack is not None
12✔
71
            _runner_stack.close()
12✔
72
            _runner_stack = _current_runner = None
12✔
73

74

75
def pytest_addoption(parser: pytest.Parser) -> None:
12✔
76
    parser.addini(
12✔
77
        "anyio_mode",
78
        default="strict",
79
        help='AnyIO plugin mode (either "strict" or "auto")',
80
    )
81

82

83
def pytest_configure(config: pytest.Config) -> None:
12✔
84
    config.addinivalue_line(
12✔
85
        "markers",
86
        "anyio: mark the (coroutine function) test to be run asynchronously via anyio.",
87
    )
88
    if (
12✔
89
        config.getini("anyio_mode") == "auto"
90
        and config.pluginmanager.has_plugin("asyncio")
91
        and config.getini("asyncio_mode") == "auto"
92
    ):
93
        config.issue_config_time_warning(
12✔
94
            pytest.PytestConfigWarning(
95
                "AnyIO auto mode has been enabled together with pytest-asyncio auto "
96
                "mode. This may cause unexpected behavior."
97
            ),
98
            1,
99
        )
100

101

102
@pytest.hookimpl(hookwrapper=True)
12✔
103
def pytest_fixture_setup(fixturedef: Any, request: Any) -> Generator[Any]:
12✔
104
    def wrapper(anyio_backend: Any, request: SubRequest, **kwargs: Any) -> Any:
12✔
105
        # Rebind any fixture methods to the request instance
106
        if (
12✔
107
            request.instance
108
            and ismethod(func)
109
            and type(func.__self__) is type(request.instance)
110
        ):
111
            local_func = func.__func__.__get__(request.instance)
12✔
112
        else:
113
            local_func = func
12✔
114

115
        backend_name, backend_options = extract_backend_and_options(anyio_backend)
12✔
116
        if has_backend_arg:
12✔
117
            kwargs["anyio_backend"] = anyio_backend
12✔
118

119
        if has_request_arg:
12✔
120
            kwargs["request"] = request
12✔
121

122
        with get_runner(backend_name, backend_options) as runner:
12✔
123
            if isasyncgenfunction(local_func):
12✔
124
                yield from runner.run_asyncgen_fixture(local_func, kwargs)
12✔
125
            else:
126
                yield runner.run_fixture(local_func, kwargs)
12✔
127

128
    # Only apply this to coroutine functions and async generator functions in requests
129
    # that involve the anyio_backend fixture
130
    func = fixturedef.func
12✔
131
    if isasyncgenfunction(func) or iscoroutinefunction(func):
12✔
132
        if "anyio_backend" in request.fixturenames:
12✔
133
            fixturedef.func = wrapper
12✔
134
            original_argname = fixturedef.argnames
12✔
135

136
            if not (has_backend_arg := "anyio_backend" in fixturedef.argnames):
12✔
137
                fixturedef.argnames += ("anyio_backend",)
12✔
138

139
            if not (has_request_arg := "request" in fixturedef.argnames):
12✔
140
                fixturedef.argnames += ("request",)
12✔
141

142
            try:
12✔
143
                return (yield)
12✔
144
            finally:
145
                fixturedef.func = func
12✔
146
                fixturedef.argnames = original_argname
12✔
147

148
    return (yield)
12✔
149

150

151
@pytest.hookimpl(tryfirst=True)
12✔
152
def pytest_pycollect_makeitem(
12✔
153
    collector: pytest.Module | pytest.Class, name: str, obj: object
154
) -> None:
155
    if collector.istestfunction(obj, name):
12✔
156
        inner_func = obj.hypothesis.inner_test if hasattr(obj, "hypothesis") else obj
12✔
157
        if iscoroutinefunction(inner_func):
12✔
158
            anyio_auto_mode = collector.config.getini("anyio_mode") == "auto"
12✔
159
            marker = collector.get_closest_marker("anyio")
12✔
160
            own_markers = getattr(obj, "pytestmark", ())
12✔
161
            if (
12✔
162
                anyio_auto_mode
163
                or marker
164
                or any(marker.name == "anyio" for marker in own_markers)
165
            ):
166
                pytest.mark.usefixtures("anyio_backend")(obj)
12✔
167

168

169
def pytest_collection_finish(session: pytest.Session) -> None:
12✔
170
    for i, item in reversed(list(enumerate(session.items))):
12✔
171
        if (
12✔
172
            isinstance(item, pytest.Function)
173
            and iscoroutinefunction(item.function)
174
            and item.get_closest_marker("anyio") is not None
175
            and "anyio_backend" not in item.fixturenames
176
        ):
177
            new_items = []
12✔
178
            try:
12✔
179
                cs_fields = {f.name for f in dataclasses.fields(CallSpec2)}
12✔
UNCOV
180
            except TypeError:
×
UNCOV
181
                cs_fields = set()
×
182

183
            for param_index, backend in enumerate(get_available_backends()):
12✔
184
                if "_arg2scope" in cs_fields:  # pytest >= 8
12✔
185
                    callspec = CallSpec2(
12✔
186
                        params={"anyio_backend": backend},
187
                        indices={"anyio_backend": param_index},
188
                        _arg2scope={"anyio_backend": Scope.Module},
189
                        _idlist=[backend],
190
                        marks=[],
191
                    )
192
                else:  # pytest 7.x
UNCOV
193
                    callspec = CallSpec2(  # type: ignore[call-arg]
×
194
                        funcargs={},
195
                        params={"anyio_backend": backend},
196
                        indices={"anyio_backend": param_index},
197
                        arg2scope={"anyio_backend": Scope.Module},
198
                        idlist=[backend],
199
                        marks=[],
200
                    )
201

202
                fi = item._fixtureinfo
12✔
203
                new_names_closure = list(fi.names_closure)
12✔
204
                if "anyio_backend" not in new_names_closure:
12✔
205
                    new_names_closure.append("anyio_backend")
12✔
206

207
                new_fixtureinfo = FuncFixtureInfo(
12✔
208
                    argnames=fi.argnames,
209
                    initialnames=fi.initialnames,
210
                    names_closure=new_names_closure,
211
                    name2fixturedefs=fi.name2fixturedefs,
212
                )
213
                new_item = pytest.Function.from_parent(
12✔
214
                    item.parent,
215
                    name=f"{item.originalname}[{backend}]",
216
                    callspec=callspec,
217
                    callobj=item.obj,
218
                    fixtureinfo=new_fixtureinfo,
219
                    keywords=item.keywords,
220
                    originalname=item.originalname,
221
                )
222
                new_items.append(new_item)
12✔
223

224
            session.items[i : i + 1] = new_items
12✔
225

226

227
@pytest.hookimpl(tryfirst=True)
12✔
228
def pytest_pyfunc_call(pyfuncitem: Any) -> bool | None:
12✔
229
    def run_with_hypothesis(**kwargs: Any) -> None:
12✔
230
        with get_runner(backend_name, backend_options) as runner:
12✔
231
            runner.run_test(original_func, kwargs)
12✔
232

233
    backend = pyfuncitem.funcargs.get("anyio_backend")
12✔
234
    if backend:
12✔
235
        backend_name, backend_options = extract_backend_and_options(backend)
12✔
236

237
        if hasattr(pyfuncitem.obj, "hypothesis"):
12✔
238
            # Wrap the inner test function unless it's already wrapped
239
            original_func = pyfuncitem.obj.hypothesis.inner_test
12✔
240
            if original_func.__qualname__ != run_with_hypothesis.__qualname__:
12✔
241
                if iscoroutinefunction(original_func):
12✔
242
                    pyfuncitem.obj.hypothesis.inner_test = run_with_hypothesis
12✔
243

244
            return None
12✔
245

246
        if iscoroutinefunction(pyfuncitem.obj):
12✔
247
            funcargs = pyfuncitem.funcargs
12✔
248
            testargs = {arg: funcargs[arg] for arg in pyfuncitem._fixtureinfo.argnames}
12✔
249
            with get_runner(backend_name, backend_options) as runner:
12✔
250
                try:
12✔
251
                    runner.run_test(pyfuncitem.obj, testargs)
12✔
252
                except ExceptionGroup as excgrp:
12✔
253
                    for exc in iterate_exceptions(excgrp):
12✔
254
                        if isinstance(exc, (Exit, KeyboardInterrupt, SystemExit)):
12✔
255
                            raise exc from excgrp
12✔
256

257
                    raise
×
258

259
            return True
12✔
260

261
    return None
12✔
262

263

264
@pytest.fixture(scope="module", params=get_available_backends())
12✔
265
def anyio_backend(request: Any) -> Any:
12✔
266
    return request.param
12✔
267

268

269
@pytest.fixture
12✔
270
def anyio_backend_name(anyio_backend: Any) -> str:
12✔
271
    if isinstance(anyio_backend, str):
12✔
272
        return anyio_backend
12✔
273
    else:
274
        return anyio_backend[0]
12✔
275

276

277
@pytest.fixture
12✔
278
def anyio_backend_options(anyio_backend: Any) -> dict[str, Any]:
12✔
279
    if isinstance(anyio_backend, str):
12✔
280
        return {}
11✔
281
    else:
282
        return anyio_backend[1]
12✔
283

284

285
class FreePortFactory:
12✔
286
    """
287
    Manages port generation based on specified socket kind, ensuring no duplicate
288
    ports are generated.
289

290
    This class provides functionality for generating available free ports on the
291
    system. It is initialized with a specific socket kind and can generate ports
292
    for given address families while avoiding reuse of previously generated ports.
293

294
    Users should not instantiate this class directly, but use the
295
    ``free_tcp_port_factory`` and ``free_udp_port_factory`` fixtures instead. For simple
296
    uses cases, ``free_tcp_port`` and ``free_udp_port`` can be used instead.
297
    """
298

299
    def __init__(self, kind: socket.SocketKind) -> None:
12✔
300
        self._kind = kind
12✔
301
        self._generated = set[int]()
12✔
302

303
    @property
12✔
304
    def kind(self) -> socket.SocketKind:
12✔
305
        """
306
        The type of socket connection (e.g., :data:`~socket.SOCK_STREAM` or
307
        :data:`~socket.SOCK_DGRAM`) used to bind for checking port availability
308

309
        """
310
        return self._kind
×
311

312
    def __call__(self, family: socket.AddressFamily | None = None) -> int:
12✔
313
        """
314
        Return an unbound port for the given address family.
315

316
        :param family: if omitted, both IPv4 and IPv6 addresses will be tried
317
        :return: a port number
318

319
        """
320
        if family is not None:
12✔
321
            families = [family]
×
322
        else:
323
            families = [socket.AF_INET]
12✔
324
            if socket.has_ipv6:
12✔
325
                families.append(socket.AF_INET6)
12✔
326

327
        while True:
12✔
328
            port = 0
12✔
329
            with ExitStack() as stack:
12✔
330
                for family in families:
12✔
331
                    sock = stack.enter_context(socket.socket(family, self._kind))
12✔
332
                    addr = "::1" if family == socket.AF_INET6 else "127.0.0.1"
12✔
333
                    try:
12✔
334
                        sock.bind((addr, port))
12✔
UNCOV
335
                    except OSError:
×
UNCOV
336
                        break
×
337

338
                    if not port:
12✔
339
                        port = sock.getsockname()[1]
12✔
340
                else:
341
                    if port not in self._generated:
12✔
342
                        self._generated.add(port)
12✔
343
                        return port
12✔
344

345

346
@pytest.fixture(scope="session")
12✔
347
def free_tcp_port_factory() -> FreePortFactory:
12✔
348
    return FreePortFactory(socket.SOCK_STREAM)
12✔
349

350

351
@pytest.fixture(scope="session")
12✔
352
def free_udp_port_factory() -> FreePortFactory:
12✔
353
    return FreePortFactory(socket.SOCK_DGRAM)
12✔
354

355

356
@pytest.fixture
12✔
357
def free_tcp_port(free_tcp_port_factory: Callable[[], int]) -> int:
12✔
358
    return free_tcp_port_factory()
12✔
359

360

361
@pytest.fixture
12✔
362
def free_udp_port(free_udp_port_factory: Callable[[], int]) -> int:
12✔
363
    return free_udp_port_factory()
12✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc