• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SpiNNakerManchester / SpiNNMan / 6574804013

19 Oct 2023 12:47PM UTC coverage: 51.937% (+1.2%) from 50.777%
6574804013

Pull #327

github

Christian-B
typing changes
Pull Request #327: Type Annotations and Checking

105 of 1288 branches covered (0.0%)

Branch coverage included in aggregate %.

2375 of 2375 new or added lines in 180 files covered. (100.0%)

4775 of 8108 relevant lines covered (58.89%)

0.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

31.51
/spinnman/extended/extended_transceiver.py
1
# Copyright (c) 2014 The University of Manchester
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#     https://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14

15
# pylint: disable=too-many-arguments
16
from contextlib import contextmanager
1✔
17
import io
1✔
18
import os
1✔
19
import logging
1✔
20
import random
1✔
21
import struct
1✔
22
import time
1✔
23
from spinn_utilities.abstract_base import AbstractBase
1✔
24
from spinn_utilities.log import FormatAdapter
1✔
25
from spinn_utilities.logger_utils import warn_once
1✔
26
from spinn_utilities.require_subclass import require_subclass
1✔
27
from spinn_machine import CoreSubsets
1✔
28
from spinnman.constants import (
1✔
29
    ROUTER_REGISTER_BASE_ADDRESS, ROUTER_FILTER_CONTROLS_OFFSET,
30
    ROUTER_DIAGNOSTIC_FILTER_SIZE)
31
from spinnman.exceptions import SpinnmanException
1✔
32
from spinnman.extended import (
1✔
33
    BMPSetLed, DeAllocSDRAMProcess, ReadADC, SetLED, WriteMemoryFloodProcess)
34
from spinnman.model import DiagnosticFilter
1✔
35
from spinnman.model.enums import CPUState
1✔
36
from spinnman.messages.scp.enums import Signal
1✔
37
from spinnman.messages.scp.impl import (
1✔
38
    ReadMemory, ApplicationRun)
39
from spinnman.connections.udp_packet_connections import SCAMPConnection
1✔
40
from spinnman.constants import SYSTEM_VARIABLE_BASE_ADDRESS
1✔
41
from spinnman.data import SpiNNManDataView
1✔
42
from spinnman.messages.spinnaker_boot import SystemVariableDefinition
1✔
43
from spinnman.processes import (
1✔
44
    GetHeapProcess, ReadMemoryProcess, SendSingleCommandProcess,
45
    WriteMemoryProcess)
46
from spinnman.transceiver.extendable_transceiver import ExtendableTransceiver
1✔
47

48
# Pre-compiled struct format for a single unsigned byte ("B"); used in this
# module to pack the software watchdog count before writing it to a chip.
_ONE_BYTE = struct.Struct("B")
1✔
49

50
# Module-level logger wrapped in FormatAdapter; it is handed to warn_once for
# the deprecation warnings and used for the where_is_xy diagnostics below.
logger = FormatAdapter(logging.getLogger(__name__))
1✔
51

52

53
@require_subclass(ExtendableTransceiver)
1✔
54
class ExtendedTransceiver(object, metaclass=AbstractBase):
1✔
55
    """
56
    Allows a Transceiver to support extra methods not currently needed.
57

58
    All methods here are in danger of being removed if they become too hard
59
    to support and many are untested so use at your own risk.
60
    It is undetermined if these will work with Spin2 boards.
61

62
    If any method here is considered important to keep please move it to
63
    Transceiver and its implementations
64
    """
65
    __slots__ = ()
1✔
66

67
    # calls many methods only reachable due to require_subclass
68
    # pylint: disable=no-member,assigning-non-slot
69
    # pylint: disable=access-member-before-definition
70
    # pylint: disable=attribute-defined-outside-init
71

72
    def send_scp_message(self, message, connection=None):
        """
        Send an SCP message on a connection; no response is read here.

        .. note::
            The raise list below is carried over from the earlier
            documentation. Several entries describe receive-side failures
            even though this method only sends.

        :param message: The message to send
        :type message:
            spinnman.messages.scp.abstract_messages.AbstractSCPRequest
        :param SCAMPConnection connection:
            The connection to send on. When omitted, one of the
            transceiver's SCAMP connections is chosen at random.
        :raise SpinnmanTimeoutException:
            If there is a timeout before a message is received
        :raise SpinnmanInvalidParameterException:
            If one of the fields of the received message is invalid
        :raise SpinnmanInvalidPacketException:
            * If the message is not a recognised packet type
            * If a packet is received that is not a valid response
        :raise SpinnmanUnsupportedOperationException:
            If no connection can send the type of message given
        :raise SpinnmanIOException:
            If there is an error sending the message or receiving the response
        :raise SpinnmanUnexpectedResponseCodeException:
            If the response is not one of the expected codes
        """
        if connection is None:
            # No connection requested: pick any SCAMP connection uniformly
            candidates = self.scamp_connections
            last_index = len(candidates) - 1
            connection = candidates[random.randint(0, last_index)]
        connection.send_scp_request(message)
×
99

100
    def is_connected(self, connection=None):
        """
        Determine whether the board can be contacted via SCAMP.

        :param Connection connection:
            The connection which is to be tested. If `None`, the
            transceiver's SCAMP connections are tested in turn and the
            board is considered connected as soon as one of them works.
        :return: True if the board can be contacted, False otherwise
        :rtype: bool
        """
        if connection is not None:
            return connection.is_connected()
        # Test the type before calling is_connected(): the type check is
        # free, so a connection that would be rejected anyway is never
        # asked to check the board.
        return any(isinstance(c, SCAMPConnection) and c.is_connected()
                   for c in self.scamp_connections)
116

117
    def get_iobuf_from_core(self, x, y, p):
        """
        Fetch the IOBUF contents of a single core.

        Builds a one-core :py:class:`CoreSubsets` and returns the first
        item produced by :py:meth:`get_iobuf` for it.

        .. warning::
            This method is currently deprecated and likely to be removed.

        :param int x: The x-coordinate of the chip containing the processor
        :param int y: The y-coordinate of the chip containing the processor
        :param int p: The ID of the processor to get the IOBUF for
        :return: An IOBUF buffer
        :rtype: IOBuffer
        :raise SpinnmanIOException:
            If there is an error communicating with the board
        :raise SpinnmanInvalidPacketException:
            If a packet is received that is not in the valid format
        :raise SpinnmanInvalidParameterException:
            * If the requested core is invalid
            * If a packet is received that has invalid parameters
        :raise SpinnmanUnexpectedResponseCodeException:
            If a response indicates an error during the exchange
        """
        warn_once(logger, "The get_iobuf_from_core method is deprecated and "
                  "likely to be removed.")
        single_core = CoreSubsets()
        single_core.add_processor(x, y, p)
        # Only one core was requested, so the first result is the answer
        iobufs = self.get_iobuf(single_core)
        return next(iobufs)
×
144

145
    @contextmanager
1✔
146
    def _chip_execute_lock(self, x, y):
1✔
147
        """
148
        Get a lock for executing an executable on a chip.
149

150
        .. warning::
151
            This method is currently deprecated and untested as there is no
152
            known use except for execute, which is itself deprecated.
153

154
        :param int x:
155
        :param int y:
156
        """
157
        # Check if there is a lock for the given chip
158
        with self._chip_execute_lock_condition:
×
159
            chip_lock = self._chip_execute_locks[x, y]
×
160
        # Acquire the lock for the chip
161
        chip_lock.acquire()
×
162

163
        # Increment the lock counter (used for the flood lock)
164
        with self._chip_execute_lock_condition:
×
165
            self._n_chip_execute_locks += 1
×
166

167
        try:
×
168
            yield chip_lock
×
169
        finally:
170
            with self._chip_execute_lock_condition:
×
171
                # Release the chip lock
172
                chip_lock.release()
×
173
                # Decrement the lock and notify
174
                self._n_chip_execute_locks -= 1
×
175
                self._chip_execute_lock_condition.notify_all()
×
176

177
    def execute(
            self, x, y, processors, executable, app_id, n_bytes=None,
            wait=False, is_filename=False):
        """
        Load an executable onto one chip and start it on the given cores.

        The executable is written to the chip's executable address while
        the chip's execute lock is held, then an application-run request
        is sent for the requested processors.

        .. warning::
            This method is currently deprecated and likely to be removed.

        :param int x:
            The x-coordinate of the chip on which to run the executable
        :param int y:
            The y-coordinate of the chip on which to run the executable
        :param list(int) processors:
            The cores on the chip on which to run the application
        :param executable:
            The data that is to be executed. Should be one of the following:

            * An instance of RawIOBase
            * A bytearray/bytes
            * A filename of a file containing the executable (in which case
              `is_filename` must be set to True)
        :type executable:
            ~io.RawIOBase or bytes or bytearray or str
        :param int app_id:
            The ID of the application with which to associate the executable
        :param int n_bytes:
            The size of the executable data in bytes; it is passed through
            unchanged to `write_memory`. The earlier documentation states
            that, if not specified:

            * If executable is an RawIOBase, an error is raised
            * If executable is a bytearray, the length of the bytearray will
              be used
            * If executable is a str, the length of the file will be used
        :param bool wait:
            True if the binary should enter a "wait" state on loading
        :param bool is_filename: True if executable is a filename
        :raise SpinnmanIOException:
            * If there is an error communicating with the board
            * If there is an error reading the executable
        :raise SpinnmanInvalidPacketException:
            If a packet is received that is not in the valid format
        :raise SpinnmanInvalidParameterException:
            * If x, y, p does not lead to a valid core
            * If app_id is an invalid application ID
            * If a packet is received that has invalid parameters
        :raise SpinnmanUnexpectedResponseCodeException:
            If a response indicates an error during the exchange
        """
        warn_once(logger, "The Transceiver's execute method is deprecated "
                          "likely to be removed.")
        # Hold the chip's execute lock for the whole load-and-start sequence
        with self._chip_execute_lock(x, y):
            # Step 1: copy the binary into the chip
            self.write_memory(
                x, y, self._EXECUTABLE_ADDRESS, executable, n_bytes,
                is_filename=is_filename)

            # Step 2: ask the chip to start it on the requested cores
            starter = SendSingleCommandProcess(self._scamp_connection_selector)
            run_request = ApplicationRun(app_id, x, y, processors, wait)
            starter.execute(run_request)
×
238

239
    def execute_application(self, executable_targets, app_id):
        """
        Load all the binaries of an application onto their cores, check
        that the cores reached the ready state and then send the start
        signal.

        .. note::
            This will get the binaries into c_main but will not signal the
            barrier.

        :param ExecutableTargets executable_targets:
            The binaries to be executed and the cores to execute them on
        :param int app_id: The app_id to give this application
        :raise SpinnmanException:
            If fewer cores than expected are ready and at least one core
            is reported in a state other than ready
        """
        # Flood each binary to its cores, leaving them in a "wait" state
        for binary in executable_targets.binaries:
            self.execute_flood(
                executable_targets.get_cores_for_binary(binary), binary,
                app_id, wait=True, is_filename=True)

        # Give the cores a moment to get going
        time.sleep(0.5)

        # Verify that every core made it to the ready state
        count = self.get_core_state_count(app_id, CPUState.READY)
        if count < executable_targets.total_processors:
            # include=False: collect the cores that are NOT in READY
            cores_not_ready = self.get_cpu_infos(
                executable_targets.all_core_subsets, [CPUState.READY],
                include=False)
            if len(cores_not_ready) > 0:
                raise SpinnmanException(
                    f"Only {count} of {executable_targets.total_processors} "
                    "cores reached ready state: "
                    f"{cores_not_ready.get_status_string()}")

        # Everything is loaded: tell the application to start
        self.send_signal(app_id, Signal.START)
×
276

277
    def set_led(self, led, action, board):
        """
        Set the state of one or more LEDs on a board via the BMP.

        .. warning::
            This method is currently deprecated and untested as there is no
            known use. Same functionality provided by ybug and bmpc.
            Retained in case needed for hardware debugging.

        :param led:
            Number of the LED or an iterable of LEDs to set the state of (0-7)
        :type led: int or iterable(int)
        :param LEDAction action:
            State to set the LED to, either on, off or toggle
        :param board: Specifies the board to control the LEDs of. This may
            also be an iterable of multiple boards. The
            command will actually be sent to the first board in the iterable.
        :type board: int or iterable(int)
        """
        warn_once(logger, "The set_led method is deprecated and "
                  "untested due to no known use.")
        bmp_process = SendSingleCommandProcess(self.bmp_selector)
        led_request = BMPSetLed(led, action, board)
        bmp_process.execute(led_request)
×
300

301
    def read_adc_data(self, board):
        """
        Ask the BMP for the ADC data of a board.

        .. warning::
            This method is currently deprecated and untested as there is no
            known use. Same functionality provided by ybug and bmpc.
            Retained in case needed for hardware debugging.

        :param int board: which board to request the ADC data from
        :return: the FPGA's ADC data object
        :rtype: ADCInfo
        """
        warn_once(logger, "The read_adc_data method is deprecated and "
                  "untested due to no known use.")
        bmp_process = SendSingleCommandProcess(self.bmp_selector)
        adc_response = bmp_process.execute(ReadADC(board))
        # pylint: disable=no-member
        return adc_response.adc_info
×
319

320
    def write_neighbour_memory(self, x, y, link, base_address, data,
                               n_bytes=None, offset=0, cpu=0):
        """
        Write to the memory of a neighbouring chip over a link. If sent to
        a BMP, this command can be used to communicate with the FPGAs'
        debug registers.

        .. warning::
            This method is deprecated and untested due to no known use.

        :param int x:
            The x-coordinate of the chip whose neighbour is to be written to
        :param int y:
            The y-coordinate of the chip whose neighbour is to be written to
        :param int link:
            The link index to send the request to (or if BMP, the FPGA number)
        :param int base_address:
            The address in SDRAM where the region of memory is to be written
        :param data: The data to write.  Should be one of the following:

            * An instance of RawIOBase
            * A bytearray/bytes
            * A single integer; will be written in little-endian byte order
        :type data:
            ~io.RawIOBase or bytes or bytearray or int
        :param int n_bytes:
            The amount of data to be written in bytes.  If not specified:

            * If `data` is an RawIOBase, an error is raised
            * If `data` is a bytearray, the length of the bytearray will be
              used
            * If `data` is an int, 4 will be used
        :param int offset:
            The offset where the valid data starts (if `data` is
            an int then offset will be ignored and 0 is used)
        :param int cpu:
            The CPU to use, typically 0 (or if a BMP, the slot number)
        :raise SpinnmanIOException:
            * If there is an error communicating with the board
            * If there is an error reading the data
        :raise SpinnmanInvalidPacketException:
            If a packet is received that is not in the valid format
        :raise SpinnmanInvalidParameterException:
            * If `x, y` does not lead to a valid chip
            * If a packet is received that has invalid parameters
            * If `base_address` is not a positive integer
            * If `data` is an RawIOBase but `n_bytes` is not specified
            * If `data` is an int and `n_bytes` is more than 4
            * If `n_bytes` is less than 0
        :raise SpinnmanUnexpectedResponseCodeException:
            If a response indicates an error during the exchange
        """
        warn_once(logger, "The write_neighbour_memory method is deprecated "
                          "and untested due to no known use.")
        writer = WriteMemoryProcess(self.scamp_connection_selector)
        target = (x, y, cpu)

        # Stream source: hand the reader straight to the process
        if isinstance(data, io.RawIOBase):
            writer.write_link_memory_from_reader(
                target, link, base_address, data, n_bytes)
            return

        # Single integer: always written as 4 bytes from offset 0, so the
        # caller's n_bytes and offset are not consulted on this path
        if isinstance(data, int):
            # NOTE(review): _ONE_WORD is not defined in this file;
            # presumably it comes from the Transceiver subclass - confirm
            word = self._ONE_WORD.pack(data)
            writer.write_link_memory_from_bytearray(
                target, link, base_address, word, 0, 4)
            return

        # Byte-string source: default the size to the whole buffer
        size = len(data) if n_bytes is None else n_bytes
        writer.write_link_memory_from_bytearray(
            target, link, base_address, data, offset, size)
387

388
    def read_neighbour_memory(self, x, y, link, base_address, length, cpu=0):
        """
        Read memory from a neighbouring chip over a link using a LINK_READ
        SCP command. If sent to a BMP, this command can be used to
        communicate with the FPGAs' debug registers.

        On any failure the location of chip (x, y) is logged before the
        exception is re-raised.

        .. warning::
            This method is currently deprecated and untested as there is no
            known use. Same functionality provided by ybug and bmpc.
            Retained in case needed for hardware debugging.

        :param int x:
            The x-coordinate of the chip whose neighbour is to be read from
        :param int y:
            The y-coordinate of the chip whose neighbour is to be read from
        :param int link:
            The link index to send the request to (or if BMP, the FPGA number)
        :param int base_address:
            The address in SDRAM where the region of memory to be read starts
        :param int length: The length of the data to be read in bytes
        :param int cpu:
            The CPU to use, typically 0 (or if a BMP, the slot number)
        :return: The data read, as returned by the read process
        :rtype: bytes
        :raise SpinnmanIOException:
            If there is an error communicating with the board
        :raise SpinnmanInvalidPacketException:
            If a packet is received that is not in the valid format
        :raise SpinnmanInvalidParameterException:
            * If one of `x`, `y`, `cpu`, `base_address` or `length` is invalid
            * If a packet is received that has invalid parameters
        :raise SpinnmanUnexpectedResponseCodeException:
            If a response indicates an error during the exchange
        """
        try:
            warn_once(logger, "The read_neighbour_memory method is deprecated "
                      "and untested due to no known use.")
            reader = ReadMemoryProcess(self.scamp_connection_selector)
            source_core = (x, y, cpu)
            return reader.read_link_memory(
                source_core, link, base_address, length)
        except Exception:
            # Record where the chip is to help diagnosis, then propagate
            logger.info(self.where_is_xy(x, y))
            raise
×
431

432
    def _get_next_nearest_neighbour_id(self):
1✔
433
        with self._nearest_neighbour_lock:
×
434
            next_nearest_neighbour_id = (self._nearest_neighbour_id + 1) % 127
×
435
            self._nearest_neighbour_id = next_nearest_neighbour_id
×
436
        return next_nearest_neighbour_id
×
437

438
    def write_memory_flood(
            self, base_address, data, n_bytes=None, offset=0,
            is_filename=False):
        """
        Write the same data to the SDRAM of all chips using a flood fill.

        Only one flood fill runs at a time: the whole write is performed
        while holding the flood-write lock.

        :param int base_address:
            The address in SDRAM where the region of memory is to be written
        :param data:
            The data that is to be written.  Should be one of the following:

            * An instance of RawIOBase
            * A byte-string
            * A single integer
            * A file name of a file to read (in which case `is_filename`
              should be set to True)
        :type data:
            ~io.RawIOBase or bytes or bytearray or int or str
        :param int n_bytes:
            The amount of data to be written in bytes.  If not specified:

            * If `data` is an RawIOBase, an error is raised
            * If `data` is a bytearray or bytes, the length of the bytearray
              will be used
            * If `data` is an int, 4 will be used
            * If `data` is a str, the size of the file will be used
        :param int offset:
            The offset where the valid data starts; if `data` is
            an int, then the offset will be ignored and 0 is used.
        :param bool is_filename:
            True if `data` should be interpreted as a file name
        :raise SpinnmanIOException:
            * If there is an error communicating with the board
            * If there is an error reading the executable
        :raise SpinnmanInvalidPacketException:
            If a packet is received that is not in the valid format
        :raise SpinnmanInvalidParameterException:
            * If one of the specified chips is not valid
            * If a packet is received that has invalid parameters
        :raise SpinnmanUnexpectedResponseCodeException:
            If a response indicates an error during the exchange
        """
        flood = WriteMemoryFloodProcess(self._scamp_connection_selector)
        # Serialise flood fills: at most one may be in progress
        with self._flood_write_lock:
            nn_id = self._get_next_nearest_neighbour_id()

            # Stream source
            if isinstance(data, io.RawIOBase):
                flood.write_memory_from_reader(
                    nn_id, base_address, data, n_bytes)
                return

            # File source: default the size to the size of the file
            if isinstance(data, str) and is_filename:
                size = os.stat(data).st_size if n_bytes is None else n_bytes
                with open(data, "rb") as reader:
                    flood.write_memory_from_reader(
                        nn_id, base_address, reader, size)
                return

            # Single integer: packed as one word, written from offset 0
            if isinstance(data, int):
                # NOTE(review): _ONE_WORD is not defined in this file;
                # presumably it comes from the Transceiver subclass - confirm
                word = self._ONE_WORD.pack(data)
                flood.write_memory_from_bytearray(
                    nn_id, base_address, word, 0)
                return

            # Byte-string source: default the size to the whole buffer
            size = len(data) if n_bytes is None else n_bytes
            flood.write_memory_from_bytearray(
                nn_id, base_address, data, offset, size)
504

505
    def set_leds(self, x, y, cpu, led_states):
        """
        Set the states of the LEDs on a chip.

        On any failure the location of chip (x, y) is logged before the
        exception is re-raised.

        .. warning::
            The set_leds is deprecated and untested due to no known use.

        :param int x: The x-coordinate of the chip on which to set the LEDs
        :param int y: The y-coordinate of the chip on which to set the LEDs
        :param int cpu: The CPU of the chip on which to set the LEDs
        :param dict(int,int) led_states:
            A dictionary mapping SetLED index to state with
            0 being off, 1 on and 2 inverted.
        :raise SpinnmanIOException:
            If there is an error communicating with the board
        :raise SpinnmanInvalidPacketException:
            If a packet is received that is not in the valid format
        :raise SpinnmanInvalidParameterException:
            If a packet is received that has invalid parameters
        :raise SpinnmanUnexpectedResponseCodeException:
            If a response indicates an error during the exchange
        """
        try:
            warn_once(logger, "The set_leds is deprecated and "
                      "untested due to no known use.")
            sender = SendSingleCommandProcess(self._scamp_connection_selector)
            led_request = SetLED(x, y, cpu, led_states)
            sender.execute(led_request)
        except Exception:
            # Record where the chip is to help diagnosis, then propagate
            logger.info(self.where_is_xy(x, y))
            raise
×
535

536
    def free_sdram(self, x, y, base_address):
        """
        Release a block of allocated SDRAM on a chip.

        On any failure the location of chip (x, y) is logged before the
        exception is re-raised.

        .. warning::
            This method is currently deprecated and likely to be removed.

        :param int x: The x-coordinate of the chip holding the memory
        :param int y: The y-coordinate of the chip holding the memory
        :param int base_address: The base address of the allocated memory
        """
        try:
            warn_once(logger, "The free_sdram method is deprecated and "
                      "likely to be removed.")
            deallocator = DeAllocSDRAMProcess(self._scamp_connection_selector)
            deallocator.de_alloc_sdram(x, y, base_address)
        except Exception:
            # Record where the chip is to help diagnosis, then propagate
            logger.info(self.where_is_xy(x, y))
            raise
×
555

556
    def free_sdram_by_app_id(self, x, y, app_id):
        """
        Release all SDRAM allocated to a given app ID on a chip.

        On any failure the location of chip (x, y) is logged before the
        exception is re-raised.

        .. warning::
            This method is currently deprecated and untested as there is no
            known use. Same functionality provided by ybug and bmpc.
            Retained in case needed for hardware debugging.

        :param int x: The x-coordinate of the chip holding the memory
        :param int y: The y-coordinate of the chip holding the memory
        :param int app_id: The app ID of the allocated memory
        :return: The number of blocks freed
        :rtype: int
        """
        try:
            warn_once(logger, "The free_sdram_by_app_id method is deprecated "
                              "and untested due to no known use.")
            deallocator = DeAllocSDRAMProcess(self._scamp_connection_selector)
            # NOTE(review): free_sdram passes a base address to this same
            # call in the same position - confirm that the process really
            # treats this argument as an app ID here.
            deallocator.de_alloc_sdram(x, y, app_id)
            return deallocator.no_blocks_freed
        except Exception:
            # Record where the chip is to help diagnosis, then propagate
            logger.info(self.where_is_xy(x, y))
            raise
×
580

581
    def get_router_diagnostic_filter(self, x, y, position):
        """
        Read one diagnostic filter from the router of a chip.

        The filter's address is computed from the router register base,
        the filter-controls offset and `position`; four bytes are read
        from core 0 of the chip and decoded as one word.

        On any failure the location of chip (x, y) is logged before the
        exception is re-raised.

        :param int x:
            the X address of the router from which this filter is being
            retrieved
        :param int y:
            the Y address of the router from which this filter is being
            retrieved
        :param int position:
            the position in the list of filters to read the information from
        :return: The diagnostic filter read
        :rtype: DiagnosticFilter
        :raise SpinnmanIOException:
            * If there is an error communicating with the board
            * If there is an error reading the data
        :raise SpinnmanInvalidPacketException:
            If a packet is received that is not in the valid format
        :raise SpinnmanInvalidParameterException:
            * If x, y does not lead to a valid chip
            * If a packet is received that has invalid parameters
            * If position is less than 0 or more than 15
        :raise SpinnmanUnexpectedResponseCodeException:
            If a response indicates an error during the exchange
        """
        try:
            # Filters are laid out back to back after the controls offset
            offset_of_filter = position * ROUTER_DIAGNOSTIC_FILTER_SIZE
            filter_address = (
                ROUTER_REGISTER_BASE_ADDRESS + ROUTER_FILTER_CONTROLS_OFFSET +
                offset_of_filter)

            reader = SendSingleCommandProcess(
                self.scamp_connection_selector)
            read_request = ReadMemory((x, y, 0), filter_address, 4)
            response = reader.execute(read_request)
            # pylint: disable=no-member
            # NOTE(review): _ONE_WORD is not defined in this file;
            # presumably it comes from the Transceiver subclass - confirm
            words = self._ONE_WORD.unpack_from(
                response.data, response.offset)
            return DiagnosticFilter.read_from_int(words[0])
        except Exception:
            # Record where the chip is to help diagnosis, then propagate
            logger.info(self.where_is_xy(x, y))
            raise
×
622

623
    @property
    def number_of_boards_located(self):
        """
        The number of boards currently configured; never less than 1.

        .. warning::
            This property is currently deprecated and likely to be removed.

        :rtype: int
        """
        warn_once(logger, "The number_of_boards_located method is deprecated "
                          "and likely to be removed.")
        if self.bmp_connection is None:
            # if no BMPs are available, then there's still at least one board
            return 1
        board_count = len(self.bmp_connection.boards)
        return max(1, board_count)
×
640

641
    def get_heap(self, x, y, heap=SystemVariableDefinition.sdram_heap_address):
        """
        Read the contents of a heap on a chip.

        On any failure the location of chip (x, y) is logged before the
        exception is re-raised.

        :param int x: The x-coordinate of the chip
        :param int y: The y-coordinate of the chip
        :param SystemVariableDefinition heap:
            The SystemVariableDefinition which is the heap to read;
            defaults to the SDRAM heap address
        :rtype: list(HeapElement)
        """
        try:
            heap_reader = GetHeapProcess(self.scamp_connection_selector)
            chip = (x, y)
            return heap_reader.get_heap(chip, heap)
        except Exception:
            # Record where the chip is to help diagnosis, then propagate
            logger.info(self.where_is_xy(x, y))
            raise
×
657

658
    def __set_watch_dog_on_chip(self, x, y, watch_dog):
        """
        Enable, disable or set the value of the watch dog timer on a
        specific chip.

        The value is packed as a single byte and written to the software
        watchdog count system variable of the chip.

        .. warning::
            This method is currently deprecated and untested as there is no
            known use. Same functionality provided by ybug and bmpc.
            Retained in case needed for hardware debugging.

        :param int x: chip X coordinate to write new watchdog parameter to
        :param int y: chip Y coordinate to write new watchdog parameter to
        :param watch_dog:
            Either a boolean indicating whether to enable (True) or
            disable (False) the watchdog timer, or an int value to set the
            timer count to
        :type watch_dog: bool or int
        """
        warn_once(logger, "The set_watch_dog_on_chip method is deprecated "
                          "and untested due to no known use.")
        watchdog = SystemVariableDefinition.software_watchdog_count

        # Work out the count: True -> the variable's default, False -> 0,
        # any other value is used as given
        if isinstance(watch_dog, bool):
            if watch_dog:
                count = watchdog.default
            else:
                count = 0
        else:
            count = watch_dog

        # Encode as one byte and write it over the system variable
        payload = _ONE_BYTE.pack(count)
        target_address = SYSTEM_VARIABLE_BASE_ADDRESS + watchdog.offset
        self.write_memory(x=x, y=y, base_address=target_address, data=payload)
1✔
690

691
    def set_watch_dog(self, watch_dog):
        """
        Enable, disable or set the value of the watch dog timer on every
        chip of the machine.

        .. warning::
            This method is currently deprecated and untested as there is no
            known use. Same functionality provided by ybug and bmpc.
            Retained in case needed for hardware debugging.

        :param watch_dog:
            Either a boolean indicating whether to enable (True) or
            disable (False) the watch dog timer, or an int value to set the
            timer count to.
        :type watch_dog: bool or int
        """
        warn_once(logger, "The set_watch_dog method is deprecated and "
                          "untested due to no known use.")
        machine = SpiNNManDataView.get_machine()
        # Apply the same setting chip by chip
        for chip_x, chip_y in machine.chip_coordinates:
            self.__set_watch_dog_on_chip(chip_x, chip_y, watch_dog)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc