• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SpiNNakerManchester / SpiNNMan / 6574804013

19 Oct 2023 12:47PM UTC coverage: 51.937% (+1.2%) from 50.777%
6574804013

Pull #327

github

Christian-B
typing changes
Pull Request #327: Type Annotations and Checking

105 of 1288 branches covered (0.0%)

Branch coverage included in aggregate %.

2375 of 2375 new or added lines in 180 files covered. (100.0%)

4775 of 8108 relevant lines covered (58.89%)

0.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.19
/spinnman/connections/scp_request_pipeline.py
1
# Copyright (c) 2015 The University of Manchester
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#     https://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14

15
import sys
1✔
16
from threading import RLock
1✔
17
import time
1✔
18
from types import TracebackType
1✔
19
from typing import Callable, Dict, Generic, List, Optional, TypeVar, cast
1✔
20
from typing_extensions import TypeAlias
1✔
21
from spinnman.messages.scp.enums import SCPResult
1✔
22
from spinnman.exceptions import SpinnmanTimeoutException, SpinnmanIOException
1✔
23
from spinnman.constants import SCP_TIMEOUT, N_RETRIES
1✔
24
from spinnman.connections.udp_packet_connections import SCAMPConnection
1✔
25
from spinnman.messages.scp.abstract_messages import AbstractSCPRequest
1✔
26
from spinnman.messages.scp.abstract_messages import AbstractSCPResponse
1✔
27

28
#: Type variable standing for the SCP response class of a request.
#: :meta private:
R = TypeVar("R", bound=AbstractSCPResponse)
#: Signature of a callback that is handed a response.
#: :meta private:
CB: TypeAlias = Callable[[R], None]
#: Signature of an error callback; it is handed the request, the exception,
#: the traceback and the connection.
#: :meta private:
ECB: TypeAlias = Callable[
    [AbstractSCPRequest[R], Exception, TracebackType, SCAMPConnection], None]

# Sequence numbers wrap back to zero on reaching this value
MAX_SEQUENCE = 1 << 16
# Result codes for which the request is resent instead of being completed
RETRY_CODES = frozenset((
    SCPResult.RC_TIMEOUT, SCPResult.RC_P2P_TIMEOUT, SCPResult.RC_LEN,
    SCPResult.RC_P2P_NOREPLY, SCPResult.RC_P2P_BUSY))

# Module-wide sequence counter; it is only read and advanced while
# _next_sequence_lock is held
_next_sequence = 0
_next_sequence_lock = RLock()
1✔
47

48

49
class SCPRequestPipeLine(Generic[R]):
1✔
50
    """
51
    Allows a set of SCP requests to be grouped together in a communication
52
    across a number of channels for a given connection.
53

54
    This class implements an SCP windowing, first suggested by Andrew Mundy.
55
    This extends the idea by having both send and receive windows.
56
    These are represented by the n_channels and the
57
    intermediate_channel_waits parameters respectively.  This seems to
58
    help with the timeout issue; when a timeout is received, all requests
59
    for which a reply has not been received can also timeout.
60
    """
61
    __slots__ = (
1✔
62
        "_callbacks",
63
        "_connection",
64
        "_error_callbacks",
65
        "_in_progress",
66
        "_intermediate_channel_waits",
67
        "_n_channels",
68
        "_n_resent",
69
        "_n_retries",
70
        "_n_retry_code_resent",
71
        "_n_timeouts",
72
        "_packet_timeout",
73
        "_retry_reason",
74
        "_request_data",
75
        "_requests",
76
        "_retries",
77
        "_send_time")
78

79
    def __init__(self, connection: SCAMPConnection, n_channels=1,
                 intermediate_channel_waits=0,
                 n_retries=N_RETRIES, packet_timeout=SCP_TIMEOUT):
        """
        Create an empty pipeline bound to a single connection.

        :param SCAMPConnection connection:
            The connection that requests are sent over and responses are
            received from
        :param int n_channels: How many requests may be outstanding before
            sending pauses to collect responses.  When `None`, a value is
            chosen later, while requests are being sent
        :param int intermediate_channel_waits: Once sending has paused,
            responses are collected until no more than this many remain
            outstanding.  When `None` and `n_channels` is given, this
            becomes `n_channels - 8`, but never less than zero
        :param int n_retries: How many times a single packet may be resent,
            whatever the reason, before an error is reported
        :param float packet_timeout: Seconds to wait after sending a packet
            before the wait is treated as a timeout
        """
        # Derive the receive window from the send window when only the
        # latter was supplied
        if n_channels is not None and intermediate_channel_waits is None:
            intermediate_channel_waits = max(n_channels - 8, 0)

        self._connection = connection
        self._n_channels = n_channels
        self._intermediate_channel_waits = intermediate_channel_waits
        self._n_retries = n_retries
        self._packet_timeout = packet_timeout

        # Per-request bookkeeping; every table is keyed by sequence number.
        # The request objects and their serialised bytes:
        self._requests: Dict[int, AbstractSCPRequest] = {}
        self._request_data: Dict[int, bytes] = {}

        # How many resends each packet still has left
        self._retries: Dict[int, int] = {}

        # What to call with the response (may be None) or with an error
        self._callbacks: Dict[int, Optional[CB]] = {}
        self._error_callbacks: Dict[int, ECB] = {}

        # The reasons recorded for each resend of a packet
        self._retry_reason: Dict[int, List[str]] = {}

        # Count of responses still awaited
        self._in_progress = 0

        # Statistics: receive timeouts seen, packets resent, and resends
        # that were triggered by a retry result code
        self._n_timeouts = 0
        self._n_resent = 0
        self._n_retry_code_resent = 0
1✔
132

133
        # self._token_bucket = TokenBucket(43750, 4375000)
134
        # self._token_bucket = TokenBucket(3408, 700000)
135

136
    @staticmethod
    def __get_next_sequence_number() -> int:
        """
        Take the next value from the module-wide sequence counter.

        The counter wraps back to zero on reaching ``MAX_SEQUENCE``, because
        sequence numbers have a fixed number of bits.

        :return: The sequence number to use for the next request.
        :rtype: int
        """
        # pylint: disable=global-statement
        global _next_sequence
        with _next_sequence_lock:
            allocated = _next_sequence
            _next_sequence = (allocated + 1) % MAX_SEQUENCE
            return allocated
1✔
151

152
    def send_request(
            self, request: AbstractSCPRequest[R], callback: Optional[CB],
            error_callback: ECB):
        """
        Send an SCP request, first collecting responses if the window is
        full.

        :param AbstractSCPRequest request: The SCP request to be sent; its
            header is given a new sequence number before it is sent
        :param ~collections.abc.Callable callback:
            Called with the response once it has been received and read,
            or `None` if the response doesn't need to be processed
        :param ~collections.abc.Callable error_callback:
            Called when resending the request or handling its response
            fails; it is given the original request, the exception caught,
            the traceback object and the connection
        """
        # No window size yet: choose one as soon as the connection reports
        # that it is ready to receive
        if (self._n_channels is None and
                self._connection.is_ready_to_receive()):
            window = max(self._in_progress + 8, 12)
            self._n_channels = window
            self._intermediate_channel_waits = window - 8

        # Window full: collect responses before sending anything more
        while (self._n_channels is not None and
                self._n_channels <= self._in_progress):
            self._do_retrieve(
                self._intermediate_channel_waits, self._packet_timeout)

        # Stamp the request with a fresh sequence number, then serialise it
        seq = self.__get_next_sequence_number()
        request.scp_request_header.sequence = seq
        packet = self._connection.get_scp_data(request)

        # Record everything needed to match the response or to resend
        self._requests[seq] = request
        self._request_data[seq] = packet
        self._retries[seq] = self._n_retries
        self._callbacks[seq] = callback
        self._error_callbacks[seq] = error_callback
        self._retry_reason[seq] = []

        # Send it and count it as outstanding
        # self._token_bucket.consume(284)
        self._connection.send(packet)
        self._in_progress += 1
1✔
200

201
    def finish(self) -> None:
1✔
202
        """
203
        Indicate the end of the packets to be sent.  This must be called
204
        to ensure that all responses are received and handled.
205
        """
206
        while self._in_progress > 0:
1✔
207
            self._do_retrieve(0, self._packet_timeout)
1✔
208

209
    @property
1✔
210
    def n_timeouts(self) -> int:
1✔
211
        """
212
        The number of timeouts that occurred.
213

214
        :rtype: int
215
        """
216
        return self._n_timeouts
×
217

218
    @property
1✔
219
    def n_channels(self) -> int:
1✔
220
        """
221
        The number of requests to send before checking for responses.
222

223
        :rtype: int
224
        """
225
        return self._n_channels
×
226

227
    @property
1✔
228
    def n_resent(self) -> int:
1✔
229
        """
230
        The number of packets that have been resent.
231

232
        :rtype: int
233
        """
234
        return self._n_resent
×
235

236
    @property
1✔
237
    def n_retry_code_resent(self) -> int:
1✔
238
        """
239
        The number of resends due to reasons for which automated retry is
240
        the correct response in-protocol.
241

242
        :rtype: int
243
        """
244
        return self._n_retry_code_resent
×
245

246
    def _remove_record(self, seq: int) -> None:
1✔
247
        if seq in self._requests:
1!
248
            del self._requests[seq]
1✔
249
        del self._request_data[seq]
1✔
250
        del self._retries[seq]
1✔
251
        del self._callbacks[seq]
1✔
252
        del self._error_callbacks[seq]
1✔
253
        del self._retry_reason[seq]
1✔
254

255
    def _single_retrieve(self, timeout: float):
        """
        Receive one response from the connection and act on it.

        A response whose sequence number matches no outstanding request is
        ignored.  A result code in ``RETRY_CODES`` causes the request to be
        resent after a short pause; any other result is read into the
        request's response object and passed to the callback, if there is
        one.  Exceptions raised while resending, or while reading and
        handling the response, are passed to the request's error callback.

        :param float timeout:
            Passed to the connection's receive call
        """
        result, seq, raw_data, offset = \
            self._connection.receive_scp_response(timeout)

        # Nothing to do for a response that matches no outstanding request
        if seq not in self._requests:
            return

        self._in_progress -= 1
        request_sent = self._requests[seq]

        if result not in RETRY_CODES:
            # Final answer: read it and hand it to the callback, if any
            try:
                response = request_sent.get_scp_response()
                response.read_bytestring(raw_data, offset)
                on_response = self._callbacks[seq]
                if on_response is not None:
                    on_response(response)
            except Exception as e:  # pylint: disable=broad-except
                self._error_callbacks[seq](
                    request_sent, e,
                    cast(TracebackType, sys.exc_info()[2]),
                    self._connection)

            # Whether or not handling worked, this request is finished
            self._remove_record(seq)
            return

        # Retryable result: pause briefly and send the request again
        try:
            time.sleep(0.1)
            self._resend(seq, request_sent, str(result))
            self._n_retry_code_resent += 1
        except Exception as e:  # pylint: disable=broad-except
            self._error_callbacks[seq](
                request_sent, e,
                cast(TracebackType, sys.exc_info()[2]),
                self._connection)
            self._remove_record(seq)
×
294

295
    def _handle_receive_timeout(self) -> None:
1✔
296
        self._n_timeouts += 1
1✔
297

298
        # If there is a timeout, all packets remaining are resent
299
        to_remove = list()
1✔
300
        for seq, request_sent in self._requests.items():
1✔
301
            self._in_progress -= 1
1✔
302
            try:
1✔
303
                self._resend(seq, request_sent, "timeout")
1✔
304
            except Exception as e:  # pylint: disable=broad-except
1✔
305
                self._error_callbacks[seq](
1✔
306
                    request_sent, e, cast(TracebackType, sys.exc_info()[2]),
307
                    self._connection)
308
                to_remove.append(seq)
1✔
309

310
        for seq in to_remove:
1✔
311
            self._remove_record(seq)
1✔
312

313
    def _resend(self, seq: int, request_sent, reason: str):
1✔
314
        if self._retries[seq] <= 0:
1✔
315
            # Report timeouts as timeout exception
316

317
            self._retry_reason[seq].append(reason)
1✔
318
            if all(reason == "timeout" for reason in self._retry_reason[seq]):
1!
319
                raise SpinnmanTimeoutException(
1✔
320
                    request_sent,
321
                    self._packet_timeout)
322

323
            # Report any other exception
324
            raise SpinnmanIOException(
×
325
                f"Errors sending request {request_sent} to "
326
                f"{request_sent.sdp_header.destination_chip_x}, "
327
                f"{request_sent.sdp_header.destination_chip_y}, "
328
                f"{request_sent.sdp_header.destination_cpu} over "
329
                f"{self._n_retries} retries: {self._retry_reason[seq]}")
330

331
        # If the request can be retried, retry it
332
        self._retries[seq] -= 1
1✔
333
        self._in_progress += 1
1✔
334
        self._requests[seq] = request_sent
1✔
335
        self._retry_reason[seq].append(reason)
1✔
336
        self._connection.send(self._request_data[seq])
1✔
337
        self._n_resent += 1
1✔
338

339
    def _do_retrieve(self, n_packets: int, timeout: float):
1✔
340
        """
341
        Receives responses until there are only n_packets responses left.
342

343
        :param int n_packets:
344
            The number of packets that can remain after running
345
        """
346
        # While there are still more packets in progress than some threshold
347
        while self._in_progress > n_packets:
1✔
348
            try:
1✔
349
                # Receive the next response
350
                self._single_retrieve(timeout)
1✔
351
            except SpinnmanTimeoutException:
1✔
352
                self._handle_receive_timeout()
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc