SpiNNakerManchester / SpiNNFrontEndCommon / 8143402062

04 Mar 2024 04:13PM UTC coverage: 47.969% (-0.02%) from 47.992%
Build 8143402062 · push · github · web-flow
Merge pull request #1157 from SpiNNakerManchester/pylint_default

Pylint default

2050 of 4800 branches covered (42.71%)

Branch coverage included in aggregate %.

63 of 110 new or added lines in 37 files covered. (57.27%)

26 existing lines in 18 files now uncovered.

5484 of 10906 relevant lines covered (50.28%)

0.5 hits per line
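Since branch coverage is included in the aggregate, the headline figure appears to combine lines and branches: (5484 + 2050) covered out of (10906 + 4800) relevant = 7534 / 15706 ≈ 47.969%.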

Source File: /spinn_front_end_common/interface/interface_functions/spalloc_allocator.py (35.42% covered)

# Copyright (c) 2016 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from contextlib import AbstractContextManager, ExitStack
import logging
import math
from typing import ContextManager, Dict, Tuple, Optional, Union, cast

from spinn_utilities.config_holder import (
    get_config_bool, get_config_int, get_config_str, get_config_str_list,
    get_config_str_or_none)
from spinn_utilities.log import FormatAdapter
from spinn_utilities.overrides import overrides
from spinn_utilities.typing.coords import XY

from spalloc_client import Job  # type: ignore[import]
from spalloc_client.states import JobState  # type: ignore[import]

from spinnman.connections.udp_packet_connections import (
    SCAMPConnection, EIEIOConnection)
from spinnman.constants import SCP_SCAMP_PORT
from spinnman.spalloc import (
    is_server_address, SpallocClient, SpallocJob, SpallocState)
from spinnman.transceiver import Transceiver

from spinn_front_end_common.abstract_models.impl import (
    MachineAllocationController)
from spinn_front_end_common.data import FecDataView
from spinn_front_end_common.interface.provenance import ProvenanceWriter
from spinn_front_end_common.utilities.utility_calls import parse_old_spalloc

logger = FormatAdapter(logging.getLogger(__name__))
_MACHINE_VERSION = 5  # Spalloc only ever works with v5 boards


class SpallocJobController(MachineAllocationController):
    """
    A class to create and support Transceivers specific to Spalloc.
    """

    __slots__ = (
        # the spalloc job object
        "_job",
        # the current job's old state
        "_state",
        "__client",
        "__closer",
        "__use_proxy"
    )

    def __init__(
            self, client: SpallocClient, job: SpallocJob,
            task: AbstractContextManager, use_proxy: bool):
        """
        :param ~spinnman.spalloc.SpallocClient client:
        :param ~spinnman.spalloc.SpallocJob job:
        :param task:
        :type task:
            ~spinn_utilities.abstract_context_manager.AbstractContextManager
        :param bool use_proxy:
        """
        if job is None:
            raise TypeError("must have a real job")
        self.__client = client
        self.__closer = task
        self._job = job
        self._state = job.get_state()
        self.__use_proxy = use_proxy
        super().__init__("SpallocJobController")

    @property
    def job(self) -> SpallocJob:
        """
        The job value passed into the init.

        :rtype: SpallocJob
        """
        return self._job

    @overrides(MachineAllocationController.extend_allocation)
    def extend_allocation(self, new_total_run_time: float):
        # Does nothing in this allocator; machines are held until exit
        pass

    def __stop(self) -> None:
        self.__closer.__exit__(None, None, None)
        self._job.destroy()
        self.__client.close()

    @overrides(MachineAllocationController.close)
    def close(self) -> None:
        super().close()
        self.__stop()

    @overrides(MachineAllocationController.where_is_machine)
    def where_is_machine(
            self, chip_x: int, chip_y: int) -> Tuple[int, int, int]:
        """
        :param int chip_x:
        :param int chip_y:
        :rtype: tuple(int,int,int)
        """
        result = self._job.where_is_machine(x=chip_x, y=chip_y)
        if result is None:
            raise ValueError("coordinates lie outside machine")
        return result

    @overrides(MachineAllocationController._wait)
    def _wait(self) -> bool:
        try:
            if self._state != SpallocState.DESTROYED:
                self._state = self._job.wait_for_state_change(self._state)
        except TypeError:
            pass
        except Exception as e:  # pylint: disable=broad-except
            if not self._exited:
                raise e
        return self._state != SpallocState.DESTROYED

    @overrides(MachineAllocationController._teardown)
    def _teardown(self) -> None:
        if not self._exited:
            self.__stop()
        super()._teardown()

    @overrides(MachineAllocationController.create_transceiver,
               extend_doc=True)
    def create_transceiver(self) -> Transceiver:
        """
        .. note::
            This allocation controller proxies the transceiver's connections
            via Spalloc. This allows it to work even outside the UNIMAN
            firewall.
        """
        if not self.__use_proxy:
            return super().create_transceiver()
        txrx = self._job.create_transceiver()
        return txrx

    @overrides(MachineAllocationController.can_create_transceiver)
    def can_create_transceiver(self) -> bool:
        if not self.__use_proxy:
            return super().can_create_transceiver()
        return True

    @overrides(MachineAllocationController.open_sdp_connection,
               extend_doc=True)
    def open_sdp_connection(
            self, chip_x: int, chip_y: int,
            udp_port: int = SCP_SCAMP_PORT) -> Optional[SCAMPConnection]:
        """
        .. note::
            This allocation controller proxies connections via Spalloc. This
            allows it to work even outside the UNIMAN firewall.
        """
        if not self.__use_proxy:
            return super().open_sdp_connection(chip_x, chip_y, udp_port)
        return self._job.connect_to_board(chip_x, chip_y, udp_port)

    @overrides(MachineAllocationController.open_eieio_connection,
               extend_doc=True)
    def open_eieio_connection(
            self, chip_x: int, chip_y: int) -> Optional[EIEIOConnection]:
        """
        .. note::
            This allocation controller proxies connections via Spalloc. This
            allows it to work even outside the UNIMAN firewall.
        """
        if not self.__use_proxy:
            return super().open_eieio_connection(chip_x, chip_y)
        return self._job.open_eieio_connection(chip_x, chip_y)

    @overrides(MachineAllocationController.open_eieio_listener,
               extend_doc=True)
    def open_eieio_listener(self) -> EIEIOConnection:
        """
        .. note::
            This allocation controller proxies connections via Spalloc. This
            allows it to work even outside the UNIMAN firewall.
        """
        if not self.__use_proxy:
            return super().open_eieio_listener()
        return self._job.open_eieio_listener_connection()

    @property
    @overrides(MachineAllocationController.proxying)
    def proxying(self) -> bool:
        return self.__use_proxy

    @overrides(MachineAllocationController.make_report)
    def make_report(self, filename: str):
        with open(filename, "w", encoding="utf-8") as report:
            report.write(f"Job: {self._job}")


class _OldSpallocJobController(MachineAllocationController):
    __slots__ = (
        # the spalloc job object
        "_job",
        # the current job's old state
        "_state"
    )

    def __init__(self, job: Job, host: str):
        """
        :param ~spalloc.job.Job job:
        """
        if job is None:
            raise TypeError("must have a real job")
        self._job = job
        self._state = job.state
        super().__init__("SpallocJobController", host)

    @overrides(MachineAllocationController.extend_allocation)
    def extend_allocation(self, new_total_run_time: float):
        # Does nothing in this allocator; machines are held until exit
        pass

    @overrides(MachineAllocationController.close)
    def close(self) -> None:
        super().close()
        self._job.destroy()

    @property
    def power(self) -> bool:
        """
        :rtype: bool
        """
        return self._job.power

    def set_power(self, power: bool):
        """
        :param bool power:
        """
        self._job.set_power(power)
        if power:
            self._job.wait_until_ready()

    @overrides(MachineAllocationController.where_is_machine)
    def where_is_machine(
            self, chip_x: int, chip_y: int) -> Tuple[int, int, int]:
        return self._job.where_is_machine(chip_y=chip_y, chip_x=chip_x)

    @overrides(MachineAllocationController._wait)
    def _wait(self) -> bool:
        try:
            if self._state != JobState.destroyed:
                self._state = self._job.wait_for_state_change(self._state)
        except TypeError:
            pass
        except Exception as e:  # pylint: disable=broad-except
            if not self._exited:
                raise e
        return self._state != JobState.destroyed

    @overrides(MachineAllocationController._teardown)
    def _teardown(self) -> None:
        if not self._exited:
            self._job.close()
        super()._teardown()


def spalloc_allocator(
        bearer_token: Optional[str] = None, group: Optional[str] = None,
        collab: Optional[str] = None, nmpi_job: Union[int, str, None] = None,
        nmpi_user: Optional[str] = None) -> Tuple[
            str, int, None, bool, bool, Dict[XY, str],
            MachineAllocationController]:
    """
    Request a machine from a spalloc server that will fit the given
    number of chips.

    :param bearer_token: The bearer token to use
    :type bearer_token: str or None
    :param group: The group to associate with or None for no group
    :type group: str or None
    :param collab: The collab to associate with or None for no collab
    :type collab: str or None
    :param nmpi_job: The NMPI Job to associate with or None for no job
    :type nmpi_job: int or str or None
    :param nmpi_user: The NMPI username to associate with or None for no user
    :type nmpi_user: str or None
    :return:
        host, board version, BMP details, reset on startup flag,
        auto-detect BMP flag, board address map, allocation controller
    :rtype: tuple(str, int, object, bool, bool, dict(tuple(int,int),str),
        MachineAllocationController)
    """
    spalloc_server = get_config_str("Machine", "spalloc_server")

    # Work out how many boards are needed
    if FecDataView.has_n_boards_required():
        n_boards = FecDataView.get_n_boards_required()
    else:
        n_chips = FecDataView.get_n_chips_needed()
        # reduce the chips per board by 2 in case you get a bad board
        chips_div = FecDataView.get_machine_version().n_chips_per_board - 2
        n_boards_float = float(n_chips) / chips_div
        logger.info("{:.2f} boards required for {} chips",
                    n_boards_float, n_chips)
        # If rounding up leaves less than half a board of spare capacity,
        # add another board just in case.
        n_boards = int(math.ceil(n_boards_float))
        if n_boards - n_boards_float < 0.5:
            n_boards += 1
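
    # Worked example (illustrative; with the usual 48-chip SpiNNaker boards,
    # chips_div is 46): 90 chips / 46 = 1.96 boards, which rounds up to 2,
    # but 2 - 1.96 = 0.04 < 0.5 leaves little spare capacity, so 3 boards
    # are requested; 60 chips / 46 = 1.30 rounds up to 2 with 0.70 of a
    # board spare, so 2 boards suffice.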

    if is_server_address(spalloc_server):
        host, connections, mac = _allocate_job_new(
            spalloc_server, n_boards, bearer_token, group, collab,
            int(nmpi_job) if nmpi_job is not None else None,
            nmpi_user)
    else:
        host, connections, mac = _allocate_job_old(spalloc_server, n_boards)
    return (host, _MACHINE_VERSION, None, False, False, connections, mac)


def _allocate_job_new(
        spalloc_server: str, n_boards: int,
        bearer_token: Optional[str] = None, group: Optional[str] = None,
        collab: Optional[str] = None, nmpi_job: Optional[int] = None,
        nmpi_user: Optional[str] = None) -> Tuple[
            str, Dict[XY, str], MachineAllocationController]:
    """
    Request a machine from a new-style spalloc server that will fit the
    given number of boards.

    :param str spalloc_server:
        The server from which the machine should be requested
    :param int n_boards: The number of boards required
    :param bearer_token: The bearer token to use
    :type bearer_token: str or None
    :param group: The group to associate with or None for no group
    :type group: str or None
    :param collab: The collab to associate with or None for no collab
    :type collab: str or None
    :param nmpi_job: The NMPI Job to associate with or None for no job
    :type nmpi_job: int or None
    :param nmpi_user: The NMPI username to associate with or None for no user
    :type nmpi_user: str or None

    :rtype: tuple(str, dict(tuple(int,int),str), MachineAllocationController)
    """
    logger.info(f"Requesting job with {n_boards} boards")
    with ExitStack() as stack:
        spalloc_machine = get_config_str_or_none("Machine", "spalloc_machine")
        use_proxy = get_config_bool("Machine", "spalloc_use_proxy")
        client = SpallocClient(
            spalloc_server, bearer_token=bearer_token, group=group,
            collab=collab, nmpi_job=nmpi_job, nmpi_user=nmpi_user)
        stack.enter_context(cast(ContextManager[SpallocClient], client))
        job = client.create_job(n_boards, spalloc_machine)
        stack.enter_context(job)
        task = job.launch_keepalive_task()
        stack.enter_context(task)
        job.wait_until_ready()
        connections = job.get_connections()
        with ProvenanceWriter() as db:
            db.insert_board_provenance(connections)
        root = connections.get((0, 0), None)
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(
                "boards: {}",
                str(connections).replace("{", "[").replace("}", "]"))
        allocation_controller = SpallocJobController(
            client, job, task, use_proxy or False)
        # Success! We don't want to close the client, job or task now;
        # the allocation controller now owns them.
        stack.pop_all()
    assert root is not None, "no root of ready board"
    return (root, connections, allocation_controller)
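
# For reference, the ExitStack pattern used in _allocate_job_new above, in
# miniature (hypothetical names, purely illustrative): resources entered on
# the stack are cleaned up automatically if anything fails part-way, and
# pop_all() cancels that cleanup once ownership has been handed on.
#
#     with ExitStack() as stack:
#         res = stack.enter_context(open_resource())  # closed on failure
#         owner = Owner(res)     # from here, `owner` is responsible for res
#         stack.pop_all()        # success: suppress the stack's own cleanup
#     return owner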


def _allocate_job_old(spalloc_server: str, n_boards: int) -> Tuple[
        str, Dict[XY, str], MachineAllocationController]:
    """
    Request a machine from an old-style spalloc server that will fit the
    requested number of boards.

    :param str spalloc_server:
        The server from which the machine should be requested
    :param int n_boards: The number of boards required
    :rtype: tuple(str, dict(tuple(int,int),str), MachineAllocationController)
    """
    host, port, user = parse_old_spalloc(
        spalloc_server, get_config_int("Machine", "spalloc_port"),
        get_config_str("Machine", "spalloc_user"))
    spalloc_kwargs = {
        'hostname': host,
        'port': port,
        'owner': user
    }
    spalloc_machine = get_config_str_or_none("Machine", "spalloc_machine")

    if spalloc_machine is not None:
        spalloc_kwargs['machine'] = spalloc_machine

    job, hostname, scamp_connection_data = _launch_checked_job_old(
        n_boards, spalloc_kwargs)
    machine_allocation_controller = _OldSpallocJobController(job, hostname)
    return (hostname, scamp_connection_data, machine_allocation_controller)


def _launch_checked_job_old(n_boards: int, spalloc_kwargs: dict) -> Tuple[
        Job, str, Dict[XY, str]]:
    """
    :rtype: tuple(~.Job, str, dict(tuple(int,int),str))
    """
    logger.info(f"Requesting job with {n_boards} boards")
    avoid_boards = get_config_str_list("Machine", "spalloc_avoid_boards")
    avoid_jobs = []
    try:
        while True:
            job = Job(n_boards, **spalloc_kwargs)
            try:
                job.wait_until_ready()
                # get the parameters from the job before starting, so that
                # hanging doesn't occur
                hostname = job.hostname
            except Exception as ex:
                job.destroy(str(ex))
                raise
            connections = job.connections
            if len(connections) < n_boards:
                logger.warning(
                    "boards: {}",
                    str(connections).replace("{", "[").replace("}", "]"))
                raise ValueError("Not enough connections detected")
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug("boards: {}",
                             str(connections).replace("{", "[").replace(
                                 "}", "]"))
            with ProvenanceWriter() as db:
                db.insert_board_provenance(connections)
            if hostname not in avoid_boards:
                break
            avoid_jobs.append(job)
            logger.warning(
                f"Asking for a new job as {hostname} "
                f"is in the spalloc_avoid_boards list")
    finally:
        if avoid_boards:
            for key in list(connections.keys()):
                if connections[key] in avoid_boards:
                    logger.warning(
                        f"Removing connection info for {connections[key]} "
                        f"as it is in the spalloc_avoid_boards list")
                    del connections[key]
        for avoid_job in avoid_jobs:
            avoid_job.destroy("Asked to avoid by cfg")
    return job, hostname, connections
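
For orientation, a minimal sketch of how this allocator might be driven
(purely illustrative: it assumes the Machine/spalloc_server cfg option is
set and that FecDataView already knows how many boards or chips are needed;
only names defined in this file are used):

    from spinn_front_end_common.interface.interface_functions.spalloc_allocator import (
        spalloc_allocator)

    # The 7-tuple matches the :return: documentation of spalloc_allocator.
    (host, version, bmp, reset_on_startup, auto_detect_bmp,
     connections, controller) = spalloc_allocator()
    try:
        print(f"Root board {host}, machine version {version}")
        print(f"Boards allocated: {sorted(connections.items())}")
    finally:
        controller.close()  # destroys the job, releasing the boards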