• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SpiNNakerManchester / SpiNNMan / 6574804013

19 Oct 2023 12:47PM UTC coverage: 51.937% (+1.2%) from 50.777%
6574804013

Pull #327

github

Christian-B
typing changes
Pull Request #327: Type Annotations and Checking

105 of 1288 branches covered (0.0%)

Branch coverage included in aggregate %.

2375 of 2375 new or added lines in 180 files covered. (100.0%)

4775 of 8108 relevant lines covered (58.89%)

0.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

30.65
/spinnman/spalloc/spalloc_client.py
1
# Copyright (c) 2021 The University of Manchester
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#     https://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14
"""
1✔
15
Implementation of the client for the Spalloc web service.
16
"""
17

18
from contextlib import contextmanager
1✔
19
from logging import getLogger
1✔
20
from multiprocessing import Process, Queue
1✔
21
from time import sleep
1✔
22
from packaging.version import Version
1✔
23
from urllib.parse import urlparse, urlunparse, ParseResult
1✔
24
import queue
1✔
25
import requests
1✔
26
import struct
1✔
27
import threading
1✔
28
from typing import (
1✔
29
    Any, ContextManager,
30
    Callable, Dict, FrozenSet, Iterable, Iterator, List, Mapping,
31
    Optional, Tuple, cast)
32
from typing_extensions import TypeAlias
1✔
33
from websocket import WebSocket  # type: ignore
1✔
34
from spinn_utilities.abstract_base import AbstractBase, abstractmethod
1✔
35
from spinn_utilities.abstract_context_manager import AbstractContextManager
1✔
36
from spinn_utilities.log import FormatAdapter
1✔
37
from spinn_utilities.typing.coords import XY
1✔
38
from spinn_utilities.typing.json import JsonObject, JsonValue
1✔
39
from spinn_utilities.overrides import overrides
1✔
40
from spinnman.connections.udp_packet_connections import UDPConnection
1✔
41
from spinnman.connections.abstract_classes import Connection, Listenable
1✔
42
from spinnman.constants import SCP_SCAMP_PORT, UDP_BOOT_CONNECTION_DEFAULT_PORT
1✔
43
from spinnman.exceptions import SpinnmanTimeoutException
1✔
44
from spinnman.exceptions import SpallocException
1✔
45
from spinnman.transceiver import (
1✔
46
    Transceiver, create_transceiver_from_connections)
47
from .spalloc_state import SpallocState
1✔
48
from .proxy_protocol import ProxyProtocol
1✔
49
from .session import Session, SessionAware
1✔
50
from .utils import parse_service_url, get_hostname
1✔
51
from .abstract_spalloc_client import AbstractSpallocClient
1✔
52
from .spalloc_machine import SpallocMachine
1✔
53
from .spalloc_job import SpallocJob
1✔
54
from .spalloc_proxied_connection import SpallocProxiedConnection
1✔
55
from .spalloc_boot_connection import SpallocBootConnection
1✔
56
from .spalloc_eieio_connection import SpallocEIEIOConnection
1✔
57
from .spalloc_eieio_listener import SpallocEIEIOListener
1✔
58
from .spalloc_scp_connection import SpallocSCPConnection
1✔
59

60
logger = FormatAdapter(getLogger(__name__))
1✔
61
_open_req = struct.Struct("<IIIII")
1✔
62
_close_req = struct.Struct("<III")
1✔
63
_open_listen_req = struct.Struct("<II")
1✔
64
# Open and close share the response structure
65
_open_close_res = struct.Struct("<III")
1✔
66
_open_listen_res = struct.Struct("<IIIBBBBI")
1✔
67
_msg = struct.Struct("<II")
1✔
68
_msg_to = struct.Struct("<IIIII")
1✔
69

70

71
def fix_url(url):
1✔
72
    parts = urlparse(url)
×
73
    if parts.scheme != 'https':
×
74
        parts = ParseResult("https", parts.netloc, parts.path,
×
75
                            parts.params, parts. query, parts.fragment)
76
    if not parts.path.endswith("/"):
×
77
        parts = ParseResult(parts.scheme, parts.netloc, parts.path + "/",
×
78
                            parts.params, parts.query, parts.fragment)
79
    return urlunparse(parts)
×
80

81

82
class SpallocClient(AbstractContextManager, AbstractSpallocClient):
1✔
83
    """
84
    Basic client library for talking to new Spalloc.
85
    """
86
    __slots__ = ("__session",
1✔
87
                 "__machines_url", "__jobs_url", "version",
88
                 "__group", "__collab", "__nmpi_job", "__nmpi_user")
89

90
    def __init__(
1✔
91
            self, service_url: str,
92
            username: Optional[str] = None, password: Optional[str] = None,
93
            bearer_token: Optional[str] = None,
94
            group: Optional[str] = None, collab: Optional[str] = None,
95
            nmpi_job: Optional[int] = None, nmpi_user: Optional[str] = None):
96
        """
97
        :param str service_url: The reference to the service.
98
            May have username and password supplied as part of the network
99
            location; if so, the ``username`` and ``password`` arguments
100
            *must* be ``None``. If ``username`` and ``password`` are not given,
101
            not even within the URL, the ``bearer_token`` must be not ``None``.
102
        :param str username: The user name to use
103
        :param str password: The password to use
104
        :param str bearer_token: The bearer token to use
105
        """
106
        if username is None and password is None:
×
107
            service_url, username, password = parse_service_url(service_url)
×
108
        self.__session: Optional[Session] = Session(
×
109
            service_url, username, password, bearer_token)
110
        obj = self.__session.renew()
×
111
        v = cast(JsonObject, obj["version"])
×
112
        self.version = Version(
×
113
            f"{v['major-version']}.{v['minor-version']}.{v['revision']}")
114
        self.__machines_url = fix_url(obj["machines-ref"])
×
115
        self.__jobs_url = fix_url(obj["jobs-ref"])
×
116
        self.__group = group
×
117
        self.__collab = collab
×
118
        self.__nmpi_job = nmpi_job
×
119
        self.__nmpi_user = nmpi_user
×
120
        logger.info("established session to {} for {}", service_url, username)
×
121

122
    @staticmethod
1✔
123
    def open_job_from_database(
1✔
124
            service_url, job_url, cookies, headers) -> SpallocJob:
125
        """
126
        Create a job from the description in the attached database. This is
127
        intended to allow for access to the job's allocated resources from
128
        visualisers and other third party code participating in the SpiNNaker
129
        Tools Notification Protocol.
130

131
        .. note::
132
            The job is not verified to exist and be running. The session
133
            credentials may have expired; if so, the job will be unable to
134
            regenerate them.
135

136
        :param str service_url:
137
        :param str job_url:
138
        :param dict(str, str) cookies:
139
        :param dict(str, str) headers:
140

141
        :return:
142
            The job handle, or ``None`` if the records in the database are
143
            absent or incomplete.
144
        :rtype: SpallocJob
145
        """
146
        session = Session(service_url, session_credentials=(cookies, headers))
×
147
        return _SpallocJob(session, job_url)
×
148

149
    @overrides(AbstractSpallocClient.list_machines)
1✔
150
    def list_machines(self) -> Dict[str, SpallocMachine]:
1✔
151
        assert self.__session
×
152
        obj = self.__session.get(self.__machines_url).json()
×
153
        return {m["name"]: _SpallocMachine(self.__session, m)
×
154
                for m in obj["machines"]}
155

156
    @overrides(AbstractSpallocClient.list_jobs)
1✔
157
    def list_jobs(self, deleted: bool = False) -> Iterable[SpallocJob]:
1✔
158
        assert self.__session
×
159
        obj = self.__session.get(
×
160
            self.__jobs_url,
161
            deleted=("true" if deleted else "false")).json()
162
        while obj["jobs"]:
×
163
            for u in obj["jobs"]:
×
164
                yield _SpallocJob(self.__session, fix_url(u))
×
165
            if "next" not in obj:
×
166
                break
×
167
            obj = self.__session.get(obj["next"]).json()
×
168

169
    def _create(self, create: Mapping[str, JsonValue],
1✔
170
                machine_name: Optional[str]) -> SpallocJob:
171
        assert self.__session
×
172
        operation = dict(create)
×
173
        if machine_name:
×
174
            operation["machine-name"] = machine_name
×
175
        else:
176
            operation["tags"] = ["default"]
×
177
        if self.__group is not None:
×
178
            operation["group"] = self.__group
×
179
        if self.__collab is not None:
×
180
            operation["nmpi-collab"] = self.__collab
×
181
        if self.__nmpi_job is not None:
×
182
            operation["nmpi-job-id"] = self.__nmpi_job
×
183
            if self.__nmpi_user is not None:
×
184
                operation["owner"] = self.__nmpi_user
×
185
        r = self.__session.post(self.__jobs_url, operation, timeout=30)
×
186
        url = r.headers["Location"]
×
187
        return _SpallocJob(self.__session, fix_url(url))
×
188

189
    @overrides(AbstractSpallocClient.create_job)
1✔
190
    def create_job(
1✔
191
            self, num_boards: int = 1,
192
            machine_name: Optional[str] = None,
193
            keepalive: int = 45) -> SpallocJob:
194
        return self._create({
×
195
            "num-boards": int(num_boards),
196
            "keepalive-interval": f"PT{int(keepalive)}S"
197
        }, machine_name)
198

199
    @overrides(AbstractSpallocClient.create_job_rect)
1✔
200
    def create_job_rect(
1✔
201
            self, width: int, height: int,
202
            machine_name: Optional[str] = None,
203
            keepalive: int = 45) -> SpallocJob:
204
        return self._create({
×
205
            "dimensions": {
206
                "width": int(width),
207
                "height": int(height)
208
            },
209
            "keepalive-interval": f"PT{int(keepalive)}S"
210
        }, machine_name)
211

212
    @overrides(AbstractSpallocClient.create_job_board)
1✔
213
    def create_job_board(
1✔
214
            self, triad=None, physical=None, ip_address=None,
215
            machine_name=None, keepalive=45) -> SpallocJob:
216
        board: JsonObject
217
        if triad:
×
218
            x, y, z = triad
×
219
            board = {"x": int(x), "y": int(y), "z": int(z)}
×
220
        elif physical:
×
221
            c, f, b = physical
×
222
            board = {"cabinet": int(c), "frame": int(f), "board": int(b)}
×
223
        elif ip_address:
×
224
            board = {"address": str(ip_address)}
×
225
        else:
226
            raise KeyError("at least one of triad, physical and ip_address "
×
227
                           "must be given")
228
        return self._create({
×
229
            "board": board,
230
            "keepalive-interval": f"PT{int(keepalive)}S"
231
        }, machine_name)
232

233
    @overrides(AbstractSpallocClient.create_job_rect_at_board)
1✔
234
    def create_job_rect_at_board(
1✔
235
            self, width, height, triad=None, physical=None, ip_address=None,
236
            machine_name=None, keepalive=45, max_dead_boards=0):
237
        if triad:
×
238
            x, y, z = triad
×
239
            board = {"x": int(x), "y": int(y), "z": int(z)}
×
240
        elif physical:
×
241
            c, f, b = physical
×
242
            board = {"cabinet": int(c), "frame": int(f), "board": int(b)}
×
243
        elif ip_address:
×
244
            board = {"address": str(ip_address)}
×
245
        else:
246
            raise KeyError("at least one of triad, physical and ip_address "
×
247
                           "must be given")
248
        return self._create({
×
249
            "dimensions": {
250
                "width": int(width),
251
                "height": int(height)
252
            },
253
            "board": board,
254
            "keepalive-interval": f"PT{int(keepalive)}S",
255
            "max-dead-boards": int(max_dead_boards)
256
        }, machine_name)
257

258
    def close(self) -> None:
1✔
259
        # pylint: disable=protected-access
260
        if self.__session is not None:
×
261
            self.__session._purge()
×
262
        self.__session = None
×
263

264

265
class _ProxyServiceError(IOError):
1✔
266
    """
267
    An error passed to us from the server over the proxy channel.
268
    """
269

270

271
def _SpallocKeepalive(url, interval, term_queue, cookies, headers):
1✔
272
    """
273
    Actual keepalive task implementation. Don't use directly.
274
    """
275
    headers["Content-Type"] = "text/plain; charset=UTF-8"
×
276
    while True:
277
        requests.put(url, data="alive", cookies=cookies, headers=headers,
×
278
                     allow_redirects=False, timeout=10)
279
        try:
×
280
            term_queue.get(True, interval)
×
281
            break
×
282
        except queue.Empty:
×
283
            continue
×
284
        # On ValueError or OSError, just terminate the keepalive process
285
        # They happen when the term_queue is directly closed
286
        except ValueError:
×
287
            break
×
288
        except OSError:
×
289
            break
×
290

291

292
class _SpallocMachine(SessionAware, SpallocMachine):
1✔
293
    """
294
    Represents a Spalloc-controlled machine.
295

296
    Don't make this yourself. Use :py:class:`SpallocClient` instead.
297
    """
298
    __slots__ = ("__name", "__tags", "__width", "__height",
1✔
299
                 "__dead_boards", "__dead_links")
300

301
    def __init__(self, session: Session, machine_data: JsonObject):
1✔
302
        """
303
        :param _Session session:
304
        :param dict machine_data:
305
        """
306
        super().__init__(session, cast(str, machine_data["uri"]))
×
307
        self.__name = cast(str, machine_data["name"])
×
308
        self.__tags = frozenset(cast(List[str], machine_data["tags"]))
×
309
        self.__width = cast(int, machine_data["width"])
×
310
        self.__height = cast(int, machine_data["height"])
×
311
        self.__dead_boards = cast(list, machine_data["dead-boards"])
×
312
        self.__dead_links = cast(list, machine_data["dead-links"])
×
313

314
    @property
1✔
315
    @overrides(SpallocMachine.name)
1✔
316
    def name(self) -> str:
1✔
317
        return self.__name
×
318

319
    @property
1✔
320
    @overrides(SpallocMachine.tags)
1✔
321
    def tags(self) -> FrozenSet[str]:
1✔
322
        return self.__tags
×
323

324
    @property
1✔
325
    @overrides(SpallocMachine.width)
1✔
326
    def width(self) -> int:
1✔
327
        return self.__width
×
328

329
    @property
1✔
330
    @overrides(SpallocMachine.height)
1✔
331
    def height(self) -> int:
1✔
332
        return self.__height
×
333

334
    @property
1✔
335
    @overrides(SpallocMachine.dead_boards)
1✔
336
    def dead_boards(self) -> list:
1✔
337
        return self.__dead_boards
×
338

339
    @property
1✔
340
    @overrides(SpallocMachine.dead_links)
1✔
341
    def dead_links(self) -> list:
1✔
342
        return self.__dead_links
×
343

344
    @property
1✔
345
    @overrides(SpallocMachine.area)
1✔
346
    def area(self) -> Tuple[int, int]:
1✔
347
        return (self.width, self.height)
×
348

349
    def __repr__(self):
1✔
350
        return "SpallocMachine" + str((
×
351
            self.name, self.tags, self.width, self.height, self.dead_boards,
352
            self.dead_links))
353

354

355
class _ProxyPing(threading.Thread):
1✔
356
    """
357
    Sends ping messages to an open websocket
358
    """
359

360
    def __init__(self, ws, sleep_time=30):
1✔
361
        super().__init__(daemon=True)
×
362
        self.__ws = ws
×
363
        self.__sleep_time = sleep_time
×
364
        self.__closed = False
×
365
        self.start()
×
366

367
    def run(self):
1✔
368
        """
369
        The handler loop of this thread
370
        """
371
        while self.__ws.connected:
×
372
            try:
×
373
                self.__ws.ping()
×
374
            except Exception:  # pylint: disable=broad-except
×
375
                # If closed, ignore error and get out of here
376
                if self.__closed:
×
377
                    break
×
378

379
                # Make someone aware of the error
380
                logger.exception("Error in websocket before close")
×
381
            sleep(self.__sleep_time)
×
382

383
    def close(self):
1✔
384
        """
385
        Mark as closed to avoid error messages.
386
        """
387
        self.__closed = True
×
388

389

390
_WSCB: TypeAlias = Callable[[Optional[bytes]], None]
1✔
391

392

393
class _ProxyReceiver(threading.Thread):
1✔
394
    """
395
    Receives all messages off an open websocket and dispatches them to
396
    registered listeners.
397
    """
398

399
    def __init__(self, ws: WebSocket):
1✔
400
        super().__init__(daemon=True)
×
401
        self.__ws = ws
×
402
        self.__returns: Dict[int, _WSCB] = {}
×
403
        self.__handlers: Dict[int, _WSCB] = {}
×
404
        self.__correlation_id = 0
×
405
        self.__closed = False
×
406
        self.start()
×
407

408
    def run(self) -> None:
1✔
409
        """
410
        The handler loop of this thread.
411
        """
412
        while self.__ws.connected:
×
413
            try:
×
414
                result: Tuple[int, bytes] = self.__ws.recv_data()
×
415
                frame = result[1]
×
416
                if len(frame) < _msg.size:
×
417
                    # Message is out of protocol
418
                    continue
×
419
            except Exception:  # pylint: disable=broad-except
×
420
                # If closed, ignore error and get out of here
421
                if self.__closed:
×
422
                    break
×
423

424
                # Make someone aware of the error
425
                logger.exception("Error in websocket before close")
×
426

427
                # If we are disconnected before closing, make errors happen
428
                if not self.__ws.connected:
×
429
                    for rt in self.__returns.values():
×
430
                        rt(None)
×
431
                    for hd in self.__handlers.values():
×
432
                        hd(None)
×
433
                    break
×
434
            code, num = _msg.unpack_from(frame, 0)
×
435
            if code == ProxyProtocol.MSG:
×
436
                self.dispatch_message(num, frame)
×
437
            else:
438
                self.dispatch_return(num, frame)
×
439

440
    def expect_return(self, handler: _WSCB) -> int:
1✔
441
        """
442
        Register a one-shot listener for a call-like message's return.
443

444
        :return: The correlation ID
445
        """
446
        c = self.__correlation_id
×
447
        self.__correlation_id += 1
×
448
        self.__returns[c] = handler
×
449
        return c
×
450

451
    def listen(self, channel_id: int, handler: _WSCB):
1✔
452
        """
453
        Register a persistent listener for one-way messages.
454
        """
455
        self.__handlers[channel_id] = handler
×
456

457
    def dispatch_return(self, correlation_id: int, msg: bytes):
1✔
458
        """
459
        Dispatch a received call-return message.
460
        """
461
        handler = self.__returns.pop(correlation_id, None)
×
462
        if handler:
×
463
            handler(msg)
×
464

465
    def dispatch_message(self, channel_id: int, msg: bytes):
1✔
466
        """
467
        Dispatch a received one-way message.
468
        """
469
        handler = self.__handlers.get(channel_id)
×
470
        if handler:
×
471
            handler(msg)
×
472

473
    def unlisten(self, channel_id: int):
1✔
474
        """
475
        De-register a listener for a channel
476
        """
477
        self.__handlers.pop(channel_id)
×
478

479
    def close(self) -> None:
1✔
480
        """
481
        Mark receiver closed to avoid errors
482
        """
483
        self.__closed = True
×
484

485

486
class _SpallocJob(SessionAware, SpallocJob):
1✔
487
    """
488
    Represents a job in Spalloc.
489

490
    Don't make this yourself. Use :py:class:`SpallocClient` instead.
491
    """
492
    __slots__ = ("__machine_url", "__chip_url",
1✔
493
                 "_keepalive_url", "__keepalive_handle", "__proxy_handle",
494
                 "__proxy_thread", "__proxy_ping")
495

496
    def __init__(self, session: Session, job_handle: str):
1✔
497
        """
498
        :param _Session session:
499
        :param str job_handle:
500
        """
501
        super().__init__(session, job_handle)
×
502
        logger.info("established job at {}", job_handle)
×
503
        self.__machine_url = self._url + "machine"
×
504
        self.__chip_url = self._url + "chip"
×
505
        self._keepalive_url = self._url + "keepalive"
×
506
        self.__keepalive_handle: Optional[Queue] = None
×
507
        self.__proxy_handle: Optional[WebSocket] = None
×
508
        self.__proxy_thread: Optional[_ProxyReceiver] = None
×
509
        self.__proxy_ping: Optional[_ProxyPing] = None
×
510

511
    @overrides(SpallocJob.get_session_credentials_for_db)
1✔
512
    def get_session_credentials_for_db(self):
1✔
513
        config = {}
×
514
        config["SPALLOC", "service uri"] = self._service_url
×
515
        config["SPALLOC", "job uri"] = self._url
×
516
        cookies, headers = self._session_credentials
×
517
        if "Authorization" in headers:
×
518
            # We never write the auth headers themselves; we just extend the
519
            # session
520
            del headers["Authorization"]
×
521
        for k, v in cookies.items():
×
522
            config["COOKIE", k] = v
×
523
        for k, v in headers.items():
×
524
            config["HEADER", k] = v
×
525
        return config
×
526

527
    @overrides(SpallocJob.get_state)
1✔
528
    def get_state(self, wait_for_change: bool = False) -> SpallocState:
1✔
529
        timeout: Optional[int] = 10
×
530
        if wait_for_change:
×
531
            timeout = None
×
532
        obj = self._get(
×
533
            self._url, wait=wait_for_change, timeout=timeout).json()
534
        return SpallocState[obj["state"]]
×
535

536
    @overrides(SpallocJob.get_root_host)
1✔
537
    def get_root_host(self) -> Optional[str]:
1✔
538
        r = self._get(self.__machine_url)
×
539
        if r.status_code == 204:
×
540
            return None
×
541
        obj = r.json()
×
542
        for c in obj["connections"]:
×
543
            [x, y], host = c
×
544
            if x == 0 and y == 0:
×
545
                return host
×
546
        return None
×
547

548
    @overrides(SpallocJob.get_connections)
1✔
549
    def get_connections(self) -> Dict[XY, str]:
1✔
550
        r = self._get(self.__machine_url)
×
551
        if r.status_code == 204:
×
552
            return {}
×
553
        return {
×
554
            (int(x), int(y)): str(host)
555
            for ((x, y), host) in r.json()["connections"]
556
        }
557

558
    @property
1✔
559
    def __proxy_url(self) -> Optional[str]:
1✔
560
        """
561
        The URL for talking to the proxy connection system.
562
        """
563
        r = self._get(self._url)
×
564
        if r.status_code == 204:
×
565
            return None
×
566
        try:
×
567
            url = r.json()["proxy-ref"]
×
568
            logger.info("Connecting to proxy on {}", url)
×
569
            return url
×
570
        except KeyError:
×
571
            return None
×
572

573
    def __init_proxy(self) -> _ProxyReceiver:
1✔
574
        if self.__proxy_handle is None or not self.__proxy_handle.connected:
×
575
            if self.__proxy_url is None:
×
576
                raise ValueError("no proxy available")
×
577
            self.__proxy_handle = self._websocket(
×
578
                self.__proxy_url, origin=get_hostname(self._url))
579
            self.__proxy_thread = _ProxyReceiver(self.__proxy_handle)
×
580
            self.__proxy_ping = _ProxyPing(self.__proxy_handle)
×
581
        assert self.__proxy_thread is not None
×
582
        return self.__proxy_thread
×
583

584
    @overrides(SpallocJob.connect_to_board)
1✔
585
    def connect_to_board(
1✔
586
            self, x: int, y: int,
587
            port: int = SCP_SCAMP_PORT) -> SpallocSCPConnection:
588
        proxy = self.__init_proxy()
×
589
        return _ProxiedSCAMPConnection(
×
590
            self.__proxy_handle, proxy, int(x), int(y), int(port))
591

592
    @overrides(SpallocJob.connect_for_booting)
1✔
593
    def connect_for_booting(self) -> SpallocBootConnection:
1✔
594
        proxy = self.__init_proxy()
×
595
        return _ProxiedBootConnection(self.__proxy_handle, proxy)
×
596

597
    @overrides(SpallocJob.open_eieio_connection)
1✔
598
    def open_eieio_connection(self, x: int, y: int) -> SpallocEIEIOConnection:
1✔
599
        proxy = self.__init_proxy()
×
600
        return _ProxiedEIEIOConnection(
×
601
            self.__proxy_handle, proxy, int(x), int(y), SCP_SCAMP_PORT)
602

603
    @overrides(SpallocJob.open_eieio_listener_connection)
1✔
604
    def open_eieio_listener_connection(self) -> SpallocEIEIOListener:
1✔
605
        proxy = self.__init_proxy()
×
606
        return _ProxiedEIEIOListener(
×
607
            self.__proxy_handle, proxy, self.get_connections())
608

609
    @overrides(SpallocJob.open_udp_listener_connection)
1✔
610
    def open_udp_listener_connection(self) -> UDPConnection:
1✔
611
        proxy = self.__init_proxy()
×
612
        return _ProxiedUDPListener(
×
613
            self.__proxy_handle, proxy, self.get_connections())
614

615
    @overrides(SpallocJob.wait_for_state_change)
1✔
616
    def wait_for_state_change(self, old_state: SpallocState,
1✔
617
                              timeout: Optional[int] = None) -> SpallocState:
618
        while old_state != SpallocState.DESTROYED:
×
619
            obj = self._get(self._url, wait="true", timeout=timeout).json()
×
620
            s = SpallocState[obj["state"]]
×
621
            if s != old_state or s == SpallocState.DESTROYED:
×
622
                return s
×
623
        return old_state
×
624

625
    @overrides(SpallocJob.wait_until_ready)
1✔
626
    def wait_until_ready(self, timeout: Optional[int] = None,
1✔
627
                         n_retries: Optional[int] = None):
628
        state = self.get_state()
×
629
        retries = 0
×
630
        while (state != SpallocState.READY and
×
631
               (n_retries is None or retries < n_retries)):
632
            retries += 1
×
633
            state = self.wait_for_state_change(state, timeout=timeout)
×
634
            if state == SpallocState.DESTROYED:
×
635
                raise SpallocException("job was unexpectedly destroyed")
×
636

637
    @overrides(SpallocJob.destroy)
1✔
638
    def destroy(self, reason: str = "finished") -> None:
1✔
639
        if self.__keepalive_handle:
×
640
            self.__keepalive_handle.close()
×
641
            self.__keepalive_handle = None
×
642
        if self.__proxy_handle is not None:
×
643
            if self.__proxy_thread:
×
644
                self.__proxy_thread.close()
×
645
            if self.__proxy_ping:
×
646
                self.__proxy_ping.close()
×
647
            self.__proxy_handle.close()
×
648
        self._delete(self._url, reason=str(reason))
×
649
        logger.info("deleted job at {}", self._url)
×
650

651
    @overrides(SpallocJob.keepalive)
1✔
652
    def keepalive(self) -> None:
1✔
653
        self._put(self._keepalive_url, "alive")
×
654

655
    @overrides(SpallocJob.launch_keepalive_task, extend_doc=True)
1✔
656
    def launch_keepalive_task(
1✔
657
            self, period: float = 30) -> ContextManager[Process]:
658
        """
659
        .. note::
660
            Tricky! *Cannot* be done with a thread, as the main thread is known
661
            to do significant amounts of CPU-intensive work.
662
        """
663
        if self.__keepalive_handle is not None:
×
664
            raise SpallocException("cannot keep job alive from two tasks")
×
665
        q: Queue = Queue(1)
×
666
        p = Process(target=_SpallocKeepalive, args=(
×
667
            self._keepalive_url, 0 + period, q,
668
            *self._session_credentials), daemon=True)
669
        p.start()
×
670
        self.__keepalive_handle = q
×
671
        return self.__closer(q, p)
×
672

673
    @contextmanager
1✔
674
    def __closer(self, q: Queue, p: Process) -> Iterator[Process]:
1✔
675
        try:
×
676
            yield p
×
677
        finally:
678
            q.put("quit")
×
679
            # Give it a second, and if it still isn't dead, kill it
680
            p.join(1)
×
681
            if p.is_alive():
×
682
                p.kill()
×
683

684
    @overrides(SpallocJob.where_is_machine)
1✔
685
    def where_is_machine(self, x: int, y: int) -> Optional[
1✔
686
            Tuple[int, int, int]]:
687
        r = self._get(self.__chip_url, x=int(x), y=int(y))
×
688
        if r.status_code == 204:
×
689
            return None
×
690
        return cast(Tuple[int, int, int], tuple(
×
691
            r.json()["physical-board-coordinates"]))
692

693
    @property
1✔
694
    def _keepalive_handle(self) -> Optional[Queue]:
1✔
695
        return self.__keepalive_handle
×
696

697
    @_keepalive_handle.setter
1✔
698
    def _keepalive_handle(self, handle: Queue):
1✔
699
        if self.__keepalive_handle is not None:
×
700
            raise SpallocException("cannot keep job alive from two tasks")
×
701
        self.__keepalive_handle = handle
×
702

703
    @overrides(SpallocJob.create_transceiver)
1✔
704
    def create_transceiver(self) -> Transceiver:
1✔
705
        if self.get_state() != SpallocState.READY:
×
706
            raise SpallocException("job not ready to execute scripts")
×
707
        proxies: List[Connection] = [
×
708
            self.connect_to_board(x, y) for (x, y) in self.get_connections()]
709
        # Also need a boot connection
710
        proxies.append(self.connect_for_booting())
×
711
        return create_transceiver_from_connections(connections=proxies)
×
712

713
    def __repr__(self):
1✔
714
        return f"SpallocJob({self._url})"
×
715

716

717
class _ProxiedConnection(metaclass=AbstractBase):
1✔
718
    """
719
    Core multiplexer/demultiplexer emulating a connection that is proxied
720
    over a websocket.
721

722
    None of the methods are public because subclasses may expose a profile of
723
    them to conform to a particular type of connection.
724
    """
725

726
    def __init__(self, ws: WebSocket, receiver: _ProxyReceiver):
1✔
727
        self.__ws: Optional[WebSocket] = ws
×
728
        self.__receiver: Optional[_ProxyReceiver] = receiver
×
729
        self.__msgs: queue.SimpleQueue = queue.SimpleQueue()
×
730
        self.__call_queue: queue.Queue = queue.Queue(1)
×
731
        self.__call_lock = threading.RLock()
×
732
        self.__current_msg: Optional[bytes] = None
×
733
        self.__handle = self._open_connection()
×
734
        self.__receiver.listen(self.__handle, self.__msgs.put)
×
735

736
    @abstractmethod
1✔
737
    def _open_connection(self) -> int:
1✔
738
        raise NotImplementedError
739

740
    def _call(self, protocol: ProxyProtocol, packer: struct.Struct,
1✔
741
              unpacker: struct.Struct, *args) -> Tuple[Any, ...]:
742
        """
743
        Do a synchronous call.
744

745
        :param protocol:
746
            The protocol message number.
747
        :param packer:
748
            How to form the protocol message. The first two arguments passed
749
            will be the protocol message number and an issued correlation ID
750
            (not needed by the caller).
751
        :param unpacker:
752
            How to extract the expected response.
753
        :param args:
754
            Additional arguments to pass to the packer.
755
        :return:
756
            The results from the unpacker *after* the protocol message code and
757
            the correlation ID.
758
        :raises IOError:
759
            If something goes wrong. This could be due to the websocket being
760
            closed, or the receipt of an ERROR response.
761
        """
762
        if not self._connected:
×
763
            raise IOError("socket closed")
×
764
        if not self.__receiver:
×
765
            raise IOError("socket closed")
×
766
        if not self.__ws:
×
767
            raise IOError("socket closed")
×
768
        with self.__call_lock:
×
769
            # All calls via websocket use correlation_id
770
            correlation_id = self.__receiver.expect_return(
×
771
                self.__call_queue.put)
772
            self.__ws.send_binary(packer.pack(protocol, correlation_id, *args))
×
773
            if not self._connected:
×
774
                raise IOError("socket closed after send!")
×
775
            reply = self.__call_queue.get()
×
776
            code, _ = _msg.unpack_from(reply, 0)
×
777
            if code == ProxyProtocol.ERROR:
×
778
                # Rest of message is UTF-8 encoded error message string
779
                payload = reply[_msg.size:].decode("utf-8")
×
780
                if len(payload):
×
781
                    raise _ProxyServiceError(payload)
×
782
                raise _ProxyServiceError(
×
783
                    f"unknown problem with {protocol} call")
784
            return unpacker.unpack(reply)[2:]
×
785

786
    @property
1✔
787
    def _connected(self) -> bool:
1✔
788
        return bool(self.__ws and self.__ws.connected)
×
789

790
    def _throw_if_closed(self) -> None:
1✔
791
        if not self._connected:
×
792
            raise IOError("socket closed")
×
793

794
    def _close(self) -> None:
1✔
795
        if self._connected:
×
796
            channel_id, = self._call(
×
797
                ProxyProtocol.CLOSE, _close_req, _open_close_res,
798
                self.__handle)
799
            if channel_id != self.__handle:
×
800
                raise IOError("failed to close proxy socket")
×
801
        if self.__receiver:
×
802
            self.__receiver.unlisten(self.__handle)
×
803
        self.__ws = None
×
804
        self.__receiver = None
×
805

806
    def _send(self, message: bytes):
1✔
807
        self._throw_if_closed()
×
808
        # Put the header on the front and send it
809
        if not self.__ws:
×
810
            raise IOError("socket closed")
×
811
        self.__ws.send_binary(_msg.pack(
×
812
            ProxyProtocol.MSG, self.__handle) + message)
813

814
    def _send_to(self, message: bytes, x: int, y: int, port: int):
1✔
815
        self._throw_if_closed()
×
816
        # Put the header on the front and send it
817
        if not self.__ws:
×
818
            raise IOError("socket closed")
×
819
        self.__ws.send_binary(_msg_to.pack(
×
820
            ProxyProtocol.MSG_TO, self.__handle, x, y, port) + message)
821

822
    def __get(self, timeout: float = 0.5) -> bytes:
1✔
823
        """
824
        Get a value from the queue. Handles block/non-block switching and
825
        trimming of the message protocol prefix.
826
        """
827
        if not timeout:
×
828
            return self.__msgs.get(block=False)[_msg.size:]
×
829
        else:
830
            return self.__msgs.get(timeout=timeout)[_msg.size:]
×
831

832
    def _receive(self, timeout: Optional[float] = None) -> bytes:
1✔
833
        if self.__current_msg is not None:
×
834
            try:
×
835
                return self.__current_msg
×
836
            finally:
837
                self.__current_msg = None
×
838
        if timeout is None:
×
839
            while True:
840
                try:
×
841
                    return self.__get()
×
842
                except queue.Empty:
×
843
                    self._throw_if_closed()
×
844
        else:
845
            try:
×
846
                return self.__get(timeout)
×
847
            except queue.Empty as e:
×
848
                self._throw_if_closed()
×
849
                raise SpinnmanTimeoutException("receive", timeout) from e
×
850

851
    def _is_ready_to_receive(self, timeout: float = 0) -> bool:
1✔
852
        # If we already have a message or the queue peek succeeds, return now
853
        if self.__current_msg is not None or not self.__msgs.empty():
×
854
            return True
×
855
        try:
×
856
            self.__current_msg = self.__get(timeout)
×
857
            return True
×
858
        except queue.Empty:
×
859
            return False
×
860

861

862
class _ProxiedBidirectionalConnection(
1✔
863
        _ProxiedConnection, SpallocProxiedConnection):
864
    """
865
    A connection that talks to a particular board via the proxy.
866
    """
867

868
    def __init__(
1✔
869
            self, ws: WebSocket, receiver: _ProxyReceiver,
870
            x: int, y: int, port: int):
871
        self.__connect_args = (x, y, port)
×
872
        super().__init__(ws, receiver)
×
873

874
    @overrides(_ProxiedConnection._open_connection)
1✔
875
    def _open_connection(self):
1✔
876
        handle, = self._call(
×
877
            ProxyProtocol.OPEN, _open_req, _open_close_res,
878
            *self.__connect_args)
879
        return handle
×
880

881
    @overrides(Connection.is_connected)
1✔
882
    def is_connected(self) -> bool:
1✔
883
        return self._connected
×
884

885
    @overrides(Connection.close)
1✔
886
    def close(self):
1✔
887
        self._close()
×
888

889
    @overrides(SpallocProxiedConnection.send)
1✔
890
    def send(self, data: bytes):
1✔
891
        if not isinstance(data, (bytes, bytearray)):
×
892
            data = bytes(data)
×
893
        self._send(data)
×
894

895
    @overrides(SpallocProxiedConnection.receive)
1✔
896
    def receive(self, timeout: Optional[float] = None) -> bytes:
1✔
897
        return self._receive(timeout)
×
898

899
    @overrides(Listenable.is_ready_to_receive)
1✔
900
    def is_ready_to_receive(self, timeout: float = 0) -> bool:
1✔
901
        return self._is_ready_to_receive(timeout)
×
902

903
    @abstractmethod
1✔
904
    def __str__(self) -> str:
1✔
905
        raise NotImplementedError
906

907

908
class _ProxiedUnboundConnection(
1✔
909
        _ProxiedConnection, SpallocProxiedConnection):
910
    """
911
    A connection that can listen to all boards via the proxy, but which can
912
    only send if a target board is provided.
913
    """
914

915
    def __init__(self, ws: WebSocket, receiver: _ProxyReceiver):
1✔
916
        super().__init__(ws, receiver)
×
917
        self.__addr: Optional[str] = None
×
918
        self.__port: Optional[int] = None
×
919

920
    @overrides(_ProxiedConnection._open_connection)
1✔
921
    def _open_connection(self) -> int:
1✔
922
        handle, ip1, ip2, ip3, ip4, self.__port = self._call(
×
923
            ProxyProtocol.OPEN_UNBOUND, _open_listen_req, _open_listen_res)
924
        # Assemble the address into the format expected elsewhere
925
        self.__addr = f"{ip1}.{ip2}.{ip3}.{ip4}"
×
926
        return handle
×
927

928
    @property
1✔
929
    def _addr(self) -> Optional[str]:
1✔
930
        return self.__addr if self._connected else None
×
931

932
    @property
1✔
933
    def _port(self) -> Optional[int]:
1✔
934
        return self.__port if self._connected else None
×
935

936
    @overrides(Connection.is_connected)
1✔
937
    def is_connected(self) -> bool:
1✔
938
        return self._connected
×
939

940
    @overrides(Connection.close)
1✔
941
    def close(self) -> None:
1✔
942
        self._close()
×
943

944
    @overrides(SpallocProxiedConnection.send)
1✔
945
    def send(self, data: bytes) -> None:
1✔
946
        self._throw_if_closed()
×
947
        raise IOError("socket is not open for sending")
×
948

949
    @overrides(SpallocProxiedConnection.receive)
1✔
950
    def receive(self, timeout: Optional[float] = None) -> bytes:
1✔
951
        return self._receive(timeout)
×
952

953
    @overrides(Listenable.is_ready_to_receive)
1✔
954
    def is_ready_to_receive(self, timeout: float = 0) -> bool:
1✔
955
        return self._is_ready_to_receive(timeout)
×
956

957
    @abstractmethod
1✔
958
    def __str__(self) -> str:
1✔
959
        raise NotImplementedError
960

961

962
class _ProxiedSCAMPConnection(
1✔
963
        _ProxiedBidirectionalConnection, SpallocSCPConnection):
964
    __slots__ = ("__chip_x", "__chip_y")
1✔
965

966
    def __init__(
1✔
967
            self, ws: WebSocket, receiver: _ProxyReceiver,
968
            x: int, y: int, port: int):
969
        super().__init__(ws, receiver, x, y, port)
×
970
        SpallocSCPConnection.__init__(self, x, y)
×
971

972
    def __str__(self):
1✔
973
        return f"SCAMPConnection[proxied]({self.chip_x},{self.chip_y})"
×
974

975

976
class _ProxiedBootConnection(
1✔
977
        _ProxiedBidirectionalConnection, SpallocBootConnection):
978
    __slots__ = ()
1✔
979

980
    def __init__(self, ws: WebSocket, receiver: _ProxyReceiver):
1✔
981
        super().__init__(ws, receiver, 0, 0, UDP_BOOT_CONNECTION_DEFAULT_PORT)
×
982

983
    def __str__(self):
1✔
984
        return "BootConnection[proxied]()"
×
985

986

987
class _ProxiedEIEIOConnection(
1✔
988
        _ProxiedBidirectionalConnection,
989
        SpallocEIEIOConnection, SpallocProxiedConnection):
990
    # Special: This is a unidirectional receive-only connection
991
    __slots__ = ("__addr", "__port", "__chip_x", "__chip_y")
1✔
992

993
    def __init__(
1✔
994
            self, ws: WebSocket, receiver: _ProxyReceiver,
995
            x: int, y: int, port: int):
996
        super().__init__(ws, receiver, x, y, port)
×
997
        self.__chip_x = x
×
998
        self.__chip_y = y
×
999

1000
    @property
1✔
1001
    @overrides(SpallocEIEIOConnection._coords)
1✔
1002
    def _coords(self) -> XY:
1✔
1003
        return self.__chip_x, self.__chip_y
×
1004

1005
    def send_to(
1✔
1006
            self,
1007
            data: bytes, address: tuple):  # pylint: disable=unused-argument
1008
        """
1009
        Direct ``send_to`` is unsupported.
1010
        """
1011
        self._throw_if_closed()
×
1012
        raise IOError("socket is not open for sending")
×
1013

1014
    def __str__(self):
1✔
1015
        return (f"EIEIOConnection[proxied](remote:{self.__chip_x},"
×
1016
                f"{self.__chip_y})")
1017

1018

1019
class _ProxiedEIEIOListener(_ProxiedUnboundConnection, SpallocEIEIOListener):
1✔
1020
    __slots__ = ("__conns", )
1✔
1021

1022
    def __init__(self, ws: WebSocket, receiver: _ProxyReceiver,
1✔
1023
                 conns: Dict[XY, str]):
1024
        super().__init__(ws, receiver)
×
1025
        # Invert the map
1026
        self.__conns = {ip: xy for (xy, ip) in conns.items()}
×
1027

1028
    @overrides(SpallocEIEIOListener.send_to_chip)
1✔
1029
    def send_to_chip(
1✔
1030
            self, message: bytes, x: int, y: int, port: int = SCP_SCAMP_PORT):
1031
        if not isinstance(message, (bytes, bytearray)):
×
1032
            message = bytes(message)
×
1033
        self._send_to(bytes(message), x, y, port)
×
1034

1035
    @property
1✔
1036
    @overrides(SpallocEIEIOListener.local_ip_address)
1✔
1037
    def local_ip_address(self) -> str:
1✔
1038
        return self._addr or "0.0.0.0"
×
1039

1040
    @property
1✔
1041
    @overrides(SpallocEIEIOListener.local_port)
1✔
1042
    def local_port(self) -> int:
1✔
1043
        return self._port or 0
×
1044

1045
    @overrides(SpallocEIEIOListener._get_chip_coords)
1✔
1046
    def _get_chip_coords(self, ip_address: str) -> XY:
1✔
1047
        return self.__conns[str(ip_address)]
×
1048

1049
    def __str__(self):
1✔
1050
        return f"EIEIOConnection[proxied](local:{self._addr}:{self._port})"
×
1051

1052

1053
class _ProxiedUDPListener(_ProxiedUnboundConnection, UDPConnection):
1✔
1054
    __slots__ = ("__conns", )
1✔
1055

1056
    def __init__(self, ws: WebSocket, receiver: _ProxyReceiver,
1✔
1057
                 conns: Dict[XY, str]):
1058
        super().__init__(ws, receiver)
×
1059
        # Invert the map
1060
        self.__conns = {ip: xy for (xy, ip) in conns.items()}
×
1061

1062
    @overrides(UDPConnection.send_to)
1✔
1063
    def send_to(self, data: bytes, address: Tuple[str, int]):
1✔
1064
        ip, port = address
×
1065
        x, y = self.__conns[ip]
×
1066
        self._send_to(data, x, y, port)
×
1067

1068
    @property
1✔
1069
    @overrides(UDPConnection.local_ip_address)
1✔
1070
    def local_ip_address(self) -> str:
1✔
1071
        return self._addr or "0.0.0.0"
×
1072

1073
    @property
1✔
1074
    @overrides(UDPConnection.local_port)
1✔
1075
    def local_port(self) -> int:
1✔
1076
        return self._port or 0
×
1077

1078
    def __str__(self):
1✔
1079
        return f"UDPConnection[proxied](local:{self._addr}:{self._port})"
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc