• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SpiNNakerManchester / SpiNNMan / 8143391027

04 Mar 2024 04:13PM UTC coverage: 60.422% (+0.4%) from 60.052%
8143391027

push

github

web-flow
Merge pull request #391 from SpiNNakerManchester/pylint_default

Pylint default

2245 of 3523 branches covered (63.72%)

Branch coverage included in aggregate %.

189 of 216 new or added lines in 56 files covered. (87.5%)

96 existing lines in 24 files now uncovered.

4825 of 8178 relevant lines covered (59.0%)

0.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

43.21
/spinnman/spalloc/spalloc_client.py
1
# Copyright (c) 2021 The University of Manchester
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#     https://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14
"""
1✔
15
Implementation of the client for the Spalloc web service.
16
"""
17

18
from contextlib import contextmanager
1✔
19
from logging import getLogger
1✔
20
from multiprocessing import Process, Queue
1✔
21
import queue
1✔
22
import struct
1✔
23
import threading
1✔
24
from time import sleep
1✔
25
from typing import (Any, ContextManager, Callable, Dict, FrozenSet, Iterable,
1✔
26
                    Iterator, List, Mapping, Optional, Tuple, cast)
27
from urllib.parse import urlparse, urlunparse, ParseResult
1✔
28

29
from packaging.version import Version
1✔
30
import requests
1✔
31
from typing_extensions import TypeAlias
1✔
32
from websocket import WebSocket  # type: ignore
1✔
33

34
from spinn_utilities.abstract_base import AbstractBase, abstractmethod
1✔
35
from spinn_utilities.abstract_context_manager import AbstractContextManager
1✔
36
from spinn_utilities.log import FormatAdapter
1✔
37
from spinn_utilities.typing.coords import XY
1✔
38
from spinn_utilities.typing.json import JsonObject, JsonValue
1✔
39
from spinn_utilities.overrides import overrides
1✔
40

41
from spinnman.connections.udp_packet_connections import UDPConnection
1✔
42
from spinnman.connections.abstract_classes import Connection, Listenable
1✔
43
from spinnman.constants import SCP_SCAMP_PORT, UDP_BOOT_CONNECTION_DEFAULT_PORT
1✔
44
from spinnman.exceptions import SpinnmanTimeoutException
1✔
45
from spinnman.exceptions import SpallocException
1✔
46
from spinnman.transceiver import (
1✔
47
    Transceiver, create_transceiver_from_connections)
48

49
from .abstract_spalloc_client import AbstractSpallocClient
1✔
50
from .proxy_protocol import ProxyProtocol
1✔
51
from .session import Session, SessionAware
1✔
52
from .spalloc_boot_connection import SpallocBootConnection
1✔
53
from .spalloc_eieio_connection import SpallocEIEIOConnection
1✔
54
from .spalloc_eieio_listener import SpallocEIEIOListener
1✔
55
from .spalloc_job import SpallocJob
1✔
56
from .spalloc_machine import SpallocMachine
1✔
57
from .spalloc_proxied_connection import SpallocProxiedConnection
1✔
58
from .spalloc_scp_connection import SpallocSCPConnection
1✔
59
from .spalloc_state import SpallocState
1✔
60
from .utils import parse_service_url, get_hostname
1✔
61

62
logger = FormatAdapter(getLogger(__name__))
1✔
63
_open_req = struct.Struct("<IIIII")
1✔
64
_close_req = struct.Struct("<III")
1✔
65
_open_listen_req = struct.Struct("<II")
1✔
66
# Open and close share the response structure
67
_open_close_res = struct.Struct("<III")
1✔
68
_open_listen_res = struct.Struct("<IIIBBBBI")
1✔
69
_msg = struct.Struct("<II")
1✔
70
_msg_to = struct.Struct("<IIIII")
1✔
71

72

73
def fix_url(url: Any) -> str:
1✔
74
    """
75
    Makes sure the url is the correct format.
76

77
    :param str url: original url
78
    :rtype: str
79
    """
80
    parts = urlparse(url)
×
81
    if parts.scheme != 'https':
×
82
        parts = ParseResult("https", parts.netloc, parts.path,
×
83
                            parts.params, parts. query, parts.fragment)
84
    if not parts.path.endswith("/"):
×
85
        parts = ParseResult(parts.scheme, parts.netloc, parts.path + "/",
×
86
                            parts.params, parts.query, parts.fragment)
87
    return urlunparse(parts)
×
88

89

90
class SpallocClient(AbstractContextManager, AbstractSpallocClient):
1✔
91
    """
92
    Basic client library for talking to new Spalloc.
93
    """
94
    __slots__ = ("__session",
1✔
95
                 "__machines_url", "__jobs_url", "version",
96
                 "__group", "__collab", "__nmpi_job", "__nmpi_user")
97

98
    def __init__(
1✔
99
            self, service_url: str,
100
            username: Optional[str] = None, password: Optional[str] = None,
101
            bearer_token: Optional[str] = None,
102
            group: Optional[str] = None, collab: Optional[str] = None,
103
            nmpi_job: Optional[int] = None, nmpi_user: Optional[str] = None):
104
        """
105
        :param str service_url: The reference to the service.
106
            May have username and password supplied as part of the network
107
            location; if so, the ``username`` and ``password`` arguments
108
            *must* be ``None``. If ``username`` and ``password`` are not given,
109
            not even within the URL, the ``bearer_token`` must be not ``None``.
110
        :param str username: The user name to use
111
        :param str password: The password to use
112
        :param str bearer_token: The bearer token to use
113
        """
114
        if username is None and password is None:
×
115
            service_url, username, password = parse_service_url(service_url)
×
116
        self.__session: Optional[Session] = Session(
×
117
            service_url, username, password, bearer_token)
118
        obj = self.__session.renew()
×
119
        v = cast(JsonObject, obj["version"])
×
120
        self.version = Version(
×
121
            f"{v['major-version']}.{v['minor-version']}.{v['revision']}")
122
        self.__machines_url = fix_url(obj["machines-ref"])
×
123
        self.__jobs_url = fix_url(obj["jobs-ref"])
×
124
        self.__group = group
×
125
        self.__collab = collab
×
126
        self.__nmpi_job = nmpi_job
×
127
        self.__nmpi_user = nmpi_user
×
128
        logger.info("established session to {} for {}", service_url, username)
×
129

130
    @staticmethod
1✔
131
    def open_job_from_database(
1✔
132
            service_url, job_url, cookies, headers) -> SpallocJob:
133
        """
134
        Create a job from the description in the attached database. This is
135
        intended to allow for access to the job's allocated resources from
136
        visualisers and other third party code participating in the SpiNNaker
137
        Tools Notification Protocol.
138

139
        .. note::
140
            The job is not verified to exist and be running. The session
141
            credentials may have expired; if so, the job will be unable to
142
            regenerate them.
143

144
        :param str service_url:
145
        :param str job_url:
146
        :param dict(str, str) cookies:
147
        :param dict(str, str) headers:
148

149
        :return:
150
            The job handle, or ``None`` if the records in the database are
151
            absent or incomplete.
152
        :rtype: SpallocJob
153
        """
154
        session = Session(service_url, session_credentials=(cookies, headers))
×
155
        return _SpallocJob(session, job_url)
×
156

157
    @overrides(AbstractSpallocClient.list_machines)
1✔
158
    def list_machines(self) -> Dict[str, SpallocMachine]:
1✔
159
        assert self.__session
×
160
        obj = self.__session.get(self.__machines_url).json()
×
161
        return {m["name"]: _SpallocMachine(self.__session, m)
×
162
                for m in obj["machines"]}
163

164
    @overrides(AbstractSpallocClient.list_jobs)
1✔
165
    def list_jobs(self, deleted: bool = False) -> Iterable[SpallocJob]:
1✔
166
        assert self.__session
×
167
        obj = self.__session.get(
×
168
            self.__jobs_url,
169
            deleted=("true" if deleted else "false")).json()
170
        while obj["jobs"]:
×
171
            for u in obj["jobs"]:
×
172
                yield _SpallocJob(self.__session, fix_url(u))
×
173
            if "next" not in obj:
×
174
                break
×
175
            obj = self.__session.get(obj["next"]).json()
×
176

177
    def _create(self, create: Mapping[str, JsonValue],
1✔
178
                machine_name: Optional[str]) -> SpallocJob:
179
        assert self.__session
×
180
        operation = dict(create)
×
181
        if machine_name:
×
182
            operation["machine-name"] = machine_name
×
183
        else:
184
            operation["tags"] = ["default"]
×
185
        if self.__group is not None:
×
186
            operation["group"] = self.__group
×
187
        if self.__collab is not None:
×
188
            operation["nmpi-collab"] = self.__collab
×
189
        if self.__nmpi_job is not None:
×
190
            operation["nmpi-job-id"] = self.__nmpi_job
×
191
            if self.__nmpi_user is not None:
×
192
                operation["owner"] = self.__nmpi_user
×
193
        logger.info("Posting {} to {}", operation, self.__jobs_url)
×
194
        r = self.__session.post(self.__jobs_url, operation, timeout=30)
×
195
        url = r.headers["Location"]
×
196
        return _SpallocJob(self.__session, fix_url(url))
×
197

198
    @overrides(AbstractSpallocClient.create_job)
1✔
199
    def create_job(
1✔
200
            self, num_boards: int = 1,
201
            machine_name: Optional[str] = None,
202
            keepalive: int = 45) -> SpallocJob:
203
        return self._create({
×
204
            "num-boards": int(num_boards),
205
            "keepalive-interval": f"PT{int(keepalive)}S"
206
        }, machine_name)
207

208
    @overrides(AbstractSpallocClient.create_job_rect)
1✔
209
    def create_job_rect(
1✔
210
            self, width: int, height: int,
211
            machine_name: Optional[str] = None,
212
            keepalive: int = 45) -> SpallocJob:
213
        return self._create({
×
214
            "dimensions": {
215
                "width": int(width),
216
                "height": int(height)
217
            },
218
            "keepalive-interval": f"PT{int(keepalive)}S"
219
        }, machine_name)
220

221
    @overrides(AbstractSpallocClient.create_job_board)
1✔
222
    def create_job_board(
1✔
223
            self, triad: Optional[Tuple[int, int, int]] = None,
224
            physical: Optional[Tuple[int, int, int]] = None,
225
            ip_address: Optional[str] = None,
226
            machine_name: Optional[str] = None,
227
            keepalive: int = 45) -> SpallocJob:
228
        board: JsonObject
229
        if triad:
×
230
            x, y, z = triad
×
231
            board = {"x": int(x), "y": int(y), "z": int(z)}
×
232
        elif physical:
×
233
            c, f, b = physical
×
234
            board = {"cabinet": int(c), "frame": int(f), "board": int(b)}
×
235
        elif ip_address:
×
236
            board = {"address": str(ip_address)}
×
237
        else:
238
            raise KeyError("at least one of triad, physical and ip_address "
×
239
                           "must be given")
240
        return self._create({
×
241
            "board": board,
242
            "keepalive-interval": f"PT{int(keepalive)}S"
243
        }, machine_name)
244

245
    @overrides(AbstractSpallocClient.create_job_rect_at_board)
1✔
246
    def create_job_rect_at_board(
1✔
247
            self, width: int, height: int,
248
            triad: Optional[Tuple[int, int, int]] = None,
249
            physical: Optional[Tuple[int, int, int]] = None,
250
            ip_address: Optional[str] = None,
251
            machine_name: Optional[str] = None, keepalive: int = 45,
252
            max_dead_boards: int = 0) -> SpallocJob:
253
        board: JsonObject
254
        if triad:
×
255
            x, y, z = triad
×
256
            board = {"x": int(x), "y": int(y), "z": int(z)}
×
257
        elif physical:
×
258
            c, f, b = physical
×
259
            board = {"cabinet": int(c), "frame": int(f), "board": int(b)}
×
260
        elif ip_address:
×
261
            board = {"address": str(ip_address)}
×
262
        else:
263
            raise KeyError("at least one of triad, physical and ip_address "
×
264
                           "must be given")
265
        return self._create({
×
266
            "dimensions": {
267
                "width": int(width),
268
                "height": int(height)
269
            },
270
            "board": board,
271
            "keepalive-interval": f"PT{int(keepalive)}S",
272
            "max-dead-boards": int(max_dead_boards)
273
        }, machine_name)
274

275
    def close(self) -> None:
1✔
276
        # pylint: disable=protected-access
277
        if self.__session is not None:
×
278
            self.__session._purge()
×
279
        self.__session = None
×
280

281

282
class _ProxyServiceError(IOError):
1✔
283
    """
284
    An error passed to us from the server over the proxy channel.
285
    """
286

287

288
def _spalloc_keepalive(url, interval, term_queue, cookies, headers):
1✔
289
    """
290
    Actual keepalive task implementation. Don't use directly.
291
    """
292
    headers["Content-Type"] = "text/plain; charset=UTF-8"
×
293
    while True:
×
294
        requests.put(url, data="alive", cookies=cookies, headers=headers,
×
295
                     allow_redirects=False, timeout=10)
296
        try:
×
297
            term_queue.get(True, interval)
×
298
            break
×
299
        except queue.Empty:
×
300
            continue
×
301
        # On ValueError or OSError, just terminate the keepalive process
302
        # They happen when the term_queue is directly closed
303
        except ValueError:
×
304
            break
×
305
        except OSError:
×
306
            break
×
307

308

309
class _SpallocMachine(SessionAware, SpallocMachine):
1✔
310
    """
311
    Represents a Spalloc-controlled machine.
312

313
    Don't make this yourself. Use :py:class:`SpallocClient` instead.
314
    """
315
    __slots__ = ("__name", "__tags", "__width", "__height",
1✔
316
                 "__dead_boards", "__dead_links")
317

318
    def __init__(self, session: Session, machine_data: JsonObject):
1✔
319
        """
320
        :param _Session session:
321
        :param dict machine_data:
322
        """
323
        super().__init__(session, cast(str, machine_data["uri"]))
×
324
        self.__name = cast(str, machine_data["name"])
×
325
        self.__tags = frozenset(cast(List[str], machine_data["tags"]))
×
326
        self.__width = cast(int, machine_data["width"])
×
327
        self.__height = cast(int, machine_data["height"])
×
328
        self.__dead_boards = cast(list, machine_data["dead-boards"])
×
329
        self.__dead_links = cast(list, machine_data["dead-links"])
×
330

331
    @property
1✔
332
    @overrides(SpallocMachine.name)
1✔
333
    def name(self) -> str:
1✔
334
        return self.__name
×
335

336
    @property
1✔
337
    @overrides(SpallocMachine.tags)
1✔
338
    def tags(self) -> FrozenSet[str]:
1✔
339
        return self.__tags
×
340

341
    @property
1✔
342
    @overrides(SpallocMachine.width)
1✔
343
    def width(self) -> int:
1✔
344
        return self.__width
×
345

346
    @property
1✔
347
    @overrides(SpallocMachine.height)
1✔
348
    def height(self) -> int:
1✔
349
        return self.__height
×
350

351
    @property
1✔
352
    @overrides(SpallocMachine.dead_boards)
1✔
353
    def dead_boards(self) -> list:
1✔
354
        return self.__dead_boards
×
355

356
    @property
1✔
357
    @overrides(SpallocMachine.dead_links)
1✔
358
    def dead_links(self) -> list:
1✔
359
        return self.__dead_links
×
360

361
    @property
1✔
362
    @overrides(SpallocMachine.area)
1✔
363
    def area(self) -> Tuple[int, int]:
1✔
364
        return (self.width, self.height)
×
365

366
    def __repr__(self):
1✔
367
        return "SpallocMachine" + str((
×
368
            self.name, self.tags, self.width, self.height, self.dead_boards,
369
            self.dead_links))
370

371

372
class _ProxyPing(threading.Thread):
1✔
373
    """
374
    Sends ping messages to an open websocket
375
    """
376

377
    def __init__(self, ws, sleep_time=30):
1✔
378
        super().__init__(daemon=True)
×
379
        self.__ws = ws
×
380
        self.__sleep_time = sleep_time
×
381
        self.__closed = False
×
382
        self.start()
×
383

384
    def run(self):
1✔
385
        """
386
        The handler loop of this thread
387
        """
388
        while self.__ws.connected:
×
389
            try:
×
390
                self.__ws.ping()
×
391
            except Exception:  # pylint: disable=broad-except
×
392
                # If closed, ignore error and get out of here
393
                if self.__closed:
×
394
                    break
×
395

396
                # Make someone aware of the error
397
                logger.exception("Error in websocket before close")
×
398
            sleep(self.__sleep_time)
×
399

400
    def close(self):
1✔
401
        """
402
        Mark as closed to avoid error messages.
403
        """
404
        self.__closed = True
×
405

406

407
_WSCB: TypeAlias = Callable[[Optional[bytes]], None]
1✔
408

409

410
class _ProxyReceiver(threading.Thread):
1✔
411
    """
412
    Receives all messages off an open websocket and dispatches them to
413
    registered listeners.
414
    """
415

416
    def __init__(self, ws: WebSocket):
1✔
417
        super().__init__(daemon=True)
×
418
        self.__ws = ws
×
419
        self.__returns: Dict[int, _WSCB] = {}
×
420
        self.__handlers: Dict[int, _WSCB] = {}
×
421
        self.__correlation_id = 0
×
422
        self.__closed = False
×
423
        self.start()
×
424

425
    def run(self) -> None:
1✔
426
        """
427
        The handler loop of this thread.
428
        """
429
        while self.__ws.connected:
×
430
            try:
×
431
                result: Tuple[int, bytes] = self.__ws.recv_data()
×
432
                frame = result[1]
×
433
                if len(frame) < _msg.size:
×
434
                    # Message is out of protocol
435
                    continue
×
436
            except Exception:  # pylint: disable=broad-except
×
437
                # If closed, ignore error and get out of here
438
                if self.__closed:
×
439
                    break
×
440

441
                # Make someone aware of the error
442
                logger.exception("Error in websocket before close")
×
443

444
                # If we are disconnected before closing, make errors happen
445
                if not self.__ws.connected:
×
446
                    for rt in self.__returns.values():
×
447
                        rt(None)
×
448
                    for hd in self.__handlers.values():
×
449
                        hd(None)
×
450
                    break
×
451
            code, num = _msg.unpack_from(frame, 0)
×
452
            if code == ProxyProtocol.MSG:
×
453
                self.dispatch_message(num, frame)
×
454
            else:
455
                self.dispatch_return(num, frame)
×
456

457
    def expect_return(self, handler: _WSCB) -> int:
1✔
458
        """
459
        Register a one-shot listener for a call-like message's return.
460

461
        :return: The correlation ID
462
        """
463
        c = self.__correlation_id
×
464
        self.__correlation_id += 1
×
465
        self.__returns[c] = handler
×
466
        return c
×
467

468
    def listen(self, channel_id: int, handler: _WSCB):
1✔
469
        """
470
        Register a persistent listener for one-way messages.
471
        """
472
        self.__handlers[channel_id] = handler
×
473

474
    def dispatch_return(self, correlation_id: int, msg: bytes):
1✔
475
        """
476
        Dispatch a received call-return message.
477
        """
478
        handler = self.__returns.pop(correlation_id, None)
×
479
        if handler:
×
480
            handler(msg)
×
481

482
    def dispatch_message(self, channel_id: int, msg: bytes):
1✔
483
        """
484
        Dispatch a received one-way message.
485
        """
486
        handler = self.__handlers.get(channel_id)
×
487
        if handler:
×
488
            handler(msg)
×
489

490
    def unlisten(self, channel_id: int):
1✔
491
        """
492
        Deregister a listener for a channel
493
        """
494
        self.__handlers.pop(channel_id)
×
495

496
    def close(self) -> None:
1✔
497
        """
498
        Mark receiver closed to avoid errors
499
        """
500
        self.__closed = True
×
501

502

503
class _SpallocJob(SessionAware, SpallocJob):
1✔
504
    """
505
    Represents a job in Spalloc.
506

507
    Don't make this yourself. Use :py:class:`SpallocClient` instead.
508
    """
509
    __slots__ = ("__machine_url", "__chip_url",
1✔
510
                 "_keepalive_url", "__keepalive_handle", "__proxy_handle",
511
                 "__proxy_thread", "__proxy_ping")
512

513
    def __init__(self, session: Session, job_handle: str):
1✔
514
        """
515
        :param _Session session:
516
        :param str job_handle:
517
        """
518
        super().__init__(session, job_handle)
×
519
        logger.info("established job at {}", job_handle)
×
520
        self.__machine_url = self._url + "machine"
×
521
        self.__chip_url = self._url + "chip"
×
522
        self._keepalive_url = self._url + "keepalive"
×
523
        self.__keepalive_handle: Optional[Queue] = None
×
524
        self.__proxy_handle: Optional[WebSocket] = None
×
525
        self.__proxy_thread: Optional[_ProxyReceiver] = None
×
526
        self.__proxy_ping: Optional[_ProxyPing] = None
×
527

528
    @overrides(SpallocJob.get_session_credentials_for_db)
1✔
529
    def get_session_credentials_for_db(self) -> Mapping[Tuple[str, str], str]:
1✔
530
        config = {}
×
531
        config["SPALLOC", "service uri"] = self._service_url
×
532
        config["SPALLOC", "job uri"] = self._url
×
533
        cookies, headers = self._session_credentials
×
534
        if "Authorization" in headers:
×
535
            # We never write the authorisation headers themselves;
536
            # we just extend the session
537
            del headers["Authorization"]
×
538
        for k, v in cookies.items():
×
539
            config["COOKIE", k] = v
×
540
        for k, v in headers.items():
×
541
            config["HEADER", k] = v
×
542
        return config
×
543

544
    @overrides(SpallocJob.get_state)
1✔
545
    def get_state(self, wait_for_change: bool = False) -> SpallocState:
1✔
546
        timeout: Optional[int] = 10
×
547
        if wait_for_change:
×
548
            timeout = None
×
549
        obj = self._get(
×
550
            self._url, wait=wait_for_change, timeout=timeout).json()
551
        return SpallocState[obj["state"]]
×
552

553
    @overrides(SpallocJob.get_root_host)
1✔
554
    def get_root_host(self) -> Optional[str]:
1✔
555
        r = self._get(self.__machine_url)
×
556
        if r.status_code == 204:
×
557
            return None
×
558
        obj = r.json()
×
559
        for c in obj["connections"]:
×
560
            [x, y], host = c
×
561
            if x == 0 and y == 0:
×
562
                return host
×
563
        return None
×
564

565
    @overrides(SpallocJob.get_connections)
1✔
566
    def get_connections(self) -> Dict[XY, str]:
1✔
567
        r = self._get(self.__machine_url)
×
568
        if r.status_code == 204:
×
569
            return {}
×
570
        return {
×
571
            (int(x), int(y)): str(host)
572
            for ((x, y), host) in r.json()["connections"]
573
        }
574

575
    @property
1✔
576
    def __proxy_url(self) -> Optional[str]:
1✔
577
        """
578
        The URL for talking to the proxy connection system.
579
        """
580
        r = self._get(self._url)
×
581
        if r.status_code == 204:
×
582
            return None
×
583
        try:
×
584
            url = r.json()["proxy-ref"]
×
585
            logger.info("Connecting to proxy on {}", url)
×
586
            return url
×
587
        except KeyError:
×
588
            return None
×
589

590
    def __init_proxy(self) -> Tuple[_ProxyReceiver, WebSocket]:
1✔
591
        if self.__proxy_handle is None or not self.__proxy_handle.connected:
×
592
            if self.__proxy_url is None:
×
593
                raise ValueError("no proxy available")
×
594
            self.__proxy_handle = self._websocket(
×
595
                self.__proxy_url, origin=get_hostname(self._url))
596
            self.__proxy_thread = _ProxyReceiver(self.__proxy_handle)
×
597
            self.__proxy_ping = _ProxyPing(self.__proxy_handle)
×
598
        assert self.__proxy_handle is not None
×
599
        assert self.__proxy_thread is not None
×
600
        return self.__proxy_thread, self.__proxy_handle
×
601

602
    @overrides(SpallocJob.connect_to_board)
1✔
603
    def connect_to_board(
1✔
604
            self, x: int, y: int,
605
            port: int = SCP_SCAMP_PORT) -> SpallocSCPConnection:
606
        proxy, ws = self.__init_proxy()
×
607
        return _ProxiedSCAMPConnection(ws, proxy, int(x), int(y), int(port))
×
608

609
    @overrides(SpallocJob.connect_for_booting)
1✔
610
    def connect_for_booting(self) -> SpallocBootConnection:
1✔
611
        proxy, ws = self.__init_proxy()
×
612
        return _ProxiedBootConnection(ws, proxy)
×
613

614
    @overrides(SpallocJob.open_eieio_connection)
1✔
615
    def open_eieio_connection(self, x: int, y: int) -> SpallocEIEIOConnection:
1✔
616
        proxy, ws = self.__init_proxy()
×
617
        return _ProxiedEIEIOConnection(
×
618
            ws, proxy, int(x), int(y), SCP_SCAMP_PORT)
619

620
    @overrides(SpallocJob.open_eieio_listener_connection)
1✔
621
    def open_eieio_listener_connection(self) -> SpallocEIEIOListener:
1✔
622
        proxy, ws = self.__init_proxy()
×
623
        return _ProxiedEIEIOListener(ws, proxy, self.get_connections())
×
624

625
    @overrides(SpallocJob.open_udp_listener_connection)
1✔
626
    def open_udp_listener_connection(self) -> UDPConnection:
1✔
627
        proxy, ws = self.__init_proxy()
×
628
        return _ProxiedUDPListener(ws, proxy, self.get_connections())
×
629

630
    @overrides(SpallocJob.wait_for_state_change)
1✔
631
    def wait_for_state_change(self, old_state: SpallocState,
1✔
632
                              timeout: Optional[int] = None) -> SpallocState:
633
        while old_state != SpallocState.DESTROYED:
×
634
            obj = self._get(self._url, wait="true", timeout=timeout).json()
×
635
            s = SpallocState[obj["state"]]
×
636
            if s != old_state or s == SpallocState.DESTROYED:
×
637
                return s
×
638
        return old_state
×
639

640
    @overrides(SpallocJob.wait_until_ready)
1✔
641
    def wait_until_ready(self, timeout: Optional[int] = None,
1✔
642
                         n_retries: Optional[int] = None):
643
        state = self.get_state()
×
644
        retries = 0
×
645
        while (state != SpallocState.READY and
×
646
               (n_retries is None or retries < n_retries)):
647
            retries += 1
×
648
            state = self.wait_for_state_change(state, timeout=timeout)
×
649
            if state == SpallocState.DESTROYED:
×
650
                raise SpallocException("job was unexpectedly destroyed")
×
651

652
    @overrides(SpallocJob.destroy)
1✔
653
    def destroy(self, reason: str = "finished"):
1✔
654
        if self.__keepalive_handle:
×
655
            self.__keepalive_handle.close()
×
656
            self.__keepalive_handle = None
×
657
        if self.__proxy_handle is not None:
×
658
            if self.__proxy_thread:
×
659
                self.__proxy_thread.close()
×
660
            if self.__proxy_ping:
×
661
                self.__proxy_ping.close()
×
662
            self.__proxy_handle.close()
×
663
        self._delete(self._url, reason=str(reason))
×
664
        logger.info("deleted job at {}", self._url)
×
665

666
    @overrides(SpallocJob.keepalive)
1✔
667
    def keepalive(self) -> None:
1✔
668
        self._put(self._keepalive_url, "alive")
×
669

670
    @overrides(SpallocJob.launch_keepalive_task, extend_doc=True)
1✔
671
    def launch_keepalive_task(
1✔
672
            self, period: float = 30) -> ContextManager[Process]:
673
        """
674
        .. note::
675
            Tricky! *Cannot* be done with a thread, as the main thread is known
676
            to do significant amounts of CPU-intensive work.
677
        """
678
        if self.__keepalive_handle is not None:
×
679
            raise SpallocException("cannot keep job alive from two tasks")
×
680
        q: Queue = Queue(1)
×
NEW
681
        p = Process(target=_spalloc_keepalive, args=(
×
682
            self._keepalive_url, 0 + period, q,
683
            *self._session_credentials), daemon=True)
684
        p.start()
×
685
        self.__keepalive_handle = q
×
686
        return self.__closer(q, p)
×
687

688
    @contextmanager
1✔
689
    def __closer(self, q: Queue, p: Process) -> Iterator[Process]:
1✔
690
        try:
×
691
            yield p
×
692
        finally:
693
            q.put("quit")
×
694
            # Give it a second, and if it still isn't dead, kill it
695
            p.join(1)
×
696
            if p.is_alive():
×
697
                p.kill()
×
698

699
    @overrides(SpallocJob.where_is_machine)
1✔
700
    def where_is_machine(self, x: int, y: int) -> Optional[
1✔
701
            Tuple[int, int, int]]:
702
        r = self._get(self.__chip_url, x=int(x), y=int(y))
×
703
        if r.status_code == 204:
×
704
            return None
×
705
        return cast(Tuple[int, int, int], tuple(
×
706
            r.json()["physical-board-coordinates"]))
707

708
    @property
1✔
709
    def _keepalive_handle(self) -> Optional[Queue]:
1✔
710
        return self.__keepalive_handle
×
711

712
    @_keepalive_handle.setter
1✔
713
    def _keepalive_handle(self, handle: Queue):
1✔
714
        if self.__keepalive_handle is not None:
×
715
            raise SpallocException("cannot keep job alive from two tasks")
×
716
        self.__keepalive_handle = handle
×
717

718
    @overrides(SpallocJob.create_transceiver)
1✔
719
    def create_transceiver(self) -> Transceiver:
1✔
720
        if self.get_state() != SpallocState.READY:
×
721
            raise SpallocException("job not ready to execute scripts")
×
722
        proxies: List[Connection] = [
×
723
            self.connect_to_board(x, y) for (x, y) in self.get_connections()]
724
        # Also need a boot connection
725
        proxies.append(self.connect_for_booting())
×
726
        return create_transceiver_from_connections(connections=proxies)
×
727

728
    def __repr__(self):
1✔
729
        return f"SpallocJob({self._url})"
×
730

731

732
class _ProxiedConnection(metaclass=AbstractBase):
1✔
733
    """
734
    Core multiplexer/demultiplexer emulating a connection that is proxied
735
    over a websocket.
736

737
    None of the methods are public because subclasses may expose a profile of
738
    them to conform to a particular type of connection.
739
    """
740

741
    def __init__(self, ws: WebSocket, receiver: _ProxyReceiver):
1✔
742
        self.__ws: Optional[WebSocket] = ws
×
743
        self.__receiver: Optional[_ProxyReceiver] = receiver
×
744
        self.__msgs: queue.SimpleQueue = queue.SimpleQueue()
×
745
        self.__call_queue: queue.Queue = queue.Queue(1)
×
746
        self.__call_lock = threading.RLock()
×
747
        self.__current_msg: Optional[bytes] = None
×
748
        self.__handle = self._open_connection()
×
749
        self.__receiver.listen(self.__handle, self.__msgs.put)
×
750

751
    @abstractmethod
1✔
752
    def _open_connection(self) -> int:
1✔
753
        raise NotImplementedError
754

755
    def _call(self, protocol: ProxyProtocol, packer: struct.Struct,
1✔
756
              unpacker: struct.Struct, *args) -> Tuple[Any, ...]:
757
        """
758
        Do a synchronous call.
759

760
        :param protocol:
761
            The protocol message number.
762
        :param packer:
763
            How to form the protocol message. The first two arguments passed
764
            will be the protocol message number and an issued correlation ID
765
            (not needed by the caller).
766
        :param unpacker:
767
            How to extract the expected response.
768
        :param args:
769
            Additional arguments to pass to the packer.
770
        :return:
771
            The results from the unpacker *after* the protocol message code and
772
            the correlation ID.
773
        :raises IOError:
774
            If something goes wrong. This could be due to the websocket being
775
            closed, or the receipt of an ERROR response.
776
        """
777
        if not self._connected:
×
778
            raise IOError("socket closed")
×
779
        if not self.__receiver:
×
780
            raise IOError("socket closed")
×
781
        if not self.__ws:
×
782
            raise IOError("socket closed")
×
783
        with self.__call_lock:
×
784
            # All calls via websocket use correlation_id
785
            correlation_id = self.__receiver.expect_return(
×
786
                self.__call_queue.put)
787
            self.__ws.send_binary(packer.pack(protocol, correlation_id, *args))
×
788
            if not self._connected:
×
789
                raise IOError("socket closed after send!")
×
790
            reply = self.__call_queue.get()
×
791
            code, _ = _msg.unpack_from(reply, 0)
×
792
            if code == ProxyProtocol.ERROR:
×
793
                # Rest of message is UTF-8 encoded error message string
794
                payload = reply[_msg.size:].decode("utf-8")
×
795
                if len(payload):
×
796
                    raise _ProxyServiceError(payload)
×
797
                raise _ProxyServiceError(
×
798
                    f"unknown problem with {protocol} call")
799
            return unpacker.unpack(reply)[2:]
×
800

801
    @property
1✔
802
    def _connected(self) -> bool:
1✔
803
        return bool(self.__ws and self.__ws.connected)
×
804

805
    def _throw_if_closed(self) -> None:
1✔
806
        if not self._connected:
×
807
            raise IOError("socket closed")
×
808

809
    def _close(self) -> None:
1✔
810
        if self._connected:
×
811
            channel_id, = self._call(
×
812
                ProxyProtocol.CLOSE, _close_req, _open_close_res,
813
                self.__handle)
814
            if channel_id != self.__handle:
×
815
                raise IOError("failed to close proxy socket")
×
816
        if self.__receiver:
×
817
            self.__receiver.unlisten(self.__handle)
×
818
        self.__ws = None
×
819
        self.__receiver = None
×
820

821
    def _send(self, message: bytes):
1✔
822
        self._throw_if_closed()
×
823
        # Put the header on the front and send it
824
        if not self.__ws:
×
825
            raise IOError("socket closed")
×
826
        self.__ws.send_binary(_msg.pack(
×
827
            ProxyProtocol.MSG, self.__handle) + message)
828

829
    def _send_to(self, message: bytes, x: int, y: int, port: int):
1✔
830
        self._throw_if_closed()
×
831
        # Put the header on the front and send it
832
        if not self.__ws:
×
833
            raise IOError("socket closed")
×
834
        self.__ws.send_binary(_msg_to.pack(
×
835
            ProxyProtocol.MSG_TO, self.__handle, x, y, port) + message)
836

837
    def __get(self, timeout: float = 0.5) -> bytes:
1✔
838
        """
839
        Get a value from the queue. Handles block/non-block switching and
840
        trimming of the message protocol prefix.
841
        """
842
        if not timeout:
×
843
            return self.__msgs.get(block=False)[_msg.size:]
×
844
        else:
845
            return self.__msgs.get(timeout=timeout)[_msg.size:]
×
846

847
    def _receive(self, timeout: Optional[float] = None) -> bytes:
1✔
848
        if self.__current_msg is not None:
×
849
            try:
×
850
                return self.__current_msg
×
851
            finally:
852
                self.__current_msg = None
×
853
        if timeout is None:
×
854
            while True:
×
855
                try:
×
856
                    return self.__get()
×
857
                except queue.Empty:
×
858
                    self._throw_if_closed()
×
859
        else:
860
            try:
×
861
                return self.__get(timeout)
×
862
            except queue.Empty as e:
×
863
                self._throw_if_closed()
×
864
                raise SpinnmanTimeoutException("receive", timeout) from e
×
865

866
    def _is_ready_to_receive(self, timeout: float = 0) -> bool:
1✔
867
        # If we already have a message or the queue peek succeeds, return now
868
        if self.__current_msg is not None or not self.__msgs.empty():
×
869
            return True
×
870
        try:
×
871
            self.__current_msg = self.__get(timeout)
×
872
            return True
×
873
        except queue.Empty:
×
874
            return False
×
875

876

877
class _ProxiedBidirectionalConnection(
1✔
878
        _ProxiedConnection, SpallocProxiedConnection):
879
    """
880
    A connection that talks to a particular board via the proxy.
881
    """
882

883
    def __init__(
1✔
884
            self, ws: WebSocket, receiver: _ProxyReceiver,
885
            x: int, y: int, port: int):
886
        self.__connect_args = (x, y, port)
×
887
        super().__init__(ws, receiver)
×
888

889
    @overrides(_ProxiedConnection._open_connection)
1✔
890
    def _open_connection(self) -> int:
1✔
891
        handle, = self._call(
×
892
            ProxyProtocol.OPEN, _open_req, _open_close_res,
893
            *self.__connect_args)
894
        return handle
×
895

896
    @overrides(Connection.is_connected)
1✔
897
    def is_connected(self) -> bool:
1✔
898
        """
899
        Determines if the medium is connected at this point in time.
900

901
        :return: True if the medium is connected, False otherwise
902
        :rtype: bool
903
        """
UNCOV
904
        return self._connected
×
905

906
    @overrides(Connection.close)
1✔
907
    def close(self) -> None:
1✔
908
        """
909
        Closes the connection.
910
        """
UNCOV
911
        self._close()
×
912

913
    @overrides(SpallocProxiedConnection.send)
1✔
914
    def send(self, data: bytes):
1✔
915
        if not isinstance(data, (bytes, bytearray)):
×
916
            data = bytes(data)
×
917
        self._send(data)
×
918

919
    @overrides(SpallocProxiedConnection.receive)
1✔
920
    def receive(self, timeout: Optional[float] = None) -> bytes:
1✔
921
        return self._receive(timeout)
×
922

923
    @overrides(Listenable.is_ready_to_receive)
1✔
924
    def is_ready_to_receive(self, timeout: float = 0) -> bool:
1✔
925
        return self._is_ready_to_receive(timeout)
×
926

927
    @abstractmethod
1✔
928
    def __str__(self) -> str:
1✔
929
        raise NotImplementedError
930

931

932
class _ProxiedUnboundConnection(
1✔
933
        _ProxiedConnection, SpallocProxiedConnection):
934
    """
935
    A connection that can listen to all boards via the proxy, but which can
936
    only send if a target board is provided.
937
    """
938

939
    def __init__(self, ws: WebSocket, receiver: _ProxyReceiver):
1✔
940
        super().__init__(ws, receiver)
×
941
        self.__addr: Optional[str] = None
×
942
        self.__port: Optional[int] = None
×
943

944
    @overrides(_ProxiedConnection._open_connection)
1✔
945
    def _open_connection(self) -> int:
1✔
946
        handle, ip1, ip2, ip3, ip4, self.__port = self._call(
×
947
            ProxyProtocol.OPEN_UNBOUND, _open_listen_req, _open_listen_res)
948
        # Assemble the address into the format expected elsewhere
949
        self.__addr = f"{ip1}.{ip2}.{ip3}.{ip4}"
×
950
        return handle
×
951

952
    @property
1✔
953
    def _addr(self) -> Optional[str]:
1✔
954
        return self.__addr if self._connected else None
×
955

956
    @property
1✔
957
    def _port(self) -> Optional[int]:
1✔
958
        return self.__port if self._connected else None
×
959

960
    @overrides(Connection.is_connected)
1✔
961
    def is_connected(self) -> bool:
1✔
962
        """
963
        Determines if the medium is connected at this point in time.
964

965
        :return: True if the medium is connected, False otherwise
966
        :rtype: bool
967
        """
UNCOV
968
        return self._connected
×
969

970
    @overrides(Connection.close)
1✔
971
    def close(self) -> None:
1✔
972
        """
973
        Closes the connection.
974
        """
UNCOV
975
        self._close()
×
976

977
    @overrides(SpallocProxiedConnection.send)
1✔
978
    def send(self, data: bytes):
1✔
979
        self._throw_if_closed()
×
980
        raise IOError("socket is not open for sending")
×
981

982
    @overrides(SpallocProxiedConnection.receive)
1✔
983
    def receive(self, timeout: Optional[float] = None) -> bytes:
1✔
984
        return self._receive(timeout)
×
985

986
    @overrides(Listenable.is_ready_to_receive)
1✔
987
    def is_ready_to_receive(self, timeout: float = 0) -> bool:
1✔
988
        return self._is_ready_to_receive(timeout)
×
989

990
    @abstractmethod
1✔
991
    def __str__(self) -> str:
1✔
992
        raise NotImplementedError
993

994

995
class _ProxiedSCAMPConnection(
1✔
996
        _ProxiedBidirectionalConnection, SpallocSCPConnection):
997
    __slots__ = ("__chip_x", "__chip_y")
1✔
998

999
    def __init__(
1✔
1000
            self, ws: WebSocket, receiver: _ProxyReceiver,
1001
            x: int, y: int, port: int):
1002
        super().__init__(ws, receiver, x, y, port)
×
1003
        SpallocSCPConnection.__init__(self, x, y)
×
1004

1005
    def __str__(self):
1✔
1006
        return f"SCAMPConnection[proxied]({self.chip_x},{self.chip_y})"
×
1007

1008

1009
class _ProxiedBootConnection(
1✔
1010
        _ProxiedBidirectionalConnection, SpallocBootConnection):
1011
    __slots__ = ()
1✔
1012

1013
    def __init__(self, ws: WebSocket, receiver: _ProxyReceiver):
1✔
1014
        super().__init__(ws, receiver, 0, 0, UDP_BOOT_CONNECTION_DEFAULT_PORT)
×
1015

1016
    def __str__(self):
1✔
1017
        return "BootConnection[proxied]()"
×
1018

1019

1020
class _ProxiedEIEIOConnection(
1✔
1021
        _ProxiedBidirectionalConnection,
1022
        SpallocEIEIOConnection, SpallocProxiedConnection):
1023
    # Special: This is a unidirectional receive-only connection
1024
    __slots__ = ("__addr", "__port", "__chip_x", "__chip_y")
1✔
1025

1026
    def __init__(
1✔
1027
            self, ws: WebSocket, receiver: _ProxyReceiver,
1028
            x: int, y: int, port: int):
1029
        super().__init__(ws, receiver, x, y, port)
×
1030
        self.__chip_x = x
×
1031
        self.__chip_y = y
×
1032

1033
    @property
1✔
1034
    @overrides(SpallocEIEIOConnection._coords)
1✔
1035
    def _coords(self) -> XY:
1✔
1036
        return self.__chip_x, self.__chip_y
×
1037

1038
    def send_to(
1✔
1039
            self,
1040
            data: bytes, address: tuple):  # pylint: disable=unused-argument
1041
        """
1042
        Direct ``send_to`` is unsupported.
1043
        """
1044
        self._throw_if_closed()
×
1045
        raise IOError("socket is not open for sending")
×
1046

1047
    def __str__(self):
1✔
1048
        return (f"EIEIOConnection[proxied](remote:{self.__chip_x},"
×
1049
                f"{self.__chip_y})")
1050

1051

1052
class _ProxiedEIEIOListener(_ProxiedUnboundConnection, SpallocEIEIOListener):
1✔
1053
    __slots__ = ("__conns", )
1✔
1054

1055
    def __init__(self, ws: WebSocket, receiver: _ProxyReceiver,
1✔
1056
                 conns: Dict[XY, str]):
1057
        super().__init__(ws, receiver)
×
1058
        # Invert the map
1059
        self.__conns = {ip: xy for (xy, ip) in conns.items()}
×
1060

1061
    @overrides(SpallocEIEIOListener.send_to_chip)
1✔
1062
    def send_to_chip(
1✔
1063
            self, message: bytes, x: int, y: int, port: int = SCP_SCAMP_PORT):
1064
        if not isinstance(message, (bytes, bytearray)):
×
1065
            message = bytes(message)
×
1066
        self._send_to(bytes(message), x, y, port)
×
1067

1068
    @property
1✔
1069
    @overrides(SpallocEIEIOListener.local_ip_address)
1✔
1070
    def local_ip_address(self) -> str:
1✔
1071
        return self._addr or "0.0.0.0"
×
1072

1073
    @property
1✔
1074
    @overrides(SpallocEIEIOListener.local_port)
1✔
1075
    def local_port(self) -> int:
1✔
1076
        return self._port or 0
×
1077

1078
    @overrides(SpallocEIEIOListener._get_chip_coords)
1✔
1079
    def _get_chip_coords(self, ip_address: str) -> XY:
1✔
1080
        return self.__conns[str(ip_address)]
×
1081

1082
    def __str__(self):
1✔
1083
        return f"EIEIOConnection[proxied](local:{self._addr}:{self._port})"
×
1084

1085

1086
class _ProxiedUDPListener(_ProxiedUnboundConnection, UDPConnection):
1✔
1087
    __slots__ = ("__conns", )
1✔
1088

1089
    def __init__(self, ws: WebSocket, receiver: _ProxyReceiver,
1✔
1090
                 conns: Dict[XY, str]):
1091
        super().__init__(ws, receiver)
×
1092
        # Invert the map
1093
        self.__conns = {ip: xy for (xy, ip) in conns.items()}
×
1094

1095
    @overrides(UDPConnection.send_to)
1✔
1096
    def send_to(self, data: bytes, address: Tuple[str, int]):
1✔
1097
        ip, port = address
×
1098
        x, y = self.__conns[ip]
×
1099
        self._send_to(data, x, y, port)
×
1100

1101
    @property
1✔
1102
    @overrides(UDPConnection.local_ip_address)
1✔
1103
    def local_ip_address(self) -> str:
1✔
1104
        return self._addr or "0.0.0.0"
×
1105

1106
    @property
1✔
1107
    @overrides(UDPConnection.local_port)
1✔
1108
    def local_port(self) -> int:
1✔
1109
        return self._port or 0
×
1110

1111
    def __str__(self):
1✔
1112
        return f"UDPConnection[proxied](local:{self._addr}:{self._port})"
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc