• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SpiNNakerManchester / SpiNNMan / 6574804013

19 Oct 2023 12:47PM UTC coverage: 51.937% (+1.2%) from 50.777%
6574804013

Pull #327

github

Christian-B
typing changes
Pull Request #327: Type Annotations and Checking

105 of 1288 branches covered (0.0%)

Branch coverage included in aggregate %.

2375 of 2375 new or added lines in 180 files covered. (100.0%)

4775 of 8108 relevant lines covered (58.89%)

0.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

23.53
/spinnman/extended/write_memory_flood_process.py
1
# Copyright (c) 2015 The University of Manchester
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#     https://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14

15
import math
1✔
16
from typing import BinaryIO, Optional
1✔
17
from spinnman.messages.scp.impl import (
1✔
18
    FloodFillEnd, FloodFillStart, FloodFillData)
19
from spinnman.processes import (
1✔
20
    AbstractMultiConnectionProcess, ConnectionSelector)
21
from spinnman.constants import UDP_MESSAGE_MAX_SIZE
1✔
22

23

24
class WriteMemoryFloodProcess(AbstractMultiConnectionProcess):
    """
    A process for writing memory on multiple SpiNNaker chips at once.

    Each write sends a :py:class:`FloodFillStart` request, then the payload
    as a series of :py:class:`FloodFillData` requests carrying at most
    ``UDP_MESSAGE_MAX_SIZE`` bytes each, and finally a
    :py:class:`FloodFillEnd` request.
    """
    __slots__ = ()

    def __init__(self, next_connection_selector: ConnectionSelector):
        """
        :param ConnectionSelector next_connection_selector:
            Passed to the base process, together with ``n_channels=3`` and
            ``intermediate_channel_waits=2``.
        """
        super().__init__(
            next_connection_selector, n_channels=3,
            intermediate_channel_waits=2)

    def _start_flood_fill(
            self, n_bytes: int, nearest_neighbour_id: int) -> None:
        """
        Send the :py:class:`FloodFillStart` request that opens a fill.

        :param int n_bytes: Total number of bytes that will be written
        :param int nearest_neighbour_id:
        """
        # NOTE(review): the block count is derived from the number of
        # 4-byte words divided by UDP_MESSAGE_MAX_SIZE, whereas the write
        # loops below send one block per UDP_MESSAGE_MAX_SIZE *bytes*.
        # Left unchanged; confirm this is what FloodFillStart expects.
        n_words = math.ceil(n_bytes / 4.0)
        n_blocks = math.ceil(n_words / UDP_MESSAGE_MAX_SIZE)
        with self._collect_responses():
            self._send_request(
                FloodFillStart(nearest_neighbour_id, n_blocks))

    def _end_flood_fill(self, nearest_neighbour_id: int) -> None:
        """
        Send the :py:class:`FloodFillEnd` request that closes a fill.

        :param int nearest_neighbour_id:
        """
        with self._collect_responses():
            self._send_request(FloodFillEnd(nearest_neighbour_id))

    @staticmethod
    def _read_block(reader: BinaryIO, n_bytes: int) -> bytes:
        """
        Read one block from the reader, tolerating short reads.

        A single ``read(n)`` on a raw stream may legitimately return fewer
        than ``n`` bytes, so reading is repeated until the block is full
        or the reader returns no more data.

        :param reader: The stream to read from
        :type reader: ~io.RawIOBase or ~io.BufferedIOBase
        :param int n_bytes: The number of bytes wanted
        :return: Up to `n_bytes` bytes; fewer only if the reader ran out
        :rtype: bytes
        """
        block = reader.read(n_bytes)
        while len(block) < n_bytes:
            extra = reader.read(n_bytes - len(block))
            if not extra:
                break
            block += extra
        return block

    def write_memory_from_bytearray(
            self, nearest_neighbour_id: int, base_address: int,
            data: bytes, offset: int, n_bytes: Optional[int] = None) -> None:
        """
        Flood fill memory with bytes taken from an in-memory buffer.

        :param int nearest_neighbour_id:
        :param int base_address: Address at which the first byte is written
        :param data: The buffer holding the bytes to write
        :type data: bytes or bytearray
        :param int offset: Position in `data` of the first byte to write
        :param int n_bytes:
            How many bytes to write; defaults to ``len(data)`` when `None`
        """
        # pylint: disable=too-many-arguments
        if n_bytes is None:
            n_bytes = len(data)
        self._start_flood_fill(n_bytes, nearest_neighbour_id)

        with self._collect_responses():
            # Position in the source buffer and in target memory advance
            # in lock-step, one block at a time
            data_offset = offset
            address = base_address
            block_no = 0
            while n_bytes > 0:
                bytes_to_send = min(int(n_bytes), UDP_MESSAGE_MAX_SIZE)

                self._send_request(FloodFillData(
                    nearest_neighbour_id, block_no, address,
                    data, data_offset, bytes_to_send))

                block_no += 1
                n_bytes -= bytes_to_send
                address += bytes_to_send
                data_offset += bytes_to_send

        self._end_flood_fill(nearest_neighbour_id)

    def write_memory_from_reader(
            self, nearest_neighbour_id: int, base_address: int,
            reader: BinaryIO, n_bytes: int) -> None:
        """
        Flood fill memory with bytes read from a stream.

        :param int nearest_neighbour_id:
        :param int base_address: Address at which the first byte is written
        :param reader: The stream supplying the bytes to write
        :type reader: ~io.RawIOBase or ~io.BufferedIOBase
        :param int n_bytes: How many bytes to read from `reader` and write
        """
        self._start_flood_fill(n_bytes, nearest_neighbour_id)

        with self._collect_responses():
            address = base_address
            block_no = 0
            while n_bytes > 0:
                bytes_to_send = min(int(n_bytes), UDP_MESSAGE_MAX_SIZE)
                # Fill the whole block even if the reader hands the data
                # back in pieces; otherwise the address would advance past
                # bytes that were never sent.
                data_array = self._read_block(reader, bytes_to_send)

                # If the reader is exhausted early the block is sent with
                # whatever was read (possibly nothing), as before.
                self._send_request(FloodFillData(
                    nearest_neighbour_id, block_no, address,
                    data_array, 0, len(data_array)))

                block_no += 1
                n_bytes -= bytes_to_send
                address += bytes_to_send

        self._end_flood_fill(nearest_neighbour_id)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc