• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SpiNNakerManchester / SpiNNMan / 6561889142

18 Oct 2023 01:51PM UTC coverage: 50.777% (-0.005%) from 50.782%
6561889142

push

github

web-flow
Merge pull request #373 from SpiNNakerManchester/pylint_fixes

Minor param renaming, doc fixes and ignores for spelling checker

90 of 1258 branches covered (0.0%)

Branch coverage included in aggregate %.

14 of 14 new or added lines in 5 files covered. (100.0%)

4584 of 7947 relevant lines covered (57.68%)

0.58 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

25.27
/spinnman/spalloc/session.py
1
# Copyright (c) 2021 The University of Manchester
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#     https://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14
from functools import wraps
1✔
15
from logging import getLogger
1✔
16
import re
1✔
17
import requests
1✔
18
from typing import Dict, Tuple
1✔
19
import websocket
1✔
20
from spinn_utilities.log import FormatAdapter
1✔
21
from .utils import clean_url
1✔
22
from spinnman.exceptions import SpallocException
1✔
23

24
logger = FormatAdapter(getLogger(__name__))
1✔
25
#: The name of the session cookie issued by Spring Security
26
_SESSION_COOKIE = "JSESSIONID"
1✔
27
#: Enable detailed debugging by setting to True
28
_debug_pretty_print = False
1✔
29

30

31
def _may_renew(method):
1✔
32
    def pp_req(request: requests.PreparedRequest):
1✔
33
        """
34
        :param ~requests.PreparedRequest request:
35
        """
36
        print('{}\n{}\r\n{}\r\n\r\n{}'.format(
×
37
            '>>>>>>>>>>>START>>>>>>>>>>>',
38
            request.method + ' ' + request.url,
39
            '\r\n'.join('{}: {}'.format(*kv)
40
                        for kv in request.headers.items()),
41
            request.body if request.body else ""))
42

43
    def pp_resp(response: requests.Response):
1✔
44
        """
45
        :param ~requests.Response response:
46
        """
47
        print('{}\n{}\r\n{}\r\n\r\n{}'.format(
×
48
            '<<<<<<<<<<<START<<<<<<<<<<<',
49
            str(response.status_code) + " " + response.reason,
50
            '\r\n'.join('{}: {}'.format(*kv)
51
                        for kv in response.headers.items()),
52
            # Assume we only get textual responses
53
            str(response.content, "UTF-8") if response.content else ""))
54

55
    @wraps(method)
1✔
56
    def call(self, *args, **kwargs):
1✔
57
        renew_count = 0
×
58
        while True:
59
            r = method(self, *args, **kwargs)
×
60
            if _debug_pretty_print:
×
61
                pp_req(r.request)
×
62
                pp_resp(r)
×
63
            if _SESSION_COOKIE in r.cookies:
×
64
                # pylint: disable=protected-access
65
                self._session_id = r.cookies[_SESSION_COOKIE]
×
66
            if r.status_code != 401 or not renew_count:
×
67
                return r
×
68
            self.renew()
×
69
            renew_count += 1
×
70

71
    return call
1✔
72

73

74
class Session:
1✔
75
    """
76
    Manages session credentials for the Spalloc client.
77

78
    .. warning::
79
        This class does not present a stable API for public consumption.
80
    """
81
    __slots__ = (
1✔
82
        "__login_form_url", "__login_submit_url", "__srv_base", "_service_url",
83
        "__username", "__password", "__token",
84
        "_session_id", "__csrf", "__csrf_header")
85

86
    def __init__(
1✔
87
            self, service_url: str,
88
            username: str = None, password: str = None, token: str = None,
89
            session_credentials: Tuple[Dict[str, str], Dict[str, str]] = None):
90
        """
91
        :param str service_url: The reference to the service.
92
            *Should not* include a username or password in it.
93
        :param str username: The user name to use
94
        :param str password: The password to use
95
        :param str token: The bearer token to use
96
        """
97
        url = clean_url(service_url)
×
98
        self.__login_form_url = url + "system/login.html"
×
99
        self.__login_submit_url = url + "system/perform_login"
×
100
        self._service_url = url
×
101
        self.__srv_base = url + "srv/spalloc/"
×
102
        self.__username = username
×
103
        self.__password = password
×
104
        self.__token = token
×
105
        if session_credentials:
×
106
            cookies, headers = session_credentials
×
107
            if _SESSION_COOKIE in cookies:
×
108
                self._session_id = cookies[_SESSION_COOKIE]
×
109
            for key, value in headers.items():
×
110
                if key == "Authorization":
×
111
                    # TODO: extract this?
112
                    pass
×
113
                else:
114
                    # Urgh
115
                    self.__csrf_header = key
×
116
                    self.__csrf = value
×
117

118
    def __handle_error_or_return(self, response):
1✔
119
        code = response.status_code
×
120
        if code >= 200 and code < 400:
×
121
            return response
×
122
        result = response.content
×
123
        raise ValueError(f"Unexpected response from server {code}\n"
×
124
                         f"    {str(result)}")
125

126
    @_may_renew
1✔
127
    def get(self, url: str, timeout: int = 10, **kwargs) -> requests.Response:
1✔
128
        """
129
        Do an HTTP ``GET`` in the session.
130

131
        :param str url:
132
        :param int timeout:
133
        :rtype: ~requests.Response
134
        :raise ValueError: If the server rejects a request
135
        """
136
        params = kwargs if kwargs else None
×
137
        cookies = {_SESSION_COOKIE: self._session_id}
×
138
        r = requests.get(url, params=params, cookies=cookies,
×
139
                         allow_redirects=False, timeout=timeout)
140
        logger.debug("GET {} returned {}", url, r.status_code)
×
141
        return self.__handle_error_or_return(r)
×
142

143
    @_may_renew
1✔
144
    def post(self, url: str, json_dict: dict, timeout: int = 10,
1✔
145
             **kwargs) -> requests.Response:
146
        """
147
        Do an HTTP ``POST`` in the session.
148

149
        :param str url:
150
        :param int timeout:
151
        :param dict json_dict:
152
        :rtype: ~requests.Response
153
        :raise ValueError: If the server rejects a request
154
        """
155
        params = kwargs if kwargs else None
×
156
        cookies, headers = self._credentials
×
157
        r = requests.post(url, params=params, json=json_dict,
×
158
                          cookies=cookies, headers=headers,
159
                          allow_redirects=False, timeout=timeout)
160
        logger.debug("POST {} returned {}", url, r.status_code)
×
161
        return self.__handle_error_or_return(r)
×
162

163
    @_may_renew
1✔
164
    def put(self, url: str, data: str, timeout: int = 10,
1✔
165
            **kwargs) -> requests.Response:
166
        """
167
        Do an HTTP ``PUT`` in the session. Puts plain text *OR* JSON!
168

169
        :param str url:
170
        :param str data:
171
        :param int timeout:
172
        :rtype: ~requests.Response
173
        :raise ValueError: If the server rejects a request
174
        """
175
        params = kwargs if kwargs else None
×
176
        cookies, headers = self._credentials
×
177
        if isinstance(data, str):
×
178
            headers["Content-Type"] = "text/plain; charset=UTF-8"
×
179
        r = requests.put(url, params=params, data=data,
×
180
                         cookies=cookies, headers=headers,
181
                         allow_redirects=False, timeout=timeout)
182
        logger.debug("PUT {} returned {}", url, r.status_code)
×
183
        return self.__handle_error_or_return(r)
×
184

185
    @_may_renew
1✔
186
    def delete(self, url: str, timeout: int = 10,
1✔
187
               **kwargs) -> requests.Response:
188
        """
189
        Do an HTTP ``DELETE`` in the session.
190

191
        :param str url:
192
        :rtype: ~requests.Response
193
        :raise ValueError: If the server rejects a request
194
        """
195
        params = kwargs if kwargs else None
×
196
        cookies, headers = self._credentials
×
197
        r = requests.delete(url, params=params, cookies=cookies,
×
198
                            headers=headers, allow_redirects=False,
199
                            timeout=timeout)
200
        logger.debug("DELETE {} returned {}", url, r.status_code)
×
201
        return self.__handle_error_or_return(r)
×
202

203
    def renew(self) -> dict:
1✔
204
        """
205
        Renews the session, logging the user into it so that state modification
206
        operations can be performed.
207

208
        :returns: Description of the root of the service, without CSRF data
209
        :rtype: dict
210
        :raises SpallocException:
211
            If the session cannot be renewed.
212
        """
213
        if self.__token:
×
214
            r = requests.get(
×
215
                self.__login_form_url,
216
                headers={"Authorization": f"Bearer {self.__token}"},
217
                allow_redirects=False, timeout=10)
218
            if not r.ok:
×
219
                raise SpallocException(f"Could not renew session: {r.content}")
×
220
            self._session_id = r.cookies[_SESSION_COOKIE]
×
221
        else:
222
            # Step one: a temporary session so we can log in
223
            csrf_matcher = re.compile(
×
224
                r"""<input type="hidden" name="_csrf" value="(.*)" />""")
225
            r = requests.get(self.__login_form_url, allow_redirects=False,
×
226
                             timeout=10)
227
            logger.debug("GET {} returned {}",
×
228
                         self.__login_form_url, r.status_code)
229
            m = csrf_matcher.search(r.text)
×
230
            if not m:
×
231
                raise SpallocException("could not establish temporary session")
×
232
            csrf = m.group(1)
×
233
            session = r.cookies[_SESSION_COOKIE]
×
234

235
            # Step two: actually do the log in
236
            form = {
×
237
                "_csrf": csrf,
238
                "username": self.__username,
239
                "password": self.__password,
240
                "submit": "submit"
241
            }
242
            # NB: returns redirect that sets a cookie
243
            r = requests.post(self.__login_submit_url,
×
244
                              cookies={_SESSION_COOKIE: session},
245
                              allow_redirects=False,
246
                              data=form, timeout=10)
247
            logger.debug("POST {} returned {}",
×
248
                         self.__login_submit_url, r.status_code)
249
            self._session_id = r.cookies[_SESSION_COOKIE]
×
250
            # We don't need to follow that redirect
251

252
        # Step three: get the basic service data and new CSRF token
253
        obj = self.get(self.__srv_base).json()
×
254
        self.__csrf_header = obj["csrf-header"]
×
255
        self.__csrf = obj["csrf-token"]
×
256
        del obj["csrf-header"]
×
257
        del obj["csrf-token"]
×
258
        return obj
×
259

260
    @property
1✔
261
    def _credentials(self) -> Tuple[Dict[str, str], Dict[str, str]]:
1✔
262
        """
263
        The credentials for requests. *Serializable.*
264
        """
265
        cookies = {_SESSION_COOKIE: self._session_id}
×
266
        headers = {self.__csrf_header: self.__csrf}
×
267
        if self.__token:
×
268
            # This would be better off done once per session only
269
            headers["Authorization"] = f"Bearer {self.__token}"
×
270
        return cookies, headers
×
271

272
    def websocket(
1✔
273
            self, url: str, header: dict = None, cookie: str = None,
274
            **kwargs) -> websocket.WebSocket:
275
        """
276
        Create a websocket that uses the session credentials to establish
277
        itself.
278

279
        :param str url: Actual location to open websocket at
280
        :param dict(str,str) header: Optional HTTP headers
281
        :param str cookie:
282
            Optional cookies (composed as semicolon-separated string)
283
        :param kwargs: Other options to :py:func:`~websocket.create_connection`
284
        :rtype: ~websocket.WebSocket
285
        """
286
        # Note: *NOT* a renewable action!
287
        if header is None:
×
288
            header = {}
×
289
        header[self.__csrf_header] = self.__csrf
×
290
        if cookie is not None:
×
291
            cookie += ";" + _SESSION_COOKIE + "=" + self._session_id
×
292
        else:
293
            cookie = _SESSION_COOKIE + "=" + self._session_id
×
294
        return websocket.create_connection(
×
295
            url, header=header, cookie=cookie, **kwargs)
296

297
    def _purge(self):
1✔
298
        """
299
        Clears out all credentials from this session, rendering the session
300
        completely inoperable henceforth.
301
        """
302
        self.__username = None
×
303
        self.__password = None
×
304
        self._session_id = None
×
305
        self.__csrf = None
×
306

307

308
class SessionAware:
1✔
309
    """
310
    Connects to the session.
311

312
    .. warning::
313
        This class does not present a stable API for public consumption.
314
    """
315
    __slots__ = ("__session", "_url")
1✔
316

317
    def __init__(self, session: Session, url: str):
1✔
318
        self.__session = session
×
319
        self._url = clean_url(url)
×
320

321
    @property
1✔
322
    def _session_credentials(self):
1✔
323
        """
324
        The current session credentials.
325
        Only supposed to be called by subclasses.
326

327
        :rtype: tuple(dict(str,str),dict(str,str))
328
        """
329
        # pylint: disable=protected-access
330
        return self.__session._credentials
×
331

332
    @property
1✔
333
    def _service_url(self):
1✔
334
        """
335
        The main service URL.
336

337
        :rtype: str
338
        """
339
        # pylint: disable=protected-access
340
        return self.__session._service_url
×
341

342
    def _get(self, url: str, **kwargs) -> requests.Response:
1✔
343
        return self.__session.get(url, **kwargs)
×
344

345
    def _post(self, url: str, json_dict: dict, **kwargs) -> requests.Response:
1✔
346
        return self.__session.post(url, json_dict, **kwargs)
×
347

348
    def _put(self, url: str, data: str, **kwargs) -> requests.Response:
1✔
349
        return self.__session.put(url, data, **kwargs)
×
350

351
    def _delete(self, url: str, **kwargs) -> requests.Response:
1✔
352
        return self.__session.delete(url, **kwargs)
×
353

354
    def _websocket(self, url: str, **kwargs) -> websocket.WebSocket:
1✔
355
        """
356
        Create a websocket that uses the session credentials to establish
357
        itself.
358

359
        :param str url: Actual location to open websocket at
360
        :rtype: ~websocket.WebSocket
361
        """
362
        return self.__session.websocket(url, **kwargs)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc