• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grafana / django-saml2-auth / 10156212284

30 Jul 2024 04:36AM UTC coverage: 90.335%. Remained the same
10156212284

Pull #337

github

web-flow
Merge 546d01d96 into d8af16360
Pull Request #337: Bump setuptools from 69.5.1 to 72.1.0

916 of 1014 relevant lines covered (90.34%)

6.32 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.06
/django_saml2_auth/user.py
1
"""Utility functions for getting or creating user accounts"""
7✔
2

3
from datetime import datetime, timedelta, timezone
7✔
4
from typing import Any, Dict, Optional, Tuple, Union
7✔
5

6
import jwt
7✔
7
from cryptography.hazmat.primitives import serialization
7✔
8
from dictor import dictor  # type: ignore
7✔
9
from django.conf import settings
7✔
10
from django.contrib.auth import get_user_model
7✔
11
from django.contrib.auth.models import Group, User
7✔
12
from django_saml2_auth.errors import (
7✔
13
    CANNOT_DECODE_JWT_TOKEN,
14
    CREATE_USER_ERROR,
15
    GROUP_JOIN_ERROR,
16
    INVALID_JWT_ALGORITHM,
17
    NO_JWT_ALGORITHM,
18
    NO_JWT_PRIVATE_KEY,
19
    NO_JWT_PUBLIC_KEY,
20
    NO_JWT_SECRET,
21
    NO_USER_ID,
22
    SHOULD_NOT_CREATE_USER,
23
)
24
from django_saml2_auth.exceptions import SAMLAuthError
7✔
25
from django_saml2_auth.utils import run_hook
7✔
26
from jwt.algorithms import get_default_algorithms, has_crypto, requires_cryptography
7✔
27
from jwt.exceptions import PyJWTError
7✔
28

29

30
def create_new_user(
    email: str, first_name: Optional[str] = None, last_name: Optional[str] = None, **kwargs
) -> User:
    """Create a new user with the given information.

    Account flags and initial group memberships are read from the
    ``NEW_USER_PROFILE`` section of ``settings.SAML2_AUTH``
    (``ACTIVE_STATUS`` defaults to True, ``STAFF_STATUS`` and
    ``SUPERUSER_STATUS`` default to False, ``USER_GROUPS`` defaults to an
    empty list).

    Args:
        email (str): Identifier passed as the first positional argument to the
            user manager's ``create_user``.
        first_name (Optional[str]): First name; forwarded only when non-empty.
        last_name (Optional[str]): Last name; forwarded only when non-empty.

    Keyword Args:
        **kwargs: Additional keyword arguments forwarded to ``create_user``.

    Raises:
        SAMLAuthError: There was an error creating the new user.
        SAMLAuthError: There was an error joining the user to the group.

    Returns:
        User: The new user object (an instance of the configured user model),
            reloaded from the database.
    """
    saml2_auth_settings = settings.SAML2_AUTH
    user_model = get_user_model()

    is_active = dictor(saml2_auth_settings, "NEW_USER_PROFILE.ACTIVE_STATUS", default=True)
    is_staff = dictor(saml2_auth_settings, "NEW_USER_PROFILE.STAFF_STATUS", default=False)
    is_superuser = dictor(saml2_auth_settings, "NEW_USER_PROFILE.SUPERUSER_STATUS", default=False)
    user_groups = dictor(saml2_auth_settings, "NEW_USER_PROFILE.USER_GROUPS", default=[])

    # Forward each name independently so that a user who only has one of the
    # two names does not lose it.
    if first_name:
        kwargs["first_name"] = first_name
    if last_name:
        kwargs["last_name"] = last_name

    try:
        user = user_model.objects.create_user(email, **kwargs)
        user.is_active = is_active
        user.is_staff = is_staff
        user.is_superuser = is_superuser
        user.save()
    except Exception as exc:
        # Any failure here is reported as a single, generic SAML error; the
        # original exception is kept both in ``extra`` and as the cause.
        raise SAMLAuthError(
            "There was an error creating the new user.",
            extra={
                "exc": exc,
                "exc_type": type(exc),
                "error_code": CREATE_USER_ERROR,
                "reason": "There was an error processing your request.",
                "status_code": 500,
            },
        ) from exc

    try:
        # Every configured group must already exist; a missing group raises
        # and is reported as a group-join error below.
        groups = [Group.objects.get(name=group) for group in user_groups]
        if groups:
            user.groups.set(groups)
    except Exception as exc:
        raise SAMLAuthError(
            "There was an error joining the user to the group.",
            extra={
                "exc": exc,
                "exc_type": type(exc),
                "error_code": GROUP_JOIN_ERROR,
                "reason": "There was an error processing your request.",
                "status_code": 500,
            },
        ) from exc

    user.save()
    user.refresh_from_db()

    return user
100

101

102
def get_or_create_user(user: Dict[str, Any]) -> Tuple[bool, User]:
    """Get or create a new user and optionally add it to one or more group(s).

    The user is looked up with ``get_user``. When no match exists and the
    ``CREATE_USER`` setting (default True) allows it, a new account is created
    and the optional ``TRIGGER.CREATE_USER`` hook is run. Afterwards, if
    ``ATTRIBUTES_MAP.groups`` is configured and present in the user's
    identity, the user's group memberships are replaced by the groups named
    there (optionally renamed through ``GROUPS_MAP``).

    Args:
        user (Dict[str, Any]): User information. ``user_identity`` is read
            when a groups attribute is configured; ``first_name`` and
            ``last_name`` are used, when present, for a newly created user.

    Raises:
        SAMLAuthError: Cannot create user. Missing user_id.
        SAMLAuthError: Cannot create user.

    Returns:
        Tuple[bool, User]: A tuple containing user creation status and user object
    """
    saml2_auth_settings = settings.SAML2_AUTH
    user_model = get_user_model()
    created = False

    try:
        target_user = get_user(user)
    except user_model.DoesNotExist:
        if not dictor(saml2_auth_settings, "CREATE_USER", True):
            raise SAMLAuthError(
                "Cannot create user.",
                extra={
                    "exc_type": Exception,
                    "error_code": SHOULD_NOT_CREATE_USER,
                    "reason": "Due to current config, a new user should not be created.",
                    "status_code": 500,
                },
            )

        user_id = get_user_id(user)
        if not user_id:
            raise SAMLAuthError(
                "Cannot create user. Missing user_id.",
                extra={
                    "error_code": SHOULD_NOT_CREATE_USER,
                    "reason": "Cannot create user. Missing user_id.",
                    "status_code": 400,
                },
            )

        # Names are optional for create_new_user, so a missing key must not
        # abort user creation with a bare KeyError.
        target_user = create_new_user(user_id, user.get("first_name"), user.get("last_name"))

        create_user_trigger = dictor(saml2_auth_settings, "TRIGGER.CREATE_USER")
        if create_user_trigger:
            run_hook(create_user_trigger, user)  # type: ignore

        # Reload after the hook ran so the returned instance reflects the
        # current database state.
        target_user.refresh_from_db()
        created = True

    # Optionally update this user's group assignments by updating group memberships from SAML groups
    # to Django equivalents
    group_attribute = dictor(saml2_auth_settings, "ATTRIBUTES_MAP.groups")
    group_map = dictor(saml2_auth_settings, "GROUPS_MAP")

    if group_attribute and group_attribute in user["user_identity"]:
        # The setting does not change between iterations; read it once.
        should_create_new_groups = dictor(saml2_auth_settings, "CREATE_GROUPS", False)
        groups = []

        for group_name in user["user_identity"][group_attribute]:
            # Group names can optionally be mapped to different names in Django
            if group_map and group_name in group_map:
                group_name_django = group_map[group_name]
            else:
                group_name_django = group_name

            try:
                groups.append(Group.objects.get(name=group_name_django))
            except Group.DoesNotExist:
                # Unknown groups are skipped unless group creation is enabled.
                if should_create_new_groups:
                    groups.append(Group.objects.create(name=group_name_django))

        # Replaces the user's existing memberships with the resolved groups.
        target_user.groups.set(groups)

    return (created, target_user)
178

179

180
def get_user_id(user: Union[str, Dict[str, Any]]) -> Optional[str]:
    """Extract the lower-cased user_id (username or email) from ``user``.

    Args:
        user (Union[str, Dict[str, Any]]): Either the user_id itself or a
            cleaned user info object. For a dict, the ``email`` key is read
            when the user model's ``USERNAME_FIELD`` is ``"email"``, otherwise
            the ``username`` key is read.

    Returns:
        Optional[str]: The lower-cased user_id, or None when the value is
            empty or ``user`` is neither a dict nor a str.
    """
    model = get_user_model()

    if isinstance(user, dict):
        key = "email" if model.USERNAME_FIELD == "email" else "username"
        raw_id = user[key]
    elif isinstance(user, str):
        raw_id = user
    else:
        raw_id = None

    if not raw_id:
        return None
    return raw_id.lower()
199

200

201
def get_user(user: Union[str, Dict[str, str]]) -> User:
    """Fetch a user given a cleaned user info object or a user_id.

    When ``TRIGGER.GET_USER`` is configured, that hook alone decides the
    result. Otherwise the user model is queried on its ``USERNAME_FIELD``,
    case-insensitively unless ``LOGIN_CASE_SENSITIVE`` is truthy.

    Args:
        user (Union[str, Dict[str, str]]): Either a user_id (as str) or a cleaned user info object

    Raises:
        DoesNotExist: The user model's ``DoesNotExist`` is raised here when
            the custom hook returns a falsy value.

    Returns:
        User: An instance of the User model
    """
    config = settings.SAML2_AUTH
    custom_lookup = dictor(config, "TRIGGER.GET_USER")
    model = get_user_model()

    if custom_lookup:
        hook_result = run_hook(custom_lookup, user)  # type: ignore
        if hook_result:
            return hook_result
        raise model.DoesNotExist

    identifier = get_user_id(user)

    # Should email be case-sensitive or not. Default is False (case-insensitive).
    lookup = model.USERNAME_FIELD
    if not dictor(config, "LOGIN_CASE_SENSITIVE", False):
        lookup = f"{lookup}__iexact"

    return model.objects.get(**{lookup: identifier})
231

232

233
def validate_jwt_algorithm(jwt_algorithm: str) -> None:
    """Ensure a JWT algorithm is configured and known to PyJWT.

    Args:
        jwt_algorithm (str): JWT algorithm

    Raises:
        SAMLAuthError: Cannot encode/decode JWT token. Specify an algorithm.
        SAMLAuthError: Cannot encode/decode JWT token. Specify a valid algorithm.
    """
    if not jwt_algorithm:
        missing_details = {
            "exc_type": Exception,
            "error_code": NO_JWT_ALGORITHM,
            "reason": "Cannot create JWT token for login.",
            "status_code": 500,
        }
        raise SAMLAuthError(
            "Cannot encode/decode JWT token. Specify an algorithm.",
            extra=missing_details,
        )

    # Names of the algorithms returned by PyJWT's get_default_algorithms().
    known_algorithms = list(get_default_algorithms())
    if jwt_algorithm in known_algorithms:
        return

    invalid_details = {
        "exc_type": Exception,
        "error_code": INVALID_JWT_ALGORITHM,
        "reason": "Cannot encode/decode JWT token for login.",
        "status_code": 500,
    }
    raise SAMLAuthError(
        "Cannot encode/decode JWT token. Specify a valid algorithm.",
        extra=invalid_details,
    )
264

265

266
def validate_secret(jwt_algorithm: str, jwt_secret: str) -> None:
    """Ensure a secret is configured for algorithms that need one.

    Args:
        jwt_algorithm (str): JWT algorithm
        jwt_secret (str): JWT secret

    Raises:
        SAMLAuthError: Cannot encode/decode JWT token. Specify a secret.
    """
    # Algorithms listed in ``requires_cryptography`` are exempt from the
    # secret check; for all others a non-empty secret is mandatory.
    if jwt_algorithm in requires_cryptography or jwt_secret:
        return

    error_details = {
        "exc_type": Exception,
        "error_code": NO_JWT_SECRET,
        "reason": "Cannot encode/decode JWT token for login.",
        "status_code": 500,
    }
    raise SAMLAuthError(
        "Cannot encode/decode JWT token. Specify a secret.",
        extra=error_details,
    )
286

287

288
def validate_private_key(jwt_algorithm: str, jwt_private_key: str) -> None:
    """Ensure a private key is configured when the algorithm calls for one.

    Args:
        jwt_algorithm (str): JWT algorithm
        jwt_private_key (str): JWT private key

    Raises:
        SAMLAuthError: Cannot encode/decode JWT token. Specify a private key.
    """
    # The key is only demanded for algorithms in ``requires_cryptography``
    # and only while PyJWT reports ``has_crypto`` as truthy.
    key_is_required = jwt_algorithm in requires_cryptography and has_crypto
    if not key_is_required or jwt_private_key:
        return

    error_details = {
        "exc_type": Exception,
        "error_code": NO_JWT_PRIVATE_KEY,
        "reason": "Cannot encode/decode JWT token for login.",
        "status_code": 500,
    }
    raise SAMLAuthError(
        "Cannot encode/decode JWT token. Specify a private key.",
        extra=error_details,
    )
308

309

310
def validate_public_key(jwt_algorithm: str, jwt_public_key: str) -> None:
    """Ensure a public key is configured when the algorithm calls for one.

    Args:
        jwt_algorithm (str): JWT algorithm
        jwt_public_key (str): JWT public key

    Raises:
        SAMLAuthError: Cannot encode/decode JWT token. Specify a public key.
    """
    # The key is only demanded for algorithms in ``requires_cryptography``
    # and only while PyJWT reports ``has_crypto`` as truthy.
    key_is_required = jwt_algorithm in requires_cryptography and has_crypto
    if not key_is_required or jwt_public_key:
        return

    error_details = {
        "exc_type": Exception,
        "error_code": NO_JWT_PUBLIC_KEY,
        "reason": "Cannot encode/decode JWT token for login.",
        "status_code": 500,
    }
    raise SAMLAuthError(
        "Cannot encode/decode JWT token. Specify a public key.",
        extra=error_details,
    )
330

331

332
def create_jwt_token(user_id: str) -> Optional[str]:
    """Build and sign a JWT token that identifies ``user_id``.

    The payload stores ``user_id`` under the user model's ``USERNAME_FIELD``
    together with an ``exp`` timestamp ``JWT_EXP`` seconds from now
    (60 seconds when the setting is absent).

    Args:
        user_id (str): User's username or email based on User.USERNAME_FIELD

    Raises:
        SAMLAuthError: Propagated from the algorithm, secret and private-key
            validators when the JWT settings are incomplete.

    Returns:
        Optional[str]: JWT token
    """
    config = settings.SAML2_AUTH
    model = get_user_model()

    # Each setting is validated immediately after it is read.
    algorithm = dictor(config, "JWT_ALGORITHM")
    validate_jwt_algorithm(algorithm)

    shared_secret = dictor(config, "JWT_SECRET")
    validate_secret(algorithm, shared_secret)

    private_key = dictor(config, "JWT_PRIVATE_KEY")
    validate_private_key(algorithm, private_key)

    passphrase = dictor(config, "JWT_PRIVATE_KEY_PASSPHRASE")
    lifetime_seconds = dictor(config, "JWT_EXP", 60)  # default: 1 minute

    payload = {model.USERNAME_FIELD: user_id}
    expires_at = datetime.now(tz=timezone.utc) + timedelta(seconds=lifetime_seconds)
    payload["exp"] = expires_at.timestamp()

    # If a passphrase is specified, the PEM-encoded private key has to be
    # decrypted with it before it can be used to sign the token.
    if passphrase:
        # load_pem_private_key requires data and password to be in bytes
        key_bytes = private_key.encode() if isinstance(private_key, str) else private_key
        passphrase_bytes = passphrase.encode() if isinstance(passphrase, str) else passphrase
        private_key = serialization.load_pem_private_key(
            data=key_bytes, password=passphrase_bytes
        )

    # The shared secret signs the token only for algorithms outside
    # ``requires_cryptography``; otherwise the private key is used.
    if shared_secret and algorithm not in requires_cryptography:
        signing_key = shared_secret
    else:
        signing_key = private_key

    return jwt.encode(payload, signing_key, algorithm=algorithm)
381

382

383
def create_custom_or_default_jwt(user: Union[str, User]) -> Optional[str]:
    """Create a new JWT token, eventually using custom trigger

    When ``TRIGGER.CUSTOM_CREATE_JWT`` is configured, the hook receives a user
    instance (looked up through ``get_user`` if only a user_id was given) and
    its return value is returned as-is. Otherwise the token is built by
    ``create_jwt_token``.

    Args:
        user (Union[str, User]): User instance or User's username or email
            based on User.USERNAME_FIELD

    Raises:
        SAMLAuthError: Cannot create JWT token. Specify a user.

    Returns:
        Optional[str]: JWT token
    """
    saml2_auth_settings = settings.SAML2_AUTH
    user_model = get_user_model()

    custom_create_jwt_trigger = dictor(saml2_auth_settings, "TRIGGER.CUSTOM_CREATE_JWT")

    # If user is the id (user_model.USERNAME_FIELD), set it as user_id
    user_id: Optional[str] = user if isinstance(user, str) else None

    # Check if there is a custom trigger for creating the JWT and URL query
    if custom_create_jwt_trigger:
        target_user = user
        # If user is user_id, get user instance
        if user_id:
            target_user = get_user({user_model.USERNAME_FIELD: user_id})
        return run_hook(custom_create_jwt_trigger, target_user)  # type: ignore

    # If user_id is not set, retrieve it from user instance. A default is
    # supplied so that an empty or attribute-less ``user`` ends in the
    # documented SAMLAuthError below rather than an AttributeError.
    if not user_id:
        user_id = getattr(user, user_model.USERNAME_FIELD, None)

    if not user_id:
        raise SAMLAuthError(
            "Cannot create JWT token. Specify a user.",
            extra={
                "exc_type": Exception,
                "error_code": NO_USER_ID,
                "reason": "Cannot create JWT token for login.",
                "status_code": 500,
            },
        )

    # Create a new JWT token with PyJWT
    return create_jwt_token(user_id)
433

434

435
def decode_jwt_token(jwt_token: str) -> Optional[str]:
    """Decode a JWT token

    The token is verified with ``JWT_SECRET`` when a secret is configured and
    the algorithm is not in ``requires_cryptography``; otherwise
    ``JWT_PUBLIC_KEY`` is used.

    Args:
        jwt_token (str): The token to decode

    Raises:
        SAMLAuthError: Cannot decode JWT token.
        SAMLAuthError: Propagated from the algorithm, secret and public-key
            validators when the JWT settings are incomplete.

    Returns:
        Optional[str]: The value stored in the token under the user model's
            ``USERNAME_FIELD``.
    """
    saml2_auth_settings = settings.SAML2_AUTH

    jwt_algorithm = dictor(saml2_auth_settings, "JWT_ALGORITHM")
    validate_jwt_algorithm(jwt_algorithm)

    jwt_secret = dictor(saml2_auth_settings, "JWT_SECRET")
    validate_secret(jwt_algorithm, jwt_secret)

    jwt_public_key = dictor(saml2_auth_settings, "JWT_PUBLIC_KEY")
    validate_public_key(jwt_algorithm, jwt_public_key)

    secret = (
        jwt_secret
        if (jwt_secret and jwt_algorithm not in requires_cryptography)
        else jwt_public_key
    )

    try:
        # ``algorithms`` must be a list: with a bare string PyJWT would test
        # the token's ``alg`` header by substring membership instead of
        # exact match.
        data = jwt.decode(jwt_token, secret, algorithms=[jwt_algorithm])
    except PyJWTError as exc:
        raise SAMLAuthError(
            "Cannot decode JWT token.",
            extra={
                "exc": exc,
                "exc_type": type(exc),
                "error_code": CANNOT_DECODE_JWT_TOKEN,
                "reason": "Cannot decode JWT token.",
                "status_code": 500,
            },
        ) from exc

    user_model = get_user_model()
    # NOTE(review): a token without this claim raises KeyError here, which is
    # not converted into SAMLAuthError — confirm whether callers expect that.
    return data[user_model.USERNAME_FIELD]
479

480

481
def decode_custom_or_default_jwt(jwt_token: str) -> Optional[str]:
    """Decode a JWT token, delegating to a custom trigger when one is set.

    Args:
        jwt_token (str): The token to decode

    Raises:
        SAMLAuthError: Cannot decode JWT token.

    Returns:
        Optional[str]: A user_id as str or None.
    """
    custom_decoder = dictor(settings.SAML2_AUTH, "TRIGGER.CUSTOM_DECODE_JWT")

    # Without a TRIGGER.CUSTOM_DECODE_JWT hook, use the built-in decoder.
    if not custom_decoder:
        return decode_jwt_token(jwt_token)

    return run_hook(custom_decoder, jwt_token)  # type: ignore
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc