• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

aiokitchen / aiomisc / 20951339025

13 Jan 2026 09:20AM UTC coverage: 52.953% (-28.6%) from 81.562%
20951339025

push

github

web-flow
Merge pull request #241 from aiokitchen/feature/modern-typing-modern-tools

Feature/modern typing modern tools

584 of 918 branches covered (63.62%)

Branch coverage included in aggregate %.

182 of 453 new or added lines in 53 files covered. (40.18%)

1354 existing lines in 50 files now uncovered.

2438 of 4789 relevant lines covered (50.91%)

2.04 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

58.22
/aiomisc/iterator_wrapper.py
UNCOV
1
import asyncio
×
UNCOV
2
import inspect
×
NEW
3
import threading
×
UNCOV
4
from abc import abstractmethod
×
UNCOV
5
from collections import deque
×
NEW
6
from collections.abc import AsyncIterator, Awaitable, Callable, Generator
×
UNCOV
7
from concurrent.futures import Executor
×
NEW
8
from queue import Empty as QueueEmpty, Queue
×
UNCOV
9
from time import time
×
UNCOV
10
from types import TracebackType
×
NEW
11
from typing import Any, Generic, NoReturn, ParamSpec, TypeVar, Union
×
UNCOV
12
from weakref import finalize
×
13

UNCOV
14
from aiomisc.compat import EventLoopMixin
×
UNCOV
15
from aiomisc.counters import Statistic
×
16

UNCOV
17
T = TypeVar("T")
×
UNCOV
18
R = TypeVar("R")
×
UNCOV
19
P = ParamSpec("P")
×
20

UNCOV
21
GenType = Generator[T, None, None]
×
UNCOV
22
FuncType = Callable[[], GenType]
×
23

24

UNCOV
25
class ChannelClosed(RuntimeError):
    """Signal that a channel has been closed and cannot be used further."""
27

28

UNCOV
29
class QueueWrapperBase:
    """Minimal ``put``/``get`` interface for the queue backends.

    Both methods raise ``NotImplementedError`` here and are expected to be
    overridden. The class does not use ``ABCMeta``, so ``abstractmethod``
    only marks the methods; instantiating the base directly stays possible.
    """

    # Empty slots keep subclasses that declare their own ``__slots__``
    # free of a per-instance ``__dict__``.
    __slots__ = ()

    @abstractmethod
    def put(self, item: Any) -> None:
        """Store *item* in the queue."""
        raise NotImplementedError

    @abstractmethod
    def get(self) -> Any:
        """Remove and return the next item from the queue."""
        raise NotImplementedError
36

37

UNCOV
38
class DequeWrapper(QueueWrapperBase):
    """Unbounded FIFO backend built on :class:`collections.deque`."""

    __slots__ = ("queue",)

    def __init__(self) -> None:
        self.queue: deque[Any] = deque()

    def get(self) -> Any:
        """Pop the oldest item; raise ``queue.Empty`` when nothing is stored."""
        if self.queue:
            return self.queue.popleft()
        raise QueueEmpty

    def put(self, item: Any) -> None:
        """Append *item* to the right end of the deque."""
        self.queue.append(item)
51

52

UNCOV
53
class QueueWrapper(QueueWrapperBase):
    """Bounded backend built on :class:`queue.Queue`."""

    __slots__ = ("queue",)

    def __init__(self, max_size: int) -> None:
        self.queue: Queue = Queue(maxsize=max_size)

    def put(self, item: Any) -> None:
        """Add *item* with ``Queue.put`` (blocks while the queue is full)."""
        self.queue.put(item)

    def get(self) -> Any:
        """Return the next item without blocking; raise ``queue.Empty`` if none."""
        return self.queue.get_nowait()
64

65

UNCOV
66
def make_queue(max_size: int = 0) -> QueueWrapperBase:
    """Build the queue backend matching the requested capacity.

    A positive *max_size* yields a bounded ``QueueWrapper``; any other
    value yields an unbounded ``DequeWrapper``.
    """
    return QueueWrapper(max_size) if max_size > 0 else DequeWrapper()
70

71

UNCOV
72
class FromThreadChannel:
×
UNCOV
73
    SLEEP_LOW_THRESHOLD = 0.0001
×
UNCOV
74
    SLEEP_DIFFERENCE_DIVIDER = 10
×
75

NEW
76
    __slots__ = ("__closed", "__last_received_item", "queue")
×
77

UNCOV
78
    def __init__(self, maxsize: int = 0):
×
79
        self.queue: QueueWrapperBase = make_queue(max_size=maxsize)
4✔
80
        self.__closed = False
4✔
81
        self.__last_received_item: float = time()
4✔
82

UNCOV
83
    def close(self) -> None:
×
84
        self.__closed = True
4✔
85

UNCOV
86
    @property
×
UNCOV
87
    def is_closed(self) -> bool:
×
88
        return self.__closed
4✔
89

UNCOV
90
    def __enter__(self) -> "FromThreadChannel":
×
91
        return self
4✔
92

UNCOV
93
    def __exit__(
×
94
        self,
95
        exc_type: type[Exception],
96
        exc_val: Exception,
97
        exc_tb: TracebackType,
98
    ) -> None:
99
        self.close()
4✔
100

UNCOV
101
    def put(self, item: Any) -> None:
×
102
        if self.is_closed:
4✔
103
            raise ChannelClosed
4✔
104

105
        self.queue.put(item)
4✔
106
        self.__last_received_item = time()
4✔
107

NEW
108
    def _compute_sleep_time(self) -> float | int:
×
109
        if self.__last_received_item < 0:
4!
110
            return 0
×
111

112
        delta = time() - self.__last_received_item
4✔
113

114
        if delta > 1:
4✔
115
            return 1
4✔
116

117
        sleep_time = delta / self.SLEEP_DIFFERENCE_DIVIDER
4✔
118

119
        if sleep_time < self.SLEEP_LOW_THRESHOLD:
4✔
120
            return 0
4✔
121
        return sleep_time
4✔
122

UNCOV
123
    def __await__(self) -> Any:
×
124
        while True:
4✔
125
            try:
4✔
126
                res = self.queue.get()
4✔
127
                return res
4✔
128
            except QueueEmpty:
4✔
129
                if self.is_closed:
4✔
130
                    raise ChannelClosed
4✔
131

132
                sleep_time = self._compute_sleep_time()
4✔
133
                yield from asyncio.sleep(sleep_time).__await__()
4✔
134

UNCOV
135
    async def get(self) -> Any:
×
136
        return await self
4✔
137

138

UNCOV
139
class IteratorWrapperStatistic(Statistic):
    """Counters recorded for an ``IteratorWrapper``."""

    # Incremented when the worker starts, decremented when it finishes.
    started: int
    # Set once to the wrapper's ``max_size``.
    queue_size: int
    queue_length: int
    # Number of items handed to the consumer by ``__anext__``.
    yielded: int
    # Number of items the worker has put into the channel.
    enqueued: int
145

146

UNCOV
147
class IteratorWrapper(Generic[P, T], AsyncIterator, EventLoopMixin):
    """Expose a synchronous generator function as an asynchronous iterator.

    The generator function is executed by ``_in_thread`` through
    ``loop.run_in_executor``. Every produced value is passed to the
    consumer as a ``(value, is_exception)`` tuple over a
    ``FromThreadChannel``; ``__anext__`` unpacks it and either returns
    the value or re-raises the exception.
    """

    __slots__ = (
        "__channel",
        "__close_event",
        "__gen_func",
        "__gen_task",
        "_run_lock",
        "_statistic",
        "executor",
    ) + EventLoopMixin.__slots__

    def __init__(
        self,
        gen_func: Callable[P, Generator[T, None, None]],
        loop: asyncio.AbstractEventLoop | None = None,
        max_size: int = 0,
        executor: Executor | None = None,
        statistic_name: str | None = None,
    ):
        """Prepare the wrapper; the generator is not started here.

        Args:
            gen_func: callable invoked without arguments in the worker.
            loop: event loop to use, stored as ``_loop``.
            max_size: channel capacity passed to ``FromThreadChannel``.
            executor: executor passed to ``run_in_executor``.
            statistic_name: name passed to ``IteratorWrapperStatistic``.
        """
        self._loop = loop
        self.executor = executor

        self.__close_event = asyncio.Event()
        self.__channel: FromThreadChannel = FromThreadChannel(maxsize=max_size)
        self.__gen_task: asyncio.Future | None = None
        self.__gen_func: Callable = gen_func
        self._statistic = IteratorWrapperStatistic(statistic_name)
        self._statistic.queue_size = max_size
        self._run_lock = threading.Lock()

    @property
    def closed(self) -> bool:
        """Whether the underlying channel is closed."""
        return self.__channel.is_closed

    @staticmethod
    def __throw(_: Any) -> NoReturn:
        # Fallback for iterators without ``.throw()``. It is only called
        # from inside an ``except`` block, so the bare ``raise`` re-raises
        # the exception currently being handled.
        raise

    def _in_thread(self) -> None:
        """Worker body: drain the generator into the channel."""
        self._statistic.started += 1
        # Leaving the ``with`` block closes the channel.
        with self.__channel:
            try:
                gen = iter(self.__gen_func())

                throw = self.__throw
                if inspect.isgenerator(gen):
                    throw = gen.throw  # type: ignore

                while not self.closed:
                    item = next(gen)
                    try:
                        self.__channel.put((item, False))
                    except Exception as e:
                        # Hand the failure to the generator (or re-raise
                        # it when the iterator has no ``.throw()``).
                        throw(e)
                        self.__channel.close()
                        break
                    finally:
                        del item

                    self._statistic.enqueued += 1
            except StopIteration:
                return
            except Exception as e:
                if self.closed:
                    return
                # Deliver the exception to the consumer.
                self.__channel.put((e, True))
            finally:
                self._statistic.started -= 1
                self.loop.call_soon_threadsafe(self.__close_event.set)

    def close(self) -> Awaitable[None]:
        """Close the channel and return an awaitable for ``wait_closed()``."""
        self.__channel.close()
        # if the iterator inside thread is blocked on `.put()`
        # we need to wake it up to signal that it is closed.
        try:
            self.__channel.queue.get()
        except QueueEmpty:
            pass
        return asyncio.ensure_future(self.wait_closed())

    async def wait_closed(self) -> None:
        """Wait until the worker has finished.

        Returns immediately when the channel is closed and the generator
        was never started: the close event is only set by the worker, so
        waiting for it in that state would never complete.
        """
        if self.__gen_task is None and self.closed:
            return

        await self.__close_event.wait()
        if self.__gen_task:
            await asyncio.gather(self.__gen_task, return_exceptions=True)

    def _run(self) -> Any:
        """Schedule ``_in_thread`` on the executor and return its future."""
        return self.loop.run_in_executor(self.executor, self._in_thread)

    def __start_generator(self) -> None:
        """Start the worker once; later calls are no-ops.

        Raises:
            RuntimeError: if the loop is not running or ``_run()``
                returned ``None``.
        """
        if not self.loop.is_running():
            raise RuntimeError("Event loop is not running")

        with self._run_lock:
            if self.__gen_task is not None:
                return
            gen_task = self._run()
            if gen_task is None:
                raise RuntimeError("Iterator task was not created")
            self.__gen_task = gen_task

    def __aiter__(self) -> AsyncIterator[T]:
        self.__start_generator()
        # The proxy calls ``self.close`` when it is garbage collected.
        return IteratorProxy(self, self.close)

    async def __anext__(self) -> T:
        try:
            item, is_exc = await self.__channel.get()
        except ChannelClosed:
            await self.wait_closed()
            raise StopAsyncIteration

        if is_exc:
            await self.close()
            raise item from item

        self._statistic.yielded += 1
        return item

    async def __aenter__(self) -> "IteratorWrapper":
        self.__start_generator()
        return self

    async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        if self.closed:
            return

        await self.close()
274

275

UNCOV
276
class IteratorProxy(Generic[T], AsyncIterator):
    """Async-iterator facade that registers a finalizer for itself.

    ``__anext__`` is delegated to the wrapped iterator, and *finalizer* is
    registered with ``weakref.finalize`` on the proxy object.
    """

    def __init__(
        self, iterator: AsyncIterator[T], finalizer: Callable[[], Any]
    ):
        # ``finalize`` keeps itself alive, so the returned handle does
        # not need to be stored.
        finalize(self, finalizer)
        self.__iterator = iterator

    def __anext__(self) -> Awaitable[T]:
        source = self.__iterator
        return source.__anext__()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc