• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SpiNNakerManchester / PACMAN / 6945509033

21 Nov 2023 02:51PM UTC coverage: 84.047% (+0.9%) from 83.103%
6945509033

push

github

Christian-B
remove unused import

1552 of 2025 branches covered (0.0%)

Branch coverage included in aggregate %.

1 of 1 new or added line in 1 file covered. (100.0%)

534 existing lines in 55 files now uncovered.

5065 of 5848 relevant lines covered (86.61%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

55.94
/pacman/utilities/algorithm_utilities/routing_algorithm_utilities.py
1
# Copyright (c) 2021 The University of Manchester
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#     https://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14
from collections import deque, defaultdict
1✔
15
import heapq
1✔
16
import itertools
1✔
17
from typing import Deque, Dict, Iterable, List, Optional, Set, Tuple
1✔
18
from spinn_utilities.typing.coords import XY
1✔
19
from .routing_tree import RoutingTree
1✔
20
from spinn_machine import Machine, Chip
1✔
21
from pacman.data import PacmanDataView
1✔
22
from pacman.exceptions import MachineHasDisconnectedSubRegion
1✔
23
from pacman.model.routing_table_by_partition import (
1✔
24
    MulticastRoutingTableByPartitionEntry,
25
    MulticastRoutingTableByPartition)
26
from pacman.model.graphs import AbstractVertex, AbstractVirtual
1✔
27
from pacman.model.graphs.application import (
1✔
28
    ApplicationEdgePartition, ApplicationVertex)
29
from pacman.model.graphs.machine import MachineVertex
1✔
30

31

32
def get_app_partitions() -> List[ApplicationEdgePartition]:
    """
    Collect every application partition that routing has to deal with.

    .. note::
        A vertex whose splitter reports internal multicast partitions, but
        which is not the pre-vertex of any existing partition, gets one
        edge-less "placeholder" partition per distinct internal partition
        identifier.  Callers can then walk the returned list once and
        combine the edges with
        `vertex.splitter.get_internal_multicast_partitions` to see *all*
        targets of each source.

    :return: the existing partitions followed by any placeholder partitions
    :rtype: list(ApplicationEdgePartition)
    """
    # Take a private list so the placeholders can be appended safely
    found = list(PacmanDataView.iterate_partitions())
    external_sources = frozenset(part.pre_vertex for part in found)

    for vertex in PacmanDataView.iterate_vertices():
        if not (isinstance(vertex, ApplicationVertex) and vertex.splitter):
            continue
        internal = vertex.splitter.get_internal_multicast_partitions()
        if vertex in external_sources or not internal:
            continue
        # dict.fromkeys removes duplicate identifiers but keeps their order
        unique_ids = dict.fromkeys(part.identifier for part in internal)
        # A partition with no edges marks the vertex as internal-only
        found.extend(
            ApplicationEdgePartition(identifier, vertex)
            for identifier in unique_ids)
    return found
1✔
69

70

71
def route_has_dead_links(root: RoutingTree) -> bool:
    """
    Check whether any hop of a route goes over a dead or missing link.

    :param RoutingTree root:
        The root of the RoutingTree which contains nothing but RoutingTrees
        (i.e. no vertices and links).
    :return:
        True as soon as one link used by the tree is found to be unusable
        (no chip at the coordinates, or the chip's router does not have the
        link); False if every link checks out.
    :rtype: bool
    """
    for _, (x, y), links in root.traverse():
        chip = PacmanDataView.get_chip_at(x, y)
        # A missing chip only matters when the tree routes out of it
        if any(chip is None or not chip.router.is_link(link)
               for link in links):
            return True
    return False
1✔
89

90

91
def avoid_dead_links(root: RoutingTree) -> RoutingTree:
    """
    Modify a RoutingTree to route-around dead links in a Machine.

    Uses :py:func:`a_star` to reconnect disconnected branches of the tree
    (due to dead links in the machine).  The input tree is first copied by
    `_copy_and_disconnect_tree`; all changes are made to the copy.

    :param RoutingTree root:
        The root of the RoutingTree which contains nothing but RoutingTrees
        (i.e. no vertices and links).
    :return:
        A new RoutingTree is produced rooted as before.
    :rtype: RoutingTree
    :raises MachineHasDisconnectedSubRegion:
        Propagated from `a_star` if a broken branch cannot be reconnected.
    """
    # Make a copy of the RoutingTree with all broken parts disconnected
    root, lookup, broken_links = _copy_and_disconnect_tree(root)

    # For each disconnected subtree, use A* to connect the tree to *any* other
    # disconnected subtree. Note that this process will eventually result in
    # all disconnected subtrees being connected, the result is a fully
    # connected tree.
    for parent, child in broken_links:
        # Coordinates of every chip in the subtree hanging below `child`
        child_chips = frozenset(
            c.chip for c in lookup[child] if isinstance(c, RoutingTree))

        # Try to reconnect broken links to any other part of the tree
        # (excluding this broken subtree itself since that would create a
        # cycle).
        path = a_star(child, parent, set(lookup).difference(child_chips))

        # Add new RoutingTree nodes to reconnect the child to the tree.
        # path[0] is the point in the existing tree that the search reached;
        # each path entry is (direction to the next entry, (x, y)).
        last_node = lookup[path[0][1]]
        last_direction = path[0][0]
        for direction, (x, y) in path[1:]:
            if (x, y) not in child_chips:
                # This path segment traverses new ground so we must create a
                # new RoutingTree for the segment.
                new_node = RoutingTree((x, y))
                # A* will not traverse anything but chips in this tree so this
                # assert is merely a sanity check that this occurred correctly.
                assert (x, y) not in lookup, "Cycle created."
                lookup[(x, y)] = new_node
            else:
                # This path segment overlaps part of the disconnected tree
                # (A* doesn't know where the disconnected tree is and thus
                # doesn't avoid it). To prevent cycles being introduced, this
                # overlapped node is severed from its parent and merged as part
                # of the A* path.
                new_node = lookup[(x, y)]

                # Find the node's current parent and disconnect it.
                for node in lookup[child]:  # pragma: no branch
                    if not isinstance(node, RoutingTree):
                        continue
                    # (direction, node) child entries that point at new_node
                    dn = [(d, n) for d, n in node.children if n == new_node]
                    assert len(dn) <= 1
                    if dn:
                        node.remove_child(dn[0])
                        # A node can only have one parent so we can stop now.
                        break
            # Hook the node in below the previous one on the path
            last_node.append_child((last_direction, new_node))
            last_node = new_node
            last_direction = direction
        # Finally attach the root of the broken subtree to the end of the path
        last_node.append_child((last_direction, lookup[child]))

    return root
×
157

158

159
def _copy_and_disconnect_tree(root: RoutingTree) -> Tuple[
        RoutingTree, Dict[XY, RoutingTree], Set[Tuple[XY, XY]]]:
    """
    Copy a RoutingTree (containing nothing but RoutingTrees), disconnecting
    nodes which are not connected in the machine.

    The tree is walked breadth-first from `root`.  Children that are not
    RoutingTree instances are not copied.

    .. note::
        If a dead chip is part of the input RoutingTree, no corresponding node
        will be included in the copy. The assumption behind this is that the
        only reason a tree would visit a dead chip is because a route passed
        through the chip and wasn't actually destined to arrive at that chip.
        This situation is impossible to confirm since the input routing trees
        have not yet been populated with vertices. The caller is responsible
        for being sensible.

    :param RoutingTree root:
        The root of the RoutingTree that contains nothing but RoutingTrees
        (i.e. no children which are vertices or links).
    :return: (root, lookup, broken_links)
        Where:
        * `root` is the new root of the tree
          :py:class:`~.RoutingTree`
        * `lookup` is a dict {(x, y):
          :py:class:`~.RoutingTree`, ...}
        * `broken_links` is a set ([(parent, child), ...]) containing all
          disconnected parent and child (x, y) pairs due to broken links.
    :rtype: tuple(RoutingTree, dict(tuple(int,int),RoutingTree),
        set(tuple(tuple(int,int),tuple(int,int))))
    """
    new_root: Optional[RoutingTree] = None

    # Lookup for copied routing tree {(x, y): RoutingTree, ...}
    new_lookup: Dict[XY, RoutingTree] = {}

    # Set of missing connections in the copied routing tree, stored as chip
    # coordinates {(new_parent_xy, new_child_xy), ...}
    broken_links: Set[Tuple[XY, XY]] = set()

    # A queue [(new_parent, direction, old_node), ...]; the root has neither
    # a parent nor a direction so both are None for the first entry.
    to_visit: Deque[
        Tuple[Optional[RoutingTree], Optional[int], RoutingTree]] = deque(
            ((None, None, root), ))
    machine = PacmanDataView.get_machine()
    while to_visit:
        new_parent, direction, old_node = to_visit.popleft()
        if machine.is_chip_at(old_node.chip[0], old_node.chip[1]):
            # Create a copy of the node
            new_node = RoutingTree(old_node.chip)
            new_lookup[new_node.chip] = new_node
        else:
            # This chip is dead, move all its children into the parent node
            assert new_parent is not None, \
                "Net cannot be sourced from a dead chip."
            new_node = new_parent

        if new_parent is None:
            # This is the root node
            new_root = new_node
        elif new_node is not new_parent:
            assert direction is not None
            # If this node is not dead, check connectivity to parent
            # node (no reason to check connectivity between a dead node
            # and its parent).
            if _is_linked(new_parent.chip, new_node.chip, direction, machine):
                # Is connected via working link
                new_parent.append_child((direction, new_node))
            else:
                # Link to parent is dead (or original parent was dead and
                # the new parent is not adjacent)
                broken_links.add((new_parent.chip, new_node.chip))

        # Copy children; when this chip was dead new_node is the parent, so
        # the children are queued against the nearest live ancestor.
        for child_direction, child in old_node.children:
            if isinstance(child, RoutingTree):
                to_visit.append((new_node, child_direction, child))

    assert new_root is not None
    return new_root, new_lookup, broken_links
×
237

238

239
def a_star(sink: XY, heuristic_source: XY,
           sources: Set[XY]) -> List[Tuple[int, XY]]:
    """
    Use A* to find a path from any of the sources to the sink.

    The search starts at `sink` and expands outwards until it pops a node
    that is in `sources`.

    .. note::
        The heuristic means that the search will proceed towards
        heuristic_source without any concern for any other sources. This means
        that the algorithm may miss a very close neighbour in order to pursue
        its goal of reaching heuristic_source. This is not considered a problem
        since 1) the heuristic source will typically be in the direction of the
        rest of the tree and near by and often the closest entity 2) it
        prevents us accidentally forming loops in the rest of the tree since
        we'll stop as soon as we touch any part of it.

    :param tuple(int,int) sink: (x, y)
    :param tuple(int,int) heuristic_source: (x, y)
        An element from `sources` which is used as a guiding heuristic for the
        A* algorithm.
    :param set(tuple(int,int)) sources: set([(x, y), ...])
    :return: [(int, (x, y)), ...]
        A path starting with a coordinate in `sources` and terminating at
        connected neighbour of `sink` (i.e. the path does not include `sink`).
        The direction given is the link down which to proceed from the given
        (x, y) to arrive at the next point in the path.
    :rtype: list(tuple(int,tuple(int,int)))
    :raises MachineHasDisconnectedSubRegion:
        If the search runs out of nodes without reaching any of `sources`.
    """
    machine = PacmanDataView.get_machine()
    # Select the heuristic function to use for distances
    heuristic = (lambda the_node: machine.get_vector_length(
        the_node, heuristic_source))

    # A dictionary {node: (direction, previous_node)}. An entry indicates that
    # 1) the node has been visited and 2) which node it was reached from
    # (previous_node) and the link direction recorded for that hop.  This is
    # a dummy value if the node is the sink.
    visited: Dict[XY, Tuple[int, XY]] = {sink: (-1, (-1, -1))}

    # The node which the tree will be reconnected to
    selected_source = None

    # A heap (accessed via heapq) of (distance, (x, y)) where distance is the
    # distance between (x, y) and heuristic_source and (x, y) is a node to
    # explore.
    to_visit = [(heuristic(sink), sink)]
    while to_visit:
        _, node = heapq.heappop(to_visit)

        # Terminate if we've found the destination
        if node in sources:
            selected_source = node
            break

        # Try all neighbouring locations.
        for neighbour_link in range(6):  # Router.MAX_LINKS_PER_ROUTER
            # Note: link identifiers are from the perspective of the
            # neighbour, not the current node!
            neighbour = machine.xy_over_link(
                #                  Same as Router.opposite
                node[0], node[1], (neighbour_link + 3) % 6)

            # Skip links which are broken
            if not machine.is_link_at(
                    neighbour[0], neighbour[1], neighbour_link):
                continue

            # Skip neighbours who have already been visited
            if neighbour in visited:
                continue

            # Explore all other neighbours
            visited[neighbour] = (neighbour_link, node)
            heapq.heappush(to_visit, (heuristic(neighbour), neighbour))

    # Fail if no paths exist
    if selected_source is None:
        raise MachineHasDisconnectedSubRegion(
            f"Could not find path from {sink} to {heuristic_source}")

    # Reconstruct the discovered path, starting from the source we found and
    # working back until the sink.
    path = [(visited[selected_source][0], selected_source)]
    while visited[path[-1][1]][1] != sink:
        node = visited[path[-1][1]][1]
        direction = visited[node][0]
        path.append((direction, node))

    return path
×
327

328

329
def _is_linked(
        source: XY, target: XY, direction: int, machine: Machine) -> bool:
    """
    Check that a working link leads from `source` to `target`.

    :param tuple(int,int) source: (x, y) of the chip the link leaves from
    :param tuple(int,int) target: (x, y) the link is expected to arrive at
    :param int direction: the link id to look up on the source chip's router
    :param ~spinn_machine.Machine machine: the machine to query
    :return:
        True only if the source chip exists, its router has a link in
        `direction`, and that link's destination equals `target`.
    :rtype: bool
    """
    chip = machine.get_chip_at(source[0], source[1])
    if chip is None:
        return False
    link = chip.router.get_link(direction)
    # Both destination coordinates must match for the link to count
    return (
        link is not None
        and link.destination_x == target[0]
        and link.destination_y == target[1])
×
349

350

351
def convert_a_route(
        routing_tables: MulticastRoutingTableByPartition,
        source_vertex: AbstractVertex, partition_id: str,
        incoming_processor: Optional[int], incoming_link: Optional[int],
        route_tree: RoutingTree,
        targets_by_chip: Dict[XY, Tuple[Set[int], Set[int]]]):
    """
    Converts the algorithm specific partition_route back to standard SpiNNaker
    and adds it to the `routing_tables`.

    One entry is added for the chip at the root of `route_tree`; the function
    then recurses into every child of the tree that is itself a RoutingTree.

    :param MulticastRoutingTableByPartition routing_tables:
        SpiNNaker-format routing tables
    :param AbstractVertex source_vertex:
        The vertex the entries are registered against in `routing_tables`
    :param str partition_id:
        The partition identifier the entries are registered against
    :param incoming_processor: processor this link came from
    :type incoming_processor: int or None
    :param incoming_link: link this link came from
    :type incoming_link: int or None
    :param RoutingTree route_tree: algorithm specific format of the route
    :param dict((int,int),(set(int),set(int))) targets_by_chip:
        Target cores and links of things on the route that are final end points
    """
    x, y = route_tree.chip

    next_hops: List[Tuple[RoutingTree, Optional[int]]] = []
    processor_ids: List[int] = []
    link_ids: List[int] = []
    for (route, next_hop) in route_tree.children:
        # Reset for every child so that a child without a route can never
        # inherit the incoming link computed for an earlier child (or hit an
        # unbound local on the first iteration).
        next_incoming_link: Optional[int] = None
        if route is not None:
            link_ids.append(route)
            # The link as seen from the far end is the opposite direction
            next_incoming_link = (route + 3) % 6
        if isinstance(next_hop, RoutingTree):
            next_hops.append((next_hop, next_incoming_link))
    if (x, y) in targets_by_chip:
        cores, links = targets_by_chip[x, y]
        processor_ids.extend(cores)
        link_ids.extend(links)

    entry = MulticastRoutingTableByPartitionEntry(
        link_ids, processor_ids, incoming_processor, incoming_link)
    routing_tables.add_path_entry(entry, x, y, source_vertex, partition_id)

    # Downstream chips are always entered over a link, never from a processor
    for next_hop, next_incoming_link in next_hops:
        convert_a_route(
            routing_tables, source_vertex, partition_id, None,
            next_incoming_link, next_hop, targets_by_chip)
397

398

399
def longest_dimension_first(
        vector: Tuple[int, int, int], start: XY) -> List[Tuple[int, XY]]:
    """
    List the (x, y) steps on a longest-dimension first route.

    The dimensions of `vector` are walked in order of decreasing absolute
    magnitude; dimensions of equal magnitude keep their x, y, z order.

    :param tuple(int,int,int) vector: (x, y, z)
        The vector which the path should cover.
    :param tuple(int,int) start: (x, y)
        The coordinates from which the path should start.

        .. note::
            This is a 2D coordinate.
    :return: the steps of the route, as produced by `vector_to_nodes`
    :rtype: list(tuple(int,tuple(int, int)))
    """
    by_length = list(enumerate(vector))
    # list.sort is stable, so ties stay in dimension order even when reversed
    by_length.sort(key=lambda dim_mag: abs(dim_mag[1]), reverse=True)
    return vector_to_nodes(by_length, start)
417

418

419
def least_busy_dimension_first(
        traffic: Dict[XY, int], vector: Tuple[int, int, int],
        start: XY) -> List[Tuple[int, XY]]:
    """
    List the (x, y) steps on a route that goes through the least busy
    routes first.

    Every ordering of the three dimensions is tried; the ordering whose
    nodes have the lowest total `traffic` wins (the first one tried wins a
    tie).  The traffic count of each node on the chosen route is then
    incremented.

    :param traffic: A dictionary of (x, y): count of routes
    :type traffic: dict(tuple(int,int), int)
    :param vector: (x, y, z)
        The vector which the path should cover.
    :type vector: tuple(int, int, int)
    :param start: (x, y)
        The coordinates from which the path should start.

        .. note::
            This is a 2D coordinate.
    :type start: tuple(int, int)
    :return: min route
    :rtype: list(tuple(int,tuple(int, int)))
    """
    # (total traffic, route) of the best candidate seen so far
    best = None
    for order in itertools.permutations(range(3)):
        candidate = vector_to_nodes(
            [(dim, vector[dim]) for dim in order], start)
        load = sum(traffic[xy] for _, xy in candidate)
        # Strict comparison keeps the earliest candidate on a tie
        if best is None or load < best[0]:
            best = (load, candidate)

    assert best is not None
    _, min_route = best
    # Record that one more route now passes through each of these nodes
    for _, xy in min_route:
        traffic[xy] += 1

    return min_route
1✔
457

458

459
def vector_to_nodes(dm_vector: List[XY], start: XY) -> List[Tuple[int, XY]]:
    """
    Convert a vector to a set of nodes.

    :param list(tuple(int,int)) dm_vector:
        A vector made up of a list of (dimension, magnitude), where dimensions
        are x=0, y=1, z=diagonal=2
    :param tuple(int,int) start: The x, y coordinates of the start
    :return: A list of (link_id, (target_x, target_y)) of nodes on a route
    :rtype: list(tuple(int,tuple(int, int)))
    """
    machine = PacmanDataView.get_machine()

    # dimension -> (link for a positive magnitude, link for a negative one)
    #   x: East (0) / West (3),  y: North (2) / South (5)
    axis_links = {0: (0, 3), 1: (2, 5)}
    # Anything else is treated as z: SouthWest (4) / NorthEast (1)
    diagonal_links = (4, 1)

    x, y = start
    hops = []
    for dimension, magnitude in dm_vector:
        if magnitude == 0:
            continue
        forward, backward = axis_links.get(dimension, diagonal_links)
        link = forward if magnitude > 0 else backward
        # Take one hop per unit of magnitude, whatever its sign
        for _ in range(abs(magnitude)):
            x, y = machine.xy_over_link(x, y, link)
            hops.append((link, (x, y)))
    return hops
1✔
513

514

515
def nodes_to_trees(
        nodes: List[Tuple[int, XY]], start: XY, route: Dict[XY, RoutingTree]):
    """
    Convert a list of nodes into routing trees, adding them to existing routes.

    The tree for `start` is taken from `route` (and created there if absent);
    a new RoutingTree is then made for every node, stored in `route` under
    its coordinates and chained below the previous one.

    :param list(tuple(int,tuple(int,int))) nodes:
        The list of (link_id, (target_x, target_y)) nodes on the route
    :param tuple(int,int) start: The start of the route
    :param dict(tuple(int,int),RoutingTree) route:
        Existing routing trees, with key (x, y) coordinates of the chip of the
        routes.
    """
    parent = route.get(start)
    if parent is None:
        parent = route[start] = RoutingTree(start)
    for link, (x, y) in nodes:
        child = RoutingTree((x, y))
        route[x, y] = child

        parent.append_child((link, child))
        parent = child
1✔
537

538

539
def most_direct_route(source: XY, dest: XY, machine: Machine) -> RoutingTree:
    """
    Find the most direct route from source to target on the machine.

    The route is laid out longest-dimension-first and, if it turns out to
    use a dead link, repaired with `avoid_dead_links`.

    :param tuple(int,int) source: The source x, y coordinates
    :param tuple(int,int) dest: The destination x, y coordinates
    :param ~spinn_machine.Machine machine: The machine on which to route
    :return: the routing tree rooted at `source`
    :rtype: RoutingTree
    """
    hops = longest_dimension_first(machine.get_vector(source, dest), source)
    trees: Dict[XY, RoutingTree] = {}
    nodes_to_trees(hops, source, trees)
    tree = trees[source]
    return avoid_dead_links(tree) if route_has_dead_links(tree) else tree
×
556

557

558
def get_targets_by_chip(
        vertices: Iterable[MachineVertex]) -> Dict[
            XY, Tuple[Set[int], Set[int]]]:
    """
    Get the target links and cores on the relevant chips.

    :param list(MachineVertex) vertices: The vertices to target
    :return: A dict of (x, y) to target (cores, links)
    :rtype: dict(tuple(int, int), tuple(set(int), set(int)))
    """
    # Positions of the two sets inside each per-chip tuple
    cores_slot, links_slot = 0, 1
    targets: Dict[XY, Tuple[Set[int], Set[int]]] = defaultdict(
        lambda: (set(), set()))
    for vertex in vertices:
        where = vertex_xy(vertex)
        if isinstance(vertex, AbstractVirtual):
            # Sinks with route-to-endpoint restriction must be routed
            # in the according directions.
            slot, value = links_slot, route_to_endpoint(vertex)
        else:
            placement = PacmanDataView.get_placement_of_vertex(vertex)
            slot, value = cores_slot, placement.p
        targets[where][slot].add(value)
    return targets
1✔
581

582

583
def vertex_xy(vertex: MachineVertex) -> XY:
    """
    Get the coordinates of the chip used to reach a vertex.

    :param MachineVertex vertex:
    :return:
        For a virtual vertex, the connected chip given by its link data;
        otherwise the x, y of the vertex's placement.
    :rtype: tuple(int,int)
    """
    if isinstance(vertex, AbstractVirtual):
        link_data = vertex.get_link_data(PacmanDataView.get_machine())
        return link_data.connected_chip_x, link_data.connected_chip_y
    placement = PacmanDataView.get_placement_of_vertex(vertex)
    return placement.x, placement.y
1✔
593

594

595
def vertex_chip(vertex: MachineVertex) -> Chip:
    """
    Get the chip used to reach a vertex.

    :param MachineVertex vertex:
    :return:
        For a virtual vertex, the connected chip given by its link data;
        otherwise the chip the vertex is placed on.
    :rtype: ~spinn_machine.Chip
    """
    machine = PacmanDataView.get_machine()
    if isinstance(vertex, AbstractVirtual):
        link_data = vertex.get_link_data(machine)
        xy = (link_data.connected_chip_x, link_data.connected_chip_y)
    else:
        placement = PacmanDataView.get_placement_of_vertex(vertex)
        xy = (placement.x, placement.y)
    return machine[xy]
×
606

607

608
def vertex_xy_and_route(vertex: MachineVertex) -> Tuple[
        XY, Tuple[MachineVertex, Optional[int], Optional[int]]]:
    """
    Get the non-virtual chip coordinates, the vertex, and processor or
    link to follow to get to the vertex.

    :param MachineVertex vertex:
    :return: the (x,y) coordinates of the target vertex mapped to a tuple of
        the vertex, core and link.
        One of core or link is provided the other is `None`
    :rtype: tuple(tuple(int, int), tuple(MachineVertex, int, None)) or
        tuple(tuple(int, int), tuple(MachineVertex, None, int))
    """
    if isinstance(vertex, AbstractVirtual):
        # Virtual vertices are reached over a link of the connected chip
        link_data = vertex.get_link_data(PacmanDataView.get_machine())
        chip_xy = (link_data.connected_chip_x, link_data.connected_chip_y)
        return chip_xy, (vertex, None, link_data.connected_link)
    # Everything else is reached through the core it is placed on
    placement = PacmanDataView.get_placement_of_vertex(vertex)
    return (placement.x, placement.y), (vertex, placement.p, None)
628

629

630
def route_to_endpoint(vertex: AbstractVirtual) -> int:
    """
    Get the link over which a virtual vertex is reached.

    :param AbstractVirtual vertex:
    :return: the `connected_link` of the vertex's link data on the machine
    :rtype: int
    """
    return vertex.get_link_data(
        PacmanDataView.get_machine()).connected_link
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc