• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kyuupichan / bitcoinX / #2140

02 Mar 2024 07:39PM UTC coverage: 99.476% (+0.4%) from 99.074%
#2140

push

coveralls-python

neil
Remove unintentionally committed debug code

1 of 1 new or added line in 1 file covered. (100.0%)

5 existing lines in 2 files now uncovered.

12337 of 12402 relevant lines covered (99.48%)

0.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.71
/bitcoinx/aiolib.py
1
# Provide timeouts similar to curio, code based on aiorpcX.  They are more useful than
2
# those introduced in Python 3.11.
3
#
4
# Also provide a TaskGroup that is similar to that in Python 3.11, but has slightly
5
# better semantics.  I'm not a fan of those in curio or Python 3.11.
6

7
import sys
1✔
8
from asyncio import get_running_loop, CancelledError, current_task, Semaphore, Event, create_task
1✔
9
from collections import deque
1✔
10

11

12
if sys.version_info < (3, 11):

    class BaseExceptionGroup(BaseException):
        '''Minimal stand-in for the BaseExceptionGroup built-in of Python 3.11+.

        msg is the group's message and excs a non-empty sequence of exception instances.
        They are exposed through the read-only message and exceptions properties.

        Raises ValueError if excs is empty or contains something that is not a
        BaseException instance, and TypeError if an ExceptionGroup is given a member that
        is not an Exception instance.
        '''

        def __new__(cls, msg, excs):
            if not excs:
                raise ValueError('exceptions must be a non-empty sequence')
            if not all(isinstance(exc, BaseException) for exc in excs):
                raise ValueError('exceptions must be instances of BaseException')
            if all(isinstance(exc, Exception) for exc in excs):
                # Only the base class itself is promoted to ExceptionGroup.  A subclass
                # keeps its own type; otherwise the object returned would not be an
                # instance of the requested class and __init__ would never run on it.
                if cls is BaseExceptionGroup:
                    cls = ExceptionGroup
            elif issubclass(cls, ExceptionGroup):
                raise TypeError('exceptions must all be instances of Exception')
            return super().__new__(cls, msg, excs)

        def __init__(self, msg, excs):
            self._msg = msg
            # Stored as a tuple so the group's members cannot be changed by the caller
            self._excs = tuple(excs)

        @property
        def message(self):
            '''The message passed to the constructor.'''
            return self._msg

        @property
        def exceptions(self):
            '''A tuple of the exceptions in the group.'''
            return self._excs

    class ExceptionGroup(BaseExceptionGroup, Exception):
        '''A BaseExceptionGroup all of whose members are Exception instances.'''

else:
    # Re-bind the built-ins as module globals so they can be imported from this module
    # on every Python version.
    BaseExceptionGroup = BaseExceptionGroup
    ExceptionGroup = ExceptionGroup
×
46

47

48
class TaskGroup:
    '''A class representing a group of executing tasks. New tasks can be added using the
    create_task() or add_task() methods below.

    When join() is called, any task that raises an exception other than CancelledError
    causes all the other tasks in the group to be cancelled.  Similarly, if the join()
    operation itself is cancelled then all running tasks in the group are cancelled.  Once
    join() returns all tasks have completed and new tasks may not be added.  Tasks can be
    added while join() is waiting.

    A TaskGroup is normally used as a context manager, which calls the join() method on
    context-exit.  Each TaskGroup is an independent entity.  Task groups do not form a
    hierarchy or any kind of relationship to other previously created task groups or
    tasks.

    A TaskGroup can be used as an asynchronous iterator, where each task is returned as it
    completes.

    All still-running tasks can be cancelled by calling cancel_remaining().  It waits for
    the tasks to be cancelled and then returns.  If any task blocks cancellation, this
    routine will not return - a similar caution applies to join().

    The public attribute joined is True if the task group join() operation has completed.
    New tasks cannot be added to a joined task group.

    Once all tasks are done, if any raised an exception then those are raised in a
    BaseExceptionGroup.  If the task group itself raised an error (other than an instance
    of CancelledError) then that is included.
    '''

    def __init__(self):
        # Tasks that have not yet finished
        self._pending = set()
        # Tasks that have completed and whose results have not yet been processed
        self._done = deque()
        # Released once for each task that completes; next_done() acquires it
        self._semaphore = Semaphore(0)
        # Exceptions raised by tasks in the group (and by the group body itself)
        self._errors = []
        self.joined = False

    def _on_done(self, task):
        '''Done-callback: move task from the pending set to the done queue, and record
        its exception if it raised one.'''
        task._task_group = None
        self._pending.discard(task)
        self._done.append(task)
        self._semaphore.release()
        # task.exception() raises CancelledError on a cancelled task, so check first
        if not task.cancelled():
            exc = task.exception()
            if exc:
                self._errors.append(exc)

    def _add_task(self, task):
        '''Add an already existing task to the task group.

        Raises RuntimeError if the task has ever been added to a task group.
        '''
        # The attribute is reset to None, not deleted, when the task completes, so this
        # also rejects tasks that finished in a group.
        if hasattr(task, '_task_group'):
            raise RuntimeError('task is already part of a group')
        task._task_group = self
        if task.done():
            self._on_done(task)
        else:
            self._pending.add(task)
            task.add_done_callback(self._on_done)

    def create_task(self, coro, *, name=None, context=None):
        '''Create a new task and put it in the group. Returns a Task instance.

        name and context are passed on to asyncio's create_task(); context is only passed
        if it is not None.  Raises RuntimeError if the group has been joined.
        '''
        if self.joined:
            raise RuntimeError('task group terminated')
        # Compare against None rather than testing truthiness: a contextvars.Context is a
        # mapping, so an empty one is falsy and would otherwise be silently dropped.
        if context is not None:
            task = create_task(coro, name=name, context=context)
        else:
            task = create_task(coro, name=name)
        self._add_task(task)
        return task

    async def add_task(self, task):
        '''Add an already existing task to the task group.

        Raises RuntimeError if the group has been joined, or if the task has already been
        part of a group.
        '''
        if self.joined:
            raise RuntimeError('task group terminated')
        self._add_task(task)

    async def next_done(self):
        '''Return the next completed task and remove it from the group.  Return None if no more
        tasks remain. A TaskGroup may also be used as an asynchronous iterator.
        '''
        # Only wait if there is, or will be, a completed task to collect
        if self._done or self._pending:
            await self._semaphore.acquire()
        if self._done:
            return self._done.popleft()
        return None

    async def next_result(self):
        '''Return the result of the next completed task and remove it from the group. If the task
        failed with an exception, that exception is raised. A RuntimeError exception is
        raised if no tasks remain.
        '''
        task = await self.next_done()
        if not task:
            raise RuntimeError('no tasks remain')
        return task.result()

    def _maybe_raise_error(self, exc):
        '''Raise the collected task errors as a BaseExceptionGroup if there are any;
        otherwise re-raise exc, which must be None or a CancelledError.'''
        assert exc is None or isinstance(exc, CancelledError)
        # First priority: put the task errors in a group
        if self._errors:
            beg = BaseExceptionGroup('unhandled errors in a TaskGroup', self._errors)
            self._errors = None
            raise beg from None

        # Second: the cancellation error
        if exc is not None:
            raise exc

    async def join(self, *, exc=None):
        '''Wait for the tasks in the group to complete, stopping early if one of them raises
        an error.  Any tasks still running are then cancelled and waited for.

        exc is the exception, if any, raised by the body of the group; when it is given no
        waiting is done before cancelling the tasks.

        Raises a BaseExceptionGroup of the errors collected.  If there are none, and exc
        or an exception caught while waiting is a CancelledError, that is raised.
        '''
        try:
            if exc is None:
                while not self._errors:
                    task = await self.next_done()
                    if not task:
                        break
        except BaseException as e:
            exc = e
        finally:
            if exc:
                # Anything but a cancellation is reported alongside the task errors
                if not isinstance(exc, CancelledError):
                    self._errors.append(exc)
                    exc = None
            self.joined = True
            await self.cancel_remaining()
            self._maybe_raise_error(exc)

    async def _cancel_tasks(self, tasks):
        '''Cancel the passed set of tasks.  Wait for them to complete.'''
        for task in tasks:
            task.cancel()

        if tasks:
            def pop_task(task):
                unfinished.remove(task)
                if not unfinished:
                    all_done.set()

            # Work on a copy: the group's own done-callback shrinks the pending set
            unfinished = set(tasks)
            all_done = Event()
            for task in tasks:
                task.add_done_callback(pop_task)
            await all_done.wait()

    async def cancel_remaining(self):
        '''Cancel all remaining tasks and wait for them to complete.
        If any task blocks cancellation this routine will not return.
        '''
        await self._cancel_tasks(self._pending)

    def __aiter__(self):
        return self

    async def __anext__(self):
        task = await self.next_done()
        if task:
            return task
        raise StopAsyncIteration

    async def __aenter__(self):
        return self

    async def __aexit__(self, et, exc, _traceback):
        await self.join(exc=exc)
1✔
214

215

216
class TimeoutCancellationError(Exception):
    '''Raised out of an inner timeout context when the cancellation that interrupted it
    was caused by an outer timeout expiring first.'''
218

219

220
class UncaughtTimeoutError(Exception):
    '''Raised by an outer timeout context when the TimeoutError of an inner timeout was
    not handled and propagated out to it.'''
223

224

225
class Deadline:
1✔
226

227
    def __init__(self, when, *, raise_timeout=True, is_relative=True):
1✔
228
        self._when = when
1✔
229
        self._raise = raise_timeout
1✔
230
        self._is_relative = is_relative
1✔
231
        self._deadline = None
1✔
232
        self._in_use = False
1✔
233
        self.expired = False
1✔
234

235
    @staticmethod
1✔
236
    def reset_timeout(task):
1✔
237
        def on_timeout(task):
1✔
238
            cause = task._timeout_setter
1✔
239
            assert cause is not None
1✔
240
            task.cancel()
1✔
241
            task._timeout_handler = None
1✔
242
            cause.expired = True
1✔
243

244
        # Find out what cause has the earliest deadline
245
        cause = None
1✔
246
        for deadline in task._deadlines:
1✔
247
            if not cause or deadline._deadline < cause._deadline:
1✔
248
                cause = deadline
1✔
249

250
        if task._timeout_handler:
1✔
251
            # Optimisation only - leave the handler if the cause hasn't changed
252
            if task._timeout_setter is cause:
1✔
253
                return
1✔
254
            task._timeout_handler.cancel()
1✔
255
            task._timeout_handler = None
1✔
256
            task._timeout_setter = None
1✔
257

258
        if cause:
1✔
259
            task._timeout_setter = cause
1✔
260
            loop = get_running_loop()
1✔
261
            if cause._deadline <= loop.time():
1✔
262
                on_timeout(task)
1✔
263
            else:
264
                task._timeout_handler = loop.call_at(cause._deadline, on_timeout, task)
1✔
265

266
    async def __aenter__(self):
1✔
267
        if self._in_use:
1✔
268
            raise RuntimeError('timeout already in use')
1✔
269
        self._in_use = True
1✔
270
        self.expired = False
1✔
271
        if self._when is not None:
1✔
272
            self._deadline = self._when
1✔
273
            if self._is_relative:
1✔
274
                self._deadline += get_running_loop().time()
1✔
275
            # Add ourself to the task's deadlines
276
            task = current_task()
1✔
277
            if not hasattr(task, '_deadlines'):
1✔
278
                task._deadlines = set()
1✔
279
                task._timeout_handler = None
1✔
280
                task._timeout_setter = None
1✔
281
            task._deadlines.add(self)
1✔
282
            self.reset_timeout(task)
1✔
283
        return self
1✔
284

285
    async def __aexit__(self, exc_type, exc_value, traceback):
1✔
286
        self._in_use = False
1✔
287
        task = current_task()
1✔
288

289
        if self._deadline is not None:
1✔
290
            # Remove our deadline regardless of cause.
291
            task._deadlines.remove(self)
1✔
292

293
            # If we set the current timeout, it needs to be reset
294
            if task._timeout_setter is self or exc_value is None:
1✔
295
                self.reset_timeout(task)
1✔
296

297
        if exc_type is TimeoutError:
1✔
298
            raise UncaughtTimeoutError
1✔
299

300
        # If a race condition caused an exception to be raised before our cancellation
301
        # was processed, let that through
302
        if self.expired and exc_type in (CancelledError, TimeoutCancellationError):
1✔
303
            if exc_type is CancelledError and hasattr(task, 'uncancel'):
1✔
UNCOV
304
                task.uncancel()
×
305
            if self._raise:
1✔
306
                raise TimeoutError from None
1✔
307
            return True
1✔
308

309
        # Did an outer timeout trigger?
310
        if exc_type is CancelledError and getattr(task, '_timeout_setter', None):
1✔
311
            if hasattr(task, 'uncancel'):
1✔
UNCOV
312
                task.uncancel()
×
313
            raise TimeoutCancellationError
1✔
314

315

316
def timeout_after(seconds):
    '''The result of this function serves as an asynchronous context manager that applies a
    timeout to a block of statements.  It issues a cancellation request to the calling
    task after seconds have elapsed.  When this leaves the context manager, a TimeoutError
    exception is raised.

    timeout_after() may be composed with other timeout or ignore operations (i.e., nested
    timeouts).  If an outer timeout expires first, then TimeoutCancellationError is raised
    instead of TimeoutError.  If an inner timeout expires and its TimeoutError is uncaught
    and propagates to an outer timeout, an UncaughtTimeoutError is raised in the outer
    timeout.
    '''
    return Deadline(seconds)
1✔
329

330

331
def timeout_at(clock):
    '''Like timeout_after, but the deadline is the absolute time clock, expressed in terms
    of loop.time(), instead of a number of seconds from now.
    '''
    deadline = Deadline(clock, raise_timeout=True, is_relative=False)
    return deadline
1✔
336

337

338
def ignore_after(seconds):
    '''Like timeout_after, but no exception is raised when the timeout expires.'''
    deadline = Deadline(seconds, raise_timeout=False, is_relative=True)
    return deadline
1✔
341

342

343
def ignore_at(clock):
    '''Like timeout_at, but no exception is raised when the timeout expires.'''
    deadline = Deadline(clock, is_relative=False, raise_timeout=False)
    return deadline
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc