• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mosquito / aio-pika / 12275803668

11 Dec 2024 11:53AM UTC coverage: 91.724% (-0.2%) from 91.897%
12275803668

push

github

web-flow
Merge pull request #658 from Tasssadar/fix_robust_close

fix: RobustChannel should not reopen after close() call

690 of 802 branches covered (86.03%)

3 of 3 new or added lines in 1 file covered. (100.0%)

4 existing lines in 3 files now uncovered.

1995 of 2175 relevant lines covered (91.72%)

3.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.74
/aio_pika/abc.py
1
from __future__ import annotations
4✔
2

3
import asyncio
4✔
4
import dataclasses
4✔
5
from abc import ABC, abstractmethod
4✔
6
from dataclasses import dataclass
4✔
7
from datetime import datetime, timedelta
4✔
8
from enum import Enum, IntEnum, unique
4✔
9
from functools import singledispatch
4✔
10
from types import TracebackType
4✔
11
from typing import (
4✔
12
    Any, AsyncContextManager, AsyncIterable, Awaitable, Callable, Dict,
13
    Generator, Iterator, Literal, Mapping, Optional, Tuple, Type, TypedDict,
14
    TypeVar, Union, overload,
15
)
16

17
import aiormq.abc
4✔
18
from aiormq.abc import ExceptionType
4✔
19
from pamqp.common import Arguments, FieldValue
4✔
20
from yarl import URL
4✔
21

22
from .pool import PoolInstance
4✔
23
from .tools import (
4✔
24
    CallbackCollection, CallbackSetType, CallbackType, OneShotCallback,
25
)
26

27

28
# Shared type aliases and constants used by the abstract interfaces below.

# A timeout is either absent (``None``) or a number.
# NOTE(review): presumably expressed in seconds -- confirm against callers.
TimeoutType = Optional[Union[int, float]]

# The class of ``None``, obtained at runtime.
NoneType = type(None)
# Values accepted where a date/expiration is expected: a number, a
# ``datetime``, a ``timedelta`` or ``None``.
DateType = Optional[Union[int, datetime, float, timedelta]]
# An exchange may be referred to by instance or by its name.
ExchangeParamType = Union["AbstractExchange", str]
# Consumer tags are plain strings.
ConsumerTag = str

# NOTE(review): presumably the number of milliseconds per second, used for
# unit conversion elsewhere -- no usage is visible in this module.
MILLISECONDS = 1000
4✔
36

37

38
class SSLOptions(TypedDict, total=False):
    """Typed dictionary of SSL-related options.

    Declared with ``total=False``, so every key is optional.
    """

    cafile: str
    capath: str
    cadata: str
    keyfile: str
    certfile: str
    # Declared as ``int`` rather than ``bool``.
    # NOTE(review): presumably a 0/1 flag -- confirm against consumers.
    no_verify_ssl: int
4✔
45

46

47
@unique
class ExchangeType(str, Enum):
    """Exchange type names.

    Members are also ``str`` instances, so they compare equal to their
    literal values. ``@unique`` guarantees no two members share a value.
    """

    FANOUT = "fanout"
    DIRECT = "direct"
    TOPIC = "topic"
    HEADERS = "headers"
    X_DELAYED_MESSAGE = "x-delayed-message"
    X_CONSISTENT_HASH = "x-consistent-hash"
    X_MODULUS_HASH = "x-modulus-hash"
4✔
56

57

58
@unique
class DeliveryMode(IntEnum):
    """Message delivery mode as an integer enumeration."""

    NOT_PERSISTENT = 1
    PERSISTENT = 2
4✔
62

63

64
@unique
class TransactionState(str, Enum):
    """States a transaction object can be in.

    Members are also ``str`` instances.
    """

    CREATED = "created"
    # The misspelling ("commited") is part of the public name and runtime
    # value; it is kept so existing comparisons keep working.
    COMMITED = "commited"
    ROLLED_BACK = "rolled back"
    STARTED = "started"
4✔
70

71

72
@dataclasses.dataclass(frozen=True)
class DeclarationResult:
    """Immutable pair of counters.

    NOTE(review): presumably mirrors the counters returned by a queue
    declaration -- no usage is visible in this module.
    """

    message_count: int
    consumer_count: int
4✔
76

77

78
class AbstractTransaction:
    """Interface of a transaction usable as an async context manager.

    Every method here raises ``NotImplementedError``; implementations are
    expected to override them.

    NOTE: the class does not derive from ``ABC``, so the ``@abstractmethod``
    markers on ``select`` and ``rollback`` are not enforced at
    instantiation time.
    """

    state: TransactionState

    @abstractmethod
    async def select(
        self, timeout: TimeoutType = None,
    ) -> aiormq.spec.Tx.SelectOk:
        raise NotImplementedError

    @abstractmethod
    async def rollback(
        self, timeout: TimeoutType = None,
    ) -> aiormq.spec.Tx.RollbackOk:
        raise NotImplementedError

    async def commit(
        self, timeout: TimeoutType = None,
    ) -> aiormq.spec.Tx.CommitOk:
        raise NotImplementedError

    async def __aenter__(self) -> "AbstractTransaction":
        raise NotImplementedError

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        raise NotImplementedError
108

109

110
# Message headers: string keys mapped to pamqp field values.
HeadersType = Dict[str, FieldValue]
4✔
111

112

113
class MessageInfo(TypedDict, total=False):
    """Typed dictionary describing a message.

    Declared with ``total=False``, so every key is optional. It is the
    declared return type of the ``info()`` methods in this module.
    """

    app_id: Optional[str]
    body_size: int
    cluster_id: Optional[str]
    consumer_tag: Optional[str]
    content_encoding: Optional[str]
    content_type: Optional[str]
    correlation_id: Optional[str]
    delivery_mode: DeliveryMode
    delivery_tag: Optional[int]
    exchange: Optional[str]
    expiration: Optional[DateType]
    headers: HeadersType
    message_id: Optional[str]
    priority: Optional[int]
    redelivered: Optional[bool]
    routing_key: Optional[str]
    reply_to: Optional[str]
    timestamp: Optional[datetime]
    type: str
    user_id: Optional[str]
4✔
134

135

136
class AbstractMessage(ABC):
    """Abstract interface of a message.

    Declares the message attributes (annotations only, no values) and the
    operations an implementation must provide. ``__slots__`` is empty so
    the base class adds no per-instance storage of its own.
    """

    __slots__ = ()

    body: bytes
    body_size: int
    headers: HeadersType
    content_type: Optional[str]
    content_encoding: Optional[str]
    delivery_mode: DeliveryMode
    priority: Optional[int]
    correlation_id: Optional[str]
    reply_to: Optional[str]
    expiration: Optional[DateType]
    message_id: Optional[str]
    timestamp: Optional[datetime]
    type: Optional[str]
    user_id: Optional[str]
    app_id: Optional[str]

    @abstractmethod
    def info(self) -> MessageInfo:
        raise NotImplementedError

    @property
    @abstractmethod
    def locked(self) -> bool:
        raise NotImplementedError

    @property
    @abstractmethod
    def properties(self) -> aiormq.spec.Basic.Properties:
        raise NotImplementedError

    @abstractmethod
    def __iter__(self) -> Iterator[int]:
        raise NotImplementedError

    @abstractmethod
    def lock(self) -> None:
        raise NotImplementedError

    # Not marked abstract: subclasses that do not override it inherit a
    # ``__copy__`` that raises ``NotImplementedError``.
    def __copy__(self) -> "AbstractMessage":
        raise NotImplementedError
179

180

181
class AbstractIncomingMessage(AbstractMessage, ABC):
    """Abstract interface of a received message.

    Extends ``AbstractMessage`` with delivery-related attributes and with
    ``ack``/``reject``/``nack``/``process`` operations, all of which raise
    ``NotImplementedError`` here.
    """

    __slots__ = ()

    cluster_id: Optional[str]
    consumer_tag: Optional["ConsumerTag"]
    delivery_tag: Optional[int]
    redelivered: Optional[bool]
    message_count: Optional[int]
    routing_key: Optional[str]
    exchange: Optional[str]

    @property
    @abstractmethod
    def channel(self) -> aiormq.abc.AbstractChannel:
        raise NotImplementedError

    @abstractmethod
    def process(
        self,
        requeue: bool = False,
        reject_on_redelivered: bool = False,
        ignore_processed: bool = False,
    ) -> "AbstractProcessContext":
        raise NotImplementedError

    @abstractmethod
    async def ack(self, multiple: bool = False) -> None:
        raise NotImplementedError

    @abstractmethod
    async def reject(self, requeue: bool = False) -> None:
        raise NotImplementedError

    @abstractmethod
    async def nack(self, multiple: bool = False, requeue: bool = True) -> None:
        raise NotImplementedError

    # Redefined without ``@abstractmethod``; this override replaces the
    # abstract ``AbstractMessage.info`` with a concrete stub that raises.
    def info(self) -> MessageInfo:
        raise NotImplementedError

    @property
    @abstractmethod
    def processed(self) -> bool:
        raise NotImplementedError
225

226

227
class AbstractProcessContext(AsyncContextManager):
    """Abstract async context manager returned by ``process()``.

    ``__aenter__`` is declared to yield an ``AbstractIncomingMessage``;
    both hooks raise ``NotImplementedError`` here.
    """

    @abstractmethod
    async def __aenter__(self) -> AbstractIncomingMessage:
        raise NotImplementedError

    @abstractmethod
    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        raise NotImplementedError
240

241

242
class AbstractQueue:
    """Interface of a queue bound to a channel.

    Every method raises ``NotImplementedError`` here.

    NOTE: the class does not derive from ``ABC``, so the ``@abstractmethod``
    markers are not enforced at instantiation time; instantiating it
    directly fails only because ``__init__`` itself raises.
    """

    __slots__ = ()

    # Annotated as this module's ``AbstractChannel``, while ``__init__``
    # below is annotated to accept ``aiormq.abc.AbstractChannel``.
    channel: "AbstractChannel"
    name: str
    durable: bool
    exclusive: bool
    auto_delete: bool
    arguments: Arguments
    passive: bool
    declaration_result: aiormq.spec.Queue.DeclareOk
    close_callbacks: CallbackCollection[
        AbstractQueue,
        [Optional[BaseException]],
    ]

    @abstractmethod
    def __init__(
        self,
        channel: aiormq.abc.AbstractChannel,
        name: Optional[str],
        durable: bool,
        exclusive: bool,
        auto_delete: bool,
        arguments: Arguments,
        passive: bool = False,
    ):
        # The received arguments are attached to the exception as a dict.
        raise NotImplementedError(
            dict(
                channel=channel,
                name=name,
                durable=durable,
                exclusive=exclusive,
                auto_delete=auto_delete,
                arguments=arguments,
                passive=passive,
            ),
        )

    @abstractmethod
    async def declare(
        self, timeout: TimeoutType = None,
    ) -> aiormq.spec.Queue.DeclareOk:
        raise NotImplementedError

    # ``arguments`` and ``timeout`` are keyword-only here ...
    @abstractmethod
    async def bind(
        self,
        exchange: ExchangeParamType,
        routing_key: Optional[str] = None,
        *,
        arguments: Arguments = None,
        timeout: TimeoutType = None,
    ) -> aiormq.spec.Queue.BindOk:
        raise NotImplementedError

    # ... but positional-or-keyword in ``unbind``.
    @abstractmethod
    async def unbind(
        self,
        exchange: ExchangeParamType,
        routing_key: Optional[str] = None,
        arguments: Arguments = None,
        timeout: TimeoutType = None,
    ) -> aiormq.spec.Queue.UnbindOk:
        raise NotImplementedError

    @abstractmethod
    async def consume(
        self,
        callback: Callable[[AbstractIncomingMessage], Awaitable[Any]],
        no_ack: bool = False,
        exclusive: bool = False,
        arguments: Arguments = None,
        consumer_tag: Optional[ConsumerTag] = None,
        timeout: TimeoutType = None,
    ) -> ConsumerTag:
        raise NotImplementedError

    @abstractmethod
    async def cancel(
        self, consumer_tag: ConsumerTag,
        timeout: TimeoutType = None,
        nowait: bool = False,
    ) -> aiormq.spec.Basic.CancelOk:
        raise NotImplementedError

    # Typing overloads: ``fail=True`` is declared to return a message,
    # ``fail=False`` may return ``None``.
    @overload
    async def get(
        self, *, no_ack: bool = False,
        fail: Literal[True] = ..., timeout: TimeoutType = ...,
    ) -> AbstractIncomingMessage:
        ...

    @overload
    async def get(
        self, *, no_ack: bool = False,
        fail: Literal[False] = ..., timeout: TimeoutType = ...,
    ) -> Optional[AbstractIncomingMessage]:
        ...

    @abstractmethod
    async def get(
        self, *, no_ack: bool = False,
        fail: bool = True, timeout: TimeoutType = 5,
    ) -> Optional[AbstractIncomingMessage]:
        raise NotImplementedError

    @abstractmethod
    async def purge(
        self, no_wait: bool = False, timeout: TimeoutType = None,
    ) -> aiormq.spec.Queue.PurgeOk:
        raise NotImplementedError

    @abstractmethod
    async def delete(
        self, *, if_unused: bool = True, if_empty: bool = True,
        timeout: TimeoutType = None,
    ) -> aiormq.spec.Queue.DeleteOk:
        raise NotImplementedError

    @abstractmethod
    def iterator(self, **kwargs: Any) -> "AbstractQueueIterator":
        raise NotImplementedError
365

366

367
class AbstractQueueIterator(AsyncIterable[AbstractIncomingMessage]):
    """Abstract async iterator over the messages of a queue.

    Also declares the async context-manager hooks. Every method raises
    ``NotImplementedError`` here.
    """

    _amqp_queue: AbstractQueue
    _queue: asyncio.Queue
    _consumer_tag: ConsumerTag
    _consume_kwargs: Dict[str, Any]

    @abstractmethod
    def close(self) -> Awaitable[Any]:
        raise NotImplementedError

    @abstractmethod
    async def on_message(self, message: AbstractIncomingMessage) -> None:
        raise NotImplementedError

    @abstractmethod
    async def consume(self) -> None:
        raise NotImplementedError

    @abstractmethod
    def __aiter__(self) -> "AbstractQueueIterator":
        raise NotImplementedError

    # Declared as a plain method returning an awaitable, not ``async def``.
    @abstractmethod
    def __aenter__(self) -> Awaitable["AbstractQueueIterator"]:
        raise NotImplementedError

    @abstractmethod
    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        raise NotImplementedError

    @abstractmethod
    async def __anext__(self) -> AbstractIncomingMessage:
        raise NotImplementedError
405

406

407
class AbstractExchange(ABC):
    """Abstract interface of an exchange.

    Every method raises ``NotImplementedError`` here; because the class is
    an ``ABC``, subclasses must override all abstract members before they
    can be instantiated.
    """

    name: str

    @abstractmethod
    def __init__(
        self,
        channel: "AbstractChannel",
        name: str,
        type: Union[ExchangeType, str] = ExchangeType.DIRECT,
        *,
        auto_delete: bool = False,
        durable: bool = False,
        internal: bool = False,
        passive: bool = False,
        arguments: Arguments = None,
    ):
        raise NotImplementedError

    @abstractmethod
    async def declare(
        self, timeout: TimeoutType = None,
    ) -> aiormq.spec.Exchange.DeclareOk:
        raise NotImplementedError

    # ``arguments`` and ``timeout`` are keyword-only here ...
    @abstractmethod
    async def bind(
        self,
        exchange: ExchangeParamType,
        routing_key: str = "",
        *,
        arguments: Arguments = None,
        timeout: TimeoutType = None,
    ) -> aiormq.spec.Exchange.BindOk:
        raise NotImplementedError

    # ... but positional-or-keyword in ``unbind``.
    @abstractmethod
    async def unbind(
        self,
        exchange: ExchangeParamType,
        routing_key: str = "",
        arguments: Arguments = None,
        timeout: TimeoutType = None,
    ) -> aiormq.spec.Exchange.UnbindOk:
        raise NotImplementedError

    @abstractmethod
    async def publish(
        self,
        message: "AbstractMessage",
        routing_key: str,
        *,
        mandatory: bool = True,
        immediate: bool = False,
        timeout: TimeoutType = None,
    ) -> Optional[aiormq.abc.ConfirmationFrameType]:
        raise NotImplementedError

    @abstractmethod
    async def delete(
        self, if_unused: bool = False, timeout: TimeoutType = None,
    ) -> aiormq.spec.Exchange.DeleteOk:
        raise NotImplementedError
469

470

471
@dataclasses.dataclass(frozen=True)
class UnderlayChannel:
    """Immutable pairing of an aiormq channel with its close callback."""

    channel: aiormq.abc.AbstractChannel
    close_callback: OneShotCallback

    @classmethod
    async def create(
        cls, connection: aiormq.abc.AbstractConnection,
        close_callback: Callable[..., Awaitable[Any]], **kwargs: Any,
    ) -> "UnderlayChannel":
        """Open a channel on *connection* and wire up *close_callback*.

        The callback is wrapped in a ``OneShotCallback`` and registered on
        both the connection's and the new channel's ``closing`` future.
        Extra keyword arguments are forwarded to ``connection.channel()``.
        """
        close_callback = OneShotCallback(close_callback)

        await connection.ready()
        connection.closing.add_done_callback(close_callback)
        channel = await connection.channel(**kwargs)
        channel.closing.add_done_callback(close_callback)

        return cls(
            channel=channel,
            close_callback=close_callback,
        )

    async def close(self, exc: Optional[ExceptionType] = None) -> Any:
        """Close the channel and detach the close callback.

        Returns immediately when the callback's ``finished`` flag is
        already set.
        """
        if self.close_callback.finished.is_set():
            return

        # close callbacks must be fired when closing
        # and should be deleted later to prevent memory leaks
        await self.channel.close(exc)
        await self.close_callback.wait()

        self.channel.closing.remove_done_callback(self.close_callback)
        self.channel.connection.closing.remove_done_callback(
            self.close_callback,
        )
506

507

508
class AbstractChannel(PoolInstance, ABC):
    """Abstract interface of a channel.

    Declares the factory classes, callback collections and operations a
    channel implementation must provide. Apart from ``is_initialized``,
    every member raises ``NotImplementedError`` here.
    """

    QUEUE_CLASS: Type[AbstractQueue]
    EXCHANGE_CLASS: Type[AbstractExchange]

    close_callbacks: CallbackCollection[
        AbstractChannel,
        [Optional[BaseException]],
    ]
    return_callbacks: CallbackCollection[
        AbstractChannel,
        [AbstractIncomingMessage],
    ]
    default_exchange: AbstractExchange

    publisher_confirms: bool

    # Abstract, yet it carries a default body: true when the instance has
    # a ``_channel`` attribute. Subclasses must still override it.
    @property
    @abstractmethod
    def is_initialized(self) -> bool:
        return hasattr(self, "_channel")

    @property
    @abstractmethod
    def is_closed(self) -> bool:
        raise NotImplementedError

    @abstractmethod
    def close(self, exc: Optional[ExceptionType] = None) -> Awaitable[None]:
        raise NotImplementedError

    @abstractmethod
    def closed(self) -> Awaitable[Literal[True]]:
        raise NotImplementedError

    @abstractmethod
    async def get_underlay_channel(self) -> aiormq.abc.AbstractChannel:
        raise NotImplementedError

    @property
    @abstractmethod
    def number(self) -> Optional[int]:
        raise NotImplementedError

    @abstractmethod
    async def __aenter__(self) -> "AbstractChannel":
        raise NotImplementedError

    # Declared as a plain method returning an awaitable, not ``async def``.
    @abstractmethod
    def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> Awaitable[None]:
        raise NotImplementedError

    @abstractmethod
    async def initialize(self, timeout: TimeoutType = None) -> None:
        raise NotImplementedError

    @abstractmethod
    def reopen(self) -> Awaitable[None]:
        raise NotImplementedError

    @abstractmethod
    async def declare_exchange(
        self,
        name: str,
        type: Union[ExchangeType, str] = ExchangeType.DIRECT,
        *,
        durable: bool = False,
        auto_delete: bool = False,
        internal: bool = False,
        passive: bool = False,
        arguments: Arguments = None,
        timeout: TimeoutType = None,
    ) -> AbstractExchange:
        raise NotImplementedError

    @abstractmethod
    async def get_exchange(
        self, name: str, *, ensure: bool = True,
    ) -> AbstractExchange:
        raise NotImplementedError

    @abstractmethod
    async def declare_queue(
        self,
        name: Optional[str] = None,
        *,
        durable: bool = False,
        exclusive: bool = False,
        passive: bool = False,
        auto_delete: bool = False,
        arguments: Arguments = None,
        timeout: TimeoutType = None,
    ) -> AbstractQueue:
        raise NotImplementedError

    @abstractmethod
    async def get_queue(
        self, name: str, *, ensure: bool = True,
    ) -> AbstractQueue:
        raise NotImplementedError

    @abstractmethod
    async def set_qos(
        self,
        prefetch_count: int = 0,
        prefetch_size: int = 0,
        global_: bool = False,
        timeout: TimeoutType = None,
        all_channels: Optional[bool] = None,
    ) -> aiormq.spec.Basic.QosOk:
        raise NotImplementedError

    @abstractmethod
    async def queue_delete(
        self,
        queue_name: str,
        timeout: TimeoutType = None,
        if_unused: bool = False,
        if_empty: bool = False,
        nowait: bool = False,
    ) -> aiormq.spec.Queue.DeleteOk:
        raise NotImplementedError

    @abstractmethod
    async def exchange_delete(
        self,
        exchange_name: str,
        timeout: TimeoutType = None,
        if_unused: bool = False,
        nowait: bool = False,
    ) -> aiormq.spec.Exchange.DeleteOk:
        raise NotImplementedError

    @abstractmethod
    def transaction(self) -> AbstractTransaction:
        raise NotImplementedError

    @abstractmethod
    async def flow(self, active: bool = True) -> aiormq.spec.Channel.FlowOk:
        raise NotImplementedError

    @abstractmethod
    def __await__(self) -> Generator[Any, Any, "AbstractChannel"]:
        raise NotImplementedError
656

657

658
@dataclasses.dataclass(frozen=True)
class UnderlayConnection:
    """Immutable pairing of an aiormq connection with its close callback."""

    connection: aiormq.abc.AbstractConnection
    close_callback: OneShotCallback

    @classmethod
    async def make_connection(
        cls, url: URL, timeout: TimeoutType = None, **kwargs: Any,
    ) -> aiormq.abc.AbstractConnection:
        """Connect with ``aiormq.connect`` and wait until it is ready.

        Only the ``aiormq.connect`` call is bounded by *timeout*; the
        subsequent ``ready()`` wait is not.
        """
        connection: aiormq.abc.AbstractConnection = await asyncio.wait_for(
            aiormq.connect(url, **kwargs), timeout=timeout,
        )
        await connection.ready()
        return connection

    @classmethod
    async def connect(
        cls, url: URL, close_callback: Callable[..., Awaitable[Any]],
        timeout: TimeoutType = None, **kwargs: Any,
    ) -> "UnderlayConnection":
        """Establish a connection and register *close_callback* on it.

        On failure the callback is awaited with a future carrying the
        exception, and the exception is then re-raised.
        """
        try:
            connection = await cls.make_connection(
                url, timeout=timeout, **kwargs,
            )
            close_callback = OneShotCallback(close_callback)
            connection.closing.add_done_callback(close_callback)
        except Exception as e:
            # We are inside a coroutine, so a loop is always running;
            # get_running_loop() is the explicit way to obtain it.
            closing = asyncio.get_running_loop().create_future()
            closing.set_exception(e)
            await close_callback(closing)
            raise

        await connection.ready()
        return cls(
            connection=connection,
            close_callback=close_callback,
        )

    def ready(self) -> Awaitable[Any]:
        """Return the awaitable produced by ``connection.ready()``."""
        return self.connection.ready()

    async def close(self, exc: Optional[aiormq.abc.ExceptionType]) -> Any:
        """Close the connection, then wait for the close callback.

        Returns immediately when the callback's ``finished`` flag is
        already set. The callback is awaited even if closing raises.
        """
        if self.close_callback.finished.is_set():
            return
        try:
            return await self.connection.close(exc)
        except asyncio.CancelledError:
            raise
        finally:
            await self.close_callback.wait()
4!
708

709

710
@dataclass
class ConnectionParameter:
    """Description of one connection parameter and how to parse it."""

    name: str
    # Converts the raw string value; may raise ``ValueError``.
    parser: Callable[[str], Any]
    # Returned unparsed when the value is missing or cannot be parsed.
    default: Optional[str] = None
    # NOTE(review): presumably marks parameters passed on as keyword
    # arguments -- not used within this class.
    is_kwarg: bool = False

    def parse(self, value: Optional[str]) -> Any:
        """Return ``parser(value)``, falling back to ``default``.

        The default is returned as-is (not passed through the parser) when
        *value* is ``None`` or when the parser raises ``ValueError``. Any
        other exception from the parser propagates.
        """
        if value is None:
            return self.default
        try:
            return self.parser(value)
        except ValueError:
            return self.default
4✔
724

725

726
class AbstractConnection(PoolInstance, ABC):
    """Abstract interface of a connection.

    Every method raises ``NotImplementedError`` here.
    """

    PARAMETERS: Tuple[ConnectionParameter, ...]

    close_callbacks: CallbackCollection[
        AbstractConnection,
        [Optional[BaseException]],
    ]
    connected: asyncio.Event
    transport: Optional[UnderlayConnection]
    kwargs: Mapping[str, Any]

    @abstractmethod
    def __init__(
        self, url: URL, loop: Optional[asyncio.AbstractEventLoop] = None,
        **kwargs: Any,
    ):
        # The message echoes ``url`` and ``loop``; ``kwargs`` is omitted.
        raise NotImplementedError(
            f"Method not implemented, passed: url={url}, loop={loop!r}",
        )

    @property
    @abstractmethod
    def is_closed(self) -> bool:
        raise NotImplementedError

    # The default for ``exc`` is the ``CancelledError`` class itself,
    # not an instance.
    @abstractmethod
    async def close(self, exc: ExceptionType = asyncio.CancelledError) -> None:
        raise NotImplementedError

    @abstractmethod
    def closed(self) -> Awaitable[Literal[True]]:
        raise NotImplementedError

    @abstractmethod
    async def connect(self, timeout: TimeoutType = None) -> None:
        raise NotImplementedError

    @abstractmethod
    def channel(
        self,
        channel_number: Optional[int] = None,
        publisher_confirms: bool = True,
        on_return_raises: bool = False,
    ) -> AbstractChannel:
        raise NotImplementedError

    @abstractmethod
    async def ready(self) -> None:
        raise NotImplementedError

    @abstractmethod
    async def __aenter__(self) -> "AbstractConnection":
        raise NotImplementedError

    @abstractmethod
    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        raise NotImplementedError

    @abstractmethod
    async def update_secret(
        self, new_secret: str, *,
        reason: str = "", timeout: TimeoutType = None,
    ) -> aiormq.spec.Connection.UpdateSecretOk:
        raise NotImplementedError
795

796

797
class AbstractRobustQueue(AbstractQueue):
    """Queue interface extended with ``restore`` and a ``robust`` flag.

    ``bind`` and ``consume`` repeat the parent signatures with one extra
    ``robust: bool = True`` parameter. Every method raises
    ``NotImplementedError`` here.
    """

    __slots__ = ()

    @abstractmethod
    def restore(self) -> Awaitable[None]:
        raise NotImplementedError

    @abstractmethod
    async def bind(
        self,
        exchange: ExchangeParamType,
        routing_key: Optional[str] = None,
        *,
        arguments: Arguments = None,
        timeout: TimeoutType = None,
        robust: bool = True,
    ) -> aiormq.spec.Queue.BindOk:
        raise NotImplementedError

    # ``callback`` is annotated as returning ``Any`` here, whereas
    # ``AbstractQueue.consume`` annotates it as returning ``Awaitable[Any]``.
    @abstractmethod
    async def consume(
        self,
        callback: Callable[[AbstractIncomingMessage], Any],
        no_ack: bool = False,
        exclusive: bool = False,
        arguments: Arguments = None,
        consumer_tag: Optional[ConsumerTag] = None,
        timeout: TimeoutType = None,
        robust: bool = True,
    ) -> ConsumerTag:
        raise NotImplementedError
828

829

830
class AbstractRobustExchange(AbstractExchange):
    """Exchange interface extended with ``restore`` and a ``robust`` flag.

    ``bind`` repeats the parent signature with one extra keyword-only
    ``robust: bool = True`` parameter. Every method raises
    ``NotImplementedError`` here.
    """

    @abstractmethod
    def restore(self) -> Awaitable[None]:
        raise NotImplementedError

    @abstractmethod
    async def bind(
        self,
        exchange: ExchangeParamType,
        routing_key: str = "",
        *,
        arguments: Arguments = None,
        timeout: TimeoutType = None,
        robust: bool = True,
    ) -> aiormq.spec.Exchange.BindOk:
        raise NotImplementedError
846

847

848
class AbstractRobustChannel(AbstractChannel):
    """Channel interface extended with ``restore`` and reopen callbacks.

    ``declare_exchange`` and ``declare_queue`` repeat the parent signatures
    with an extra keyword-only ``robust: bool = True`` parameter and are
    declared to return the robust variants. Every method raises
    ``NotImplementedError`` here.
    """

    reopen_callbacks: CallbackCollection[AbstractRobustChannel, []]

    @abstractmethod
    def reopen(self) -> Awaitable[None]:
        raise NotImplementedError

    @abstractmethod
    async def restore(self) -> None:
        raise NotImplementedError

    @abstractmethod
    async def declare_exchange(
        self,
        name: str,
        type: Union[ExchangeType, str] = ExchangeType.DIRECT,
        *,
        durable: bool = False,
        auto_delete: bool = False,
        internal: bool = False,
        passive: bool = False,
        arguments: Arguments = None,
        timeout: TimeoutType = None,
        robust: bool = True,
    ) -> AbstractRobustExchange:
        raise NotImplementedError

    # ``arguments`` is annotated ``Optional[Dict[str, Any]]`` here, whereas
    # ``AbstractChannel.declare_queue`` uses ``Arguments``.
    @abstractmethod
    async def declare_queue(
        self,
        name: Optional[str] = None,
        *,
        durable: bool = False,
        exclusive: bool = False,
        passive: bool = False,
        auto_delete: bool = False,
        arguments: Optional[Dict[str, Any]] = None,
        timeout: TimeoutType = None,
        robust: bool = True,
    ) -> AbstractRobustQueue:
        raise NotImplementedError
889

890

891
class AbstractRobustConnection(AbstractConnection):
    """Connection interface extended with reconnect support.

    ``channel`` repeats the parent signature but is declared to return an
    ``AbstractRobustChannel``. Every method raises ``NotImplementedError``
    here.
    """

    reconnect_callbacks: CallbackCollection[AbstractRobustConnection, []]

    @property
    @abstractmethod
    def reconnecting(self) -> bool:
        raise NotImplementedError

    @abstractmethod
    def reconnect(self) -> Awaitable[None]:
        raise NotImplementedError

    @abstractmethod
    def channel(
        self,
        channel_number: Optional[int] = None,
        publisher_confirms: bool = True,
        on_return_raises: bool = False,
    ) -> AbstractRobustChannel:
        raise NotImplementedError
911

912

913
# Signature of a channel close callback: it receives the channel (or
# ``None``) and the exception (or ``None``); the return value is unconstrained.
ChannelCloseCallback = Callable[
    [Optional[AbstractChannel], Optional[BaseException]], Any,
]
# Same shape for connections.
ConnectionCloseCallback = Callable[
    [Optional[AbstractConnection], Optional[BaseException]], Any,
]
# Type variable restricted to ``AbstractConnection`` and its subclasses.
ConnectionType = TypeVar("ConnectionType", bound=AbstractConnection)
4✔
920

921

922
@singledispatch
def get_exchange_name(value: Any) -> str:
    """Return the exchange name for *value*.

    This is the ``singledispatch`` fallback, reached for any type without
    a registered implementation.

    Raises:
        ValueError: always, reporting the ``repr`` of *value*.
    """
    raise ValueError(
        "exchange argument must be an exchange "
        f"instance or str not {value!r}",
    )
928

929

930
# The function name contains a typo ("exchnage"); it is kept unchanged so
# any existing reference to this private helper keeps resolving.
@get_exchange_name.register(AbstractExchange)
def _get_exchange_name_from_exchnage(value: AbstractExchange) -> str:
    """Return the ``name`` attribute of an exchange instance."""
    return value.name
4✔
933

934

935
@get_exchange_name.register(str)
def _get_exchange_name_from_str(value: str) -> str:
    """Return *value* unchanged: a string already is the exchange name."""
    return value
×
938

939

940
# Names exported by ``from <module> import *``, in alphabetical order with
# classes and aliases first and ``get_exchange_name`` last.
__all__ = (
    "AbstractChannel",
    "AbstractConnection",
    "AbstractExchange",
    "AbstractIncomingMessage",
    "AbstractMessage",
    "AbstractProcessContext",
    "AbstractQueue",
    "AbstractQueueIterator",
    "AbstractRobustChannel",
    "AbstractRobustConnection",
    "AbstractRobustExchange",
    "AbstractRobustQueue",
    "AbstractTransaction",
    "CallbackSetType",
    "CallbackType",
    "ChannelCloseCallback",
    "ConnectionCloseCallback",
    "ConnectionParameter",
    "ConsumerTag",
    "DateType",
    "DeclarationResult",
    "DeliveryMode",
    "ExchangeParamType",
    "ExchangeType",
    "FieldValue",
    "HeadersType",
    "MILLISECONDS",
    "MessageInfo",
    "NoneType",
    "SSLOptions",
    "TimeoutType",
    "TransactionState",
    "UnderlayChannel",
    "UnderlayConnection",
    "get_exchange_name",
)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc