• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SpiNNakerManchester / SpiNNMan / 6574804013

19 Oct 2023 12:47PM UTC coverage: 51.937% (+1.2%) from 50.777%
6574804013

Pull #327

github

Christian-B
typing changes
Pull Request #327: Type Annotations and Checking

105 of 1288 branches covered (0.0%)

Branch coverage included in aggregate %.

2375 of 2375 new or added lines in 180 files covered. (100.0%)

4775 of 8108 relevant lines covered (58.89%)

0.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

24.87
/spinnman/spalloc/session.py
1
# Copyright (c) 2021 The University of Manchester
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#     https://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14
from functools import wraps
1✔
15
from logging import getLogger
1✔
16
import re
1✔
17
import requests
1✔
18
from typing import Dict, Tuple, cast, Optional
1✔
19
import websocket  # type: ignore
1✔
20
from spinn_utilities.log import FormatAdapter
1✔
21
from spinn_utilities.typing.json import JsonObject
1✔
22
from .utils import clean_url
1✔
23
from spinnman.exceptions import SpallocException
1✔
24

25
#: Module logger; the log calls in this file use ``{}``-style placeholders
logger = FormatAdapter(getLogger(__name__))
1✔
26
#: The name of the session cookie issued by Spring Security. It is read
#: back from server responses and sent with every request made in a session.
27
_SESSION_COOKIE = "JSESSIONID"
1✔
28
#: Enable detailed debugging by setting to True. When set, every request and
#: response passing through a renewable session method is printed to stdout.
29
_debug_pretty_print = False
1✔
30

31

32
def _may_renew(method):
    """
    Decorator for session methods that perform an HTTP request.

    The wrapped method must return a :py:class:`~requests.Response`.
    After each call, the session cookie held by ``self`` is refreshed from
    the response if the server issued one. If the response is a ``401``,
    the session is renewed with ``self.renew()`` and the call is retried
    exactly once; the result of the retry is returned whatever its status.

    :param method:
        The method to wrap; it is invoked as ``method(self, *args, **kwargs)``
    :return: The wrapped method
    """
    def pp_req(request: requests.PreparedRequest):
        """
        Dump a request to stdout (debugging aid).

        :param ~requests.PreparedRequest request:
        """
        print(">>>>>>>>>>>START>>>>>>>>>>>\n")
        print(f"{request.method} {request.url}")
        print('\r\n'.join('{}: {}'.format(*kv)
                          for kv in request.headers.items()))
        if request.body:
            print(request.body)

    def pp_resp(response: requests.Response):
        """
        Dump a response to stdout (debugging aid).

        :param ~requests.Response response:
        """
        print('{}\n{}\r\n{}\r\n\r\n{}'.format(
            '<<<<<<<<<<<START<<<<<<<<<<<',
            str(response.status_code) + " " + response.reason,
            '\r\n'.join('{}: {}'.format(*kv)
                        for kv in response.headers.items()),
            # Assume we only get textual responses
            str(response.content, "UTF-8") if response.content else ""))

    @wraps(method)
    def call(self, *args, **kwargs):
        renewed = False
        while True:
            r = method(self, *args, **kwargs)
            if _debug_pretty_print:
                pp_req(r.request)
                pp_resp(r)
            if _SESSION_COOKIE in r.cookies:
                # pylint: disable=protected-access
                self._session_id = r.cookies[_SESSION_COOKIE]
            # Anything other than "unauthorized" goes straight back to the
            # caller; so does a 401 that persists after we already renewed,
            # as renewing again would not help and could loop forever.
            # (The previous test, "or not renew_count", was always true on
            # the first pass, which made the renewal below unreachable.)
            if r.status_code != 401 or renewed:
                return r
            # NOTE(review): a wrapped method that itself raises on 4xx
            # responses never gets here; the retry only applies to methods
            # that hand the 401 response back.
            self.renew()
            renewed = True

    return call
1✔
73

74

75
class Session:
    """
    Manages session credentials for the Spalloc client.

    The session id and CSRF data are only set by the constructor when
    ``session_credentials`` supplies them; otherwise they are established
    by :py:meth:`renew`.

    .. warning::
        This class does not present a stable API for public consumption.
    """
    __slots__ = (
        "__login_form_url", "__login_submit_url", "__srv_base", "_service_url",
        "__username", "__password", "__token",
        "_session_id", "__csrf", "__csrf_header")

    def __init__(
            self, service_url: str,
            username: Optional[str] = None, password: Optional[str] = None,
            token: Optional[str] = None,
            session_credentials: Optional[
                Tuple[Dict[str, str], Dict[str, str]]] = None):
        """
        :param str service_url: The reference to the service.
            *Should not* include a username or password in it.
        :param str username: The user name to use
        :param str password: The password to use
        :param str token: The bearer token to use
        :param session_credentials:
            Optional ``(cookies, headers)`` pair, in the same form as
            produced by the ``_credentials`` property, used to seed the
            session id and the CSRF header/token.
        :type session_credentials: tuple(dict(str,str),dict(str,str))
        """
        url = clean_url(service_url)
        self.__login_form_url = url + "system/login.html"
        self.__login_submit_url = url + "system/perform_login"
        self._service_url = url
        self.__srv_base = url + "srv/spalloc/"
        self.__username = username
        self.__password = password
        self.__token = token
        if session_credentials:
            cookies, headers = session_credentials
            if _SESSION_COOKIE in cookies:
                self._session_id = cookies[_SESSION_COOKIE]
            for key, value in headers.items():
                if key == "Authorization":
                    # TODO: extract this?
                    continue
                # Urgh: any header that is not the authorization header is
                # taken to be the CSRF header
                self.__csrf_header = key
                self.__csrf = value

    def __handle_error_or_return(self, response: requests.Response):
        """
        Pass through successful (2xx) and redirect (3xx) responses.

        :param ~requests.Response response:
        :return: The response, unchanged
        :rtype: ~requests.Response
        :raise ValueError: If the status code is outside ``200..399``
        """
        code = response.status_code
        if 200 <= code < 400:
            return response
        result = response.content
        raise ValueError(f"Unexpected response from server {code}\n"
                         f"    {str(result)}")

    @staticmethod
    def __session_cookie(response: requests.Response, failure: str) -> str:
        """
        Extract the session cookie from a response.

        :param ~requests.Response response:
        :param str failure: The message to fail with if there is no cookie
        :rtype: str
        :raises SpallocException:
            If the server did not issue a session cookie.
        """
        session_id = response.cookies.get(_SESSION_COOKIE)
        if session_id is None:
            raise SpallocException(failure)
        return session_id

    @_may_renew
    def get(self, url: str, timeout: int = 10, **kwargs) -> requests.Response:
        """
        Do an HTTP ``GET`` in the session.

        Only the session cookie is sent; no CSRF or authorization headers.

        :param str url:
        :param int timeout:
        :param kwargs: Sent as the query parameters of the request
        :rtype: ~requests.Response
        :raise ValueError: If the server rejects a request
        """
        params = kwargs if kwargs else None
        cookies = {_SESSION_COOKIE: self._session_id}
        r = requests.get(url, params=params, cookies=cookies,
                         allow_redirects=False, timeout=timeout)
        logger.debug("GET {} returned {}", url, r.status_code)
        return self.__handle_error_or_return(r)

    @_may_renew
    def post(self, url: str, json_dict: dict, timeout: int = 10,
             **kwargs) -> requests.Response:
        """
        Do an HTTP ``POST`` in the session.

        :param str url:
        :param dict json_dict: Sent as the JSON body of the request
        :param int timeout:
        :param kwargs: Sent as the query parameters of the request
        :rtype: ~requests.Response
        :raise ValueError: If the server rejects a request
        """
        params = kwargs if kwargs else None
        cookies, headers = self._credentials
        r = requests.post(url, params=params, json=json_dict,
                          cookies=cookies, headers=headers,
                          allow_redirects=False, timeout=timeout)
        logger.debug("POST {} returned {}", url, r.status_code)
        return self.__handle_error_or_return(r)

    @_may_renew
    def put(self, url: str, data: str, timeout: int = 10,
            **kwargs) -> requests.Response:
        """
        Do an HTTP ``PUT`` in the session. Puts plain text *OR* JSON!

        :param str url:
        :param str data:
            The body; when it is a string it is labelled as UTF-8 plain text
        :param int timeout:
        :param kwargs: Sent as the query parameters of the request
        :rtype: ~requests.Response
        :raise ValueError: If the server rejects a request
        """
        params = kwargs if kwargs else None
        cookies, headers = self._credentials
        if isinstance(data, str):
            headers["Content-Type"] = "text/plain; charset=UTF-8"
        r = requests.put(url, params=params, data=data,
                         cookies=cookies, headers=headers,
                         allow_redirects=False, timeout=timeout)
        logger.debug("PUT {} returned {}", url, r.status_code)
        return self.__handle_error_or_return(r)

    @_may_renew
    def delete(self, url: str, timeout: int = 10,
               **kwargs) -> requests.Response:
        """
        Do an HTTP ``DELETE`` in the session.

        :param str url:
        :param int timeout:
        :param kwargs: Sent as the query parameters of the request
        :rtype: ~requests.Response
        :raise ValueError: If the server rejects a request
        """
        params = kwargs if kwargs else None
        cookies, headers = self._credentials
        r = requests.delete(url, params=params, cookies=cookies,
                            headers=headers, allow_redirects=False,
                            timeout=timeout)
        logger.debug("DELETE {} returned {}", url, r.status_code)
        return self.__handle_error_or_return(r)

    def renew(self) -> JsonObject:
        """
        Renews the session, logging the user into it so that state modification
        operations can be performed.

        :returns: Description of the root of the service, without CSRF data
        :rtype: dict
        :raises SpallocException:
            If the session cannot be renewed.
        """
        if self.__token:
            r = requests.get(
                self.__login_form_url,
                headers={"Authorization": f"Bearer {self.__token}"},
                allow_redirects=False, timeout=10)
            if not r.ok:
                raise SpallocException(
                    f"Could not renew session: {cast(str, r.content)}")
            # A missing cookie is reported as a renewal failure rather than
            # leaking a bare KeyError to the caller
            self._session_id = self.__session_cookie(
                r, "could not renew session: no session cookie issued")
        else:
            # Step one: a temporary session so we can log in
            csrf_matcher = re.compile(
                r"""<input type="hidden" name="_csrf" value="(.*)" />""")
            r = requests.get(self.__login_form_url, allow_redirects=False,
                             timeout=10)
            logger.debug("GET {} returned {}",
                         self.__login_form_url, r.status_code)
            m = csrf_matcher.search(r.text)
            if not m:
                raise SpallocException("could not establish temporary session")
            csrf = m.group(1)
            session = self.__session_cookie(
                r, "could not establish temporary session")

            # Step two: actually do the log in
            form = {
                "_csrf": csrf,
                "username": self.__username,
                "password": self.__password,
                "submit": "submit"
            }
            # NB: returns redirect that sets a cookie
            r = requests.post(self.__login_submit_url,
                              cookies={_SESSION_COOKIE: session},
                              allow_redirects=False,
                              data=form, timeout=10)
            logger.debug("POST {} returned {}",
                         self.__login_submit_url, r.status_code)
            self._session_id = self.__session_cookie(
                r, "could not renew session: no session cookie issued")
            # We don't need to follow that redirect

        # Step three: get the basic service data and new CSRF token
        obj: JsonObject = self.get(self.__srv_base).json()
        self.__csrf_header = cast(str, obj["csrf-header"])
        self.__csrf = cast(str, obj["csrf-token"])
        del obj["csrf-header"]
        del obj["csrf-token"]
        return obj

    @property
    def _credentials(self) -> Tuple[Dict[str, str], Dict[str, str]]:
        """
        The credentials for requests. *Serializable.*

        A ``(cookies, headers)`` pair: the session cookie, and the CSRF
        header plus (when a bearer token is in use) the authorization
        header. Fresh dictionaries are built on every access.
        """
        cookies = {_SESSION_COOKIE: self._session_id}
        headers = {self.__csrf_header: self.__csrf}
        if self.__token:
            # This would be better off done once per session only
            headers["Authorization"] = f"Bearer {self.__token}"
        return cookies, headers

    def websocket(
            self, url: str, header: Optional[dict] = None,
            cookie: Optional[str] = None, **kwargs) -> websocket.WebSocket:
        """
        Create a websocket that uses the session credentials to establish
        itself.

        :param str url: Actual location to open websocket at
        :param dict(str,str) header:
            Optional HTTP headers. The dictionary passed in is not modified.
        :param str cookie:
            Optional cookies (composed as semicolon-separated string)
        :param kwargs: Other options to :py:func:`~websocket.create_connection`
        :rtype: ~websocket.WebSocket
        """
        # Note: *NOT* a renewable action!
        # Work on a copy so the CSRF token is not written into a dictionary
        # that the caller still owns
        header = dict(header) if header else {}
        header[self.__csrf_header] = self.__csrf
        if cookie is not None:
            cookie += ";" + _SESSION_COOKIE + "=" + self._session_id
        else:
            cookie = _SESSION_COOKIE + "=" + self._session_id
        return websocket.create_connection(
            url, header=header, cookie=cookie, **kwargs)

    def _purge(self) -> None:
        """
        Clears out all credentials from this session, rendering the session
        completely inoperable henceforth.
        """
        self.__username = None
        self.__password = None
        # The bearer token is a credential too: if it were left in place the
        # session could still be renewed and would still send it in headers
        self.__token = None
        self._session_id = None
        self.__csrf = None
×
310

311

312
class SessionAware:
    """
    Base for objects that reach the service through a shared session.

    All HTTP and websocket operations are delegated to the session given
    at construction time.

    .. warning::
        This class does not present a stable API for public consumption.
    """
    __slots__ = ("__session", "_url")

    def __init__(self, session: Session, url: str):
        """
        :param Session session: The session that requests are made through
        :param str url: The URL of this object; stored after cleaning
        """
        self._url = clean_url(url)
        self.__session = session

    @property
    def _session_credentials(self):
        """
        The current session credentials.
        Only supposed to be called by subclasses.

        :rtype: tuple(dict(str,str),dict(str,str))
        """
        # pylint: disable=protected-access
        session = self.__session
        return session._credentials

    @property
    def _service_url(self):
        """
        The main service URL.

        :rtype: str
        """
        # pylint: disable=protected-access
        session = self.__session
        return session._service_url

    def _get(self, url: str, **kwargs) -> requests.Response:
        """
        Do an HTTP ``GET`` through the session.

        :param str url:
        :param kwargs: Passed on to the session's ``get``
        :rtype: ~requests.Response
        """
        do_get = self.__session.get
        return do_get(url, **kwargs)

    def _post(self, url: str, json_dict: dict, **kwargs) -> requests.Response:
        """
        Do an HTTP ``POST`` through the session.

        :param str url:
        :param dict json_dict:
        :param kwargs: Passed on to the session's ``post``
        :rtype: ~requests.Response
        """
        do_post = self.__session.post
        return do_post(url, json_dict, **kwargs)

    def _put(self, url: str, data: str, **kwargs) -> requests.Response:
        """
        Do an HTTP ``PUT`` through the session.

        :param str url:
        :param str data:
        :param kwargs: Passed on to the session's ``put``
        :rtype: ~requests.Response
        """
        do_put = self.__session.put
        return do_put(url, data, **kwargs)

    def _delete(self, url: str, **kwargs) -> requests.Response:
        """
        Do an HTTP ``DELETE`` through the session.

        :param str url:
        :param kwargs: Passed on to the session's ``delete``
        :rtype: ~requests.Response
        """
        do_delete = self.__session.delete
        return do_delete(url, **kwargs)

    def _websocket(self, url: str, **kwargs) -> websocket.WebSocket:
        """
        Open a websocket that is set up using the credentials of the
        session.

        :param str url: Actual location to open websocket at
        :param kwargs: Passed on to the session's ``websocket``
        :rtype: ~websocket.WebSocket
        """
        open_socket = self.__session.websocket
        return open_socket(url, **kwargs)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc