• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SpiNNakerManchester / SpiNNMan / 6574804013

19 Oct 2023 12:47PM UTC coverage: 51.937% (+1.2%) from 50.777%
6574804013

Pull #327

github

Christian-B
typing changes
Pull Request #327: Type Annotations and Checking

105 of 1288 branches covered (0.0%)

Branch coverage included in aggregate %.

2375 of 2375 new or added lines in 180 files covered. (100.0%)

4775 of 8108 relevant lines covered (58.89%)

0.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

36.54
/spinnman/model/diagnostic_filter.py
1
# Copyright (c) 2015 The University of Manchester
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#     https://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14
from __future__ import annotations
1✔
15
from enum import Enum
1✔
16
from typing import List, Type, TypeVar
1✔
17
from spinnman.model.enums import (
1✔
18
    DiagnosticFilterDestination, DiagnosticFilterSource,
19
    DiagnosticFilterPayloadStatus, DiagnosticFilterDefaultRoutingStatus,
20
    DiagnosticFilterEmergencyRoutingStatus, DiagnosticFilterPacketType)
21
#: :meta private:
22
E = TypeVar("E", bound=Enum)
1✔
23

24
# Bit offsets of the various fields in the filter word
25
_PACKET_TYPE_OFFSET = 0
1✔
26
_EMERGENCY_ROUTE_OFFSET = 4
1✔
27
_EMERGENCY_ROUTE_MODE_OFFSET = 8
1✔
28
_DEFAULT_ROUTE_OFFSET = 10
1✔
29
_PAYLOAD_OFFSET = 12
1✔
30
_SOURCE_OFFSET = 14
1✔
31
_DESTINATION_OFFSET = 16
1✔
32
_ENABLE_INTERRUPT_OFFSET = 30
1✔
33

34

35
# Uses an enum to set flags in the filter word from a given offset
36
def _set_flags_in_word(
1✔
37
        word: int, enum_list: List[E], enum_type: Type[E],
38
        offset: int) -> int:
39
    if enum_list is None:
×
40
        enum_values: List[E] = list()
×
41
    else:
42
        enum_values = list(enum_list)
×
43
    if not enum_values:
×
44
        enum_values = [value for value in enum_type]
×
45
    for enum_value in enum_values:
×
46
        word |= 1 << (enum_value.value + offset)
×
47
    return word
×
48

49

50
# Uses an enum to read flags in the filter word from a given offset
51
def _read_flags_from_word(
1✔
52
        word: int, enum_list: Type[E], offset: int) -> List[E]:
53
    flags: List[E] = list()
×
54
    for enum_value in enum_list:
×
55
        if word & 1 << (enum_value.value + offset) != 0:
×
56
            flags.append(enum_value)
×
57
    return flags
×
58

59

60
class DiagnosticFilter(object):
1✔
61
    """
62
    A router diagnostic counter filter, which counts packets passing
63
    through the router with certain properties.  The counter will be
64
    incremented so long as the packet matches one of the values in each
65
    field i.e. one of each of the destinations, sources, payload_statuses,
66
    default_routing_statuses, emergency_routing_statuses and packet_types.
67
    """
68
    __slots__ = [
1✔
69
        "_default_routing_statuses",
70
        "_destinations",
71
        "_emergency_routing_statuses",
72
        "_enable_interrupt_on_counter_event",
73
        "_match_emergency_routing_status_to_incoming_packet",
74
        "_packet_types",
75
        "_payload_statuses",
76
        "_sources"]
77

78
    def __init__(self, enable_interrupt_on_counter_event: bool,
1✔
79
                 match_emergency_routing_status_to_incoming_packet: bool,
80
                 destinations: List[DiagnosticFilterDestination],
81
                 sources: List[DiagnosticFilterSource],
82
                 payload_statuses: List[DiagnosticFilterPayloadStatus],
83
                 default_routing_statuses: List[
84
                     DiagnosticFilterDefaultRoutingStatus],
85
                 emergency_routing_statuses: List[
86
                     DiagnosticFilterEmergencyRoutingStatus],
87
                 packet_types: List[DiagnosticFilterPacketType]):
88
        """
89
        :param bool enable_interrupt_on_counter_event: Indicates whether
90
            an interrupt should be raised when this rule matches
91
        :param bool match_emergency_routing_status_to_incoming_packet:
92
            Indicates whether the emergency routing statuses should be matched
93
            against packets arriving at this router (if True), or if they
94
            should be matched against packets leaving this router (if False)
95
        :param list(DiagnosticFilterDestination) destinations:
96
            Increment the counter if one or more of the given destinations
97
            match
98
        :param list(DiagnosticFilterSource) sources:
99
            Increment the counter if one or more of the given sources match
100
            (or `None` or empty list to match all)
101
        :param list(DiagnosticFilterPayloadStatus) payload_statuses:
102
            Increment the counter if one or more of the given payload statuses
103
            match (or `None` or empty list to match all)
104
        :param default_routing_statuses:
105
            Increment the counter if one or more of the given default routing
106
            statuses match (or `None` or empty list to match all)
107
        :type default_routing_statuses:
108
            list(DiagnosticFilterDefaultRoutingStatus)
109
        :param emergency_routing_statuses:
110
            Increment the counter if one or more of the given emergency routing
111
            statuses match (or `None` or empty list to match all)
112
        :type emergency_routing_statuses:
113
            list(DiagnosticFilterEmergencyRoutingStatus)
114
        :param list(DiagnosticFilterPacketType) packet_types:
115
            Increment the counter if one or more of the given packet types
116
            match (or `None` or empty list to match all)
117
        """
118
        # pylint: disable=too-many-arguments
119
        self._enable_interrupt_on_counter_event = \
×
120
            enable_interrupt_on_counter_event
121
        self._match_emergency_routing_status_to_incoming_packet = \
×
122
            match_emergency_routing_status_to_incoming_packet
123
        self._destinations = destinations
×
124
        self._sources = sources
×
125
        self._payload_statuses = payload_statuses
×
126
        self._default_routing_statuses = default_routing_statuses
×
127
        self._emergency_routing_statuses = emergency_routing_statuses
×
128
        self._packet_types = packet_types
×
129

130
    @property
1✔
131
    def enable_interrupt_on_counter_event(self) -> bool:
1✔
132
        return self._enable_interrupt_on_counter_event
×
133

134
    @property
1✔
135
    def match_emergency_routing_status_to_incoming_packet(self) -> bool:
1✔
136
        return self._match_emergency_routing_status_to_incoming_packet
×
137

138
    @property
1✔
139
    def destinations(self) -> List[DiagnosticFilterDestination]:
1✔
140
        return self._destinations
×
141

142
    @property
1✔
143
    def sources(self) -> List[DiagnosticFilterSource]:
1✔
144
        return self._sources
×
145

146
    @property
1✔
147
    def payload_statuses(self) -> List[DiagnosticFilterPayloadStatus]:
1✔
148
        return self._payload_statuses
×
149

150
    @property
1✔
151
    def default_routing_statuses(self) -> List[
1✔
152
            DiagnosticFilterDefaultRoutingStatus]:
153
        return self._default_routing_statuses
×
154

155
    @property
1✔
156
    def emergency_routing_statuses(self) -> List[
1✔
157
            DiagnosticFilterEmergencyRoutingStatus]:
158
        return self._emergency_routing_statuses
×
159

160
    @property
1✔
161
    def packet_types(self) -> List[DiagnosticFilterPacketType]:
1✔
162
        return self._packet_types
×
163

164
    @property
1✔
165
    def filter_word(self) -> int:
1✔
166
        """
167
        A word of data that can be written to the router to set up the filter.
168
        """
169
        data = 0
×
170
        if self._enable_interrupt_on_counter_event:
×
171
            data |= 1 << _ENABLE_INTERRUPT_OFFSET
×
172
        if not self._match_emergency_routing_status_to_incoming_packet:
×
173
            data |= 1 << _EMERGENCY_ROUTE_MODE_OFFSET
×
174
        data = _set_flags_in_word(data, self._destinations,
×
175
                                  DiagnosticFilterDestination,
176
                                  _DESTINATION_OFFSET)
177
        data = _set_flags_in_word(data, self._sources,
×
178
                                  DiagnosticFilterSource,
179
                                  _SOURCE_OFFSET)
180
        data = _set_flags_in_word(data, self._payload_statuses,
×
181
                                  DiagnosticFilterPayloadStatus,
182
                                  _PAYLOAD_OFFSET)
183
        data = _set_flags_in_word(data, self._default_routing_statuses,
×
184
                                  DiagnosticFilterDefaultRoutingStatus,
185
                                  _DEFAULT_ROUTE_OFFSET)
186
        data = _set_flags_in_word(data, self._emergency_routing_statuses,
×
187
                                  DiagnosticFilterEmergencyRoutingStatus,
188
                                  _EMERGENCY_ROUTE_OFFSET)
189
        data = _set_flags_in_word(data, self._packet_types,
×
190
                                  DiagnosticFilterPacketType,
191
                                  _PACKET_TYPE_OFFSET)
192

193
        return data
×
194

195
    @staticmethod
1✔
196
    def read_from_int(int_value: int) -> DiagnosticFilter:
1✔
197
        enable_interrupt_on_counter_event = (
×
198
            (int_value >> _ENABLE_INTERRUPT_OFFSET) & 0x1) == 1
199
        match_emergency_routing_status_to_incoming_packet = (
×
200
            (int_value >> _EMERGENCY_ROUTE_MODE_OFFSET) & 0x1) == 0
201
        destinations = _read_flags_from_word(
×
202
            int_value, DiagnosticFilterDestination, _DESTINATION_OFFSET)
203
        sources = _read_flags_from_word(
×
204
            int_value, DiagnosticFilterSource, _SOURCE_OFFSET)
205
        payload_statuses = _read_flags_from_word(
×
206
            int_value, DiagnosticFilterPayloadStatus, _PAYLOAD_OFFSET)
207
        default_routing_statuses = _read_flags_from_word(
×
208
            int_value, DiagnosticFilterDefaultRoutingStatus,
209
            _DEFAULT_ROUTE_OFFSET)
210
        emergency_routing_statuses = _read_flags_from_word(
×
211
            int_value, DiagnosticFilterEmergencyRoutingStatus,
212
            _EMERGENCY_ROUTE_OFFSET)
213
        packet_types = _read_flags_from_word(
×
214
            int_value, DiagnosticFilterPacketType, _PACKET_TYPE_OFFSET)
215

216
        return DiagnosticFilter(
×
217
            enable_interrupt_on_counter_event,
218
            match_emergency_routing_status_to_incoming_packet,
219
            destinations, sources, payload_statuses, default_routing_statuses,
220
            emergency_routing_statuses, packet_types)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc