• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SpiNNakerManchester / SpiNNMan / 6574804013

19 Oct 2023 12:47PM UTC coverage: 51.937% (+1.2%) from 50.777%
6574804013

Pull #327

github

Christian-B
typing changes
Pull Request #327: Type Annotations and Checking

105 of 1288 branches covered (0.0%)

Branch coverage included in aggregate %.

2375 of 2375 new or added lines in 180 files covered. (100.0%)

4775 of 8108 relevant lines covered (58.89%)

0.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.27
/spinnman/processes/abstract_multi_connection_process.py
1
# Copyright (c) 2015 The University of Manchester
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#     https://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14

15
import contextlib
1✔
16
import logging
1✔
17
import sys
1✔
18
from types import TracebackType
1✔
19
from typing import (
1✔
20
    Callable, Dict, Generator, Generic, List, Optional, TypeVar, cast)
21
from typing_extensions import Self, TypeAlias
1✔
22
from spinn_utilities.log import FormatAdapter
1✔
23
from spinnman.connections import SCPRequestPipeLine
1✔
24
from spinnman.constants import SCP_TIMEOUT, N_RETRIES
1✔
25
from spinnman.exceptions import (
1✔
26
    SpinnmanGenericProcessException, SpinnmanGroupedProcessException)
27
from .abstract_multi_connection_process_connection_selector import (
1✔
28
    ConnectionSelector)
29
from spinnman.connections.udp_packet_connections import SCAMPConnection
1✔
30
from spinnman.messages.scp.abstract_messages import (
1✔
31
    AbstractSCPRequest, AbstractSCPResponse)
32
#: Type variable for the kind of response a request produces.
#: :meta private:
R = TypeVar("R", bound=AbstractSCPResponse)

#: Signature of an error handling callback. It receives, in order, the
#: request, the exception, the traceback and the connection.
#: :meta private:
ECB: TypeAlias = Callable[
    [
        AbstractSCPRequest[R],
        Exception,
        TracebackType,
        SCAMPConnection,
    ],
    None,
]

logger = FormatAdapter(logging.getLogger(__name__))
1✔
41

42

43
class AbstractMultiConnectionProcess(Generic[R]):
    """
    A process for talking to SpiNNaker efficiently that uses multiple
    connections in communication if relevant.

    Each request is sent through an :py:class:`SCPRequestPipeLine` that
    belongs to the connection picked by the connection selector; the
    pipelines are created the first time their connection is used.
    Errors handled by the default error callback are recorded on the
    process and can be raised afterwards with :py:meth:`check_for_error`.
    """
    __slots__ = (
        "_error_requests",
        "_exceptions",
        "_tracebacks",
        "_connections",
        "_intermediate_channel_waits",
        "_n_channels",
        "_n_retries",
        "_conn_selector",
        "_scp_request_pipelines",
        "_timeout")

    def __init__(self, next_connection_selector: ConnectionSelector,
                 n_retries: int = N_RETRIES, timeout: float = SCP_TIMEOUT,
                 n_channels: int = 8,
                 intermediate_channel_waits: int = 7) -> None:
        """
        :param ConnectionSelector next_connection_selector:
            How to choose the connection.
        :param int n_retries:
            The number of retries of a message to use. Passed to
            :py:class:`SCPRequestPipeLine`
        :param float timeout:
            The timeout, in seconds. Passed to :py:class:`SCPRequestPipeLine`
        :param int n_channels:
            The maximum number of channels to use when talking to a particular
            SCAMP instance. Passed to :py:class:`SCPRequestPipeLine`
        :param int intermediate_channel_waits:
            The maximum number of outstanding message/reply pairs to have on a
            particular connection. Passed to :py:class:`SCPRequestPipeLine`
        """
        # Decides which connection each request goes out on.
        self._conn_selector = next_connection_selector

        # Settings handed to every pipeline this process creates.
        self._n_retries = n_retries
        self._timeout = timeout
        self._n_channels = n_channels
        self._intermediate_channel_waits = intermediate_channel_waits

        # One pipeline per connection, made the first time it is needed.
        self._scp_request_pipelines: Dict[
            SCAMPConnection, SCPRequestPipeLine[R]] = {}

        # Parallel lists filled by _receive_error: entry i of each list
        # describes the i-th recorded error.
        self._error_requests: List[AbstractSCPRequest[R]] = []
        self._exceptions: List[Exception] = []
        self._tracebacks: List[TracebackType] = []
        self._connections: List[SCAMPConnection] = []

    def _send_request(self, request: AbstractSCPRequest[R],
                      callback: Optional[Callable[[R], None]] = None,
                      error_callback: Optional[ECB] = None) -> None:
        """
        Send a request through the pipeline of the connection chosen by
        the connection selector, creating that pipeline if this is the
        first request for the connection.

        :param AbstractSCPRequest request:
            The request to send.
        :param callback:
            Optional response callback; handed to the pipeline unchanged.
        :param error_callback:
            Optional error callback handed to the pipeline. When ``None``,
            this process's own :py:meth:`_receive_error` is used, which
            records the error for :py:meth:`check_for_error`.
        """
        on_error = (
            self._receive_error if error_callback is None
            else error_callback)
        connection = self._conn_selector.get_next_connection(request)

        pipeline = self._scp_request_pipelines.get(connection)
        if pipeline is None:
            pipeline = SCPRequestPipeLine(
                connection, n_retries=self._n_retries,
                packet_timeout=self._timeout,
                n_channels=self._n_channels,
                intermediate_channel_waits=self._intermediate_channel_waits)
            self._scp_request_pipelines[connection] = pipeline

        pipeline.send_request(request, callback, on_error)

    def _receive_error(
            self, request: AbstractSCPRequest[R], exception: Exception,
            tb: TracebackType, connection: SCAMPConnection) -> None:
        """
        Default error callback: record the details of a failure so that
        :py:meth:`check_for_error` can report it later.

        :param AbstractSCPRequest request: The request being reported.
        :param Exception exception: The exception being reported.
        :param TracebackType tb: The traceback being reported.
        :param SCAMPConnection connection: The connection being reported.
        """
        # Keep the four lists in step; they are indexed together.
        self._error_requests.append(request)
        self._exceptions.append(exception)
        self._tracebacks.append(tb)
        self._connections.append(connection)

    def is_error(self) -> bool:
        """
        Whether at least one error has been recorded.

        :rtype: bool
        """
        return len(self._exceptions) > 0

    def _finish(self) -> None:
        """
        Call ``finish()`` on every pipeline created so far.
        """
        for pipeline in self._scp_request_pipelines.values():
            pipeline.finish()

    @contextlib.contextmanager
    def _collect_responses(
            self, *, print_exception: bool = False,
            check_error: bool = True) -> Generator[Self, None, None]:
        """
        A simple context for calls. Lets you do this::

            with self._collect_responses():
                self._send_request(...)

        instead of this::

            self._send_request(...)
            self._finish()
            self.check_for_error()

        :param bool print_exception:
            Whether to log errors as well as raising
        :param bool check_error:
            Whether to check for errors; if not, caller must handle
        """
        yield self
        # There is no try/finally around the yield, so the steps below
        # only run when the body of the ``with`` finished without raising.
        for pipeline in self._scp_request_pipelines.values():
            pipeline.finish()
        if not check_error:
            return
        if self._exceptions:
            self.check_for_error(print_exception=print_exception)

    @property
    def connection_selector(self) -> ConnectionSelector:
        """
        The connection selector this process was constructed with.

        :rtype: ConnectionSelector
        """
        return self._conn_selector

    def check_for_error(self, print_exception: bool = False) -> None:
        """
        Raise an exception describing the recorded errors, if there are
        any; do nothing when no error has been recorded.

        :param bool print_exception:
            Whether to also log the failure before raising.
        :raises SpinnmanGenericProcessException:
            If exactly one error has been recorded.
        :raises SpinnmanGroupedProcessException:
            If more than one error has been recorded.
        """
        n_errors = len(self._exceptions)
        if n_errors == 0:
            return

        if n_errors > 1:
            # Several failures: report them all together.
            ex = SpinnmanGroupedProcessException(
                self._error_requests, self._exceptions, self._tracebacks,
                self._connections)
            if print_exception:
                logger.error("{}", str(ex))
            raise ex

        # Exactly one failure: report it with the target core's details.
        # sys.exc_info() describes the exception currently being handled,
        # if there is one.
        exc_info = sys.exc_info()
        sdp_header = self._error_requests[0].sdp_header
        phys_p = sdp_header.get_physical_cpu_id()

        if print_exception:
            connection = self._connections[0]
            logger.error(
                "failure in request to board {} with ethernet chip "
                "({}, {}) for chip ({}, {}, {}({}))",
                connection.remote_ip_address, connection.chip_x,
                connection.chip_y, sdp_header.destination_chip_x,
                sdp_header.destination_chip_y, sdp_header.destination_cpu,
                phys_p)

        raise SpinnmanGenericProcessException(
            self._exceptions[0], cast(TracebackType, exc_info[2]),
            sdp_header.destination_chip_x,
            sdp_header.destination_chip_y, sdp_header.destination_cpu,
            phys_p, self._tracebacks[0])
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc