• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mosquito / aio-pika / 12275803668

11 Dec 2024 11:53AM UTC coverage: 91.724% (-0.2%) from 91.897%
12275803668

push

github

web-flow
Merge pull request #658 from Tasssadar/fix_robust_close

fix: RobustChannel should not reopen after close() call

690 of 802 branches covered (86.03%)

3 of 3 new or added lines in 1 file covered. (100.0%)

4 existing lines in 3 files now uncovered.

1995 of 2175 relevant lines covered (91.72%)

3.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.49
/aio_pika/robust_exchange.py
1
import asyncio
4✔
2
import warnings
4✔
3
from typing import Any, Dict, Union
4✔
4

5
import aiormq
4✔
6
from pamqp.common import Arguments
4✔
7

8
from .abc import (
4✔
9
    AbstractChannel, AbstractExchange, AbstractRobustExchange,
10
    ExchangeParamType, TimeoutType,
11
)
12
from .exchange import Exchange, ExchangeType
4✔
13
from .log import get_logger
4✔
14

15

16
log = get_logger(__name__)
4✔
17

18

19
class RobustExchange(Exchange, AbstractRobustExchange):
4✔
20
    """ Exchange abstraction """
21

22
    _bindings: Dict[Union[AbstractExchange, str], Dict[str, Any]]
4✔
23

24
    def __init__(
4✔
25
        self,
26
        channel: AbstractChannel,
27
        name: str,
28
        type: Union[ExchangeType, str] = ExchangeType.DIRECT,
29
        *,
30
        auto_delete: bool = False,
31
        durable: bool = False,
32
        internal: bool = False,
33
        passive: bool = False,
34
        arguments: Arguments = None,
35
    ):
36
        super().__init__(
4✔
37
            channel=channel,
38
            name=name,
39
            type=type,
40
            auto_delete=auto_delete,
41
            durable=durable,
42
            internal=internal,
43
            passive=passive,
44
            arguments=arguments,
45
        )
46

47
        self._bindings = {}
4✔
48
        self.__restore_lock = asyncio.Lock()
4✔
49

50
    async def restore(self, channel: Any = None) -> None:
4✔
51
        if channel is not None:
4!
52
            warnings.warn(
×
53
                "Channel argument will be ignored because you "
54
                "don't need to pass this anymore.",
55
                DeprecationWarning,
56
            )
57
        async with self.__restore_lock:
4✔
58
            try:
4✔
59
                # special case for default exchange
60
                if self.name == "":
4!
61
                    return
×
62

63
                await self.declare()
4✔
64

65
                for exchange, kwargs in tuple(self._bindings.items()):
4!
66
                    await self.bind(exchange, **kwargs)
×
UNCOV
67
            except Exception:
×
UNCOV
68
                raise
×
69

70
    async def bind(
4✔
71
        self,
72
        exchange: ExchangeParamType,
73
        routing_key: str = "",
74
        *,
75
        arguments: Arguments = None,
76
        timeout: TimeoutType = None,
77
        robust: bool = True,
78
    ) -> aiormq.spec.Exchange.BindOk:
79
        result = await super().bind(
4✔
80
            exchange,
81
            routing_key=routing_key,
82
            arguments=arguments,
83
            timeout=timeout,
84
        )
85

86
        if robust:
4!
87
            self._bindings[exchange] = dict(
4✔
88
                routing_key=routing_key,
89
                arguments=arguments,
90
            )
91

92
        return result
4✔
93

94
    async def unbind(
4✔
95
        self,
96
        exchange: ExchangeParamType,
97
        routing_key: str = "",
98
        arguments: Arguments = None,
99
        timeout: TimeoutType = None,
100
    ) -> aiormq.spec.Exchange.UnbindOk:
101
        result = await super().unbind(
4✔
102
            exchange, routing_key, arguments=arguments, timeout=timeout,
103
        )
104
        self._bindings.pop(exchange, None)
4✔
105
        return result
4✔
106

107

108
__all__ = ("RobustExchange",)
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc