• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SpiNNakerManchester / SpiNNMan / 6574804013

19 Oct 2023 12:47PM UTC coverage: 51.937% (+1.2%) from 50.777%
6574804013

Pull #327

github

Christian-B
typing changes
Pull Request #327: Type Annotations and Checking

105 of 1288 branches covered (0.0%)

Branch coverage included in aggregate %.

2375 of 2375 new or added lines in 180 files covered. (100.0%)

4775 of 8108 relevant lines covered (58.89%)

0.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

35.07
/spinnman/processes/read_iobuf_process.py
1
# Copyright (c) 2015 The University of Manchester
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#     https://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14

15
from dataclasses import dataclass
1✔
16
import functools
1✔
17
import struct
1✔
18
from collections import defaultdict
1✔
19
from typing import Dict, Iterable, List
1✔
20
from spinn_utilities.typing.coords import XYP
1✔
21
from spinn_machine import CoreSubsets
1✔
22
from spinnman.model import IOBuffer
1✔
23
from spinnman.utilities.utility_functions import get_vcpu_address
1✔
24
from spinnman.messages.scp.impl.read_memory import ReadMemory, Response
1✔
25
from spinnman.constants import UDP_MESSAGE_MAX_SIZE, CPU_IOBUF_ADDRESS_OFFSET
1✔
26
from .abstract_multi_connection_process import AbstractMultiConnectionProcess
1✔
27
from .abstract_multi_connection_process_connection_selector import (
1✔
28
    ConnectionSelector)
29

30

31
@dataclass(frozen=True)
class _RegionTail:
    """
    A pending read of part of an IOBUF region that did not fit into the
    packet that carried the region's header.
    """
    # Coordinates the read request is addressed to
    scamp_coords: XYP
    # Core whose IOBUF is being read; selects the destination buffer
    core_coords: XYP
    # Index of the region within that core's chain of IOBUF regions
    n: int
    # Memory address at which this read starts
    base_address: int
    # Number of bytes to request in this read
    size: int
    # Position in the region's buffer where the received bytes are stored
    offset: int
1✔
39

40

41
@dataclass(frozen=True)
class _NextRegion:
    """
    A pending read of the start (header plus as much data as fits in one
    packet) of one region in a core's chain of IOBUF regions.
    """
    # Coordinates the read request is addressed to
    scamp_coords: XYP
    # Core whose IOBUF is being read
    core_coords: XYP
    # Index of this region within the core's chain of IOBUF regions
    n: int
    # Memory address of the region (i.e. of its header)
    next_address: int
    # Number of bytes to ask for when reading the start of a region
    first_read_size: int

    def next_at(self, address: int) -> '_NextRegion':
        """
        Describe the region that follows this one in the chain.

        :param int address: Where the following region is located
        :return: A descriptor for region ``n + 1`` of the same core
        :rtype: _NextRegion
        """
        return _NextRegion(
            scamp_coords=self.scamp_coords,
            core_coords=self.core_coords,
            n=self.n + 1,
            next_address=address,
            first_read_size=self.first_read_size)

    def tail(self, address: int, size: int, offset: int) -> _RegionTail:
        """
        Describe a follow-up read of a further chunk of this region.

        :param int address: Memory address at which the chunk starts
        :param int size: Number of bytes in the chunk
        :param int offset:
            Position in this region's buffer where the chunk belongs
        :return: A descriptor for the follow-up read
        :rtype: _RegionTail
        """
        return _RegionTail(
            scamp_coords=self.scamp_coords,
            core_coords=self.core_coords,
            n=self.n,
            base_address=address,
            size=size,
            offset=offset)
57

58

59
# Text encoding used to decode the raw IOBUF bytes into a string
_ENCODING = "ascii"
1✔
60
# A single little-endian unsigned 32-bit word (used for the IOBUF address)
_ONE_WORD = struct.Struct("<I")
1✔
61
# Header of an IOBUF region: a little-endian word (address of the next
# region, 0 if none), 8 skipped bytes, then a word giving the number of
# data bytes in the region; 16 bytes in total
_FIRST_IOBUF = struct.Struct("<I8xI")
1✔
62

63

64
class ReadIOBufProcess(AbstractMultiConnectionProcess[Response]):
    """
    A process for reading IOBUF memory (mostly log messages) from a
    SpiNNaker core.

    The IOBUF of a core is a chain of regions.  Each region starts with a
    header holding the address of the next region (0 at the end of the
    chain) and the number of data bytes in the region.  Reading is done in
    rounds: the response handlers of one round queue the reads that the
    following round must perform.
    """
    __slots__ = (
        "_extra_reads",
        "_iobuf",
        "_iobuf_address",
        "_iobuf_view",
        "_next_reads")

    def __init__(self, connection_selector: ConnectionSelector) -> None:
        """
        :param ConnectionSelector connection_selector:
        """
        super().__init__(connection_selector)

        # A dictionary of (x, y, p) -> iobuf address
        self._iobuf_address: Dict[XYP, int] = {}

        # A dictionary of (x, y, p) -> dict(n) -> bytearray, where n is
        # the index of the region in the core's chain of IOBUF regions
        self._iobuf: Dict[XYP, Dict[int, bytearray]] = defaultdict(dict)

        # A dictionary of (x, y, p) -> dict(n) -> memoryview over the
        # matching bytearray in _iobuf, used to fill it in place
        self._iobuf_view: Dict[XYP, Dict[int, memoryview]] = defaultdict(dict)

        # Reads still needed to fetch the rest of a region whose data did
        # not fit in the packet that carried its header
        self._extra_reads: List[_RegionTail] = []

        # Reads still needed to fetch the start of a region
        self._next_reads: List[_NextRegion] = []

    def _request_iobuf_address(
            self, iobuf_size: int, x: int, y: int, p: int) -> None:
        """
        Ask for the word holding the address of a core's first IOBUF region.

        :param int iobuf_size: The size of one IOBUF region's data area
        :param int x: The x-coordinate of the chip
        :param int y: The y-coordinate of the chip
        :param int p: The ID of the core on the chip
        """
        # The request itself is addressed to processor 0 of the chip
        scamp_coords = (x, y, 0)
        base_address = get_vcpu_address(p) + CPU_IOBUF_ADDRESS_OFFSET
        self._send_request(
            ReadMemory(scamp_coords, base_address, _ONE_WORD.size),
            functools.partial(self.__handle_iobuf_address_response,
                              iobuf_size, scamp_coords, (x, y, p)))

    def __handle_iobuf_address_response(
            self, iobuf_size: int, scamp_coords: XYP, xyp: XYP,
            response: Response) -> None:
        """
        Queue the read of a core's first IOBUF region, if it has one.

        :param int iobuf_size: The size of one IOBUF region's data area
        :param tuple(int,int,int) scamp_coords: Where to address the reads
        :param tuple(int,int,int) xyp: The core whose IOBUF is being read
        :param Response response: The response holding the address word
        """
        iobuf_address, = _ONE_WORD.unpack_from(response.data, response.offset)
        # An address of 0 means there is nothing to read for this core
        if iobuf_address != 0:
            # Ask for the header and the data together, as far as a single
            # message allows
            first_read_size = min(
                iobuf_size + _FIRST_IOBUF.size, UDP_MESSAGE_MAX_SIZE)
            self._next_reads.append(_NextRegion(
                scamp_coords, xyp, 0, iobuf_address, first_read_size))

    def _request_iobuf_region_tail(self, tail: _RegionTail) -> None:
        """
        Ask for a further chunk of a region whose start was already read.

        :param _RegionTail tail: Describes the chunk to read
        """
        self._send_request(
            ReadMemory(tail.scamp_coords, tail.base_address, tail.size),
            functools.partial(
                self.__handle_extra_iobuf_response, tail))

    def __handle_extra_iobuf_response(
            self, tail: _RegionTail, response: Response) -> None:
        """
        Copy a further chunk of a region into that region's buffer.

        :param _RegionTail tail: Describes the chunk that was requested
        :param Response response: The response holding the chunk's bytes
        """
        view = self._iobuf_view[tail.core_coords][tail.n]
        base = tail.offset
        view[base:base + response.length] = response.data[
            response.offset:response.offset + response.length]

    def _request_iobuf_region(self, region: _NextRegion) -> None:
        """
        Ask for the start (header and first data bytes) of a region.

        :param _NextRegion region: Describes the region to read
        """
        self._send_request(
            ReadMemory(region.scamp_coords, region.next_address,
                       region.first_read_size),
            functools.partial(self.__handle_first_iobuf_response, region))

    def __handle_first_iobuf_response(
            self, region: _NextRegion, response: Response) -> None:
        """
        Process the start of a region: allocate its buffer, store the data
        that arrived with the header, and queue whatever else is needed.

        :param _NextRegion region: Describes the region that was requested
        :param Response response:
            The response holding the region's header and first data bytes
        """
        header_size = _FIRST_IOBUF.size

        # Unpack the iobuf header
        next_address, bytes_to_read = _FIRST_IOBUF.unpack_from(
            response.data, response.offset)

        # Create a buffer for the data; the view lets later responses be
        # written straight into it
        data = bytearray(bytes_to_read)
        view = memoryview(data)
        self._iobuf[region.core_coords][region.n] = data
        self._iobuf_view[region.core_coords][region.n] = view

        # Put the data from this packet into the buffer; the packet may
        # hold more bytes than the region actually uses
        packet_bytes = min(response.length - header_size, bytes_to_read)
        if packet_bytes > 0:
            offset = response.offset + header_size
            view[0:packet_bytes] = response.data[offset:offset + packet_bytes]

        bytes_to_read -= packet_bytes
        base_address = region.next_address + packet_bytes + header_size
        read_offset = packet_bytes

        # While more reads need to be done to read the data
        while bytes_to_read > 0:
            # Read the next bit of memory making up the buffer
            next_bytes_to_read = min(bytes_to_read, UDP_MESSAGE_MAX_SIZE)
            self._extra_reads.append(region.tail(
                base_address, next_bytes_to_read, read_offset))
            base_address += next_bytes_to_read
            read_offset += next_bytes_to_read
            bytes_to_read -= next_bytes_to_read

        # If there is another IOBuf buffer, read this next
        if next_address != 0:
            self._next_reads.append(region.next_at(next_address))

    def read_iobuf(
            self, iobuf_size: int,
            core_subsets: CoreSubsets) -> Iterable[IOBuffer]:
        """
        Read and decode the IOBUF contents of a set of cores.

        This is a generator: no request is sent until iteration starts.

        :param int iobuf_size: The size of one IOBUF region's data area
        :param ~spinn_machine.CoreSubsets core_subsets:
            The cores to read the IOBUF of
        :return: One buffer per core, in the order the cores are iterated
        :rtype: iterable(IOBuffer)
        """
        # Get the iobuf address for each core
        with self._collect_responses():
            for core_subset in core_subsets:
                x, y = core_subset.x, core_subset.y
                for p in core_subset.processor_ids:
                    self._request_iobuf_address(iobuf_size, x, y, p)

        # Run rounds of the process until reading is complete; the
        # response handlers refill the two lists with follow-up reads
        while self._extra_reads or self._next_reads:
            with self._collect_responses():
                # Process the extra iobuf reads needed
                while self._extra_reads:
                    self._request_iobuf_region_tail(self._extra_reads.pop())

                # Process the next iobuf reads needed
                while self._next_reads:
                    self._request_iobuf_region(self._next_reads.pop())

        for core_subset in core_subsets:
            x, y = core_subset.x, core_subset.y
            for p in core_subset.processor_ids:
                # A core's regions were stored in the order they were
                # discovered, so iterating the dict follows the chain
                regions = self._iobuf[x, y, p].values()
                iobuf = "".join(
                    region.decode(_ENCODING) for region in regions)
                yield IOBuffer(x, y, p, iobuf)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc