cjrh / aiomsg / 26719342896

31 May 2026 05:23PM UTC coverage: 85.281% (+0.1%) from 85.16%

push

github

web-flow
feat: big overhaul, multiple language support (#123)

* Add multi-language overhaul design and canonical protocol v1 specs

DESIGN.md lays out the plan to turn aiomsg into a family of native,
idiomatic implementations (Python, Rust async/sync, Go) sharing one wire
protocol, plus repo layout, per-language API mappings, a cross-language
conformance harness, and a phased roadmap.

PROTOCOL.md is the standalone canonical wire spec (v1): u32-framed,
typed-envelope messages (HELLO/HEARTBEAT/DATA/DATA_REQ/ACK) that resolve
the ambiguous REQ/REP header in the original reference.
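
As a rough illustration of the framing described above, the sketch below length-prefixes a typed envelope with a u32. The big-endian byte order, the one-byte type tag, the numeric type codes, and the `frame`/`unframe` helper names are all assumptions made for this example; PROTOCOL.md remains the authoritative layout.

```python
import struct
from enum import IntEnum
from typing import Tuple


class MsgType(IntEnum):
    # Hypothetical numeric codes, chosen only for this sketch.
    HELLO = 1
    HEARTBEAT = 2
    DATA = 3
    DATA_REQ = 4
    ACK = 5


def frame(msg_type: MsgType, body: bytes = b"") -> bytes:
    """Prefix a (type tag + body) envelope with its length as a u32."""
    envelope = bytes([msg_type]) + body
    # ">I" is an unsigned 32-bit integer; big-endian is assumed here.
    return struct.pack(">I", len(envelope)) + envelope


def unframe(buf: bytes) -> Tuple[MsgType, bytes, bytes]:
    """Split one frame off the front of buf.

    Returns (type, body, remaining bytes).
    """
    if len(buf) < 4:
        raise ValueError("incomplete length prefix")
    (size,) = struct.unpack_from(">I", buf, 0)
    end = 4 + size
    if size < 1 or len(buf) < end:
        raise ValueError("incomplete or empty frame")
    envelope = buf[4:end]
    return MsgType(envelope[0]), envelope[1:], buf[end:]


# Two frames back to back on the same byte stream.
wire = frame(MsgType.DATA, b"hi") + frame(MsgType.HEARTBEAT)
msg_type, body, rest = unframe(wire)
```

Because the type tag travels inside every frame, an application payload wrapped in a DATA envelope cannot be mistaken for a control frame, whatever bytes it contains.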

* Relocate Python implementation into python-lib/

Move the reference Python implementation (aiomsg/, tests/, examples/,
source/, docker/, pyproject.toml, uv.lock, .coveragerc, .dockerignore,
RELEASING.md, justfile) into python-lib/ as the first step of the
multi-language repo overhaul.

- Give python-lib its own README.md and LICENSE so the package builds
  self-contained; repoint pyproject readme to README.md.
- Add a top-level justfile that dispatches per-language test recipes.
- Make 'just test' self-contained with --group test.
- Scope CI to python-lib via path filters + working-directory; fix the
  release workflow build path and coverage upload path.

Verified: 101 passed, 2 skipped from python-lib.

* Adopt canonical protocol v1 typed envelope in Python reference

Replace the ambiguous regex-based REQ/REP header with the typed envelope
from PROTOCOL.md, and the literal heartbeat/identity frames with HELLO and
HEARTBEAT envelopes:

- New aiomsg/envelope.py: MsgType + encode/decode for HELLO, HEARTBEAT,
  DATA, DATA_REQ, ACK. Application data is wrapped, so payloads can no
  longer collide with control frames; msg_id/identity are fixed-width.
- Handshake now exchanges HELLO (version + identity) with a version check.
- Connection._recv decodes envelopes and filters HEARTBEAT; data envelopes
  flow to Søcket.raw_recv, which delivers DATA/DATA_REQ payloads, ACKs
  DATA_REQ, and ca... (continued)

65 of 90 new or added lines in 2 files covered. (72.22%)

394 of 462 relevant lines covered (85.28%)

3.4 hits per line

Source File

82.96
/python-lib/aiomsg/__init__.py
1
"""
2

3
aiomsg
4
======
5

6
 (Servers)
7

8
Broadly 3 kinds of transmission (ends):
9

10
- receive-only
11
- send-only
12
- duplex
13

14
Broadly 2 kinds of distribution patterns:
15

16
- other ends receive all messages
17
- other ends get round-robin
18

19
Broadly 2 kinds of receiving patterns (this is minor):
20

21
- keep receiving from a client while there is data
22
- force switch after each message
23

24
Broadly 2 kinds of health/heartbeat patterns:
25

26
- for send-only+receive-only: receiver reconnects on timeout
27
- for duplex: connector sends a ping, binder sends pong. Connector must
28
  reconnect on a pong timeout
29

30
Run tests with watchmedo (available after ``pip install watchdog``):
31

32
.. code-block:: bash
33

34
    watchmedo shell-command -W -D -R \\
35
        -c 'clear && py.test -s --durations=10 -vv' \\
36
        -p '*.py'
37

38
"""
39
import logging
4✔
40
import asyncio
4✔
41
import uuid
4✔
42
import json
4✔
43
from enum import Enum, auto
4✔
44
from asyncio import StreamReader, StreamWriter
4✔
45
from collections import UserDict
4✔
46
from itertools import cycle
4✔
47
from weakref import WeakSet
4✔
48
from typing import (
4✔
49
    Dict,
50
    Optional,
51
    Tuple,
52
    Union,
53
    List,
54
    AsyncGenerator,
55
    Callable,
56
    MutableMapping,
57
    Awaitable,
58
    Sequence,
59
)
60

61
from . import envelope
4✔
62
from . import msgproto
4✔
63

64
__all__ = ["Søcket", "SendMode", "DeliveryGuarantee"]
4✔
65

66
logger = logging.getLogger(__name__)
4✔
67
SEND_MODES = ["round_robin", "publish"]
4✔
68
JSONCompatible = Union[str, int, float, bool, List, Dict, None]
4✔
69

70

71
class NoConnectionsAvailableError(Exception):
4✔
72
    pass
4✔
73

74

75
class SendMode(Enum):
4✔
76
    PUBLISH = auto()
4✔
77
    ROUNDROBIN = auto()
4✔
78

79

80
class ConnectionEnd(Enum):
4✔
81
    BINDER = auto()
4✔
82
    CONNECTOR = auto()
4✔
83

84

85
class DeliveryGuarantee(Enum):
4✔
86
    AT_MOST_ONCE = auto()
4✔
87
    AT_LEAST_ONCE = auto()
4✔
88

89

90
class ConnectionsDict(UserDict):
4✔
91
    def __init__(self, *args, **kwargs):
4✔
92
        super().__init__(*args, **kwargs)
4✔
93
        self.cycle = None
4✔
94
        self.update_cycle()
4✔
95

96
    def __setitem__(self, key, value):
4✔
97
        super().__setitem__(key, value)
4✔
98
        self.update_cycle()
4✔
99

100
    def __delitem__(self, key):
4✔
101
        super().__delitem__(key)
4✔
102
        self.update_cycle()
4✔
103

104
    def update_cycle(self):
4✔
105
        self.cycle = cycle(self.data)
4✔
106

107
    def __next__(self):
4✔
108
        try:
4✔
109
            return next(self.cycle)
4✔
110
        except StopIteration:
4✔
111
            raise NoConnectionsAvailableError
4✔
112

113

114
# noinspection NonAsciiCharacters
115
class Søcket:
4✔
116
    def __init__(
4✔
117
        self,
118
        send_mode: SendMode = SendMode.ROUNDROBIN,
119
        delivery_guarantee: DeliveryGuarantee = DeliveryGuarantee.AT_MOST_ONCE,
120
        identity: Optional[bytes] = None,
121
        loop=None,
122
        reconnection_delay: Callable[[], float] = lambda: 0.1,
123
    ):
124
        """
125
        :param reconnection_delay: In large microservices
126
            architectures, an outage in one service will result in all the
127
            dependent services trying to connect over and over again (and
128
            sending their buffered data immediately). This parameter lets you
129
            provide a means of staggering the reconnections to avoid
130
            overwhelming the service that comes back into action after an
131
            outage. For example, you could stagger all your dependent
132
            microservices by providing:
133

134
                lambda: random.uniform(0, 10)
135

136
            which means that the reconnection delay for a specific socket
137
            will be a random number of seconds between 0 and 10. This will
138
            spread out all the reconnecting services over a 10 second
139
            window.
140
        """
141
        self._tasks = WeakSet()
4✔
142
        self.send_mode = send_mode
4✔
143
        self.delivery_guarantee = delivery_guarantee
4✔
144
        self.identity = identity or uuid.uuid4().bytes
4✔
145
        self.loop = loop or asyncio.get_event_loop()
4✔
146

147
        self._queue_recv = asyncio.Queue(maxsize=65536)
4✔
148
        self._connections: MutableMapping[bytes, Connection] = ConnectionsDict()
4✔
149
        self._user_send_queue = asyncio.Queue()
4✔
150

151
        self.server = None
4✔
152
        # Long-running task created by ``connect()`` that keeps (re)connecting
153
        # for the life of the socket. Tracked so ``close()`` can cancel it.
154
        self.connect_task: Optional[asyncio.Task] = None
4✔
155
        self.socket_type: Optional[ConnectionEnd] = None
4✔
156
        self.closed = False
4✔
157
        self.at_least_one_connection = asyncio.Event()
4✔
158

159
        # Keyed by the 16-byte msg_id of an in-flight DATA_REQ; the handle is
160
        # the scheduled resend, cancelled when the matching ACK arrives.
161
        self.waiting_for_acks: Dict[bytes, asyncio.Handle] = {}
4✔
162
        self.reconnection_delay = reconnection_delay
4✔
163

164
        logger.debug("Starting the sender task.")
4✔
165
        # Note this task is started before any connections have been made.
166
        self.sender_task = self.loop.create_task(self._sender_main())
4✔
167
        if send_mode is SendMode.PUBLISH:
4✔
168
            self.sender_handler = self._sender_publish
4✔
169
        elif send_mode is SendMode.ROUNDROBIN:
4✔
170
            self.sender_handler = self._sender_robin
4✔
171
        else:  # pragma: no cover
172
            raise Exception("Unknown send mode.")
173

174
    def idstr(self) -> str:
4✔
175
        return self.identity.hex()
4✔
176

177
    async def bind(
4✔
178
        self,
179
        hostname: Optional[Union[str, Sequence[str]]] = "127.0.0.1",
180
        port: int = 25000,
181
        ssl_context=None,
182
        **kwargs,
183
    ):
184
        """
185
        :param hostname: Hostname to bind. This can be a few different types,
186
            see the documentation for `asyncio.start_server()`.
187
        :param port: See documentation for `asyncio.start_server()`.
188
        :param ssl_context: See documentation for `asyncio.start_server()`.
189
        :param kwargs: All extra kwargs are passed through to
190
            `asyncio.start_server`. See the asyncio documentation for
191
            details.
192
        """
193
        self.check_socket_type()
4✔
194
        logger.info(f"Binding socket {self.idstr()} to {hostname}:{port}")
4✔
195
        self.server = await asyncio.start_server(
4✔
196
            self._connection,
197
            hostname,
198
            port,
199
            ssl=ssl_context,
200
            reuse_address=True,
201
            **kwargs,
202
        )
203
        logger.info("Server started.")
4✔
204
        return self
4✔
205

206
    async def connect(
4✔
207
        self,
208
        hostname: str = "127.0.0.1",
209
        port: int = 25000,
210
        ssl_context=None,
211
        connect_timeout: float = 1.0,
212
    ):
213
        self.check_socket_type()
4✔
214

215
        async def new_connection():
4✔
216
            """Called each time a new connection is attempted. This
217
            suspends while the connection is up."""
218
            writer = None
4✔
219
            try:
4✔
220
                logger.debug("Attempting to open connection")
4✔
221
                reader, writer = await asyncio.wait_for(
4✔
222
                    asyncio.open_connection(
223
                        hostname, port, ssl=ssl_context
224
                    ),
225
                    timeout=connect_timeout,
226
                )
227
                logger.info(f"Socket {self.idstr()} connected.")
4✔
228
                await self._connection(reader, writer)
4✔
229
            except asyncio.TimeoutError:
4✔
230
                # Make timeouts look like socket connection errors
231
                raise OSError
×
232
            finally:
233
                logger.info(f"Socket {self.idstr()} disconnected.")
4✔
234
                # NOTE: the writer is closed inside _connection.
235

236
        async def connect_with_retry():
4✔
237
            """This is a long-running task that is intended to run
238
            for the life of the Socket object. It will continually
239
            try to connect."""
240
            logger.info(f"Socket {self.idstr()} connecting to {hostname}:{port}")
4✔
241
            while not self.closed:
4✔
242
                try:
4✔
243
                    await new_connection()
4✔
244
                    if self.closed:
4✔
245
                        break
4✔
246
                except OSError:
4✔
247
                    if self.closed:
4✔
248
                        break
1✔
249
                    else:
250
                        logger.warning("Connection error, reconnecting...")
4✔
251
                        await asyncio.sleep(self.reconnection_delay())
4✔
252
                        continue
4✔
253
                except asyncio.CancelledError:
4✔
254
                    break
4✔
255
                except Exception:
×
256
                    logger.exception("Unexpected error")
×
257

258
        self.connect_task = self.loop.create_task(connect_with_retry())
4✔
259
        return self
4✔
260

261
    async def messages(self) -> AsyncGenerator[bytes, None]:
4✔
262
        """Convenience method to make it a little easier to get started
263
        with basic, reactive sockets. This method is intended to be
264
        consumed with ``async for``, like this:
265

266
        .. code-block:: python3
267

268
            import asyncio
269
            from aiomsg import Søcket
270

271
            async def main(host: str, port: int):
272
                async for msg in (await Søcket().bind(host, port)).messages():
273
                    print(f'Got a message: {msg}')
274

275
            asyncio.run(main('localhost', 8080))
276

277
        (This is a complete program btw!)
278
        """
279
        async for source, msg in self.identity_messages():
4✔
280
            yield msg
4✔
281

282
    async def identity_messages(self) -> AsyncGenerator[Tuple[bytes, bytes], None]:
4✔
283
        """This is like the ``.messages`` asynchronous generator, but it
284
        returns a tuple of (identity, message) rather than only the message.
285

286
        Example:
287

288
        .. code-block:: python3
289

290
            import asyncio
291
            from aiomsg import Søcket
292

293
            async def main(host: str, port: int):
294
                async for src, msg in (await Søcket().bind(host, port)).identity_messages():
295
                    print(f'Got a message from {src.hex()}: {msg}')
296

297
            asyncio.run(main('localhost', 8080))
298

299
        """
300
        while True:
4✔
301
            yield await self.recv_identity()
4✔
302

303
    async def _connection(self, reader: StreamReader, writer: StreamWriter):
4✔
304
        """Each new connection will create a task with this coroutine."""
305
        logger.debug("Creating new connection")
4✔
306

307
        # Handshake: both ends send a HELLO (version + identity) and read the
308
        # peer's HELLO before any other traffic. See PROTOCOL.md §4.
309
        logger.debug(f"Sending my identity {self.idstr()}")
4✔
310
        await msgproto.send_msg(writer, envelope.hello(self.identity))
4✔
311
        hello_raw = await msgproto.read_msg(reader)
4✔
312
        if not hello_raw:
4✔
313
            return
×
314

315
        hello = envelope.decode(hello_raw)
4✔
316
        if hello is None or hello.type is not envelope.MsgType.HELLO:
4✔
NEW
317
            logger.error("Expected a HELLO handshake; closing connection.")
×
NEW
318
            return
×
319
        if hello.version != envelope.PROTOCOL_VERSION:
4✔
NEW
320
            logger.error(
×
321
                f"Unsupported protocol version {hello.version}; closing connection."
322
            )
NEW
323
            return
×
324
        identity = hello.identity
4✔
325

326
        logger.debug(f"Received identity {identity.hex()}")
4✔
327
        if identity in self._connections:
4✔
328
            logger.error(
×
329
                f"Socket with identity {identity.hex()} is already "
330
                f"connected. This connection will not be created."
331
            )
332
            return
×
333

334
        # Create the connection object. These objects are kept in a
335
        # collection that is used for message distribution.
336
        connection = Connection(
4✔
337
            identity=identity, reader=reader, writer=writer, recv_event=self.raw_recv
338
        )
339
        if len(self._connections) == 0:
4✔
340
            logger.warning("First connection made")
4✔
341
            self.at_least_one_connection.set()
4✔
342
        self._connections[connection.identity] = connection
4✔
343

344
        try:
4✔
345
            await connection.run()
4✔
346
        except asyncio.CancelledError:
×
347
            logger.info(f"Connection {identity.hex()} cancelled.")
×
348
        except Exception:
×
349
            logger.exception(f"Unhandled exception inside _connection")
×
350
            raise
×
351
        finally:
352
            logger.debug("connection closed")
4✔
353
            if connection.identity in self._connections:
4✔
354
                del self._connections[connection.identity]
4✔
355

356
            writer.close()
4✔
357
            await writer.wait_closed()
4✔
358

359
            if not self._connections:
4✔
360
                logger.warning("No connections!")
4✔
361
                self.at_least_one_connection.clear()
4✔
362

363
    async def _close(self):
4✔
364
        logger.info(f"Closing {self.idstr()}")
4✔
365
        self.closed = True
4✔
366

367
        # Cancel every pending resend that is still waiting for an ACK.
368
        for msg_id, handle in self.waiting_for_acks.items():
4✔
369
            logger.debug(f"Cancelling pending resend event for msg_id {msg_id}")
×
370
            handle.cancel()
×
371

372
        if self.server:
4✔
373
            # Stop new connections from being accepted.
374
            self.server.close()
4✔
375

376
        # Close all active connections *before* awaiting
377
        # ``server.wait_closed()``. From Python 3.13, ``wait_closed()`` only
378
        # returns once every active connection has also closed; if we waited
379
        # first, a still-connected peer would block close() until its timeout.
380
        await asyncio.gather(
4✔
381
            *(c.close() for c in self._connections.values()), return_exceptions=True
382
        )
383

384
        if self.server:
4✔
385
            await self.server.wait_closed()
4✔
386

387
        # Stop the reconnection loop (no-op for bind-only sockets). By now any
388
        # live connection has been closed above, so the task is either between
389
        # retries or about to observe ``self.closed``; cancelling joins it.
390
        if self.connect_task:
4✔
391
            self.connect_task.cancel()
4✔
392
            await asyncio.gather(self.connect_task, return_exceptions=True)
4✔
393

394
        self.sender_task.cancel()
4✔
395
        await self.sender_task
4✔
396

397
        for task in self._tasks:
4✔
398
            task.cancel()
×
399
        await asyncio.gather(*self._tasks, return_exceptions=True)
4✔
400

401
        logger.info(f"Closed {self.idstr()}")
4✔
402

403
    async def close(self, timeout=10):
4✔
404
        try:
4✔
405
            await asyncio.wait_for(self._close(), timeout)
4✔
406
        except asyncio.TimeoutError:
×
407
            logger.exception("Timed out during close:")
×
408

409

410
        if not self.sender_task.done():
4✔
411
            logger.warning('sender_task was not complete.')
×
412

413
    def raw_recv(self, identity: bytes, env: envelope.Envelope):
4✔
414
        """Called when *any* active connection receives a data envelope.
415

416
        Connection-level frames (HELLO, HEARTBEAT) are handled inside
417
        :class:`Connection`; only DATA, DATA_REQ and ACK reach here.
418
        """
419
        logger.debug(f"In raw_recv, identity: {identity.hex()} type: {env.type}")
4✔
420

421
        if env.type is envelope.MsgType.DATA:
4✔
422
            # Plain application message (AT_MOST_ONCE). Pass it on as-is.
423
            self._queue_recv.put_nowait((identity, env.payload))
4✔
424
            return
4✔
425

NEW
426
        if env.type is envelope.MsgType.DATA_REQ:
×
427
            # An AT_LEAST_ONCE message. Deliver the payload to the application
428
            # and acknowledge receipt back to the sender on the same connection.
NEW
429
            self._queue_recv.put_nowait((identity, env.payload))
×
430

NEW
431
            msg_id = env.msg_id
×
432

NEW
433
            def notify_ack():
×
NEW
434
                logger.debug(f"Acknowledging DATA_REQ msg_id: {msg_id.hex()}")
×
435
                # Specifying the identity routes the ACK to the exact connection
436
                # the DATA_REQ arrived on.
NEW
437
                self._user_send_queue.put_nowait((identity, envelope.ack(msg_id)))
×
438

439
            # Defer the ACK slightly so the payload is handed to the application
440
            # before the sender learns it was received. Otherwise there is a
441
            # race where the ACK arrives, the sender stops, and the receiver
442
            # crashes before processing the payload — making the "guarantee"
443
            # a lie. 20 ms is plenty.
NEW
444
            self.loop.call_later(0.02, notify_ack)
×
445
            return
×
446

NEW
447
        if env.type is envelope.MsgType.ACK:
×
448
            # Acknowledgement of one of our DATA_REQ sends: cancel the pending
449
            # resend. ACKs are never surfaced to the application.
NEW
450
            handle = self.waiting_for_acks.pop(env.msg_id, None)
×
NEW
451
            logger.debug(f"ACK for {env.msg_id.hex()}, handle: {handle}")
×
NEW
452
            if handle:
×
NEW
453
                handle.cancel()
×
NEW
454
            return
×
455

456
    async def recv_identity(self) -> Tuple[bytes, bytes]:
4✔
457
        # Some connection sent us some data
458
        identity, message = await self._queue_recv.get()
4✔
459
        logger.debug(f"Received message from {identity.hex()}: {message}")
4✔
460

461
        return identity, message
4✔
462

463
    async def recv(self) -> bytes:
4✔
464
        # Just drop the identity
465
        _, message = await self.recv_identity()
4✔
466
        return message
4✔
467

468
    async def recv_string(self, **kwargs) -> str:
4✔
469
        """Automatically decode messages into strings.
470

471
        The ``kwargs`` are passed to the ``.decode()`` method of the
472
        received bytes object; for example ``encoding`` and ``errors``.
473
        If you wanted to override the error handler for decoding unicode,
474
        you might do something like the following:
475

476
        .. code-block:: python3
477

478
            msg_str = await sock.recv_string(errors='backslashreplace')
479

480
        Which will substitute unicode-invalid bytes with hexadecimal values
481
        formatted like ``\\xNN``.
482
        """
483
        return (await self.recv()).decode(**kwargs)
4✔
484

485
    async def recv_json(self, **kwargs) -> JSONCompatible:
4✔
486
        """Automatically deserialize messages in JSON format
487

488
        The ``kwargs`` are passed to the ``json.loads()`` method.
489
        """
490
        data = await self.recv()
4✔
491
        return json.loads(data, **kwargs)
4✔
492

493
    async def send(self, data: bytes, identity: Optional[bytes] = None, retries=None):
4✔
494
        logger.debug(f"Adding message to user queue: {data[:20]}")
4✔
495
        if (
4✔
496
            identity or self.send_mode is SendMode.ROUNDROBIN
497
        ) and self.delivery_guarantee is DeliveryGuarantee.AT_LEAST_ONCE:
498
            # AT_LEAST_ONCE: send a DATA_REQ and schedule a resend that fires
499
            # unless an ACK cancels it. (Not reachable for PUBLISH, which is
500
            # intentionally unsupported — see PROTOCOL.md §6.)
501
            #####################################################################
NEW
502
            msg_id = uuid.uuid4().bytes
×
NEW
503
            wire = envelope.data_req(msg_id, data)
×
504

505
            def resend(retries):
×
506
                if retries == 0:
×
NEW
507
                    logger.info(f"No more retries to send. Dropping [{data[:20]}...]")
×
508
                    return
×
509

NEW
510
                self._tasks.add(self.loop.create_task(self.send(data, identity, retries)))
×
511
                # After deleting this here, a new one will be created when
512
                # we re-enter ``async def send()``
513
                logger.debug(f"Removing the acks entry")
×
NEW
514
                del self.waiting_for_acks[msg_id]
×
515

516
            handle: asyncio.Handle = self.loop.call_later(
×
517
                5.0, resend, 5 if retries is None else retries - 1
518
            )
519
            # In self.raw_recv(), this handle will be cancelled if the other
520
            # side sends back an acknowledgement of receipt (ACK).
521
            logger.debug("Creating future resend acks entry")
×
NEW
522
            self.waiting_for_acks[msg_id] = handle
×
523
            #####################################################################
524
        else:
525
            # AT_MOST_ONCE: a plain DATA envelope, no acknowledgement.
526
            wire = envelope.data(data)
4✔
527

528
        await self._user_send_queue.put((identity, wire))
4✔
529

530
    async def send_string(self, data: str, identity: Optional[bytes] = None, **kwargs):
4✔
531
        """Automatically convert the string to bytes when sending.
532

533
        The ``kwargs`` are passed to the internal ``data.encode()`` method. """
534
        await self.send(data.encode(**kwargs), identity)
4✔
535

536
    async def send_json(
4✔
537
        self, obj: JSONCompatible, identity: Optional[bytes] = None, **kwargs
538
    ):
539
        """Automatically serialise the given ``obj`` to a JSON representation
540
        when sending.
541

542
        The ``kwargs`` are passed to the ``json.dumps()`` method. In particular,
543
        you might find the ``default`` parameter of ``dumps`` useful, since
544
        this can be used to automatically convert an otherwise
545
        JSON-incompatible attribute into something that can be represented.
546
        For example:
547

548
        .. code-block:: python3
549

550
            class Blah:
551
                def __init__(self, x, y):
552
                    self.x = x
553
                    self.y = y
554

555
                def __str__(self):
556
                    return f'{self.x},{self.y}'
557

558
            d = dict(text='hi', obj=Blah(1, 2))
559

560
            await sock.send_json(d, default=str)
561
            # The bytes that will be sent: {"text": "hi", "obj": "1,2"}
562

563
        It requires a bit more work to make a class properly serialize and
564
        deserialize to JSON, however. You will need to carefully study
565
        how to use the ``object_hook`` parameter in the ``json.loads()``
566
        method.
567
        """
568
        await self.send_string(json.dumps(obj, **kwargs), identity)
4✔
569

570
    def _sender_publish(self, message: bytes):
4✔
571
        logger.debug(f"Sending message via publish")
4✔
572
        # TODO: implement grouping by named channels
573
        if not self._connections:
4✔
574
            raise NoConnectionsAvailableError
4✔
575

576
        for identity, c in self._connections.items():
4✔
577
            logger.debug(f"Sending to connection: {identity.hex()}")
4✔
578
            try:
4✔
579
                c.writer_queue.put_nowait(message)
4✔
580
                logger.debug("Placed message on connection writer queue.")
4✔
581
            except asyncio.QueueFull:
4✔
582
                logger.error(
4✔
583
                    f"Dropped msg to Connection {identity.hex()}, its write queue is full."
584
                )
585

586
    def _sender_robin(self, message: bytes):
4✔
587
        """
588
        Raises:
589

590
        - NoConnectionsAvailableError
591

592
        """
593
        logger.debug(f"Sending message via round_robin")
4✔
594
        queues_full = set()
4✔
595
        while True:
4✔
596
            identity = next(self._connections)
4✔
597
            logger.debug(f"Got connection: {identity.hex()}")
4✔
598
            if identity in queues_full:
4✔
599
                logger.warning(f"All send queues are full. Dropping message.")
4✔
600
                return
4✔
601
            try:
4✔
602
                connection = self._connections[identity]
4✔
603
                connection.writer_queue.put_nowait(message)
4✔
604
                logger.debug(f"Added message to connection send queue.")
4✔
605
                return
4✔
606
            except asyncio.QueueFull:
4✔
607
                logger.warning(
4✔
608
                    f"Cannot send to Connection {identity.hex()}, its write queue is full! "
609
                    "Trying a different peer..."
610
                )
611
                queues_full.add(identity)
4✔
612

613
    def _sender_identity(self, message: bytes, identity: bytes):
4✔
614
        """Send directly to a peer with a distinct identity"""
615
        logger.debug(
4✔
616
            f"Sending message via identity {identity.hex()}: {message[:20]}..."
617
        )
618
        c = self._connections.get(identity)
4✔
619
        if not c:
4✔
620
            logger.error(
4✔
621
                f"Peer {identity.hex()} is not connected. Message will be dropped."
622
            )
623
            return
4✔
624

625
        try:
4✔
626
            c.writer_queue.put_nowait(message)
4✔
627
            logger.debug("Placed message on connection writer queue.")
4✔
628
        except asyncio.QueueFull:
4✔
629
            logger.error(f"Dropped msg to Connection {identity.hex()}, its write queue is full.")
4✔
630

631
    async def _sender_main(self):
4✔
632
        while True:
4✔
633
            q_task: asyncio.Task = self.loop.create_task(self._user_send_queue.get())
4✔
634
            w_task: asyncio.Task = self.loop.create_task(
4✔
635
                self.at_least_one_connection.wait()
636
            )
637
            try:
4✔
638
                await asyncio.wait([w_task, q_task], return_when=asyncio.ALL_COMPLETED)
4✔
639
            except asyncio.CancelledError:
4✔
640
                q_task.cancel()
4✔
641
                w_task.cancel()
4✔
642
                return
4✔
643

644
            identity, data = q_task.result()
4✔
645
            logger.debug(f"Got data to send: {data[:64]}")
4✔
646
            try:
4✔
647
                if identity is not None:
4✔
648
                    self._sender_identity(data, identity)
4✔
649
                else:
650
                    try:
4✔
651
                        logger.debug(f"Sending msg via handler: {data[:64]}")
4✔
652
                        self.sender_handler(message=data)
4✔
653
                    except NoConnectionsAvailableError:
1✔
654
                        logger.error("No connections available")
×
655
                        self.at_least_one_connection.clear()
×
656
                        try:
×
657
                            # Put it back onto the queue
658
                            self._user_send_queue.put_nowait((identity, data))
×
659
                        except asyncio.QueueFull:
×
660
                            logger.error(
×
661
                                "Send queue full when trying to recover "
662
                                "from no connections being available. "
663
                                "Dropping data!"
664
                            )
665
            except Exception as e:
×
666
                logger.exception(f"Unexpected error when sending a message: {e}")
×
667

668
    def check_socket_type(self):
4✔
669
        if self.socket_type is not None:  # pragma: no cover
670
            raise SystemError(f"Socket type already set: {self.socket_type}")
671

672
    async def __aenter__(self) -> "Søcket":
4✔
673
        return self
4✔
674

675
    async def __aexit__(self, exc_type, exc_val, exc_tb):
4✔
676
        await self.close()
4✔
677

678

679
class HeartBeatFailed(ConnectionError):
4✔
680
    pass
4✔
681

682

683
class Connection:
4✔
684
    def __init__(
4✔
685
        self,
686
        identity: bytes,
687
        reader: StreamReader,
688
        writer: StreamWriter,
689
        recv_event: Callable[[bytes, "envelope.Envelope"], None],
690
        loop=None,
691
        writer_queue_maxsize=0,
692
    ):
693
        self.loop = loop or asyncio.get_event_loop()
4✔
694
        self.identity = identity
4✔
695
        self.reader = reader
4✔
696
        self.writer = writer
4✔
697
        self.writer_queue = asyncio.Queue(maxsize=writer_queue_maxsize)
4✔
698
        self.reader_event = recv_event
4✔
699

700
        self.reader_task: Optional[asyncio.Task] = None
4✔
701
        self.writer_task: Optional[asyncio.Task] = None
4✔
702

703
        self.heartbeat_interval = 5
4✔
704
        self.heartbeat_timeout = 15
4✔
705
        self.heartbeat_message = envelope.heartbeat()
4✔
706

707
    def warn_dropping_data(self):  # pragma: no cover
708
        qsize = self.writer_queue.qsize()
709
        if qsize:
710
            logger.warning(
711
                f"Closing connection {self.identity.hex()} but there is "
712
                f"still data in the writer queue: {qsize}. "
713
                f"These messages will be lost."
714
            )
715

716
    async def close(self):
4✔
717
        # Kill the reader task
718
        self.reader_task.cancel()
4✔
719
        try:
4✔
720
            await asyncio.wait_for(self.writer_queue.join(), 10.0)
4✔
721
        except asyncio.TimeoutError:
×
722
            self.warn_dropping_data()
×
723

724
        self.writer_task.cancel()
4✔
725
        await asyncio.gather(self.reader_task, self.writer_task)
4✔
726
        self.reader_task = None
4✔
727
        self.writer_task = None
4✔
728

729
    async def _recv(self):
4✔
730
        while True:
4✔
731
            try:
4✔
732
                logger.debug("Waiting for messages in connection")
4✔
733
                message = await asyncio.wait_for(
4✔
734
                    msgproto.read_msg(self.reader), timeout=self.heartbeat_timeout
735
                )
736
                logger.debug(f"Got message in connection: {message}")
4✔
737
            except asyncio.TimeoutError:
4✔
738
                logger.warning("Heartbeat failed. Closing connection.")
×
739
                self.writer_queue.put_nowait(None)
×
740
                return
×
741
                # raise HeartBeatFailed
742
            except asyncio.CancelledError:
4✔
743
                return
4✔
744

745
            if not message:
4✔
746
                logger.debug("Connection closed (recv)")
4✔
747
                self.writer_queue.put_nowait(None)
4✔
748
                return
4✔
749

750
            env = envelope.decode(message)
4✔
751
            if env is None:
4✔
752
                # Unrecognised envelope type; ignore it (forward-compat).
NEW
753
                logger.debug("Ignoring unrecognised envelope")
×
NEW
754
                continue
×
755

756
            if env.type is envelope.MsgType.HEARTBEAT:
4✔
757
                logger.debug("Heartbeat received")
×
758
                continue
×
759

760
            try:
4✔
761
                logger.debug(
4✔
762
                    f"Received {env.type.name} on connection {self.identity.hex()}"
763
                )
764
                self.reader_event(self.identity, env)
4✔
765
            except asyncio.QueueFull:
×
766
                logger.error(
×
767
                    # Name the affected connection so the log line is actionable.
768
                    f"Data lost on connection {self.identity.hex()} because the recv "
769
                    "queue is full!"
770
                )
771
            except Exception as e:
×
772
                logger.exception(f"Unhandled error in _recv: {e}")
×
773

774
    async def send_wait(self, message: bytes):
4✔
775
        await msgproto.send_msg(self.writer, message)
4✔
776

777
    @staticmethod
4✔
778
    async def _send(
4✔
779
        identity: bytes,
780
        send_wait: Callable[[bytes], Awaitable[None]],
781
        writer_queue: asyncio.Queue,
782
        heartbeat_interval: float,
783
        heartbeat_message: bytes,
784
        reader_task: asyncio.Task,
785
    ):
786
        while True:
4✔
787
            try:
4✔
788
                try:
4✔
789
                    message = await asyncio.wait_for(
4✔
790
                        writer_queue.get(), timeout=heartbeat_interval
791
                    )
792
                except asyncio.TimeoutError:
4✔
793
                    logger.debug("Sending a heartbeat")
4✔
794
                    message = heartbeat_message
4✔
795
                except asyncio.CancelledError:
4✔
796
                    break
4✔
797
                else:
798
                    writer_queue.task_done()
4✔
799

800
                if not message:
4✔
801
                    logger.info("Connection closed (send)")
4✔
802
                    reader_task.cancel()
4✔
803
                    break
4✔
804

805
                logger.debug(
4✔
806
                    f"Got message from connection writer queue. {message[:64]}"
807
                )
808
                try:
4✔
809
                    await send_wait(message)
4✔
810
                    logger.debug("Sent message")
4✔
811
                except OSError as e:
4✔
812
                    logger.error(
4✔
813
                        f"Connection {identity.hex()} aborted, dropping "
814
                        f"message: {message[:50]}...{message[-50:]}\n"
815
                        f"error: {e}"
816
                    )
817
                    break
4✔
818
                except asyncio.CancelledError:
4✔
819
                    # Try to still send this message.
820
                    # await msgproto.send_msg(self.writer, message)
821
                    break
4✔
822
            except Exception as e:
4✔
823
                logger.error(f"Unhandled error: {e}")
4✔
824

825
    async def run(self):
4✔
826
        logger.info(f"Connection {self.identity.hex()} running.")
4✔
827
        self.reader_task = self.loop.create_task(self._recv())
4✔
828
        self.writer_task = self.loop.create_task(
4✔
829
            self._send(
830
                self.identity,
831
                self.send_wait,
832
                self.writer_queue,
833
                self.heartbeat_interval,
834
                self.heartbeat_message,
835
                self.reader_task,
836
            )
837
        )
838

839
        try:
4✔
840
            await asyncio.wait(
4✔
841
                [self.reader_task, self.writer_task], return_when=asyncio.ALL_COMPLETED
842
            )
843
        except asyncio.CancelledError:
4✔
844
            self.reader_task.cancel()
4✔
845
            self.writer_task.cancel()
4✔
846
            group = asyncio.gather(self.reader_task, self.writer_task)
4✔
847
            await group
4✔
848
            self.warn_dropping_data()
4✔
849
        logger.info(f"Connection {self.identity.hex()} no longer active.")
4✔
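
The `_send` loop above sends a heartbeat whenever the writer queue stays idle for `heartbeat_interval` seconds, and stops on a falsy sentinel. The sketch below isolates that idea so it can be run on its own. It is an illustration under stated assumptions, not the library's implementation: `writer_loop`, `demo`, and the `HEARTBEAT` placeholder bytes are hypothetical names, and the sentinel check uses `is None` rather than the listing's `if not message`.

```python
import asyncio

# Hypothetical placeholder; the real code uses envelope.heartbeat() instead.
HEARTBEAT = b"<heartbeat>"


async def writer_loop(queue, send, heartbeat_interval):
    """Forward queued messages; emit HEARTBEAT whenever the queue stays idle."""
    while True:
        try:
            message = await asyncio.wait_for(queue.get(), timeout=heartbeat_interval)
        except asyncio.TimeoutError:
            # Nothing was queued within the interval: send a heartbeat instead.
            message = HEARTBEAT
        else:
            # Only items that really came off the queue are marked as done.
            queue.task_done()

        if message is None:
            # Sentinel (assumed here to be None): stop the loop.
            break
        await send(message)


async def demo():
    sent = []

    async def send(message):
        # Stand-in for writing to a socket: just record what would be sent.
        sent.append(message)

    queue = asyncio.Queue()
    task = asyncio.create_task(writer_loop(queue, send, heartbeat_interval=0.05))
    queue.put_nowait(b"hello")
    await asyncio.sleep(0.2)  # stay idle long enough for heartbeats to fire
    queue.put_nowait(None)    # ask the loop to stop
    await task
    return sent


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Calling `task_done()` only in the `else` branch matters: a heartbeat never came from the queue, so counting it would unbalance a later `queue.join()` such as the one in `close()`.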