• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mosquito / aio-pika / 12055276718

27 Nov 2024 05:19PM UTC coverage: 91.897% (+0.01%) from 91.886%
12055276718

Pull #652

github

web-flow
Merge 85ec86a45 into e6106217e
Pull Request #652: Avoid depending on `typing_extensions` where not needed

692 of 802 branches covered (86.28%)

6 of 6 new or added lines in 1 file covered. (100.0%)

15 existing lines in 1 file now uncovered.

1996 of 2172 relevant lines covered (91.9%)

4.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.19
/aio_pika/robust_queue.py
1
import uuid
5✔
2
import warnings
5✔
3
from typing import Any, Awaitable, Callable, Dict, Optional, Tuple, Union
5✔
4

5
import aiormq
5✔
6
from aiormq import ChannelInvalidStateError
5✔
7
from pamqp.common import Arguments
5✔
8

9
from .abc import (
5✔
10
    AbstractChannel, AbstractExchange, AbstractIncomingMessage,
11
    AbstractQueueIterator, AbstractRobustQueue, ConsumerTag, TimeoutType,
12
)
13
from .exchange import ExchangeParamType
5✔
14
from .log import get_logger
5✔
15
from .queue import Queue, QueueIterator
5✔
16

17

18
log = get_logger(__name__)
5✔
19

20

21
class RobustQueue(Queue, AbstractRobustQueue):
5✔
22
    __slots__ = ("_consumers", "_bindings")
5✔
23

24
    _consumers: Dict[ConsumerTag, Dict[str, Any]]
5✔
25
    _bindings: Dict[Tuple[Union[AbstractExchange, str], str], Dict[str, Any]]
5✔
26

27
    def __init__(
5✔
28
        self,
29
        channel: AbstractChannel,
30
        name: Optional[str],
31
        durable: bool = False,
32
        exclusive: bool = False,
33
        auto_delete: bool = False,
34
        arguments: Arguments = None,
35
        passive: bool = False,
36
    ):
37

38
        super().__init__(
5✔
39
            channel=channel,
40
            name=name or f"amq_{uuid.uuid4().hex}",
41
            durable=durable,
42
            exclusive=exclusive,
43
            auto_delete=auto_delete,
44
            arguments=arguments,
45
            passive=passive,
46
        )
47

48
        self._consumers = {}
5✔
49
        self._bindings = {}
5✔
50

51
    async def restore(self, channel: Any = None) -> None:
5✔
52
        if channel is not None:
5!
53
            warnings.warn(
×
54
                "Channel argument will be ignored because you "
55
                "don't need to pass this anymore.",
56
                DeprecationWarning,
57
            )
58

59
        await self.declare()
5✔
60
        bindings = tuple(self._bindings.items())
5✔
61
        consumers = tuple(self._consumers.items())
5✔
62

63
        for (exchange, routing_key), kwargs in bindings:
5✔
64
            await self.bind(exchange, routing_key, **kwargs)
5✔
65

66
        for consumer_tag, kwargs in consumers:
5✔
67
            await self.consume(consumer_tag=consumer_tag, **kwargs)
5✔
68

69
    async def bind(
5✔
70
        self,
71
        exchange: ExchangeParamType,
72
        routing_key: Optional[str] = None,
73
        *,
74
        arguments: Arguments = None,
75
        timeout: TimeoutType = None,
76
        robust: bool = True,
77
    ) -> aiormq.spec.Queue.BindOk:
78
        if routing_key is None:
5!
79
            routing_key = self.name
×
80

81
        result = await super().bind(
5✔
82
            exchange=exchange, routing_key=routing_key,
83
            arguments=arguments, timeout=timeout,
84
        )
85

86
        if robust:
5!
87
            self._bindings[(exchange, routing_key)] = dict(
5✔
88
                arguments=arguments,
89
            )
90

91
        return result
5✔
92

93
    async def unbind(
5✔
94
        self,
95
        exchange: ExchangeParamType,
96
        routing_key: Optional[str] = None,
97
        arguments: Arguments = None,
98
        timeout: TimeoutType = None,
99
    ) -> aiormq.spec.Queue.UnbindOk:
100
        if routing_key is None:
5!
101
            routing_key = self.name
×
102

103
        result = await super().unbind(
5✔
104
            exchange, routing_key, arguments, timeout,
105
        )
106
        self._bindings.pop((exchange, routing_key), None)
5✔
107

108
        return result
5✔
109

110
    async def consume(
5✔
111
        self,
112
        callback: Callable[[AbstractIncomingMessage], Awaitable[Any]],
113
        no_ack: bool = False,
114
        exclusive: bool = False,
115
        arguments: Arguments = None,
116
        consumer_tag: Optional[ConsumerTag] = None,
117
        timeout: TimeoutType = None,
118
        robust: bool = True,
119
    ) -> ConsumerTag:
120
        consumer_tag = await super().consume(
5✔
121
            consumer_tag=consumer_tag,
122
            timeout=timeout,
123
            callback=callback,
124
            no_ack=no_ack,
125
            exclusive=exclusive,
126
            arguments=arguments,
127
        )
128

129
        if robust:
5!
130
            self._consumers[consumer_tag] = dict(
5✔
131
                callback=callback,
132
                no_ack=no_ack,
133
                exclusive=exclusive,
134
                arguments=arguments,
135
            )
136

137
        return consumer_tag
5✔
138

139
    async def cancel(
5✔
140
        self,
141
        consumer_tag: ConsumerTag,
142
        timeout: TimeoutType = None,
143
        nowait: bool = False,
144
    ) -> aiormq.spec.Basic.CancelOk:
145
        result = await super().cancel(consumer_tag, timeout, nowait)
5✔
146
        self._consumers.pop(consumer_tag, None)
5✔
147
        return result
5✔
148

149
    def iterator(self, **kwargs: Any) -> AbstractQueueIterator:
5✔
150
        return RobustQueueIterator(self, **kwargs)
5✔
151

152

153
class RobustQueueIterator(QueueIterator):
5✔
154
    def __init__(self, queue: Queue, **kwargs: Any):
5✔
155
        super().__init__(queue, **kwargs)
5✔
156

157
        self._amqp_queue.close_callbacks.discard(self._set_closed)
5✔
158

159
    async def consume(self) -> None:
5✔
160
        while True:
3✔
161
            try:
5✔
162
                return await super().consume()
5✔
163
            except ChannelInvalidStateError:
×
164
                await self._amqp_queue.channel.get_underlay_channel()
×
165

166

167
__all__ = ("RobustQueue",)
5✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc