• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

sneakers-the-rat / noob / 22165807119

12 Feb 2026 11:59PM UTC coverage: 87.415% (+1.9%) from 85.469%
22165807119

push

github

web-flow
Merge pull request #134 from miniscope/map-tests

initial map test cases

3 of 4 new or added lines in 1 file covered. (75.0%)

279 existing lines in 15 files now uncovered.

2813 of 3218 relevant lines covered (87.41%)

6.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.13
/src/noob/network/loop.py
1
import asyncio
8✔
2
import sys
8✔
3
from collections import defaultdict
8✔
4
from collections.abc import Callable, Coroutine
8✔
5
from typing import Any
8✔
6

7
try:
8✔
8
    from zmq.asyncio import Context, Socket
8✔
UNCOV
9
except ImportError as e:
×
UNCOV
10
    raise ImportError(
×
11
        "Attempted to import zmq runner, but zmq deps are not installed. install with `noob[zmq]`",
12
    ) from e
13

14
if sys.version_info < (3, 12):
8✔
15
    from typing_extensions import TypedDict
3✔
16
else:
17
    from typing import TypedDict
5✔
18

19
from noob.logging import init_logger
8✔
20
from noob.network.message import Message
8✔
21
from noob.utils import iscoroutinefunction_partial
8✔
22

23

24
class _CallbackDict(TypedDict):
8✔
25
    sync: list[Callable[[Message], Any]]
8✔
26
    asyncio: list[Callable[[Message], Coroutine]]
8✔
27

28

29
class EventloopMixin:
8✔
30
    """
31
    Mixin to provide common asyncio zmq scaffolding to networked classes.
32

33
    Inheriting classes should, in order
34

35
    * call the ``_init_loop`` method to create the eventloop, context, and poller
36
    * populate the private ``_sockets``  and ``_receivers`` dicts
37
    * await the ``_poll_sockets`` method, which polls indefinitely.
38

39
    Inheriting classes **must** ensure that ``_init_loop``
40
    is called in the thread it is intended to run in,
41
    and that thread must already have a running eventloop.
42
    asyncio eventloops (and most of asyncio) are **not** thread safe.
43

44
    To help avoid cross-threading issues, the :meth:`.context`  and :meth:`.loop`
45
    properties do *not* automatically create the objects,
46
    raising a :class:`.RuntimeError` if they are accessed before ``_init_loop`` is called.
47
    """
48

49
    def __init__(self):
8✔
50
        self._context = None
5✔
51
        self._loop = None
5✔
52
        self._quitting: asyncio.Event = None  # type: ignore[assignment]
5✔
53
        self._sockets: dict[str, Socket] = {}
5✔
54
        """
5✔
55
        All sockets, mapped from some common name to the socket.
56
        The same key used here should be shared between _receivers and _callbacks
57
        """
58
        self._receivers: dict[str, Socket] = {}
5✔
59
        """Sockets that should be polled for incoming messages"""
5✔
60
        self._callbacks: dict[str, _CallbackDict] = defaultdict(
5✔
61
            lambda: _CallbackDict(sync=[], asyncio=[])
62
        )
63
        """Callbacks for each receiver socket"""
5✔
64
        if not hasattr(self, "logger"):
5✔
UNCOV
65
            self.logger = init_logger("eventloop")
×
66

67
    @property
8✔
68
    def context(self) -> Context:
8✔
69
        if self._context is None:
5✔
UNCOV
70
            raise RuntimeError("Loop has not been initialized with _init_loop!")
×
71
        return self._context
5✔
72

73
    @property
8✔
74
    def loop(self) -> asyncio.AbstractEventLoop:
8✔
75
        if self._loop is None:
5✔
UNCOV
76
            raise RuntimeError("Loop has not been initialized with _init_loop!")
×
77
        return self._loop
5✔
78

79
    @property
8✔
80
    def sockets(self) -> dict[str, Socket]:
8✔
81
        return self._sockets
5✔
82

83
    def register_socket(self, name: str, socket: Socket, receiver: bool = False) -> None:
8✔
84
        """Register a socket, optionally declaring it as a receiver socket to poll"""
85
        if name in self._sockets:
5✔
UNCOV
86
            raise KeyError(f"Socket {name} already declared!")
×
87
        self._sockets[name] = socket
5✔
88
        if receiver:
5✔
89
            self._receivers[name] = socket
5✔
90

91
    def add_callback(
8✔
92
        self, socket: str, callback: Callable[[Message], Any] | Callable[[Message], Coroutine]
93
    ) -> None:
94
        """
95
        Add a callback to be called when the socket receives a message.
96
        Callbacks are called in the order in which they are added.
97
        """
98
        if socket not in self._receivers:
5✔
UNCOV
99
            raise KeyError(f"Socket {socket} does not exist or is not a receiving socket")
×
100
        if iscoroutinefunction_partial(callback):
5✔
101
            self._callbacks[socket]["asyncio"].append(callback)
5✔
102
        else:
103
            self._callbacks[socket]["sync"].append(callback)
5✔
104

105
    def clear_callbacks(self) -> None:
8✔
106
        self._callbacks = defaultdict(lambda: _CallbackDict(sync=[], asyncio=[]))
5✔
107

108
    def _init_loop(self) -> None:
8✔
109
        self._loop = asyncio.get_running_loop()
5✔
110
        self._context = Context.instance()
5✔
111
        self._quitting = asyncio.Event()
5✔
112

113
    def _stop_loop(self) -> None:
8✔
UNCOV
114
        if self._quitting is None:
×
UNCOV
115
            return
×
UNCOV
116
        self._quitting.set()
×
117

118
    async def _poll_receivers(self) -> None:
8✔
119
        """
120
        Rather than using the zmq.asyncio.Poller which wastes a ton of time,
121
        it turns out doing it this way is roughly 4x as fast:
122
        just manually poll the sockets, and if you have multiple sockets,
123
        gather multiple coroutines where you're polling the sockets.
124
        """
125
        if len(self._receivers) == 1:
5✔
UNCOV
126
            await self._poll_receiver(next(iter(self._receivers.keys())))
×
127
        else:
128
            await asyncio.gather(*[self._poll_receiver(name) for name in self._receivers])
5✔
129

130
    async def _poll_receiver(self, name: str) -> None:
8✔
131
        socket = self._receivers[name]
5✔
132
        while not self._quitting.is_set():
5✔
133
            msg_bytes = await socket.recv_multipart()
5✔
134
            try:
5✔
135
                msg = Message.from_bytes(msg_bytes)
5✔
UNCOV
136
            except Exception as e:
×
UNCOV
137
                self.logger.exception(
×
138
                    "Exception decoding message for socket %s: %s,  %s", name, msg_bytes, e
139
                )
UNCOV
140
                continue
×
141

142
            # purposely don't catch errors here because we want them to bubble up into the caller
143
            for acb in self._callbacks[name]["asyncio"]:
5✔
144
                await acb(msg)
5✔
145
            for cb in self._callbacks[name]["sync"]:
5✔
146
                self.loop.run_in_executor(None, cb, msg)
5✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc