• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SpiNNakerManchester / sPyNNaker / 8143408510

04 Mar 2024 04:14PM UTC coverage: 69.531% (+1.5%) from 67.984%
8143408510

push

github

web-flow
Merge pull request #1437 from SpiNNakerManchester/pylint_default

Pylint default

7334 of 10026 branches covered (73.15%)

Branch coverage included in aggregate %.

352 of 398 new or added lines in 130 files covered. (88.44%)

105 existing lines in 35 files now uncovered.

12791 of 18918 relevant lines covered (67.61%)

0.68 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.85
/spynnaker/pyNN/models/spike_source/spike_source_array_vertex.py
1
# Copyright (c) 2017 The University of Manchester
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#     https://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14
from __future__ import annotations
1✔
15
from collections import Counter
1✔
16
import logging
1✔
17
from typing import (
1✔
18
    Collection, List, Optional, Sequence, Tuple, Union, TYPE_CHECKING)
19

20
import numpy
1✔
21
from numpy.typing import ArrayLike, NDArray
1✔
22
from typing_extensions import TypeAlias, TypeGuard
1✔
23

24
from pyNN.space import Grid2D, Grid3D, BaseStructure
1✔
25

26
from spinn_utilities.log import FormatAdapter
1✔
27
from spinn_utilities.overrides import overrides
1✔
28
from spinn_utilities.config_holder import get_config_int
1✔
29
from spinn_utilities.ranged.abstract_sized import Selector
1✔
30

31
from pacman.model.graphs.common import Slice
1✔
32
from pacman.model.partitioner_splitters import AbstractSplitterCommon
1✔
33
from pacman.model.resources import AbstractSDRAM
1✔
34

35
from spinn_front_end_common.utility_models import ReverseIpTagMultiCastSource
1✔
36

37
from spynnaker.pyNN.data import SpynnakerDataView
1✔
38
from spynnaker.pyNN.models.abstract_models import SupportsStructure
1✔
39
from spynnaker.pyNN.models.common import (
1✔
40
    ParameterHolder, PopulationApplicationVertex)
41
from spynnaker.pyNN.models.common.types import (Names, Spikes)
1✔
42
from spynnaker.pyNN.utilities import constants
1✔
43
from spynnaker.pyNN.utilities.buffer_data_type import BufferDataType
1✔
44
from spynnaker.pyNN.utilities.ranged import SpynnakerRangedList
1✔
45

46
from .spike_source_array_machine_vertex import SpikeSourceArrayMachineVertex
1✔
47

48
if TYPE_CHECKING:
49
    from .spike_source_array import SpikeSourceArray
50

51
logger = FormatAdapter(logging.getLogger(__name__))
1✔
52

53
# Cut off to warn too many spikes sent at one time
54
TOO_MANY_SPIKES = 100
1✔
55

56
_Number: TypeAlias = Union[int, float]
1✔
57

58
_SingleList: TypeAlias = Union[
1✔
59
    Sequence[_Number], NDArray[numpy.integer]]
60
_DoubleList: TypeAlias = Union[
1✔
61
    Sequence[Sequence[_Number]], NDArray[numpy.integer]]
62

63

64
def _is_double_list(value: Spikes) -> TypeGuard[_DoubleList]:
1✔
65
    return not isinstance(value, (float, int)) and bool(len(value)) and \
1✔
66
        hasattr(value[0], "__len__")
67

68

69
def _is_single_list(value: Spikes) -> TypeGuard[_SingleList]:
1✔
70
    # USE _is_double_list first!
71
    return not isinstance(value, (float, int)) and bool(len(value))
1✔
72

73

74
def _is_singleton(value: Spikes) -> TypeGuard[_Number]:
1✔
75
    return isinstance(value, (float, int))
1✔
76

77

78
def _as_numpy_ticks(
1✔
79
        times: ArrayLike, time_step: float) -> NDArray[numpy.int64]:
80
    return numpy.ceil(
1✔
81
        numpy.floor(numpy.array(times) * 1000.0) / time_step).astype("int64")
82

83

84
def _send_buffer_times(
1✔
85
        spike_times: Spikes, time_step: float) -> Union[
86
            NDArray[numpy.int64], List[NDArray[numpy.int64]]]:
87
    # Convert to ticks
88
    if _is_double_list(spike_times):
1✔
89
        return [_as_numpy_ticks(times, time_step) for times in spike_times]
1!
90
    elif _is_single_list(spike_times):
1✔
91
        return _as_numpy_ticks(spike_times, time_step)
1✔
92
    elif _is_singleton(spike_times):
1!
93
        return _as_numpy_ticks([spike_times], time_step)
×
94
    else:
95
        return []
1✔
96

97

98
class SpikeSourceArrayVertex(
        ReverseIpTagMultiCastSource, PopulationApplicationVertex,
        SupportsStructure):
    """
    Model for play back of spikes.

    Wraps a ReverseIpTagMultiCastSource so that pre-loaded spike times are
    injected into the network, and adds the PyNN population API on top:
    ``spike_times`` is the only parameter and ``spikes`` the only
    recordable variable.
    """
    __slots__ = (
        "__model_name",
        "__model",
        "__structure",
        "_spike_times",
        "__n_colour_bits")

    #: ID of the recording region used for recording transmitted spikes.
    SPIKE_RECORDING_REGION_ID = 0

    def __init__(
            self, n_neurons: int, spike_times: Spikes, label: str,
            max_atoms_per_core: Union[int, Tuple[int, ...]],
            model: SpikeSourceArray,
            splitter: Optional[AbstractSplitterCommon],
            n_colour_bits: Optional[int]):
        """
        :param n_neurons: the number of spike-source neurons
        :param spike_times: a scalar, a flat list shared by every neuron,
            or a list of lists (one per neuron), in milliseconds;
            ``None`` is treated as an empty list
        :param label: the label of the vertex
        :param max_atoms_per_core: the maximum atoms placed on one core
        :param model: the PyNN model that created this vertex
        :param splitter: the splitter to use, or ``None`` for the default
        :param n_colour_bits: the number of colour bits, or ``None`` to
            read the value from the configuration file
        """
        # pylint: disable=too-many-arguments
        self.__model_name = "SpikeSourceArray"
        self.__model = model
        self.__structure: Optional[BaseStructure] = None

        if spike_times is None:
            spike_times = []
        # use_list_as_value=True means a flat list is one value shared by
        # every neuron, rather than one entry per neuron
        self._spike_times = SpynnakerRangedList(
            n_neurons, spike_times,
            use_list_as_value=not _is_double_list(spike_times))

        time_step = SpynnakerDataView.get_simulation_time_step_us()

        super().__init__(
            n_keys=n_neurons, label=label,
            max_atoms_per_core=max_atoms_per_core,
            send_buffer_times=_send_buffer_times(spike_times, time_step),
            send_buffer_partition_id=constants.SPIKE_PARTITION_ID,
            splitter=splitter)

        self._check_spike_density(spike_times)
        # Do colouring
        if n_colour_bits is None:
            self.__n_colour_bits = get_config_int(
                "Simulation", "n_colour_bits")
        else:
            self.__n_colour_bits = n_colour_bits

    @overrides(ReverseIpTagMultiCastSource.create_machine_vertex)
    def create_machine_vertex(
            self, vertex_slice: Slice, sdram: AbstractSDRAM,
            label: Optional[str] = None) -> SpikeSourceArrayMachineVertex:
        """
        Create the machine vertex for a slice, with the send-buffer times
        filtered down to the atoms of that slice.
        """
        send_buffer_times = self._filtered_send_buffer_times(vertex_slice)
        machine_vertex = SpikeSourceArrayMachineVertex(
            label=label, app_vertex=self, vertex_slice=vertex_slice,
            eieio_params=self._eieio_params,
            send_buffer_times=send_buffer_times)
        machine_vertex.enable_recording(self._is_recording)
        # Known issue with ReverseIPTagMulticastSourceMachineVertex
        if sdram:
            assert sdram == machine_vertex.sdram_required
        return machine_vertex

    def _check_spike_density(self, spike_times: Spikes):
        """
        Warn if the spike times risk sending too many spikes at once, or
        if there are no spike times at all.
        """
        if _is_double_list(spike_times):
            self._check_density_double_list(spike_times)
        elif _is_single_list(spike_times):
            self._check_density_single_list(spike_times)
        elif _is_singleton(spike_times):
            pass
        else:
            logger.warning("SpikeSourceArray has no spike times")

    def _check_density_single_list(self, spike_times: _SingleList):
        """
        Density check for a flat spike list: the list is shared by every
        atom, so the per-time count is multiplied by the atom count.
        """
        counter = Counter(spike_times)
        top = counter.most_common(1)
        val, count = top[0]
        if count * self.n_atoms > TOO_MANY_SPIKES:
            if self.n_atoms > 1:
                logger.warning(
                    "Danger of SpikeSourceArray sending too many spikes "
                    "at the same time. "
                    "This is because ({}) neurons share the same spike list",
                    self.n_atoms)
            else:
                logger.warning(
                    "Danger of SpikeSourceArray sending too many spikes "
                    "at the same time. "
                    "For example at time {}, {} spikes will be sent",
                    val, count * self.n_atoms)

    def _check_density_double_list(self, spike_times: _DoubleList):
        """
        Density check for per-neuron spike lists: count each time across
        all neurons and warn about the most frequent one.
        """
        counter: Counter = Counter()
        for neuron_id in range(0, self.n_atoms):
            counter.update(spike_times[neuron_id])
        top = counter.most_common(1)
        val, count = top[0]
        if count > TOO_MANY_SPIKES:
            logger.warning(
                "Danger of SpikeSourceArray sending too many spikes "
                "at the same time. "
                "For example at time {}, {} spikes will be sent",
                val, count)

    @overrides(SupportsStructure.set_structure)
    def set_structure(self, structure: BaseStructure):
        self.__structure = structure

    @property
    @overrides(ReverseIpTagMultiCastSource.atoms_shape)
    def atoms_shape(self) -> Tuple[int, ...]:
        # Only grid structures influence the shape; any other structure
        # falls back to the superclass (linear) shape
        if isinstance(self.__structure, (Grid2D, Grid3D)):
            return self.__structure.calculate_size(self.n_atoms)
        return super().atoms_shape

    def _to_early_spikes_single_list(self, spike_times: _SingleList):
        """
        Checks if there is one or more spike_times before the current time.

        Logs a warning for the first one found

        :param list(int) spike_times:
        """
        current_time = SpynnakerDataView.get_current_run_time_ms()
        for spike_time in spike_times:
            if spike_time < current_time:
                logger.warning(
                    "SpikeSourceArray {} has spike_times that are lower than "
                    "the current time {} For example {} - "
                    "these will be ignored.",
                    self, current_time, float(spike_time))
                return

    def _check_spikes_double_list(self, spike_times: _DoubleList):
        """
        Checks if there is one or more spike_times before the current time.

        Logs a warning for the first one found

        :param iterable(int) spike_times:
        """
        current_time = SpynnakerDataView.get_current_run_time_ms()
        for neuron_id in range(0, self.n_atoms):
            id_times = spike_times[neuron_id]
            for id_time in id_times:
                if id_time < current_time:
                    logger.warning(
                        "SpikeSourceArray {} has spike_times that are lower "
                        "than the current time {} For example {} - "
                        "these will be ignored.",
                        self, current_time, float(id_time))
                    return

    def __set_spike_buffer_times(self, spike_times: Spikes):
        """
        Set the spike source array's buffer spike times.
        """
        time_step = SpynnakerDataView.get_simulation_time_step_us()
        # warn the user if they are asking for a spike time out of range
        if _is_double_list(spike_times):
            self._check_spikes_double_list(spike_times)
        elif _is_single_list(spike_times):
            self._to_early_spikes_single_list(spike_times)
        elif _is_singleton(spike_times):
            self._to_early_spikes_single_list([spike_times])
        else:
            # in case of empty list do not check
            pass
        self.send_buffer_times = _send_buffer_times(spike_times, time_step)
        self._check_spike_density(spike_times)

    def __read_parameter(self, name: str, selector: Selector):
        # pylint: disable=unused-argument
        # This can only be spike times
        return self._spike_times.get_values(selector)

    @overrides(PopulationApplicationVertex.get_parameter_values)
    def get_parameter_values(
            self, names: Names, selector: Selector = None) -> ParameterHolder:
        self._check_parameters(names, {"spike_times"})
        return ParameterHolder(names, self.__read_parameter, selector)

    @overrides(PopulationApplicationVertex.set_parameter_values)
    def set_parameter_values(
            self, name: str, value: Spikes, selector: Selector = None):
        self._check_parameters(name, {"spike_times"})
        self.__set_spike_buffer_times(value)
        self._spike_times.set_value_by_selector(
            selector, value, use_list_as_value=not _is_double_list(value))

    @overrides(PopulationApplicationVertex.get_parameters)
    def get_parameters(self) -> List[str]:
        return ["spike_times"]

    @overrides(PopulationApplicationVertex.get_units)
    def get_units(self, name: str) -> str:
        if name == "spikes":
            return ""
        if name == "spike_times":
            return "ms"
        raise KeyError(f"Units for {name} unknown")

    @overrides(PopulationApplicationVertex.get_recordable_variables)
    def get_recordable_variables(self) -> List[str]:
        return ["spikes"]

    @overrides(PopulationApplicationVertex.get_buffer_data_type)
    def get_buffer_data_type(self, name: str) -> BufferDataType:
        if name == "spikes":
            return BufferDataType.EIEIO_SPIKES
        raise KeyError(f"Cannot record {name}")

    @overrides(PopulationApplicationVertex.get_neurons_recording)
    def get_neurons_recording(
            self, name: str, vertex_slice: Slice) -> NDArray[numpy.integer]:
        if name != "spikes":
            raise KeyError(f"Cannot record {name}")
        return vertex_slice.get_raster_ids()

    @overrides(PopulationApplicationVertex.set_recording)
    def set_recording(
            self, name: str, sampling_interval: Optional[float] = None,
            indices: Optional[Collection[int]] = None):
        if name != "spikes":
            raise KeyError(f"Cannot record {name}")
        if sampling_interval is not None:
            logger.warning("Sampling interval currently not supported for "
                           "SpikeSourceArray so being ignored")
        if indices is not None:
            logger.warning("Indices currently not supported for "
                           "SpikeSourceArray so being ignored")
        self.enable_recording(True)
        SpynnakerDataView.set_requires_mapping()

    @overrides(PopulationApplicationVertex.set_not_recording)
    def set_not_recording(
            self, name: str, indices: Optional[Collection[int]] = None):
        if name != "spikes":
            raise KeyError(f"Cannot record {name}")
        if indices is not None:
            logger.warning("Indices currently not supported for "
                           "SpikeSourceArray so being ignored")
        self.enable_recording(False)

    @overrides(PopulationApplicationVertex.get_recording_variables)
    def get_recording_variables(self) -> List[str]:
        if self._is_recording:
            return ["spikes"]
        return []

    @overrides(PopulationApplicationVertex.get_sampling_interval_ms)
    def get_sampling_interval_ms(self, name: str) -> float:
        if name != "spikes":
            raise KeyError(f"Cannot record {name}")
        # Fixed: this method's contract is milliseconds, but it previously
        # returned get_simulation_time_step_us() (microseconds), which was
        # out by a factor of 1000
        return SpynnakerDataView.get_simulation_time_step_ms()

    @overrides(PopulationApplicationVertex.get_recording_region)
    def get_recording_region(self, name: str) -> int:
        if name != "spikes":
            raise KeyError(f"Cannot record {name}")
        return self.SPIKE_RECORDING_REGION_ID

    @overrides(PopulationApplicationVertex.get_data_type)
    def get_data_type(self, name: str) -> None:
        if name != "spikes":
            raise KeyError(f"Cannot record {name}")
        return None

    def describe(self):
        """
        Returns a human-readable description of the cell or synapse type.

        The output may be customised by specifying a different template
        together with an associated template engine
        (see :py:mod:`pyNN.descriptions`).

        If template is `None`, then a dictionary containing the template
        context will be returned.
        """
        return {
            "name": self.__model_name,
            "default_parameters": self.__model.default_parameters,
            # NOTE(review): default_initial_values is filled from
            # default_parameters; presumably intentional as a spike source
            # has no state variables - confirm against the model class
            "default_initial_values": self.__model.default_parameters,
            "parameters": self.get_parameter_values(
                self.__model.default_parameters),
        }

    @property
    @overrides(PopulationApplicationVertex.n_colour_bits)
    def n_colour_bits(self) -> int:
        return self.__n_colour_bits
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc