• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

aiokitchen / aiomisc / 16483151043

23 Jul 2025 10:27PM UTC coverage: 80.862% (-0.3%) from 81.134%
16483151043

push

github

web-flow
Merge pull request #228 from aiokitchen/threaded-typing

Threaded typing

1012 of 1352 branches covered (74.85%)

Branch coverage included in aggregate %.

71 of 86 new or added lines in 3 files covered. (82.56%)

17 existing lines in 8 files now uncovered.

3923 of 4751 relevant lines covered (82.57%)

3.3 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.95
/aiomisc/iterator_wrapper.py
1
import asyncio
4✔
2
import inspect
4✔
3
from abc import abstractmethod
4✔
4
from collections import deque
4✔
5
from concurrent.futures import Executor
4✔
6
from queue import Empty as QueueEmpty
4✔
7
from queue import Queue
4✔
8
from time import time
4✔
9
from types import TracebackType
4✔
10
from typing import (
4✔
11
    Any, AsyncIterator, Awaitable, Callable, Deque, Generator, NoReturn,
12
    Optional, Type, TypeVar, Union, Generic, ParamSpec,
13
)
14
from weakref import finalize
4✔
15

16
from aiomisc.compat import EventLoopMixin
4✔
17
from aiomisc.counters import Statistic
4✔
18

19

20
# Type of the items yielded by a wrapped generator.
T = TypeVar("T")
# General-purpose type variable; not referenced in the code visible here.
R = TypeVar("R")
# Parameter specification used to annotate the generator function
# accepted by ``IteratorWrapper.__init__``.
P = ParamSpec("P")

# A generator that yields ``T`` and neither receives nor returns a value.
GenType = Generator[T, None, None]
# A zero-argument callable that produces such a generator.
FuncType = Callable[[], GenType]
26

27

28
class ChannelClosed(RuntimeError):
    """Raised when a closed channel is written to, or read while empty."""
30

31

32
class QueueWrapperBase:
    """Minimal put/get interface shared by the queue back-ends.

    Both methods are marked abstract for the benefit of readers and static
    tooling. The class is not an ``ABC``, so instantiation is not blocked;
    the default bodies raise ``NotImplementedError`` instead.
    """

    # An empty ``__slots__`` here is required for the ``__slots__`` declared
    # by subclasses to actually suppress the per-instance ``__dict__``.
    __slots__ = ()

    @abstractmethod
    def put(self, item: Any) -> None:
        """Store *item* in the queue.

        Raises:
            NotImplementedError: always, unless overridden.
        """
        raise NotImplementedError

    @abstractmethod
    def get(self) -> Any:
        """Remove and return the next item from the queue.

        Raises:
            NotImplementedError: always, unless overridden.
        """
        raise NotImplementedError
39

40

41
class DequeWrapper(QueueWrapperBase):
    """Unbounded FIFO queue wrapper backed by ``collections.deque``."""

    __slots__ = ("queue",)

    def __init__(self) -> None:
        self.queue: Deque[Any] = deque()

    def put(self, item: Any) -> None:
        """Append *item* to the right end of the deque."""
        self.queue.append(item)

    def get(self) -> Any:
        """Pop and return the leftmost item.

        Raises:
            queue.Empty: if the deque holds no items.
        """
        if self.queue:
            return self.queue.popleft()
        raise QueueEmpty
54

55

56
class QueueWrapper(QueueWrapperBase):
    """Bounded queue wrapper backed by ``queue.Queue``."""

    __slots__ = ("queue",)

    def __init__(self, max_size: int) -> None:
        self.queue: Queue = Queue(maxsize=max_size)

    def get(self) -> Any:
        """Return the next item without blocking.

        Raises:
            queue.Empty: if no item is immediately available.
        """
        return self.queue.get(block=False)

    def put(self, item: Any) -> None:
        """Put *item* into the queue using ``Queue.put`` defaults."""
        self.queue.put(item)
67

68

69
def make_queue(max_size: int = 0) -> QueueWrapperBase:
    """Build the queue back-end matching *max_size*.

    A positive *max_size* yields a bounded ``QueueWrapper``; any other
    value yields an unbounded ``DequeWrapper``.
    """
    bounded = max_size > 0
    return QueueWrapper(max_size) if bounded else DequeWrapper()
73

74

75
class FromThreadChannel:
    """Channel whose items are put synchronously and received by awaiting.

    Receiving polls the underlying queue; between polls it sleeps for a
    duration derived from the time elapsed since the last ``put``.
    """

    # Computed sleep intervals below this value are replaced with 0.
    SLEEP_LOW_THRESHOLD = 0.0001
    # The time since the last ``put`` is divided by this to get the sleep.
    SLEEP_DIFFERENCE_DIVIDER = 10

    __slots__ = ("queue", "__closed", "__last_received_item")

    def __init__(self, maxsize: int = 0):
        self.queue: QueueWrapperBase = make_queue(max_size=maxsize)
        self.__closed = False
        self.__last_received_item: float = time()

    @property
    def is_closed(self) -> bool:
        """Whether :meth:`close` has been called."""
        return self.__closed

    def close(self) -> None:
        """Mark the channel as closed."""
        self.__closed = True

    def __enter__(self) -> "FromThreadChannel":
        return self

    def __exit__(
        self, exc_type: Type[Exception],
        exc_val: Exception, exc_tb: TracebackType,
    ) -> None:
        # Leaving the ``with`` block always closes the channel.
        self.close()

    def put(self, item: Any) -> None:
        """Enqueue *item* and record the time of the operation.

        Raises:
            ChannelClosed: if the channel is already closed.
        """
        if self.is_closed:
            raise ChannelClosed

        self.queue.put(item)
        self.__last_received_item = time()

    def _compute_sleep_time(self) -> Union[float, int]:
        """Return how long the receiver should sleep before polling again.

        The result is a tenth of the time since the last ``put``, capped
        at 1 and replaced with 0 when below ``SLEEP_LOW_THRESHOLD``.
        """
        last_put = self.__last_received_item
        if last_put < 0:
            return 0

        elapsed = time() - last_put
        if elapsed > 1:
            return 1

        pause = elapsed / self.SLEEP_DIFFERENCE_DIVIDER
        return 0 if pause < self.SLEEP_LOW_THRESHOLD else pause

    def __await__(self) -> Any:
        """Poll the queue until an item is available and return it.

        Raises:
            ChannelClosed: if the queue is empty and the channel is closed.
        """
        while True:
            try:
                return self.queue.get()
            except QueueEmpty:
                if self.is_closed:
                    raise ChannelClosed

                pause = self._compute_sleep_time()
                yield from asyncio.sleep(pause).__await__()

    async def get(self) -> Any:
        """Coroutine form of awaiting the channel itself."""
        return await self
138

139

140
class IteratorWrapperStatistic(Statistic):
    """Counters maintained by ``IteratorWrapper``."""

    # Incremented when the worker function starts, decremented when it exits.
    started: int
    # Assigned once from the ``max_size`` passed to the wrapper.
    queue_size: int
    # NOTE(review): not updated anywhere in the code visible here - confirm.
    queue_length: int
    # Incremented for every item returned from ``__anext__``.
    yielded: int
    # Incremented after every item successfully put into the channel.
    enqueued: int
146

147

148
class IteratorWrapper(Generic[P, T], AsyncIterator, EventLoopMixin):
    """Expose a synchronous generator function as an asynchronous iterator.

    The generator function is run through ``loop.run_in_executor`` and each
    produced item is handed to the consumer through a ``FromThreadChannel``
    as an ``(item, is_exception)`` pair.
    """

    __slots__ = (
        "__channel",
        "__close_event",
        "__gen_func",
        "__gen_task",
        "_statistic",
        "executor",
    ) + EventLoopMixin.__slots__

    def __init__(
        self,
        gen_func: Callable[P, Generator[T, None, None]],
        loop: Optional[asyncio.AbstractEventLoop] = None,
        max_size: int = 0,
        executor: Optional[Executor] = None,
        statistic_name: Optional[str] = None,
    ) -> None:
        """Prepare the wrapper; nothing runs until iteration starts.

        Args:
            gen_func: callable invoked without arguments in the executor;
                its result is passed to ``iter()``.
            loop: stored as ``_loop`` for ``EventLoopMixin``.
            max_size: size passed to the channel; a positive value makes
                the underlying queue bounded.
            executor: executor passed to ``loop.run_in_executor``.
            statistic_name: name passed to ``IteratorWrapperStatistic``.
        """
        self._loop = loop
        self.executor = executor

        self.__close_event = asyncio.Event()
        self.__channel: FromThreadChannel = FromThreadChannel(maxsize=max_size)
        self.__gen_task: Optional[asyncio.Future] = None
        self.__gen_func: Callable = gen_func
        self._statistic = IteratorWrapperStatistic(statistic_name)
        self._statistic.queue_size = max_size

    @property
    def closed(self) -> bool:
        """Whether the underlying channel has been closed."""
        return self.__channel.is_closed

    @staticmethod
    def __throw(exc: Any) -> NoReturn:
        """Fallback for iterators without ``throw``: raise *exc* as is."""
        raise exc

    def _in_thread(self) -> None:
        """Drain the generator into the channel (the executor job).

        Stops when the generator is exhausted, the channel is closed, or an
        exception occurs. Exceptions raised while the channel is still open
        are forwarded to the consumer as ``(exception, True)``.
        """
        self._statistic.started += 1
        with self.__channel:
            try:
                gen = iter(self.__gen_func())

                # Real generators get failures thrown into them so their
                # own cleanup code can run; other iterators just re-raise.
                throw = self.__throw
                if inspect.isgenerator(gen):
                    throw = gen.throw   # type: ignore

                while not self.closed:
                    item = next(gen)
                    try:
                        self.__channel.put((item, False))
                    except Exception as e:
                        throw(e)
                        # Reached only if the generator swallowed the
                        # exception and yielded again.
                        self.__channel.close()
                        break
                    finally:
                        del item

                    self._statistic.enqueued += 1
            except StopIteration:
                return
            except Exception as e:
                if self.closed:
                    return
                self.__channel.put((e, True))
            finally:
                self._statistic.started -= 1
                # asyncio.Event is not thread-safe, so it is set via the loop.
                self.loop.call_soon_threadsafe(self.__close_event.set)

    def close(self) -> Awaitable[None]:
        """Close the channel and return a future that waits for shutdown."""
        self.__channel.close()
        # if the iterator inside thread is blocked on `.put()`
        # we need to wake it up to signal that it is closed.
        try:
            self.__channel.queue.get()
        except QueueEmpty:
            pass
        return asyncio.ensure_future(self.wait_closed())

    async def wait_closed(self) -> None:
        """Wait until the worker has finished.

        Returns immediately when iteration was never started: the close
        event is only set by the worker, so waiting would never complete.
        """
        if self.__gen_task is None:
            return

        await self.__close_event.wait()
        await asyncio.gather(self.__gen_task, return_exceptions=True)

    def _run(self) -> Any:
        """Schedule ``_in_thread`` in the executor and return its future."""
        return self.loop.run_in_executor(self.executor, self._in_thread)

    def __aiter__(self) -> AsyncIterator[T]:
        """Start the worker on first use and return a proxy iterator.

        Raises:
            RuntimeError: if the event loop is not running, or ``_run``
                returned ``None``.
        """
        if not self.loop.is_running():
            raise RuntimeError("Event loop is not running")

        if self.__gen_task is None:
            gen_task = self._run()
            if gen_task is None:
                raise RuntimeError("Iterator task was not created")
            self.__gen_task = gen_task
        return IteratorProxy(self, self.close)

    async def __anext__(self) -> T:
        """Return the next item, or re-raise the worker's exception.

        Raises:
            StopAsyncIteration: once the channel is closed and drained.
        """
        try:
            item, is_exc = await self.__channel.get()
        except ChannelClosed:
            await self.wait_closed()
            raise StopAsyncIteration

        if is_exc:
            await self.close()
            raise item from item

        self._statistic.yielded += 1
        return item

    async def __aenter__(self) -> "IteratorWrapper":
        return self

    async def __aexit__(
        self, exc_type: Any, exc_val: Any,
        exc_tb: Any,
    ) -> None:
        if self.closed:
            return

        await self.close()
271

272

273
class IteratorProxy(Generic[T], AsyncIterator):
    """Forwarding async iterator with a finalizer tied to its lifetime.

    ``weakref.finalize`` is registered on the proxy itself, so *finalizer*
    is called when the proxy is garbage collected.
    """

    def __init__(
        self, iterator: AsyncIterator[T],
        finalizer: Callable[[], Any],
    ):
        finalize(self, finalizer)
        self.__iterator = iterator

    def __anext__(self) -> Awaitable[T]:
        """Delegate to the wrapped iterator's ``__anext__``."""
        wrapped = self.__iterator
        return wrapped.__anext__()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc