• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SpiNNakerManchester / SpiNNMan / 6574804013

19 Oct 2023 12:47PM UTC coverage: 51.937% (+1.2%) from 50.777%
6574804013

Pull #327

github

Christian-B
typing changes
Pull Request #327: Type Annotations and Checking

105 of 1288 branches covered (0.0%)

Branch coverage included in aggregate %.

2375 of 2375 new or added lines in 180 files covered. (100.0%)

4775 of 8108 relevant lines covered (58.89%)

0.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

31.15
/spinnman/processes/write_memory_process.py
1
# Copyright (c) 2015 The University of Manchester
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#     https://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14

15
import functools
1✔
16
from typing import BinaryIO, Callable
1✔
17
import numpy
1✔
18
from numpy import uint8, uint32
1✔
19
from spinn_utilities.typing.coords import XYP
1✔
20
from spinnman.messages.scp.abstract_messages import AbstractSCPRequest
1✔
21
from spinnman.messages.scp.impl import WriteLink, WriteMemory
1✔
22
from spinnman.messages.scp.impl import CheckOKResponse
1✔
23
from spinnman.constants import UDP_MESSAGE_MAX_SIZE
1✔
24
from .abstract_multi_connection_process import AbstractMultiConnectionProcess
1✔
25

26
_UNSIGNED_WORD = 0xFFFFFFFF
1✔
27

28

29
class WriteMemoryProcess(AbstractMultiConnectionProcess[CheckOKResponse]):
1✔
30
    """
31
    A process for writing memory on a SpiNNaker chip.
32
    """
33
    __slots__ = ()
1✔
34
    # pylint: disable=too-many-arguments
35

36
    def write_memory_from_bytearray(
1✔
37
            self, coordinates: XYP, base_address: int, data: bytes,
38
            offset: int, n_bytes: int, get_sum: bool = False) -> int:
39
        """
40
        Writes memory onto a SpiNNaker chip from a bytearray.
41

42
        :param tuple(int,int,int) coordinates:
43
            The X,Y,P coordinates of the core that will write to memory.
44
        :param int base_address: the address in SDRAM to start writing
45
        :param data: the data to write
46
        :type data: bytearray or bytes
47
        :param int offset: where in the data to start writing from
48
        :param int n_bytes: how much data to write
49
        :param bool get_sum: whether to return a checksum or 0
50
        :return: the data checksum or 0 if get_sum is False
51
        :rtype: int
52
        """
53
        return self._write_memory_from_bytearray(
×
54
            base_address, data, offset, n_bytes,
55
            functools.partial(WriteMemory, coordinates), get_sum)
56

57
    def write_link_memory_from_bytearray(
1✔
58
            self, coordinates: XYP, link: int, base_address: int, data: bytes,
59
            offset: int, n_bytes: int, get_sum: bool = False) -> int:
60
        """
61
        Writes memory onto a neighbour of a SpiNNaker chip from a bytearray.
62

63
        :param tuple(int,int,int) coordinates:
64
            The X,Y,P coordinates of the core that will write to its
65
            neighbour's memory.
66
        :param int link:
67
            Along which link is the neighbour.
68
        :param int base_address: the address in SDRAM to start writing
69
        :param data: the data to write
70
        :type data: bytearray or bytes
71
        :param int offset: where in the data to start writing from
72
        :param int n_bytes: how much data to write
73
        :param bool get_sum: whether to return a checksum or 0
74
        :return: the data checksum or 0 if get_sum is False
75
        :rtype: int
76
        """
77
        return self._write_memory_from_bytearray(
×
78
            base_address, data, offset, n_bytes,
79
            functools.partial(WriteLink, coordinates, link), get_sum)
80

81
    def write_memory_from_reader(
1✔
82
            self, coordinates: XYP, base_address: int, reader: BinaryIO,
83
            n_bytes: int, get_sum: bool = False) -> int:
84
        """
85
        Writes memory onto a SpiNNaker chip from a reader.
86

87
        :param tuple(int,int,int) coordinates:
88
            The X,Y,P coordinates of the core that will write to memory.
89
        :param int base_address: the address in SDRAM to start writing
90
        :param reader: the readable object containing the data to write
91
        :type reader: ~io.RawIOBase or ~io.BufferedIOBase
92
        :param int n_bytes: how much data to write
93
        :param bool get_sum: whether to return a checksum or 0
94
        :return: the data checksum or 0 if get_sum is False
95
        :rtype: int
96
        """
97
        return self._write_memory_from_reader(
×
98
            base_address, reader, n_bytes,
99
            functools.partial(WriteMemory, coordinates), get_sum)
100

101
    def write_link_memory_from_reader(
1✔
102
            self, coordinates: XYP, link: int, base_address: int,
103
            reader: BinaryIO, n_bytes: int, get_sum: bool = False) -> int:
104
        """
105
        Writes memory onto a neighbour of a SpiNNaker chip from a reader.
106

107
        :param tuple(int,int,int) coordinates:
108
            The X,Y,P coordinates of the core that will write to its
109
            neighbour's memory. The P coordinate is normally 0; no reason to
110
            not use SCAMP for this.
111
        :param int link:
112
            Along which link is the neighbour.
113
        :param int base_address: the address in SDRAM to start writing
114
        :param reader: the readable object containing the data to write
115
        :type reader: ~io.RawIOBase or ~io.BufferedIOBase
116
        :param int n_bytes: how much data to write
117
        :param bool get_sum: whether to return a checksum or 0
118
        :return: the data checksum or 0 if get_sum is False
119
        :rtype: int
120
        """
121
        return self._write_memory_from_reader(
×
122
            base_address, reader, n_bytes,
123
            functools.partial(WriteLink, coordinates, link), get_sum)
124

125
    def _write_memory_from_bytearray(
1✔
126
            self, base_address: int, data: bytes, data_offset: int,
127
            n_bytes: int, packet_class: Callable[
128
                [int, bytes], AbstractSCPRequest[CheckOKResponse]],
129
            get_sum: bool) -> int:
130
        offset = 0
×
131
        n_bytes_to_write = int(n_bytes)
×
132
        with self._collect_responses():
×
133
            while n_bytes_to_write > 0:
×
134
                bytes_to_send = min(n_bytes_to_write, UDP_MESSAGE_MAX_SIZE)
×
135
                data_array = data[data_offset:data_offset + bytes_to_send]
×
136

137
                self._send_request(
×
138
                    packet_class(base_address + offset, data_array))
139

140
                n_bytes_to_write -= bytes_to_send
×
141
                offset += bytes_to_send
×
142
                data_offset += bytes_to_send
×
143
        if not get_sum:
×
144
            return 0
×
145
        np_data = numpy.array(data, dtype=uint8)
×
146
        np_sum = int(numpy.sum(np_data.view(uint32), dtype=uint32))
×
147
        return np_sum & _UNSIGNED_WORD
×
148

149
    def _write_memory_from_reader(
1✔
150
            self, base_address: int, reader: BinaryIO, n_bytes: int,
151
            packet_class: Callable[
152
                [int, bytes], AbstractSCPRequest[CheckOKResponse]],
153
            with_sum: bool) -> int:
154
        offset = 0
×
155
        n_bytes_to_write = int(n_bytes)
×
156
        chksum = 0
×
157
        with self._collect_responses():
×
158
            while n_bytes_to_write > 0:
×
159
                data_array = reader.read(
×
160
                    min(n_bytes_to_write, UDP_MESSAGE_MAX_SIZE))
161
                bytes_to_send = len(data_array)
×
162
                self._send_request(packet_class(
×
163
                    base_address + offset, data_array))
164

165
                n_bytes_to_write -= bytes_to_send
×
166
                offset += bytes_to_send
×
167

168
                if with_sum:
×
169
                    np_data = numpy.frombuffer(data_array, dtype=uint8)
×
170
                    np_sum = int(numpy.sum(np_data.view(uint32)))
×
171
                    chksum = (chksum + np_sum) & _UNSIGNED_WORD
×
172

173
        return chksum
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc