• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uc-cdis / fence / 10119489915

27 Jul 2024 12:53AM UTC coverage: 75.109% (-0.03%) from 75.141%
10119489915

push

github

k-burt-uch
feat(PXP-11366): Remove passport from userinfo

7740 of 10305 relevant lines covered (75.11%)

0.75 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

57.26
fence/resources/user/__init__.py
1
from email.mime.application import MIMEApplication
1✔
2
from email.mime.multipart import MIMEMultipart
1✔
3
from email.mime.text import MIMEText
1✔
4
from email.utils import COMMASPACE, formatdate
1✔
5
import uuid
1✔
6
from fence.jwt.token import issued_and_expiration_times
1✔
7

8
import flask
1✔
9
import jwt
1✔
10
import smtplib
1✔
11
import time
1✔
12
from authlib.common.encoding import to_unicode
1✔
13
from cdislogging import get_logger
1✔
14
from gen3authz.client.arborist.errors import ArboristError
1✔
15

16
from fence.resources import userdatamodel as udm
1✔
17
from fence.resources.google.utils import (
1✔
18
    get_linked_google_account_email,
19
    get_linked_google_account_exp,
20
    get_service_account,
21
)
22
from fence.resources.userdatamodel import get_user_groups
1✔
23

24
from fence.config import config
1✔
25
from fence.errors import NotFound, Unauthorized, UserError, InternalError
1✔
26
from fence.jwt.utils import get_jwt_header
1✔
27
from fence.models import query_for_user
1✔
28

29

30
# Module-level logger named after this module; used by the log calls below.
logger = get_logger(__name__)
1✔
31

32

33
def update_user_resource(username, resource):
    """
    Grant ``resource`` to the user's application and return the user's info.

    The resource is added to the resources already granted on the user's
    application. When ``EMAIL_SERVER`` is present in the config, the user is
    also notified by email with the full list of granted resources.

    Args:
        username: username used to look up the user
        resource: resource to add to ``application.resources_granted``

    Return:
        dict: the result of ``get_user_info`` for this user

    Raises:
        NotFound: if no user with this username exists (raised by ``find_user``)
        UserError: if the user has no application
    """
    with flask.current_app.db.session as session:
        user = find_user(username, session)
        if not user.application:
            raise UserError("User haven't started the application")

        # Go through a set so granting an already-granted resource is a no-op.
        resources = set(user.application.resources_granted or [])
        resources.add(resource)
        user.application.resources_granted = list(resources)

        if "EMAIL_SERVER" in config:
            # Sets have no defined order; sort so the email text is stable.
            content = "You have been granted {} resources in Bionimbus Cloud.".format(
                ", ".join(sorted(resources))
            )
            send_mail(
                config["SEND_FROM"],
                [user.email],
                "Account update from Bionimbus Cloud",
                text=content,
                server=config["EMAIL_SERVER"],
            )

        # get_user_info takes (session, username); passing (user, session)
        # handed it a user object as the session and a session as the username.
        return get_user_info(session, user.username)
×
53

54

55
def find_user(username, session):
    """
    Look up a user by username through ``query_for_user``.

    Args:
        username: username to search for
        session: database session handed to ``query_for_user``

    Return:
        the matching user

    Raises:
        NotFound: if the query returns nothing
    """
    match = query_for_user(session=session, username=username)
    if match:
        return match
    raise NotFound("user {} not found".format(username))
×
60

61

62
def get_user(current_session, username):
    """
    Fetch a user by username through ``udm.get_user``.

    Args:
        current_session: database session handed to ``udm.get_user``
        username: username to search for

    Return:
        the matching user

    Raises:
        NotFound: if the lookup returns nothing
    """
    found = udm.get_user(current_session, username)
    if found:
        return found
    raise NotFound("user {} not found".format(username))
1✔
67

68

69
def get_current_user_info():
    """
    Return the userinfo dictionary for the user stored on ``flask.g``.

    Return:
        dict: the result of ``get_user_info`` for ``flask.g.user``
    """
    with flask.current_app.db.session as session:
        # Merge flask.g.user into this session before reading its username.
        current_user = session.merge(flask.g.user)
        return get_user_info(session, current_user.username)
1✔
72

73

74
def get_user_info(current_session, username):
    """
    Build the userinfo dictionary for a user.

    Args:
        current_session: database session used to look up the user and groups
        username: username of the user to describe

    Return:
        dict: identity fields (``sub``, ``name``, ``email``, ...), ``role``,
        ``project_access``, ``groups`` and the primary Google service account
        email. ``fence_idp``/``shib_idp`` are copied from the flask session
        when present, ``resources``/``authz`` are added when the app has an
        ``arborist`` attribute, ``tags`` when the user has any, application
        details when the user has an application, and optional claims when
        the request's JSON body asks for them under ``claims.userinfo``.

    Raises:
        NotFound: if no user with this username exists (raised by ``get_user``)
    """
    user = get_user(current_session, username)
    role = "admin" if user.is_admin else "user"

    groups = udm.get_user_groups(current_session, username)["groups"]
    info = {
        "user_id": user.id,  # TODO deprecated, use 'sub'
        "sub": str(user.id),
        # getattr b/c the identity_provider sqlalchemy relationship may not
        # exist (be None)
        "idp": getattr(user.identity_provider, "name", ""),
        "username": user.username,  # TODO deprecated, use 'name'
        "name": user.username,
        "display_name": user.display_name,  # TODO deprecated, use 'preferred_username'
        "preferred_username": user.display_name,
        "phone_number": user.phone_number,
        "email": user.email,
        "is_admin": user.is_admin,
        "role": role,
        "project_access": dict(user.project_access),
        "certificates_uploaded": [],
        "resources_granted": [],
        "groups": groups,
        "message": "",
    }

    if "fence_idp" in flask.session:
        info["fence_idp"] = flask.session["fence_idp"]
    if "shib_idp" in flask.session:
        info["shib_idp"] = flask.session["shib_idp"]

    # User SAs are stored in db with client_id = None
    primary_service_account = (
        get_service_account(client_id=None, user_id=user.id, username=user.username)
        or {}
    )
    # getattr on the {} fallback yields None, so a missing SA reports no email
    primary_service_account_email = getattr(primary_service_account, "email", None)
    info["primary_google_service_account"] = primary_service_account_email

    if hasattr(flask.current_app, "arborist"):
        try:
            auth_mapping = flask.current_app.arborist.auth_mapping(user.username)
            resources = list(auth_mapping.keys())
        except ArboristError as exc:
            # Best effort: an arborist failure yields empty authz, not an error.
            logger.error(
                f"request to arborist for user's resources failed; going to list empty. Error: {exc}"
            )
            resources = []
            auth_mapping = {}
        info["resources"] = resources
        info["authz"] = auth_mapping

    if user.tags is not None and len(user.tags) > 0:
        info["tags"] = {tag.key: tag.value for tag in user.tags}

    if user.application:
        info["resources_granted"] = user.application.resources_granted
        info["certificates_uploaded"] = [
            c.name for c in user.application.certificates_uploaded
        ]
        info["message"] = user.application.message

    # The request body is client-controlled: only follow claims -> userinfo
    # while each level has the expected shape, so that e.g. a JSON array body
    # or a null "userinfo" is ignored instead of raising.
    request_body = flask.request.get_json(force=True, silent=True)
    requested_claims = (
        request_body.get("claims") if isinstance(request_body, dict) else None
    )
    requested_userinfo_claims = (
        requested_claims.get("userinfo")
        if isinstance(requested_claims, dict)
        else None
    )
    if isinstance(requested_userinfo_claims, (dict, list)):
        info.update(_get_optional_userinfo(user, requested_userinfo_claims))

    return info
1✔
146

147

148
def generate_encoded_gen3_passport(user, expires_in):
    """
    Generate fresh gen3 passports with Gen3 Visas
    NOTE: This function isn't used yet. Originally made it to repackage RAS visas into a Gen3 Passport but
    due to RAS policies we aren't allowed to do that anymore.
    Still keeping it here so that we could repurpose this later when we need to package our own Gen3 Visas.

    Args:
        user: user the passport is issued for; only ``user.id`` is read
        expires_in: lifetime handed to ``issued_and_expiration_times`` to
            compute the ``iat`` and ``exp`` claims

    Return:
        str: the encoded, RS256-signed passport JWT
    """

    # Sign with the first keypair of the app and advertise its key id.
    keypair = flask.current_app.keypairs[0]

    headers = {
        "typ": "JWT",
        "alg": "RS256",
        "kid": keypair.kid,
    }

    issuer = config.get("BASE_URL")
    iat, exp = issued_and_expiration_times(expires_in)

    payload = {
        "iss": issuer,
        # "sub" is a string claim; this also matches the "sub" in get_user_info
        "sub": str(user.id),
        "iat": iat,
        "exp": exp,
        "jti": str(uuid.uuid4()),
        "scope": "openid <ga4gh-spec-scopes>",
        # no visas are packaged yet (see NOTE above)
        "ga4gh_passport_v1": [],
    }

    logger.info("Issuing JWT Gen3 Passport")
    passport = jwt.encode(
        payload, keypair.private_key, headers=headers, algorithm="RS256"
    )
    passport = to_unicode(passport, "UTF-8")

    return passport
×
184

185

186
def _get_optional_userinfo(user, claims):
1✔
187
    info = {}
1✔
188
    for claim in claims:
1✔
189
        if claim == "linked_google_account":
1✔
190
            google_email = get_linked_google_account_email(user.id)
1✔
191
            info["linked_google_account"] = google_email
1✔
192
        if claim == "linked_google_account_exp":
1✔
193
            google_account_exp = get_linked_google_account_exp(user.id)
×
194
            info["linked_google_account_exp"] = google_account_exp
×
195

196
    return info
1✔
197

198

199
def send_mail(send_from, send_to, subject, text, server, certificates=None):
    """
    Send a plain-text email, optionally with certificate attachments.

    Args:
        send_from: sender address, used for the envelope and the From header
        send_to (list): recipient addresses
        subject: value of the Subject header
        text: body of the message
        server: SMTP server handed to ``smtplib.SMTP``
        certificates: optional iterable of objects exposing ``data``,
            ``extension`` and ``filename``; each one is attached to the message

    Raises:
        TypeError: if ``send_to`` is not a list
    """
    # An explicit check (rather than assert) still runs under ``python -O``;
    # a bare string here would be joined character by character below.
    if not isinstance(send_to, list):
        raise TypeError("send_to must be a list of email addresses")

    # Keyword arguments to MIMEMultipart become Content-Type parameters, not
    # message headers, so From/To/Date are set as headers explicitly.
    msg = MIMEMultipart()
    msg["From"] = send_from
    msg["To"] = COMMASPACE.join(send_to)
    msg["Date"] = formatdate(localtime=True)
    msg["Subject"] = subject
    msg.attach(MIMEText(text))

    for cert in certificates or []:
        application = MIMEApplication(cert.data, cert.extension)
        application.add_header(
            "Content-Disposition", 'attachment; filename="{}"'.format(cert.filename)
        )
        application.set_param("name", cert.filename)
        msg.attach(application)

    # The context manager releases the connection even if sendmail raises.
    with smtplib.SMTP(server) as smtp:
        smtp.sendmail(send_from, send_to, msg.as_string())
×
217

218

219
def get_user_accesses(current_session):
    """
    Return the result of ``udm.get_user_accesses()``.

    NOTE(review): ``current_session`` is accepted but never passed on to
    ``udm.get_user_accesses`` -- confirm this is intended.

    Raises:
        InternalError: if the lookup returns nothing
    """
    accesses = udm.get_user_accesses()
    if accesses:
        return accesses
    raise InternalError("Error: %s user does not exist" % flask.g.user.username)
×
224

225

226
def remove_user_from_project(current_session, user, project):
    """
    Delete the access privilege linking ``user`` to ``project``.

    Args:
        current_session: database session used for the lookup and the delete
        user: user whose access is removed
        project: project the access is removed from

    Raises:
        NotFound: if the user has no access privilege on the project
    """
    privilege = udm.get_user_project_access_privilege(current_session, user, project)
    if not privilege:
        raise NotFound(
            "Project {0} not connected to user {1}".format(project.name, user.username)
        )
    current_session.delete(privilege)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc