• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uc-cdis / fence / 25459641641

06 May 2026 08:35PM UTC coverage: 75.077% (-0.001%) from 75.078%
25459641641

push

github

PlanXCyborg
Merge https://github.com/uc-cdis/fence into stable

8489 of 11307 relevant lines covered (75.08%)

0.75 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.04
fence/auth.py
1
import re
1✔
2
import urllib.request, urllib.parse, urllib.error
1✔
3
from datetime import datetime
1✔
4
from functools import wraps
1✔
5
from json import JSONDecodeError
1✔
6

7
import backoff
1✔
8
import flask
1✔
9
from flask import current_app
1✔
10
from authutils.errors import JWTError, JWTExpiredError
1✔
11
from authutils.token.validate import (
1✔
12
    current_token,
13
    require_auth_header,
14
    set_current_token,
15
    validate_request,
16
)
17
from cdislogging import get_logger
1✔
18
import requests
1✔
19

20
from fence.authz.auth import check_arborist_auth
1✔
21
from fence.config import config
1✔
22
from fence.errors import Unauthorized, InternalError
1✔
23
from fence.jwt.validate import validate_jwt
1✔
24
from fence.models import User, IdentityProvider, query_for_user
1✔
25
from fence.user import get_current_user
1✔
26
from fence.utils import (
1✔
27
    clear_cookies,
28
    DEFAULT_BACKOFF_SETTINGS,
29
    allowed_login_redirects,
30
    domain,
31
)
32

33
logger = get_logger(__name__)
1✔
34

35

36
def get_jwt():
1✔
37
    """
38
    Return the user's JWT from authorization header. Requires flask application context.
39
    Raises:
40
        - Unauthorized, if header is missing or not in the correct format
41
    """
42
    header = flask.request.headers.get("Authorization")
1✔
43
    if not header:
1✔
44
        raise Unauthorized("missing authorization header")
1✔
45
    try:
1✔
46
        bearer, token = header.split(" ")
1✔
47
    except ValueError:
×
48
        msg = "authorization header not in expected format"
×
49
        logger.debug(f"{msg}. Received header: {header}")
×
50
        logger.error(f"{msg}.")
×
51
        raise Unauthorized(msg)
×
52
    if bearer.lower() != "bearer":
1✔
53
        raise Unauthorized("expected bearer token in auth header")
×
54
    return token
1✔
55

56

57
def build_redirect_url(hostname, path):
1✔
58
    """
59
    Compute a redirect given a hostname and next path where
60
    Args:
61
        hostname (str): may be empty string or a bare hostname or
62
               a hostname with a protocal attached (https?://...)
63
        path (int): is a path to attach to hostname
64
    Return:
65
        string url suitable for flask.redirect
66
    """
67
    redirect_base = hostname
1✔
68
    # BASE_URL may be empty or a bare hostname or a hostname with a protocol
69
    if bool(redirect_base) and not redirect_base.startswith("http"):
1✔
70
        redirect_base = "https://" + redirect_base
1✔
71
    return redirect_base + path
1✔
72

73

74
def get_ip_information_string():
1✔
75
    """
76
    Returns a string containing the client's IP address and any X-Forwarded headers.
77

78
    Returns:
79
        str: A formatted string containing the client's IP address and X-Forwarded headers.
80
    """
81
    x_forwarded_headers = [
1✔
82
        f"{header}: {value}"
83
        for header, value in flask.request.headers
84
        if "X-Forwarded" in header
85
    ]
86
    return f"flask.request.remote_addr={flask.request.remote_addr} x_forwarded_headers={x_forwarded_headers}"
1✔
87

88

89
def _identify_user_and_update_database(
1✔
90
    user,
91
    username,
92
    provider,
93
    email=None,
94
    id_from_idp=None,
95
    username_deny_regex=None,
96
) -> bool:
97
    """
98
    Create a new user if one doesn't already exist in the database. Commit the user
99
    and associated idp information to the database.
100

101
    Args:
102
        user (User): user to be logged in, if it already exists, None otherwise
103
        username (str): specific username of user to be logged in
104
        provider (str): specfic idp of user to be logged in
105
        email (str, optional): email of user (may or may not match username depending
106
            on the IdP)
107
        id_from_idp (str, optional): id from the IDP (which may be different than
108
            the username)
109

110
    Return:
111
        User: the created or updated user
112
    """
113
    username_deny_regex = username_deny_regex or config["GLOBAL_USERNAME_DENY_REGEX"]
1✔
114
    if username_deny_regex:
1✔
115
        if re.search(pattern=username_deny_regex, string=username):
1✔
116
            logger.info(
1✔
117
                f"Blocked login of user with username {username} due to deny regex: {username_deny_regex}"
118
            )
119

120
            # intentionally empty message to prevent information leakage
121
            raise Unauthorized(message="")
1✔
122

123
    if user:
1✔
124
        if user.active == False:
1✔
125
            # Abort login if user.active == False:
126
            raise Unauthorized(
1✔
127
                "User is known but not authorized/activated in the system"
128
            )
129
        _update_users_email(user, email)
1✔
130
        _update_users_id_from_idp(user, id_from_idp)
1✔
131
        _update_users_last_auth(user)
1✔
132
    else:
133
        if not config["ALLOW_NEW_USER_ON_LOGIN"]:
1✔
134
            # do not create new active users automatically
135
            raise Unauthorized("New user is not yet authorized/activated in the system")
1✔
136

137
        # add the new user
138
        user = User(username=username)
1✔
139

140
        if email:
1✔
141
            user.email = email
1✔
142

143
        if id_from_idp:
1✔
144
            user.id_from_idp = id_from_idp
1✔
145
            # TODO: update iss_sub mapping table?
146

147
    # This expression is relevant to those users who already have user and
148
    # idp info persisted to the database. We avoid unnecessarily re-saving
149
    # that user and idp info.
150
    if not user.identity_provider or not user.identity_provider.name == provider:
1✔
151
        # setup idp connection for new user (or existing user w/o it setup)
152
        idp = (
1✔
153
            current_app.scoped_session()
154
            .query(IdentityProvider)
155
            .filter(IdentityProvider.name == provider)
156
            .first()
157
        )
158
        if not idp:
1✔
159
            idp = IdentityProvider(name=provider)
1✔
160

161
        user.identity_provider = idp
1✔
162
        current_app.scoped_session().add(user)
1✔
163
        current_app.scoped_session().commit()
1✔
164

165
    # `login_in_progress_username` stored for use by the user registration code.
166
    # not using `flask.session["username"]` because other code relies on it to know
167
    # whether a user is logged in; in this case the user isn't logged in yet.
168
    flask.session["login_in_progress_username"] = user.username
1✔
169

170
    flask.g.user = user
1✔
171
    return user
1✔
172

173

174
def _is_user_registration_required_before_login(user, provider) -> bool:
1✔
175
    auto_registration_enabled = (
1✔
176
        config["OPENID_CONNECT"]
177
        .get(provider, {})
178
        .get("enable_idp_users_registration", False)
179
    )
180
    # Registration is required if:
181
    # - Registration is enabled in the config, AND
182
    # - Automatic registration is NOT enabled, AND
183
    # - The user's registration info is empty
184
    return (
1✔
185
        config["REGISTER_USERS_ON"]
186
        and not auto_registration_enabled
187
        and user.additional_info.get("registration_info", {}) == {}
188
    )
189

190

191
def login_user_or_require_registration(
1✔
192
    username, provider, upstream_idp=None, shib_idp=None, email=None, id_from_idp=None
193
) -> bool:
194
    """
195
    Check if a user needs to go through the registration flow before being logged in. If not,
196
    login the user with the given username and provider. Set values in Flask session to indicate
197
    the user being logged in.
198

199
    Args:
200
        username (str): specific username of user to be logged in
201
        provider (str): specfic idp of user to be logged in
202
        upstream_idp (str, optional): upstream fence IdP
203
        shib_idp (str, optional): upstream shibboleth IdP
204
        email (str, optional): email of user (may or may not match username depending
205
            on the IdP)
206
        id_from_idp (str, optional): id from the IDP (which may be different than
207
            the username)
208

209
    Return:
210
        bool: whether the user has been logged in (if registration is enabled and the user is not
211
            registered, this would be False)
212
    """
213

214
    def log_ip(user):
1✔
215
        ip_info = get_ip_information_string()
1✔
216
        logger.info(
1✔
217
            f"User logged in. user.id={user.id} user.username={user.username} {ip_info}"
218
        )
219

220
    def set_flask_session_values(user):
1✔
221
        """
222
        Helper fuction to set user values in the session.
223

224
        Args:
225
            user (User): User object
226
        """
227
        flask.session["username"] = user.username
1✔
228
        flask.session["user_id"] = str(user.id)
1✔
229
        flask.session["provider"] = user.identity_provider.name
1✔
230
        if upstream_idp:
1✔
231
            flask.session["upstream_idp"] = upstream_idp
×
232
        if shib_idp:
1✔
233
            flask.session["shib_idp"] = shib_idp
×
234
        flask.g.user = user
1✔
235
        flask.g.scopes = ["_all"]
1✔
236
        flask.g.token = None
1✔
237

238
    user = query_for_user(session=current_app.scoped_session(), username=username)
1✔
239
    user = _identify_user_and_update_database(
1✔
240
        user, username, provider, email, id_from_idp
241
    )
242
    log_user_in = not _is_user_registration_required_before_login(user, provider)
1✔
243
    if log_user_in:
1✔
244
        set_flask_session_values(user)
1✔
245
        log_ip(user)
1✔
246
    return log_user_in
1✔
247

248

249
@backoff.on_exception(backoff.expo, Exception, **DEFAULT_BACKOFF_SETTINGS)
1✔
250
def get_openid_config_for_idp(open_id_connect):
1✔
251
    """
252
    Return openid configuration for a given provider.
253
    Args:
254
        open_id_connect (dict): fence config for idp
255
    Returns:
256
        response: response of openid configuration
257
    """
258
    well_known_url = open_id_connect["discovery_url"]
1✔
259
    well_known_resp = requests.get(well_known_url)
1✔
260
    well_known_resp.raise_for_status()
1✔
261
    return well_known_resp
1✔
262

263

264
def logout(next_url, force_era_global_logout=False):
1✔
265
    """
266
    Return a redirect which another logout from IDP or the provided redirect.
267
    Depending on the IDP, this logout will propogate. For example, if using
268
    another fence as an IDP, this will hit that fence's logout endpoint.
269
    Args:
270
        next_url (str): Final redirect desired after logout
271
    """
272
    logger.debug("IN AUTH LOGOUT, next_url = {0}".format(next_url))
1✔
273

274
    # propogate logout to IDP
275
    provider_logout = None
1✔
276
    provider = flask.session.get("provider")
1✔
277

278
    if force_era_global_logout or provider == IdentityProvider.itrust:
1✔
279
        safe_url = urllib.parse.quote_plus(next_url)
1✔
280
        provider_logout = config["ITRUST_GLOBAL_LOGOUT"] + safe_url
1✔
281
    elif provider == IdentityProvider.fence:
1✔
282
        base = config["OPENID_CONNECT"]["fence"]["api_base_url"]
1✔
283
        provider_logout = base + "/logout?" + urllib.parse.urlencode({"next": next_url})
1✔
284
    elif provider == "cognito":
1✔
285
        idp_openid_connect = config["OPENID_CONNECT"]["cognito"]
1✔
286
        well_known = None
1✔
287
        try:
1✔
288
            well_known_resp = get_openid_config_for_idp(idp_openid_connect)
1✔
289
            well_known = well_known_resp.json()
1✔
290
        except requests.exceptions.HTTPError as e:
1✔
291
            logger.error(
1✔
292
                f"Well-known endpoint returned an error status after multiple retries, Cognito Session not invalidated, Logging out of Gen3. Error: {e}"
293
            )
294
        except requests.exceptions.ConnectionError as e:
1✔
295
            logger.error(
1✔
296
                f"Could not connect to well-known endpoint, Cognito Session not invalidated, Logging out of Gen3. Error: {e} "
297
            )
298
        except JSONDecodeError as e:
×
299
            logger.error(
×
300
                f"Invalid JSON resonse from well-known, Cognito Session not invalidated, Logging out of Gen3. Error: {e}"
301
            )
302
        except Exception as e:
×
303
            logger.error(
×
304
                f"Error occured trying to get well-known, Cognito Session not invalidated, Logging out from Gen3. Error: {e}"
305
            )
306
        if well_known:
1✔
307
            end_session_endpoint = well_known.get("end_session_endpoint")
1✔
308
            # NOTE: discovery url for cognito is different than the cognito api domain url. Check the domain for the APIs like end_session_endpoint or authorization_endpoint found in the well-know openid config
309
            if domain(end_session_endpoint) not in allowed_login_redirects():
1✔
310
                logger.error(
1✔
311
                    f"Logout url {end_session_endpoint} not in LOGIN_REDIRECT_WHITELIST config. Cognito Session not invalidated, Logging out from Gen3."
312
                )
313
            else:
314
                if end_session_endpoint:
1✔
315
                    provider_logout = (
1✔
316
                        end_session_endpoint
317
                        + "?"
318
                        + urllib.parse.urlencode(
319
                            {
320
                                "client_id": idp_openid_connect["client_id"],
321
                                "logout_uri": next_url,  # NOTE: This needs to be set up in the cognito console for an allowed sign-out url
322
                            }
323
                        )
324
                    )
325
                else:
326
                    logger.error(
×
327
                        "end_session_endpoint not found in well-known config. Cognito Session not invalidated. Logging out from Gen3"
328
                    )
329

330
    flask.session.clear()
1✔
331
    try:
1✔
332
        redirect_response = flask.make_response(
1✔
333
            flask.redirect(provider_logout or urllib.parse.unquote(next_url))
334
        )
335
    except Exception as e:
×
336
        logger.error(f"Error logging out: {e}")
×
337
    clear_cookies(redirect_response)
1✔
338
    return redirect_response
1✔
339

340

341
def check_scope(scope):
1✔
342
    def wrapper(f):
×
343
        @wraps(f)
×
344
        def check_scope_and_call(*args, **kwargs):
×
345
            if "_all" in flask.g.scopes or scope in flask.g.scopes:
×
346
                return f(*args, **kwargs)
×
347
            else:
348
                raise Unauthorized(
×
349
                    "Requested scope {} can't access this endpoint".format(scope)
350
                )
351

352
        return check_scope_and_call
×
353

354
    return wrapper
×
355

356

357
def login_required(scope=None):
1✔
358
    """
359
    Create decorator to require a user session
360
    """
361

362
    def decorator(f):
1✔
363
        @wraps(f)
1✔
364
        def wrapper(*args, **kwargs):
1✔
365
            if flask.session.get("username"):
1✔
366
                is_logged_in = login_user_or_require_registration(
1✔
367
                    flask.session["username"], flask.session["provider"]
368
                )
369
                if not is_logged_in:
1✔
370
                    raise Unauthorized("Please register to login")
×
371
                return f(*args, **kwargs)
1✔
372

373
            eppn = None
1✔
374
            if config["LOGIN_OPTIONS"]:
1✔
375
                enable_shib = "shibboleth" in [
1✔
376
                    option["idp"] for option in config["LOGIN_OPTIONS"]
377
                ]
378
            else:
379
                # fall back on "providers"
380
                enable_shib = "shibboleth" in (
×
381
                    config.get("ENABLED_IDENTITY_PROVIDERS") or {}
382
                ).get("providers", {})
383

384
            if enable_shib and "SHIBBOLETH_HEADER" in config:
1✔
385
                eppn = flask.request.headers.get(config["SHIBBOLETH_HEADER"])
1✔
386

387
            if config.get("MOCK_AUTH") is True:
1✔
388
                eppn = "test"
1✔
389
            # if there is authorization header for oauth
390
            if "Authorization" in flask.request.headers:
1✔
391
                has_oauth(scope=scope)
1✔
392
                return f(*args, **kwargs)
1✔
393
            # if there is shibboleth session, then create user session and
394
            # log user in
395
            elif eppn:
1✔
396
                username = eppn.split("!")[-1]
1✔
397
                flask.session["username"] = username
1✔
398
                flask.session["provider"] = IdentityProvider.itrust
1✔
399
                is_logged_in = login_user_or_require_registration(
1✔
400
                    username, flask.session["provider"]
401
                )
402
                if not is_logged_in:
1✔
403
                    raise Unauthorized("Please register to login")
×
404
                return f(*args, **kwargs)
1✔
405
            else:
406
                raise Unauthorized("Please login")
1✔
407

408
        return wrapper
1✔
409

410
    return decorator
1✔
411

412

413
def has_oauth(scope=None):
1✔
414
    scope = scope or set()
1✔
415
    scope.update({"openid"})
1✔
416
    try:
1✔
417
        access_token_claims = validate_jwt(
1✔
418
            scope=scope,
419
            purpose="access",
420
        )
421
    except JWTError as e:
1✔
422
        raise Unauthorized("failed to validate token: {}".format(e))
×
423
    if "sub" in access_token_claims:
1✔
424
        user_id = access_token_claims["sub"]
1✔
425
        user = (
1✔
426
            current_app.scoped_session().query(User).filter_by(id=int(user_id)).first()
427
        )
428
        if not user:
1✔
429
            raise Unauthorized("no user found with id: {}".format(user_id))
×
430
        # set some application context for current user
431
        flask.g.user = user
1✔
432
    # set some application context for current client id
433
    # client_id should be None if the field doesn't exist or is empty
434
    flask.g.client_id = access_token_claims.get("azp") or None
1✔
435
    flask.g.token = access_token_claims
1✔
436

437

438
def get_user_from_claims(claims):
1✔
439
    return (
1✔
440
        current_app.scoped_session()
441
        .query(User)
442
        .filter(User.id == claims["sub"])
443
        .first()
444
    )
445

446

447
def admin_login_required(function):
1✔
448
    """Use the check_arborist_auth decorator checking on admin authorization."""
449
    return check_arborist_auth(["/services/fence/admin"], "*")(function)
1✔
450

451

452
def _update_users_email(user, email):
1✔
453
    """
454
    Update email if provided and doesn't match db entry.
455
    """
456
    if email and user.email != email:
1✔
457
        logger.info(
1✔
458
            f"Updating username {user.username}'s email from {user.email} to {email}"
459
        )
460
        user.email = email
1✔
461

462
        current_app.scoped_session().add(user)
1✔
463
        current_app.scoped_session().commit()
1✔
464

465

466
def _update_users_id_from_idp(user, id_from_idp):
1✔
467
    """
468
    Update id_from_idp if provided and doesn't match db entry.
469
    """
470
    if id_from_idp and user.id_from_idp != id_from_idp:
1✔
471
        logger.info(
1✔
472
            f"Updating username {user.username}'s id_from_idp from {user.id_from_idp} to {id_from_idp}"
473
        )
474
        user.id_from_idp = id_from_idp
1✔
475

476
        current_app.scoped_session().add(user)
1✔
477
        current_app.scoped_session().commit()
1✔
478

479

480
def _update_users_last_auth(user):
1✔
481
    """
482
    Update _last_auth.
483
    """
484
    logger.info(f"Updating username {user.username}'s _last_auth.")
1✔
485
    user._last_auth = datetime.now()
1✔
486

487
    current_app.scoped_session().add(user)
1✔
488
    current_app.scoped_session().commit()
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc