• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nocarryr / tslumd / 5833948469

pending completion
5833948469

push

github

nocarryr
Use pytest_asyncio.fixture for teardown fixtures

841 of 897 relevant lines covered (93.76%)

2.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.25
/src/tslumd/receiver.py
1
try:
3✔
2
    from loguru import logger
3✔
3
except ImportError: # pragma: no cover
4
    import logging
5
    logger = logging.getLogger(__name__)
6
import asyncio
3✔
7
from typing import Dict, Tuple, Set, Optional
3✔
8

9
from pydispatch import Dispatcher, Property, DictProperty, ListProperty
3✔
10

11
from tslumd import Tally, Screen, TallyKey, Message
3✔
12

13
__all__ = ('UmdReceiver',)
3✔
14

15

16

17

18
class UmdProtocol(asyncio.DatagramProtocol):
    """Datagram protocol that hands incoming packets to a :class:`UmdReceiver`

    Arguments:
        receiver: The receiver instance that owns this protocol
    """

    def __init__(self, receiver: 'UmdReceiver') -> None:
        self.receiver = receiver

    def connection_made(self, transport) -> None:
        """Store the transport and signal the receiver that it is connected
        """
        self.transport = transport
        logger.debug(f'transport={transport}')
        # The receiver's ``open()`` waits on this event before returning
        self.receiver.connected_evt.set()

    def datagram_received(self, data, addr) -> None:
        """Forward the raw datagram and its source address to the receiver
        """
        self.receiver.parse_incoming(data, addr)
3✔
27

28

29
class UmdReceiver(Dispatcher):
    """Receiver for UMD messages

    Arguments:
        hostaddr (str): The local host address to bind the server to. Defaults
            to :attr:`DEFAULT_HOST`
        hostport (int): The port to listen on. Defaults to :attr:`DEFAULT_PORT`

    :Events:
        .. event:: on_tally_added(tally: Tally)

            Fired when a :class:`~.Tally` instance is added to :attr:`tallies`

        .. event:: on_tally_updated(tally: Tally)

            Fired when any :class:`~.Tally` instance has been updated

        .. event:: on_tally_control(tally: Tally, data: bytes)

            Fired when control data has been received for a :class:`~.Tally`

            .. versionadded:: 0.0.3

        .. event:: on_screen_added(screen: Screen)

            Fired when a :class:`~.Screen` instance is added to :attr:`screens`

            .. versionadded:: 0.0.3

        .. event:: on_scontrol(screen: int, data: bytes)

            Fired when a message with :attr:`~.Message.scontrol` data is received

            * ``screen`` : The :attr:`~.Message.screen` from the incoming
              control message
            * ``data`` : The control data

            .. versionadded:: 0.0.2

    """

    DEFAULT_HOST: str = '0.0.0.0' #: The default host address to listen on
    DEFAULT_PORT: int = 65000 #: The default host port to listen on

    screens: Dict[int, Screen]
    """Mapping of :class:`~.Screen` objects by :attr:`~.Screen.index`

    .. versionadded:: 0.0.3
    """

    broadcast_screen: Screen
    """A :class:`~.Screen` instance created using :meth:`.Screen.broadcast`

    .. versionadded:: 0.0.3
    """

    tallies: Dict[TallyKey, Tally]
    """Mapping of :class:`~.Tally` objects by their :attr:`~.Tally.id`

    .. versionchanged:: 0.0.3
        The keys are now a combination of the :class:`~.Screen` and
        :class:`.Tally` indices
    """

    running: bool
    """``True`` if the client / server are running
    """

    loop: asyncio.BaseEventLoop
    """The :class:`asyncio.BaseEventLoop` associated with the instance"""

    _events_ = [
        'on_tally_added', 'on_tally_updated', 'on_tally_control',
        'on_screen_added', 'on_scontrol',
    ]
    def __init__(self, hostaddr: str = DEFAULT_HOST, hostport: int = DEFAULT_PORT):
        self.__hostaddr = hostaddr
        self.__hostport = hostport
        self.screens = {}
        # The broadcast screen always exists and is registered like any other
        self.broadcast_screen = Screen(0xffff)
        self._bind_screen(self.broadcast_screen)
        self.screens[self.broadcast_screen.index] = self.broadcast_screen
        self.tallies = {}
        self.loop = asyncio.get_event_loop()
        self.running = False
        # Serializes open() / close() so they cannot interleave
        self._connect_lock = asyncio.Lock()
        # Set by UmdProtocol.connection_made() once the transport is ready
        self.connected_evt = asyncio.Event()

    @property
    def hostaddr(self) -> str:
        """The local host address to bind the server to
        """
        return self.__hostaddr

    @property
    def hostport(self) -> int:
        """The port to listen on
        """
        return self.__hostport

    async def open(self):
        """Open the server

        Does nothing if the server is already :attr:`running`. If the
        datagram endpoint cannot be created, :attr:`running` is reset to
        ``False`` and the exception is re-raised.
        """
        async with self._connect_lock:
            if self.running:
                return
            logger.debug('UmdReceiver.open()')
            self.running = True
            self.connected_evt.clear()
            try:
                self.transport, self.protocol = await self.loop.create_datagram_endpoint(
                    lambda: UmdProtocol(self),
                    local_addr=(self.hostaddr, self.hostport),
                    reuse_port=True,
                )
            except BaseException:
                # Binding failed (or the task was cancelled): roll the flag
                # back so a later open() retries and close() does not touch
                # a transport that was never created.
                self.running = False
                raise
            await self.connected_evt.wait()
            logger.info('UmdReceiver running')

    async def close(self):
        """Close the server

        Does nothing if the server is not :attr:`running`.
        """
        async with self._connect_lock:
            if not self.running:
                return
            logger.debug('UmdReceiver.close()')
            self.running = False
            self.transport.close()
            self.connected_evt.clear()
            logger.info('UmdReceiver closed')

    async def set_bind_address(self, hostaddr: str, hostport: int):
        """Set the :attr:`hostaddr` and :attr:`hostport` and restart the server

        The server is only restarted if it was running when called. Nothing
        happens if both values are unchanged.
        """
        if hostaddr == self.hostaddr and hostport == self.hostport:
            return
        running = self.running
        if running:
            await self.close()
        self.__hostaddr = hostaddr
        self.__hostport = hostport
        if running:
            await self.open()

    async def set_hostaddr(self, hostaddr: str):
        """Set the :attr:`hostaddr` and restart the server
        """
        await self.set_bind_address(hostaddr, self.hostport)

    async def set_hostport(self, hostport: int):
        """Set the :attr:`hostport` and restart the server
        """
        await self.set_bind_address(self.hostaddr, hostport)

    def parse_incoming(self, data: bytes, addr: Tuple[str, int]):
        """Parse data received by the server

        Every message contained in ``data`` is parsed in order. A
        :class:`~.Screen` is created (and ``on_screen_added`` emitted) the
        first time a screen index is seen. Broadcast messages are applied to
        all known screens.

        Arguments:
            data: The raw datagram payload
            addr: The sender's address (currently unused)
        """
        while True:
            message, remaining = Message.parse(data)
            if message.screen not in self.screens:
                screen = Screen(message.screen)
                self.screens[screen.index] = screen
                self._bind_screen(screen)
                self.emit('on_screen_added', screen)
                logger.debug(f'new screen: {screen.index}')
            else:
                screen = self.screens[message.screen]

            if message.is_broadcast:
                for target in self.screens.values():
                    target.update_from_message(message)
            else:
                screen.update_from_message(message)
            if not remaining:
                break
            # Advance to the unparsed tail; without this the same bytes
            # would be parsed again forever for multi-message datagrams.
            data = remaining

    def _bind_screen(self, screen: Screen):
        """Subscribe to the events of ``screen`` so they can be re-emitted
        """
        screen.bind(
            on_tally_added=self._on_screen_tally_added,
            on_tally_update=self._on_screen_tally_update,
            on_tally_control=self._on_screen_tally_control,
            on_control=self._on_screen_control,
        )

    def _on_screen_tally_added(self, tally: Tally, **kwargs):
        """Store a new tally in :attr:`tallies` and emit ``on_tally_added``
        """
        if tally.id not in self.tallies:
            self.tallies[tally.id] = tally
        self.emit('on_tally_added', tally, **kwargs)

    def _on_screen_tally_update(self, *args, **kwargs):
        """Re-emit a screen's ``on_tally_update`` as ``on_tally_updated``
        """
        self.emit('on_tally_updated', *args, **kwargs)

    def _on_screen_tally_control(self, *args, **kwargs):
        """Re-emit a screen's ``on_tally_control`` event
        """
        self.emit('on_tally_control', *args, **kwargs)

    def _on_screen_control(self, *args, **kwargs):
        """Re-emit a screen's ``on_control`` as ``on_scontrol``
        """
        self.emit('on_scontrol', *args, **kwargs)

    async def __aenter__(self):
        """Open the server and return the instance
        """
        await self.open()
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        """Close the server when leaving the ``async with`` block
        """
        await self.close()
3✔
231

232

233
if __name__ == '__main__':
    # Create and install the loop explicitly: relying on
    # ``asyncio.get_event_loop()`` to create one implicitly is deprecated.
    # It is installed before ``UmdReceiver()`` is built so the receiver
    # picks up this same loop.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    umd = UmdReceiver()

    loop.run_until_complete(umd.open())
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop; fall through to cleanup
        pass
    finally:
        # Close the receiver on every exit path (close() is a no-op if it
        # is not running), then always release the loop.
        try:
            loop.run_until_complete(umd.close())
        finally:
            loop.close()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc