• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grafana / django-saml2-auth / 8831429999

25 Apr 2024 11:04AM UTC coverage: 90.335% (-0.1%) from 90.434%
8831429999

Pull #286

github

web-flow
Merge 978c4397b into d2a5ef92b
Pull Request #286: Revamp tooling

95 of 120 new or added lines in 7 files covered. (79.17%)

4 existing lines in 3 files now uncovered.

916 of 1014 relevant lines covered (90.34%)

6.32 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.43
/django_saml2_auth/saml.py
1
"""Utility functions for various SAML client functions."""
7✔
2

3
import base64
7✔
4
from typing import Any, Callable, Dict, Mapping, Optional, Union
7✔
5

6
from dictor import dictor  # type: ignore
7✔
7
from django.conf import settings
7✔
8
from django.http import HttpRequest, HttpResponse, HttpResponseRedirect
7✔
9
from django.urls import NoReverseMatch
7✔
10
from django_saml2_auth.errors import (
7✔
11
    ERROR_CREATING_SAML_CONFIG_OR_CLIENT,
12
    INVALID_METADATA_URL,
13
    NO_ISSUER_IN_SAML_RESPONSE,
14
    NO_METADATA_URL_ASSOCIATED,
15
    NO_METADATA_URL_OR_FILE,
16
    NO_NAME_ID_IN_SAML_RESPONSE,
17
    NO_SAML_CLIENT,
18
    NO_SAML_RESPONSE_FROM_CLIENT,
19
    NO_SAML_RESPONSE_FROM_IDP,
20
    NO_TOKEN_SPECIFIED,
21
    NO_USER_IDENTITY_IN_SAML_RESPONSE,
22
    NO_USERNAME_OR_EMAIL_SPECIFIED,
23
)
24
from django_saml2_auth.exceptions import SAMLAuthError
7✔
25
from django_saml2_auth.utils import get_reverse, run_hook
7✔
26
from saml2 import BINDING_HTTP_POST, BINDING_HTTP_REDIRECT, entity
7✔
27
from saml2.client import Saml2Client
7✔
28
from saml2.config import Config as Saml2Config
7✔
29
from saml2.httpbase import HTTPBase
7✔
30
from saml2.mdstore import MetaDataExtern
7✔
31
from saml2.response import AuthnResponse
7✔
32

33

34
def get_assertion_url(request: HttpRequest) -> str:
    """Return the base URL to use for SAML assertions.

    A truthy ASSERTION_URL entry in settings.SAML2_AUTH takes precedence. When it is not set,
    the URL is assembled from the scheme and host of the incoming request.

    Args:
        request (HttpRequest): Django request object

    Returns:
        str: ASSERTION_URL if configured, otherwise protocol://host taken from the request
    """
    configured_url = dictor(settings.SAML2_AUTH, "ASSERTION_URL")
    if not configured_url:
        # Nothing configured: derive the base URL from the request itself.
        scheme = "https" if request.is_secure() else "http"
        return f"{scheme}://{request.get_host()}"

    return configured_url
7✔
52

53

54
def get_default_next_url() -> Optional[str]:
    """Return the URL to redirect to when no explicit next URL is available.

    Returns:
        Optional[str]: The DEFAULT_NEXT_URL entry of settings.SAML2_AUTH when it is truthy,
            otherwise the result of get_reverse("admin:index")
    """
    configured_url = dictor(settings.SAML2_AUTH, "DEFAULT_NEXT_URL")

    # The reverse lookup only runs when no DEFAULT_NEXT_URL is configured, so it is evaluated
    # lazily in case the admin is not loaded.
    return configured_url or get_reverse("admin:index")
7✔
68

69

70
def validate_metadata_url(url: str) -> bool:
    """Check a metadata URL by attempting to load metadata from it.

    Args:
        url (str): Metadata URL

    Returns:
        bool: True if the metadata was loaded without raising, False if any exception occurred
    """
    try:
        MetaDataExtern(None, url=url, http=HTTPBase()).load()
    except Exception:
        # Any failure while building or loading the metadata means the URL is unusable.
        return False
    else:
        return True
7✔
87

88

89
def get_metadata(user_id: Optional[str] = None) -> Mapping[str, Any]:
    """Resolve where the SAML metadata should be loaded from.

    Sources are tried in this order:

    1. The TRIGGER.GET_METADATA_AUTO_CONF_URLS hook, if configured. It is called with user_id
       and the entries it returns are filtered down to those whose "url" validates.
    2. METADATA_LOCAL_FILE_PATH, returned as-is without validation.
    3. METADATA_AUTO_CONF_URL, which must validate.

    Args:
        user_id (str, optional): User identifier, either email or username, handed to the
            GET_METADATA_AUTO_CONF_URLS hook. Defaults to None.

    Raises:
        SAMLAuthError: No metadata URL associated with the given user identifier.
        SAMLAuthError: Invalid metadata URL.

    Returns:
        Mapping[str, Any]: Either {"remote": [...]} or {"local": [...]}. The "remote" list can
            be empty when the hook returned entries but none of them validated.
    """
    auth_conf = settings.SAML2_AUTH

    metadata_hook = dictor(auth_conf, "TRIGGER.GET_METADATA_AUTO_CONF_URLS")
    if metadata_hook:
        candidates = run_hook(metadata_hook, user_id)  # type: ignore
        if not candidates:
            raise SAMLAuthError(
                "No metadata URL associated with the given user identifier.",
                extra={
                    "exc_type": ValueError,
                    "error_code": NO_METADATA_URL_ASSOCIATED,
                    "reason": "There was an error processing your request.",
                    "status_code": 500,
                },
            )

        # Keep only the entries whose URL can actually be loaded.
        usable = [entry for entry in candidates if validate_metadata_url(entry["url"])]
        return {"remote": usable}

    local_path = dictor(auth_conf, "METADATA_LOCAL_FILE_PATH")
    if local_path:
        return {"local": [local_path]}

    remote_url = dictor(auth_conf, "METADATA_AUTO_CONF_URL")
    if not validate_metadata_url(remote_url):
        raise SAMLAuthError(
            "Invalid metadata URL.",
            extra={
                "exc_type": ValueError,
                "error_code": INVALID_METADATA_URL,
                "reason": "There was an error processing your request.",
                "status_code": 500,
            },
        )

    return {"remote": [{"url": remote_url}]}
145

146

147
def get_saml_client(
    domain: str,
    acs: Callable[..., HttpResponse],
    user_id: Optional[str] = None,
    saml_response: Optional[str] = None,
) -> Optional[Saml2Client]:
    """Create a new Saml2Config object with the given config and return an initialized Saml2Client
    using the config object. The settings are read from django settings key: SAML2_AUTH.

    Args:
        domain (str): Protocol and host that the ACS path is appended to
        acs (Callable[..., HttpResponse]): The acs endpoint
        user_id (str or None): User identifier, either email or username. It is passed to
            get_metadata to select the metadata source. When saml_response is given and the
            GET_USER_ID_FROM_SAML_RESPONSE trigger is configured, the trigger's return value
            replaces it. Defaults to None.
        saml_response (str or None): Decoded XML SAML response. Defaults to None.

    Raises:
        SAMLAuthError: The resolved metadata has an empty "local" or "remote" list.
        SAMLAuthError: Wraps any exception raised by Saml2Config or Saml2Client. The original
            exception is attached as the cause.

    Returns:
        Optional[Saml2Client]: The initialized Saml2Client
    """
    saml2_auth_settings = settings.SAML2_AUTH

    # get_reverse raises an exception if the view is not found, so we can safely ignore type errors
    acs_url = domain + get_reverse([acs, "acs", "django_saml2_auth:acs"])  # type: ignore

    get_user_id_from_saml_response = dictor(
        saml2_auth_settings, "TRIGGER.GET_USER_ID_FROM_SAML_RESPONSE"
    )
    if get_user_id_from_saml_response and saml_response:
        user_id = run_hook(get_user_id_from_saml_response, saml_response, user_id)  # type: ignore

    metadata = get_metadata(user_id)
    if metadata and (
        ("local" in metadata and not metadata["local"])
        or ("remote" in metadata and not metadata["remote"])
    ):
        raise SAMLAuthError(
            "Metadata URL/file is missing.",
            extra={
                # NOTE(review): NoReverseMatch looks unrelated to missing metadata; kept as-is
                # because callers may inspect it. Confirm before changing.
                "exc_type": NoReverseMatch,
                "error_code": NO_METADATA_URL_OR_FILE,
                "reason": "There was an error processing your request.",
                "status_code": 500,
            },
        )

    saml_settings: Dict[str, Any] = {
        "metadata": metadata,
        "allow_unknown_attributes": True,
        "debug": saml2_auth_settings.get("DEBUG", False),
        "service": {
            "sp": {
                "endpoints": {
                    "assertion_consumer_service": [
                        (acs_url, BINDING_HTTP_REDIRECT),
                        (acs_url, BINDING_HTTP_POST),
                    ],
                },
                "allow_unsolicited": True,
                "authn_requests_signed": dictor(
                    saml2_auth_settings, "AUTHN_REQUESTS_SIGNED", default=True
                ),
                "logout_requests_signed": dictor(
                    saml2_auth_settings, "LOGOUT_REQUESTS_SIGNED", default=True
                ),
                "want_assertions_signed": dictor(
                    saml2_auth_settings, "WANT_ASSERTIONS_SIGNED", default=True
                ),
                "want_response_signed": dictor(
                    saml2_auth_settings, "WANT_RESPONSE_SIGNED", default=True
                ),
            },
        },
    }

    name_id_format = saml2_auth_settings.get("NAME_ID_FORMAT")
    if name_id_format:
        saml_settings["service"]["sp"]["name_id_format"] = name_id_format

    # Optional top-level options: copied into the pysaml2 config only when set to a truthy
    # value. For LOGGING, see:
    # https://pysaml2.readthedocs.io/en/latest/howto/config.html?highlight=debug#logging
    optional_settings = (
        ("ENTITY_ID", "entityid"),
        ("ACCEPTED_TIME_DIFF", "accepted_time_diff"),
        ("LOGGING", "logging"),
        ("KEY_FILE", "key_file"),
        ("CERT_FILE", "cert_file"),
    )
    for setting_name, config_key in optional_settings:
        value = saml2_auth_settings.get(setting_name)
        if value:
            saml_settings[config_key] = value

    # Explicit ENCRYPTION_KEYPAIRS win; otherwise fall back to the signing key pair when both
    # halves are configured.
    encryption_keypairs = saml2_auth_settings.get("ENCRYPTION_KEYPAIRS")
    key_file = saml_settings.get("key_file")
    cert_file = saml_settings.get("cert_file")
    if encryption_keypairs:
        saml_settings["encryption_keypairs"] = encryption_keypairs
    elif key_file and cert_file:
        saml_settings["encryption_keypairs"] = [
            {
                "key_file": key_file,
                "cert_file": cert_file,
            }
        ]

    try:
        sp_config = Saml2Config()
        sp_config.load(saml_settings)
        return Saml2Client(config=sp_config)
    except Exception as exc:
        raise SAMLAuthError(
            str(exc),
            extra={
                "exc": exc,
                "exc_type": type(exc),
                "error_code": ERROR_CREATING_SAML_CONFIG_OR_CLIENT,
                "reason": "There was an error processing your request.",
                "status_code": 500,
            },
        ) from exc
279

280

281
def decode_saml_response(
    request: HttpRequest, acs: Callable[..., HttpResponse]
) -> Union[HttpResponseRedirect, Optional[AuthnResponse], None]:
    """Parse the SAMLResponse posted with the request and return the resulting authentication
    response, after checking that it carries a name_id, an issuer and a user identity.

    Args:
        request (HttpRequest): Django request object from identity provider (IdP)
        acs (Callable[..., HttpResponse]): The acs endpoint

    Raises:
        SAMLAuthError: There was no response from SAML client.
        SAMLAuthError: There was an error creating the SAML client.
        SAMLAuthError: There was no response from SAML identity provider.
        SAMLAuthError: No name_id in SAML response.
        SAMLAuthError: No issuer/entity_id in SAML response.
        SAMLAuthError: No user identity in SAML response.

    Returns:
        Union[HttpResponseRedirect, Optional[AuthnResponse], None]: Returns an AuthnResponse
            object for extracting user identity from.
    """

    def _failure(message: str, error_code: Any) -> SAMLAuthError:
        # Every failure in this function shares the same extra payload apart from its code.
        return SAMLAuthError(
            message,
            extra={
                "exc_type": ValueError,
                "error_code": error_code,
                "reason": "There was an error processing your request.",
                "status_code": 500,
            },
        )

    encoded_response = request.POST.get("SAMLResponse") or None
    if not encoded_response:
        raise _failure("There was no response from SAML client.", NO_SAML_RESPONSE_FROM_CLIENT)

    # Best effort: the decoded XML is only handed to get_saml_client, so a payload that cannot
    # be decoded is passed on as None instead of failing here.
    try:
        decoded_xml = base64.b64decode(encoded_response).decode("UTF-8")
    except Exception:
        decoded_xml = None

    client = get_saml_client(get_assertion_url(request), acs, saml_response=decoded_xml)
    if not client:
        raise _failure("There was an error creating the SAML client.", NO_SAML_CLIENT)

    # The client parses the still-encoded payload, not the decoded XML.
    parsed = client.parse_authn_request_response(encoded_response, entity.BINDING_HTTP_POST)
    if not parsed:
        raise _failure(
            "There was no response from SAML identity provider.", NO_SAML_RESPONSE_FROM_IDP
        )

    if not parsed.name_id:
        raise _failure("No name_id in SAML response.", NO_NAME_ID_IN_SAML_RESPONSE)

    if not parsed.issuer():
        raise _failure("No issuer/entity_id in SAML response.", NO_ISSUER_IN_SAML_RESPONSE)

    if not parsed.get_identity():
        raise _failure("No user identity in SAML response.", NO_USER_IDENTITY_IN_SAML_RESPONSE)

    return parsed
7✔
377

378

379
def extract_user_identity(user_identity: Dict[str, Any]) -> Dict[str, Optional[Any]]:
    """Extract user information from SAML user identity object

    The attribute names looked up in user_identity come from ATTRIBUTES_MAP in
    settings.SAML2_AUTH, with defaults "user.email", "user.username", "user.first_name",
    "user.last_name" and "token". The first element of each attribute is used.

    Args:
        user_identity (Dict[str, Any]): SAML user identity object (dict)

    Raises:
        SAMLAuthError: No username or email provided.
        SAMLAuthError: No token specified.

    Returns:
        Dict[str, Optional[Any]]: Cleaned user information plus user_identity
            for backwards compatibility
    """
    saml2_auth_settings = settings.SAML2_AUTH

    user: Dict[str, Optional[Any]] = {}
    for key in ("email", "username", "first_name", "last_name"):
        field = dictor(saml2_auth_settings, f"ATTRIBUTES_MAP.{key}", default=f"user.{key}")
        # Attribute names include ".", so "/" is used as the path separator.
        user[key] = dictor(user_identity, f"{field}/0", pathsep="/")

    token_required = dictor(saml2_auth_settings, "TOKEN_REQUIRED", default=True)
    if token_required:
        token_field = dictor(saml2_auth_settings, "ATTRIBUTES_MAP.token", default="token")
        token = dictor(user_identity, f"{token_field}.0")
        if token is None:
            # The "."-separated lookup cannot resolve a token attribute whose name itself
            # contains "." (e.g. "user.token"). Retry with the "/" separator used for the
            # other attributes; configurations that already worked are unaffected.
            token = dictor(user_identity, f"{token_field}/0", pathsep="/")
        user["token"] = token

    if user["email"]:
        user["email"] = user["email"].lower()
    if user["username"]:
        user["username"] = user["username"].lower()

    # For backwards compatibility
    user["user_identity"] = user_identity

    if not user["email"] and not user["username"]:
        raise SAMLAuthError(
            "No username or email provided.",
            extra={
                "exc_type": ValueError,
                "error_code": NO_USERNAME_OR_EMAIL_SPECIFIED,
                "reason": "Username or email must be configured on the SAML app before logging in.",
                "status_code": 422,
            },
        )

    if token_required and not user.get("token"):
        raise SAMLAuthError(
            "No token specified.",
            extra={
                "exc_type": ValueError,
                "error_code": NO_TOKEN_SPECIFIED,
                "reason": "Token must be configured on the SAML app before logging in.",
                "status_code": 422,
            },
        )

    return user
7✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc