• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nocarryr / tslumd / 7388595068

02 Jan 2024 05:18PM UTC coverage: 94.411%. First build
7388595068

push

github

nocarryr
Run doctest in separate pytest invocation

929 of 984 relevant lines covered (94.41%)

4.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.33
/src/tslumd/messages.py
1
from __future__ import annotations
5✔
2
import asyncio
5✔
3
import dataclasses
5✔
4
from dataclasses import dataclass, field
5✔
5
import enum
5✔
6
import struct
5✔
7
from typing import Tuple, Iterator, Any, cast
5✔
8

9
from tslumd import MessageType, TallyColor, Tally
5✔
10

11
__all__ = (
5✔
12
    'Display', 'Message', 'ParseError', 'MessageParseError',
13
    'DmsgParseError', 'DmsgControlParseError', 'MessageLengthError',
14
)
15

16

17
class ParseError(Exception):
5✔
18
    """Raised on errors during message parsing
19

20
    .. versionadded:: 0.0.2
21
    """
22
    msg: str #: Error message
5✔
23
    value: bytes #: The relevant message bytes containing the error
5✔
24
    def __init__(self, msg: str, value: bytes):
5✔
25
        self.msg = msg
5✔
26
        self.value = value
5✔
27
    def __str__(self):
5✔
28
        return f'{self.msg}: "{self.value!r}"'
5✔
29

30
class MessageParseError(ParseError):
5✔
31
    """Raised on errors while parsing :class:`Message` objects
32

33
    .. versionadded:: 0.0.2
34
    """
35
    pass
5✔
36

37
class DmsgParseError(ParseError):
5✔
38
    """Raised on errors while parsing :class:`Display` objects
39

40
    .. versionadded:: 0.0.2
41
    """
42
    pass
5✔
43

44
class DmsgControlParseError(ParseError):
5✔
45
    """Raised on errors when parsing :attr:`Display.control` data
46

47
    .. versionadded:: 0.0.2
48
    """
49
    pass
5✔
50

51
class MessageLengthError(ValueError):
5✔
52
    """Raised when message length is larger than 2048 bytes
53

54
    .. versionadded:: 0.0.4
55
    """
56

57

58
class Flags(enum.IntFlag):
5✔
59
    """Message flags
60
    """
61
    NO_FLAGS = 0 #: No flags set
5✔
62
    UTF16 = 1
5✔
63
    """Indicates text formatted as ``UTF-16LE`` if set, otherwise ``UTF-8``"""
2✔
64

65
    SCONTROL = 2
5✔
66
    """Indicates the message contains ``SCONTROL`` data if set, otherwise ``DMESG``
5✔
67
    """
68

69

70
@dataclass
5✔
71
class Display:
4✔
72
    """A single tally "display"
73
    """
74
    index: int #: The display index from 0 to 65534 (``0xFFFE``)
5✔
75
    rh_tally: TallyColor = TallyColor.OFF #: Right hand tally indicator
5✔
76
    txt_tally: TallyColor = TallyColor.OFF #: Text tally indicator
5✔
77
    lh_tally: TallyColor = TallyColor.OFF #: Left hand tally indicator
5✔
78
    brightness: int = 3 #: Display brightness (from 0 to 3)
5✔
79
    text: str = '' #: Text to display
5✔
80
    control: bytes = b''
5✔
81
    """Control data (if :attr:`type` is :attr:`~.MessageType.control`)
2✔
82

83
    .. versionadded:: 0.0.2
84
    """
85

86
    type: MessageType = MessageType.display
5✔
87
    """The message type. One of :attr:`~.MessageType.display` or
2✔
88
    :attr:`~.MessageType.control`.
89

90
    * For :attr:`~.MessageType.display` (the default), the message contains
91
      :attr:`text` information and the :attr:`control` field must be empty.
92
    * For :attr:`~.MessageType.control`, the message contains :attr:`control`
93
      data and the :attr:`text` field must be empty
94

95
    .. versionadded:: 0.0.2
96
    """
97

98
    is_broadcast: bool = field(init=False)
5✔
99
    """``True`` if the display is to a "broadcast", meaning sent to all display
2✔
100
    indices.
101

102
    (if the :attr:`index` is ``0xffff``)
103

104
    .. versionadded:: 0.0.2
105
    """
106

107
    def __post_init__(self):
5✔
108
        self.is_broadcast = self.index == 0xffff
5✔
109
        if len(self.control):
5✔
110
            self.type = MessageType.control
5✔
111
        if self.type == MessageType.control and len(self.text):
5✔
112
            raise ValueError('Control message cannot contain text')
5✔
113

114
    @classmethod
5✔
115
    def broadcast(cls, **kwargs) -> Display:
4✔
116
        """Create a :attr:`broadcast <is_broadcast>` display
117

118
        (with :attr:`index` set to ``0xffff``)
119

120
        .. versionadded:: 0.0.2
121
        """
122
        kwargs = kwargs.copy()
5✔
123
        kwargs['index'] = 0xffff
5✔
124
        return cls(**kwargs)
5✔
125

126
    @classmethod
5✔
127
    def from_dmsg(cls, flags: Flags, dmsg: bytes) -> Tuple[Display, bytes]:
4✔
128
        """Construct an instance from a ``DMSG`` portion of received message.
129

130
        Any remaining message data after the relevant ``DMSG`` is returned along
131
        with the instance.
132
        """
133
        if len(dmsg) < 4:
5✔
134
            raise DmsgParseError('Invalid dmsg length', dmsg)
5✔
135
        hdr = struct.unpack('<2H', dmsg[:4])
5✔
136
        hdr = cast(Tuple[int, int], hdr)
5✔
137
        dmsg = dmsg[4:]
5✔
138
        ctrl = hdr[1]
5✔
139
        kw: dict[str, Any] = dict(
5✔
140
            index=hdr[0],
141
            rh_tally=TallyColor(ctrl & 0b11),
142
            txt_tally=TallyColor(ctrl >> 2 & 0b11),
143
            lh_tally=TallyColor(ctrl >> 4 & 0b11),
144
            brightness=ctrl >> 6 & 0b11,
145
        )
146
        is_control_data = ctrl & 0x8000 == 0x8000
5✔
147
        if is_control_data:
5✔
148
            ctrl, dmsg = cls._unpack_control_data(dmsg)
5✔
149
            kw['control'] = ctrl
5✔
150
            kw['type'] = MessageType.control
5✔
151
        else:
152
            if len(dmsg) < 2:
5✔
153
                raise DmsgParseError('Invalid text length field', dmsg)
5✔
154
            txt_byte_len = struct.unpack('<H', dmsg[:2])[0]
5✔
155
            dmsg = dmsg[2:]
5✔
156
            txt_bytes = dmsg[:txt_byte_len]
5✔
157
            dmsg = dmsg[txt_byte_len:]
5✔
158
            if len(txt_bytes) != txt_byte_len:
5✔
159
                raise DmsgParseError(
5✔
160
                    f'Invalid text bytes. Expected {txt_byte_len}',
161
                    txt_bytes,
162
                )
163
            if Flags.UTF16 in flags:
5✔
164
                txt = txt_bytes.decode('UTF-16le')
×
165
            else:
166
                if b'\0' in txt_bytes:
5✔
167
                    txt_bytes = txt_bytes.split(b'\0')[0]
5✔
168
                txt = txt_bytes.decode('UTF-8')
5✔
169
            kw['text'] = txt
5✔
170
        return cls(**kw), dmsg
5✔
171

172
    @staticmethod
5✔
173
    def _unpack_control_data(data: bytes) -> Tuple[bytes, bytes]:
4✔
174
        """Unpack control data (if control bit 15 is set)
175

176
        Arguments:
177
            data: The portion of the ``dmsg`` at the start of the
178
                "Control Data" field
179

180
        Returns:
181
            bytes: remaining
182
                The remaining message data after the control data field
183

184
        Note:
185
            This is undefined as of UMDv5.0 and its implementation is
186
            the author's "best guess" based off of other areas of the protocol
187

188
        .. versionadded:: 0.0.2
189

190
        :meta public:
191
        """
192
        if len(data) < 2:
5✔
193
            raise DmsgControlParseError('Unknown control data format', data)
5✔
194
        length = struct.unpack('<H', data[:2])[0]
5✔
195
        data = data[2:]
5✔
196
        if len(data) < length:
5✔
197
            raise DmsgControlParseError('Unknown control data format', data)
5✔
198
        return data[:length], data[length:]
5✔
199

200
    @staticmethod
5✔
201
    def _pack_control_data(data: bytes) -> bytes:
4✔
202
        """Pack control data (if control bit 15 is set)
203

204
        Arguments:
205
            data: The control data to pack
206

207
        Returns:
208
            bytes: packed
209
                The packed control data
210

211
        Note:
212
            This is undefined as of UMDv5.0 and its implementation is
213
            the author's "best guess" based off of other areas of the protocol
214

215
        .. versionadded:: 0.0.2
216

217
        :meta public:
218
        """
219
        length = len(data)
5✔
220
        return struct.pack(f'<H{length}s', length, data)
5✔
221

222
    def to_dmsg(self, flags: Flags) -> bytes:
5✔
223
        """Build ``dmsg`` bytes to be included in a message
224
        (called from :meth:`Message.build_message`)
225
        """
226
        ctrl = self.rh_tally & 0b11
5✔
227
        ctrl += (self.txt_tally & 0b11) << 2
5✔
228
        ctrl += (self.lh_tally & 0b11) << 4
5✔
229
        ctrl += (self.brightness & 0b11) << 6
5✔
230
        if self.type == MessageType.control:
5✔
231
            ctrl |= 0x8000
5✔
232
            data = bytearray(struct.pack('<2H', self.index, ctrl))
5✔
233
            data.extend(self._pack_control_data(self.control))
5✔
234
        else:
235
            if Flags.UTF16 in flags:
5✔
236
                txt_bytes = bytes(self.text, 'UTF-16le')
×
237
            else:
238
                txt_bytes = bytes(self.text, 'UTF-8')
5✔
239
            txt_byte_len = len(txt_bytes)
5✔
240
            data = bytearray(struct.pack('<3H', self.index, ctrl, txt_byte_len))
5✔
241
            data.extend(txt_bytes)
5✔
242
        return data
5✔
243

244
    def to_dict(self) -> dict:
5✔
245
        d = dataclasses.asdict(self)
5✔
246
        del d['is_broadcast']
5✔
247
        return d
5✔
248

249
    @classmethod
5✔
250
    def from_tally(cls, tally: Tally, msg_type: MessageType = MessageType.display) -> Display:
5✔
251
        """Create a :class:`Display` from the given :class:`~.Tally`
252

253
        .. versionadded:: 0.0.2
254
            The msg_type argument
255
        """
256
        kw = tally.to_dict()
5✔
257
        del kw['id']
5✔
258
        if msg_type == MessageType.control:
5✔
259
            del kw['text']
5✔
260
        elif msg_type == MessageType.display:
5✔
261
            del kw['control']
5✔
262
        kw['type'] = msg_type
5✔
263
        return cls(**kw)
5✔
264

265
    def __eq__(self, other):
5✔
266
        if not isinstance(other, (Display, Tally)):
5✔
267
            return NotImplemented
268
        self_dict = self.to_dict()
5✔
269
        oth_dict = other.to_dict()
5✔
270
        if isinstance(other, Display):
5✔
271
            return self_dict == oth_dict
5✔
272
        else:
273
            del oth_dict['id']
5✔
274

275
        del self_dict['type']
5✔
276
        if self.type == MessageType.control:
5✔
277
            del self_dict['text']
5✔
278
            del oth_dict['text']
5✔
279
        else:
280
            del self_dict['control']
5✔
281
            del oth_dict['control']
5✔
282

283
        return self_dict == oth_dict
5✔
284

285
    def __ne__(self, other):
5✔
286
        if not isinstance(other, (Display, Tally)):
×
287
            return NotImplemented
288
        return not self.__eq__(other)
×
289

290
@dataclass
5✔
291
class Message:
4✔
292
    """A single UMDv5 message packet
293
    """
294
    version: int = 0 #: Protocol minor version
5✔
295
    flags: Flags = Flags.NO_FLAGS #: The message :class:`Flags` field
5✔
296
    screen: int = 0 #: Screen index from 0 to 65534 (``0xFFFE``)
5✔
297
    displays: list[Display] = field(default_factory=list)
5✔
298
    """A list of :class:`Display` instances"""
2✔
299

300
    scontrol: bytes = b''
5✔
301
    """SCONTROL data (if :attr:`type` is :attr:`~.MessageType.control`)"""
2✔
302

303
    type: MessageType = MessageType.display
5✔
304
    """The message type. One of :attr:`~.MessageType.display` or
2✔
305
    :attr:`~.MessageType.control`.
306

307
    * For :attr:`~.MessageType.display` (the default), the contents of
308
      :attr:`displays` are used and the :attr:`scontrol` field must be empty.
309
    * For :attr:`~.MessageType.control`, the :attr:`scontrol` field is used and
310
      :attr:`displays` must be empty.
311

312
    .. versionadded:: 0.0.2
313
    """
314

315
    is_broadcast: bool = field(init=False)
5✔
316
    """``True`` if the message is to be "broadcast" to all screens.
2✔
317

318
    (if :attr:`screen` is ``0xffff``)
319

320
    .. versionadded:: 0.0.2
321
    """
322

323
    def __post_init__(self):
5✔
324
        self.is_broadcast = self.screen == 0xffff
5✔
325
        if not isinstance(self.flags, Flags):
5✔
326
            self.flags = Flags(self.flags)
5✔
327

328
        if len(self.scontrol) and len(self.displays):
5✔
329
            raise ValueError('SCONTROL message cannot contain displays')
5✔
330

331
        if len(self.scontrol):
5✔
332
            self.type = MessageType.control
5✔
333

334
        if self.type == MessageType.control:
5✔
335
            self.flags |= Flags.SCONTROL
5✔
336
        elif self.type == MessageType._unset:
5✔
337
            if Flags.SCONTROL in self.flags:
5✔
338
                self.type = MessageType.control
5✔
339
            else:
340
                self.type = MessageType.display
5✔
341

342
    @classmethod
5✔
343
    def broadcast(cls, **kwargs) -> Message:
4✔
344
        """Create a :attr:`broadcast <is_broadcast>` message
345

346
        (with :attr:`screen` set to ``0xffff``)
347

348
        .. versionadded:: 0.0.2
349
        """
350
        kwargs = kwargs.copy()
5✔
351
        kwargs['screen'] = 0xffff
5✔
352
        return cls(**kwargs)
5✔
353

354
    @classmethod
5✔
355
    def parse(cls, msg: bytes) -> Tuple[Message, bytes]:
4✔
356
        """Parse incoming message data to create a :class:`Message` instance.
357

358
        Any remaining message data after parsing is returned along with the instance.
359
        """
360
        if len(msg) < 6:
5✔
361
            raise MessageParseError('Invalid header length', msg)
5✔
362
        data = struct.unpack('<HBBH', msg[:6])
5✔
363
        byte_count, version, flags, screen = data
5✔
364
        kw = dict(
5✔
365
            version=version,
366
            flags=Flags(flags),
367
            screen=screen,
368
            type=MessageType._unset,
369
        )
370
        msg = msg[2:]
5✔
371
        if len(msg) < byte_count:
5✔
372
            raise MessageParseError(
5✔
373
                f'Invalid byte count. Expected {byte_count}, got {len(msg)}',
374
                msg,
375
            )
376
        remaining = msg[byte_count:]
5✔
377
        msg = msg[4:byte_count]
5✔
378
        obj = cls(**kw)
5✔
379
        if obj.type == MessageType.control:
5✔
380
            obj.scontrol = msg
5✔
381
            return obj, remaining
5✔
382
        while len(msg):
5✔
383
            disp, msg = Display.from_dmsg(obj.flags, msg)
5✔
384
            obj.displays.append(disp)
5✔
385
        return obj, remaining
5✔
386

387
    def build_message(self, ignore_packet_length: bool = False) -> bytes:
5✔
388
        """Build a message packet from data in this instance
389

390
        Arguments:
391
            ignore_packet_length (bool, optional): If ``False``, the message limit
392
                of 2048 bytes is respected, and if exceeded, an exception is raised.
393
                Otherwise, the limit is ignored. (default is False)
394

395
        Raises:
396
            MessageLengthError: If the message packet is larger than 2048 bytes
397
                (and ``ignore_packet_length`` is False)
398

399
        Note:
400
            This method is retained for backwards compatability. To properly
401
            handle the message limit, use :meth:`build_messages`
402

403
        .. versionchanged:: 0.0.4
404

405
            * The ``ignore_packet_length`` parameter was added
406
            * Message length is limited to 2048 bytes
407
        """
408
        it = self.build_messages(ignore_packet_length=ignore_packet_length)
5✔
409
        data = next(it)
5✔
410
        try:
5✔
411
            next_data = next(it)
5✔
412
        except StopIteration:
5✔
413
            pass
5✔
414
        else:
415
            if not ignore_packet_length:
5✔
416
                raise MessageLengthError()
5✔
417
        return data
5✔
418

419
    def build_messages(self, ignore_packet_length: bool = False) -> Iterator[bytes]:
5✔
420
        """Build message packet(s) from data in this instance as an iterator
421

422
        The specified maximum packet length of 2048 is respected and if
423
        necessary, the data will be split into separate messages.
424

425
        This method will always function as a :term:`generator`, regardless of
426
        the number of message packets produced.
427

428
        .. versionadded:: 0.0.4
429
        """
430
        msg_len_exceeded = False
5✔
431
        next_disp_index = None
5✔
432
        if self.type == MessageType.control:
5✔
433
            payload = bytearray(self.scontrol)
5✔
434
            byte_count = len(payload)
5✔
435
            if byte_count + 6 > 2048:
5✔
436
                raise MessageLengthError()
5✔
437
        else:
438
            byte_count = 0
5✔
439
            payload = bytearray()
5✔
440
            for disp_index, display in enumerate(self.displays):
5✔
441
                disp_payload = display.to_dmsg(self.flags)
5✔
442
                disp_len = len(disp_payload)
5✔
443
                if not ignore_packet_length:
5✔
444
                    if byte_count + disp_len + 6 >= 2048:
5✔
445
                        if disp_index == 0:
5✔
446
                            raise MessageLengthError()
5✔
447
                        msg_len_exceeded = True
5✔
448
                        next_disp_index = disp_index
5✔
449
                        break
5✔
450
                byte_count += disp_len
5✔
451
                payload.extend(disp_payload)
5✔
452
        fmt = f'<HBBH{byte_count}B'
5✔
453
        pbc = struct.calcsize(fmt) - 2
5✔
454
        data = bytearray(struct.pack('<HBBH', pbc, self.version, self.flags, self.screen))
5✔
455
        data.extend(payload)
5✔
456
        yield bytes(data)
5✔
457

458
        if msg_len_exceeded:
5✔
459
            displays = self.displays[next_disp_index:]
5✔
460
            attrs = ('version', 'flags', 'screen', 'scontrol', 'type')
5✔
461
            kw = {attr:getattr(self, attr) for attr in attrs}
5✔
462
            kw['displays'] = displays
5✔
463
            sub_msg = Message(**kw)
5✔
464
            yield from sub_msg.build_messages()
5✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc