• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SpiNNakerManchester / SpiNNMan / 6561889142

18 Oct 2023 01:51PM UTC coverage: 50.777% (-0.005%) from 50.782%
6561889142

push

github

web-flow
Merge pull request #373 from SpiNNakerManchester/pylint_fixes

Minor param renaming, doc fixes and ignores for spelling checker

90 of 1258 branches covered (0.0%)

Branch coverage included in aggregate %.

14 of 14 new or added lines in 5 files covered. (100.0%)

4584 of 7947 relevant lines covered (57.68%)

0.58 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

31.36
/spinnman/spalloc/spalloc_client.py
1
# Copyright (c) 2021 The University of Manchester
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#     https://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14
"""
1✔
15
Implementation of the client for the Spalloc web service.
16
"""
17

18
from logging import getLogger
1✔
19
from multiprocessing import Process, Queue
1✔
20
from time import sleep
1✔
21
from packaging.version import Version
1✔
22
from urllib.parse import urlparse, urlunparse, ParseResult
1✔
23
import queue
1✔
24
import requests
1✔
25
import struct
1✔
26
import threading
1✔
27
from typing import Dict, List, Tuple
1✔
28
from websocket import WebSocket
1✔
29
from spinn_utilities.abstract_base import AbstractBase, abstractmethod
1✔
30
from spinn_utilities.abstract_context_manager import AbstractContextManager
1✔
31
from spinn_utilities.log import FormatAdapter
1✔
32
from spinn_utilities.overrides import overrides
1✔
33
from spinnman.connections.udp_packet_connections import UDPConnection
1✔
34
from spinnman.connections.abstract_classes import Connection, Listenable
1✔
35
from spinnman.constants import SCP_SCAMP_PORT, UDP_BOOT_CONNECTION_DEFAULT_PORT
1✔
36
from spinnman.exceptions import SpinnmanTimeoutException
1✔
37
from spinnman.exceptions import SpallocException
1✔
38
from spinnman.transceiver import (
1✔
39
    Transceiver, create_transceiver_from_connections)
40
from .spalloc_state import SpallocState
1✔
41
from .proxy_protocol import ProxyProtocol
1✔
42
from .session import Session, SessionAware
1✔
43
from .utils import parse_service_url, get_hostname
1✔
44
from .abstract_spalloc_client import AbstractSpallocClient
1✔
45
from .spalloc_machine import SpallocMachine
1✔
46
from .spalloc_job import SpallocJob
1✔
47
from .spalloc_proxied_connection import SpallocProxiedConnection
1✔
48
from .spalloc_boot_connection import SpallocBootConnection
1✔
49
from .spalloc_eieio_connection import SpallocEIEIOConnection
1✔
50
from .spalloc_eieio_listener import SpallocEIEIOListener
1✔
51
from .spalloc_scp_connection import SpallocSCPConnection
1✔
52

53
# Module-wide logger; messages below use "{}" placeholders with positional
# arguments (see the logger.info calls in this file).
logger = FormatAdapter(getLogger(__name__))
1✔
54
# Binary layouts of the messages exchanged over the proxy websocket.
# All are little-endian; every message starts with two 32-bit words which
# the code below treats as (message code, correlation ID or channel ID).

# OPEN request: header words plus three further 32-bit arguments
# (packed from the (x, y, port) connect arguments in this file)
_open_req = struct.Struct("<IIIII")
# CLOSE request: header words plus the channel handle being closed
_close_req = struct.Struct("<III")
# Request layout for opening a listening channel: header words only
_open_listen_req = struct.Struct("<II")
# Open and close share the response structure
_open_close_res = struct.Struct("<III")
# Response layout for opening a listening channel: header words, one
# 32-bit word, four single bytes and a final 32-bit word
# NOTE(review): presumably channel handle, IPv4 address bytes and port --
# confirm against the code that unpacks it
_open_listen_res = struct.Struct("<IIIBBBBI")
# Header of a one-way message: (message code, channel ID)
_msg = struct.Struct("<II")
# Header of a one-way message with explicit target: header words plus
# (x, y, port)
_msg_to = struct.Struct("<IIIII")
1✔
62

63

64
def fix_url(url):
    """
    Normalise a URL so that it can be used as a base for further requests.

    The scheme is forced to ``https`` and the path is made to end with a
    ``/``; every other component of the URL is passed through unchanged.

    :param str url: The URL to normalise.
    :return: The normalised URL.
    :rtype: str
    """
    parts = urlparse(url)
    # ParseResult is a named tuple, so _replace swaps a single component
    # without having to restate all six of them
    if parts.scheme != 'https':
        parts = parts._replace(scheme="https")
    if not parts.path.endswith("/"):
        parts = parts._replace(path=parts.path + "/")
    return urlunparse(parts)
×
73

74

75
class SpallocClient(AbstractContextManager, AbstractSpallocClient):
    """
    Basic client library for talking to new Spalloc.
    """
    __slots__ = ("__session",
                 "__machines_url", "__jobs_url", "version",
                 "__group", "__collab", "__nmpi_job", "__nmpi_user")

    def __init__(
            self, service_url, username=None, password=None,
            bearer_token=None, group=None, collab=None, nmpi_job=None,
            nmpi_user=None):
        """
        :param str service_url: The reference to the service.
            May have username and password supplied as part of the network
            location; if so, the ``username`` and ``password`` arguments
            *must* be ``None``. If ``username`` and ``password`` are not given,
            not even within the URL, the ``bearer_token`` must be not ``None``.
        :param str username: The user name to use
        :param str password: The password to use
        :param str bearer_token: The bearer token to use
        :param group:
            If not ``None``, sent as ``group`` in every job creation request.
        :param collab:
            If not ``None``, sent as ``nmpi-collab`` in every job creation
            request.
        :param nmpi_job:
            If not ``None``, sent as ``nmpi-job-id`` in every job creation
            request.
        :param nmpi_user:
            If not ``None`` *and* ``nmpi_job`` is also given, sent as
            ``owner`` in every job creation request.
        """
        if username is None and password is None:
            service_url, username, password = parse_service_url(service_url)
        # Build the session exactly once; a second construction would only
        # throw the first one away
        self.__session = Session(service_url, username, password, bearer_token)
        obj = self.__session.renew()
        v = obj["version"]
        self.version = Version(
            f"{v['major-version']}.{v['minor-version']}.{v['revision']}")
        self.__machines_url = fix_url(obj["machines-ref"])
        self.__jobs_url = fix_url(obj["jobs-ref"])
        self.__group = group
        self.__collab = collab
        self.__nmpi_job = nmpi_job
        self.__nmpi_user = nmpi_user
        logger.info("established session to {} for {}", service_url, username)

    @staticmethod
    def open_job_from_database(
            service_url, job_url, cookies, headers) -> SpallocJob:
        """
        Create a job from the description in the attached database. This is
        intended to allow for access to the job's allocated resources from
        visualisers and other third party code participating in the SpiNNaker
        Tools Notification Protocol.

        .. note::
            The job is not verified to exist and be running. The session
            credentials may have expired; if so, the job will be unable to
            regenerate them.

        :param str service_url:
        :param str job_url:
        :param dict(str, str) cookies:
        :param dict(str, str) headers:

        :return:
            The job handle, or ``None`` if the records in the database are
            absent or incomplete.
        :rtype: SpallocJob
        """
        session = Session(service_url, session_credentials=(cookies, headers))
        return _SpallocJob(session, job_url)

    @overrides(AbstractSpallocClient.list_machines)
    def list_machines(self):
        obj = self.__session.get(self.__machines_url).json()
        # Machines are session-aware objects, so they are given the session
        # (as jobs are), not this client
        return {
            m["name"]: _SpallocMachine(self.__session, m)
            for m in obj["machines"]}

    @overrides(AbstractSpallocClient.list_jobs)
    def list_jobs(self, deleted=False):
        obj = self.__session.get(
            self.__jobs_url,
            deleted=("true" if deleted else "false")).json()
        # The listing is paged: keep following "next" until a page is empty
        # or has no successor
        while obj["jobs"]:
            for u in obj["jobs"]:
                yield _SpallocJob(self.__session, fix_url(u))
            if "next" not in obj:
                break
            obj = self.__session.get(obj["next"]).json()

    def _create(self, create, machine_name):
        """
        Complete a job creation request and submit it.

        :param dict create:
            The partially built request; it is extended in place.
        :param machine_name:
            The machine to create the job on. If not given, the request is
            tagged ``default`` instead.
        :return: A handle to the job at the location the service reported.
        :rtype: SpallocJob
        """
        if machine_name:
            create["machine-name"] = machine_name
        else:
            create["tags"] = ["default"]
        if self.__group is not None:
            create["group"] = self.__group
        if self.__collab is not None:
            create["nmpi-collab"] = self.__collab
        if self.__nmpi_job is not None:
            create["nmpi-job-id"] = self.__nmpi_job
            # The owner is only ever sent alongside an NMPI job ID
            if self.__nmpi_user is not None:
                create["owner"] = self.__nmpi_user
        r = self.__session.post(self.__jobs_url, create, timeout=30)
        url = r.headers["Location"]
        return _SpallocJob(self.__session, fix_url(url))

    @staticmethod
    def __board_spec(triad, physical, ip_address):
        """
        Build the ``board`` part of a job creation request from whichever
        board identifier was supplied. The identifiers are tried in the
        order triad, physical coordinates, IP address; the first one that
        is truthy wins.

        :param triad: (x, y, z) logical coordinates, or ``None``
        :param physical: (cabinet, frame, board) coordinates, or ``None``
        :param ip_address: The address of the board, or ``None``
        :rtype: dict
        :raises KeyError: If none of the identifiers is given.
        """
        if triad:
            x, y, z = triad
            return {"x": int(x), "y": int(y), "z": int(z)}
        if physical:
            c, f, b = physical
            return {"cabinet": int(c), "frame": int(f), "board": int(b)}
        if ip_address:
            return {"address": str(ip_address)}
        raise KeyError("at least one of triad, physical and ip_address "
                       "must be given")

    @overrides(AbstractSpallocClient.create_job)
    def create_job(self, num_boards=1, machine_name=None, keepalive=45):
        # The keepalive interval is sent as a duration string, "PT<n>S"
        return self._create({
            "num-boards": int(num_boards),
            "keepalive-interval": f"PT{int(keepalive)}S"
        }, machine_name)

    @overrides(AbstractSpallocClient.create_job_rect)
    def create_job_rect(self, width, height, machine_name=None, keepalive=45):
        return self._create({
            "dimensions": {
                "width": int(width),
                "height": int(height)
            },
            "keepalive-interval": f"PT{int(keepalive)}S"
        }, machine_name)

    @overrides(AbstractSpallocClient.create_job_board)
    def create_job_board(
            self, triad=None, physical=None, ip_address=None,
            machine_name=None, keepalive=45):
        board = self.__board_spec(triad, physical, ip_address)
        return self._create({
            "board": board,
            "keepalive-interval": f"PT{int(keepalive)}S"
        }, machine_name)

    @overrides(AbstractSpallocClient.create_job_rect_at_board)
    def create_job_rect_at_board(
            self, width, height, triad=None, physical=None, ip_address=None,
            machine_name=None, keepalive=45, max_dead_boards=0):
        board = self.__board_spec(triad, physical, ip_address)
        return self._create({
            "dimensions": {
                "width": int(width),
                "height": int(height)
            },
            "board": board,
            "keepalive-interval": f"PT{int(keepalive)}S",
            "max-dead-boards": int(max_dead_boards)
        }, machine_name)

    def close(self):
        """
        Purge the session, if there still is one, and forget it. Safe to
        call more than once.
        """
        # pylint: disable=protected-access
        if self.__session is not None:
            self.__session._purge()
        self.__session = None
×
241

242

243
class _ProxyServiceError(IOError):
    """
    An error passed to us from the server over the proxy channel.
    """
247

248

249
def _SpallocKeepalive(url, interval, term_queue, cookies, headers):
    """
    Actual keepalive task implementation. Don't use directly.

    Repeatedly PUTs ``alive`` to the keepalive URL, then waits for up to
    ``interval`` seconds for anything to arrive on the termination queue.
    The first item received ends the task.

    :param str url: Where to send the keepalive message.
    :param interval: Seconds to wait between keepalive messages.
    :param term_queue: Queue on which the request to stop arrives.
    :param dict cookies: Cookies to send with each request.
    :param dict headers:
        Headers to send with each request; a ``Content-Type`` entry is
        added to this dictionary.
    """
    headers["Content-Type"] = "text/plain; charset=UTF-8"
    while True:
        requests.put(url, data="alive", cookies=cookies, headers=headers,
                     allow_redirects=False, timeout=10)
        try:
            # Doubles as the inter-message delay: blocks until either a
            # stop request arrives or the interval has elapsed
            term_queue.get(True, interval)
        except queue.Empty:
            continue
        break
×
262

263

264
class _SpallocMachine(SessionAware, SpallocMachine):
    """
    Represents a Spalloc-controlled machine.

    Don't make this yourself. Use :py:class:`SpallocClient` instead.
    """
    __slots__ = ("__name", "__tags", "__width", "__height",
                 "__dead_boards", "__dead_links")

    def __init__(self, session, machine_data):
        """
        :param _Session session:
        :param dict machine_data:
            The description of the machine; must have the keys ``uri``,
            ``name``, ``tags``, ``width``, ``height``, ``dead-boards`` and
            ``dead-links``.
        """
        super().__init__(session, machine_data["uri"])
        self.__name = machine_data["name"]
        # Frozen so the tag set handed out by the property cannot be edited
        self.__tags = frozenset(machine_data["tags"])
        self.__width = machine_data["width"]
        self.__height = machine_data["height"]
        self.__dead_boards = machine_data["dead-boards"]
        self.__dead_links = machine_data["dead-links"]

    @property
    @overrides(SpallocMachine.name)
    def name(self):
        return self.__name

    @property
    @overrides(SpallocMachine.tags)
    def tags(self):
        return self.__tags

    @property
    @overrides(SpallocMachine.width)
    def width(self):
        return self.__width

    @property
    @overrides(SpallocMachine.height)
    def height(self):
        return self.__height

    @property
    @overrides(SpallocMachine.dead_boards)
    def dead_boards(self):
        return self.__dead_boards

    @property
    @overrides(SpallocMachine.dead_links)
    def dead_links(self):
        return self.__dead_links

    @property
    @overrides(SpallocMachine.area)
    def area(self):
        return (self.width, self.height)

    def __repr__(self):
        # Rendered as the class name followed by the tuple of its fields
        return "SpallocMachine" + str((
            self.name, self.tags, self.width, self.height, self.dead_boards,
            self.dead_links))
325

326

327
class _ProxyPing(threading.Thread):
    """
    Sends ping messages to an open websocket
    """

    def __init__(self, ws, sleep_time=30):
        """
        :param ws:
            The websocket to ping; its ``connected`` attribute and ``ping()``
            method are used.
        :param sleep_time: Seconds to sleep between pings.
        """
        super().__init__(daemon=True)
        self.__ws = ws
        self.__sleep_time = sleep_time
        self.__closed = False
        # The thread starts itself; constructing it is enough
        self.start()

    def run(self):
        """
        The handler loop of this thread
        """
        while self.__ws.connected:
            try:
                self.__ws.ping()
            except Exception:  # pylint: disable=broad-except
                # If closed, ignore error and get out of here
                if self.__closed:
                    break

                # Make someone aware of the error
                logger.exception("Error in websocket before close")
            sleep(self.__sleep_time)

    def close(self):
        """
        Mark as closed to avoid error messages.
        """
        self.__closed = True
×
360

361

362
class _ProxyReceiver(threading.Thread):
    """
    Receives all messages off an open websocket and dispatches them to
    registered listeners.
    """

    def __init__(self, ws):
        """
        :param ws:
            The websocket to read from; its ``connected`` attribute and
            ``recv_data()`` method are used.
        """
        super().__init__(daemon=True)
        self.__ws = ws
        # One-shot handlers for call returns, by correlation ID
        self.__returns = {}
        # Persistent handlers for one-way messages, by channel ID
        self.__handlers = {}
        self.__correlation_id = 0
        self.__closed = False
        # The thread starts itself; constructing it is enough
        self.start()

    def run(self):
        """
        The handler loop of this thread.
        """
        while self.__ws.connected:
            try:
                result = self.__ws.recv_data()
                frame = result[1]
                if len(frame) < _msg.size:
                    # Message is out of protocol
                    continue
            except Exception:  # pylint: disable=broad-except
                # If closed, ignore error and get out of here
                if self.__closed:
                    break

                # Make someone aware of the error
                logger.exception("Error in websocket before close")

                # If we are disconnected before closing, make errors happen
                if not self.__ws.connected:
                    for rt in self.__returns.values():
                        rt(None)
                    for hd in self.__handlers.values():
                        hd(None)
                    break

                # Still connected, but nothing was received this time round;
                # there is no (new) frame to dispatch, so go and read again
                continue
            code, num = _msg.unpack_from(frame, 0)
            if code == ProxyProtocol.MSG:
                self.dispatch_message(num, frame)
            else:
                self.dispatch_return(num, frame)

    def expect_return(self, handler) -> int:
        """
        Register a one-shot listener for a call-like message's return.

        :param handler:
            Called with the whole reply message, or with ``None`` if the
            websocket is found disconnected before this receiver is closed.
        :return: The correlation ID
        """
        c = self.__correlation_id
        self.__correlation_id += 1
        self.__returns[c] = handler
        return c

    def listen(self, channel_id: int, handler):
        """
        Register a persistent listener for one-way messages.

        :param channel_id: The channel to listen to.
        :param handler:
            Called with each whole message for the channel, or with ``None``
            if the websocket is found disconnected before this receiver is
            closed.
        """
        self.__handlers[channel_id] = handler

    def dispatch_return(self, correlation_id: int, msg: bytes):
        """
        Dispatch a received call-return message.

        The handler is removed as it is used; messages for which there is
        no handler are dropped.
        """
        handler = self.__returns.pop(correlation_id, None)
        if handler:
            handler(msg)

    def dispatch_message(self, channel_id: int, msg: bytes):
        """
        Dispatch a received one-way message.

        Messages for channels that nothing listens to are dropped.
        """
        handler = self.__handlers.get(channel_id, None)
        if handler:
            handler(msg)

    def unlisten(self, channel_id):
        """
        De-register a listener for a channel

        :raises KeyError: If nothing is listening to the channel.
        """
        self.__handlers.pop(channel_id)

    def close(self):
        """
        Mark receiver closed to avoid errors
        """
        self.__closed = True
×
453

454

455
class _SpallocJob(SessionAware, SpallocJob):
    """
    Represents a job in Spalloc.

    Don't make this yourself. Use :py:class:`SpallocClient` instead.
    """
    __slots__ = ("__machine_url", "__chip_url",
                 "_keepalive_url", "__keepalive_handle", "__proxy_handle",
                 "__proxy_thread", "__proxy_ping")

    def __init__(self, session, job_handle):
        """
        :param _Session session:
        :param str job_handle:
            The URL of the job. Sub-resource names are appended to it
            directly.
        """
        super().__init__(session, job_handle)
        logger.info("established job at {}", job_handle)
        self.__machine_url = self._url + "machine"
        self.__chip_url = self._url + "chip"
        self._keepalive_url = self._url + "keepalive"
        self.__keepalive_handle = None
        # The proxy websocket and its helper threads are made on first use
        self.__proxy_handle = None
        self.__proxy_thread = None
        self.__proxy_ping = None

    @overrides(SpallocJob.get_session_credentials_for_db)
    def get_session_credentials_for_db(self):
        config = {}
        config["SPALLOC", "service uri"] = self._service_url
        config["SPALLOC", "job uri"] = self._url
        cookies, headers = self._session_credentials
        for k, v in cookies.items():
            config["COOKIE", k] = v
        for k, v in headers.items():
            # We never write the auth headers themselves; we just extend the
            # session
            if k != "Authorization":
                config["HEADER", k] = v
        return config

    @overrides(SpallocJob.get_state)
    def get_state(self, wait_for_change=False):
        # When waiting for a change the request is allowed to hang
        # indefinitely; otherwise give up after ten seconds
        timeout = None if wait_for_change else 10
        obj = self._get(
            self._url, wait=wait_for_change, timeout=timeout).json()
        return SpallocState[obj["state"]]

    @overrides(SpallocJob.get_root_host)
    def get_root_host(self):
        r = self._get(self.__machine_url)
        if r.status_code == 204:
            return None
        obj = r.json()
        # Each connection is ((x, y), host); the root is the one at (0, 0)
        for c in obj["connections"]:
            [x, y], host = c
            if x == 0 and y == 0:
                return host
        return None

    @overrides(SpallocJob.get_connections)
    def get_connections(self):
        r = self._get(self.__machine_url)
        if r.status_code == 204:
            return None
        return {
            (int(x), int(y)): str(host)
            for ((x, y), host) in r.json()["connections"]
        }

    @property
    def __proxy_url(self):
        """
        The URL for talking to the proxy connection system, or ``None`` if
        the service does not report one for this job.
        """
        r = self._get(self._url)
        if r.status_code == 204:
            return None
        try:
            url = r.json()["proxy-ref"]
            logger.info("Connecting to proxy on {}", url)
            return url
        except KeyError:
            return None

    def __init_proxy(self):
        """
        Open the proxy websocket and start its receiver and ping threads,
        unless there already is a connected websocket.
        """
        if self.__proxy_handle is None or not self.__proxy_handle.connected:
            self.__proxy_handle = self._websocket(
                self.__proxy_url, origin=get_hostname(self._url))
            self.__proxy_thread = _ProxyReceiver(self.__proxy_handle)
            self.__proxy_ping = _ProxyPing(self.__proxy_handle)

    @overrides(SpallocJob.connect_to_board)
    def connect_to_board(self, x, y, port=SCP_SCAMP_PORT):
        self.__init_proxy()
        return _ProxiedSCAMPConnection(
            self.__proxy_handle, self.__proxy_thread,
            int(x), int(y), int(port))

    @overrides(SpallocJob.connect_for_booting)
    def connect_for_booting(self):
        self.__init_proxy()
        return _ProxiedBootConnection(self.__proxy_handle, self.__proxy_thread)

    @overrides(SpallocJob.open_eieio_connection)
    def open_eieio_connection(self, x, y):
        self.__init_proxy()
        return _ProxiedEIEIOConnection(
            self.__proxy_handle, self.__proxy_thread,
            int(x), int(y), SCP_SCAMP_PORT)

    @overrides(SpallocJob.open_eieio_listener_connection)
    def open_eieio_listener_connection(self):
        self.__init_proxy()
        return _ProxiedEIEIOListener(
            self.__proxy_handle, self.__proxy_thread, self.get_connections())

    @overrides(SpallocJob.open_udp_listener_connection)
    def open_udp_listener_connection(self):
        self.__init_proxy()
        return _ProxiedUDPListener(
            self.__proxy_handle, self.__proxy_thread, self.get_connections())

    @overrides(SpallocJob.wait_for_state_change)
    def wait_for_state_change(self, old_state, timeout=None):
        # A destroyed job never changes state again, so don't even ask
        while old_state != SpallocState.DESTROYED:
            obj = self._get(self._url, wait="true", timeout=timeout).json()
            s = SpallocState[obj["state"]]
            if s != old_state or s == SpallocState.DESTROYED:
                return s
        return old_state

    @overrides(SpallocJob.wait_until_ready)
    def wait_until_ready(self, timeout=None, n_retries=None):
        state = self.get_state()
        retries = 0
        # With n_retries of None, wait for as many changes as it takes
        while (state != SpallocState.READY and
               (n_retries is None or retries < n_retries)):
            retries += 1
            state = self.wait_for_state_change(state, timeout=timeout)
            if state == SpallocState.DESTROYED:
                raise SpallocException("job was unexpectedly destroyed")

    @overrides(SpallocJob.destroy)
    def destroy(self, reason="finished"):
        if self.__keepalive_handle:
            self.__keepalive_handle.close()
            self.__keepalive_handle = None
        if self.__proxy_handle is not None:
            # Mark the helper threads closed first so that they do not
            # report the closing of the websocket as an error
            self.__proxy_thread.close()
            self.__proxy_ping.close()
            self.__proxy_handle.close()
        self._delete(self._url, reason=str(reason))
        logger.info("deleted job at {}", self._url)

    @overrides(SpallocJob.keepalive)
    def keepalive(self):
        self._put(self._keepalive_url, "alive")

    @overrides(SpallocJob.launch_keepalive_task)
    def launch_keepalive_task(self, period=30):
        """
        .. note::
            Tricky! *Cannot* be done with a thread, as the main thread is known
            to do significant amounts of CPU-intensive work.
        """
        class Closer(AbstractContextManager):
            """
            Handle for stopping the keepalive process.
            """

            def __init__(self):
                self._queue = Queue(1)
                # Set once the process has been started
                self._p = None

            def close(self):
                self._queue.put("quit")
                # Give it a second, and if it still isn't dead, kill it
                self._p.join(1)
                if self._p.is_alive():
                    self._p.kill()

        # Goes through the property setter, which refuses a second task
        self._keepalive_handle = Closer()
        # pylint: disable=protected-access
        p = Process(target=_SpallocKeepalive, args=(
            self._keepalive_url, 0 + period, self._keepalive_handle._queue,
            *self._session_credentials), daemon=True)
        p.start()
        self._keepalive_handle._p = p
        return self._keepalive_handle

    @overrides(SpallocJob.where_is_machine)
    def where_is_machine(self, x: int, y: int) -> Tuple[int, int, int]:
        r = self._get(self.__chip_url, x=int(x), y=int(y))
        if r.status_code == 204:
            return None
        return tuple(r.json()["physical-board-coordinates"])

    @property
    def _keepalive_handle(self):
        """
        The handle of the running keepalive task, or ``None`` if there is
        none. May only be set while it is ``None``.
        """
        return self.__keepalive_handle

    @_keepalive_handle.setter
    def _keepalive_handle(self, handle):
        if self.__keepalive_handle is not None:
            raise SpallocException("cannot keep job alive from two tasks")
        self.__keepalive_handle = handle

    @overrides(SpallocJob.create_transceiver)
    def create_transceiver(self) -> Transceiver:
        if self.get_state() != SpallocState.READY:
            raise SpallocException("job not ready to execute scripts")
        proxies = [
            self.connect_to_board(x, y) for (x, y) in self.get_connections()]
        # Also need a boot connection
        proxies.append(self.connect_for_booting())
        return create_transceiver_from_connections(connections=proxies)

    def __repr__(self):
        return f"SpallocJob({self._url})"
×
673

674

675
class _ProxiedConnection(metaclass=AbstractBase):
    """
    Core multiplexer/demultiplexer emulating a connection that is proxied
    over a websocket.

    None of the methods are public because subclasses may expose a profile of
    them to conform to a particular type of connection.
    """

    def __init__(self, ws: WebSocket, receiver: _ProxyReceiver):
        """
        :param ws: The websocket that messages are sent on.
        :param receiver:
            The receiver that delivers the replies and messages arriving on
            that websocket.
        """
        self.__ws = ws
        self.__receiver = receiver
        # One-way messages received for this connection
        self.__msgs = queue.SimpleQueue()
        # The reply to the (single) call in progress
        self.__call_queue = queue.Queue(1)
        self.__call_lock = threading.RLock()
        # Message fetched while testing for readiness, not yet handed out
        self.__current_msg = None
        # Everything that _open_connection needs must be set up by now
        self.__handle = self._open_connection()
        self.__receiver.listen(self.__handle, self.__msgs.put)

    @abstractmethod
    def _open_connection(self) -> int:
        """
        Open the channel on the proxy.

        :return: The handle (channel ID) of the channel.
        """
        pass

    def _call(self, protocol: ProxyProtocol, packer: struct.Struct,
              unpacker: struct.Struct, *args) -> List[int]:
        """
        Do a synchronous call.

        :param protocol:
            The protocol message number.
        :param packer:
            How to form the protocol message. The first two arguments passed
            will be the protocol message number and an issued correlation ID
            (not needed by the caller).
        :param unpacker:
            How to extract the expected response.
        :param args:
            Additional arguments to pass to the packer.
        :return:
            The results from the unpacker *after* the protocol message code and
            the correlation ID.
        :raises IOError:
            If something goes wrong. This could be due to the websocket being
            closed, or the receipt of an ERROR response.
        """
        if not self._connected:
            raise IOError("socket closed")
        with self.__call_lock:
            # All calls via websocket use correlation_id
            correlation_id = self.__receiver.expect_return(
                self.__call_queue.put)
            self.__ws.send_binary(packer.pack(protocol, correlation_id, *args))
            if not self._connected:
                raise IOError("socket closed after send!")
            reply = self.__call_queue.get()
            if reply is None:
                # The receiver hands out None in place of a reply when it
                # finds the websocket disconnected
                raise IOError("socket closed before reply")
            code, _ = _msg.unpack_from(reply, 0)
            if code == ProxyProtocol.ERROR:
                # Rest of message is UTF-8 encoded error message string
                payload = reply[_msg.size:].decode("utf-8")
                if len(payload):
                    raise _ProxyServiceError(payload)
                raise _ProxyServiceError(
                    f"unknown problem with {protocol} call")
            return unpacker.unpack(reply)[2:]

    @property
    def _connected(self) -> bool:
        """
        Whether there is a websocket and it is connected.
        """
        return bool(self.__ws is not None and self.__ws.connected)

    def _throw_if_closed(self):
        """
        :raises IOError: If the connection is not connected.
        """
        if not self._connected:
            raise IOError("socket closed")

    def _close(self):
        """
        Close the channel on the proxy (if still connected) and detach from
        the websocket and the receiver.

        :raises IOError: If the proxy does not confirm closing this channel.
        """
        try:
            if self._connected:
                channel_id, = self._call(
                    ProxyProtocol.CLOSE, _close_req, _open_close_res,
                    self.__handle)
                if channel_id != self.__handle:
                    raise IOError("failed to close proxy socket")
        finally:
            # Even when the CLOSE call fails, stop listening to the channel
            # and drop the references so that this object is seen as closed
            self.__receiver.unlisten(self.__handle)
            self.__ws = None
            self.__receiver = None

    def _send(self, message: bytes):
        """
        Send a message on this connection's channel.

        :raises IOError: If the connection is not connected.
        """
        self._throw_if_closed()
        # Put the header on the front and send it
        self.__ws.send_binary(_msg.pack(
            ProxyProtocol.MSG, self.__handle) + message)

    def _send_to(self, message: bytes, x: int, y: int, port: int):
        """
        Send a message via this connection's channel to an explicit target.

        :raises IOError: If the connection is not connected.
        """
        self._throw_if_closed()
        # Put the header on the front and send it
        self.__ws.send_binary(_msg_to.pack(
            ProxyProtocol.MSG_TO, self.__handle, x, y, port) + message)

    def __get(self, timeout: float = 0.5) -> bytes:
        """
        Get a value from the queue. Handles block/non-block switching and
        trimming of the message protocol prefix.

        :raises queue.Empty: If there is no message (in time).
        :raises IOError:
            If the receiver reported the websocket as disconnected.
        """
        if not timeout:
            msg = self.__msgs.get(block=False)
        else:
            msg = self.__msgs.get(timeout=timeout)
        if msg is None:
            # The receiver hands out None in place of a message when it
            # finds the websocket disconnected
            raise IOError("socket closed")
        return msg[_msg.size:]

    def _receive(self, timeout=None) -> bytes:
        """
        Receive the next message for this connection.

        :param timeout:
            How long to wait, in seconds; ``None`` waits until a message
            arrives or the connection is found closed.
        :raises IOError: If the connection is not connected.
        :raises SpinnmanTimeoutException: If the wait timed out.
        """
        # A message already fetched by the readiness test goes out first
        if self.__current_msg is not None:
            msg, self.__current_msg = self.__current_msg, None
            return msg
        if timeout is None:
            # Wait in short steps so that a closed socket gets noticed
            while True:
                try:
                    return self.__get()
                except queue.Empty:
                    self._throw_if_closed()
        try:
            return self.__get(timeout)
        except queue.Empty as e:
            self._throw_if_closed()
            raise SpinnmanTimeoutException("receive", timeout) from e

    def _is_ready_to_receive(self, timeout=0) -> bool:
        """
        Test whether a message is available, waiting up to ``timeout``
        seconds for one. A message fetched by the test is kept for the next
        receive.
        """
        # If we already have a message or the queue peek succeeds, return now
        if self.__current_msg is not None or not self.__msgs.empty():
            return True
        try:
            self.__current_msg = self.__get(timeout)
            return True
        except queue.Empty:
            return False
×
809

810

811
class _ProxiedBidirectionalConnection(
        _ProxiedConnection, SpallocProxiedConnection):
    """
    A proxied connection that is opened against one target, given as an
    (x, y, port) triple, and which can both send to and receive from it.
    """

    def __init__(
            self, ws: WebSocket, receiver: _ProxyReceiver,
            x: int, y: int, port: int):
        """
        :param ws: Passed to the base constructor.
        :param receiver: Passed to the base constructor.
        :param int x: First argument of the ``OPEN`` call.
        :param int y: Second argument of the ``OPEN`` call.
        :param int port: Third argument of the ``OPEN`` call.
        """
        # Must be stored before the base constructor runs, because
        # _open_connection() reads it. NOTE(review): presumably the base
        # constructor is what calls _open_connection(); confirm there.
        self.__connect_args = (x, y, port)
        super().__init__(ws, receiver)

    @overrides(_ProxiedConnection._open_connection)
    def _open_connection(self):
        x, y, port = self.__connect_args
        reply = self._call(
            ProxyProtocol.OPEN, _open_req, _open_close_res, x, y, port)
        # The reply must hold exactly one field: the connection handle
        (handle,) = reply
        return handle

    @overrides(Connection.is_connected)
    def is_connected(self) -> bool:
        # Public view of the state tracked by the proxied-connection base
        return self._connected

    @overrides(Connection.close)
    def close(self):
        # Public entry point for the base class's close implementation
        self._close()

    @overrides(SpallocProxiedConnection.send)
    def send(self, data: bytes):
        # Anything that is not already bytes-like is converted first
        payload = data if isinstance(data, (bytes, bytearray)) else bytes(data)
        self._send(payload)

    @overrides(SpallocProxiedConnection.receive)
    def receive(self, timeout=None) -> bytes:
        # Public entry point for the base class's receive implementation
        return self._receive(timeout)

    @overrides(Listenable.is_ready_to_receive)
    def is_ready_to_receive(self, timeout=0) -> bool:
        # Public entry point for the base class's readiness check
        return self._is_ready_to_receive(timeout)
851

852

853
class _ProxiedUnboundConnection(
        _ProxiedConnection, SpallocProxiedConnection):
    """
    A connection that can listen to all boards via the proxy, but which can
    only send if a target board is provided.
    """

    def __init__(self, ws: WebSocket, receiver: _ProxyReceiver):
        """
        :param ws: Passed to the base constructor.
        :param receiver: Passed to the base constructor.
        """
        # The defaults are set up BEFORE the base constructor runs.
        # _open_connection() stores the real address and port in these
        # fields; if the base constructor is what calls it (as the
        # constructor ordering of _ProxiedBidirectionalConnection suggests
        # -- NOTE(review): confirm in _ProxiedConnection), then assigning
        # None afterwards would throw the freshly obtained values away.
        self.__addr = None
        self.__port = None
        super().__init__(ws, receiver)

    @overrides(_ProxiedConnection._open_connection)
    def _open_connection(self) -> int:
        # The reply carries the handle, the four octets of the address and
        # the port; the port is stored directly on this object
        handle, ip1, ip2, ip3, ip4, self.__port = self._call(
            ProxyProtocol.OPEN_UNBOUND, _open_listen_req, _open_listen_res)
        # Assemble the address into the format expected elsewhere
        self.__addr = f"{ip1}.{ip2}.{ip3}.{ip4}"
        return handle

    @property
    def _addr(self) -> str:
        """
        The dotted-quad address recorded when the connection was opened,
        or `None` if the connection is not currently connected.
        """
        return self.__addr if self._connected else None

    @property
    def _port(self) -> int:
        """
        The port recorded when the connection was opened, or `None` if the
        connection is not currently connected.
        """
        return self.__port if self._connected else None

    @overrides(Connection.is_connected)
    def is_connected(self) -> bool:
        # Public view of the state tracked by the proxied-connection base
        return self._connected

    @overrides(Connection.close)
    def close(self):
        # Public entry point for the base class's close implementation
        self._close()

    @overrides(SpallocProxiedConnection.send)
    def send(self, data: bytes):
        # An unbound connection has no fixed target, so a plain send always
        # fails; the closed check runs first so that it can take precedence
        self._throw_if_closed()
        raise IOError("socket is not open for sending")

    @overrides(SpallocProxiedConnection.receive)
    def receive(self, timeout=None) -> bytes:
        # Public entry point for the base class's receive implementation
        return self._receive(timeout)

    @overrides(Listenable.is_ready_to_receive)
    def is_ready_to_receive(self, timeout=0) -> bool:
        # Public entry point for the base class's readiness check
        return self._is_ready_to_receive(timeout)
901

902

903
class _ProxiedSCAMPConnection(
        _ProxiedBidirectionalConnection, SpallocSCPConnection):
    """
    A bidirectional proxied connection that is also a
    :py:class:`SpallocSCPConnection` for the chip it was opened against.
    """
    __slots__ = ("__chip_x", "__chip_y")

    def __init__(
            self, ws: WebSocket, receiver: _ProxyReceiver,
            x: int, y: int, port: int):
        """
        :param ws: Passed to the proxied-connection constructor.
        :param receiver: Passed to the proxied-connection constructor.
        :param int x: Used for both the proxy target and the SCP side.
        :param int y: Used for both the proxy target and the SCP side.
        :param int port: The port of the proxy target.
        """
        # Both halves are initialised explicitly, proxy side first; the
        # SCP side takes only the chip coordinates
        super().__init__(ws, receiver, x, y, port)
        SpallocSCPConnection.__init__(self, x, y)

    def __str__(self):
        # chip_x / chip_y are not defined in this class (presumably they
        # come from the SCP connection side)
        return f"SCAMPConnection[proxied]({self.chip_x},{self.chip_y})"
915

916

917
class _ProxiedBootConnection(
        _ProxiedBidirectionalConnection, SpallocBootConnection):
    """
    A bidirectional proxied connection that is always opened against
    (0, 0) on the default UDP boot port.
    """
    __slots__ = ()

    def __init__(self, ws: WebSocket, receiver: _ProxyReceiver):
        """
        :param ws: Passed to the proxied-connection constructor.
        :param receiver: Passed to the proxied-connection constructor.
        """
        # The target is fixed, so callers supply only the transport
        super().__init__(ws, receiver, 0, 0, UDP_BOOT_CONNECTION_DEFAULT_PORT)

    def __str__(self):
        return "BootConnection[proxied]()"
926

927

928
class _ProxiedEIEIOConnection(
        _ProxiedBidirectionalConnection,
        SpallocEIEIOConnection, SpallocProxiedConnection):
    """
    A proxied EIEIO connection opened against one chip; it remembers that
    chip's coordinates and refuses direct ``send_to`` calls.
    """
    # Special: This is a unidirectional receive-only connection
    __slots__ = ("__addr", "__port", "__chip_x", "__chip_y")

    def __init__(
            self, ws: WebSocket, receiver: _ProxyReceiver,
            x: int, y: int, port: int):
        """
        :param ws: Passed to the proxied-connection constructor.
        :param receiver: Passed to the proxied-connection constructor.
        :param int x: X coordinate of the target; also kept for `_coords`.
        :param int y: Y coordinate of the target; also kept for `_coords`.
        :param int port: The port of the proxy target.
        """
        super().__init__(ws, receiver, x, y, port)
        # Kept so that _coords and __str__ can report the remote chip
        self.__chip_x = x
        self.__chip_y = y

    @property
    @overrides(SpallocEIEIOConnection._coords)
    def _coords(self):
        return (self.__chip_x, self.__chip_y)

    def send_to(
            self,
            data: bytes, address: tuple):  # pylint: disable=unused-argument
        """
        Direct ``send_to`` is unsupported.

        :raises IOError: Always, once the closed check has been made.
        """
        # The closed check runs first so that it can take precedence over
        # the generic "not open for sending" failure
        self._throw_if_closed()
        raise IOError("socket is not open for sending")

    def __str__(self):
        return (f"EIEIOConnection[proxied](remote:{self.__chip_x},"
                f"{self.__chip_y})")
958

959

960
class _ProxiedEIEIOListener(_ProxiedUnboundConnection, SpallocEIEIOListener):
    """
    An unbound proxied connection acting as a
    :py:class:`SpallocEIEIOListener`; it can send to a chip named by its
    coordinates and can map an IP address back to chip coordinates.
    """
    __slots__ = ("__conns", )

    def __init__(self, ws: WebSocket, receiver: _ProxyReceiver,
                 conns: Dict[Tuple[int, int], str]):
        """
        :param ws: Passed to the unbound-connection constructor.
        :param receiver: Passed to the unbound-connection constructor.
        :param conns: Mapping from (x, y) coordinates to IP address.
        """
        super().__init__(ws, receiver)
        # Lookups here go from IP address to coordinates, so keep the
        # reverse of the mapping we were given
        self.__conns = {
            address: coords for coords, address in conns.items()}

    @overrides(SpallocEIEIOListener.send_to_chip)
    def send_to_chip(
            self, message: bytes, x: int, y: int, port: int = SCP_SCAMP_PORT):
        # One bytes() call covers every input type: it converts anything
        # that is not bytes and leaves an actual bytes object as it is
        self._send_to(bytes(message), x, y, port)

    @property
    @overrides(SpallocEIEIOListener.local_ip_address)
    def local_ip_address(self) -> str:
        return self._addr

    @property
    @overrides(SpallocEIEIOListener.local_port)
    def local_port(self) -> int:
        return self._port

    @overrides(SpallocEIEIOListener._get_chip_coords)
    def _get_chip_coords(self, ip_address: str) -> Tuple[int, int]:
        # Normalise to str so the key type matches the stored mapping
        key = str(ip_address)
        return self.__conns[key]

    def __str__(self):
        return f"EIEIOConnection[proxied](local:{self._addr}:{self._port})"
992

993

994
class _ProxiedUDPListener(_ProxiedUnboundConnection, UDPConnection):
    """
    An unbound proxied connection presented as a
    :py:class:`UDPConnection`; ``send_to`` translates the destination IP
    address into chip coordinates before sending through the proxy.
    """
    __slots__ = ("__conns", )

    def __init__(self, ws: WebSocket, receiver: _ProxyReceiver,
                 conns: Dict[Tuple[int, int], str]):
        """
        :param ws: Passed to the unbound-connection constructor.
        :param receiver: Passed to the unbound-connection constructor.
        :param conns: Mapping from (x, y) coordinates to IP address.
        """
        super().__init__(ws, receiver)
        # send_to() is given an IP address, so keep the reverse of the
        # mapping we were given
        self.__conns = {
            address: coords for coords, address in conns.items()}

    @overrides(UDPConnection.send_to)
    def send_to(self, data: bytes, address: Tuple[str, int]):
        target_ip, port = address
        # An address that is not in the mapping raises KeyError here
        chip_x, chip_y = self.__conns[target_ip]
        self._send_to(data, chip_x, chip_y, port)

    @property
    @overrides(UDPConnection.local_ip_address)
    def local_ip_address(self) -> str:
        return self._addr

    @property
    @overrides(UDPConnection.local_port)
    def local_port(self) -> int:
        return self._port

    def __str__(self):
        return f"UDPConnection[proxied](local:{self._addr}:{self._port})"
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc