• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SpiNNakerManchester / PACMAN / 8143395689

04 Mar 2024 04:13PM UTC coverage: 86.121%. Remained the same
8143395689

push

github

web-flow
Merge pull request #541 from SpiNNakerManchester/pylint_default

Pylint default

2688 of 3185 branches covered (84.4%)

Branch coverage included in aggregate %.

39 of 42 new or added lines in 32 files covered. (92.86%)

1 existing line in 1 file now uncovered.

5168 of 5937 relevant lines covered (87.05%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

55.67
/pacman/utilities/algorithm_utilities/routing_algorithm_utilities.py
1
# Copyright (c) 2021 The University of Manchester
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#     https://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14

15
from collections import deque, defaultdict
1✔
16
import heapq
1✔
17
import itertools
1✔
18
from typing import Deque, Dict, Iterable, List, Optional, Set, Tuple
1✔
19

20
from spinn_utilities.typing.coords import XY
1✔
21
from spinn_machine import Machine, Chip
1✔
22

23
from pacman.data import PacmanDataView
1✔
24
from pacman.exceptions import MachineHasDisconnectedSubRegion
1✔
25
from pacman.model.routing_table_by_partition import (
1✔
26
    MulticastRoutingTableByPartitionEntry,
27
    MulticastRoutingTableByPartition)
28
from pacman.model.graphs import AbstractVertex, AbstractVirtual
1✔
29
from pacman.model.graphs.application import (
1✔
30
    ApplicationEdgePartition, ApplicationVertex)
31
from pacman.model.graphs.machine import MachineVertex
1✔
32

33
from .routing_tree import RoutingTree
1✔
34

35

36
def get_app_partitions() -> List[ApplicationEdgePartition]:
    """
    Collect every application partition that needs to be routed.

    The partitions known to the data view are returned, followed by one
    edge-less placeholder partition per distinct identifier for each
    application vertex whose splitter reports internal multicast partitions
    but which is not the pre-vertex of any existing partition.

    .. note::
        The placeholders let a caller loop over a single list and consult
        both the edges *and* the internal partitions of each source.
        A placeholder has no edges; use
        `vertex.splitter.get_internal_multicast_partitions` for the details.

    :return: list of partitions
    :rtype: list(ApplicationEdgePartition)
    """
    # Work on a copy so that placeholders can be appended to it
    all_partitions = list(PacmanDataView.iterate_partitions())
    known_sources = frozenset(
        partition.pre_vertex for partition in all_partitions)

    for vertex in PacmanDataView.iterate_vertices():
        if isinstance(vertex, ApplicationVertex) and vertex.splitter:
            internal = vertex.splitter.get_internal_multicast_partitions()
            if vertex not in known_sources and internal:
                # dict.fromkeys drops repeated identifiers while keeping
                # the order in which they were first seen
                unique_ids = dict.fromkeys(p.identifier for p in internal)
                # A partition with no edges marks this source as internal
                all_partitions.extend(
                    ApplicationEdgePartition(identifier, vertex)
                    for identifier in unique_ids)
    return all_partitions
73

74

75
def route_has_dead_links(root: RoutingTree) -> bool:
    """
    Report whether any link used by a route is missing from the machine.

    :param RoutingTree root:
        The root of the RoutingTree which contains nothing but RoutingTrees
        (i.e. no vertices and links).
    :return:
        True as soon as a used link is found whose chip is missing or whose
        router does not have that link; False if no such link is found.
    :rtype: bool
    """
    for _, (chip_x, chip_y), links in root.traverse():
        chip = PacmanDataView.get_chip_at(chip_x, chip_y)
        # A missing chip only matters when the route leaves it by some link
        if any(chip is None or not chip.router.is_link(link)
               for link in links):
            return True
    return False
93

94

95
def avoid_dead_links(root: RoutingTree) -> RoutingTree:
    """
    Build a copy of a RoutingTree that routes around dead links.

    The tree is first copied with every broken connection severed; each
    severed subtree is then re-attached to the rest of the tree along a
    path found by :py:func:`a_star`.

    :param RoutingTree root:
        The root of the RoutingTree which contains nothing but RoutingTrees
        (i.e. no vertices and links).
    :return:
        A new RoutingTree is produced rooted as before.
    :rtype: RoutingTree
    """
    # Copy of the tree in which all broken parts are disconnected
    root, nodes_by_chip, severed = _copy_and_disconnect_tree(root)

    # Re-attaching each severed subtree to *any* other part of the tree
    # eventually leaves a single fully connected tree.
    for parent_xy, child_xy in severed:
        orphan_chips = frozenset(
            member.chip for member in nodes_by_chip[child_xy]
            if isinstance(member, RoutingTree))

        # The orphaned subtree itself is excluded as a place to attach to,
        # since joining it to itself would create a cycle.
        attach_points = set(nodes_by_chip).difference(orphan_chips)
        path = a_star(child_xy, parent_xy, attach_points)

        # Walk the path, growing the tree from the attachment point
        pending_direction, attach_xy = path[0]
        tail = nodes_by_chip[attach_xy]
        for direction, (x, y) in path[1:]:
            if (x, y) in orphan_chips:
                # The path runs over part of the orphaned subtree (the
                # search does not know where that subtree is and so does
                # not avoid it).  To keep the result a tree, the overlapped
                # node is cut from its current parent and becomes part of
                # the path instead.
                hop = nodes_by_chip[(x, y)]
                for member in nodes_by_chip[child_xy]:  # pragma: no branch
                    if not isinstance(member, RoutingTree):
                        continue
                    matches = [
                        (d, n) for d, n in member.children if n == hop]
                    assert len(matches) <= 1
                    if matches:
                        member.remove_child(matches[0])
                        # A node has only one parent, so stop looking
                        break
            else:
                # New ground: this path segment needs a node of its own
                hop = RoutingTree((x, y))
                # Sanity check: apart from its first entry, the path
                # should not touch chips already in the connected tree.
                assert (x, y) not in nodes_by_chip, "Cycle created."
                nodes_by_chip[(x, y)] = hop
            tail.append_child((pending_direction, hop))
            tail = hop
            pending_direction = direction
        # Finally hang the orphaned subtree off the end of the path
        tail.append_child((pending_direction, nodes_by_chip[child_xy]))

    return root
161

162

163
def _copy_and_disconnect_tree(root: RoutingTree) -> Tuple[
        RoutingTree, Dict[XY, RoutingTree], Set[Tuple[XY, XY]]]:
    """
    Copy a RoutingTree (containing nothing but RoutingTrees), leaving out
    connections which do not exist in the machine.

    .. note::
        A node of the input tree whose chip is not in the machine gets no
        node in the copy; its children are handed to the nearest copied
        ancestor instead. The assumption is that a tree only visits a dead
        chip when passing through it, never as a destination. This cannot
        be confirmed here since the input trees have not yet been populated
        with vertices, so the caller is responsible for being sensible.

    :param RoutingTree root:
        The root of the RoutingTree that contains nothing but RoutingTrees
        (i.e. no children which are vertices or links).
    :return: (root, lookup, broken_links)
        Where:
        * `root` is the new root of the tree
          :py:class:`~.RoutingTree`
        * `lookup` is a dict {(x, y):
          :py:class:`~.RoutingTree`, ...}
        * `broken_links` is a set ([(parent, child), ...]) containing all
          disconnected parent and child (x, y) pairs due to broken links.
    :rtype: tuple(RoutingTree, dict(tuple(int,int),RoutingTree),
        set(tuple(tuple(int,int),tuple(int,int))))
    """
    machine = PacmanDataView.get_machine()

    cloned_root: Optional[RoutingTree] = None
    # {(x, y): copied node, ...}
    clones_by_chip: Dict[XY, RoutingTree] = {}
    # {(parent (x, y), child (x, y)), ...} for every missing connection
    severed: Set[Tuple[XY, XY]] = set()

    # Breadth-first queue of (copied parent, direction, original node)
    pending: Deque[
        Tuple[Optional[RoutingTree], Optional[int], RoutingTree]] = deque()
    pending.append((None, None, root))

    while pending:
        clone_parent, direction, original = pending.popleft()

        if machine.is_chip_at(original.chip[0], original.chip[1]):
            clone = RoutingTree(original.chip)
            clones_by_chip[clone.chip] = clone
        else:
            # Dead chip: its children will be attached to the parent
            assert clone_parent is not None, \
                "Net cannot be sourced from a dead chip."
            clone = clone_parent

        if clone_parent is None:
            # Only the root has no parent
            cloned_root = clone
        elif clone is not clone_parent:
            assert direction is not None
            # A live node: connect it only if the link really reaches it.
            # (Nothing to check for a dead node, which has no clone.)
            if _is_linked(clone_parent.chip, clone.chip, direction, machine):
                clone_parent.append_child((direction, clone))
            else:
                # Either the link is dead, or the original parent was dead
                # and the stand-in parent is not adjacent.
                severed.add((clone_parent.chip, clone.chip))

        # Queue the children, which will hang off this node's clone
        pending.extend(
            (clone, child_direction, child)
            for child_direction, child in original.children
            if isinstance(child, RoutingTree))

    assert cloned_root is not None
    return cloned_root, clones_by_chip, severed
241

242

243
def a_star(sink: XY, heuristic_source: XY,
           sources: Set[XY]) -> List[Tuple[int, XY]]:
    """
    Use A* to find a path from any of the sources to the sink.

    The search starts at the sink and expands outwards, always continuing
    from the known node with the smallest
    `machine.get_vector_length(node, heuristic_source)`, until it pops a
    node that is in `sources`.

    .. note::
        The search is steered only by `heuristic_source`, so it may pass by
        a closer member of `sources`. This is accepted because 1) the
        heuristic source will typically be near by and in the direction of
        the rest of the tree and 2) stopping at the first source touched
        avoids accidentally forming loops in the rest of the tree.

    :param tuple(int,int) sink: (x, y)
    :param tuple(int,int) heuristic_source: (x, y)
        An element from `sources` which is used as a guiding heuristic for
        the A* algorithm.
    :param set(tuple(int,int)) sources: set([(x, y), ...])
    :return: [(int, (x, y)), ...]
        A path starting with a coordinate in `sources` and terminating at
        connected neighbour of `sink` (i.e. the path does not include
        `sink`). The direction given is the link down which to proceed from
        the given (x, y) to arrive at the next point in the path.
    :rtype: list(tuple(int,tuple(int,int)))
    :raises MachineHasDisconnectedSubRegion:
        If the search runs out of nodes without reaching any source.
    """
    machine = PacmanDataView.get_machine()

    def distance_to_goal(xy: XY):
        """ Priority of a node: its vector length to heuristic_source. """
        return machine.get_vector_length(xy, heuristic_source)

    # {node: (direction, next_node)}: presence means the node has been seen;
    # the value is the link to take from node to reach next_node, which is
    # one hop nearer the sink.  The sink itself holds a dummy value.
    came_from: Dict[XY, Tuple[int, XY]] = {sink: (-1, (-1, -1))}

    # The node which the tree will be reconnected to
    found: Optional[XY] = None

    # Heap of (distance to heuristic_source, (x, y)) still to explore
    frontier = [(distance_to_goal(sink), sink)]
    while frontier:
        _, current = heapq.heappop(frontier)

        # Stop as soon as any source is reached
        if current in sources:
            found = current
            break

        for link in range(6):  # Router.MAX_LINKS_PER_ROUTER
            # Link ids are from the neighbour's point of view, so step
            # over the opposite link (same as Router.opposite) to find it.
            neighbour = machine.xy_over_link(
                current[0], current[1], (link + 3) % 6)

            # Only follow working links to neighbours not yet seen
            if (machine.is_link_at(neighbour[0], neighbour[1], link)
                    and neighbour not in came_from):
                came_from[neighbour] = (link, current)
                heapq.heappush(
                    frontier, (distance_to_goal(neighbour), neighbour))

    # Fail if no path exists
    if found is None:
        raise MachineHasDisconnectedSubRegion(
            f"Could not find path from {sink} to {heuristic_source}")

    # Rebuild the path from the source that was found back towards the
    # sink, stopping at the node just before the sink.
    direction, towards_sink = came_from[found]
    path = [(direction, found)]
    while towards_sink != sink:
        step = towards_sink
        direction, towards_sink = came_from[step]
        path.append((direction, step))

    return path
332

333

334
def _is_linked(
        source: XY, target: XY, direction: int, machine: Machine) -> bool:
    """
    Check that the link leaving `source` in `direction` exists and ends at
    `target`.

    :param tuple(int,int) source:
    :param tuple(int,int) target:
    :param int direction:
    :param ~spinn_machine.Machine machine:
    :return:
        False if there is no chip at `source`, if that chip's router has no
        link in `direction`, or if the link's destination is not `target`;
        True otherwise.
    :rtype: bool
    """
    source_chip = machine.get_chip_at(source[0], source[1])
    if source_chip is None:
        return False
    link = source_chip.router.get_link(direction)
    if link is None:
        return False
    return (link.destination_x == target[0]
            and link.destination_y == target[1])
354

355

356
def convert_a_route(
        routing_tables: MulticastRoutingTableByPartition,
        source_vertex: AbstractVertex, partition_id: str,
        incoming_processor: Optional[int], incoming_link: Optional[int],
        route_tree: RoutingTree,
        targets_by_chip: Dict[XY, Tuple[Set[int], Set[int]]]):
    """
    Converts the algorithm specific partition_route back to standard SpiNNaker
    and adds it to the `routing_tables`.

    One entry is added for the chip at the root of `route_tree`, then the
    function recurses into each child that is itself a RoutingTree.

    :param MulticastRoutingTableByPartition routing_tables:
        SpiNNaker-format routing tables
    :param AbstractVertex source_vertex:
        The vertex the route is sourced from
    :param str partition_id:
        The identifier of the partition this route applies to
    :param incoming_processor: processor this link came from
    :type incoming_processor: int or None
    :param incoming_link: link this link came from
    :type incoming_link: int or None
    :param RoutingTree route_tree: algorithm specific format of the route
    :param dict((int,int),(set(int),set(int))) targets_by_chip:
        Target cores and links of things on the route that are final end points
    """
    x, y = route_tree.chip

    next_hops: List[Tuple[RoutingTree, Optional[int]]] = []
    processor_ids: List[int] = []
    link_ids: List[int] = []
    for (route, next_hop) in route_tree.children:
        # Reset for every child: otherwise a child without a route would
        # either hit an unbound name or inherit the previous child's link.
        arrival_link: Optional[int] = None
        if route is not None:
            link_ids.append(route)
            # The child receives on the opposite side of the link we send on
            arrival_link = (route + 3) % 6
        if isinstance(next_hop, RoutingTree):
            next_hops.append((next_hop, arrival_link))

    # Add any final end points (cores and links) that live on this chip
    if (x, y) in targets_by_chip:
        cores, links = targets_by_chip[x, y]
        processor_ids.extend(cores)
        link_ids.extend(links)

    entry = MulticastRoutingTableByPartitionEntry(
        link_ids, processor_ids, incoming_processor, incoming_link)
    routing_tables.add_path_entry(entry, x, y, source_vertex, partition_id)

    # Downstream chips are entered over a link, never from a processor
    for next_hop, next_incoming_link in next_hops:
        convert_a_route(
            routing_tables, source_vertex, partition_id, None,
            next_incoming_link, next_hop, targets_by_chip)
402

403

404
def longest_dimension_first(
        vector: Tuple[int, int, int], start: XY) -> List[Tuple[int, XY]]:
    """
    List the (x, y) steps on a longest-dimension first route.

    The dimensions of `vector` are walked in decreasing order of absolute
    magnitude; dimensions of equal magnitude keep their x, y, z order.

    :param tuple(int,int,int) vector: (x, y, z)
        The vector which the path should cover.
    :param tuple(int,int) start: (x, y)
        The coordinates from which the path should start.

        .. note::
            This is a 2D coordinate.
    :return: min route
    :rtype: list(tuple(int,tuple(int, int)))
    """
    # Pair each magnitude with its dimension index before reordering
    dm_vector = list(enumerate(vector))
    dm_vector.sort(key=lambda dim_mag: abs(dim_mag[1]), reverse=True)
    return vector_to_nodes(dm_vector, start)
422

423

424
def least_busy_dimension_first(
        traffic: Dict[XY, int], vector: Tuple[int, int, int],
        start: XY) -> List[Tuple[int, XY]]:
    """
    List the (x, y) steps on a route that goes through the least busy
    routes first.

    Every ordering of the three dimensions is tried; the route whose chips
    have the lowest total `traffic` is chosen (the first such route on a
    tie) and the count of every chip on it is then incremented.

    :param traffic: A dictionary of (x, y): count of routes
    :type traffic: dict(tuple(int,int), int)
    :param vector: (x, y, z)
        The vector which the path should cover.
    :type vector: tuple(int, int, int)
    :param start: (x, y)
        The coordinates from which the path should start.

        .. note::
            This is a 2D coordinate.
    :type start: tuple(int, int)
    :return: min route
    :rtype: list(tuple(int,tuple(int, int)))
    """

    def load(route):
        """ Total traffic currently recorded on the chips of a route. """
        return sum(traffic[x, y] for _, (x, y) in route)

    # One candidate route per ordering of the dimensions, built lazily
    candidates = (
        vector_to_nodes([(dim, vector[dim]) for dim in order], start)
        for order in itertools.permutations([0, 1, 2]))
    min_route = min(candidates, key=load)

    # Record that the chosen route now also uses these chips
    for _, (x, y) in min_route:
        traffic[x, y] += 1

    return min_route
462

463

464
def vector_to_nodes(dm_vector: List[XY], start: XY) -> List[Tuple[int, XY]]:
    """
    Convert a vector to a set of nodes.

    :param list(tuple(int,int)) dm_vector:
        A vector made up of a list of (dimension, magnitude), where dimensions
        are x=0, y=1, z=diagonal=2
    :param tuple(int,int) start: The x, y coordinates of the start
    :return: A list of (link_id, (target_x, target_y)) of nodes on a route
    :rtype: list(tuple(int,tuple(int, int)))
    """
    machine = PacmanDataView.get_machine()
    x, y = start

    hops = []
    for dimension, magnitude in dm_vector:
        if magnitude == 0:
            continue

        # Pick the pair of links (positive, negative) for this dimension
        if dimension == 0:  # x
            forward, backward = 0, 3  # East, West
        elif dimension == 1:  # y
            forward, backward = 2, 5  # North, South
        else:  # z
            forward, backward = 4, 1  # SouthWest, NorthEast

        # One hop per unit of magnitude, in the direction of its sign
        link = forward if magnitude > 0 else backward
        for _ in range(abs(magnitude)):
            x, y = machine.xy_over_link(x, y, link)
            hops.append((link, (x, y)))
    return hops
518

519

520
def nodes_to_trees(
        nodes: List[Tuple[int, XY]], start: XY, route: Dict[XY, RoutingTree]):
    """
    Convert a list of nodes into routing trees, adding them to existing routes.

    The nodes are chained one after another beneath the tree stored for
    `start` (which is created and stored if missing), and each new tree is
    stored in `route` under its own coordinates.

    :param list(tuple(int,tuple(int,int))) nodes:
        The list of (link_id, (target_x, target_y)) nodes on the route
    :param tuple(int,int) start: The start of the route
    :param dict(tuple(int,int),RoutingTree) route:
        Existing routing trees, with key (x, y) coordinates of the chip of the
        routes.
    """
    tail = route.get(start)
    if tail is None:
        tail = RoutingTree(start)
        route[start] = tail

    for direction, (x, y) in nodes:
        hop = RoutingTree((x, y))
        route[(x, y)] = hop
        # Each node becomes the only new child of the one before it
        tail.append_child((direction, hop))
        tail = hop
542

543

544
def most_direct_route(source: XY, dest: XY, machine: Machine) -> RoutingTree:
    """
    Find the most direct route from source to target on the machine.

    A longest-dimension-first route is built and, if it uses any dead
    links, replaced by one that routes around them.

    :param tuple(int,int) source: The source x, y coordinates
    :param tuple(int,int) dest: The destination x, y coordinates
    :param ~spinn_machine.Machine machine: The machine on which to route
    :rtype: RoutingTree
    """
    hops = longest_dimension_first(machine.get_vector(source, dest), source)
    trees: Dict[XY, RoutingTree] = {}
    nodes_to_trees(hops, source, trees)
    tree = trees[source]
    return avoid_dead_links(tree) if route_has_dead_links(tree) else tree
561

562

563
def get_targets_by_chip(
        vertices: Iterable[MachineVertex]) -> Dict[
            XY, Tuple[Set[int], Set[int]]]:
    """
    Get the target links and cores on the relevant chips.

    :param list(MachineVertex) vertices: The vertices to target
    :return: A dict of (x, y) to target (cores, links)
    :rtype: dict(tuple(int, int), tuple(set(int), set(int)))
    """
    # Each chip starts with an empty (cores, links) pair on first access
    targets: Dict[XY, Tuple[Set[int], Set[int]]] = defaultdict(
        lambda: (set(), set()))
    for vertex in vertices:
        chip_xy = vertex_xy(vertex)
        if not isinstance(vertex, AbstractVirtual):
            # A placed vertex is reached via the core it is placed on
            core = PacmanDataView.get_placement_of_vertex(vertex).p
            targets[chip_xy][0].add(core)
            continue
        # A virtual vertex is reached via the link towards its end point
        link = route_to_endpoint(vertex)
        targets[chip_xy][1].add(link)
    return targets
586

587

588
def vertex_xy(vertex: MachineVertex) -> XY:
    """
    Get the coordinates of the chip a vertex is reached on.

    :param MachineVertex vertex:
    :return:
        The placement coordinates of the vertex, or for a virtual vertex
        the coordinates of the connected chip from its link data.
    :rtype: tuple(int,int)
    """
    if isinstance(vertex, AbstractVirtual):
        link_data = vertex.get_link_data(PacmanDataView.get_machine())
        return link_data.connected_chip_x, link_data.connected_chip_y
    placement = PacmanDataView.get_placement_of_vertex(vertex)
    return placement.x, placement.y
598

599

600
def vertex_chip(vertex: MachineVertex) -> Chip:
    """
    Get the chip a vertex is reached on.

    :param MachineVertex vertex:
    :return:
        The machine's chip at the placement of the vertex, or for a virtual
        vertex the connected chip named by its link data.
    :rtype: ~spinn_machine.Chip
    """
    machine = PacmanDataView.get_machine()
    if isinstance(vertex, AbstractVirtual):
        link_data = vertex.get_link_data(machine)
        chip_xy = (link_data.connected_chip_x, link_data.connected_chip_y)
    else:
        placement = PacmanDataView.get_placement_of_vertex(vertex)
        chip_xy = (placement.x, placement.y)
    return machine[chip_xy]
611

612

613
def vertex_xy_and_route(vertex: MachineVertex) -> Tuple[
        XY, Tuple[MachineVertex, Optional[int], Optional[int]]]:
    """
    Get the non-virtual chip coordinates, the vertex, and processor or
    link to follow to get to the vertex.

    :param MachineVertex vertex:
    :return: the (x,y) coordinates of the target vertex mapped to a tuple of
        the vertex, core and link.
        One of core or link is provided the other is `None`
    :rtype: tuple(tuple(int, int), tuple(MachineVertex, int, None)) or
        tuple(tuple(int, int), tuple(MachineVertex, None, int))
    """
    if isinstance(vertex, AbstractVirtual):
        # Virtual vertex: report the connected chip and the link to follow
        link_data = vertex.get_link_data(PacmanDataView.get_machine())
        chip_xy = (link_data.connected_chip_x, link_data.connected_chip_y)
        return chip_xy, (vertex, None, link_data.connected_link)

    # Placed vertex: report its chip and the core it is placed on
    placement = PacmanDataView.get_placement_of_vertex(vertex)
    return (placement.x, placement.y), (vertex, placement.p, None)
633

634

635
def route_to_endpoint(vertex: AbstractVirtual) -> int:
    """
    Get the connected link from the link data of a virtual vertex.

    :param AbstractVirtual vertex:
    :return: The `connected_link` of the vertex's link data on the machine
    :rtype: int
    """
    return vertex.get_link_data(PacmanDataView.get_machine()).connected_link
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc