• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SpiNNakerManchester / SpiNNFrontEndCommon / 6742386295

03 Nov 2023 07:23AM UTC coverage: 42.408% (-0.1%) from 42.511%
6742386295

push

github

web-flow
Merge pull request #1140 from SpiNNakerManchester/create_transceiver

Create transceiver typing and remove AbstractMachineAllocationController

830 of 3281 branches covered (25.3%)

Branch coverage included in aggregate %.

29 of 37 new or added lines in 5 files covered. (78.38%)

2 existing lines in 2 files now uncovered.

5175 of 10879 relevant lines covered (47.57%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

26.01
/spinn_front_end_common/interface/interface_functions/spalloc_allocator.py
1
# Copyright (c) 2016 The University of Manchester
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#     https://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14
from contextlib import AbstractContextManager, ExitStack
1✔
15
import logging
1✔
16
import math
1✔
17
from typing import ContextManager, Dict, Tuple, Optional, Union, cast
1✔
18
from spinn_utilities.config_holder import (
1✔
19
    get_config_bool, get_config_str_or_none, get_config_str_list)
20
from spinn_utilities.log import FormatAdapter
1✔
21
from spinn_utilities.overrides import overrides
1✔
22
from spinn_utilities.typing.coords import XY
1✔
23
from spinn_utilities.config_holder import get_config_int, get_config_str
1✔
24
from spalloc_client import Job  # type: ignore[import]
1✔
25
from spalloc_client.states import JobState  # type: ignore[import]
1✔
26
from spinnman.constants import SCP_SCAMP_PORT
1✔
27
from spinnman.spalloc import (
1✔
28
    is_server_address, SpallocClient, SpallocJob, SpallocState)
29
from spinn_front_end_common.abstract_models.impl import (
1✔
30
    MachineAllocationController)
31
from spinn_front_end_common.data import FecDataView
1✔
32
from spinn_front_end_common.interface.provenance import ProvenanceWriter
1✔
33
from spinn_front_end_common.utilities.utility_calls import parse_old_spalloc
1✔
34
from spinnman.transceiver import Transceiver
1✔
35
from spinnman.connections.udp_packet_connections import (
1✔
36
    SCAMPConnection, EIEIOConnection)
37

38
logger = FormatAdapter(logging.getLogger(__name__))
1✔
39
_MACHINE_VERSION = 5  # Spalloc only ever works with v5 boards
1✔
40

41

42
class SpallocJobController(MachineAllocationController):
    """
    Manages a job running on a new-style spalloc server, routing machine
    access (transceiver, SDP/EIEIO connections) through that job, optionally
    via the server's proxy.
    """
    __slots__ = (
        # the spalloc job object
        "_job",
        # the current job's old state
        "_state",
        "__client",
        "__closer",
        "__use_proxy"
    )

    def __init__(
            self, client: SpallocClient, job: SpallocJob,
            task: AbstractContextManager, use_proxy: bool):
        """
        :param ~spinnman.spalloc.SpallocClient client:
            The client owning the server connection.
        :param ~spinnman.spalloc.SpallocJob job:
            The job being managed; must not be ``None``.
        :param task: The keep-alive task context for the job.
        :type task:
            ~spinn_utilities.abstract_context_manager.AbstractContextManager
        :param bool use_proxy:
            Whether board connections are proxied through spalloc.
        """
        if job is None:
            raise TypeError("must have a real job")
        self.__use_proxy = use_proxy
        self.__client = client
        self.__closer = task
        self._job = job
        self._state = job.get_state()
        super().__init__("SpallocJobController")

    @overrides(MachineAllocationController.extend_allocation)
    def extend_allocation(self, new_total_run_time: float):
        # Nothing to do here: spalloc machines are held until exit
        pass

    def __stop(self) -> None:
        # Shut things down in reverse order of acquisition: keep-alive
        # task first, then the job, finally the client itself.
        self.__closer.__exit__(None, None, None)
        self._job.destroy()
        self.__client.close()

    @overrides(MachineAllocationController.close)
    def close(self) -> None:
        super().close()
        self.__stop()

    @overrides(MachineAllocationController.where_is_machine)
    def where_is_machine(
            self, chip_x: int, chip_y: int) -> Tuple[int, int, int]:
        """
        :param int chip_x:
        :param int chip_y:
        :rtype: tuple(int,int,int)
        """
        location = self._job.where_is_machine(x=chip_x, y=chip_y)
        if location is None:
            raise ValueError("coordinates lie outside machine")
        return location

    @overrides(MachineAllocationController._wait)
    def _wait(self) -> bool:
        try:
            if self._state != SpallocState.DESTROYED:
                self._state = self._job.wait_for_state_change(self._state)
        except TypeError:
            # Deliberately swallowed; state simply stays as it was
            pass
        except Exception as exc:  # pylint: disable=broad-except
            # Only propagate if we are still meant to be running
            if not self._exited:
                raise exc
        return self._state != SpallocState.DESTROYED

    @overrides(MachineAllocationController._teardown)
    def _teardown(self) -> None:
        if not self._exited:
            self.__stop()
        super()._teardown()

    @overrides(MachineAllocationController.create_transceiver,
               extend_doc=True)
    def create_transceiver(self) -> Transceiver:
        """
        .. note::
            This allocation controller proxies the transceiver's connections
            via Spalloc. This allows it to work even outside the UNIMAN
            firewall.
        """
        if self.__use_proxy:
            return self._job.create_transceiver()
        return super().create_transceiver()

    @overrides(MachineAllocationController.can_create_transceiver)
    def can_create_transceiver(self) -> bool:
        # With the proxy we can always make one; otherwise defer upward
        if self.__use_proxy:
            return True
        return super().can_create_transceiver()

    @overrides(MachineAllocationController.open_sdp_connection,
               extend_doc=True)
    def open_sdp_connection(
            self, chip_x: int, chip_y: int,
            udp_port: int = SCP_SCAMP_PORT) -> Optional[SCAMPConnection]:
        """
        .. note::
            This allocation controller proxies connections via Spalloc. This
            allows it to work even outside the UNIMAN firewall.
        """
        if self.__use_proxy:
            return self._job.connect_to_board(chip_x, chip_y, udp_port)
        return super().open_sdp_connection(chip_x, chip_y, udp_port)

    @overrides(MachineAllocationController.open_eieio_connection,
               extend_doc=True)
    def open_eieio_connection(
            self, chip_x: int, chip_y: int) -> Optional[EIEIOConnection]:
        """
        .. note::
            This allocation controller proxies connections via Spalloc. This
            allows it to work even outside the UNIMAN firewall.
        """
        if self.__use_proxy:
            return self._job.open_eieio_connection(chip_x, chip_y)
        return super().open_eieio_connection(chip_x, chip_y)

    @overrides(MachineAllocationController.open_eieio_listener,
               extend_doc=True)
    def open_eieio_listener(self) -> EIEIOConnection:
        """
        .. note::
            This allocation controller proxies connections via Spalloc. This
            allows it to work even outside the UNIMAN firewall.
        """
        if self.__use_proxy:
            return self._job.open_eieio_listener_connection()
        return super().open_eieio_listener()

    @property
    @overrides(MachineAllocationController.proxying)
    def proxying(self) -> bool:
        return self.__use_proxy

    @overrides(MachineAllocationController.make_report)
    def make_report(self, filename):
        # Record which job this run used, for later diagnosis
        with open(filename, "w", encoding="utf-8") as report:
            report.write(f"Job: {self._job}")
187

188

189
class _OldSpallocJobController(MachineAllocationController):
    """
    Manages a job running on an old-style (classic) spalloc server.
    """
    __slots__ = (
        # the spalloc job object
        "_job",
        # the current job's old state
        "_state"
    )

    def __init__(self, job: Job, host: str):
        """
        :param ~spalloc.job.Job job: The job to manage; must not be ``None``.
        :param str host: The host name of the allocated machine's root board.
        """
        if job is None:
            raise TypeError("must have a real job")
        self._job = job
        self._state = job.state
        super().__init__("SpallocJobController", host)

    @overrides(MachineAllocationController.extend_allocation)
    def extend_allocation(self, new_total_run_time: float):
        # Nothing to do here: spalloc machines are held until exit
        pass

    @overrides(MachineAllocationController.close)
    def close(self) -> None:
        super().close()
        self._job.destroy()

    @property
    def power(self) -> bool:
        """
        Whether the job's boards are powered on.

        :rtype: bool
        """
        return self._job.power

    def set_power(self, power: bool):
        """
        :param bool power:
        """
        self._job.set_power(power)
        if power:
            # After power-on, the boards are only usable once ready
            self._job.wait_until_ready()

    @overrides(MachineAllocationController.where_is_machine)
    def where_is_machine(self, chip_x, chip_y):
        return self._job.where_is_machine(chip_x=chip_x, chip_y=chip_y)

    @overrides(MachineAllocationController._wait)
    def _wait(self):
        try:
            if self._state != JobState.destroyed:
                self._state = self._job.wait_for_state_change(self._state)
        except TypeError:
            # Deliberately swallowed; state simply stays as it was
            pass
        except Exception as exc:  # pylint: disable=broad-except
            # Only propagate if we are still meant to be running
            if not self._exited:
                raise exc
        return self._state != JobState.destroyed

    @overrides(MachineAllocationController._teardown)
    def _teardown(self):
        if not self._exited:
            self._job.close()
        super()._teardown()
253

254

255
# NOTE(review): this duplicates the module-level ``_MACHINE_VERSION``
# already defined near the top of the file (next to the logger); one of
# the two identical definitions should be removed.
_MACHINE_VERSION = 5
256

257

258
def spalloc_allocator(
        bearer_token: Optional[str] = None, group: Optional[str] = None,
        collab: Optional[str] = None, nmpi_job: Union[int, str, None] = None,
        nmpi_user: Optional[str] = None) -> Tuple[
            str, int, None, bool, bool, Dict[XY, str],
            MachineAllocationController]:
    """
    Request a machine from a SPALLOC server that will fit the given
    number of chips.

    :param bearer_token: The bearer token to use
    :type bearer_token: str or None
    :param group: The group to associate with or None for no group
    :type group: str or None
    :param collab: The collab to associate with or None for no collab
    :type collab: str or None
    :param nmpi_job: The NMPI Job to associate with or None for no job
    :type nmpi_job: str or None
    :param nmpi_user: The NMPI username to associate with or None for no user
    :type nmpi_user: str or None
    :return:
        host, board version, BMP details, reset on startup flag,
        auto-detect BMP flag, board address map, allocation controller
    :rtype: tuple(str, int, object, bool, bool, dict(tuple(int,int),str),
        MachineAllocationController)
    """
    spalloc_server = get_config_str("Machine", "spalloc_server")

    # How many boards do we need?
    if FecDataView.has_n_boards_required():
        n_boards = FecDataView.get_n_boards_required()
    else:
        n_chips = FecDataView.get_n_chips_needed()
        # Allow for up to 2 dead chips on each board
        usable_chips_per_board = (
            FecDataView.get_machine_version().n_chips_per_board - 2)
        boards_exact = n_chips / usable_chips_per_board
        logger.info("{:.2f} Boards Required for {} chips",
                    boards_exact, n_chips)
        n_boards = int(math.ceil(boards_exact))
        # When the rounded-up count is within half a board of the exact
        # requirement, request one extra board as contingency.
        if n_boards - boards_exact < 0.5:
            n_boards += 1

    # Old-style and new-style servers speak different protocols
    if is_server_address(spalloc_server):
        job_id = int(nmpi_job) if nmpi_job is not None else None
        host, connections, mac = _allocate_job_new(
            spalloc_server, n_boards, bearer_token, group, collab,
            job_id, nmpi_user)
    else:
        host, connections, mac = _allocate_job_old(spalloc_server, n_boards)
    return (host, _MACHINE_VERSION, None, False, False, connections, mac)
311

312

313
def _allocate_job_new(
        spalloc_server: str, n_boards: int,
        bearer_token: Optional[str] = None, group: Optional[str] = None,
        collab: Optional[str] = None, nmpi_job: Optional[int] = None,
        nmpi_user: Optional[str] = None) -> Tuple[
            str, Dict[XY, str], MachineAllocationController]:
    """
    Request a machine from an new-style spalloc server that will fit the
    given number of boards.

    The client, job and keep-alive task are created inside an
    :py:class:`~contextlib.ExitStack` so that they are all cleaned up if
    anything fails part-way through; on success, ownership of all three is
    transferred to the returned allocation controller.

    :param str spalloc_server:
        The server from which the machine should be requested
    :param int n_boards: The number of boards required
    :param bearer_token: The bearer token to use
    :type bearer_token: str or None
    :param group: The group to associate with or None for no group
    :type group: str or None
    :param collab: The collab to associate with or None for no collab
    :type collab: str or None
    :param nmpi_job: The NMPI Job to associate with or None for no job
    :type nmpi_job: int or None
    :param nmpi_user: The NMPI username to associate with or None for no user
    :type nmpi_user: str or None

    :rtype: tuple(str, dict(tuple(int,int),str), MachineAllocationController)
    """
    logger.info(f"Requesting job with {n_boards} boards")
    with ExitStack() as stack:
        spalloc_machine = get_config_str_or_none("Machine", "spalloc_machine")
        use_proxy = get_config_bool("Machine", "spalloc_use_proxy")
        client = SpallocClient(
            spalloc_server, bearer_token=bearer_token, group=group,
            collab=collab, nmpi_job=nmpi_job, nmpi_user=nmpi_user)
        # cast: SpallocClient is a context manager, but the stub typing
        # needs persuading
        stack.enter_context(cast(ContextManager[SpallocClient], client))
        job = client.create_job(n_boards, spalloc_machine)
        stack.enter_context(job)
        task = job.launch_keepalive_task()
        stack.enter_context(task)
        # Blocks until the boards are allocated and booted
        job.wait_until_ready()
        connections = job.get_connections()
        with ProvenanceWriter() as db:
            db.insert_board_provenance(connections)
        # Host of the (0, 0) board is used as the machine's address
        root = connections.get((0, 0), None)
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(
                "boards: {}",
                str(connections).replace("{", "[").replace("}", "]"))
        allocation_controller = SpallocJobController(
            client, job, task, use_proxy or False)
        # Success! We don't want to close the client, job or task now;
        # the allocation controller now owns them.
        stack.pop_all()
    assert root is not None, "no root of ready board"
    return (root, connections, allocation_controller)
367

368

369
def _allocate_job_old(spalloc_server: str, n_boards: int) -> Tuple[
        str, Dict[XY, str], MachineAllocationController]:
    """
    Request a machine from an old-style spalloc server that will fit the
    requested number of boards.

    :param str spalloc_server:
        The server from which the machine should be requested
    :param int n_boards: The number of boards required
    :rtype: tuple(str, dict(tuple(int,int),str), MachineAllocationController)
    """
    host, port, owner = parse_old_spalloc(
        spalloc_server, get_config_int("Machine", "spalloc_port"),
        get_config_str("Machine", "spalloc_user"))
    # Keyword arguments for the Job constructor
    job_kwargs = {'hostname': host, 'port': port, 'owner': owner}
    machine_name = get_config_str_or_none("Machine", "spalloc_machine")
    if machine_name is not None:
        job_kwargs['machine'] = machine_name

    job, hostname, scamp_connection_data = _launch_checked_job_old(
        n_boards, job_kwargs)
    controller = _OldSpallocJobController(job, hostname)
    return (hostname, scamp_connection_data, controller)
397

398

399
def _launch_checked_job_old(n_boards: int, spalloc_kwargs: dict) -> Tuple[
        Job, str, Dict[XY, str]]:
    """
    Repeatedly request a job from an old-style spalloc server until one is
    allocated whose root board is not in the ``spalloc_avoid_boards``
    configuration list. Jobs on avoided boards are kept open until the end
    (so the server does not hand the same board straight back) and then
    destroyed.

    :param int n_boards: The number of boards required
    :param dict spalloc_kwargs: Keyword arguments for the ~.Job constructor
    :rtype: tuple(~.Job, str, dict(tuple(int,int),str))
    """
    logger.info(f"Requesting job with {n_boards} boards")
    avoid_boards = get_config_str_list("Machine", "spalloc_avoid_boards")
    avoid_jobs = []
    # Pre-initialise so the finally block cannot hit an UnboundLocalError
    # (masking the real failure) if job creation or readiness fails before
    # any connections have been obtained.
    connections: Dict[XY, str] = {}
    try:
        while True:
            job = Job(n_boards, **spalloc_kwargs)
            try:
                job.wait_until_ready()
                # get param from jobs before starting, so that hanging doesn't
                # occur
                hostname = job.hostname
            except Exception as ex:
                job.destroy(str(ex))
                raise
            connections = job.connections
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug("boards: {}",
                             str(connections).replace("{", "[").replace(
                                 "}", "]"))
            with ProvenanceWriter() as db:
                db.insert_board_provenance(connections)
            if hostname not in avoid_boards:
                break
            # Hold on to the unwanted job for now; destroyed in the finally
            avoid_jobs.append(job)
            logger.warning(
                f"Asking for new job as {hostname} "
                f"is in the spalloc_avoid_boards list")
    finally:
        if avoid_boards:
            # Strip out any connection that goes via an avoided board
            for key in list(connections.keys()):
                if connections[key] in avoid_boards:
                    logger.warning(
                        f"Removing connection info for {connections[key]} "
                        f"as in the spalloc avoid_boards list")
                    del connections[key]
        for avoid_job in avoid_jobs:
            avoid_job.destroy("Asked to avoid by cfg")
    return job, hostname, connections
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc