• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SpiNNakerManchester / SpiNNMan / 6574804013

19 Oct 2023 12:47PM UTC coverage: 51.937% (+1.2%) from 50.777%
6574804013

Pull #327

github

Christian-B
typing changes
Pull Request #327: Type Annotations and Checking

105 of 1288 branches covered (0.0%)

Branch coverage included in aggregate %.

2375 of 2375 new or added lines in 180 files covered. (100.0%)

4775 of 8108 relevant lines covered (58.89%)

0.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

30.94
/spinnman/messages/eieio/data_messages/eieio_data_message.py
1
# Copyright (c) 2015 The University of Manchester
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#     https://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14

15
import struct
1✔
16
from typing import Optional
1✔
17
from spinn_utilities.overrides import overrides
1✔
18
from spinnman.exceptions import (
1✔
19
    SpinnmanInvalidPacketException, SpinnmanInvalidParameterException)
20
from spinnman.messages.eieio import (
1✔
21
    AbstractEIEIOMessage, EIEIOType, EIEIOPrefix)
22
from spinnman.constants import UDP_MESSAGE_MAX_SIZE
1✔
23
from .eieio_data_header import EIEIODataHeader
1✔
24
from .key_data_element import KeyDataElement
1✔
25
from .key_payload_data_element import KeyPayloadDataElement
1✔
26
from .abstract_data_element import AbstractDataElement
1✔
27

28
_ONE_SHORT = struct.Struct("<H")
1✔
29
_TWO_SHORTS = struct.Struct("<HH")
1✔
30
_ONE_WORD = struct.Struct("<I")
1✔
31
_TWO_WORDS = struct.Struct("<II")
1✔
32

33

34
class EIEIODataMessage(AbstractEIEIOMessage):
    """
    An EIEIO Data message.

    A message built without data accumulates elements to be sent; a
    message built with data is read-only and is stepped through one
    element at a time.
    """
    __slots__ = (
        # Bytes being read from, or None when the message is being built
        "_data",
        # Packed bytes of the elements added so far
        "_elements",
        # Number of elements already read from the data
        "_elements_read",
        # The EIEIO data header of the message
        "_header",
        # Position in the data of the next element to read
        "_offset")
44

45
    def __init__(self, eieio_header, data=None, offset=0):
        """
        :param EIEIODataHeader eieio_header: The header of the message
        :param bytes data: Optional data contained within the packet
        :param int offset: Optional offset where the valid data starts
        """
        # Reading state: the source bytes, the cursor into them and the
        # number of elements consumed so far
        self._data, self._offset, self._elements_read = data, offset, 0

        # Writing state: the header and the packed elements added so far
        self._header = eieio_header
        self._elements = b""
×
61

62
    @staticmethod
    def create(
            eieio_type, count=0, data=None, offset=0, key_prefix=None,
            payload_prefix=None, timestamp=None,
            prefix_type=EIEIOPrefix.LOWER_HALF_WORD):
        """
        Build a data message together with its header.

        When a timestamp is given it is used as the payload base of the
        header in place of the payload prefix, and the header is marked
        as carrying a time.

        :param EIEIOType eieio_type: The type of the message
        :param int count: The number of items in the message
        :param bytes data: The data in the message
        :param int offset:
            The offset in the data where the actual data starts
        :param int key_prefix: The prefix of the keys
        :param int payload_prefix: The prefix of the payload
        :param int timestamp: The timestamp of the packet
        :param EIEIOPrefix prefix_type: The type of the key prefix if 16-bits
        """
        # pylint: disable=too-many-arguments
        has_time = timestamp is not None
        header = EIEIODataHeader(
            eieio_type, count=count, prefix=key_prefix,
            payload_base=timestamp if has_time else payload_prefix,
            prefix_type=prefix_type, is_time=has_time)
        return EIEIODataMessage(
            eieio_header=header, data=data, offset=offset)
90

91
    @property
    @overrides(AbstractEIEIOMessage.eieio_header)
    def eieio_header(self) -> EIEIODataHeader:
        """
        The header this message was constructed with.

        :rtype: EIEIODataHeader
        """
        return self._header
×
98

99
    @staticmethod
    def min_packet_length(
            eieio_type, is_prefix=False, is_payload_base=False,
            is_timestamp=False):
        """
        The minimum length of a message with the given header, in bytes.

        :param EIEIOType eieio_type: the type of message
        :param bool is_prefix: True if there is a prefix, False otherwise
        :param bool is_payload_base:
            True if there is a payload base, False otherwise
        :param bool is_timestamp:
            True if there is a timestamp, False otherwise
        :return:
            The header size plus ``eieio_type.payload_bytes``, in bytes
        :rtype: int
        """
        # The header size calculation takes a single flag for "payload base
        # or timestamp", so the two are combined with a logical (not
        # bitwise) OR; this also accepts None or other non-bool flags.
        header_size = EIEIODataHeader.get_header_size(
            eieio_type, is_prefix, is_payload_base or is_timestamp)
        return header_size + eieio_type.payload_bytes
×
118

119
    def get_min_packet_length(self) -> int:
        """
        Get the minimum length in bytes of a message with this
        instance's header settings.

        :rtype: int
        """
        header = self._header
        message_type = header.eieio_type
        has_prefix = header.prefix is not None
        has_payload_base = header.payload_base is not None
        return self.min_packet_length(
            eieio_type=message_type, is_prefix=has_prefix,
            is_payload_base=has_payload_base)
129

130
    @property
    def max_n_elements(self) -> int:
        """
        The maximum number of elements that can fit in the packet.

        This is the space left in a maximum-size UDP message after the
        header, divided (rounding down) by the size of one element.

        :rtype: int
        """
        header = self._header
        ty = header.eieio_type
        space_for_elements = UDP_MESSAGE_MAX_SIZE - header.size
        bytes_per_element = ty.key_bytes + ty.payload_bytes
        return space_for_elements // bytes_per_element
140

141
    @property
    def n_elements(self) -> int:
        """
        The number of elements in the packet, as counted by the header.

        :rtype: int
        """
        return self._header.count
×
149

150
    @property
    def size(self) -> int:
        """
        The size of the packet with the current contents.

        This is the header size plus the size of one element multiplied
        by the element count held in the header.

        :rtype: int
        """
        header = self._header
        ty = header.eieio_type
        bytes_per_element = ty.key_bytes + ty.payload_bytes
        return header.size + bytes_per_element * header.count
160

161
    def add_key_and_payload(self, key: int, payload: int) -> None:
        """
        Adds a key and payload to the packet.

        :param int key: The key to add
        :param int payload: The payload to add
        :raise SpinnmanInvalidParameterException:
            If the key or payload is too big for the format, or the format
            doesn't expect a payload (the latter is not checked in this
            method; presumably it is detected when the element is encoded
            by :py:meth:`add_element` -- to be confirmed)
        :raise SpinnmanInvalidPacketException:
            If the message was created to read data
        """
        limit = self._header.eieio_type.max_value

        # Key is validated before payload, so an oversized key is the one
        # reported when both values are out of range.
        for name, value in (("key", key), ("payload", payload)):
            if value > limit:
                raise SpinnmanInvalidParameterException(
                    name, value,
                    f"Larger than the maximum allowed of {limit}")

        self.add_element(KeyPayloadDataElement(
            key, payload, self._header.is_time))
184

185
    def add_key(self, key: int) -> None:
        """
        Add a key to the packet.

        :param int key: The key to add
        :raise SpinnmanInvalidParameterException:
            If the key is too big for the format, or the format expects a
            payload (the latter is not checked in this method; presumably
            it is detected when the element is encoded by
            :py:meth:`add_element` -- to be confirmed)
        :raise SpinnmanInvalidPacketException:
            If the message was created to read data
        """
        limit = self._header.eieio_type.max_value
        if key > limit:
            raise SpinnmanInvalidParameterException(
                "key", key, f"Larger than the maximum allowed of {limit}")
        self.add_element(KeyDataElement(key))
×
200

201
    def add_element(self, element: AbstractDataElement):
        """
        Append an element to the message and bump the header's count.
        The element is encoded for the header's EIEIO type, so it must be
        of the kind that type expects.

        :param AbstractDataElement element: The element to be added
        :raise SpinnmanInvalidParameterException:
            If the element is not compatible with the header
        :raise SpinnmanInvalidPacketException:
            If the message was created to read data
        """
        # A message holding received data cannot be written to
        if self._data is not None:
            raise SpinnmanInvalidPacketException(
                "EIEIODataMessage", "This packet is read-only")

        encoded = element.get_bytestring(self._header.eieio_type)
        self._elements = self._elements + encoded
        self._header.increment_count()
×
218

219
    @property
    def is_next_element(self) -> bool:
        """
        Whether there is another element to be read.

        Always False for a message that has no data to read from.

        :rtype: bool
        """
        if self._data is None:
            return False
        return self._elements_read < self._header.count
228

229
    @property
    def next_element(self) -> Optional[AbstractDataElement]:
        """
        The next element to be read, or `None` if no more elements.
        The exact type of element returned depends on the packet type.

        Reading this property consumes the element: the read count and
        the offset into the data are both advanced.

        :rtype: AbstractDataElement
        :raise ValueError: If the header's EIEIO type is not recognised
        """
        if not self.is_next_element:
            return None
        self._elements_read += 1

        header = self._header
        ty = header.eieio_type

        # Pick the codec matching the element layout of this type
        if ty == EIEIOType.KEY_16_BIT:
            codec = _ONE_SHORT
        elif ty == EIEIOType.KEY_32_BIT:
            codec = _ONE_WORD
        elif ty == EIEIOType.KEY_PAYLOAD_16_BIT:
            codec = _TWO_SHORTS
        elif ty == EIEIOType.KEY_PAYLOAD_32_BIT:
            codec = _TWO_WORDS
        else:
            raise ValueError(
                f"unknown EIEIO type code: {self._header.eieio_type}")

        # Decode one element and step past it; the codec's size is exactly
        # the number of bytes the element occupies
        fields = codec.unpack_from(self._data, self._offset)
        self._offset += codec.size
        key = fields[0]
        payload: Optional[int] = fields[1] if len(fields) > 1 else None

        # Merge in the key prefix, shifted up when it is the upper half
        prefix = header.prefix
        if prefix is not None:
            if header.prefix_type == EIEIOPrefix.UPPER_HALF_WORD:
                key |= prefix << 16
            else:
                key |= prefix

        # The payload base is OR-ed into a payload that was read, or
        # stands in as the payload when none was read
        base = header.payload_base
        if base is not None:
            payload = base if payload is None else payload | base

        if payload is None:
            return KeyDataElement(key)
        return KeyPayloadDataElement(key, payload, header.is_time)
×
272

273
    @property
    @overrides(AbstractEIEIOMessage.bytestring)
    def bytestring(self) -> bytes:
        # The encoded header followed by the packed elements added so far
        header_bytes = self._header.bytestring
        return header_bytes + self._elements
×
277

278
    def __str__(self):
        # A message being built shows its packed elements; a message
        # being read shows the element count from the header instead.
        if self._data is None:
            return f"EIEIODataMessage:{self._header}:{self._elements}"
        return f"EIEIODataMessage:{self._header}:{self._header.count}"
×
282

283
    def __repr__(self):
        # The repr is deliberately the same text as the str form
        return str(self)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc