• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grafana / django-saml2-auth / 5453691261

pending completion
5453691261

push

github

web-flow
Merge pull request #176 from sgabb/master

* feat(settings): add cert and key file
Add possibility to import certificate and key files from settings as described in pysaml2 documentation https://pysaml2.readthedocs.io/en/latest/howto/config.html
* fix(lint): W293 blank line contains whitespace
* Add keys to module settings

---------

Co-authored-by: Mostafa Moradian <mostafamoradian0@gmail.com>

6 of 6 new or added lines in 1 file covered. (100.0%)

858 of 957 relevant lines covered (89.66%)

12.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.91
/django_saml2_auth/saml.py
1
"""Utility functions for various SAML client functions.
14✔
2
"""
3
import base64
14✔
4
from typing import Any, Callable, Dict, Mapping, Optional, Union
14✔
5

6
from dictor import dictor  # type: ignore
14✔
7
from django.conf import settings
14✔
8
from django.http import HttpRequest, HttpResponse, HttpResponseRedirect
14✔
9
from django.urls import NoReverseMatch
14✔
10
from django_saml2_auth.errors import (ERROR_CREATING_SAML_CONFIG_OR_CLIENT,
14✔
11
                                      INVALID_METADATA_URL,
12
                                      NO_ISSUER_IN_SAML_RESPONSE,
13
                                      NO_METADATA_URL_ASSOCIATED,
14
                                      NO_METADATA_URL_OR_FILE,
15
                                      NO_NAME_ID_IN_SAML_RESPONSE,
16
                                      NO_SAML_CLIENT,
17
                                      NO_SAML_RESPONSE_FROM_CLIENT,
18
                                      NO_SAML_RESPONSE_FROM_IDP,
19
                                      NO_TOKEN_SPECIFIED,
20
                                      NO_USER_IDENTITY_IN_SAML_RESPONSE,
21
                                      NO_USERNAME_OR_EMAIL_SPECIFIED)
22
from django_saml2_auth.exceptions import SAMLAuthError
14✔
23
from django_saml2_auth.utils import get_reverse, run_hook
14✔
24
from saml2 import BINDING_HTTP_POST, BINDING_HTTP_REDIRECT, entity
14✔
25
from saml2.client import Saml2Client
14✔
26
from saml2.config import Config as Saml2Config
14✔
27
from saml2.httpbase import HTTPBase
14✔
28
from saml2.mdstore import MetaDataExtern
14✔
29
from saml2.response import AuthnResponse
14✔
30

31

32
def get_assertion_url(request: HttpRequest) -> str:
14✔
33
    """Extract protocol and domain name from request, if ASSERTION_URL is not specified in settings,
34
    otherwise the ASSERTION_URL is returned.
35

36
    Args:
37
        request (HttpRequest): Django request object
38

39
    Returns:
40
        str: Either protocol://host or ASSERTION_URL
41
    """
42
    saml2_auth_settings = settings.SAML2_AUTH
14✔
43
    assertion_url = dictor(saml2_auth_settings, "ASSERTION_URL")
14✔
44
    if assertion_url:
14✔
45
        return assertion_url
14✔
46

47
    protocol = "https" if request.is_secure() else "http"
14✔
48
    host = request.get_host()
14✔
49
    return f"{protocol}://{host}"
14✔
50

51

52
def get_default_next_url() -> Optional[str]:
14✔
53
    """Get default next url for redirection, which is either the DEFAULT_NEXT_URL from settings or
54
    admin index.
55

56
    Returns:
57
        Optional[str]: Returns default next url for redirection or admin index
58
    """
59
    saml2_auth_settings = settings.SAML2_AUTH
14✔
60
    default_next_url = dictor(saml2_auth_settings, "DEFAULT_NEXT_URL")
14✔
61
    if default_next_url:
14✔
62
        return default_next_url
14✔
63

64
    # Lazily evaluate this in case we don't have admin loaded.
65
    return get_reverse("admin:index")
14✔
66

67

68
def validate_metadata_url(url: str) -> bool:
14✔
69
    """Validates metadata URL
70

71
    Args:
72
        url (str): Metadata URL
73

74
    Returns:
75
        bool: Wether the metadata URL is valid or not
76
    """
77
    try:
14✔
78
        http_client = HTTPBase()
14✔
79
        metadata = MetaDataExtern(None, url=url, http=http_client)
14✔
80
        metadata.load()
14✔
81
    except Exception:
14✔
82
        return False
14✔
83

84
    return True
14✔
85

86

87
def get_metadata(user_id: Optional[str] = None) -> Mapping[str, Any]:
14✔
88
    """Returns metadata information, either by running the GET_METADATA_AUTO_CONF_URLS hook function
89
    if available, or by checking and returning a local file path or the METADATA_AUTO_CONF_URL. URLs
90
    are always validated and invalid URLs will be either filtered or raise a SAMLAuthError
91
    exception.
92

93
    Args:
94
        user_id (str, optional): If passed, it will be further processed by the
95
            GET_METADATA_AUTO_CONF_URLS trigger, which will return the metadata URL corresponding to
96
            the given user identifier, either email or username. Defaults to None.
97

98
    Raises:
99
        SAMLAuthError: No metadata URL associated with the given user identifier.
100
        SAMLAuthError: Invalid metadata URL.
101

102
    Returns:
103
        Mapping[str, Any]: Returns a SAML metadata object as dictionary
104
    """
105
    saml2_auth_settings = settings.SAML2_AUTH
14✔
106
    get_metadata_trigger = dictor(saml2_auth_settings, "TRIGGER.GET_METADATA_AUTO_CONF_URLS")
14✔
107
    if get_metadata_trigger:
14✔
108
        metadata_urls = run_hook(get_metadata_trigger, user_id)  # type: ignore
14✔
109
        if metadata_urls:
14✔
110
            # Filter invalid metadata URLs
111
            filtered_metadata_urls = list(
14✔
112
                filter(lambda md: validate_metadata_url(md["url"]), metadata_urls))
113
            return {"remote": filtered_metadata_urls}
14✔
114
        else:
115
            raise SAMLAuthError("No metadata URL associated with the given user identifier.",
14✔
116
                                extra={
117
                                    "exc_type": ValueError,
118
                                    "error_code": NO_METADATA_URL_ASSOCIATED,
119
                                    "reason": "There was an error processing your request.",
120
                                    "status_code": 500
121
                                })
122

123
    metadata_local_file_path = dictor(saml2_auth_settings, "METADATA_LOCAL_FILE_PATH")
14✔
124
    if metadata_local_file_path:
14✔
125
        return {"local": [metadata_local_file_path]}
14✔
126
    else:
127
        single_metadata_url = dictor(saml2_auth_settings, "METADATA_AUTO_CONF_URL")
14✔
128
        if validate_metadata_url(single_metadata_url):
14✔
129
            return {"remote": [{"url": single_metadata_url}]}
14✔
130
        else:
131
            raise SAMLAuthError("Invalid metadata URL.", extra={
14✔
132
                "exc_type": ValueError,
133
                "error_code": INVALID_METADATA_URL,
134
                "reason": "There was an error processing your request.",
135
                "status_code": 500
136
            })
137

138

139
def get_saml_client(domain: str,
14✔
140
                    acs: Callable[..., HttpResponse],
141
                    user_id: Optional[str] = None,
142
                    saml_response: Optional[str] = None) -> Optional[Saml2Client]:
143
    """Create a new Saml2Config object with the given config and return an initialized Saml2Client
144
    using the config object. The settings are read from django settings key: SAML2_AUTH.
145

146
    Args:
147
        domain (str): Domain name to get SAML config for
148
        acs (Callable[..., HttpResponse]): The acs endpoint
149
        user_id (str or None): If passed, it will be further processed by the
150
            GET_METADATA_AUTO_CONF_URLS trigger, which will return the metadata URL corresponding
151
            to the given user identifier, either email or username. Defaults to None.
152
        user_id (str or None): User identifier: username or email. Defaults to None.
153
        saml_response (str or None): decoded XML SAML response.
154

155
    Raises:
156
        SAMLAuthError: Re-raise any exception raised by Saml2Config or Saml2Client
157

158
    Returns:
159
        Optional[Saml2Client]: A Saml2Client or None
160
    """
161
    # get_reverse raises an exception if the view is not found, so we can safely ignore type errors
162
    acs_url = domain + get_reverse([acs, "acs", "django_saml2_auth:acs"])  # type: ignore
14✔
163

164
    get_user_id_from_saml_response = dictor(settings.SAML2_AUTH,
14✔
165
                                            "TRIGGER.GET_USER_ID_FROM_SAML_RESPONSE")
166
    if get_user_id_from_saml_response and saml_response:
14✔
167
        user_id = run_hook(get_user_id_from_saml_response, saml_response, user_id)  # type: ignore
×
168

169
    metadata = get_metadata(user_id)
14✔
170
    if (metadata and (
14✔
171
            ("local" in metadata and not metadata["local"]) or
172
            ("remote" in metadata and not metadata["remote"])
173
    )):
174
        raise SAMLAuthError("Metadata URL/file is missing.", extra={
14✔
175
            "exc_type": NoReverseMatch,
176
            "error_code": NO_METADATA_URL_OR_FILE,
177
            "reason": "There was an error processing your request.",
178
            "status_code": 500
179
        })
180

181
    saml2_auth_settings = settings.SAML2_AUTH
14✔
182

183
    saml_settings: Dict[str, Any] = {
14✔
184
        "metadata": metadata,
185
        "allow_unknown_attributes": True,
186
        "debug": saml2_auth_settings.get("DEBUG", False),
187
        "service": {
188
            "sp": {
189
                "endpoints": {
190
                    "assertion_consumer_service": [
191
                        (acs_url, BINDING_HTTP_REDIRECT),
192
                        (acs_url, BINDING_HTTP_POST)
193
                    ],
194
                },
195
                "allow_unsolicited": True,
196
                "authn_requests_signed": dictor(
197
                    saml2_auth_settings, "AUTHN_REQUESTS_SIGNED", default=True),
198
                "logout_requests_signed": dictor(
199
                    saml2_auth_settings, "LOGOUT_REQUESTS_SIGNED", default=True),
200
                "want_assertions_signed": dictor(
201
                    saml2_auth_settings, "WANT_ASSERTIONS_SIGNED", default=True),
202
                "want_response_signed": dictor(
203
                    saml2_auth_settings, "WANT_RESPONSE_SIGNED", default=True),
204
            },
205
        },
206
    }
207

208
    entity_id = saml2_auth_settings.get("ENTITY_ID")
14✔
209
    if entity_id:
14✔
210
        saml_settings["entityid"] = entity_id
14✔
211

212
    name_id_format = saml2_auth_settings.get("NAME_ID_FORMAT")
14✔
213
    if name_id_format:
14✔
214
        saml_settings["service"]["sp"]["name_id_format"] = name_id_format
14✔
215

216
    accepted_time_diff = saml2_auth_settings.get("ACCEPTED_TIME_DIFF")
14✔
217
    if accepted_time_diff:
14✔
218
        saml_settings['accepted_time_diff'] = accepted_time_diff
×
219

220
    key_file = saml2_auth_settings.get("KEY_FILE")
14✔
221
    if key_file:
14✔
222
        saml_settings['key_file'] = key_file
×
223

224
    cert_file = saml2_auth_settings.get("CERT_FILE")
14✔
225
    if cert_file:
14✔
226
        saml_settings['cert_file'] = cert_file
×
227

228
    try:
14✔
229
        sp_config = Saml2Config()
14✔
230
        sp_config.load(saml_settings)
14✔
231
        saml_client = Saml2Client(config=sp_config)
14✔
232
        return saml_client
14✔
233
    except Exception as exc:
14✔
234
        raise SAMLAuthError(str(exc), extra={
14✔
235
            "exc": exc,
236
            "exc_type": type(exc),
237
            "error_code": ERROR_CREATING_SAML_CONFIG_OR_CLIENT,
238
            "reason": "There was an error processing your request.",
239
            "status_code": 500
240
        })
241

242

243
def decode_saml_response(
14✔
244
        request: HttpRequest,
245
        acs: Callable[..., HttpResponse]) -> Union[
246
            HttpResponseRedirect, Optional[AuthnResponse], None]:
247
    """Given a request, the authentication response inside the SAML response body is parsed,
248
    decoded and returned. If there are any issues parsing the request, the identity or the issuer,
249
    an exception is raised.
250

251
    Args:
252
        request (HttpRequest): Django request object from identity provider (IdP)
253
        acs (Callable[..., HttpResponse]): The acs endpoint
254

255
    Raises:
256
        SAMLAuthError: There was no response from SAML client.
257
        SAMLAuthError: There was no response from SAML identity provider.
258
        SAMLAuthError: No name_id in SAML response.
259
        SAMLAuthError: No issuer/entity_id in SAML response.
260
        SAMLAuthError: No user identity in SAML response.
261

262
    Returns:
263
        Union[HttpResponseRedirect, Optional[AuthnResponse], None]: Returns an AuthnResponse
264
            object for extracting user identity from.
265
    """
266
    response = request.POST.get("SAMLResponse") or None
14✔
267
    if not response:
14✔
268
        raise SAMLAuthError("There was no response from SAML client.", extra={
×
269
            "exc_type": ValueError,
270
            "error_code": NO_SAML_RESPONSE_FROM_CLIENT,
271
            "reason": "There was an error processing your request.",
272
            "status_code": 500
273
        })
274

275
    try:
14✔
276
        saml_response = base64.b64decode(response).decode('UTF-8')
14✔
277
    except Exception:
14✔
278
        saml_response = None
14✔
279
    saml_client = get_saml_client(get_assertion_url(request), acs, saml_response=saml_response)
14✔
280
    if not saml_client:
14✔
281
        raise SAMLAuthError("There was an error creating the SAML client.", extra={
×
282
            "exc_type": ValueError,
283
            "error_code": NO_SAML_CLIENT,
284
            "reason": "There was an error processing your request.",
285
            "status_code": 500
286
        })
287

288
    authn_response = saml_client.parse_authn_request_response(response, entity.BINDING_HTTP_POST)
14✔
289
    if not authn_response:
14✔
290
        raise SAMLAuthError("There was no response from SAML identity provider.", extra={
×
291
            "exc_type": ValueError,
292
            "error_code": NO_SAML_RESPONSE_FROM_IDP,
293
            "reason": "There was an error processing your request.",
294
            "status_code": 500
295
        })
296

297
    if not authn_response.name_id:
14✔
298
        raise SAMLAuthError("No name_id in SAML response.", extra={
×
299
            "exc_type": ValueError,
300
            "error_code": NO_NAME_ID_IN_SAML_RESPONSE,
301
            "reason": "There was an error processing your request.",
302
            "status_code": 500
303
        })
304

305
    if not authn_response.issuer():
14✔
306
        raise SAMLAuthError("No issuer/entity_id in SAML response.", extra={
×
307
            "exc_type": ValueError,
308
            "error_code": NO_ISSUER_IN_SAML_RESPONSE,
309
            "reason": "There was an error processing your request.",
310
            "status_code": 500
311
        })
312

313
    if not authn_response.get_identity():
14✔
314
        raise SAMLAuthError("No user identity in SAML response.", extra={
×
315
            "exc_type": ValueError,
316
            "error_code": NO_USER_IDENTITY_IN_SAML_RESPONSE,
317
            "reason": "There was an error processing your request.",
318
            "status_code": 500
319
        })
320

321
    return authn_response
14✔
322

323

324
def extract_user_identity(user_identity: Dict[str, Any]) -> Dict[str, Optional[Any]]:
14✔
325
    """Extract user information from SAML user identity object
326

327
    Args:
328
        user_identity (Dict[str, Any]): SAML user identity object (dict)
329

330
    Raises:
331
        SAMLAuthError: No token specified.
332
        SAMLAuthError: No username or email provided.
333

334
    Returns:
335
        Dict[str, Optional[Any]]: Cleaned user information plus user_identity
336
            for backwards compatibility
337
    """
338
    saml2_auth_settings = settings.SAML2_AUTH
14✔
339

340
    email_field = dictor(
14✔
341
        saml2_auth_settings, "ATTRIBUTES_MAP.email", default="user.email")
342
    username_field = dictor(
14✔
343
        saml2_auth_settings, "ATTRIBUTES_MAP.username", default="user.username")
344
    firstname_field = dictor(
14✔
345
        saml2_auth_settings, "ATTRIBUTES_MAP.first_name", default="user.first_name")
346
    lastname_field = dictor(
14✔
347
        saml2_auth_settings, "ATTRIBUTES_MAP.last_name", default="user.last_name")
348

349
    user = {}
14✔
350
    user["email"] = dictor(user_identity, f"{email_field}/0", pathsep="/")  # Path includes "."
14✔
351
    user["username"] = dictor(user_identity, f"{username_field}/0", pathsep="/")
14✔
352
    user["first_name"] = dictor(user_identity, f"{firstname_field}/0", pathsep="/")
14✔
353
    user["last_name"] = dictor(user_identity, f"{lastname_field}/0", pathsep="/")
14✔
354

355
    token_required = dictor(saml2_auth_settings, "TOKEN_REQUIRED", default=True)
14✔
356
    if token_required:
14✔
357
        token_field = dictor(saml2_auth_settings, "ATTRIBUTES_MAP.token", default="token")
14✔
358
        user["token"] = dictor(user_identity, f"{token_field}.0")
14✔
359

360
    if user["email"]:
14✔
361
        user["email"] = user["email"].lower()
14✔
362
    if user["username"]:
14✔
363
        user["username"] = user["username"].lower()
14✔
364

365
    # For backwards compatibility
366
    user["user_identity"] = user_identity
14✔
367

368
    if not user["email"] and not user["username"]:
14✔
369
        raise SAMLAuthError("No username or email provided.", extra={
×
370
            "exc_type": ValueError,
371
            "error_code": NO_USERNAME_OR_EMAIL_SPECIFIED,
372
            "reason": "Username or email must be configured on the SAML app before logging in.",
373
            "status_code": 422
374
        })
375

376
    if token_required and not user.get("token"):
14✔
377
        raise SAMLAuthError("No token specified.", extra={
×
378
            "exc_type": ValueError,
379
            "error_code": NO_TOKEN_SPECIFIED,
380
            "reason": "Token must be configured on the SAML app before logging in.",
381
            "status_code": 422
382
        })
383

384
    return user
14✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc