• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uc-cdis / fence / 13726114786

07 Mar 2025 05:35PM UTC coverage: 75.427% (+0.2%) from 75.268%
13726114786

Pull #1209

github

AlbertSnows
move send_email
Pull Request #1209: move backoff settings as well as other functions out of utils

7855 of 10414 relevant lines covered (75.43%)

0.75 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.59
fence/utils.py
1
import bcrypt
1✔
2
import collections
1✔
3
from functools import wraps
1✔
4
import logging
1✔
5
import json
1✔
6
from random import SystemRandom
1✔
7
import re
1✔
8
import string
1✔
9
from urllib.parse import urlencode
1✔
10
from urllib.parse import parse_qs, urlsplit, urlunsplit
1✔
11
import sys
1✔
12

13
from cdislogging import get_logger
1✔
14
import flask
1✔
15

16
from fence.errors import UserError
1✔
17
from authlib.oauth2.rfc6749.util import scope_to_list
1✔
18
from authlib.oauth2.rfc6749.errors import InvalidScopeError
1✔
19

20
rng = SystemRandom()
1✔
21
alphanumeric = string.ascii_uppercase + string.ascii_lowercase + string.digits
1✔
22
logger = get_logger(__name__)
1✔
23

24

25
def random_str(length):
1✔
26
    return "".join(rng.choice(alphanumeric) for _ in range(length))
1✔
27

28

29
def json_res(data):
1✔
30
    return flask.Response(json.dumps(data), mimetype="application/json")
×
31

32

33
def generate_client_credentials(confidential):
1✔
34
    """
35
    Generate a new client ID. If the client is confidential, also generate a new client secret.
36
    The unhashed secret should be returned to the user and the hashed secret should be stored
37
    in the database for later use.
38

39
    Args:
40
        confidential (bool): true if the client is confidential, false if it is public
41

42
    Returns:
43
        tuple: (client ID, unhashed client secret or None, hashed client secret or None)
44
    """
45
    client_id = random_str(40)
1✔
46
    client_secret = None
1✔
47
    hashed_secret = None
1✔
48
    if confidential:
1✔
49
        client_secret = random_str(55)
1✔
50
        hashed_secret = bcrypt.hashpw(
1✔
51
            client_secret.encode("utf-8"), bcrypt.gensalt()
52
        ).decode("utf-8")
53
    return client_id, client_secret, hashed_secret
1✔
54

55

56
def wrap_list_required(f):
1✔
57
    @wraps(f)
1✔
58
    def wrapper(d, *args, **kwargs):
1✔
59
        data_is_a_list = False
×
60
        if isinstance(d, list):
×
61
            d = {"data": d}
×
62
            data_is_a_list = True
×
63
        if not data_is_a_list:
×
64
            return f(d, *args, **kwargs)
×
65
        else:
66
            result = f(d, *args, **kwargs)
×
67
            return result["data"]
×
68

69
    return wrapper
1✔
70

71

72
@wrap_list_required
1✔
73
def convert_key(d, converter):
1✔
74
    if isinstance(d, str) or not isinstance(d, collections.Iterable):
×
75
        return d
×
76

77
    new = {}
×
78
    for k, v in d.items():
×
79
        new_v = v
×
80
        if isinstance(v, dict):
×
81
            new_v = convert_key(v, converter)
×
82
        elif isinstance(v, list):
×
83
            new_v = list()
×
84
            for x in v:
×
85
                new_v.append(convert_key(x, converter))
×
86
        new[converter(k)] = new_v
×
87
    return new
×
88

89

90
@wrap_list_required
1✔
91
def convert_value(d, converter):
1✔
92
    if isinstance(d, str) or not isinstance(d, collections.Iterable):
×
93
        return converter(d)
×
94

95
    new = {}
×
96
    for k, v in d.items():
×
97
        new_v = v
×
98
        if isinstance(v, dict):
×
99
            new_v = convert_value(v, converter)
×
100
        elif isinstance(v, list):
×
101
            new_v = list()
×
102
            for x in v:
×
103
                new_v.append(convert_value(x, converter))
×
104
        new[k] = converter(new_v)
×
105
    return new
×
106

107

108
def to_underscore(s):
1✔
109
    s1 = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", s)
×
110
    return re.sub("([a-z0-9])([A-Z])", r"\1_\2", s1).lower()
×
111

112

113
def strip(s):
1✔
114
    if isinstance(s, str):
×
115
        return s.strip()
×
116
    return s
×
117

118

119
def clear_cookies(response):
1✔
120
    """
121
    Set all cookies to empty and expired.
122
    """
123
    for cookie_name in list(flask.request.cookies.keys()):
1✔
124
        response.set_cookie(key=cookie_name, value="", expires=0, httponly=True)
1✔
125

126

127
def get_error_params(error, description):
1✔
128
    params = ""
×
129
    if error:
×
130
        args = {"error": error, "error_description": description}
×
131
        params = urlencode(args)
×
132
    return params
×
133

134

135
def append_query_params(original_url, **kwargs):
1✔
136
    """
137
    Add additional query string arguments to the given url.
138

139
    Example call:
140
        new_url = append_query_params(
141
            original_url, error='this is an error',
142
            another_arg='this is another argument')
143
    """
144
    scheme, netloc, path, query_string, fragment = urlsplit(original_url)
1✔
145
    query_params = parse_qs(query_string)
1✔
146
    if kwargs is not None:
1✔
147
        for key, value in kwargs.items():
1✔
148
            query_params[key] = [value]
1✔
149

150
    new_query_string = urlencode(query_params, doseq=True)
1✔
151
    new_url = urlunsplit((scheme, netloc, path, new_query_string, fragment))
1✔
152
    return new_url
1✔
153

154

155
def split_url_and_query_params(url):
1✔
156
    scheme, netloc, path, query_string, fragment = urlsplit(url)
1✔
157
    query_params = parse_qs(query_string)
1✔
158
    url = urlunsplit((scheme, netloc, path, None, fragment))
1✔
159
    return url, query_params
1✔
160

161

162
def get_valid_expiration_from_request(
1✔
163
    expiry_param="expires_in", max_limit=None, default=None
164
):
165
    """
166
    Thin wrapper around get_valid_expiration; looks for default query parameter "expires_in"
167
    in flask request, unless a different parameter name was specified.
168
    """
169
    return get_valid_expiration(
1✔
170
        flask.request.args.get(expiry_param), max_limit=max_limit, default=default
171
    )
172

173

174
def get_valid_expiration(requested_expiration, max_limit=None, default=None):
1✔
175
    """
176
    If requested_expiration is not a positive integer and not None, throw error.
177
    If max_limit is provided and requested_expiration exceeds max_limit,
178
      return max_limit.
179
    If requested_expiration is None, return default (which may also be None).
180
    Else return requested_expiration.
181
    """
182
    if requested_expiration is None:
1✔
183
        return default
1✔
184
    try:
1✔
185
        rv = int(requested_expiration)
1✔
186
        assert rv > 0
1✔
187
        if max_limit:
1✔
188
            rv = min(rv, max_limit)
1✔
189
        return rv
1✔
190
    except (ValueError, AssertionError):
1✔
191
        raise UserError(
1✔
192
            "Requested expiry must be a positive integer; instead got {}".format(
193
                requested_expiration
194
            )
195
        )
196

197

198
def _print_func_name(function):
1✔
199
    return "{}.{}".format(function.__module__, function.__name__)
1✔
200

201

202
def _print_kwargs(kwargs):
1✔
203
    return ", ".join("{}={}".format(k, repr(v)) for k, v in list(kwargs.items()))
1✔
204

205

206
def log_backoff_retry(details):
1✔
207
    args_str = ", ".join(map(str, details["args"]))
1✔
208
    kwargs_str = (
1✔
209
        (", " + _print_kwargs(details["kwargs"])) if details.get("kwargs") else ""
210
    )
211
    func_call_log = "{}({}{})".format(
1✔
212
        _print_func_name(details["target"]), args_str, kwargs_str
213
    )
214
    logging.warning(
1✔
215
        "backoff: call {func_call} delay {wait:0.1f} seconds after {tries} tries".format(
216
            func_call=func_call_log, **details
217
        )
218
    )
219

220

221
def log_backoff_giveup(details):
1✔
222
    args_str = ", ".join(map(str, details["args"]))
1✔
223
    kwargs_str = (
1✔
224
        (", " + _print_kwargs(details["kwargs"])) if details.get("kwargs") else ""
225
    )
226
    func_call_log = "{}({}{})".format(
1✔
227
        _print_func_name(details["target"]), args_str, kwargs_str
228
    )
229
    logging.error(
1✔
230
        "backoff: gave up call {func_call} after {tries} tries; exception: {exc}".format(
231
            func_call=func_call_log, exc=sys.exc_info(), **details
232
        )
233
    )
234

235

236
def exception_do_not_retry(error):
1✔
237
    def _is_status(code):
1✔
238
        return (
1✔
239
            str(getattr(error, "code", None)) == code
240
            or str(getattr(error, "status", None)) == code
241
            or str(getattr(error, "status_code", None)) == code
242
        )
243

244
    if _is_status("409") or _is_status("404"):
1✔
245
        return True
×
246

247
    return False
1✔
248

249

250
def validate_scopes(request_scopes, client):
1✔
251
    if not client:
1✔
252
        raise Exception("Client object is None")
×
253

254
    if request_scopes:
1✔
255
        scopes = scope_to_list(request_scopes)
1✔
256
        # can we get some debug logs here that log the client, what scopes they have, and what scopes were requested
257
        if not client.check_requested_scopes(set(scopes)):
1✔
258
            logger.debug(
1✔
259
                "Request Scope are "
260
                + " ".join(scopes)
261
                + " but client supported scopes are "
262
                + client.scope
263
            )
264
            raise InvalidScopeError("Failed to Authorize due to unsupported scope")
1✔
265

266
    return True
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc