• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

agronholm / anyio / 10672190754

02 Sep 2024 07:27PM UTC coverage: 91.511% (-0.3%) from 91.77%
10672190754

Pull #774

github

web-flow
Merge 7a6b1b0cd into 5a1e4191e
Pull Request #774: Fixed TaskGroup and CancelScope exit issues on asyncio

88 of 89 new or added lines in 10 files covered. (98.88%)

17 existing lines in 1 file now uncovered.

4592 of 5018 relevant lines covered (91.51%)

8.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.51
/src/anyio/lowlevel.py
1
from __future__ import annotations
10✔
2

3
import enum
10✔
4
from dataclasses import dataclass
10✔
5
from typing import Any, Generic, Literal, TypeVar, overload
10✔
6
from weakref import WeakKeyDictionary
10✔
7

8
from ._core._eventloop import get_async_backend
10✔
9

10
# Type of the value carried by a RunVar (and remembered by its RunvarToken).
T = TypeVar("T")
10✔
11
# Type of the per-call fallback value that may be passed to RunVar.get().
D = TypeVar("D")
10✔
12

13

14
async def checkpoint() -> None:
    """
    Test for cancellation, then give the scheduler a chance to run another task.

    The work is delegated to the currently active async backend.

    Equivalent to (but more efficient than)::

        await checkpoint_if_cancelled()
        await cancel_shielded_checkpoint()


    .. versionadded:: 3.0

    """
    backend = get_async_backend()
    await backend.checkpoint()
10✔
28

29

30
async def checkpoint_if_cancelled() -> None:
    """
    Enter a checkpoint only when the enclosing cancel scope has been cancelled.

    Unlike :func:`checkpoint`, this does not allow the scheduler to switch to a
    different task. The work is delegated to the currently active async backend.

    .. versionadded:: 3.0

    """
    backend = get_async_backend()
    await backend.checkpoint_if_cancelled()
10✔
40

41

42
async def cancel_shielded_checkpoint() -> None:
    """
    Let the scheduler switch to another task without checking for cancellation.

    The work is delegated to the currently active async backend.

    Equivalent to (but potentially more efficient than)::

        with CancelScope(shield=True):
            await checkpoint()


    .. versionadded:: 3.0

    """
    backend = get_async_backend()
    await backend.cancel_shielded_checkpoint()
10✔
56

57

58
def current_token() -> object:
    """
    Ask the active async backend for its current token.

    :return: a backend specific token object that can be used to get back to the
        event loop

    """
    backend = get_async_backend()
    return backend.current_token()
10✔
65

66

67
# Run variable values, grouped by the token returned from current_token() and
# then keyed by run variable name (see RunVar._current_vars). The keys are held
# weakly, so an entry here does not keep its token alive.
_run_vars: WeakKeyDictionary[Any, dict[str, Any]] = WeakKeyDictionary()
10✔
68
# Registry mapping arbitrary keys to _TokenWrapper instances.
# NOTE(review): nothing in the visible code reads or writes this mapping;
# confirm whether it is still needed.
_token_wrappers: dict[Any, _TokenWrapper] = {}
10✔
69

70

71
@dataclass(frozen=True)
class _TokenWrapper:
    """
    Immutable holder for a single token object.

    The explicit ``__weakref__`` slot keeps instances weak-referenceable even
    though the class defines ``__slots__``.
    """

    __slots__ = ("_token", "__weakref__")

    # The wrapped token object.
    _token: object
10✔
75

76

77
class _NoValueSet(enum.Enum):
    """Single-member enum whose member serves as a "no value" sentinel."""

    # The only member; compared by identity wherever a value may be absent.
    NO_VALUE_SET = enum.auto()
10✔
79

80

81
class RunvarToken(Generic[T]):
    """
    Snapshot of a run variable's previous state.

    Stores the owning variable, the value it held (or the ``NO_VALUE_SET``
    sentinel), and a flag that starts out ``False`` to mark the token as not yet
    redeemed.
    """

    __slots__ = ("_var", "_value", "_redeemed")

    def __init__(self, var: RunVar[T], value: T | Literal[_NoValueSet.NO_VALUE_SET]):
        # A fresh token has never been used for a reset.
        self._redeemed = False
        self._var = var
        self._value: T | Literal[_NoValueSet.NO_VALUE_SET] = value
10✔
88

89

90
class RunVar(Generic[T]):
    """
    Like a :class:`~contextvars.ContextVar`, except scoped to the running event loop.

    Values are stored per token returned by :func:`current_token` and are looked
    up by the variable's name.
    """

    __slots__ = ("_name", "_default")

    # Sentinel meaning "no value"; exposed on the class for identity checks.
    NO_VALUE_SET: Literal[_NoValueSet.NO_VALUE_SET] = _NoValueSet.NO_VALUE_SET

    # NOTE(review): not referenced by any method of this class - confirm usage.
    _token_wrappers: set[_TokenWrapper] = set()

    def __init__(
        self, name: str, default: T | Literal[_NoValueSet.NO_VALUE_SET] = NO_VALUE_SET
    ):
        """
        :param name: key under which the value is stored
        :param default: value returned by :meth:`get` when nothing has been set
            and the caller supplied no fallback

        """
        self._default = default
        self._name = name

    @property
    def _current_vars(self) -> dict[str, T]:
        """Return the variable storage of the current token, creating it if needed."""
        return _run_vars.setdefault(current_token(), {})

    @overload
    def get(self, default: D) -> T | D: ...

    @overload
    def get(self) -> T: ...

    def get(
        self, default: D | Literal[_NoValueSet.NO_VALUE_SET] = NO_VALUE_SET
    ) -> T | D:
        """
        Return the current value of this variable.

        :param default: fallback returned when no value has been set; it takes
            precedence over the default given to the constructor
        :raises LookupError: if no value is set and neither fallback is available

        """
        try:
            return self._current_vars[self._name]
        except KeyError:
            # The per-call fallback wins over the constructor default.
            if default is RunVar.NO_VALUE_SET:
                fallback = self._default
            else:
                fallback = default

        if fallback is RunVar.NO_VALUE_SET:
            raise LookupError(
                f'Run variable "{self._name}" has no value and no default set'
            )

        return fallback

    def set(self, value: T) -> RunvarToken[T]:
        """
        Store a new value and return a token that can undo the change.

        :param value: the value to store
        :return: a token remembering the previous value (or its absence), to be
            passed to :meth:`reset`

        """
        storage = self._current_vars
        previous = storage.get(self._name, RunVar.NO_VALUE_SET)
        storage[self._name] = value
        return RunvarToken(self, previous)

    def reset(self, token: RunvarToken[T]) -> None:
        """
        Restore the state this variable had before the matching :meth:`set` call.

        :param token: a token previously returned by :meth:`set` on this variable
        :raises ValueError: if the token came from another variable or has
            already been used

        """
        if token._var is not self:
            raise ValueError("This token does not belong to this RunVar")

        if token._redeemed:
            raise ValueError("This token has already been used")

        restored = token._value
        if restored is not _NoValueSet.NO_VALUE_SET:
            self._current_vars[self._name] = restored
        else:
            # There was no value before set(); an already-missing entry is fine.
            try:
                del self._current_vars[self._name]
            except KeyError:
                pass

        token._redeemed = True

    def __repr__(self) -> str:
        return f"<RunVar name={self._name!r}>"
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc