• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SpiNNakerManchester / SpiNNMan / 7088027391

04 Dec 2023 02:28PM UTC coverage: 51.918% (-0.06%) from 51.979%
7088027391

Pull #383

github

rowleya
Not sure what is really different!
Pull Request #383: Make it a bit more user friendly when you can't log in

110 of 1298 branches covered (0.0%)

Branch coverage included in aggregate %.

2 of 13 new or added lines in 1 file covered. (15.38%)

100 existing lines in 1 file now uncovered.

4789 of 8138 relevant lines covered (58.85%)

0.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

30.61
/spinnman/spalloc/spalloc_client.py
1
# Copyright (c) 2021 The University of Manchester
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#     https://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14
"""
1✔
15
Implementation of the client for the Spalloc web service.
16
"""
17

18
from contextlib import contextmanager
1✔
19
from logging import getLogger
1✔
20
from multiprocessing import Process, Queue
1✔
21
from time import sleep
1✔
22
from packaging.version import Version
1✔
23
from urllib.parse import urlparse, urlunparse, ParseResult
1✔
24
import queue
1✔
25
import requests
1✔
26
import struct
1✔
27
import threading
1✔
28
from typing import (
1✔
29
    Any, ContextManager,
30
    Callable, Dict, FrozenSet, Iterable, Iterator, List, Mapping,
31
    Optional, Tuple, cast)
32
from typing_extensions import TypeAlias
1✔
33
from websocket import WebSocket  # type: ignore
1✔
34
from spinn_utilities.abstract_base import AbstractBase, abstractmethod
1✔
35
from spinn_utilities.abstract_context_manager import AbstractContextManager
1✔
36
from spinn_utilities.log import FormatAdapter
1✔
37
from spinn_utilities.typing.coords import XY
1✔
38
from spinn_utilities.typing.json import JsonObject, JsonValue
1✔
39
from spinn_utilities.overrides import overrides
1✔
40
from spinnman.connections.udp_packet_connections import UDPConnection
1✔
41
from spinnman.connections.abstract_classes import Connection, Listenable
1✔
42
from spinnman.constants import SCP_SCAMP_PORT, UDP_BOOT_CONNECTION_DEFAULT_PORT
1✔
43
from spinnman.exceptions import SpinnmanTimeoutException
1✔
44
from spinnman.exceptions import SpallocException
1✔
45
from spinnman.transceiver import (
1✔
46
    Transceiver, create_transceiver_from_connections)
47
from .spalloc_state import SpallocState
1✔
48
from .proxy_protocol import ProxyProtocol
1✔
49
from .session import Session, SessionAware
1✔
50
from .utils import parse_service_url, get_hostname
1✔
51
from .abstract_spalloc_client import AbstractSpallocClient
1✔
52
from .spalloc_machine import SpallocMachine
1✔
53
from .spalloc_job import SpallocJob
1✔
54
from .spalloc_proxied_connection import SpallocProxiedConnection
1✔
55
from .spalloc_boot_connection import SpallocBootConnection
1✔
56
from .spalloc_eieio_connection import SpallocEIEIOConnection
1✔
57
from .spalloc_eieio_listener import SpallocEIEIOListener
1✔
58
from .spalloc_scp_connection import SpallocSCPConnection
1✔
59

60
logger = FormatAdapter(getLogger(__name__))
1✔
61
_open_req = struct.Struct("<IIIII")
1✔
62
_close_req = struct.Struct("<III")
1✔
63
_open_listen_req = struct.Struct("<II")
1✔
64
# Open and close share the response structure
65
_open_close_res = struct.Struct("<III")
1✔
66
_open_listen_res = struct.Struct("<IIIBBBBI")
1✔
67
_msg = struct.Struct("<II")
1✔
68
_msg_to = struct.Struct("<IIIII")
1✔
69

70

71
def fix_url(url):
1✔
72
    parts = urlparse(url)
×
73
    if parts.scheme != 'https':
×
74
        parts = ParseResult("https", parts.netloc, parts.path,
×
75
                            parts.params, parts. query, parts.fragment)
76
    if not parts.path.endswith("/"):
×
77
        parts = ParseResult(parts.scheme, parts.netloc, parts.path + "/",
×
78
                            parts.params, parts.query, parts.fragment)
79
    return urlunparse(parts)
×
80

81

82
class SpallocClient(AbstractContextManager, AbstractSpallocClient):
1✔
83
    """
84
    Basic client library for talking to new Spalloc.
85
    """
86
    __slots__ = ("__session",
1✔
87
                 "__machines_url", "__jobs_url", "version",
88
                 "__group", "__collab", "__nmpi_job", "__nmpi_user")
89

90
    def __init__(
1✔
91
            self, service_url: str,
92
            username: Optional[str] = None, password: Optional[str] = None,
93
            bearer_token: Optional[str] = None,
94
            group: Optional[str] = None, collab: Optional[str] = None,
95
            nmpi_job: Optional[int] = None, nmpi_user: Optional[str] = None):
96
        """
97
        :param str service_url: The reference to the service.
98
            May have username and password supplied as part of the network
99
            location; if so, the ``username`` and ``password`` arguments
100
            *must* be ``None``. If ``username`` and ``password`` are not given,
101
            not even within the URL, the ``bearer_token`` must be not ``None``.
102
        :param str username: The user name to use
103
        :param str password: The password to use
104
        :param str bearer_token: The bearer token to use
105
        """
106
        if username is None and password is None:
×
107
            service_url, username, password = parse_service_url(service_url)
×
108
        self.__session: Optional[Session] = Session(
×
109
            service_url, username, password, bearer_token)
110
        obj = self.__session.renew()
×
111
        v = cast(JsonObject, obj["version"])
×
112
        self.version = Version(
×
113
            f"{v['major-version']}.{v['minor-version']}.{v['revision']}")
114
        self.__machines_url = fix_url(obj["machines-ref"])
×
115
        self.__jobs_url = fix_url(obj["jobs-ref"])
×
116
        self.__group = group
×
117
        self.__collab = collab
×
118
        self.__nmpi_job = nmpi_job
×
119
        self.__nmpi_user = nmpi_user
×
120
        logger.info("established session to {} for {}", service_url, username)
×
121

122
    @staticmethod
1✔
123
    def open_job_from_database(
1✔
124
            service_url, job_url, cookies, headers) -> SpallocJob:
125
        """
126
        Create a job from the description in the attached database. This is
127
        intended to allow for access to the job's allocated resources from
128
        visualisers and other third party code participating in the SpiNNaker
129
        Tools Notification Protocol.
130

131
        .. note::
132
            The job is not verified to exist and be running. The session
133
            credentials may have expired; if so, the job will be unable to
134
            regenerate them.
135

136
        :param str service_url:
137
        :param str job_url:
138
        :param dict(str, str) cookies:
139
        :param dict(str, str) headers:
140

141
        :return:
142
            The job handle, or ``None`` if the records in the database are
143
            absent or incomplete.
144
        :rtype: SpallocJob
145
        """
146
        session = Session(service_url, session_credentials=(cookies, headers))
×
147
        return _SpallocJob(session, job_url)
×
148

149
    @overrides(AbstractSpallocClient.list_machines)
1✔
150
    def list_machines(self) -> Dict[str, SpallocMachine]:
1✔
151
        assert self.__session
×
152
        obj = self.__session.get(self.__machines_url).json()
×
153
        return {m["name"]: _SpallocMachine(self.__session, m)
×
154
                for m in obj["machines"]}
155

156
    @overrides(AbstractSpallocClient.list_jobs)
1✔
157
    def list_jobs(self, deleted: bool = False) -> Iterable[SpallocJob]:
1✔
158
        assert self.__session
×
159
        obj = self.__session.get(
×
160
            self.__jobs_url,
161
            deleted=("true" if deleted else "false")).json()
162
        while obj["jobs"]:
×
163
            for u in obj["jobs"]:
×
164
                yield _SpallocJob(self.__session, fix_url(u))
×
165
            if "next" not in obj:
×
166
                break
×
167
            obj = self.__session.get(obj["next"]).json()
×
168

169
    def _create(self, create: Mapping[str, JsonValue],
1✔
170
                machine_name: Optional[str]) -> SpallocJob:
171
        assert self.__session
×
172
        operation = dict(create)
×
173
        if machine_name:
×
174
            operation["machine-name"] = machine_name
×
175
        else:
176
            operation["tags"] = ["default"]
×
177
        if self.__group is not None:
×
178
            operation["group"] = self.__group
×
179
        if self.__collab is not None:
×
180
            operation["nmpi-collab"] = self.__collab
×
181
        if self.__nmpi_job is not None:
×
182
            operation["nmpi-job-id"] = self.__nmpi_job
×
183
            if self.__nmpi_user is not None:
×
184
                operation["owner"] = self.__nmpi_user
×
185
        logger.info("Posting {} to {}", operation, self.__jobs_url)
×
186
        r = self.__session.post(self.__jobs_url, operation, timeout=30)
×
187
        url = r.headers["Location"]
×
188
        return _SpallocJob(self.__session, fix_url(url))
×
189

190
    @overrides(AbstractSpallocClient.create_job)
1✔
191
    def create_job(
1✔
192
            self, num_boards: int = 1,
193
            machine_name: Optional[str] = None,
194
            keepalive: int = 45) -> SpallocJob:
195
        return self._create({
×
196
            "num-boards": int(num_boards),
197
            "keepalive-interval": f"PT{int(keepalive)}S"
198
        }, machine_name)
199

200
    @overrides(AbstractSpallocClient.create_job_rect)
1✔
201
    def create_job_rect(
1✔
202
            self, width: int, height: int,
203
            machine_name: Optional[str] = None,
204
            keepalive: int = 45) -> SpallocJob:
205
        return self._create({
×
206
            "dimensions": {
207
                "width": int(width),
208
                "height": int(height)
209
            },
210
            "keepalive-interval": f"PT{int(keepalive)}S"
211
        }, machine_name)
212

213
    @overrides(AbstractSpallocClient.create_job_board)
1✔
214
    def create_job_board(
1✔
215
            self, triad=None, physical=None, ip_address=None,
216
            machine_name=None, keepalive=45) -> SpallocJob:
217
        board: JsonObject
218
        if triad:
×
219
            x, y, z = triad
×
220
            board = {"x": int(x), "y": int(y), "z": int(z)}
×
221
        elif physical:
×
222
            c, f, b = physical
×
223
            board = {"cabinet": int(c), "frame": int(f), "board": int(b)}
×
224
        elif ip_address:
×
225
            board = {"address": str(ip_address)}
×
226
        else:
227
            raise KeyError("at least one of triad, physical and ip_address "
×
228
                           "must be given")
229
        return self._create({
×
230
            "board": board,
231
            "keepalive-interval": f"PT{int(keepalive)}S"
232
        }, machine_name)
233

234
    @overrides(AbstractSpallocClient.create_job_rect_at_board)
1✔
235
    def create_job_rect_at_board(
1✔
236
            self, width, height, triad=None, physical=None, ip_address=None,
237
            machine_name=None, keepalive=45, max_dead_boards=0):
238
        if triad:
×
239
            x, y, z = triad
×
240
            board = {"x": int(x), "y": int(y), "z": int(z)}
×
241
        elif physical:
×
242
            c, f, b = physical
×
243
            board = {"cabinet": int(c), "frame": int(f), "board": int(b)}
×
244
        elif ip_address:
×
245
            board = {"address": str(ip_address)}
×
246
        else:
247
            raise KeyError("at least one of triad, physical and ip_address "
×
248
                           "must be given")
249
        return self._create({
×
250
            "dimensions": {
251
                "width": int(width),
252
                "height": int(height)
253
            },
254
            "board": board,
255
            "keepalive-interval": f"PT{int(keepalive)}S",
256
            "max-dead-boards": int(max_dead_boards)
257
        }, machine_name)
258

259
    def close(self) -> None:
1✔
260
        # pylint: disable=protected-access
261
        if self.__session is not None:
×
262
            self.__session._purge()
×
263
        self.__session = None
×
264

265

266
class _ProxyServiceError(IOError):
1✔
267
    """
268
    An error passed to us from the server over the proxy channel.
269
    """
270

271

272
def _SpallocKeepalive(url, interval, term_queue, cookies, headers):
1✔
273
    """
274
    Actual keepalive task implementation. Don't use directly.
275
    """
276
    headers["Content-Type"] = "text/plain; charset=UTF-8"
×
277
    while True:
278
        requests.put(url, data="alive", cookies=cookies, headers=headers,
×
279
                     allow_redirects=False, timeout=10)
280
        try:
×
281
            term_queue.get(True, interval)
×
282
            break
×
283
        except queue.Empty:
×
284
            continue
×
285
        # On ValueError or OSError, just terminate the keepalive process
286
        # They happen when the term_queue is directly closed
287
        except ValueError:
×
288
            break
×
289
        except OSError:
×
290
            break
×
291

292

293
class _SpallocMachine(SessionAware, SpallocMachine):
1✔
294
    """
295
    Represents a Spalloc-controlled machine.
296

297
    Don't make this yourself. Use :py:class:`SpallocClient` instead.
298
    """
299
    __slots__ = ("__name", "__tags", "__width", "__height",
1✔
300
                 "__dead_boards", "__dead_links")
301

302
    def __init__(self, session: Session, machine_data: JsonObject):
1✔
303
        """
304
        :param _Session session:
305
        :param dict machine_data:
306
        """
307
        super().__init__(session, cast(str, machine_data["uri"]))
×
308
        self.__name = cast(str, machine_data["name"])
×
309
        self.__tags = frozenset(cast(List[str], machine_data["tags"]))
×
310
        self.__width = cast(int, machine_data["width"])
×
311
        self.__height = cast(int, machine_data["height"])
×
312
        self.__dead_boards = cast(list, machine_data["dead-boards"])
×
313
        self.__dead_links = cast(list, machine_data["dead-links"])
×
314

315
    @property
1✔
316
    @overrides(SpallocMachine.name)
1✔
317
    def name(self) -> str:
1✔
318
        return self.__name
×
319

320
    @property
1✔
321
    @overrides(SpallocMachine.tags)
1✔
322
    def tags(self) -> FrozenSet[str]:
1✔
323
        return self.__tags
×
324

325
    @property
1✔
326
    @overrides(SpallocMachine.width)
1✔
327
    def width(self) -> int:
1✔
328
        return self.__width
×
329

330
    @property
1✔
331
    @overrides(SpallocMachine.height)
1✔
332
    def height(self) -> int:
1✔
333
        return self.__height
×
334

335
    @property
1✔
336
    @overrides(SpallocMachine.dead_boards)
1✔
337
    def dead_boards(self) -> list:
1✔
338
        return self.__dead_boards
×
339

340
    @property
1✔
341
    @overrides(SpallocMachine.dead_links)
1✔
342
    def dead_links(self) -> list:
1✔
343
        return self.__dead_links
×
344

345
    @property
1✔
346
    @overrides(SpallocMachine.area)
1✔
347
    def area(self) -> Tuple[int, int]:
1✔
348
        return (self.width, self.height)
×
349

350
    def __repr__(self):
1✔
351
        return "SpallocMachine" + str((
×
352
            self.name, self.tags, self.width, self.height, self.dead_boards,
353
            self.dead_links))
354

355

356
class _ProxyPing(threading.Thread):
1✔
357
    """
358
    Sends ping messages to an open websocket
359
    """
360

361
    def __init__(self, ws, sleep_time=30):
1✔
362
        super().__init__(daemon=True)
×
363
        self.__ws = ws
×
364
        self.__sleep_time = sleep_time
×
365
        self.__closed = False
×
366
        self.start()
×
367

368
    def run(self):
1✔
369
        """
370
        The handler loop of this thread
371
        """
372
        while self.__ws.connected:
×
373
            try:
×
374
                self.__ws.ping()
×
375
            except Exception:  # pylint: disable=broad-except
×
376
                # If closed, ignore error and get out of here
377
                if self.__closed:
×
378
                    break
×
379

380
                # Make someone aware of the error
381
                logger.exception("Error in websocket before close")
×
382
            sleep(self.__sleep_time)
×
383

384
    def close(self):
1✔
385
        """
386
        Mark as closed to avoid error messages.
387
        """
388
        self.__closed = True
×
389

390

391
_WSCB: TypeAlias = Callable[[Optional[bytes]], None]
1✔
392

393

394
class _ProxyReceiver(threading.Thread):
1✔
395
    """
396
    Receives all messages off an open websocket and dispatches them to
397
    registered listeners.
398
    """
399

400
    def __init__(self, ws: WebSocket):
1✔
401
        super().__init__(daemon=True)
×
402
        self.__ws = ws
×
403
        self.__returns: Dict[int, _WSCB] = {}
×
404
        self.__handlers: Dict[int, _WSCB] = {}
×
405
        self.__correlation_id = 0
×
406
        self.__closed = False
×
407
        self.start()
×
408

409
    def run(self) -> None:
1✔
410
        """
411
        The handler loop of this thread.
412
        """
413
        while self.__ws.connected:
×
414
            try:
×
415
                result: Tuple[int, bytes] = self.__ws.recv_data()
×
416
                frame = result[1]
×
417
                if len(frame) < _msg.size:
×
418
                    # Message is out of protocol
419
                    continue
×
420
            except Exception:  # pylint: disable=broad-except
×
421
                # If closed, ignore error and get out of here
422
                if self.__closed:
×
423
                    break
×
424

425
                # Make someone aware of the error
426
                logger.exception("Error in websocket before close")
×
427

428
                # If we are disconnected before closing, make errors happen
429
                if not self.__ws.connected:
×
430
                    for rt in self.__returns.values():
×
431
                        rt(None)
×
432
                    for hd in self.__handlers.values():
×
433
                        hd(None)
×
434
                    break
×
435
            code, num = _msg.unpack_from(frame, 0)
×
436
            if code == ProxyProtocol.MSG:
×
437
                self.dispatch_message(num, frame)
×
438
            else:
439
                self.dispatch_return(num, frame)
×
440

441
    def expect_return(self, handler: _WSCB) -> int:
1✔
442
        """
443
        Register a one-shot listener for a call-like message's return.
444

445
        :return: The correlation ID
446
        """
447
        c = self.__correlation_id
×
448
        self.__correlation_id += 1
×
449
        self.__returns[c] = handler
×
450
        return c
×
451

452
    def listen(self, channel_id: int, handler: _WSCB):
1✔
453
        """
454
        Register a persistent listener for one-way messages.
455
        """
456
        self.__handlers[channel_id] = handler
×
457

458
    def dispatch_return(self, correlation_id: int, msg: bytes):
1✔
459
        """
460
        Dispatch a received call-return message.
461
        """
462
        handler = self.__returns.pop(correlation_id, None)
×
463
        if handler:
×
464
            handler(msg)
×
465

466
    def dispatch_message(self, channel_id: int, msg: bytes):
1✔
467
        """
468
        Dispatch a received one-way message.
469
        """
470
        handler = self.__handlers.get(channel_id)
×
471
        if handler:
×
472
            handler(msg)
×
473

474
    def unlisten(self, channel_id: int):
1✔
475
        """
476
        De-register a listener for a channel
477
        """
478
        self.__handlers.pop(channel_id)
×
479

480
    def close(self) -> None:
1✔
481
        """
482
        Mark receiver closed to avoid errors
483
        """
484
        self.__closed = True
×
485

486

487
class _SpallocJob(SessionAware, SpallocJob):
1✔
488
    """
489
    Represents a job in Spalloc.
490

491
    Don't make this yourself. Use :py:class:`SpallocClient` instead.
492
    """
493
    __slots__ = ("__machine_url", "__chip_url",
1✔
494
                 "_keepalive_url", "__keepalive_handle", "__proxy_handle",
495
                 "__proxy_thread", "__proxy_ping")
496

497
    def __init__(self, session: Session, job_handle: str):
1✔
498
        """
499
        :param _Session session:
500
        :param str job_handle:
501
        """
502
        super().__init__(session, job_handle)
×
503
        logger.info("established job at {}", job_handle)
×
504
        self.__machine_url = self._url + "machine"
×
505
        self.__chip_url = self._url + "chip"
×
506
        self._keepalive_url = self._url + "keepalive"
×
507
        self.__keepalive_handle: Optional[Queue] = None
×
508
        self.__proxy_handle: Optional[WebSocket] = None
×
509
        self.__proxy_thread: Optional[_ProxyReceiver] = None
×
510
        self.__proxy_ping: Optional[_ProxyPing] = None
×
511

512
    @overrides(SpallocJob.get_session_credentials_for_db)
1✔
513
    def get_session_credentials_for_db(self):
1✔
514
        config = {}
×
515
        config["SPALLOC", "service uri"] = self._service_url
×
516
        config["SPALLOC", "job uri"] = self._url
×
517
        cookies, headers = self._session_credentials
×
518
        if "Authorization" in headers:
×
519
            # We never write the auth headers themselves; we just extend the
520
            # session
521
            del headers["Authorization"]
×
522
        for k, v in cookies.items():
×
523
            config["COOKIE", k] = v
×
524
        for k, v in headers.items():
×
525
            config["HEADER", k] = v
×
526
        return config
×
527

528
    @overrides(SpallocJob.get_state)
1✔
529
    def get_state(self, wait_for_change: bool = False) -> SpallocState:
1✔
530
        timeout: Optional[int] = 10
×
531
        if wait_for_change:
×
532
            timeout = None
×
533
        obj = self._get(
×
534
            self._url, wait=wait_for_change, timeout=timeout).json()
535
        return SpallocState[obj["state"]]
×
536

537
    @overrides(SpallocJob.get_root_host)
1✔
538
    def get_root_host(self) -> Optional[str]:
1✔
539
        r = self._get(self.__machine_url)
×
540
        if r.status_code == 204:
×
541
            return None
×
542
        obj = r.json()
×
543
        for c in obj["connections"]:
×
544
            [x, y], host = c
×
545
            if x == 0 and y == 0:
×
546
                return host
×
547
        return None
×
548

549
    @overrides(SpallocJob.get_connections)
1✔
550
    def get_connections(self) -> Dict[XY, str]:
1✔
551
        r = self._get(self.__machine_url)
×
552
        if r.status_code == 204:
×
553
            return {}
×
554
        return {
×
555
            (int(x), int(y)): str(host)
556
            for ((x, y), host) in r.json()["connections"]
557
        }
558

559
    @property
1✔
560
    def __proxy_url(self) -> Optional[str]:
1✔
561
        """
562
        The URL for talking to the proxy connection system.
563
        """
564
        r = self._get(self._url)
×
565
        if r.status_code == 204:
×
566
            return None
×
567
        try:
×
568
            url = r.json()["proxy-ref"]
×
569
            logger.info("Connecting to proxy on {}", url)
×
570
            return url
×
571
        except KeyError:
×
572
            return None
×
573

574
    def __init_proxy(self) -> _ProxyReceiver:
1✔
575
        if self.__proxy_handle is None or not self.__proxy_handle.connected:
×
576
            if self.__proxy_url is None:
×
577
                raise ValueError("no proxy available")
×
578
            self.__proxy_handle = self._websocket(
×
579
                self.__proxy_url, origin=get_hostname(self._url))
580
            self.__proxy_thread = _ProxyReceiver(self.__proxy_handle)
×
581
            self.__proxy_ping = _ProxyPing(self.__proxy_handle)
×
582
        assert self.__proxy_thread is not None
×
583
        return self.__proxy_thread
×
584

585
    @overrides(SpallocJob.connect_to_board)
1✔
586
    def connect_to_board(
1✔
587
            self, x: int, y: int,
588
            port: int = SCP_SCAMP_PORT) -> SpallocSCPConnection:
UNCOV
589
        proxy = self.__init_proxy()
×
590
        return _ProxiedSCAMPConnection(
×
591
            self.__proxy_handle, proxy, int(x), int(y), int(port))
592

593
    @overrides(SpallocJob.connect_for_booting)
1✔
594
    def connect_for_booting(self) -> SpallocBootConnection:
1✔
595
        proxy = self.__init_proxy()
×
596
        return _ProxiedBootConnection(self.__proxy_handle, proxy)
×
597

598
    @overrides(SpallocJob.open_eieio_connection)
1✔
599
    def open_eieio_connection(self, x: int, y: int) -> SpallocEIEIOConnection:
1✔
600
        proxy = self.__init_proxy()
×
601
        return _ProxiedEIEIOConnection(
×
602
            self.__proxy_handle, proxy, int(x), int(y), SCP_SCAMP_PORT)
603

604
    @overrides(SpallocJob.open_eieio_listener_connection)
1✔
605
    def open_eieio_listener_connection(self) -> SpallocEIEIOListener:
1✔
606
        proxy = self.__init_proxy()
×
607
        return _ProxiedEIEIOListener(
×
608
            self.__proxy_handle, proxy, self.get_connections())
609

610
    @overrides(SpallocJob.open_udp_listener_connection)
1✔
611
    def open_udp_listener_connection(self) -> UDPConnection:
1✔
612
        proxy = self.__init_proxy()
×
UNCOV
613
        return _ProxiedUDPListener(
×
614
            self.__proxy_handle, proxy, self.get_connections())
615

616
    @overrides(SpallocJob.wait_for_state_change)
1✔
617
    def wait_for_state_change(self, old_state: SpallocState,
1✔
618
                              timeout: Optional[int] = None) -> SpallocState:
619
        while old_state != SpallocState.DESTROYED:
×
620
            obj = self._get(self._url, wait="true", timeout=timeout).json()
×
621
            s = SpallocState[obj["state"]]
×
622
            if s != old_state or s == SpallocState.DESTROYED:
×
UNCOV
623
                return s
×
UNCOV
624
        return old_state
×
625

626
    @overrides(SpallocJob.wait_until_ready)
1✔
627
    def wait_until_ready(self, timeout: Optional[int] = None,
1✔
628
                         n_retries: Optional[int] = None):
629
        state = self.get_state()
×
UNCOV
630
        retries = 0
×
631
        while (state != SpallocState.READY and
×
632
               (n_retries is None or retries < n_retries)):
633
            retries += 1
×
634
            state = self.wait_for_state_change(state, timeout=timeout)
×
UNCOV
635
            if state == SpallocState.DESTROYED:
×
UNCOV
636
                raise SpallocException("job was unexpectedly destroyed")
×
637

638
    @overrides(SpallocJob.destroy)
1✔
639
    def destroy(self, reason: str = "finished") -> None:
1✔
640
        if self.__keepalive_handle:
×
641
            self.__keepalive_handle.close()
×
642
            self.__keepalive_handle = None
×
643
        if self.__proxy_handle is not None:
×
644
            if self.__proxy_thread:
×
645
                self.__proxy_thread.close()
×
646
            if self.__proxy_ping:
×
647
                self.__proxy_ping.close()
×
648
            self.__proxy_handle.close()
×
UNCOV
649
        self._delete(self._url, reason=str(reason))
×
UNCOV
650
        logger.info("deleted job at {}", self._url)
×
651

652
    @overrides(SpallocJob.keepalive)
1✔
653
    def keepalive(self) -> None:
1✔
UNCOV
654
        self._put(self._keepalive_url, "alive")
×
655

656
    @overrides(SpallocJob.launch_keepalive_task, extend_doc=True)
1✔
657
    def launch_keepalive_task(
1✔
658
            self, period: float = 30) -> ContextManager[Process]:
659
        """
660
        .. note::
661
            Tricky! *Cannot* be done with a thread, as the main thread is known
662
            to do significant amounts of CPU-intensive work.
663
        """
664
        if self.__keepalive_handle is not None:
×
665
            raise SpallocException("cannot keep job alive from two tasks")
×
UNCOV
666
        q: Queue = Queue(1)
×
UNCOV
667
        p = Process(target=_SpallocKeepalive, args=(
×
668
            self._keepalive_url, 0 + period, q,
669
            *self._session_credentials), daemon=True)
670
        p.start()
×
UNCOV
671
        self.__keepalive_handle = q
×
UNCOV
672
        return self.__closer(q, p)
×
673

674
    @contextmanager
1✔
675
    def __closer(self, q: Queue, p: Process) -> Iterator[Process]:
1✔
UNCOV
676
        try:
×
677
            yield p
×
678
        finally:
679
            q.put("quit")
×
680
            # Give it a second, and if it still isn't dead, kill it
681
            p.join(1)
×
UNCOV
682
            if p.is_alive():
×
UNCOV
683
                p.kill()
×
684

685
    @overrides(SpallocJob.where_is_machine)
1✔
686
    def where_is_machine(self, x: int, y: int) -> Optional[
1✔
687
            Tuple[int, int, int]]:
688
        r = self._get(self.__chip_url, x=int(x), y=int(y))
×
689
        if r.status_code == 204:
×
UNCOV
690
            return None
×
UNCOV
691
        return cast(Tuple[int, int, int], tuple(
×
692
            r.json()["physical-board-coordinates"]))
693

694
    @property
1✔
695
    def _keepalive_handle(self) -> Optional[Queue]:
1✔
UNCOV
696
        return self.__keepalive_handle
×
697

698
    @_keepalive_handle.setter
1✔
699
    def _keepalive_handle(self, handle: Queue):
1✔
700
        if self.__keepalive_handle is not None:
×
UNCOV
701
            raise SpallocException("cannot keep job alive from two tasks")
×
UNCOV
702
        self.__keepalive_handle = handle
×
703

704
    @overrides(SpallocJob.create_transceiver)
1✔
705
    def create_transceiver(self) -> Transceiver:
1✔
706
        if self.get_state() != SpallocState.READY:
×
UNCOV
707
            raise SpallocException("job not ready to execute scripts")
×
UNCOV
708
        proxies: List[Connection] = [
×
709
            self.connect_to_board(x, y) for (x, y) in self.get_connections()]
710
        # Also need a boot connection
UNCOV
711
        proxies.append(self.connect_for_booting())
×
UNCOV
712
        return create_transceiver_from_connections(connections=proxies)
×
713

714
    def __repr__(self):
1✔
UNCOV
715
        return f"SpallocJob({self._url})"
×
716

717

718
class _ProxiedConnection(metaclass=AbstractBase):
1✔
719
    """
720
    Core multiplexer/demultiplexer emulating a connection that is proxied
721
    over a websocket.
722

723
    None of the methods are public because subclasses may expose a profile of
724
    them to conform to a particular type of connection.
725
    """
726

727
    def __init__(self, ws: WebSocket, receiver: _ProxyReceiver):
1✔
728
        self.__ws: Optional[WebSocket] = ws
×
729
        self.__receiver: Optional[_ProxyReceiver] = receiver
×
730
        self.__msgs: queue.SimpleQueue = queue.SimpleQueue()
×
731
        self.__call_queue: queue.Queue = queue.Queue(1)
×
732
        self.__call_lock = threading.RLock()
×
733
        self.__current_msg: Optional[bytes] = None
×
UNCOV
734
        self.__handle = self._open_connection()
×
UNCOV
735
        self.__receiver.listen(self.__handle, self.__msgs.put)
×
736

737
    @abstractmethod
1✔
738
    def _open_connection(self) -> int:
1✔
739
        raise NotImplementedError
740

741
    def _call(self, protocol: ProxyProtocol, packer: struct.Struct,
1✔
742
              unpacker: struct.Struct, *args) -> Tuple[Any, ...]:
743
        """
744
        Do a synchronous call.
745

746
        :param protocol:
747
            The protocol message number.
748
        :param packer:
749
            How to form the protocol message. The first two arguments passed
750
            will be the protocol message number and an issued correlation ID
751
            (not needed by the caller).
752
        :param unpacker:
753
            How to extract the expected response.
754
        :param args:
755
            Additional arguments to pass to the packer.
756
        :return:
757
            The results from the unpacker *after* the protocol message code and
758
            the correlation ID.
759
        :raises IOError:
760
            If something goes wrong. This could be due to the websocket being
761
            closed, or the receipt of an ERROR response.
762
        """
763
        if not self._connected:
×
764
            raise IOError("socket closed")
×
765
        if not self.__receiver:
×
766
            raise IOError("socket closed")
×
767
        if not self.__ws:
×
UNCOV
768
            raise IOError("socket closed")
×
769
        with self.__call_lock:
×
770
            # All calls via websocket use correlation_id
771
            correlation_id = self.__receiver.expect_return(
×
772
                self.__call_queue.put)
773
            self.__ws.send_binary(packer.pack(protocol, correlation_id, *args))
×
774
            if not self._connected:
×
775
                raise IOError("socket closed after send!")
×
776
            reply = self.__call_queue.get()
×
UNCOV
777
            code, _ = _msg.unpack_from(reply, 0)
×
778
            if code == ProxyProtocol.ERROR:
×
779
                # Rest of message is UTF-8 encoded error message string
780
                payload = reply[_msg.size:].decode("utf-8")
×
781
                if len(payload):
×
UNCOV
782
                    raise _ProxyServiceError(payload)
×
783
                raise _ProxyServiceError(
×
784
                    f"unknown problem with {protocol} call")
UNCOV
785
            return unpacker.unpack(reply)[2:]
×
786

787
    @property
1✔
788
    def _connected(self) -> bool:
1✔
UNCOV
789
        return bool(self.__ws and self.__ws.connected)
×
790

791
    def _throw_if_closed(self) -> None:
1✔
UNCOV
792
        if not self._connected:
×
UNCOV
793
            raise IOError("socket closed")
×
794

795
    def _close(self) -> None:
1✔
UNCOV
796
        if self._connected:
×
UNCOV
797
            channel_id, = self._call(
×
798
                ProxyProtocol.CLOSE, _close_req, _open_close_res,
799
                self.__handle)
800
            if channel_id != self.__handle:
×
801
                raise IOError("failed to close proxy socket")
×
802
        if self.__receiver:
×
803
            self.__receiver.unlisten(self.__handle)
×
UNCOV
804
        self.__ws = None
×
UNCOV
805
        self.__receiver = None
×
806

807
    def _send(self, message: bytes):
1✔
808
        self._throw_if_closed()
×
809
        # Put the header on the front and send it
810
        if not self.__ws:
×
UNCOV
811
            raise IOError("socket closed")
×
UNCOV
812
        self.__ws.send_binary(_msg.pack(
×
813
            ProxyProtocol.MSG, self.__handle) + message)
814

815
    def _send_to(self, message: bytes, x: int, y: int, port: int):
1✔
816
        self._throw_if_closed()
×
817
        # Put the header on the front and send it
818
        if not self.__ws:
×
UNCOV
819
            raise IOError("socket closed")
×
UNCOV
820
        self.__ws.send_binary(_msg_to.pack(
×
821
            ProxyProtocol.MSG_TO, self.__handle, x, y, port) + message)
822

823
    def __get(self, timeout: float = 0.5) -> bytes:
1✔
824
        """
825
        Get a value from the queue. Handles block/non-block switching and
826
        trimming of the message protocol prefix.
827
        """
UNCOV
828
        if not timeout:
×
829
            return self.__msgs.get(block=False)[_msg.size:]
×
830
        else:
UNCOV
831
            return self.__msgs.get(timeout=timeout)[_msg.size:]
×
832

833
    def _receive(self, timeout: Optional[float] = None) -> bytes:
1✔
834
        if self.__current_msg is not None:
×
UNCOV
835
            try:
×
836
                return self.__current_msg
×
837
            finally:
UNCOV
838
                self.__current_msg = None
×
839
        if timeout is None:
×
840
            while True:
841
                try:
×
842
                    return self.__get()
×
UNCOV
843
                except queue.Empty:
×
844
                    self._throw_if_closed()
×
845
        else:
846
            try:
×
847
                return self.__get(timeout)
×
848
            except queue.Empty as e:
×
UNCOV
849
                self._throw_if_closed()
×
UNCOV
850
                raise SpinnmanTimeoutException("receive", timeout) from e
×
851

852
    def _is_ready_to_receive(self, timeout: float = 0) -> bool:
1✔
853
        # If we already have a message or the queue peek succeeds, return now
854
        if self.__current_msg is not None or not self.__msgs.empty():
×
855
            return True
×
856
        try:
×
857
            self.__current_msg = self.__get(timeout)
×
858
            return True
×
UNCOV
859
        except queue.Empty:
×
UNCOV
860
            return False
×
861

862

863
class _ProxiedBidirectionalConnection(
1✔
864
        _ProxiedConnection, SpallocProxiedConnection):
865
    """
866
    A connection that talks to a particular board via the proxy.
867
    """
868

869
    def __init__(
1✔
870
            self, ws: WebSocket, receiver: _ProxyReceiver,
871
            x: int, y: int, port: int):
UNCOV
872
        self.__connect_args = (x, y, port)
×
UNCOV
873
        super().__init__(ws, receiver)
×
874

875
    @overrides(_ProxiedConnection._open_connection)
1✔
876
    def _open_connection(self):
1✔
UNCOV
877
        handle, = self._call(
×
878
            ProxyProtocol.OPEN, _open_req, _open_close_res,
879
            *self.__connect_args)
UNCOV
880
        return handle
×
881

882
    @overrides(Connection.is_connected)
1✔
883
    def is_connected(self) -> bool:
1✔
UNCOV
884
        return self._connected
×
885

886
    @overrides(Connection.close)
1✔
887
    def close(self):
1✔
UNCOV
888
        self._close()
×
889

890
    @overrides(SpallocProxiedConnection.send)
1✔
891
    def send(self, data: bytes):
1✔
892
        if not isinstance(data, (bytes, bytearray)):
×
UNCOV
893
            data = bytes(data)
×
UNCOV
894
        self._send(data)
×
895

896
    @overrides(SpallocProxiedConnection.receive)
1✔
897
    def receive(self, timeout: Optional[float] = None) -> bytes:
1✔
UNCOV
898
        return self._receive(timeout)
×
899

900
    @overrides(Listenable.is_ready_to_receive)
1✔
901
    def is_ready_to_receive(self, timeout: float = 0) -> bool:
1✔
UNCOV
902
        return self._is_ready_to_receive(timeout)
×
903

904
    @abstractmethod
1✔
905
    def __str__(self) -> str:
1✔
906
        raise NotImplementedError
907

908

909
class _ProxiedUnboundConnection(
1✔
910
        _ProxiedConnection, SpallocProxiedConnection):
911
    """
912
    A connection that can listen to all boards via the proxy, but which can
913
    only send if a target board is provided.
914
    """
915

916
    def __init__(self, ws: WebSocket, receiver: _ProxyReceiver):
1✔
917
        super().__init__(ws, receiver)
×
UNCOV
918
        self.__addr: Optional[str] = None
×
UNCOV
919
        self.__port: Optional[int] = None
×
920

921
    @overrides(_ProxiedConnection._open_connection)
1✔
922
    def _open_connection(self) -> int:
1✔
UNCOV
923
        handle, ip1, ip2, ip3, ip4, self.__port = self._call(
×
924
            ProxyProtocol.OPEN_UNBOUND, _open_listen_req, _open_listen_res)
925
        # Assemble the address into the format expected elsewhere
UNCOV
926
        self.__addr = f"{ip1}.{ip2}.{ip3}.{ip4}"
×
UNCOV
927
        return handle
×
928

929
    @property
1✔
930
    def _addr(self) -> Optional[str]:
1✔
UNCOV
931
        return self.__addr if self._connected else None
×
932

933
    @property
1✔
934
    def _port(self) -> Optional[int]:
1✔
UNCOV
935
        return self.__port if self._connected else None
×
936

937
    @overrides(Connection.is_connected)
1✔
938
    def is_connected(self) -> bool:
1✔
UNCOV
939
        return self._connected
×
940

941
    @overrides(Connection.close)
1✔
942
    def close(self) -> None:
1✔
UNCOV
943
        self._close()
×
944

945
    @overrides(SpallocProxiedConnection.send)
1✔
946
    def send(self, data: bytes) -> None:
1✔
UNCOV
947
        self._throw_if_closed()
×
UNCOV
948
        raise IOError("socket is not open for sending")
×
949

950
    @overrides(SpallocProxiedConnection.receive)
1✔
951
    def receive(self, timeout: Optional[float] = None) -> bytes:
1✔
UNCOV
952
        return self._receive(timeout)
×
953

954
    @overrides(Listenable.is_ready_to_receive)
1✔
955
    def is_ready_to_receive(self, timeout: float = 0) -> bool:
1✔
UNCOV
956
        return self._is_ready_to_receive(timeout)
×
957

958
    @abstractmethod
1✔
959
    def __str__(self) -> str:
1✔
960
        raise NotImplementedError
961

962

963
class _ProxiedSCAMPConnection(
1✔
964
        _ProxiedBidirectionalConnection, SpallocSCPConnection):
965
    __slots__ = ("__chip_x", "__chip_y")
1✔
966

967
    def __init__(
1✔
968
            self, ws: WebSocket, receiver: _ProxyReceiver,
969
            x: int, y: int, port: int):
UNCOV
970
        super().__init__(ws, receiver, x, y, port)
×
UNCOV
971
        SpallocSCPConnection.__init__(self, x, y)
×
972

973
    def __str__(self):
1✔
UNCOV
974
        return f"SCAMPConnection[proxied]({self.chip_x},{self.chip_y})"
×
975

976

977
class _ProxiedBootConnection(
1✔
978
        _ProxiedBidirectionalConnection, SpallocBootConnection):
979
    __slots__ = ()
1✔
980

981
    def __init__(self, ws: WebSocket, receiver: _ProxyReceiver):
1✔
UNCOV
982
        super().__init__(ws, receiver, 0, 0, UDP_BOOT_CONNECTION_DEFAULT_PORT)
×
983

984
    def __str__(self):
1✔
UNCOV
985
        return "BootConnection[proxied]()"
×
986

987

988
class _ProxiedEIEIOConnection(
1✔
989
        _ProxiedBidirectionalConnection,
990
        SpallocEIEIOConnection, SpallocProxiedConnection):
991
    # Special: This is a unidirectional receive-only connection
992
    __slots__ = ("__addr", "__port", "__chip_x", "__chip_y")
1✔
993

994
    def __init__(
1✔
995
            self, ws: WebSocket, receiver: _ProxyReceiver,
996
            x: int, y: int, port: int):
997
        super().__init__(ws, receiver, x, y, port)
×
UNCOV
998
        self.__chip_x = x
×
UNCOV
999
        self.__chip_y = y
×
1000

1001
    @property
1✔
1002
    @overrides(SpallocEIEIOConnection._coords)
1✔
1003
    def _coords(self) -> XY:
1✔
UNCOV
1004
        return self.__chip_x, self.__chip_y
×
1005

1006
    def send_to(
1✔
1007
            self,
1008
            data: bytes, address: tuple):  # pylint: disable=unused-argument
1009
        """
1010
        Direct ``send_to`` is unsupported.
1011
        """
UNCOV
1012
        self._throw_if_closed()
×
UNCOV
1013
        raise IOError("socket is not open for sending")
×
1014

1015
    def __str__(self):
1✔
UNCOV
1016
        return (f"EIEIOConnection[proxied](remote:{self.__chip_x},"
×
1017
                f"{self.__chip_y})")
1018

1019

1020
class _ProxiedEIEIOListener(_ProxiedUnboundConnection, SpallocEIEIOListener):
1✔
1021
    __slots__ = ("__conns", )
1✔
1022

1023
    def __init__(self, ws: WebSocket, receiver: _ProxyReceiver,
1✔
1024
                 conns: Dict[XY, str]):
1025
        super().__init__(ws, receiver)
×
1026
        # Invert the map
UNCOV
1027
        self.__conns = {ip: xy for (xy, ip) in conns.items()}
×
1028

1029
    @overrides(SpallocEIEIOListener.send_to_chip)
1✔
1030
    def send_to_chip(
1✔
1031
            self, message: bytes, x: int, y: int, port: int = SCP_SCAMP_PORT):
1032
        if not isinstance(message, (bytes, bytearray)):
×
UNCOV
1033
            message = bytes(message)
×
UNCOV
1034
        self._send_to(bytes(message), x, y, port)
×
1035

1036
    @property
1✔
1037
    @overrides(SpallocEIEIOListener.local_ip_address)
1✔
1038
    def local_ip_address(self) -> str:
1✔
UNCOV
1039
        return self._addr or "0.0.0.0"
×
1040

1041
    @property
1✔
1042
    @overrides(SpallocEIEIOListener.local_port)
1✔
1043
    def local_port(self) -> int:
1✔
UNCOV
1044
        return self._port or 0
×
1045

1046
    @overrides(SpallocEIEIOListener._get_chip_coords)
1✔
1047
    def _get_chip_coords(self, ip_address: str) -> XY:
1✔
UNCOV
1048
        return self.__conns[str(ip_address)]
×
1049

1050
    def __str__(self):
1✔
UNCOV
1051
        return f"EIEIOConnection[proxied](local:{self._addr}:{self._port})"
×
1052

1053

1054
class _ProxiedUDPListener(_ProxiedUnboundConnection, UDPConnection):
1✔
1055
    __slots__ = ("__conns", )
1✔
1056

1057
    def __init__(self, ws: WebSocket, receiver: _ProxyReceiver,
1✔
1058
                 conns: Dict[XY, str]):
1059
        super().__init__(ws, receiver)
×
1060
        # Invert the map
UNCOV
1061
        self.__conns = {ip: xy for (xy, ip) in conns.items()}
×
1062

1063
    @overrides(UDPConnection.send_to)
1✔
1064
    def send_to(self, data: bytes, address: Tuple[str, int]):
1✔
1065
        ip, port = address
×
UNCOV
1066
        x, y = self.__conns[ip]
×
UNCOV
1067
        self._send_to(data, x, y, port)
×
1068

1069
    @property
1✔
1070
    @overrides(UDPConnection.local_ip_address)
1✔
1071
    def local_ip_address(self) -> str:
1✔
UNCOV
1072
        return self._addr or "0.0.0.0"
×
1073

1074
    @property
1✔
1075
    @overrides(UDPConnection.local_port)
1✔
1076
    def local_port(self) -> int:
1✔
UNCOV
1077
        return self._port or 0
×
1078

1079
    def __str__(self):
1✔
UNCOV
1080
        return f"UDPConnection[proxied](local:{self._addr}:{self._port})"
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc