• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SpiNNakerManchester / SpiNNFrontEndCommon / 7531525803

15 Jan 2024 04:17PM UTC coverage: 47.011% (-1.0%) from 47.994%
7531525803

push

github

web-flow
Merge pull request #1150 from SpiNNakerManchester/overrides_check

Overrides check

1758 of 4509 branches covered (38.99%)

Branch coverage included in aggregate %.

17 of 19 new or added lines in 8 files covered. (89.47%)

1 existing line in 1 file now uncovered.

5492 of 10913 relevant lines covered (50.33%)

1.51 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

35.42
/spinn_front_end_common/interface/interface_functions/spalloc_allocator.py
1
# Copyright (c) 2016 The University of Manchester
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#     https://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14
from contextlib import AbstractContextManager, ExitStack
3✔
15
import logging
3✔
16
import math
3✔
17
from typing import ContextManager, Dict, Tuple, Optional, Union, cast
3✔
18
from spinn_utilities.config_holder import (
3✔
19
    get_config_bool, get_config_str_or_none, get_config_str_list)
20
from spinn_utilities.log import FormatAdapter
3✔
21
from spinn_utilities.overrides import overrides
3✔
22
from spinn_utilities.typing.coords import XY
3✔
23
from spinn_utilities.config_holder import get_config_int, get_config_str
3✔
24
from spalloc_client import Job  # type: ignore[import]
3✔
25
from spalloc_client.states import JobState  # type: ignore[import]
3✔
26
from spinnman.constants import SCP_SCAMP_PORT
3✔
27
from spinnman.spalloc import (
3✔
28
    is_server_address, SpallocClient, SpallocJob, SpallocState)
29
from spinn_front_end_common.abstract_models.impl import (
3✔
30
    MachineAllocationController)
31
from spinn_front_end_common.data import FecDataView
3✔
32
from spinn_front_end_common.interface.provenance import ProvenanceWriter
3✔
33
from spinn_front_end_common.utilities.utility_calls import parse_old_spalloc
3✔
34
from spinnman.transceiver import Transceiver
3✔
35
from spinnman.connections.udp_packet_connections import (
3✔
36
    SCAMPConnection, EIEIOConnection)
37

38
logger = FormatAdapter(logging.getLogger(__name__))
3✔
39
_MACHINE_VERSION = 5  # Spalloc only ever works with v5 boards
3✔
40

41

42
class SpallocJobController(MachineAllocationController):
    """
    A machine allocation controller backed by a job on a new-style Spalloc
    server. Keeps the job (and its keep-alive task) open until closed, and
    can optionally proxy all board connections via the Spalloc server.
    """
    __slots__ = (
        # the spalloc job object
        "_job",
        # the current job's old state
        "_state",
        # the client that owns the connection to the Spalloc server
        "__client",
        # context manager (keep-alive task) closed when the job is released
        "__closer",
        # whether connections are proxied via the Spalloc server
        "__use_proxy"
    )

    def __init__(
            self, client: SpallocClient, job: SpallocJob,
            task: AbstractContextManager, use_proxy: bool):
        """
        :param ~spinnman.spalloc.SpallocClient client:
            The client that owns the connection to the Spalloc server.
        :param ~spinnman.spalloc.SpallocJob job:
            The job to manage; must not be ``None``.
        :param task:
            The keep-alive task for the job; exited when the job is stopped.
        :type task:
            ~spinn_utilities.abstract_context_manager.AbstractContextManager
        :param bool use_proxy:
            Whether to open connections via the Spalloc proxy.
        :raises TypeError: If `job` is ``None``.
        """
        if job is None:
            raise TypeError("must have a real job")
        self.__client = client
        self.__closer = task
        self._job = job
        self._state = job.get_state()
        self.__use_proxy = use_proxy
        super().__init__("SpallocJobController")

    @property
    def job(self) -> SpallocJob:
        """
        The job being managed by this controller.

        :rtype: ~spinnman.spalloc.SpallocJob
        """
        return self._job

    @overrides(MachineAllocationController.extend_allocation)
    def extend_allocation(self, new_total_run_time: float):
        # Does Nothing in this allocator - machines are held until exit
        pass

    def __stop(self) -> None:
        # Tear down in ownership order: keep-alive task, then the job
        # itself, then the client that owns the server connection.
        self.__closer.__exit__(None, None, None)
        self._job.destroy()
        self.__client.close()

    @overrides(MachineAllocationController.close)
    def close(self) -> None:
        super().close()
        self.__stop()

    @overrides(MachineAllocationController.where_is_machine)
    def where_is_machine(
            self, chip_x: int, chip_y: int) -> Tuple[int, int, int]:
        """
        :param int chip_x:
        :param int chip_y:
        :rtype: tuple(int,int,int)
        :raises ValueError: If the coordinates lie outside the machine.
        """
        result = self._job.where_is_machine(x=chip_x, y=chip_y)
        if result is None:
            raise ValueError("coordinates lie outside machine")
        return result

    @overrides(MachineAllocationController._wait)
    def _wait(self) -> bool:
        try:
            if self._state != SpallocState.DESTROYED:
                self._state = self._job.wait_for_state_change(self._state)
        except TypeError:
            # Ignored: can be raised inside the spalloc client during
            # shutdown — presumably transient (inherited behaviour).
            pass
        except Exception:  # pylint: disable=broad-except
            # Errors while we are exiting are expected; otherwise re-raise
            # with a bare `raise` so the original traceback is preserved.
            if not self._exited:
                raise
        return self._state != SpallocState.DESTROYED

    @overrides(MachineAllocationController._teardown)
    def _teardown(self) -> None:
        if not self._exited:
            self.__stop()
        super()._teardown()

    @overrides(MachineAllocationController.create_transceiver,
               extend_doc=True)
    def create_transceiver(self) -> Transceiver:
        """
        .. note::
            This allocation controller proxies the transceiver's connections
            via Spalloc. This allows it to work even outside the UNIMAN
            firewall.
        """
        if not self.__use_proxy:
            return super().create_transceiver()
        return self._job.create_transceiver()

    @overrides(MachineAllocationController.can_create_transceiver)
    def can_create_transceiver(self) -> bool:
        if not self.__use_proxy:
            return super().can_create_transceiver()
        # With the proxy we can always make a transceiver.
        return True

    @overrides(MachineAllocationController.open_sdp_connection,
               extend_doc=True)
    def open_sdp_connection(
            self, chip_x: int, chip_y: int,
            udp_port: int = SCP_SCAMP_PORT) -> Optional[SCAMPConnection]:
        """
        .. note::
            This allocation controller proxies connections via Spalloc. This
            allows it to work even outside the UNIMAN firewall.
        """
        if not self.__use_proxy:
            return super().open_sdp_connection(chip_x, chip_y, udp_port)
        return self._job.connect_to_board(chip_x, chip_y, udp_port)

    @overrides(MachineAllocationController.open_eieio_connection,
               extend_doc=True)
    def open_eieio_connection(
            self, chip_x: int, chip_y: int) -> Optional[EIEIOConnection]:
        """
        .. note::
            This allocation controller proxies connections via Spalloc. This
            allows it to work even outside the UNIMAN firewall.
        """
        if not self.__use_proxy:
            return super().open_eieio_connection(chip_x, chip_y)
        return self._job.open_eieio_connection(chip_x, chip_y)

    @overrides(MachineAllocationController.open_eieio_listener,
               extend_doc=True)
    def open_eieio_listener(self) -> EIEIOConnection:
        """
        .. note::
            This allocation controller proxies connections via Spalloc. This
            allows it to work even outside the UNIMAN firewall.
        """
        if not self.__use_proxy:
            return super().open_eieio_listener()
        return self._job.open_eieio_listener_connection()

    @property
    @overrides(MachineAllocationController.proxying)
    def proxying(self) -> bool:
        return self.__use_proxy

    @overrides(MachineAllocationController.make_report)
    def make_report(self, filename: str):
        # Write a minimal report identifying the job that was used.
        with open(filename, "w", encoding="utf-8") as report:
            report.write(f"Job: {self._job}")

192

193
class _OldSpallocJobController(MachineAllocationController):
    """
    A machine allocation controller backed by a job on an old-style
    spalloc server. The job is destroyed when the controller is closed.
    """
    __slots__ = (
        # the spalloc job object
        "_job",
        # the current job's old state
        "_state"
    )

    def __init__(self, job: Job, host: str):
        """
        :param ~spalloc.job.Job job: The job to manage; must not be ``None``.
        :param str host: The host name of the job's root board.
        :raises TypeError: If `job` is ``None``.
        """
        if job is None:
            raise TypeError("must have a real job")
        self._job = job
        self._state = job.state
        super().__init__("SpallocJobController", host)

    @overrides(MachineAllocationController.extend_allocation)
    def extend_allocation(self, new_total_run_time: float):
        # Does Nothing in this allocator - machines are held until exit
        pass

    @overrides(MachineAllocationController.close)
    def close(self) -> None:
        super().close()
        self._job.destroy()

    @property
    def power(self) -> bool:
        """
        Whether the job's boards are powered on.

        :rtype: bool
        """
        return self._job.power

    def set_power(self, power: bool):
        """
        Set the power state of the job's boards, waiting until they are
        ready when powering on.

        :param bool power:
        """
        self._job.set_power(power)
        if power:
            self._job.wait_until_ready()

    @overrides(MachineAllocationController.where_is_machine)
    def where_is_machine(
            self, chip_x: int, chip_y: int) -> Tuple[int, int, int]:
        return self._job.where_is_machine(chip_y=chip_y, chip_x=chip_x)

    @overrides(MachineAllocationController._wait)
    def _wait(self) -> bool:
        try:
            if self._state != JobState.destroyed:
                self._state = self._job.wait_for_state_change(self._state)
        except TypeError:
            # Ignored: can be raised inside the spalloc client during
            # shutdown — presumably transient (inherited behaviour).
            pass
        except Exception:  # pylint: disable=broad-except
            # Errors while we are exiting are expected; otherwise re-raise
            # with a bare `raise` so the original traceback is preserved.
            if not self._exited:
                raise
        return self._state != JobState.destroyed

    @overrides(MachineAllocationController._teardown)
    def _teardown(self) -> None:
        if not self._exited:
            self._job.close()
        super()._teardown()

259

260
# NOTE(review): duplicate of the `_MACHINE_VERSION = 5` assignment near the
# top of this module — harmless (same value), but one of the two should go.
_MACHINE_VERSION = 5
261

262

263
def spalloc_allocator(
        bearer_token: Optional[str] = None, group: Optional[str] = None,
        collab: Optional[str] = None, nmpi_job: Union[int, str, None] = None,
        nmpi_user: Optional[str] = None) -> Tuple[
            str, int, None, bool, bool, Dict[XY, str],
            MachineAllocationController]:
    """
    Request a machine from a SPALLOC server that will fit the given
    number of chips.

    :param bearer_token: The bearer token to use
    :type bearer_token: str or None
    :param group: The group to associate with or None for no group
    :type group: str or None
    :param collab: The collab to associate with or None for no collab
    :type collab: str or None
    :param nmpi_job: The NMPI Job to associate with or None for no job
    :type nmpi_job: str or None
    :param nmpi_user: The NMPI username to associate with or None for no user
    :type nmpi_user: str or None
    :return:
        host, board version, BMP details, reset on startup flag,
        auto-detect BMP flag, board address map, allocation controller
    :rtype: tuple(str, int, object, bool, bool, dict(tuple(int,int),str),
        MachineAllocationController)
    """
    spalloc_server = get_config_str("Machine", "spalloc_server")

    # Work out how many boards are needed
    if FecDataView.has_n_boards_required():
        n_boards = FecDataView.get_n_boards_required()
    else:
        n_chips = FecDataView.get_n_chips_needed()
        # reduce max chips by 2 in case you get a bad board(s)
        usable_chips_per_board = (
            FecDataView.get_machine_version().n_chips_per_board - 2)
        boards_exact = n_chips / usable_chips_per_board
        logger.info("{:.2f} Boards Required for {} chips",
                    boards_exact, n_chips)
        n_boards = int(math.ceil(boards_exact))
        # If the number of boards rounded up is less than 50% of a board
        # bigger than the actual number of boards,
        # add another board just in case.
        if n_boards - boards_exact < 0.5:
            n_boards += 1

    # New-style servers are identified by their address form; anything
    # else is treated as an old-style spalloc host name.
    if is_server_address(spalloc_server):
        job_id = None if nmpi_job is None else int(nmpi_job)
        host, connections, mac = _allocate_job_new(
            spalloc_server, n_boards, bearer_token, group, collab,
            job_id, nmpi_user)
    else:
        host, connections, mac = _allocate_job_old(spalloc_server, n_boards)
    return (host, _MACHINE_VERSION, None, False, False, connections, mac)
316

317

318
def _allocate_job_new(
        spalloc_server: str, n_boards: int,
        bearer_token: Optional[str] = None, group: Optional[str] = None,
        collab: Optional[str] = None, nmpi_job: Optional[int] = None,
        nmpi_user: Optional[str] = None) -> Tuple[
            str, Dict[XY, str], MachineAllocationController]:
    """
    Request a machine from a new-style spalloc server that will fit the
    given number of boards.

    :param str spalloc_server:
        The server from which the machine should be requested
    :param int n_boards: The number of boards required
    :param bearer_token: The bearer token to use
    :type bearer_token: str or None
    :param group: The group to associate with or None for no group
    :type group: str or None
    :param collab: The collab to associate with or None for no collab
    :type collab: str or None
    :param nmpi_job: The NMPI Job to associate with or None for no job
    :type nmpi_job: int or None
    :param nmpi_user: The NMPI username to associate with or None for no user
    :type nmpi_user: str or None

    :rtype: tuple(str, dict(tuple(int,int),str), MachineAllocationController)
    """
    logger.info(f"Requesting job with {n_boards} boards")
    # The stack owns client, job and keep-alive task until everything has
    # succeeded; on any failure it unwinds them in reverse order.
    with ExitStack() as stack:
        spalloc_machine = get_config_str_or_none("Machine", "spalloc_machine")
        use_proxy = get_config_bool("Machine", "spalloc_use_proxy")
        client = SpallocClient(
            spalloc_server, bearer_token=bearer_token, group=group,
            collab=collab, nmpi_job=nmpi_job, nmpi_user=nmpi_user)
        stack.enter_context(cast(ContextManager[SpallocClient], client))
        job = client.create_job(n_boards, spalloc_machine)
        stack.enter_context(job)
        task = job.launch_keepalive_task()
        stack.enter_context(task)
        job.wait_until_ready()
        connections = job.get_connections()
        with ProvenanceWriter() as db:
            db.insert_board_provenance(connections)
        # Board (0, 0) is the root board; its address is the machine's host.
        root = connections.get((0, 0), None)
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(
                "boards: {}",
                str(connections).replace("{", "[").replace("}", "]"))
        # `use_proxy` may be None from the config; coerce to a plain bool.
        allocation_controller = SpallocJobController(
            client, job, task, use_proxy or False)
        # Success! We don't want to close the client, job or task now;
        # the allocation controller now owns them.
        stack.pop_all()
    assert root is not None, "no root of ready board"
    return (root, connections, allocation_controller)
372

373

374
def _allocate_job_old(spalloc_server: str, n_boards: int) -> Tuple[
        str, Dict[XY, str], MachineAllocationController]:
    """
    Request a machine from an old-style spalloc server that will fit the
    requested number of boards.

    :param str spalloc_server:
        The server from which the machine should be requested
    :param int n_boards: The number of boards required
    :rtype: tuple(str, dict(tuple(int,int),str), MachineAllocationController)
    """
    # Resolve the server coordinates and job owner from configuration.
    host, port, owner = parse_old_spalloc(
        spalloc_server, get_config_int("Machine", "spalloc_port"),
        get_config_str("Machine", "spalloc_user"))
    job_args = {'hostname': host, 'port': port, 'owner': owner}

    # The target machine is optional; omit the key entirely if unset.
    machine_name = get_config_str_or_none("Machine", "spalloc_machine")
    if machine_name is not None:
        job_args['machine'] = machine_name

    job, root_hostname, connection_data = _launch_checked_job_old(
        n_boards, job_args)
    controller = _OldSpallocJobController(job, root_hostname)
    return (root_hostname, connection_data, controller)
402

403

404
def _launch_checked_job_old(n_boards: int, spalloc_kwargs: dict) -> Tuple[
        Job, str, Dict[XY, str]]:
    """
    Launch an old-style spalloc job, retrying while the allocated root
    board is in the configured `spalloc_avoid_boards` list. Jobs created
    for avoided boards are destroyed on exit.

    :param int n_boards: The number of boards required
    :param dict spalloc_kwargs: Keyword arguments for the `Job` constructor
    :rtype: tuple(~.Job, str, dict(tuple(int,int),str))
    :raises ValueError: If fewer connections than boards were detected
    """
    logger.info(f"Requesting job with {n_boards} boards")
    avoid_boards = get_config_str_list("Machine", "spalloc_avoid_boards")
    avoid_jobs = []
    # Initialised up front so the `finally` clause cannot hit an unbound
    # name if the very first job fails before its connections are read.
    connections: Dict[XY, str] = {}
    try:
        while True:
            job = Job(n_boards, **spalloc_kwargs)
            try:
                job.wait_until_ready()
                # get param from jobs before starting, so that hanging
                # doesn't occur
                hostname = job.hostname
            except Exception as ex:
                job.destroy(str(ex))
                raise
            connections = job.connections
            if len(connections) < n_boards:
                logger.warning(
                    "boards: {}",
                    str(connections).replace("{", "[").replace("}", "]"))
                raise ValueError("Not enough connections detected")
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug("boards: {}",
                             str(connections).replace("{", "[").replace(
                                 "}", "]"))
            with ProvenanceWriter() as db:
                db.insert_board_provenance(connections)
            if hostname not in avoid_boards:
                break
            # Root board is on the avoid list: remember the job so it can
            # be destroyed later, and ask for a fresh one.
            avoid_jobs.append(job)
            logger.warning(
                f"Asking for new job as {hostname} "
                f"as in the spalloc_avoid_boards list")
    finally:
        if avoid_boards:
            # Strip out any connections to boards the user wants avoided.
            for key in list(connections.keys()):
                if connections[key] in avoid_boards:
                    logger.warning(
                        f"Removing connection info for {connections[key]} "
                        f"as in the spalloc avoid_boards list")
                    del connections[key]
        for avoid_job in avoid_jobs:
            avoid_job.destroy("Asked to avoid by cfg")
    return job, hostname, connections
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc