• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

dowjones / tokendito / 7255919053

19 Dec 2023 01:04AM UTC coverage: 91.579% (-0.1%) from 91.689%
7255919053

push

github

web-flow
hide answer, and fix lint errors (#155)

10 of 13 new or added lines in 2 files covered. (76.92%)

1 existing line in 1 file now uncovered.

1392 of 1520 relevant lines covered (91.58%)

3.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.21
/tokendito/okta.py
1
# vim: set filetype=python ts=4 sw=4
2
# -*- coding: utf-8 -*-
3
"""
4✔
4
Handle the all Okta operations.
5

6
1. Okta authentication
7
2. Update Okta Config File
8

9
"""
10
import base64
4✔
11
import codecs
4✔
12
from copy import deepcopy
4✔
13
import hashlib
4✔
14
import json
4✔
15
import logging
4✔
16
import os
4✔
17
import re
4✔
18
import sys
4✔
19
import time
4✔
20
import urllib
4✔
21

22
import bs4
4✔
23
from bs4 import BeautifulSoup
4✔
24
import requests.cookies
4✔
25
from tokendito import duo
4✔
26
from tokendito import user
4✔
27
from tokendito.http_client import HTTP_client
4✔
28

29
logger = logging.getLogger(__name__)
4✔
30

31
_status_dict = dict(
4✔
32
    E0000004="Authentication failed",
33
    E0000047="API call exceeded rate limit due to too many requests",
34
    PASSWORD_EXPIRED="Your password has expired",
35
    LOCKED_OUT="Your account is locked out",
36
)
37

38

39
def api_error_code_parser(status=None):
4✔
40
    """Status code parsing.
41

42
    param status: Response status
43
    return message: status message
44
    """
45
    if status and status in _status_dict.keys():
4✔
46
        message = f"Okta auth failed: {_status_dict[status]}"
4✔
47
    else:
48
        message = f"Okta auth failed: {status}. Please verify your settings and try again."
4✔
49
    logger.debug(f"Parsing error [{message}] ")
50
    return message
4✔
51

52

53
def get_auth_pipeline(url=None):
4✔
54
    """Get auth pipeline version."""
55
    logger.debug(f"get_auth_pipeline({url})")
56
    headers = {"accept": "application/json"}
4✔
57
    url = f"{url}/.well-known/okta-organization"
4✔
58

59
    response = HTTP_client.get(url, headers=headers)
4✔
60

61
    try:
4✔
62
        response_json = response.json()
4✔
63
    except AttributeError as e:
4✔
64
        logger.error(f"Failed to parse json in {url}{e}")
4✔
65
        sys.exit(1)
4✔
66
    try:
4✔
67
        auth_pipeline = response_json.get("pipeline", None)
4✔
68
    except (KeyError, ValueError) as e:
×
69
        logger.error(f"Failed to parse pipeline for {url}:{e}")
×
70
        sys.exit(1)
×
71
    if auth_pipeline != "idx" and auth_pipeline != "v1":
4✔
72
        logger.error(f"unsupported auth pipeline version {auth_pipeline}")
4✔
73
        sys.exit(1)
4✔
74
    logger.debug(f"Pipeline is of type {auth_pipeline}")
75
    return auth_pipeline
4✔
76

77

78
def get_auth_properties(userid=None, url=None):
4✔
79
    """Make a call to the webfinger endpoint to get the auth properties metadata.
80

81
    :param userid: User's ID for which we are requesting an auth endpoint.
82
    :param url: Okta organization URL where we are looking up the user.
83
    :returns: Dictionary containing authentication properties.
84
    """
85
    payload = {"resource": f"okta:acct:{userid}", "rel": "okta:idp"}
1✔
86
    headers = {"accept": "application/jrd+json"}
1✔
87
    url = f"{url}/.well-known/webfinger"
1✔
88
    logger.debug(f"Looking up auth endpoint for {userid} in {url}")
89

90
    # Make a GET request to the webfinger endpoint.
91
    response = HTTP_client.get(url, params=payload, headers=headers)
1✔
92

93
    # Extract properties from the response.
94
    try:
1✔
95
        ret = response.json()["links"][0]["properties"]
1✔
96
    except (KeyError, ValueError) as e:
×
97
        logger.error(f"Failed to parse authentication type in {url}:{str(e)}")
×
98
        logger.debug(f"Response: {response.text}")
99
        sys.exit(1)
×
100

101
    # Extract specific authentication properties if available.
102
    # Return a dictionary with 'metadata', 'type', and 'id' keys.
103
    properties = {}
1✔
104
    properties["metadata"] = ret.get("okta:idp:metadata", None)
1✔
105
    properties["type"] = ret.get("okta:idp:type", None)
1✔
106
    properties["id"] = ret.get("okta:idp:id", None)
1✔
107

108
    logger.debug(f"Auth properties are {properties}")
109
    return properties
1✔
110

111

112
def get_saml_request(auth_properties):
4✔
113
    """
114
    Get a SAML Request object from the Service Provider, to be submitted to the IdP.
115

116
    :param auth_properties: dict with the IdP ID and type.
117
    :returns: dict with post_url, relay_state, and base64 encoded saml request.
118
    """
119
    # Prepare the headers for the request to retrieve the SAML request.
120
    headers = {"accept": "text/html,application/xhtml+xml,application/xml"}
4✔
121

122
    # Build the URL based on the metadata and ID provided in the auth properties.
123
    base_url = user.get_base_url(auth_properties["metadata"])
4✔
124
    url = f"{base_url}/sso/idps/{auth_properties['id']}"
4✔
125

126
    logger.debug(f"Getting SAML request from {url}")
127

128
    # Make a GET request using the HTTP client to retrieve the SAML request.
129
    response = HTTP_client.get(url, headers=headers)
4✔
130

131
    # Extract the required parameters from the SAML request.
132
    saml_request = {
4✔
133
        "base_url": user.get_base_url(extract_form_post_url(response.text)),
134
        "post_url": extract_form_post_url(response.text),
135
        "relay_state": extract_saml_relaystate(response.text),
136
        "request": extract_saml_request(response.text, raw=True),
137
    }
138

139
    # Mask sensitive data in the logs for security.
140
    user.add_sensitive_value_to_be_masked(saml_request["request"])
4✔
141

142
    logger.debug(f"SAML request is {saml_request}")
143
    return saml_request
4✔
144

145

146
def send_saml_request(saml_request):
4✔
147
    """
148
    Submit SAML request to IdP, and get the response back.
149

150
    :param saml_request: dict with IdP post_url, relay_state, and saml_request
151
    :param cookies: session cookies with `sid`
152
    :returns: dict with with SP post_url, relay_state, and saml_response
153
    """
154
    # Define the payload and headers for the request
155
    payload = {
4✔
156
        "relayState": saml_request["relay_state"],
157
        "SAMLRequest": saml_request["request"],
158
    }
159

160
    headers = {
4✔
161
        "accept": "text/html,application/xhtml+xml,application/xml",
162
        "Content-Type": "application/json",
163
    }
164

165
    # Construct the URL from the provided saml_request
166
    url = saml_request["post_url"]
4✔
167

168
    # Log the SAML request details
169
    logger.debug(f"Sending SAML request to {url}")
170

171
    # Use the HTTP client to make a GET request
172
    response = HTTP_client.get(url, params=payload, headers=headers)
4✔
173

174
    logger.debug(f"{base64.b64decode(payload['SAMLRequest'])}")
175

176
    # Extract relevant information from the response to form the saml_response dictionary
177
    saml_response = {
4✔
178
        "response": extract_saml_response(response.text, raw=True),
179
        "relay_state": extract_saml_relaystate(response.text),
180
        "post_url": extract_form_post_url(response.text),
181
    }
182

183
    # Mask sensitive values for logging purposes
184
    user.add_sensitive_value_to_be_masked(saml_response["response"])
4✔
185

186
    # Return the formed SAML response
187
    return saml_response
4✔
188

189

190
def create_authz_cookies(oauth2_config, oauth2_session_data):
4✔
191
    """
192
    Set authorize redirect cookies for the HTTP client.
193

194
    Needed for SAML2 flow for OIE.
195
    """
196
    session_token = HTTP_client.session.cookies.get("sessionToken")
4✔
197
    try:
4✔
198
        oauth2_url = f"{oauth2_config['org']}/oauth2/v1"
4✔
199
        oauth2_config_reformatted = {
4✔
200
            "responseType": "code",
201
            "state": oauth2_session_data["state"],
202
            "clientId": oauth2_config["client_id"],
203
            "authorizeUrl": oauth2_config["authorization_endpoint"],
204
            "tokenUrl": oauth2_config["token_endpoint"],
205
            "scope": "openid",
206
            "sessionToken": session_token,
207
            "userInfoUrl": f"{oauth2_url}/userinfo",
208
            "revokeUrl": f"{oauth2_url}/revoke",
209
            "logoutUrl": f"{oauth2_url}/logout",
210
            "nonce": oauth2_session_data["nonce"],
211
        }
212
    except KeyError as e:
4✔
213
        logger.error(f"Missing key in config:{e}")
4✔
214
        sys.exit(1)
4✔
215

216
    cookiejar = requests.cookies.RequestsCookieJar()
4✔
217
    domain = urllib.parse.urlparse(oauth2_config["org"]).netloc
4✔
218
    cookiejar.set(
4✔
219
        "okta-oauth-redirect-params",
220
        f"{{{urllib.parse.urlencode(oauth2_config_reformatted)}}}",
221
        domain=domain,
222
        path="/",
223
    )
224
    cookiejar.set("okta-oauth-state", oauth2_session_data["state"], domain=domain, path="/")
4✔
225
    HTTP_client.add_cookies(cookiejar)  # add cookies
4✔
226

227

228
def send_saml_response(config, saml_response):
4✔
229
    """
230
    Submit SAML response to the SP.
231

232
    :param saml_response: dict with SP post_url, relay_state, and saml_response
233
    """
234
    # Define the payload and headers for the request.
235
    payload = {
4✔
236
        "SAMLResponse": saml_response["response"],
237
        "RelayState": saml_response["relay_state"],
238
    }
239
    headers = {
4✔
240
        "accept": "text/html,application/xhtml+xml,application/xml",
241
        "Content-Type": "application/x-www-form-urlencoded",
242
    }
243
    url = saml_response["post_url"]
4✔
244

245
    logger.debug(f"{base64.b64decode(saml_response['response'])}")
246
    # Log the SAML response details.
247
    logger.debug(f"Sending SAML response to {url}")
248
    # Use the HTTP client to make a POST request.
249
    response = HTTP_client.post(url, data=payload, headers=headers)
4✔
250

251
    # Get the 'sid' value from the reponse cookies.
252
    sid = response.cookies.get("sid", None)
4✔
253
    logger.debug(f"New sid is {sid}")
254

255
    # If 'sid' is present, mask its value for logging purposes.
256
    if sid:
4✔
257
        user.add_sensitive_value_to_be_masked(sid)
4✔
258
    else:
259
        logger.debug("We did not find a 'sid' entry in the cookies.")
260

261
    # Extract the state token from the response.
262
    state_token = extract_state_token(response.text)
4✔
263
    if state_token:  # TODO: this is not working yet.
4✔
264
        params = {"stateToken": state_token}
×
265
        headers = {
×
266
            "accept": "text/html,application/xhtml+xml,application/xml",
267
            "content-type": "application/json",
268
        }
269
        response = HTTP_client.get(
×
270
            # myurl, allow_redirects=False, params={"stateToken": state_token}
271
            f"{config.okta['org']}/login/token/redirect",
272
            params=params,
273
            headers=headers,
274
        )
275
        logger.warning(
×
276
            f"""
277
            State token from {url}: {state_token}. TODO: need to go from this state token
278
            to an idx cookies.
279
            """
280
        )
281

282

283
def get_session_token(config, primary_auth, headers):
4✔
284
    """Get session_token.
285

286
    :param config: Configuration object
287
    :param headers: Headers of the request
288
    :param primary_auth: Primary authentication
289
    :return: Session Token from JSON response
290
    """
291
    status = None
4✔
292
    try:
4✔
293
        status = primary_auth.get("status", None)
4✔
294
    except AttributeError:
4✔
295
        pass
4✔
296

297
    if status == "SUCCESS" and "sessionToken" in primary_auth:
4✔
298
        session_token = primary_auth.get("sessionToken")
4✔
299
    elif status == "MFA_REQUIRED":
4✔
300
        # Note: mfa_challenge should also be modified to accept and use http_client
301
        session_token = mfa_challenge(config, headers, primary_auth)
4✔
302
    else:
303
        logger.debug(f"Error parsing response: {json.dumps(primary_auth)}")
304
        logger.error(f"Okta auth failed: unknown status {status}")
4✔
305
        sys.exit(1)
4✔
306

307
    user.add_sensitive_value_to_be_masked(session_token)
4✔
308

309
    return session_token
4✔
310

311

312
def get_access_token(oauth2_config, oauth2_session_data, authorize_code):
4✔
313
    """Get OAuth token from Okta by calling /token endpoint.
314

315
    This method does not seem to be needed, calling /authorize sets the idx cookies,
316
    but we put it here to follow the flow vervatim.
317

318
    :param url: URL of the Okta OAuth token endpoint
319
    :return: OAuth token
320
    """
321
    try:
4✔
322
        payload = {
4✔
323
            "code": authorize_code,
324
            "state": oauth2_session_data["state"],
325
            "grant_type": oauth2_session_data["grant_type"],
326
            "redirect_uri": oauth2_session_data["redirect_uri"],
327
            "client_id": oauth2_config["client_id"],
328
            "code_verifier": oauth2_session_data["code_verifier"],
329
        }
330
    except KeyError as e:
×
331
        logger.error(f"Missing key in config:{e}")
×
332
        sys.exit(1)
×
333

334
    headers = {"accept": "application/json"}
4✔
335
    # Using the http_client to make the POST request
336
    response = HTTP_client.post(
4✔
337
        oauth2_config["token_endpoint"], data=payload, headers=headers, return_json=True
338
    )
339
    # We now have response['access_token'] and response['id_token'], but we dont seem to need
340
    # them to access the resources.
341
    access_token = None
4✔
342
    try:
4✔
343
        access_token = response["access_token"]
4✔
344
    except KeyError:
4✔
345
        logger.debug(f"Error parsing response: {json.dumps(response)}")
346
        # Don't do anything but a debug message, as the /token call doesnt seem to be needed.
347
    return access_token
4✔
348

349

350
def get_enduser_url(url):
4✔
351
    """Retrieve enduser URL.
352

353
    :url: Okta URL to retrieve enduser URL from
354
    :returns: enduser URL or None
355
    """
356
    enduser_url = None
4✔
357

358
    res = HTTP_client.get(url)
4✔
359
    soup = BeautifulSoup(res.text, "html.parser")
4✔
360
    pattern = re.compile(r".*enduser-v.*enduser.*")
4✔
361
    script = soup.find("script", src=pattern)
4✔
362
    if type(script) is bs4.element.Tag:
4✔
363
        logger.debug(f"Found script tag: {script['src']}")
364
        enduser_url = script["src"]
4✔
365
    return enduser_url
4✔
366

367

368
def get_client_id_by_url(url):
4✔
369
    """Retrieve clientId.
370

371
    :url: Javascript URL to retrieve clientId from
372
    :returns: clientId or None
373
    """
374
    client_id = None
4✔
375
    enduser_url = get_enduser_url(url)
4✔
376
    if enduser_url:
4✔
377
        res = HTTP_client.get(enduser_url)
4✔
378
        pattern = re.compile(r',clientId:"(?P<clientId>.*?)",')
4✔
379

380
        match = pattern.search(res.text)
4✔
381
        if match:
4✔
382
            logger.debug(f"Found clientId: {match.group('clientId')}")
383
            client_id = match.group("clientId")
4✔
384

385
    return client_id
4✔
386

387

388
def get_client_id(config):
4✔
389
    """Get the client id needed by the Authorization Code Flow.
390

391
    If a command line parameter was passed, it will take precedence.
392
    If no command line parameter was passed, it will try to determine it.
393

394
    """
395
    if "client_id" in config.okta and config.okta["client_id"]:
4✔
396
        return config.okta["client_id"]
4✔
397
    else:
398
        return get_client_id_by_url(config.okta["org"])
4✔
399

400

401
def get_redirect_uri(oauth2_url):
4✔
402
    """
403
    Get the redirect uri needed by the Authorization Code Flow.
404

405
    Return url
406
    """
407
    uri = f"{oauth2_url}/enduser/callback"
4✔
408
    return uri
4✔
409

410

411
def get_response_type():
4✔
412
    """
413
     We're only implementing code response type.
414

415
    So we're only returning "code"
416
    """
417
    return "code"
4✔
418

419

420
def get_authorize_scope():
4✔
421
    """We're only implementing openid scope.
422

423
    So we're only returning "openid", which is ok for what we do.
424
    """
425
    return "openid"
4✔
426

427

428
def get_oauth2_state():
4✔
429
    """Generate a random string for state."""
430
    state = hashlib.sha256(os.urandom(1024)).hexdigest()
4✔
431
    return state
4✔
432

433

434
def get_pkce_code_challenge_method():
4✔
435
    """
436
    Return code challenge.
437

438
    Only S256 is implemented.
439
    """
440
    return "S256"
4✔
441

442

443
def get_pkce_code_challenge(code_verifier=None):
4✔
444
    """
445
    Get PKCE Code Challenge.
446

447
    Base64-URL-encoded string of the SHA256 hash of the code verifier
448
    https://www.oauth.com/oauth2-servers/pkce/authorization-request/
449

450
    :param: code_verifier
451
    :return: code_challenge
452
    """
453
    code_challenge = hashlib.sha256(code_verifier.encode("utf-8")).digest()
4✔
454
    code_challenge = base64.urlsafe_b64encode(code_challenge).decode("utf-8")
4✔
455
    code_challenge = code_challenge.replace("=", "")
4✔
456
    return code_challenge
4✔
457

458

459
def get_pkce_code_verifier():
4✔
460
    """
461
    Get pkce code verifier.
462

463
    :return: code_verifier
464
    """
465
    code_verifier = base64.urlsafe_b64encode(os.urandom(40)).decode("utf-8")
4✔
466
    code_verifier = re.sub("[^a-zA-Z0-9]+", "", code_verifier)
4✔
467
    return code_verifier
4✔
468

469

470
def pkce_enabled():
4✔
471
    """
472
    Check of PKCE is enabled.
473

474
    Altho the authorization server config tells us our okta doesnt PKCE enabled, omitting its
475
    settings will cause the authorize code flow to fail, so we always return True.
476
    """
477
    return True
4✔
478

479

480
def get_authorize_code(response, session_token):
4✔
481
    """
482
    Get the authorize code.
483

484
    This will exit with error if we cannot get the code.
485
    It will also check the response from the /authorize call for callback errors,
486
    And if any, print and exit with error.
487
    """
488
    callback_url = response.url
4✔
489
    error_code = re.search(r"(?<=error=)[^&]+", callback_url)
4✔
490
    error_desc = re.search(r"(?<=error_description=)[^&]+", callback_url)
4✔
491
    if error_code:
4✔
492
        error_value = error_code.group()
4✔
493
        if not session_token and error_value == "login_required":
4✔
494
            return (
4✔
495
                None  # if we arent authenticated we wont have sessionToken, so ignore login error.
496
            )
497
        else:
498
            logger.error(f"Oauth2 callback error:{error_value}:{error_desc.group()}")
×
499
            logger.debug(f"Response: {response.text}")
500
            sys.exit(1)
×
501
    authorize_code = re.search(r"(?<=code=)[^&]+", callback_url)
4✔
502
    if authorize_code:
4✔
503
        return authorize_code.group()
4✔
504

505

506
def authorization_code_enabled(oauth2_config):
4✔
507
    """
508
    Determine if authorization code grant is enabled.
509

510
    Returns True if the dict key is in authorization server info, and False otherwise,
511
    """
512
    if "org" not in oauth2_config:
4✔
513
        logger.error(f"No org in config:{oauth2_config}")
4✔
514
        sys.exit(1)
4✔
515
    try:
4✔
516
        if "authorization_code" not in oauth2_config["grant_types_supported"]:
4✔
517
            return False
×
518
    except (KeyError, ValueError) as e:
×
519
        logger.error(f"No grant types supported on {oauth2_config['org']}:{str(e)}")
×
520
        sys.exit(1)
×
521
    return True
4✔
522

523

524
def authorize_request(oauth2_config, oauth2_session_data):
4✔
525
    """
526
    Call /authorize endpoint.
527

528
    :param
529
    :return: authorization code, needed for /token call
530
    """
531
    logger.debug(f"oauth_code_request({oauth2_config}, {oauth2_session_data})")
532
    headers = {"accept": "application/json", "content-type": "application/json"}
4✔
533

534
    session_token = HTTP_client.session.cookies.get("sessionToken")
4✔
535

536
    try:
4✔
537
        payload = {
4✔
538
            "client_id": oauth2_config["client_id"],
539
            "redirect_uri": oauth2_session_data["redirect_uri"],
540
            "response_type": oauth2_session_data["response_type"],
541
            "scope": oauth2_session_data["scope"],
542
            "state": oauth2_session_data["state"],
543
            "code_challenge": oauth2_session_data["code_challenge"],
544
            "code_challenge_method": oauth2_session_data["code_challenge_method"],
545
            "prompt": "none",  # dont authenticate
546
            "sessionToken": session_token,
547
        }
548
    except KeyError as e:
×
549
        logger.error(f"Missing key in config:{e}")
×
550
        sys.exit(1)
×
551

552
    response = HTTP_client.get(
4✔
553
        oauth2_config["authorization_endpoint"],
554
        headers=headers,
555
        params=payload,
556
    )
557

558
    authorize_code = get_authorize_code(response, session_token)
4✔
559
    return authorize_code
4✔
560

561

562
def get_nonce(url):
4✔
563
    """Get nonce from the org server."""
564
    userhome_url = f"{url}/app/UserHome"
4✔
565
    payload = {"session_hint": "AUTHENTICATED", "iss": urllib.parse.quote(userhome_url, safe="")}
4✔
566
    response = HTTP_client.get(url, params=payload)
4✔
567
    # '<script nonce="ABCXXXXXXXXXXXXXXXXYZ" type="text/javascript">'
568
    nonce = None
4✔
569
    pattern = re.compile(r'script nonce="(?P<nonce>.*?)" ', re.MULTILINE)
4✔
570
    match = pattern.search(response.text)
4✔
571
    if match:
4✔
572
        logger.debug(f"Found nonce: {match.group('nonce')}")
573
        nonce = match.group("nonce")
4✔
574

575
    return nonce
4✔
576

577

578
def get_oauth2_session_data(url):
4✔
579
    """
580
    Get some oauth2 session data.
581

582
    We do this to have the same in oath2 cookies and /authorize call.
583
    """
584
    authz_session_data = {
4✔
585
        "response_type": get_response_type(),
586
        "scope": get_authorize_scope(),
587
        "state": get_oauth2_state(),
588
        "redirect_uri": get_redirect_uri(url),
589
        "grant_type": "authorization_code",
590
    }
591
    authz_session_data["nonce"] = get_nonce(url)
4✔
592

593
    if pkce_enabled():
4✔
594
        code_verifier = get_pkce_code_verifier()
4✔
595
        authz_session_data["code_verifier"] = code_verifier
4✔
596
        authz_session_data["code_challenge"] = get_pkce_code_challenge(code_verifier)
4✔
597
        authz_session_data["code_challenge_method"] = get_pkce_code_challenge_method()
4✔
598

599
    return authz_session_data
4✔
600

601

602
def get_oauth2_configuration(config):
4✔
603
    """Get authorization server configuration data from Okta instance.
604

605
    :param url: URL of the Okta org
606
    :return: dict of conguration values
607
    """
608
    url = f"{config.okta['org']}/.well-known/oauth-authorization-server"
4✔
609
    headers = {"accept": "application/json"}
4✔
610
    response = HTTP_client.get(url, headers=headers)
4✔
611
    logger.debug(f"Authorization Server info: {response.json()}")
612
    # todo: handle errors.n
613
    oauth2_config = response.json()
4✔
614
    oauth2_config["org"] = config.okta["org"]
4✔
615
    oauth2_config["client_id"] = get_client_id(config)
4✔
616
    validate_oauth2_configuration(oauth2_config)
4✔
617
    return oauth2_config
4✔
618

619

620
def validate_oauth2_configuration(oauth2_config):
4✔
621
    """
622
    Validate that the oauth2 configuration has our implementation.
623

624
    Will exit with error if a mandatory config is missing.
625
    :param oauth2_config: dict of configuration values
626
    """
627
    mandadory_oauth2_config_items = {
4✔
628
        "authorization_endpoint",
629
        "token_endpoint",
630
        "grant_types_supported",
631
        "response_types_supported",
632
        "scopes_supported",
633
        "client_id",
634
        "org",
635
    }  # the authorization server must have these config elements
636
    for item in mandadory_oauth2_config_items:
4✔
637
        if item not in oauth2_config:
4✔
638
            logger.error(f"No {item} found in oauth2 configuration.")
4✔
639
            sys.exit(1)
4✔
640

641
    if "authorization_code" not in oauth2_config["grant_types_supported"]:
4✔
642
        logger.error("Authorization code grant not found.")
×
643
        sys.exit(1)
×
644
    if "code" not in oauth2_config["response_types_supported"]:
4✔
645
        logger.error("Code response type not found.")
×
646
        sys.exit(1)
×
647

648

649
def create_authn_cookies(authn_org_url, session_token):
4✔
650
    """
651
    Create session cookie.
652

653
    :param authn_org_url: org url
654
    :param session_token: session token, str
655
    :returns: cookies jar with session_id value we got using the token
656
    """
657
    # Construct the URL from the base URL provided.
658
    url = f"{authn_org_url}/api/v1/sessions"
1✔
659

660
    # Define the payload and headers for the request.
661
    data = {"sessionToken": session_token}
1✔
662
    headers = {"Content-Type": "application/json", "accept": "application/json"}
1✔
663

664
    # Log the request details.
665
    logger.debug(f"Requesting session cookies from {url}")
666

667
    # Use the HTTP client to make a POST request.
668
    response_json = HTTP_client.post(url, json=data, headers=headers, return_json=True)
1✔
669

670
    if "id" not in response_json:
1✔
671
        logger.error(f"'id' not found in response. Full response: {response_json}")
×
672
        sys.exit(1)
×
673
    session_id = response_json["id"]
1✔
674
    user.add_sensitive_value_to_be_masked(session_id)
1✔
675

676
    cookiejar = requests.cookies.RequestsCookieJar()
1✔
677
    domain = urllib.parse.urlparse(url).netloc
1✔
678
    cookiejar.set("sid", session_id, domain=domain, path="/")
1✔
679
    cookiejar.set("sessionToken", session_token, domain=domain, path="/")
1✔
680
    HTTP_client.add_cookies(cookiejar)  # add cookies
1✔
681

682

683
def idp_authenticate(config):
4✔
684
    """Authenticate user to okta."""
685
    auth_properties = get_auth_properties(userid=config.okta["username"], url=config.okta["org"])
4✔
686

687
    if "type" not in auth_properties:
4✔
688
        logger.error("Okta auth failed: unknown type.")
4✔
689
        sys.exit(1)
4✔
690

691
    auth_properties = get_auth_properties(userid=config.okta["username"], url=config.okta["org"])
4✔
692

693
    if "type" not in auth_properties:
4✔
694
        logger.error("Okta auth failed: unknown type.")
×
695
        sys.exit(1)
×
696

697
    if is_saml2_authentication(auth_properties):
4✔
698
        # We may loop thru the saml2 servers until
699
        # we find the authentication server.
700
        saml2_authenticate(config, auth_properties)
4✔
701
    elif local_authentication_enabled(auth_properties):
4✔
702
        session_token = local_authenticate(config)
4✔
703
        # authentication sends us a token
704
        # which we then put in our session cookies
705
        create_authn_cookies(config.okta["org"], session_token)
4✔
706
    else:
707
        logger.error(
4✔
708
            f"{auth_properties['type']} login via IdP Discovery is not currently supported"
709
        )
710
        sys.exit(1)
4✔
711

712

713
def access_control(config):
4✔
714
    """Authenticate and authorize with the IDP.
715

716
    if OIE is enabled and a client_id is found,run Authorization code flow and PKCE being
717
    the only implemented grant types.
718

719
    Okta uses cookies to manage sessions.
720

721
    :param config: Config object
722
    """
723
    logger.debug(f"access_control({config})")
724

725
    oauth2_config = None
4✔
726
    oauth2_session_data = None
4✔
727

728
    is_oie = oie_enabled(config.okta["org"])
4✔
729
    # We set the oauth2 data (variables and cookies) that will be used at /authorize and during
730
    # saml2 for chained orgs.
731
    if is_oie:
4✔
732
        logger.debug("OIE enabled")
733
        # save some oauth2 config data + create session data, and create authz cookies
734
        oauth2_config = get_oauth2_configuration(config)
4✔
735
        oauth2_session_data = get_oauth2_session_data(config.okta["org"])
4✔
736
        create_authz_cookies(oauth2_config, oauth2_session_data)
4✔
737
        # The flow says to initially call /authorize here, but that doesnt do anything...
738
        # idp_authorize(oauth2_config, oauth2_session_data)
739

740
    idp_authenticate(config)
4✔
741

742
    if is_oie:
4✔
743
        # call /authorize . Note: we are authenticated.
744
        idp_authorize(oauth2_config, oauth2_session_data)
4✔
745

746

747
def idp_authorize(oauth2_config, oauth2_session_data):
4✔
748
    """
749
    Authorize on the okta authorization server.
750

751
    If we arent authenticated, we will still call /authorize but won't get a code.
752
    When we are authenticated, we get an idx cookies and dont need to do anything else.
753
    """
754
    if "client_id" not in oauth2_config or not oauth2_config["client_id"]:
4✔
755
        logger.error("We are calling /authorize without a client_id")
4✔
756
        sys.exit(1)
4✔
757

758
    if authorization_code_enabled(oauth2_config):
4✔
759
        authorize_code = authorize_request(oauth2_config, oauth2_session_data)
4✔
760
        # The following get_access_token does not seem to matter, the /authorize call above sets
761
        # the idx cookies and we're done. We put it here in an attempt to follow the flow verbatim.
762
        if authorize_code:  # We got value if we were authenticated.
4✔
763
            get_access_token(oauth2_config, oauth2_session_data, authorize_code)
4✔
764

765

766
def step_up_authenticate(config, state_token):
4✔
767
    """Try to step up authenticate the user. Only supported for local auth.
768

769
    :param config: Configuration object
770
    :param state_token: The state token
771
    :return: True if step up authentication was successful; False otherwise
772
    """
773
    auth_properties = get_auth_properties(userid=config.okta["username"], url=config.okta["org"])
4✔
774
    if "type" not in auth_properties or not local_authentication_enabled(auth_properties):
4✔
775
        return False
4✔
776

777
    headers = {"content-type": "application/json", "accept": "application/json"}
4✔
778
    payload = {"stateToken": state_token}
4✔
779

780
    auth = HTTP_client.post(
4✔
781
        f"{config.okta['org']}/api/v1/authn", json=payload, headers=headers, return_json=True
782
    )
783

784
    status = auth.get("status", None)
4✔
785
    if status == "SUCCESS":
4✔
786
        return True
4✔
787
    elif status == "MFA_REQUIRED":
4✔
788
        mfa_challenge(config, headers, auth)
4✔
789
        return True
4✔
790

791
    logger.error("Okta auth failed: unknown status for step up authentication.")
4✔
792
    return False
4✔
793

794

795
def saml2_authenticate(config, auth_properties):
4✔
796
    """SAML2 authentication flow.
797

798
    :param config: Config object
799
    :param auth_properties: dict with authentication properties
800
    :returns: session ID cookie, if successful.
801
    """
802
    # Get the SAML request details
803
    saml_request = get_saml_request(auth_properties)
4✔
804

805
    # Create a copy of our configuration, so that we can freely reuse it
806
    # without Python's pass-as-reference-value interfering with it.
807
    saml2_config = deepcopy(config)
4✔
808
    saml2_config.okta["org"] = saml_request["base_url"]
4✔
809
    logger.info(f"Authentication is being redirected to {saml2_config.okta['org']}.")
4✔
810

811
    # Try to authenticate using the new configuration. This could cause
812
    # recursive calls, which allows for IdP chaining.
813
    idp_authenticate(saml2_config)
4✔
814

815
    # Once we are authenticated, send the SAML request to the IdP.
816
    # This call requires session cookies.
817
    saml_response = send_saml_request(saml_request)
4✔
818

819
    # Send SAML response from the IdP back to the SP, which will generate new
820
    # session cookies.
821
    send_saml_response(config, saml_response)
4✔
822

823

824
def oie_enabled(url):
4✔
825
    """
826
    Determine if OIE is enabled.
827

828
    :pamam url: okta org url
829
    :return: True if OIE is enabled, False otherwise
830
    """
831
    if get_auth_pipeline(url) == "idx":  # oie
4✔
832
        return True
4✔
833
    else:
834
        return False
4✔
835

836

837
def local_authenticate(config):
4✔
838
    """Authenticate user on local okta instance.
839

840
    :param config: Config object
841
    :return: authn token
842
    """
843
    session_token = None
4✔
844
    headers = {"content-type": "application/json", "accept": "application/json"}
4✔
845
    payload = {"username": config.okta["username"], "password": config.okta["password"]}
4✔
846

847
    logger.debug(f"Authenticate user to {config.okta['org']}/api/v1/authn")
848
    logger.debug(f"Sending {headers}, {payload} to {config.okta['org']}/api/vi/authn")
849

850
    primary_auth = HTTP_client.post(
4✔
851
        f"{config.okta['org']}/api/v1/authn",
852
        json=payload,
853
        headers=headers,
854
        return_json=True,
855
    )
856

857
    if "errorCode" in primary_auth:
4✔
858
        api_error_code_parser(primary_auth["errorCode"])
×
859
        sys.exit(1)
×
860

861
    while session_token is None:
4✔
862
        session_token = get_session_token(config, primary_auth, headers)
4✔
863
    logger.info(f"User has been successfully authenticated to {config.okta['org']}.")
4✔
864
    return session_token
4✔
865

866

867
def local_authentication_enabled(auth_properties):
4✔
868
    """Check whether authentication happens on the current instance.
869

870
    :param auth_properties: auth_properties dict
871
    :return: True if this is the place to authenticate, False otherwise.
872
    """
873
    # IWA (https://help.okta.com/en-us/content/topics/directory/ad-iwa-learn.htm)
874
    # should be treated as local authentication
875
    try:
4✔
876
        if auth_properties["type"] == "OKTA" or auth_properties["type"] == "IWA":
4✔
877
            return True
4✔
878
    except (TypeError, KeyError):
4✔
879
        pass
4✔
880
    return False
4✔
881

882

883
def is_saml2_authentication(auth_properties):
4✔
884
    """Check whether authentication happens via SAML2 on a different IdP.
885

886
    :param auth_properties: auth_properties dict
887
    :return: True for SAML2 on Okta, False otherwise.
888
    """
889
    try:
4✔
890
        if auth_properties["type"] == "SAML2":
4✔
891
            return True
4✔
892
    except (TypeError, KeyError):
4✔
893
        pass
4✔
894
    return False
4✔
895

896

897
def extract_saml_response(html, raw=False):
4✔
898
    """Parse html, and extract a SAML document.
899

900
    :param html: String with HTML document.
901
    :param raw: Boolean that determines whether or not the response should be decoded.
902
    :return: XML Document, or None
903
    """
904
    soup = BeautifulSoup(html, "html.parser")
4✔
905
    xml = None
4✔
906
    saml_base64 = None
4✔
907
    retval = None
4✔
908

909
    elem = soup.find("input", attrs={"name": "SAMLResponse"})
4✔
910
    if type(elem) is bs4.element.Tag:
4✔
911
        saml_base64 = str(elem.get("value"))
4✔
912
        xml = codecs.decode(saml_base64.encode("ascii"), "base64").decode("utf-8")
4✔
913

914
        retval = xml
4✔
915
        if raw:
4✔
916
            retval = saml_base64
4✔
917
    return retval
4✔
918

919

920
def extract_saml_request(html, raw=False):
4✔
921
    """Parse html, and extract a SAML document.
922

923
    :param html: String with HTML document.
924
    :param raw: Boolean that determines whether or not the response should be decoded.
925
    :return: XML Document, or None
926
    """
927
    soup = BeautifulSoup(html, "html.parser")
4✔
928
    xml = None
4✔
929
    saml_base64 = None
4✔
930
    retval = None
4✔
931

932
    elem = soup.find("input", attrs={"name": "SAMLRequest"})
4✔
933
    if type(elem) is bs4.element.Tag:
4✔
934
        saml_base64 = str(elem.get("value"))
4✔
935
        xml = codecs.decode(saml_base64.encode("ascii"), "base64").decode("utf-8")
4✔
936

937
        retval = xml
4✔
938
        if raw:
4✔
939
            retval = saml_base64
4✔
940
    return retval
4✔
941

942

943
def extract_form_post_url(html):
4✔
944
    """Parse html, and extract a Form Action POST URL.
945

946
    :param html: String with HTML document.
947
    :return: URL string, or None
948
    """
949
    soup = BeautifulSoup(html, "html.parser")
4✔
950
    post_url = None
4✔
951

952
    elem = soup.find("form", attrs={"id": "appForm"})
4✔
953
    if type(elem) is bs4.element.Tag:
4✔
954
        post_url = str(elem.get("action"))
4✔
955
    return post_url
4✔
956

957

958
def extract_saml_relaystate(html):
4✔
959
    """Parse html, and extract SAML relay state from a form.
960

961
    :param html: String with HTML document.
962
    :return: relay state value, or None
963
    """
964
    soup = BeautifulSoup(html, "html.parser")
4✔
965
    relay_state = None
4✔
966

967
    elem = soup.find("input", attrs={"name": "RelayState"})
4✔
968
    if type(elem) is bs4.element.Tag:
4✔
969
        relay_state = str(elem.get("value"))
4✔
970
    return relay_state
4✔
971

972

973
def extract_state_token(html):
4✔
974
    """Parse an HTML document, and extract a state token.
975

976
    :param html: String with HTML document
977
    :return: string with state token, or None
978
    """
979
    soup = BeautifulSoup(html, "html.parser")
4✔
980
    state_token = None
4✔
981
    pattern = re.compile(r"var stateToken = '(?P<stateToken>.*)';", re.MULTILINE)
4✔
982

983
    script = soup.find("script", string=pattern)
4✔
984
    if type(script) is bs4.element.Tag:
4✔
985
        match = pattern.search(script.text)
×
986
        if match:
×
987
            encoded_token = match.group("stateToken")
×
988
            state_token = codecs.decode(encoded_token, "unicode-escape")
×
989

990
    return state_token
4✔
991

992

993
def mfa_provider_type(
4✔
994
    config,
995
    mfa_provider,
996
    selected_factor,
997
    mfa_challenge_url,
998
    primary_auth,
999
    selected_mfa_option,
1000
    headers,
1001
    payload,
1002
):
1003
    """Receive session key.
1004

1005
    :param config: Config object
1006
    :param mfa_provider: MFA provider
1007
    :param selected_factor: Selected MFA factor
1008
    :param mfa_challenge_url: MFA challenge url
1009
    :param primary_auth: Primary authentication
1010
    :param selected_mfa_option: Selected MFA option
1011
    :return: session_key
1012

1013
    """
1014
    mfa_verify = dict()
4✔
1015
    factor_type = selected_factor.get("_embedded", {}).get("factor", {}).get("factorType", None)
4✔
1016

1017
    if mfa_provider == "DUO":
4✔
1018
        mfa_verify = duo.authenticate(selected_factor)
4✔
1019
        headers = {"content-type": "application/json", "accept": "application/json"}
4✔
1020
        mfa_verify = HTTP_client.post(
4✔
1021
            mfa_challenge_url, json=payload, headers=headers, return_json=True
1022
        )
1023

1024
    elif mfa_provider == "OKTA" and factor_type == "push":
4✔
1025
        mfa_verify = push_approval(mfa_challenge_url, payload)
4✔
1026
    elif (mfa_provider in ["OKTA", "GOOGLE"] and factor_type in ["token:software:totp", "sms"]) or (
4✔
1027
        mfa_provider == "OKTA" and factor_type == "question"
1028
    ):
1029
        mfa_verify = totp_approval(
4✔
1030
            config, selected_mfa_option, headers, mfa_challenge_url, payload, primary_auth
1031
        )
1032
    else:
1033
        logger.error(
4✔
1034
            f"Sorry, the MFA provider '{mfa_provider}:{factor_type}' is not yet supported."
1035
            " Please retry with another option."
1036
        )
1037
        sys.exit(1)
4✔
1038

1039
    if "sessionToken" not in mfa_verify:
4✔
UNCOV
1040
        logger.error(
×
1041
            f"Could not verify MFA Challenge with {mfa_provider} {primary_auth['factorType']}"
1042
        )
1043
    return mfa_verify["sessionToken"]
4✔
1044

1045

1046
def mfa_index(preset_mfa, available_mfas, mfa_options):
4✔
1047
    """Get mfa index in request.
1048

1049
    :param preset_mfa: preset mfa from settings
1050
    :param available_mfas: available mfa ids
1051
    :param mfa_options: available mfas
1052
    """
1053
    indices = []
4✔
1054
    # Gets the index number from each preset MFA in the list of avaliable ones.
1055
    if preset_mfa:
4✔
1056
        logger.debug(f"Get mfa from {available_mfas}.")
1057
        indices = [i for i, elem in enumerate(available_mfas) if preset_mfa in elem]
4✔
1058

1059
    index = None
4✔
1060
    if len(indices) == 0:
4✔
1061
        logger.debug(f"No matches with {preset_mfa}, going to get user input")
1062
        index = user.select_preferred_mfa_index(mfa_options)
4✔
1063
    elif len(indices) == 1:
4✔
1064
        logger.debug(f"One match: {preset_mfa} in {indices}")
1065
        index = indices[0]
4✔
1066
    else:
1067
        logger.error(
4✔
1068
            f"{preset_mfa} is not unique in {available_mfas}. Please check your configuration."
1069
        )
1070
        sys.exit(1)
4✔
1071

1072
    return index
4✔
1073

1074

1075
def mfa_challenge(config, headers, primary_auth):
4✔
1076
    """Handle user mfa challenges.
1077

1078
    :param config: Config object
1079
    :param headers: headers what needs to be sent to api
1080
    :param primary_auth: primary authentication
1081
    :return: Okta MFA Session token after the successful entry of the code
1082
    """
1083
    logger.debug("Handle user MFA challenges")
1084
    try:
4✔
1085
        mfa_options = primary_auth["_embedded"]["factors"]
4✔
1086
    except KeyError as error:
4✔
1087
        logger.error(f"There was a wrong response structure: \n{error}")
4✔
1088
        sys.exit(1)
4✔
1089

1090
    preset_mfa = config.okta["mfa"]
1✔
1091

1092
    available_mfas = [f"{d['provider']}_{d['factorType']}_{d['id']}" for d in mfa_options]
1✔
1093
    index = mfa_index(preset_mfa, available_mfas, mfa_options)
1✔
1094

1095
    selected_mfa_option = mfa_options[index]
1✔
1096
    logger.debug(f"Selected MFA is [{selected_mfa_option}]")
1097

1098
    mfa_challenge_url = selected_mfa_option["_links"]["verify"]["href"]
1✔
1099

1100
    payload = {
1✔
1101
        "stateToken": primary_auth["stateToken"],
1102
        "factorType": selected_mfa_option["factorType"],
1103
        "provider": selected_mfa_option["provider"],
1104
        "profile": selected_mfa_option["profile"],
1105
    }
1106

1107
    selected_factor = HTTP_client.post(
1✔
1108
        mfa_challenge_url, json=payload, headers=headers, return_json=True
1109
    )
1110

1111
    mfa_provider = selected_factor["_embedded"]["factor"]["provider"]
1✔
1112
    logger.debug(f"MFA Challenge URL: [{mfa_challenge_url}] headers: {headers}")
1113

1114
    mfa_session_token = mfa_provider_type(
1✔
1115
        config,
1116
        mfa_provider,
1117
        selected_factor,
1118
        mfa_challenge_url,
1119
        primary_auth,
1120
        selected_mfa_option,
1121
        headers,
1122
        payload,
1123
    )
1124

1125
    logger.debug(f"MFA Session Token: [{mfa_session_token}]")
1126
    return mfa_session_token
1✔
1127

1128

1129
def totp_approval(config, selected_mfa_option, headers, mfa_challenge_url, payload, primary_auth):
4✔
1130
    """Handle user mfa options.
1131

1132
    :param config: Config object
1133
    :param selected_mfa_option: Selected MFA option (SMS, push, etc)
1134
    :param headers: headers
1135
    :param mfa_challenge_url: MFA challenge URL
1136
    :param payload: payload
1137
    :param primary_auth: Primary authentication method
1138
    :return: payload data
1139

1140
    """
1141
    logger.debug(f"User MFA options selected: [{selected_mfa_option['factorType']}]")
1142
    if config.okta["mfa_response"] is None:
4✔
1143
        logger.debug("Getting verification code from user.")
1144
        if selected_mfa_option["factorType"] == "question":
4✔
NEW
1145
            config.okta["mfa_response"] = user.get_secret_input(
×
1146
                selected_mfa_option["profile"]["questionText"]
1147
            )
1148
        else:
1149
            config.okta["mfa_response"] = user.get_input("Enter your verification code: ")
4✔
1150
        user.add_sensitive_value_to_be_masked(config.okta["mfa_response"])
4✔
1151

1152
    # time to verify the mfa
1153
    payload = {
4✔
1154
        "stateToken": primary_auth["stateToken"],
1155
        "passCode": config.okta["mfa_response"],
1156
    }
1157

1158
    # Using the http_client to make the POST request
1159
    mfa_verify = HTTP_client.post(
4✔
1160
        mfa_challenge_url, json=payload, headers=headers, return_json=True
1161
    )
1162

1163
    if "sessionToken" in mfa_verify:
4✔
1164
        user.add_sensitive_value_to_be_masked(mfa_verify["sessionToken"])
4✔
1165
    logger.debug(f"mfa_verify [{json.dumps(mfa_verify)}]")
1166

1167
    # Clear out any MFA response since it is no longer valid
1168
    config.okta["mfa_response"] = None
4✔
1169

1170
    return mfa_verify
4✔
1171

1172

1173
def push_approval(mfa_challenge_url, payload):
4✔
1174
    """Handle push approval from the user.
1175

1176
    :param mfa_challenge_url: MFA challenge url
1177
    :param payload: payload which needs to be sent
1178
    :return: Session Token if succeeded or terminates if user wait goes 5 min
1179

1180
    """
1181
    logger.debug(f"Push approval with challenge_url:{mfa_challenge_url}")
1182

1183
    user.print("Waiting for an approval from the device...")
4✔
1184
    status = "MFA_CHALLENGE"
4✔
1185
    result = "WAITING"
4✔
1186
    response = {}
4✔
1187
    challenge_displayed = False
4✔
1188

1189
    headers = {"content-type": "application/json", "accept": "application/json"}
4✔
1190

1191
    while status == "MFA_CHALLENGE" and result == "WAITING":
4✔
1192
        response = HTTP_client.post(
4✔
1193
            mfa_challenge_url, json=payload, headers=headers, return_json=True
1194
        )
1195

1196
        if "sessionToken" in response:
4✔
1197
            user.add_sensitive_value_to_be_masked(response["sessionToken"])
4✔
1198

1199
        logger.debug(f"MFA Response:\n{json.dumps(response)}")
1200
        # Retrieve these values from the object, and set a sensible default if they do not
1201
        # exist.
1202
        status = response.get("status", "UNKNOWN")
4✔
1203
        result = response.get("factorResult", "UNKNOWN")
4✔
1204

1205
        # The docs at https://developer.okta.com/docs/reference/api/authn/#verify-push-factor
1206
        # state that the call will return a factorResult in [ SUCCESS, REJECTED, TIMEOUT,
1207
        # WAITING]. However, on success, SUCCESS is not set and we have to rely on the
1208
        # response["status"] instead
1209
        answer = (
4✔
1210
            response.get("_embedded", {})
1211
            .get("factor", {})
1212
            .get("_embedded", {})
1213
            .get("challenge", {})
1214
            .get("correctAnswer", None)
1215
        )
1216
        if answer and not challenge_displayed:
4✔
1217
            # If a Number Challenge response exists, retrieve it from this deeply nested path,
1218
            # otherwise set to None.
1219
            user.print(f"Number Challenge response is {answer}")
4✔
1220
            challenge_displayed = True
4✔
1221
        time.sleep(1)
4✔
1222

1223
    if status == "SUCCESS" and "sessionToken" in response:
4✔
1224
        # noop, we will return the variable later
1225
        pass
4✔
1226
    # Everything else should have a status of "MFA_CHALLENGE", and the result provides a
1227
    # hint on why the challenge failed.
1228
    elif result == "REJECTED":
4✔
1229
        logger.error("The Okta Verify push has been denied.")
4✔
1230
        sys.exit(2)
4✔
1231
    elif result == "TIMEOUT":
4✔
1232
        logger.error("Device approval window has expired.")
4✔
1233
        sys.exit(2)
4✔
1234
    else:
1235
        logger.error(f"Push response type {result} for {status} not implemented.")
4✔
1236
        sys.exit(2)
4✔
1237

1238
    return response
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc