• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nocarryr / tslumd / 5833948469

pending completion
5833948469

push

github

nocarryr
Use pytest_asyncio.fixture for teardown fixtures

841 of 897 relevant lines covered (93.76%)

2.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.26
/src/tslumd/messages.py
1
import asyncio
3✔
2
import dataclasses
3✔
3
from dataclasses import dataclass, field
3✔
4
import enum
3✔
5
import struct
3✔
6
from typing import List, Tuple, Dict, Iterable, Optional
3✔
7

8
from tslumd import MessageType, TallyColor, Tally
3✔
9

10
# Public names exported by a star-import of this module
__all__ = (
    'Display',
    'Message',
    'ParseError',
    'MessageParseError',
    'DmsgParseError',
    'DmsgControlParseError',
    'MessageLengthError',
)
14

15

16
class ParseError(Exception):
    """Raised on errors during message parsing

    Arguments:
        msg: Description of what went wrong
        value: The message bytes that were being parsed when the error
            occurred

    .. versionadded:: 0.0.2
    """
    msg: str #: Error message
    value: bytes #: The relevant message bytes containing the error
    def __init__(self, msg: str, value: bytes):
        # Forward both values to the base class so ``args`` is populated
        # explicitly rather than relying on ``BaseException.__new__``
        super().__init__(msg, value)
        self.msg = msg
        self.value = value
    def __str__(self) -> str:
        # ``!r`` keeps non-printable bytes readable in the rendered message
        return f'{self.msg}: "{self.value!r}"'
3✔
28

29
class MessageParseError(ParseError):
    """Parsing of a :class:`Message` failed

    .. versionadded:: 0.0.2
    """
3✔
35

36
class DmsgParseError(ParseError):
    """Parsing of a :class:`Display` failed

    .. versionadded:: 0.0.2
    """
3✔
42

43
class DmsgControlParseError(ParseError):
    """Parsing of :attr:`Display.control` data failed

    .. versionadded:: 0.0.2
    """
3✔
49

50
class MessageLengthError(ValueError):
    """A message packet would exceed the 2048 byte length limit

    .. versionadded:: 0.0.4
    """
55

56

57
class Flags(enum.IntFlag):
    """Bit flags carried in the message header
    """
    #: No flags set
    NO_FLAGS = 0

    #: Indicates text formatted as ``UTF-16LE`` if set, otherwise ``UTF-8``
    UTF16 = 1 << 0

    #: Indicates the message contains ``SCONTROL`` data if set,
    #: otherwise ``DMESG``
    SCONTROL = 1 << 1
67

68

69
@dataclass
class Display:
    """A single tally "display"

    Instances are serialized to and parsed from the ``DMSG`` portion of a
    message with :meth:`to_dmsg` and :meth:`from_dmsg`.
    """
    index: int #: The display index from 0 to 65534 (``0xFFFE``)
    rh_tally: TallyColor = TallyColor.OFF #: Right hand tally indicator
    txt_tally: TallyColor = TallyColor.OFF #: Text tally indicator
    lh_tally: TallyColor = TallyColor.OFF #: Left hand tally indicator
    brightness: int = 3 #: Display brightness (from 0 to 3)
    text: str = '' #: Text to display
    control: bytes = b''
    """Control data (if :attr:`type` is :attr:`~.MessageType.control`)

    .. versionadded:: 0.0.2
    """

    type: MessageType = MessageType.display
    """The message type. One of :attr:`~.MessageType.display` or
    :attr:`~.MessageType.control`.

    * For :attr:`~.MessageType.display` (the default), the message contains
      :attr:`text` information and the :attr:`control` field must be empty.
    * For :attr:`~.MessageType.control`, the message contains :attr:`control`
      data and the :attr:`text` field must be empty

    .. versionadded:: 0.0.2
    """

    is_broadcast: bool = field(init=False)
    """``True`` if the display is to a "broadcast", meaning sent to all display
    indices.

    (if the :attr:`index` is ``0xffff``)

    .. versionadded:: 0.0.2
    """

    def __post_init__(self):
        """Derive :attr:`is_broadcast` and validate the text/control pairing

        Raises:
            ValueError: If the display is a control message and also
                contains :attr:`text`
        """
        self.is_broadcast = self.index == 0xffff
        # Non-empty control data always makes this a control message,
        # whatever ``type`` was passed in
        if len(self.control):
            self.type = MessageType.control
        if self.type == MessageType.control and len(self.text):
            raise ValueError('Control message cannot contain text')

    @classmethod
    def broadcast(cls, **kwargs) -> 'Display':
        """Create a :attr:`broadcast <is_broadcast>` display

        (with :attr:`index` set to ``0xffff``)

        Any ``index`` given in ``kwargs`` is overridden.

        .. versionadded:: 0.0.2
        """
        return cls(**{**kwargs, 'index': 0xffff})

    @classmethod
    def from_dmsg(cls, flags: Flags, dmsg: bytes) -> Tuple['Display', bytes]:
        """Construct an instance from a ``DMSG`` portion of received message.

        Any remaining message data after the relevant ``DMSG`` is returned along
        with the instance.

        Arguments:
            flags: The flags of the enclosing message. :attr:`Flags.UTF16`
                selects the text encoding
            dmsg: Message data beginning at the start of the ``DMSG``

        Returns:
            A tuple of the parsed instance and the bytes following it

        Raises:
            DmsgParseError: If the data is too short for the index/control
                header, the text length field or the announced text length
            DmsgControlParseError: If the control data is truncated
        """
        if len(dmsg) < 4:
            raise DmsgParseError('Invalid dmsg length', dmsg)
        index, ctrl = struct.unpack('<2H', dmsg[:4])
        dmsg = dmsg[4:]
        # Control word layout: bits 0-1 right tally, 2-3 text tally,
        # 4-5 left tally, 6-7 brightness, bit 15 control-data marker
        kw = dict(
            index=index,
            rh_tally=TallyColor(ctrl & 0b11),
            txt_tally=TallyColor(ctrl >> 2 & 0b11),
            lh_tally=TallyColor(ctrl >> 4 & 0b11),
            brightness=ctrl >> 6 & 0b11,
        )
        if ctrl & 0x8000:
            control, dmsg = cls._unpack_control_data(dmsg)
            kw['control'] = control
            kw['type'] = MessageType.control
            return cls(**kw), dmsg

        if len(dmsg) < 2:
            raise DmsgParseError('Invalid text length field', dmsg)
        txt_byte_len = struct.unpack('<H', dmsg[:2])[0]
        dmsg = dmsg[2:]
        txt_bytes, dmsg = dmsg[:txt_byte_len], dmsg[txt_byte_len:]
        if len(txt_bytes) != txt_byte_len:
            raise DmsgParseError(
                f'Invalid text bytes. Expected {txt_byte_len}',
                txt_bytes,
            )
        if Flags.UTF16 in flags:
            kw['text'] = txt_bytes.decode('UTF-16le')
        else:
            # Anything from the first NUL byte onward is discarded
            kw['text'] = txt_bytes.partition(b'\0')[0].decode('UTF-8')
        return cls(**kw), dmsg

    @staticmethod
    def _unpack_control_data(data: bytes) -> Tuple[bytes, bytes]:
        """Unpack control data (if control bit 15 is set)

        Arguments:
            data: The portion of the ``dmsg`` at the start of the
                "Control Data" field

        Returns:
            tuple: ``(control, remaining)``
                The control data bytes and the remaining message data after
                the control data field

        Raises:
            DmsgControlParseError: If the length field is missing or
                announces more bytes than are available

        Note:
            This is undefined as of UMDv5.0 and its implementation is
            the author's "best guess" based off of other areas of the protocol

        .. versionadded:: 0.0.2

        :meta public:
        """
        if len(data) < 2:
            raise DmsgControlParseError('Unknown control data format', data)
        length = struct.unpack('<H', data[:2])[0]
        data = data[2:]
        if len(data) < length:
            raise DmsgControlParseError('Unknown control data format', data)
        return data[:length], data[length:]

    @staticmethod
    def _pack_control_data(data: bytes) -> bytes:
        """Pack control data (if control bit 15 is set)

        The data is prefixed with its length as a little-endian unsigned
        16-bit integer, mirroring :meth:`_unpack_control_data`.

        Arguments:
            data: The control data to pack

        Returns:
            bytes: packed
                The packed control data

        Note:
            This is undefined as of UMDv5.0 and its implementation is
            the author's "best guess" based off of other areas of the protocol

        .. versionadded:: 0.0.2

        :meta public:
        """
        length = len(data)
        return struct.pack(f'<H{length}s', length, data)

    def to_dmsg(self, flags: Flags) -> bytes:
        """Build ``dmsg`` bytes to be included in a message
        (called from :meth:`Message.build_message`)

        Arguments:
            flags: The flags of the enclosing message. :attr:`Flags.UTF16`
                selects the text encoding

        Returns:
            The serialized ``DMSG`` as an immutable :class:`bytes` object
        """
        ctrl = self.rh_tally & 0b11
        ctrl += (self.txt_tally & 0b11) << 2
        ctrl += (self.lh_tally & 0b11) << 4
        ctrl += (self.brightness & 0b11) << 6
        if self.type == MessageType.control:
            # Bit 15 marks the payload as control data instead of text
            ctrl |= 0x8000
            header = struct.pack('<2H', self.index, ctrl)
            return header + self._pack_control_data(self.control)

        # Same codec name as the decode side in ``from_dmsg``; the previous
        # spelling ``'UTF16-le'`` is not a registered codec alias
        encoding = 'UTF-16le' if Flags.UTF16 in flags else 'UTF-8'
        txt_bytes = self.text.encode(encoding)
        header = struct.pack('<3H', self.index, ctrl, len(txt_bytes))
        return header + txt_bytes

    def to_dict(self) -> Dict:
        """Return the fields as a dict, excluding :attr:`is_broadcast`
        """
        d = dataclasses.asdict(self)
        del d['is_broadcast']
        return d

    @classmethod
    def from_tally(cls, tally: Tally, msg_type: MessageType = MessageType.display) -> 'Display':
        """Create a :class:`Display` from the given :class:`~.Tally`

        Arguments:
            tally: The tally whose ``to_dict()`` values are copied
            msg_type: The :attr:`type` of the new display. The tally's
                ``text`` is dropped for control messages and its ``control``
                data is dropped for display messages

        .. versionadded:: 0.0.2
            The msg_type argument
        """
        kw = tally.to_dict()
        del kw['id']
        if msg_type == MessageType.control:
            del kw['text']
        elif msg_type == MessageType.display:
            del kw['control']
        kw['type'] = msg_type
        return cls(**kw)

    def __eq__(self, other):
        """Compare against another :class:`Display` or a :class:`~.Tally`

        Two displays are compared field by field. Against a tally, the
        tally's ``id`` and this display's :attr:`type` are ignored, as is
        whichever of ``text`` / ``control`` does not apply to :attr:`type`.
        """
        if not isinstance(other, (Display, Tally)):
            return NotImplemented
        self_dict = self.to_dict()
        oth_dict = other.to_dict()
        if isinstance(other, Display):
            return self_dict == oth_dict

        del oth_dict['id']
        del self_dict['type']
        # Only the payload field matching this display's type is compared
        unused = 'text' if self.type == MessageType.control else 'control'
        del self_dict[unused]
        del oth_dict[unused]
        return self_dict == oth_dict

    def __ne__(self, other):
        """Inverse of :meth:`__eq__`, passing ``NotImplemented`` through
        """
        result = self.__eq__(other)
        if result is NotImplemented:
            return NotImplemented
        return not result
×
287

288
@dataclass
class Message:
    """A single UMDv5 message packet

    Packets are parsed with :meth:`parse` and serialized with
    :meth:`build_messages` (or :meth:`build_message`).
    """
    version: int = 0 #: Protocol minor version
    flags: int = Flags.NO_FLAGS #: The message :class:`Flags` field
    screen: int = 0 #: Screen index from 0 to 65534 (``0xFFFE``)
    displays: List[Display] = field(default_factory=list)
    """A list of :class:`Display` instances"""

    scontrol: bytes = b''
    """SCONTROL data (if :attr:`type` is :attr:`~.MessageType.control`)"""

    type: MessageType = MessageType.display
    """The message type. One of :attr:`~.MessageType.display` or
    :attr:`~.MessageType.control`.

    * For :attr:`~.MessageType.display` (the default), the contents of
      :attr:`displays` are used and the :attr:`scontrol` field must be empty.
    * For :attr:`~.MessageType.control`, the :attr:`scontrol` field is used and
      :attr:`displays` must be empty.

    .. versionadded:: 0.0.2
    """

    is_broadcast: bool = field(init=False)
    """``True`` if the message is to be "broadcast" to all screens.

    (if :attr:`screen` is ``0xffff``)

    .. versionadded:: 0.0.2
    """

    def __post_init__(self):
        """Derive :attr:`is_broadcast` and reconcile :attr:`type` with
        :attr:`flags`

        Raises:
            ValueError: If both :attr:`scontrol` and :attr:`displays` are
                non-empty
        """
        self.is_broadcast = self.screen == 0xffff
        if not isinstance(self.flags, Flags):
            self.flags = Flags(self.flags)

        if len(self.scontrol) and len(self.displays):
            raise ValueError('SCONTROL message cannot contain displays')

        # Non-empty SCONTROL data always makes this a control message
        if len(self.scontrol):
            self.type = MessageType.control

        if self.type == MessageType.control:
            self.flags |= Flags.SCONTROL
        elif self.type == MessageType._unset:
            # Used by ``parse``: the type is taken from the received flags
            if Flags.SCONTROL in self.flags:
                self.type = MessageType.control
            else:
                self.type = MessageType.display

    @classmethod
    def broadcast(cls, **kwargs) -> 'Message':
        """Create a :attr:`broadcast <is_broadcast>` message

        (with :attr:`screen` set to ``0xffff``)

        Any ``screen`` given in ``kwargs`` is overridden.

        .. versionadded:: 0.0.2
        """
        return cls(**{**kwargs, 'screen': 0xffff})

    @classmethod
    def parse(cls, msg: bytes) -> Tuple['Message', bytes]:
        """Parse incoming message data to create a :class:`Message` instance.

        Any remaining message data after parsing is returned along with the instance.

        Arguments:
            msg: The received data, beginning with the two-byte byte count

        Returns:
            A tuple of the parsed instance and the bytes following the packet

        Raises:
            MessageParseError: If the header is incomplete, or the byte count
                is smaller than the fixed header fields or larger than the
                data available
            DmsgParseError: If one of the contained displays is malformed
            DmsgControlParseError: If display control data is malformed
        """
        if len(msg) < 6:
            raise MessageParseError('Invalid header length', msg)
        byte_count, version, flags, screen = struct.unpack('<HBBH', msg[:6])
        # The byte count covers everything after its own two bytes
        msg = msg[2:]
        if byte_count < 4:
            # version, flags and screen alone occupy 4 bytes; a smaller count
            # would hand header bytes back to the caller as "remaining" data
            raise MessageParseError(
                f'Invalid byte count. Expected at least 4, got {byte_count}',
                msg,
            )
        if len(msg) < byte_count:
            raise MessageParseError(
                f'Invalid byte count. Expected {byte_count}, got {len(msg)}',
                msg,
            )
        remaining = msg[byte_count:]
        body = msg[4:byte_count]
        obj = cls(
            version=version,
            flags=Flags(flags),
            screen=screen,
            type=MessageType._unset,
        )
        if obj.type == MessageType.control:
            obj.scontrol = body
            return obj, remaining
        while len(body):
            disp, body = Display.from_dmsg(obj.flags, body)
            obj.displays.append(disp)
        return obj, remaining

    def build_message(self, ignore_packet_length: Optional[bool] = False) -> bytes:
        """Build a message packet from data in this instance

        Arguments:
            ignore_packet_length (bool, optional): If ``False``, the message limit
                of 2048 bytes is respected, and if exceeded, an exception is raised.
                Otherwise, the limit is ignored. (default is False)

        Returns:
            The complete packet, including its header

        Raises:
            MessageLengthError: If the message packet is larger than 2048 bytes
                (and ``ignore_packet_length`` is False)

        Note:
            This method is retained for backwards compatibility. To properly
            handle the message limit, use :meth:`build_messages`

        .. versionchanged:: 0.0.4

            * The ``ignore_packet_length`` parameter was added
            * Message length is limited to 2048 bytes
        """
        packets = self.build_messages(ignore_packet_length=ignore_packet_length)
        data = next(packets)
        # A second packet means the data did not fit into a single message
        if not ignore_packet_length and next(packets, None) is not None:
            raise MessageLengthError(
                'Message exceeds the packet length limit of 2048 bytes'
            )
        return data

    def _build_packet(self, payload: bytes) -> bytes:
        """Prefix ``payload`` with the packet header and return the packet
        """
        # The byte count excludes its own two bytes but includes the
        # version, flags and screen fields (4 bytes)
        pbc = len(payload) + 4
        header = struct.pack('<HBBH', pbc, self.version, self.flags, self.screen)
        return header + bytes(payload)

    def build_messages(self, ignore_packet_length: Optional[bool] = False) -> Iterable[bytes]:
        """Build message packet(s) from data in this instance as an iterator

        The specified maximum packet length of 2048 is respected and if
        necessary, the data will be split into separate messages.

        This method will always function as a :term:`generator`, regardless of
        the number of message packets produced.

        Arguments:
            ignore_packet_length (bool, optional): If ``True``, the length
                limit is not enforced and a single packet is produced.
                (default is False)

        Raises:
            MessageLengthError: If ``ignore_packet_length`` is False and
                either the SCONTROL data or a single display cannot fit into
                one packet. Packets completed before an oversized display are
                yielded first.

        .. versionadded:: 0.0.4
        """
        max_len = 2048
        header_len = 6

        if self.type == MessageType.control:
            payload = bytes(self.scontrol)
            if not ignore_packet_length and len(payload) + header_len > max_len:
                raise MessageLengthError(
                    f'SCONTROL data exceeds the packet length limit of {max_len} bytes'
                )
            yield self._build_packet(payload)
            return

        payload = bytearray()
        for display in self.displays:
            disp_payload = display.to_dmsg(self.flags)
            if not ignore_packet_length:
                if len(payload) + len(disp_payload) + header_len > max_len:
                    # Flush what has been collected so far, then start a new
                    # packet beginning with this display
                    if payload:
                        yield self._build_packet(payload)
                        payload = bytearray()
                    if len(disp_payload) + header_len > max_len:
                        raise MessageLengthError(
                            f'Display exceeds the packet length limit of {max_len} bytes'
                        )
            payload.extend(disp_payload)
        yield self._build_packet(payload)
3✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc