• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SpiNNakerManchester / SpiNNMan / 6574804013

19 Oct 2023 12:47PM UTC coverage: 51.937% (+1.2%) from 50.777%
6574804013

Pull #327

github

Christian-B
typing changes
Pull Request #327: Type Annotations and Checking

105 of 1288 branches covered (0.0%)

Branch coverage included in aggregate %.

2375 of 2375 new or added lines in 180 files covered. (100.0%)

4775 of 8108 relevant lines covered (58.89%)

0.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

50.0
/spinnman/processes/get_heap_process.py
1
# Copyright (c) 2016 The University of Manchester
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#     https://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14

15
import functools
1✔
16
import struct
1✔
17
from typing import Callable, List, Sequence
1✔
18
from spinn_utilities.typing.coords import XY, XYP
1✔
19
from spinnman.processes import AbstractMultiConnectionProcess
1✔
20
from spinnman.constants import SYSTEM_VARIABLE_BASE_ADDRESS
1✔
21
from spinnman.model import HeapElement
1✔
22
from spinnman.messages.spinnaker_boot import SystemVariableDefinition
1✔
23
from spinnman.messages.scp.impl.read_memory import ReadMemory, Response
1✔
24
from .abstract_multi_connection_process_connection_selector import (
1✔
25
    ConnectionSelector)
26

27

28
HEAP_ADDRESS = SystemVariableDefinition.sdram_heap_address
1✔
29
_ADDRESS = struct.Struct("<I")
1✔
30
_HEAP_POINTER = struct.Struct("<4xI")
1✔
31
_ELEMENT_HEADER = struct.Struct("<II")
1✔
32

33

34
class GetHeapProcess(AbstractMultiConnectionProcess[Response]):
1✔
35
    __slots__ = (
1✔
36
        "_blocks",
37
        "_heap_address",
38
        "_next_block_address")
39

40
    def __init__(self, connection_selector: ConnectionSelector):
1✔
41
        """
42
        :param ConnectionSelector connection_selector:
43
        """
44
        super().__init__(connection_selector)
×
45

46
        self._heap_address = 0
×
47
        self._next_block_address = 0
×
48
        self._blocks: List[HeapElement] = list()
×
49

50
    def _read_heap_address_response(self, response: Response):
1✔
51
        self._heap_address = _ADDRESS.unpack_from(
×
52
            response.data, response.offset)[0]
53

54
    def _read_heap_pointer(self, response: Response):
1✔
55
        self._next_block_address = _HEAP_POINTER.unpack_from(
×
56
            response.data, response.offset)[0]
57

58
    def _read_next_block(self, block_address: int, response: Response):
1✔
59
        self._next_block_address, free = _ELEMENT_HEADER.unpack_from(
×
60
            response.data, response.offset)
61
        if self._next_block_address != 0:
×
62
            self._blocks.append(HeapElement(
×
63
                block_address, self._next_block_address, free))
64

65
    def __read_address(
1✔
66
            self, core_coords: XYP, address: int, size: int,
67
            callback: Callable[[Response], None]):
68
        with self._collect_responses():
×
69
            self._send_request(
×
70
                ReadMemory(core_coords, address, size), callback)
71

72
    def get_heap(self, chip_coords: XY,
1✔
73
                 pointer: SystemVariableDefinition = HEAP_ADDRESS
74
                 ) -> Sequence[HeapElement]:
75
        """
76
        :param tuple(int,int) chip_coords: x, y
77
        :param SystemVariableDefinition pointer:
78
        :rtype: list(HeapElement)
79
        """
80
        x, y = chip_coords
×
81
        core_coords = (x, y, 0)
×
82
        self.__read_address(
×
83
            core_coords, SYSTEM_VARIABLE_BASE_ADDRESS + pointer.offset,
84
            pointer.data_type.value, self._read_heap_address_response)
85

86
        self.__read_address(
×
87
            core_coords, self._heap_address, 8, self._read_heap_pointer)
88

89
        while self._next_block_address != 0:
×
90
            self.__read_address(
×
91
                core_coords, self._next_block_address, 8,
92
                functools.partial(
93
                    self._read_next_block, self._next_block_address))
94

95
        return self._blocks
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc