
agronholm / anyio, build 18744802016 (first build)
23 Oct 2025 10:03AM UTC, coverage: 92.606%

Pull Request #1001 (merge 6933b8da7 into 350687ac4, via github / web-flow):
Added an async version of the functools module

133 of 134 new or added lines in 1 file covered (99.25%)
5699 of 6154 relevant lines covered (92.61%)
9.52 hits per line

Source file: /src/anyio/functools.py (99.25% covered). The single uncovered new
line is the typed-kwargs component of the cache key in AsyncLRUCacheWrapper.__call__.
from __future__ import annotations

__all__ = (
    "AsyncCacheInfo",
    "AsyncCacheParameters",
    "AsyncLRUCacheWrapper",
    "cache",
    "lru_cache",
    "reduce",
)

import functools
import sys
from collections import OrderedDict
from collections.abc import (
    AsyncIterable,
    Awaitable,
    Callable,
    Coroutine,
    Hashable,
    Iterable,
)
from functools import update_wrapper
from inspect import iscoroutinefunction
from typing import (
    Any,
    Generic,
    NamedTuple,
    TypedDict,
    TypeVar,
    cast,
    final,
    overload,
)
from weakref import WeakKeyDictionary

from ._core._synchronization import Lock
from .lowlevel import RunVar, checkpoint

if sys.version_info >= (3, 11):
    from typing import ParamSpec
else:
    from typing_extensions import ParamSpec

T = TypeVar("T")
S = TypeVar("S")
P = ParamSpec("P")
lru_cache_items: RunVar[
    WeakKeyDictionary[
        AsyncLRUCacheWrapper[Any, Any],
        OrderedDict[Hashable, tuple[_InitialMissingType, Lock] | tuple[Any, None]],
    ]
] = RunVar("lru_cache_items")


class _InitialMissingType:
    pass


initial_missing: _InitialMissingType = _InitialMissingType()

class AsyncCacheInfo(NamedTuple):
    hits: int
    misses: int
    maxsize: int | None
    currsize: int


class AsyncCacheParameters(TypedDict):
    maxsize: int | None
    typed: bool


@final
class AsyncLRUCacheWrapper(Generic[P, T]):
    def __init__(
        self, func: Callable[..., Awaitable[T]], maxsize: int | None, typed: bool
    ):
        self.__wrapped__ = func
        self._hits: int = 0
        self._misses: int = 0
        self._maxsize = max(maxsize, 0) if maxsize is not None else None
        self._currsize: int = 0
        self._typed = typed
        update_wrapper(self, func)

    def cache_info(self) -> AsyncCacheInfo:
        return AsyncCacheInfo(self._hits, self._misses, self._maxsize, self._currsize)

    def cache_parameters(self) -> AsyncCacheParameters:
        return {"maxsize": self._maxsize, "typed": self._typed}

    def cache_clear(self) -> None:
        if cache := lru_cache_items.get(None):
            cache.pop(self, None)
            self._hits = self._misses = self._currsize = 0

    async def __call__(self, *args: P.args, **kwargs: P.kwargs) -> T:
        # Easy case first: if maxsize == 0, no caching is done
        if self._maxsize == 0:
            value = await self.__wrapped__(*args, **kwargs)
            self._misses += 1
            return value

        # The key is constructed as a flat tuple to avoid memory overhead
        key: tuple[Any, ...] = args
        if kwargs:
            # initial_missing is used as a separator
            key += (initial_missing,) + sum(kwargs.items(), ())

        if self._typed:
            key += tuple(type(arg) for arg in args)
            if kwargs:
                key += (initial_missing,) + tuple(type(val) for val in kwargs.values())

        try:
            cache = lru_cache_items.get()
        except LookupError:
            cache = WeakKeyDictionary()
            lru_cache_items.set(cache)

        try:
            cache_entry = cache[self]
        except KeyError:
            cache_entry = cache[self] = OrderedDict()

        cached_value: T | _InitialMissingType
        try:
            cached_value, lock = cache_entry[key]
        except KeyError:
            # We're the first task to call this function
            cached_value, lock = initial_missing, Lock(fast_acquire=True)
            cache_entry[key] = cached_value, lock

        if lock is None:
            # The value was already cached
            self._hits += 1
            cache_entry.move_to_end(key)
            return cast(T, cached_value)

        async with lock:
            # Check if another task filled the cache while we acquired the lock
            if (cached_value := cache_entry[key][0]) is initial_missing:
                self._misses += 1
                if self._maxsize is not None and self._currsize >= self._maxsize:
                    cache_entry.popitem(last=False)
                else:
                    self._currsize += 1

                value = await self.__wrapped__(*args, **kwargs)
                cache_entry[key] = value, None
            else:
                # Another task filled the cache while we were waiting for the lock
                self._hits += 1
                cache_entry.move_to_end(key)
                value = cast(T, cached_value)

        return value
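

# Illustrative usage sketch (not part of the module): concurrent callers with the
# same arguments share a single underlying call, because the first task installs a
# Lock under the key and later tasks wait on it, then read the cached value. The
# names fetch_user and main below are hypothetical.
#
#     import anyio
#     from anyio.functools import lru_cache
#
#     @lru_cache(maxsize=32)
#     async def fetch_user(user_id: int) -> dict:
#         await anyio.sleep(0.1)  # simulate a network round trip
#         return {"id": user_id}
#
#     async def main() -> None:
#         async with anyio.create_task_group() as tg:
#             for _ in range(5):
#                 tg.start_soon(fetch_user, 42)  # only one wrapped call actually runs
#         print(fetch_user.cache_info())  # e.g. hits=4, misses=1, currsize=1
#
#     anyio.run(main)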


class _LRUCacheWrapper(Generic[T]):
    def __init__(self, maxsize: int | None, typed: bool):
        self._maxsize = maxsize
        self._typed = typed

    @overload
    def __call__(  # type: ignore[overload-overlap]
        self, func: Callable[P, Coroutine[Any, Any, T]], /
    ) -> AsyncLRUCacheWrapper[P, T]: ...

    @overload
    def __call__(
        self, func: Callable[..., T], /
    ) -> functools._lru_cache_wrapper[T]: ...

    def __call__(
        self, f: Callable[P, Coroutine[Any, Any, T]] | Callable[..., T], /
    ) -> AsyncLRUCacheWrapper[P, T] | functools._lru_cache_wrapper[T]:
        if iscoroutinefunction(f):
            return AsyncLRUCacheWrapper(f, self._maxsize, self._typed)

        return functools.lru_cache(maxsize=self._maxsize, typed=self._typed)(f)  # type: ignore[arg-type]


@overload
def cache(  # type: ignore[overload-overlap]
    func: Callable[P, Coroutine[Any, Any, T]], /
) -> AsyncLRUCacheWrapper[P, T]: ...


@overload
def cache(func: Callable[..., T], /) -> functools._lru_cache_wrapper[T]: ...


def cache(
    func: Callable[..., T] | Callable[P, Coroutine[Any, Any, T]], /
) -> AsyncLRUCacheWrapper[P, T] | functools._lru_cache_wrapper[T]:
    """
    A convenient shortcut for :func:`lru_cache` with ``maxsize=None``.

    This is the asynchronous equivalent to :func:`functools.cache`.

    """
    return lru_cache(maxsize=None)(func)
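

# Illustrative usage sketch (not part of the module): `cache` is the unbounded
# form, equivalent to `lru_cache(maxsize=None)`. The coroutine function below is
# hypothetical.
#
#     from anyio.functools import cache
#
#     @cache
#     async def load_settings(name: str) -> str:
#         ...  # the expensive call runs once per distinct name, per event loop
#
#     # load_settings.cache_parameters() == {"maxsize": None, "typed": False}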


@overload
def lru_cache(
    *, maxsize: int | None = 128, typed: bool = False
) -> _LRUCacheWrapper[Any]: ...


@overload
def lru_cache(  # type: ignore[overload-overlap]
    func: Callable[P, Coroutine[Any, Any, T]], /
) -> AsyncLRUCacheWrapper[P, T]: ...


@overload
def lru_cache(func: Callable[..., T], /) -> functools._lru_cache_wrapper[T]: ...


def lru_cache(
    func: Callable[P, Coroutine[Any, Any, T]] | Callable[..., T] | None = None,
    /,
    *,
    maxsize: int | None = 128,
    typed: bool = False,
) -> (
    AsyncLRUCacheWrapper[P, T] | functools._lru_cache_wrapper[T] | _LRUCacheWrapper[Any]
):
    """
    An asynchronous version of :func:`functools.lru_cache`.

    If a synchronous function is passed, the standard library
    :func:`functools.lru_cache` is applied instead.

    .. note:: Caches and locks are managed on a per-event loop basis.

    """
    if func is None:
        return _LRUCacheWrapper[Any](maxsize, typed)

    if not callable(func):
        raise TypeError("the first argument must be callable")

    return _LRUCacheWrapper[T](maxsize, typed)(func)
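

# Illustrative usage sketch (not part of the module): `lru_cache` may be applied
# bare or with arguments, and dispatches on the kind of function it receives:
# coroutine functions get an AsyncLRUCacheWrapper, regular functions fall back to
# the standard functools.lru_cache. The function names below are hypothetical.
#
#     from anyio.functools import lru_cache
#
#     @lru_cache                            # bare: maxsize=128, typed=False
#     async def resolve(host: str) -> str: ...
#
#     @lru_cache(maxsize=16, typed=True)    # typed=True keys 1 and 1.0 separately
#     async def compute(x: float) -> float: ...
#
#     @lru_cache                            # sync function: plain functools.lru_cache
#     def parse(text: str) -> list[str]: ...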


@overload
async def reduce(
    function: Callable[[T, S], Awaitable[T]],
    iterable: Iterable[S] | AsyncIterable[S],
    /,
    initial: T,
) -> T: ...


@overload
async def reduce(
    function: Callable[[T, T], Awaitable[T]],
    iterable: Iterable[T] | AsyncIterable[T],
    /,
) -> T: ...


async def reduce(  # type: ignore[misc]
    function: Callable[[T, T], Awaitable[T]] | Callable[[T, S], Awaitable[T]],
    iterable: Iterable[T] | Iterable[S] | AsyncIterable[T] | AsyncIterable[S],
    /,
    initial: T | _InitialMissingType = initial_missing,
) -> T:
    """
    Asynchronous version of :func:`functools.reduce`.

    :param function: a coroutine function that takes two arguments: the accumulated
        value and the next element from the iterable
    :param iterable: an iterable or async iterable
    :param initial: the initial value (if missing, the first element of the iterable is
        used as the initial value)

    """
    element: Any
    function_called = False
    if isinstance(iterable, AsyncIterable):
        async_it = iterable.__aiter__()
        if initial is initial_missing:
            try:
                value = cast(T, await async_it.__anext__())
            except StopAsyncIteration:
                raise TypeError(
                    "reduce() of empty sequence with no initial value"
                ) from None
        else:
            value = cast(T, initial)

        async for element in async_it:
            value = await function(value, element)
            function_called = True
    elif isinstance(iterable, Iterable):
        it = iter(iterable)
        if initial is initial_missing:
            try:
                value = cast(T, next(it))
            except StopIteration:
                raise TypeError(
                    "reduce() of empty sequence with no initial value"
                ) from None
        else:
            value = cast(T, initial)

        for element in it:
            value = await function(value, element)
            function_called = True
    else:
        raise TypeError("reduce() argument 2 must be an iterable or async iterable")

    # Make sure there is at least one checkpoint, even if an empty iterable and an
    # initial value were given
    if not function_called:
        await checkpoint()

    return value
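

# Illustrative usage sketch (not part of the module): `reduce` accepts a plain
# iterable or an async iterable plus an optional initial value; the reducing
# function must be a coroutine function. The names add and main are hypothetical.
#
#     import anyio
#     from anyio.functools import reduce
#
#     async def add(acc: int, item: int) -> int:
#         return acc + item
#
#     async def main() -> None:
#         total = await reduce(add, [1, 2, 3, 4])        # -> 10
#         offset = await reduce(add, [1, 2, 3, 4], 100)  # -> 110
#         empty = await reduce(add, [], 0)               # -> 0 (still checkpoints once)
#
#     anyio.run(main)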