• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nocarryr / tslumd / 7388595068

02 Jan 2024 05:18PM UTC coverage: 94.411%. First build
7388595068

push

github

nocarryr
Run doctest in separate pytest invocation

929 of 984 relevant lines covered (94.41%)

4.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.71
/src/tslumd/receiver.py
1
from __future__ import annotations
5✔
2
try:
5✔
3
    from loguru import logger
5✔
4
except ImportError: # pragma: no cover
5
    import logging
6
    logger = logging.getLogger(__name__)
7
import asyncio
5✔
8
from typing import Tuple
5✔
9

10
from pydispatch import Dispatcher, Property, DictProperty, ListProperty
5✔
11

12
from tslumd import Tally, Screen, TallyKey, Message
5✔
13

14
__all__ = ('UmdReceiver',)
5✔
15

16

17

18

19
class UmdProtocol(asyncio.DatagramProtocol):
    """Datagram protocol that forwards incoming packets to a receiver

    Arguments:
        receiver: The object notified of connection and data events. Its
            ``connected_evt`` is set once the transport is ready and its
            ``parse_incoming`` method is called for every datagram.
    """
    def __init__(self, receiver: 'UmdReceiver'):
        self.receiver = receiver
        # Defined up front so the attribute always exists; it is replaced by
        # the real transport in connection_made().
        self.transport: asyncio.BaseTransport | None = None

    def connection_made(self, transport):
        """Store the transport and signal the receiver that it is connected
        """
        logger.debug(f'transport={transport}')
        self.transport = transport
        self.receiver.connected_evt.set()

    def datagram_received(self, data, addr):
        """Hand the raw datagram and its sender address to the receiver
        """
        self.receiver.parse_incoming(data, addr)
5✔
28

29

30
class UmdReceiver(Dispatcher):
    """Receiver for UMD messages

    Arguments:
        hostaddr (str): The local host address to bind the server to. Defaults
            to :attr:`DEFAULT_HOST`
        hostport (int): The port to listen on. Defaults to :attr:`DEFAULT_PORT`

    :Events:
        .. event:: on_tally_added(tally: Tally)

            Fired when a :class:`~.Tally` instance is added to :attr:`tallies`

        .. event:: on_tally_updated(tally: Tally)

            Fired when any :class:`~.Tally` instance has been updated

        .. event:: on_tally_control(tally: Tally, data: bytes)

            Fired when control data has been received for a :class:`~.Tally`

            .. versionadded:: 0.0.3

        .. event:: on_screen_added(screen: Screen)

            Fired when a :class:`~.Screen` instance is added to :attr:`screens`

            .. versionadded:: 0.0.3

        .. event:: on_scontrol(screen: int, data: bytes)

            Fired when a message with :attr:`~.Message.scontrol` data is received

            * ``screen`` : The :attr:`~.Message.screen` from the incoming
              control message
            * ``data`` : The control data

            .. versionadded:: 0.0.2

    """

    DEFAULT_HOST: str = '0.0.0.0' #: The default host address to listen on
    DEFAULT_PORT: int = 65000 #: The default host port to listen on

    screens: dict[int, Screen]
    """Mapping of :class:`~.Screen` objects by :attr:`~.Screen.index`

    .. versionadded:: 0.0.3
    """

    broadcast_screen: Screen
    """A :class:`~.Screen` instance created using :meth:`.Screen.broadcast`

    .. versionadded:: 0.0.3
    """

    tallies: dict[TallyKey, Tally]
    """Mapping of :class:`~.Tally` objects by their :attr:`~.Tally.id`

    .. versionchanged:: 0.0.3
        The keys are now a combination of the :class:`~.Screen` and
        :class:`.Tally` indices
    """

    running: bool
    """``True`` if the client / server are running
    """

    _events_ = [
        'on_tally_added', 'on_tally_updated', 'on_tally_control',
        'on_screen_added', 'on_scontrol',
    ]
    def __init__(self, hostaddr: str = DEFAULT_HOST, hostport: int = DEFAULT_PORT):
        self.__hostaddr = hostaddr
        self.__hostport = hostport
        self.screens = {}
        # The broadcast screen is always present and is bound before being
        # stored, so its tally events are relayed like any other screen's.
        self.broadcast_screen = Screen(0xffff)
        self._bind_screen(self.broadcast_screen)
        self.screens[self.broadcast_screen.index] = self.broadcast_screen
        self.tallies = {}
        # The loop, lock and event are created lazily on first access
        # (see the properties below) rather than at construction time.
        self.__loop: asyncio.AbstractEventLoop|None = None
        self.running = False
        self.__connect_lock: asyncio.Lock|None = None
        self.__connected_evt: asyncio.Event|None = None

    @property
    def loop(self) -> asyncio.AbstractEventLoop:
        """The :class:`asyncio.BaseEventLoop` associated with the instance

        Resolved with :func:`asyncio.get_running_loop` on first access and
        cached afterwards.
        """
        loop = self.__loop
        if loop is None:
            loop = self.__loop = asyncio.get_running_loop()
        return loop

    @property
    def connected_evt(self) -> asyncio.Event:
        """Event set by the protocol once its transport has been created
        (created on first access)
        """
        e = self.__connected_evt
        if e is None:
            e = self.__connected_evt = asyncio.Event()
        return e

    @property
    def _connect_lock(self) -> asyncio.Lock:
        """Lock serializing :meth:`open` and :meth:`close`
        (created on first access)
        """
        l = self.__connect_lock
        if l is None:
            l = self.__connect_lock = asyncio.Lock()
        return l

    @property
    def hostaddr(self) -> str:
        """The local host address to bind the server to
        """
        return self.__hostaddr

    @property
    def hostport(self) -> int:
        """The port to listen on
        """
        return self.__hostport

    async def open(self):
        """Open the server

        Does nothing if the receiver is already :attr:`running`. Any exception
        raised while creating the datagram endpoint is propagated after
        :attr:`running` has been reset, so a later call may try again.
        """
        async with self._connect_lock:
            if self.running:
                return
            logger.debug('UmdReceiver.open()')
            self.running = True
            self.connected_evt.clear()
            transport = None
            try:
                # NOTE(review): the asyncio docs state ``reuse_port`` is not
                # supported on Windows and some Unixes - confirm the target
                # platforms.
                transport, protocol = await self.loop.create_datagram_endpoint(
                    lambda: UmdProtocol(self),
                    local_addr=(self.hostaddr, self.hostport),
                    reuse_port=True,
                )
                self.transport, self.protocol = transport, protocol
                await self.connected_evt.wait()
            except BaseException:
                # Roll back so the instance is not left flagged as running
                # without a usable transport (this would make later open()
                # calls no-ops and close() fail).
                self.running = False
                if transport is not None:
                    transport.close()
                raise
            logger.info('UmdReceiver running')

    async def close(self):
        """Close the server

        Does nothing if the receiver is not :attr:`running`.
        """
        async with self._connect_lock:
            if not self.running:
                return
            logger.debug('UmdReceiver.close()')
            self.running = False
            self.transport.close()
            self.connected_evt.clear()
            logger.info('UmdReceiver closed')

    async def set_bind_address(self, hostaddr: str, hostport: int):
        """Set the :attr:`hostaddr` and :attr:`hostport` and restart the server

        Returns immediately if both values are unchanged. The server is only
        reopened if it was running when this method was called.
        """
        if hostaddr == self.hostaddr and hostport == self.hostport:
            return
        running = self.running
        if running:
            await self.close()
        self.__hostaddr = hostaddr
        self.__hostport = hostport
        if running:
            await self.open()

    async def set_hostaddr(self, hostaddr: str):
        """Set the :attr:`hostaddr` and restart the server
        """
        await self.set_bind_address(hostaddr, self.hostport)

    async def set_hostport(self, hostport: int):
        """Set the :attr:`hostport` and restart the server
        """
        await self.set_bind_address(self.hostaddr, hostport)

    def parse_incoming(self, data: bytes, addr: Tuple[str, int]):
        """Parse data received by the server

        :meth:`.Message.parse` returns the parsed message together with any
        bytes left over, so parsing is repeated on the remainder until the
        buffer is exhausted.

        Arguments:
            data: The raw bytes of the received datagram
            addr: The sender's address (currently unused)
        """
        while True:
            message, remaining = Message.parse(data)
            if message.screen not in self.screens:
                screen = Screen(message.screen)
                self.screens[screen.index] = screen
                self._bind_screen(screen)
                self.emit('on_screen_added', screen)
                logger.debug(f'new screen: {screen.index}')
            else:
                screen = self.screens[message.screen]

            if message.is_broadcast:
                # A broadcast message is applied to every known screen
                for target in self.screens.values():
                    target.update_from_message(message)
            else:
                screen.update_from_message(message)
            if not remaining:
                break
            # Advance to the unparsed tail; without this the same bytes
            # would be parsed again on every iteration.
            data = remaining

    def _bind_screen(self, screen: Screen):
        """Subscribe to the events of ``screen`` so they can be relayed
        """
        screen.bind(
            on_tally_added=self._on_screen_tally_added,
            on_tally_update=self._on_screen_tally_update,
            on_tally_control=self._on_screen_tally_control,
            on_control=self._on_screen_control,
        )

    def _on_screen_tally_added(self, tally: Tally, **kwargs):
        """Store a newly added tally in :attr:`tallies` and emit
        ``on_tally_added``
        """
        if tally.id not in self.tallies:
            self.tallies[tally.id] = tally
        self.emit('on_tally_added', tally, **kwargs)

    def _on_screen_tally_update(self, *args, **kwargs):
        """Relay a screen's ``on_tally_update`` as ``on_tally_updated``
        """
        self.emit('on_tally_updated', *args, **kwargs)

    def _on_screen_tally_control(self, *args, **kwargs):
        """Relay a screen's ``on_tally_control`` event
        """
        self.emit('on_tally_control', *args, **kwargs)

    def _on_screen_control(self, *args, **kwargs):
        """Relay a screen's ``on_control`` as ``on_scontrol``
        """
        self.emit('on_scontrol', *args, **kwargs)

    async def __aenter__(self):
        await self.open()
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        await self.close()
5✔
251

252

253
if __name__ == '__main__':
    # Run a receiver with the default bind address until interrupted.
    # An explicit loop is created because asyncio.get_event_loop() is
    # deprecated when no event loop is running.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    umd = UmdReceiver()

    try:
        # Opening inside the ``try`` ensures the loop is closed even if the
        # server fails to start.
        loop.run_until_complete(umd.open())
        loop.run_forever()
    except KeyboardInterrupt:
        loop.run_until_complete(umd.close())
    finally:
        loop.close()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc