agronholm / anyio / 18592246756

17 Oct 2025 12:09PM UTC coverage: 92.545% (+0.08%) from 92.462%

Pull Request #1001: Added an async version of the functools module
Merge 6a1c5c330 into c8c7973d2

106 of 107 new or added lines in 1 file covered. (99.07%)
2 existing lines in 1 file now uncovered.
5648 of 6103 relevant lines covered (92.54%)
9.51 hits per line

Source File: /src/anyio/functools.py (99.07%)

from __future__ import annotations

__all__ = ("cache", "lru_cache", "reduce")

import functools
import sys
from collections import OrderedDict, defaultdict
from collections.abc import (
    AsyncIterable,
    Awaitable,
    Callable,
    Coroutine,
    Hashable,
    Iterable,
)
from functools import update_wrapper
from inspect import iscoroutinefunction
from typing import Any, Generic, NamedTuple, TypedDict, TypeVar, cast, overload

from ._core._synchronization import Lock
from .lowlevel import RunVar

if sys.version_info >= (3, 11):
    from typing import ParamSpec
else:
    from typing_extensions import ParamSpec

T = TypeVar("T")
S = TypeVar("S")
P = ParamSpec("P")
lru_cache_items: RunVar[
    defaultdict[Callable[..., Any], tuple[OrderedDict[Hashable, Any], Lock]]
] = RunVar("lru_cache_items")


class _InitialMissingType:
    pass


initial_missing: _InitialMissingType = _InitialMissingType()


class AsyncCacheInfo(NamedTuple):
    hits: int
    misses: int
    maxsize: int | None
    currsize: int


class AsyncCacheParameters(TypedDict):
    maxsize: int | None
    typed: bool


class AsyncLRUCacheWrapper(Generic[P, T]):
    def __init__(
        self, func: Callable[..., Awaitable[T]], maxsize: int | None, typed: bool
    ):
        self.__wrapped__ = func
        self._hits: int = 0
        self._misses: int = 0
        self._maxsize = max(maxsize, 0) if maxsize is not None else None
        self._currsize: int = 0
        self._typed = typed
        update_wrapper(self, func)

    def cache_info(self) -> AsyncCacheInfo:
        return AsyncCacheInfo(self._hits, self._misses, self._maxsize, self._currsize)

    def cache_parameters(self) -> AsyncCacheParameters:
        return AsyncCacheParameters(maxsize=self._maxsize, typed=self._typed)

    def cache_clear(self) -> None:
        if cache := lru_cache_items.get(None):
            cache.pop(self.__wrapped__, None)
            self._hits = self._misses = self._currsize = 0

    async def __call__(self, *args: P.args, **kwargs: P.kwargs) -> T:
        # Easy case first: if maxsize == 0, no caching is done
        if self._maxsize == 0:
            value = await self.__wrapped__(*args, **kwargs)
            self._misses += 1
            return value

        # The key is constructed as a flat tuple to avoid memory overhead
        key: tuple[Any, ...] = args
        if kwargs:
            # initial_missing is used as a separator
            key += (initial_missing,) + sum(kwargs.items(), ())

        if self._typed:
            key += tuple(type(arg) for arg in args)
            if kwargs:
                key += (initial_missing,) + tuple(type(val) for val in kwargs.values())

        try:
            cache = lru_cache_items.get()
        except LookupError:
            cache = defaultdict(lambda: (OrderedDict(), Lock()))
            lru_cache_items.set(cache)

        cache_entry, lock = cache[self.__wrapped__]
        async with lock:
            try:
                value = cache_entry[key]
            except KeyError:
                self._misses += 1
                if self._maxsize is not None and len(cache_entry) >= self._maxsize:
                    cache_entry.popitem(last=False)
                else:
                    self._currsize += 1

                cache_entry[key] = value = await self.__wrapped__(*args, **kwargs)
            else:
                self._hits += 1
                cache_entry.move_to_end(key)

        return value


class _LRUCacheWrapper(Generic[T]):
    def __init__(self, maxsize: int | None, typed: bool):
        self._maxsize = maxsize
        self._typed = typed

    @overload
    def __call__(  # type: ignore[overload-overlap]
        self, func: Callable[P, Coroutine[Any, Any, T]], /
    ) -> AsyncLRUCacheWrapper[P, T]: ...

    @overload
    def __call__(
        self, func: Callable[..., T], /
    ) -> functools._lru_cache_wrapper[T]: ...

    def __call__(
        self, f: Callable[P, Coroutine[Any, Any, T]] | Callable[..., T], /
    ) -> AsyncLRUCacheWrapper[P, T] | functools._lru_cache_wrapper[T]:
        if iscoroutinefunction(f):
            return AsyncLRUCacheWrapper(f, self._maxsize, self._typed)

        return functools.lru_cache(maxsize=self._maxsize, typed=self._typed)(f)  # type: ignore[arg-type]


@overload
def cache(  # type: ignore[overload-overlap]
    func: Callable[P, Coroutine[Any, Any, T]], /
) -> AsyncLRUCacheWrapper[P, T]: ...


@overload
def cache(func: Callable[..., T], /) -> functools._lru_cache_wrapper[T]: ...


def cache(
    func: Callable[..., T] | Callable[P, Coroutine[Any, Any, T]], /
) -> AsyncLRUCacheWrapper[P, T] | functools._lru_cache_wrapper[T]:
    """
    A convenient shortcut for :func:`lru_cache` with ``maxsize=None``.

    This is the asynchronous equivalent to :func:`functools.cache`.

    """
    return lru_cache(maxsize=None)(func)


@overload
def lru_cache(
    *, maxsize: int | None = 128, typed: bool = False
) -> _LRUCacheWrapper[Any]: ...


@overload
def lru_cache(  # type: ignore[overload-overlap]
    func: Callable[P, Coroutine[Any, Any, T]], /
) -> AsyncLRUCacheWrapper[P, T]: ...


@overload
def lru_cache(func: Callable[..., T], /) -> functools._lru_cache_wrapper[T]: ...


def lru_cache(
    func: Callable[P, Coroutine[Any, Any, T]] | Callable[..., T] | None = None,
    /,
    *,
    maxsize: int | None = 128,
    typed: bool = False,
) -> (
    AsyncLRUCacheWrapper[P, T] | functools._lru_cache_wrapper[T] | _LRUCacheWrapper[Any]
):
    """
    An asynchronous version of :func:`functools.lru_cache`.

    If a synchronous function is passed, the standard library
    :func:`functools.lru_cache` is applied instead.

    .. note:: Caches and locks are managed on a per-event loop basis.

    """
    if func is None:
        return _LRUCacheWrapper[Any](maxsize, typed)

    return _LRUCacheWrapper[T](maxsize, typed)(func)


@overload
async def reduce(
    function: Callable[[T, S], Awaitable[T]],
    iterable: Iterable[S] | AsyncIterable[S],
    /,
    initial: T,
) -> T: ...


@overload
async def reduce(
    function: Callable[[T, T], Awaitable[T]],
    iterable: Iterable[T] | AsyncIterable[T],
    /,
) -> T: ...


async def reduce(  # type: ignore[misc]
    function: Callable[[T, T], Awaitable[T]] | Callable[[T, S], Awaitable[T]],
    iterable: Iterable[T] | Iterable[S] | AsyncIterable[T] | AsyncIterable[S],
    /,
    initial: T | _InitialMissingType = initial_missing,
) -> T:
    """Asynchronous version of :func:`functools.reduce`."""
    element: Any
    if isinstance(iterable, AsyncIterable):
        async_it = iterable.__aiter__()
        if initial is initial_missing:
            value = cast(T, await async_it.__anext__())
        else:
            value = cast(T, initial)

        async for element in async_it:
            value = await function(value, element)
    elif isinstance(iterable, Iterable):
        it = iter(iterable)
        if initial is initial_missing:
            value = cast(T, next(it))
        else:
            value = cast(T, initial)

        for element in it:
            value = await function(value, element)
    else:
        raise TypeError("reduce() argument 2 must be an iterable or async iterable")

    return value

Per the line-by-line report, the single new or added line not exercised by the tests is the typed-kwargs key construction in AsyncLRUCacheWrapper.__call__.
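
For context, here is a minimal usage sketch of the new module. It assumes this pull request is merged and the file above is importable as anyio.functools; fetch_square and add are made-up stand-ins for illustration, not part of the PR.

import anyio
from anyio.functools import lru_cache, reduce


@lru_cache(maxsize=32)
async def fetch_square(x: int) -> int:
    # Stand-in for an expensive async computation; results are cached
    # per event loop by the AsyncLRUCacheWrapper defined above.
    await anyio.sleep(0.01)
    return x * x


async def add(acc: int, item: int) -> int:
    return acc + item


async def main() -> None:
    first = await fetch_square(7)   # miss: the wrapped coroutine runs
    second = await fetch_square(7)  # hit: served from the per-loop cache
    assert first == second == 49
    print(fetch_square.cache_info())  # AsyncCacheInfo(hits=1, misses=1, maxsize=32, currsize=1)

    # reduce() accepts both regular and async iterables
    total = await reduce(add, [1, 2, 3, 4], 0)
    print(total)  # 10


anyio.run(main)

Because the cache is stored in a RunVar, each call to anyio.run (i.e. each event loop) starts with an empty cache, which is what the per-event-loop note in lru_cache's docstring refers to.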