• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

dowjones / tokendito / 7255968625

19 Dec 2023 01:09AM UTC coverage: 91.623% (+0.04%) from 91.579%
7255968625

Pull #150

github

web-flow
Merge 56213b263 into 6e92fe8a3
Pull Request #150: Fix: Step-Up challenge assertion is too narrow.

15 of 18 new or added lines in 3 files covered. (83.33%)

1 existing line in 1 file now uncovered.

1400 of 1528 relevant lines covered (91.62%)

3.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.31
/tokendito/okta.py
1
# vim: set filetype=python ts=4 sw=4
2
# -*- coding: utf-8 -*-
3
"""
4✔
4
Handle the all Okta operations.
5

6
1. Okta authentication
7
2. Update Okta Config File
8

9
"""
10
import base64
4✔
11
import codecs
4✔
12
from copy import deepcopy
4✔
13
import hashlib
4✔
14
import json
4✔
15
import logging
4✔
16
import os
4✔
17
import re
4✔
18
import sys
4✔
19
import time
4✔
20
import urllib
4✔
21

22
import bs4
4✔
23
from bs4 import BeautifulSoup
4✔
24
import requests.cookies
4✔
25
from tokendito import duo
4✔
26
from tokendito import user
4✔
27
from tokendito.http_client import HTTP_client
4✔
28

29
logger = logging.getLogger(__name__)
4✔
30

31
_status_dict = dict(
4✔
32
    E0000004="Authentication failed",
33
    E0000047="API call exceeded rate limit due to too many requests",
34
    PASSWORD_EXPIRED="Your password has expired",
35
    LOCKED_OUT="Your account is locked out",
36
)
37

38

39
def api_error_code_parser(status=None):
4✔
40
    """Status code parsing.
41

42
    param status: Response status
43
    return message: status message
44
    """
45
    if status and status in _status_dict.keys():
4✔
46
        message = f"Okta auth failed: {_status_dict[status]}"
4✔
47
    else:
48
        message = f"Okta auth failed: {status}. Please verify your settings and try again."
4✔
49
    logger.debug(f"Parsing error [{message}] ")
50
    return message
4✔
51

52

53
def get_auth_pipeline(url=None):
4✔
54
    """Get auth pipeline version."""
55
    logger.debug(f"get_auth_pipeline({url})")
56
    headers = {"accept": "application/json"}
4✔
57
    url = f"{url}/.well-known/okta-organization"
4✔
58

59
    response = HTTP_client.get(url, headers=headers)
4✔
60

61
    try:
4✔
62
        response_json = response.json()
4✔
63
    except AttributeError as e:
4✔
64
        logger.error(f"Failed to parse json in {url}{e}")
4✔
65
        sys.exit(1)
4✔
66
    try:
4✔
67
        auth_pipeline = response_json.get("pipeline", None)
4✔
68
    except (KeyError, ValueError) as e:
×
69
        logger.error(f"Failed to parse pipeline for {url}:{e}")
×
70
        sys.exit(1)
×
71
    if auth_pipeline != "idx" and auth_pipeline != "v1":
4✔
72
        logger.error(f"unsupported auth pipeline version {auth_pipeline}")
4✔
73
        sys.exit(1)
4✔
74
    logger.debug(f"Pipeline is of type {auth_pipeline}")
75
    return auth_pipeline
4✔
76

77

78
def get_auth_properties(userid=None, url=None):
4✔
79
    """Make a call to the webfinger endpoint to get the auth properties metadata.
80

81
    :param userid: User's ID for which we are requesting an auth endpoint.
82
    :param url: Okta organization URL where we are looking up the user.
83
    :returns: Dictionary containing authentication properties.
84
    """
85
    payload = {"resource": f"okta:acct:{userid}", "rel": "okta:idp"}
1✔
86
    headers = {"accept": "application/jrd+json"}
1✔
87
    url = f"{url}/.well-known/webfinger"
1✔
88
    logger.debug(f"Looking up auth endpoint for {userid} in {url}")
89

90
    # Make a GET request to the webfinger endpoint.
91
    response = HTTP_client.get(url, params=payload, headers=headers)
1✔
92

93
    # Extract properties from the response.
94
    try:
1✔
95
        ret = response.json()["links"][0]["properties"]
1✔
96
    except (KeyError, ValueError) as e:
×
97
        logger.error(f"Failed to parse authentication type in {url}:{str(e)}")
×
98
        logger.debug(f"Response: {response.text}")
99
        sys.exit(1)
×
100

101
    # Extract specific authentication properties if available.
102
    # Return a dictionary with 'metadata', 'type', and 'id' keys.
103
    properties = {}
1✔
104
    properties["metadata"] = ret.get("okta:idp:metadata", None)
1✔
105
    properties["type"] = ret.get("okta:idp:type", None)
1✔
106
    properties["id"] = ret.get("okta:idp:id", None)
1✔
107

108
    logger.debug(f"Auth properties are {properties}")
109
    return properties
1✔
110

111

112
def get_saml_request(auth_properties):
4✔
113
    """
114
    Get a SAML Request object from the Service Provider, to be submitted to the IdP.
115

116
    :param auth_properties: dict with the IdP ID and type.
117
    :returns: dict with post_url, relay_state, and base64 encoded saml request.
118
    """
119
    # Prepare the headers for the request to retrieve the SAML request.
120
    headers = {"accept": "text/html,application/xhtml+xml,application/xml"}
4✔
121

122
    # Build the URL based on the metadata and ID provided in the auth properties.
123
    base_url = user.get_base_url(auth_properties["metadata"])
4✔
124
    url = f"{base_url}/sso/idps/{auth_properties['id']}"
4✔
125

126
    logger.debug(f"Getting SAML request from {url}")
127

128
    # Make a GET request using the HTTP client to retrieve the SAML request.
129
    response = HTTP_client.get(url, headers=headers)
4✔
130

131
    # Extract the required parameters from the SAML request.
132
    post_url = extract_form_post_url(response.text)
4✔
133
    base_url = user.get_base_url(post_url)
4✔
134
    saml_request = {
4✔
135
        "base_url": base_url,
136
        "post_url": post_url,
137
        "relay_state": extract_saml_relaystate(response.text),
138
        "request": extract_saml_request(response.text, raw=True),
139
    }
140

141
    # Mask sensitive data in the logs for security.
142
    user.add_sensitive_value_to_be_masked(saml_request["request"])
4✔
143

144
    logger.debug(f"SAML request is {saml_request}")
145
    return saml_request
4✔
146

147

148
def send_saml_request(saml_request):
4✔
149
    """
150
    Submit SAML request to IdP, and get the response back.
151

152
    :param saml_request: dict with IdP post_url, relay_state, and saml_request
153
    :returns: dict with with SP post_url, relay_state, and saml_response
154
    """
155
    # Define the payload and headers for the request
156
    payload = {
4✔
157
        "relayState": saml_request["relay_state"],
158
        "SAMLRequest": saml_request["request"],
159
    }
160

161
    headers = {
4✔
162
        "accept": "text/html,application/xhtml+xml,application/xml",
163
        "Content-Type": "application/json",
164
    }
165

166
    # Construct the URL from the provided saml_request
167
    url = saml_request["post_url"]
4✔
168

169
    # Log the SAML request details
170
    logger.debug(f"Sending SAML request to {url}")
171

172
    # Use the HTTP client to make a GET request
173
    response = HTTP_client.get(url, params=payload, headers=headers)
4✔
174

175
    # Extract relevant information from the response to form the saml_response dictionary
176
    saml_response = {
4✔
177
        "response": extract_saml_response(response.text, raw=True),
178
        "relay_state": extract_saml_relaystate(response.text),
179
        "post_url": extract_form_post_url(response.text),
180
    }
181

182
    # Mask sensitive values for logging purposes
183
    user.add_sensitive_value_to_be_masked(saml_response["response"])
4✔
184

185
    logger.debug(f"SAML response is {saml_response}")
186
    # Return the formed SAML response
187
    return saml_response
4✔
188

189

190
def create_authz_cookies(oauth2_config, oauth2_session_data):
4✔
191
    """
192
    Set authorize redirect cookies for the HTTP client.
193

194
    Needed for SAML2 flow for OIE.
195
    """
196
    try:
4✔
197
        oauth2_url = f"{oauth2_config['org']}/oauth2/v1"
4✔
198
        oauth2_config_reformatted = {
4✔
199
            "responseType": "code",
200
            "state": oauth2_session_data["state"],
201
            "nonce": oauth2_session_data["nonce"],
202
            "scopes": [
203
                "openid",
204
                "profile",
205
                "email",
206
                "okta.users.read.self",
207
                "okta.users.manage.self",
208
                "okta.internal.enduser.read",
209
                "okta.internal.enduser.manage",
210
                "okta.enduser.dashboard.read",
211
                "okta.enduser.dashboard.manage",
212
            ],
213
            "clientId": oauth2_config["client_id"],
214
            "urls": {
215
                "issuer": oauth2_config["issuer"],
216
                "authorizeUrl": oauth2_config["authorization_endpoint"],
217
                "userinfoUrl": f"{oauth2_url}/userinfo",
218
                "tokenUrl": oauth2_config["token_endpoint"],
219
                "revokeUrl": f"{oauth2_url}/revoke",
220
                "logoutUrl": f"{oauth2_url}/logout",
221
            },
222
            "ignoreSignature": False,
223
        }
224
    except KeyError as e:
4✔
225
        logger.error(f"Missing key in config:{e}")
4✔
226
        sys.exit(1)
4✔
227

228
    cookiejar = requests.cookies.RequestsCookieJar()
4✔
229
    domain = urllib.parse.urlparse(oauth2_config["org"]).netloc
4✔
230
    cookiejar.set(
4✔
231
        "okta-oauth-redirect-params",
232
        urllib.parse.quote(
233
            json.dumps(oauth2_config_reformatted, separators=(",", ":")), safe="{}:[]/"
234
        ),
235
        domain=domain,
236
        path="/",
237
    )
238
    cookiejar.set("okta-oauth-state", oauth2_session_data["state"], domain=domain, path="/")
4✔
239
    cookiejar.set("okta-oauth-nonce", oauth2_session_data["nonce"], domain=domain, path="/")
4✔
240
    cookiejar.set("ln", oauth2_config["ln"], domain=domain, path="/")
4✔
241
    HTTP_client.add_cookies(cookiejar)  # add cookies
4✔
242

243

244
def send_saml_response(config, saml_response):
4✔
245
    """
246
    Submit SAML response to the SP.
247

248
    :param saml_response: dict with SP post_url, relay_state, and saml_response
249
    """
250
    # Define the payload and headers for the request.
251
    payload = {
4✔
252
        "SAMLResponse": saml_response["response"],
253
        "RelayState": saml_response["relay_state"],
254
    }
255
    headers = {
4✔
256
        "accept": "text/html,application/xhtml+xml,application/xml",
257
        "Content-Type": "application/x-www-form-urlencoded",
258
    }
259
    url = saml_response["post_url"]
4✔
260

261
    # Log the SAML response details.
262
    logger.debug(f"Sending SAML response to {url}")
263
    # Use the HTTP client to make a POST request.
264
    response = HTTP_client.post(url, data=payload, headers=headers)
4✔
265

266
    # Get the 'sid' value from the reponse cookies.
267
    sid = response.cookies.get("sid", None)
4✔
268

269
    # If 'sid' is present, mask its value for logging purposes.
270
    if sid:
4✔
271
        user.add_sensitive_value_to_be_masked(sid)
4✔
272
    else:
273
        logger.debug("We did not find a 'sid' entry in the cookies.")
274

275
    # Extract the state token from the response.
276
    state_token = extract_state_token(response.text)
4✔
277
    if state_token:  # TODO: this is not working yet.
4✔
278
        params = {"stateToken": state_token}
×
279
        headers = {
×
280
            "accept": "text/html,application/xhtml+xml,application/xml",
281
        }
UNCOV
282
        response = HTTP_client.get(
×
283
            f"{config.okta['org']}/login/token/redirect",
284
            params=params,
285
            headers=headers,
286
        )
NEW
287
        if "idx" not in response.cookies:
×
NEW
288
            logger.error(
×
289
                f"Session cookie idx for {config.okta['org']} not found. Please file a bug."
290
            )
291
            logger.debug(f"Response: {response.headers}")
292
            logger.debug(f"Response: {response.text}")
NEW
293
            sys.exit(2)
×
294

295

296
def get_session_token(config, primary_auth, headers):
4✔
297
    """Get session_token.
298

299
    :param config: Configuration object
300
    :param headers: Headers of the request
301
    :param primary_auth: Primary authentication
302
    :return: Session Token from JSON response
303
    """
304
    status = None
4✔
305
    try:
4✔
306
        status = primary_auth.get("status", None)
4✔
307
    except AttributeError:
4✔
308
        pass
4✔
309

310
    if status == "SUCCESS" and "sessionToken" in primary_auth:
4✔
311
        session_token = primary_auth.get("sessionToken")
4✔
312
    elif status == "MFA_REQUIRED":
4✔
313
        # Note: mfa_challenge should also be modified to accept and use http_client
314
        session_token = mfa_challenge(config, headers, primary_auth)
4✔
315
    else:
316
        logger.debug(f"Error parsing response: {json.dumps(primary_auth)}")
317
        logger.error(f"Okta auth failed: unknown status {status}")
4✔
318
        sys.exit(1)
4✔
319

320
    user.add_sensitive_value_to_be_masked(session_token)
4✔
321

322
    return session_token
4✔
323

324

325
def get_access_token(oauth2_config, oauth2_session_data, authorize_code):
4✔
326
    """Get OAuth token from Okta by calling /token endpoint.
327

328
    This method does not seem to be needed, calling /authorize sets the idx cookies,
329
    but we put it here to follow the flow vervatim.
330

331
    :param url: URL of the Okta OAuth token endpoint
332
    :return: OAuth token
333
    """
334
    try:
4✔
335
        payload = {
4✔
336
            "code": authorize_code,
337
            "state": oauth2_session_data["state"],
338
            "grant_type": oauth2_session_data["grant_type"],
339
            "redirect_uri": oauth2_session_data["redirect_uri"],
340
            "client_id": oauth2_config["client_id"],
341
            "code_verifier": oauth2_session_data["code_verifier"],
342
        }
343
    except KeyError as e:
×
344
        logger.error(f"Missing key in config:{e}")
×
345
        sys.exit(1)
×
346

347
    headers = {"accept": "application/json"}
4✔
348
    # Using the http_client to make the POST request
349
    response = HTTP_client.post(
4✔
350
        oauth2_config["token_endpoint"], data=payload, headers=headers, return_json=True
351
    )
352
    # We now have response['access_token'] and response['id_token'], but we dont seem to need
353
    # them to access the resources.
354
    access_token = None
4✔
355
    try:
4✔
356
        access_token = response["access_token"]
4✔
357
    except KeyError:
4✔
358
        logger.debug(f"Error parsing response: {json.dumps(response)}")
359
        # Don't do anything but a debug message, as the /token call doesnt seem to be needed.
360
    return access_token
4✔
361

362

363
def get_enduser_url(url):
4✔
364
    """Retrieve enduser URL.
365

366
    :url: Okta URL to retrieve enduser URL from
367
    :returns: enduser URL or None
368
    """
369
    enduser_url = None
4✔
370

371
    res = HTTP_client.get(url)
4✔
372
    soup = BeautifulSoup(res.text, "html.parser")
4✔
373
    pattern = re.compile(r".*enduser-v.*enduser.*")
4✔
374
    script = soup.find("script", src=pattern)
4✔
375
    if type(script) is bs4.element.Tag:
4✔
376
        logger.debug(f"Found script tag: {script['src']}")
377
        enduser_url = script["src"]
4✔
378
    return enduser_url
4✔
379

380

381
def get_client_id_by_url(url):
4✔
382
    """Retrieve clientId.
383

384
    :url: Javascript URL to retrieve clientId from
385
    :returns: clientId or None
386
    """
387
    client_id = None
4✔
388
    enduser_url = get_enduser_url(url)
4✔
389
    if enduser_url:
4✔
390
        res = HTTP_client.get(enduser_url)
4✔
391
        pattern = re.compile(r',clientId:"(?P<clientId>.*?)",')
4✔
392

393
        match = pattern.search(res.text)
4✔
394
        if match:
4✔
395
            logger.debug(f"Found clientId: {match.group('clientId')}")
396
            client_id = match.group("clientId")
4✔
397

398
    return client_id
4✔
399

400

401
def get_client_id(config):
4✔
402
    """Get the client id needed by the Authorization Code Flow.
403

404
    If a command line parameter was passed, it will take precedence.
405
    If no command line parameter was passed, it will try to determine it.
406

407
    """
408
    if "client_id" in config.okta and config.okta["client_id"]:
4✔
409
        return config.okta["client_id"]
4✔
410
    else:
411
        return get_client_id_by_url(config.okta["org"])
4✔
412

413

414
def get_redirect_uri(oauth2_url):
4✔
415
    """
416
    Get the redirect uri needed by the Authorization Code Flow.
417

418
    Return url
419
    """
420
    uri = f"{oauth2_url}/enduser/callback"
4✔
421
    return uri
4✔
422

423

424
def get_response_type():
4✔
425
    """
426
     We're only implementing code response type.
427

428
    So we're only returning "code"
429
    """
430
    return "code"
4✔
431

432

433
def get_authorize_scope():
4✔
434
    """We're only implementing openid scope."""
435
    return "openid"
4✔
436

437

438
def get_oauth2_state():
4✔
439
    """Generate a random string for state."""
440
    state = hashlib.sha256(os.urandom(1024)).hexdigest()
4✔
441
    return state
4✔
442

443

444
def get_pkce_code_challenge_method():
4✔
445
    """
446
    Return code challenge.
447

448
    Only S256 is implemented.
449
    """
450
    return "S256"
4✔
451

452

453
def get_pkce_code_challenge(code_verifier=None):
4✔
454
    """
455
    Get PKCE Code Challenge.
456

457
    Base64-URL-encoded string of the SHA256 hash of the code verifier
458
    https://www.oauth.com/oauth2-servers/pkce/authorization-request/
459

460
    :param: code_verifier
461
    :return: code_challenge
462
    """
463
    code_challenge = hashlib.sha256(code_verifier.encode("utf-8")).digest()
4✔
464
    code_challenge = base64.urlsafe_b64encode(code_challenge).decode("utf-8")
4✔
465
    code_challenge = code_challenge.replace("=", "")
4✔
466
    return code_challenge
4✔
467

468

469
def get_pkce_code_verifier():
4✔
470
    """
471
    Get pkce code verifier.
472

473
    :return: code_verifier
474
    """
475
    code_verifier = base64.urlsafe_b64encode(os.urandom(40)).decode("utf-8")
4✔
476
    code_verifier = re.sub("[^a-zA-Z0-9]+", "", code_verifier)
4✔
477
    return code_verifier
4✔
478

479

480
def pkce_enabled():
4✔
481
    """
482
    Check of PKCE is enabled.
483

484
    Altho the authorization server config tells us our okta doesnt PKCE enabled, omitting its
485
    settings will cause the authorize code flow to fail, so we always return True.
486
    """
487
    return True
4✔
488

489

490
def get_authorize_code(response, session_token):
4✔
491
    """
492
    Get the authorize code.
493

494
    This will exit with error if we cannot get the code.
495
    It will also check the response from the /authorize call for callback errors,
496
    And if any, print and exit with error.
497
    """
498
    callback_url = response.url
4✔
499
    error_code = re.search(r"(?<=error=)[^&]+", callback_url)
4✔
500
    error_desc = re.search(r"(?<=error_description=)[^&]+", callback_url)
4✔
501
    if error_code:
4✔
502
        error_value = error_code.group()
4✔
503
        if not session_token and error_value == "login_required":
4✔
504
            return (
4✔
505
                None  # if we arent authenticated we wont have sessionToken, so ignore login error.
506
            )
507
        else:
508
            logger.error(f"Oauth2 callback error:{error_value}:{error_desc.group()}")
×
509
            logger.debug(f"Response: {response.text}")
510
            sys.exit(1)
×
511
    authorize_code = re.search(r"(?<=code=)[^&]+", callback_url)
4✔
512
    if authorize_code:
4✔
513
        return authorize_code.group()
4✔
514

515

516
def authorization_code_enabled(oauth2_config):
4✔
517
    """
518
    Determine if authorization code grant is enabled.
519

520
    Returns True if the dict key is in authorization server info, and False otherwise,
521
    """
522
    if "org" not in oauth2_config:
4✔
523
        logger.error(f"No org in config:{oauth2_config}")
4✔
524
        sys.exit(1)
4✔
525
    try:
4✔
526
        if "authorization_code" not in oauth2_config["grant_types_supported"]:
4✔
527
            return False
×
528
    except (KeyError, ValueError) as e:
×
529
        logger.error(f"No grant types supported on {oauth2_config['org']}:{str(e)}")
×
530
        sys.exit(1)
×
531
    return True
4✔
532

533

534
def authorize_request(oauth2_config, oauth2_session_data):
4✔
535
    """
536
    Call /authorize endpoint.
537

538
    :param
539
    :return: authorization code, needed for /token call
540
    """
541
    logger.debug(f"oauth_code_request({oauth2_config}, {oauth2_session_data})")
542
    headers = {"accept": "application/json", "content-type": "application/json"}
4✔
543

544
    session_token = HTTP_client.session.cookies.get("sessionToken")
4✔
545

546
    try:
4✔
547
        payload = {
4✔
548
            "client_id": oauth2_config["client_id"],
549
            "redirect_uri": oauth2_session_data["redirect_uri"],
550
            "response_type": oauth2_session_data["response_type"],
551
            "scope": oauth2_session_data["scope"],
552
            "state": oauth2_session_data["state"],
553
            "code_challenge": oauth2_session_data["code_challenge"],
554
            "code_challenge_method": oauth2_session_data["code_challenge_method"],
555
            "prompt": "none",  # dont authenticate
556
            "sessionToken": session_token,
557
        }
558
    except KeyError as e:
×
559
        logger.error(f"Missing key in config:{e}")
×
560
        sys.exit(1)
×
561

562
    response = HTTP_client.get(
4✔
563
        oauth2_config["authorization_endpoint"],
564
        headers=headers,
565
        params=payload,
566
    )
567

568
    idx = HTTP_client.session.cookies.get("idx", None)
4✔
569
    if idx:
4✔
570
        user.add_sensitive_value_to_be_masked(idx)
1✔
571
    else:
572
        logger.debug("We did not find an 'idx' entry in the cookies.")
573

574
    authorize_code = get_authorize_code(response, session_token)
4✔
575
    return authorize_code
4✔
576

577

578
def get_nonce(url):
4✔
579
    """Get nonce from the org server."""
580
    userhome_url = f"{url}/app/UserHome"
4✔
581
    payload = {"session_hint": "AUTHENTICATED", "iss": urllib.parse.quote(userhome_url, safe="")}
4✔
582
    response = HTTP_client.get(url, params=payload)
4✔
583
    # '<script nonce="ABCXXXXXXXXXXXXXXXXYZ" type="text/javascript">'
584
    nonce = None
4✔
585
    pattern = re.compile(r'script nonce="(?P<nonce>.*?)" ', re.MULTILINE)
4✔
586
    match = pattern.search(response.text)
4✔
587
    if match:
4✔
588
        logger.debug(f"Found nonce: {match.group('nonce')}")
589
        nonce = match.group("nonce")
4✔
590

591
    return nonce
4✔
592

593

594
def get_oauth2_session_data(url):
4✔
595
    """
596
    Get some oauth2 session data.
597

598
    We do this to have the same in oath2 cookies and /authorize call.
599
    """
600
    authz_session_data = {
4✔
601
        "response_type": get_response_type(),
602
        "scope": get_authorize_scope(),
603
        "state": get_oauth2_state(),
604
        "redirect_uri": get_redirect_uri(url),
605
        "grant_type": "authorization_code",
606
    }
607
    authz_session_data["nonce"] = get_nonce(url)
4✔
608

609
    if pkce_enabled():
4✔
610
        code_verifier = get_pkce_code_verifier()
4✔
611
        authz_session_data["code_verifier"] = code_verifier
4✔
612
        authz_session_data["code_challenge"] = get_pkce_code_challenge(code_verifier)
4✔
613
        authz_session_data["code_challenge_method"] = get_pkce_code_challenge_method()
4✔
614

615
    return authz_session_data
4✔
616

617

618
def get_oauth2_configuration(config):
4✔
619
    """Get authorization server configuration data from Okta instance.
620

621
    :param url: URL of the Okta org
622
    :return: dict of conguration values
623
    """
624
    url = f"{config.okta['org']}/.well-known/oauth-authorization-server"
4✔
625
    headers = {"accept": "application/json"}
4✔
626
    response = HTTP_client.get(url, headers=headers)
4✔
627
    logger.debug(f"Authorization Server info: {response.json()}")
628
    # TODO: handle errors
629
    oauth2_config = response.json()
4✔
630
    oauth2_config["org"] = config.okta["org"]
4✔
631
    oauth2_config["client_id"] = get_client_id(config)
4✔
632
    oauth2_config["ln"] = config.okta["username"]
4✔
633
    validate_oauth2_configuration(oauth2_config)
4✔
634
    return oauth2_config
4✔
635

636

637
def validate_oauth2_configuration(oauth2_config):
4✔
638
    """
639
    Validate that the oauth2 configuration has our implementation.
640

641
    Will exit with error if a mandatory config is missing.
642
    :param oauth2_config: dict of configuration values
643
    """
644
    mandadory_oauth2_config_items = {
4✔
645
        "authorization_endpoint",
646
        "token_endpoint",
647
        "grant_types_supported",
648
        "response_types_supported",
649
        "scopes_supported",
650
        "client_id",
651
        "org",
652
        "ln",
653
    }  # the authorization server must have these config elements
654
    for item in mandadory_oauth2_config_items:
4✔
655
        if item not in oauth2_config:
4✔
656
            logger.error(f"No {item} found in oauth2 configuration.")
4✔
657
            sys.exit(1)
4✔
658

659
    if "authorization_code" not in oauth2_config["grant_types_supported"]:
4✔
660
        logger.error("Authorization code grant not found.")
×
661
        sys.exit(1)
×
662
    if "code" not in oauth2_config["response_types_supported"]:
4✔
663
        logger.error("Code response type not found.")
×
664
        sys.exit(1)
×
665

666

667
def create_authn_cookies(authn_org_url, session_token):
4✔
668
    """
669
    Create session cookie.
670

671
    :param authn_org_url: org url
672
    :param session_token: session token, str
673
    :returns: cookies jar with session_id value we got using the token
674
    """
675
    # Construct the URL from the base URL provided.
676
    url = f"{authn_org_url}/api/v1/sessions"
1✔
677

678
    # Define the payload and headers for the request.
679
    data = {"sessionToken": session_token}
1✔
680
    headers = {"Content-Type": "application/json", "accept": "application/json"}
1✔
681

682
    # Log the request details.
683
    logger.debug(f"Requesting session cookies from {url}")
684

685
    # Use the HTTP client to make a POST request.
686
    response_json = HTTP_client.post(url, json=data, headers=headers, return_json=True)
1✔
687

688
    if "id" not in response_json:
1✔
689
        logger.error(f"'id' not found in response. Full response: {response_json}")
×
690
        sys.exit(1)
×
691
    session_id = response_json["id"]
1✔
692
    user.add_sensitive_value_to_be_masked(session_id)
1✔
693

694
    cookiejar = requests.cookies.RequestsCookieJar()
1✔
695
    domain = urllib.parse.urlparse(url).netloc
1✔
696
    cookiejar.set("sid", session_id, domain=domain, path="/")
1✔
697
    cookiejar.set("sessionToken", session_token, domain=domain, path="/")
1✔
698
    HTTP_client.add_cookies(cookiejar)  # add cookies
1✔
699

700

701
def idp_authenticate(config):
4✔
702
    """Authenticate user to okta."""
703
    auth_properties = get_auth_properties(userid=config.okta["username"], url=config.okta["org"])
4✔
704

705
    if "type" not in auth_properties:
4✔
706
        logger.error("Okta auth failed: unknown type.")
4✔
707
        sys.exit(1)
4✔
708

709
    # Possible recursion ahead. The exit condition should be the first if statement.
710
    if local_authentication_enabled(auth_properties):
4✔
711
        session_token = local_authenticate(config)
4✔
712
        # authentication sends us a token
713
        # which we then put in our session cookies
714
        create_authn_cookies(config.okta["org"], session_token)
4✔
715
    elif is_saml2_authentication(auth_properties):
4✔
716
        # We may loop thru the saml2 servers until
717
        # we find the authentication server.
718
        saml2_authenticate(config, auth_properties)
4✔
719
    else:
720
        logger.error(
4✔
721
            f"{auth_properties['type']} login via IdP Discovery is not currently supported"
722
        )
723
        sys.exit(1)
4✔
724

725

726
def access_control(config):
4✔
727
    """Authenticate and authorize with the IDP.
728

729
    if OIE is enabled and a client_id is found,run Authorization code flow and PKCE being
730
    the only implemented grant types.
731

732
    Okta uses cookies to manage sessions.
733

734
    :param config: Config object
735
    """
736
    logger.debug(f"access_control({config})")
737

738
    oauth2_config = None
4✔
739
    oauth2_session_data = None
4✔
740

741
    is_oie = oie_enabled(config.okta["org"])
4✔
742
    # We set the oauth2 data (variables and cookies) that will be used at /authorize and during
743
    # saml2 for chained orgs.
744
    if is_oie:
4✔
745
        logger.debug("OIE enabled")
746
        # save some oauth2 config data + create session data, and create authz cookies
747
        oauth2_config = get_oauth2_configuration(config)
4✔
748
        oauth2_session_data = get_oauth2_session_data(config.okta["org"])
4✔
749
        create_authz_cookies(oauth2_config, oauth2_session_data)
4✔
750
        # The flow says to initially call /authorize here, but that doesnt do anything.
751
        idp_authorize(oauth2_config, oauth2_session_data)
4✔
752
        # We call it later, after we are authenticated.
753

754
    idp_authenticate(config)
4✔
755

756
    if is_oie:
4✔
757
        # call /authorize . Note: we are authenticated.
758
        idp_authorize(oauth2_config, oauth2_session_data)
4✔
759

760

761
def idp_authorize(oauth2_config, oauth2_session_data):
4✔
762
    """
763
    Authorize on the okta authorization server.
764

765
    If we arent authenticated, we will still call /authorize but won't get a code.
766
    When we are authenticated, we get an idx cookies and dont need to do anything else.
767
    """
768
    if "client_id" not in oauth2_config or not oauth2_config["client_id"]:
4✔
769
        logger.error("We are calling /authorize without a client_id")
4✔
770
        sys.exit(1)
4✔
771

772
    if authorization_code_enabled(oauth2_config):
4✔
773
        authorize_code = authorize_request(oauth2_config, oauth2_session_data)
4✔
774
        # The following get_access_token does not seem to matter, the /authorize call above sets
775
        # the idx cookies and we're done. We put it here in an attempt to follow the flow verbatim.
776
        if authorize_code:  # We got value if we were authenticated.
4✔
777
            get_access_token(oauth2_config, oauth2_session_data, authorize_code)
4✔
778

779

780
def step_up_authenticate(config, state_token):
4✔
781
    """Try to step up authenticate the user. Only supported for local auth.
782

783
    :param config: Configuration object
784
    :param state_token: The state token
785
    :return: True if step up authentication was successful; False otherwise
786
    """
787
    auth_properties = get_auth_properties(userid=config.okta["username"], url=config.okta["org"])
4✔
788
    if "type" not in auth_properties or not local_authentication_enabled(auth_properties):
4✔
789
        return False
4✔
790

791
    headers = {"content-type": "application/json", "accept": "application/json"}
4✔
792
    payload = {"stateToken": state_token}
4✔
793

794
    auth = HTTP_client.post(
4✔
795
        f"{config.okta['org']}/api/v1/authn", json=payload, headers=headers, return_json=True
796
    )
797

798
    status = auth.get("status", None)
4✔
799
    if status == "SUCCESS":
4✔
800
        return True
4✔
801
    elif status == "MFA_REQUIRED":
4✔
802
        mfa_challenge(config, headers, auth)
4✔
803
        return True
4✔
804

805
    logger.error("Okta auth failed: unknown status for step up authentication.")
4✔
806
    return False
4✔
807

808

809
def saml2_authenticate(config, auth_properties):
4✔
810
    """SAML2 authentication flow.
811

812
    :param config: Config object
813
    :param auth_properties: dict with authentication properties
814
    :returns: session ID cookie, if successful.
815
    """
816
    # Get the SAML request details
817
    saml_request = get_saml_request(auth_properties)
4✔
818

819
    # Create a copy of our configuration, so that we can freely reuse it
820
    # without Python's pass-as-reference-value interfering with it.
821
    saml2_config = deepcopy(config)
4✔
822
    saml2_config.okta["org"] = saml_request["base_url"]
4✔
823
    logger.info(f"Authentication is being redirected to {saml2_config.okta['org']}.")
4✔
824

825
    # Try to authenticate using the new configuration. This could cause
826
    # recursive calls, which allows for IdP chaining.
827
    idp_authenticate(saml2_config)
4✔
828

829
    # Once we are authenticated, send the SAML request to the IdP.
830
    # This call requires session cookies.
831
    saml_response = send_saml_request(saml_request)
4✔
832

833
    # Send SAML response from the IdP back to the SP, which will generate new
834
    # session cookies.
835
    send_saml_response(config, saml_response)
4✔
836

837

838
def oie_enabled(url):
4✔
839
    """
840
    Determine if OIE is enabled.
841

842
    :pamam url: okta org url
843
    :return: True if OIE is enabled, False otherwise
844
    """
845
    if get_auth_pipeline(url) == "idx":  # oie
4✔
846
        return True
4✔
847
    else:
848
        return False
4✔
849

850

851
def local_authenticate(config):
4✔
852
    """Authenticate user on local okta instance.
853

854
    :param config: Config object
855
    :return: authn token
856
    """
857
    session_token = None
4✔
858
    headers = {"content-type": "application/json", "accept": "application/json"}
4✔
859
    payload = {"username": config.okta["username"], "password": config.okta["password"]}
4✔
860

861
    logger.debug(f"Authenticate user to {config.okta['org']}/api/v1/authn")
862
    logger.debug(f"Sending {headers}, {payload} to {config.okta['org']}/api/vi/authn")
863

864
    primary_auth = HTTP_client.post(
4✔
865
        f"{config.okta['org']}/api/v1/authn",
866
        json=payload,
867
        headers=headers,
868
        return_json=True,
869
    )
870

871
    if "errorCode" in primary_auth:
4✔
872
        api_error_code_parser(primary_auth["errorCode"])
×
873
        sys.exit(1)
×
874

875
    while session_token is None:
4✔
876
        session_token = get_session_token(config, primary_auth, headers)
4✔
877
    logger.info(f"User has been successfully authenticated to {config.okta['org']}.")
4✔
878
    return session_token
4✔
879

880

881
def local_authentication_enabled(auth_properties):
4✔
882
    """Check whether authentication happens on the current instance.
883

884
    :param auth_properties: auth_properties dict
885
    :return: True if this is the place to authenticate, False otherwise.
886
    """
887
    # IWA (https://help.okta.com/en-us/content/topics/directory/ad-iwa-learn.htm)
888
    # should be treated as local authentication
889
    try:
4✔
890
        if auth_properties["type"] == "OKTA" or auth_properties["type"] == "IWA":
4✔
891
            return True
4✔
892
    except (TypeError, KeyError):
4✔
893
        pass
4✔
894
    return False
4✔
895

896

897
def is_saml2_authentication(auth_properties):
4✔
898
    """Check whether authentication happens via SAML2 on a different IdP.
899

900
    :param auth_properties: auth_properties dict
901
    :return: True for SAML2 on Okta, False otherwise.
902
    """
903
    try:
4✔
904
        if auth_properties["type"] == "SAML2":
4✔
905
            return True
4✔
906
    except (TypeError, KeyError):
4✔
907
        pass
4✔
908
    return False
4✔
909

910

911
def extract_saml_response(html, raw=False):
4✔
912
    """Parse html, and extract a SAML document.
913

914
    :param html: String with HTML document.
915
    :param raw: Boolean that determines whether or not the response should be decoded.
916
    :return: XML Document, or None
917
    """
918
    soup = BeautifulSoup(html, "html.parser")
4✔
919
    xml = None
4✔
920
    saml_base64 = None
4✔
921
    retval = None
4✔
922

923
    elem = soup.find("input", attrs={"name": "SAMLResponse"})
4✔
924
    if type(elem) is bs4.element.Tag:
4✔
925
        saml_base64 = str(elem.get("value"))
4✔
926
        xml = codecs.decode(saml_base64.encode("ascii"), "base64").decode("utf-8")
4✔
927

928
        retval = xml
4✔
929
        if raw:
4✔
930
            retval = saml_base64
4✔
931
    return retval
4✔
932

933

934
def extract_saml_request(html, raw=False):
4✔
935
    """Parse html, and extract a SAML document.
936

937
    :param html: String with HTML document.
938
    :param raw: Boolean that determines whether or not the response should be decoded.
939
    :return: XML Document, or None
940
    """
941
    soup = BeautifulSoup(html, "html.parser")
4✔
942
    xml = None
4✔
943
    saml_base64 = None
4✔
944
    retval = None
4✔
945

946
    elem = soup.find("input", attrs={"name": "SAMLRequest"})
4✔
947
    if type(elem) is bs4.element.Tag:
4✔
948
        saml_base64 = str(elem.get("value"))
4✔
949
        xml = codecs.decode(saml_base64.encode("ascii"), "base64").decode("utf-8")
4✔
950

951
        retval = xml
4✔
952
        if raw:
4✔
953
            retval = saml_base64
4✔
954
    return retval
4✔
955

956

957
def extract_form_post_url(html):
4✔
958
    """Parse html, and extract a Form Action POST URL.
959

960
    :param html: String with HTML document.
961
    :return: URL string, or None
962
    """
963
    soup = BeautifulSoup(html, "html.parser")
4✔
964
    post_url = None
4✔
965
    elem = soup.find("form", attrs={"id": "appForm"})
4✔
966
    if type(elem) is bs4.element.Tag:
4✔
967
        post_url = str(elem.get("action"))
4✔
968
        logger.debug(f"Found POST URL: {post_url}")
969
    return post_url
4✔
970

971

972
def extract_saml_relaystate(html):
4✔
973
    """Parse html, and extract SAML relay state from a form.
974

975
    :param html: String with HTML document.
976
    :return: relay state value, or None
977
    """
978
    soup = BeautifulSoup(html, "html.parser")
4✔
979
    relay_state = None
4✔
980

981
    elem = soup.find("input", attrs={"name": "RelayState"})
4✔
982
    if type(elem) is bs4.element.Tag:
4✔
983
        relay_state = str(elem.get("value"))
4✔
984
    return relay_state
4✔
985

986

987
def extract_state_token(html):
4✔
988
    """Parse an HTML document, and extract a state token.
989

990
    :param html: String with HTML document
991
    :return: string with state token, or None
992
    """
993
    soup = BeautifulSoup(html, "html.parser")
4✔
994
    state_token = None
4✔
995
    pattern = re.compile(r"var stateToken = '(?P<stateToken>.*)';", re.MULTILINE)
4✔
996

997
    script = soup.find("script", string=pattern)
4✔
998
    if type(script) is bs4.element.Tag:
4✔
999
        match = pattern.search(script.text)
×
1000
        if match:
×
1001
            encoded_token = match.group("stateToken")
×
1002
            state_token = codecs.decode(encoded_token, "unicode-escape")
×
1003

1004
    return state_token
4✔
1005

1006

1007
def mfa_provider_type(
4✔
1008
    config,
1009
    mfa_provider,
1010
    selected_factor,
1011
    mfa_challenge_url,
1012
    primary_auth,
1013
    selected_mfa_option,
1014
    headers,
1015
    payload,
1016
):
1017
    """Receive session key.
1018

1019
    :param config: Config object
1020
    :param mfa_provider: MFA provider
1021
    :param selected_factor: Selected MFA factor
1022
    :param mfa_challenge_url: MFA challenge url
1023
    :param primary_auth: Primary authentication
1024
    :param selected_mfa_option: Selected MFA option
1025
    :return: session_key
1026

1027
    """
1028
    mfa_verify = dict()
4✔
1029
    factor_type = selected_factor.get("_embedded", {}).get("factor", {}).get("factorType", None)
4✔
1030

1031
    if mfa_provider == "DUO":
4✔
1032
        mfa_verify = duo.authenticate(selected_factor)
4✔
1033
        headers = {"content-type": "application/json", "accept": "application/json"}
4✔
1034
        mfa_verify = HTTP_client.post(
4✔
1035
            mfa_challenge_url, json=payload, headers=headers, return_json=True
1036
        )
1037

1038
    elif mfa_provider == "OKTA" and factor_type == "push":
4✔
1039
        mfa_verify = push_approval(mfa_challenge_url, payload)
4✔
1040
    elif (mfa_provider in ["OKTA", "GOOGLE"] and factor_type in ["token:software:totp", "sms"]) or (
4✔
1041
        mfa_provider == "OKTA" and factor_type == "question"
1042
    ):
1043
        mfa_verify = totp_approval(
4✔
1044
            config, selected_mfa_option, headers, mfa_challenge_url, payload, primary_auth
1045
        )
1046
    else:
1047
        logger.error(
4✔
1048
            f"Sorry, the MFA provider '{mfa_provider}:{factor_type}' is not yet supported."
1049
            " Please retry with another option."
1050
        )
1051
        sys.exit(1)
4✔
1052

1053
    if "sessionToken" not in mfa_verify:
4✔
1054
        logger.error(
×
1055
            f"Could not verify MFA Challenge with {mfa_provider} {primary_auth['factorType']}"
1056
        )
1057
    return mfa_verify["sessionToken"]
4✔
1058

1059

1060
def mfa_index(preset_mfa, available_mfas, mfa_options):
4✔
1061
    """Get mfa index in request.
1062

1063
    :param preset_mfa: preset mfa from settings
1064
    :param available_mfas: available mfa ids
1065
    :param mfa_options: available mfas
1066
    """
1067
    indices = []
4✔
1068
    # Gets the index number from each preset MFA in the list of avaliable ones.
1069
    if preset_mfa:
4✔
1070
        logger.debug(f"Get mfa from {available_mfas}.")
1071
        indices = [i for i, elem in enumerate(available_mfas) if preset_mfa in elem]
4✔
1072

1073
    index = None
4✔
1074
    if len(indices) == 0:
4✔
1075
        logger.debug(f"No matches with {preset_mfa}, going to get user input")
1076
        index = user.select_preferred_mfa_index(mfa_options)
4✔
1077
    elif len(indices) == 1:
4✔
1078
        logger.debug(f"One match: {preset_mfa} in {indices}")
1079
        index = indices[0]
4✔
1080
    else:
1081
        logger.error(
4✔
1082
            f"{preset_mfa} is not unique in {available_mfas}. Please check your configuration."
1083
        )
1084
        sys.exit(1)
4✔
1085

1086
    return index
4✔
1087

1088

1089
def mfa_challenge(config, headers, primary_auth):
4✔
1090
    """Handle user mfa challenges.
1091

1092
    :param config: Config object
1093
    :param headers: headers what needs to be sent to api
1094
    :param primary_auth: primary authentication
1095
    :return: Okta MFA Session token after the successful entry of the code
1096
    """
1097
    logger.debug("Handle user MFA challenge")
1098
    try:
4✔
1099
        mfa_options = primary_auth["_embedded"]["factors"]
4✔
1100
    except KeyError as error:
4✔
1101
        logger.error(f"There was a wrong response structure: \n{error}")
4✔
1102
        sys.exit(1)
4✔
1103

1104
    preset_mfa = config.okta["mfa"]
1✔
1105

1106
    available_mfas = [f"{d['provider']}_{d['factorType']}_{d['id']}" for d in mfa_options]
1✔
1107
    index = mfa_index(preset_mfa, available_mfas, mfa_options)
1✔
1108

1109
    selected_mfa_option = mfa_options[index]
1✔
1110
    logger.debug(f"Selected MFA is [{selected_mfa_option}]")
1111

1112
    mfa_challenge_url = selected_mfa_option["_links"]["verify"]["href"]
1✔
1113

1114
    payload = {
1✔
1115
        "stateToken": primary_auth["stateToken"],
1116
        "factorType": selected_mfa_option["factorType"],
1117
        "provider": selected_mfa_option["provider"],
1118
        "profile": selected_mfa_option["profile"],
1119
    }
1120

1121
    selected_factor = HTTP_client.post(
1✔
1122
        mfa_challenge_url, json=payload, headers=headers, return_json=True
1123
    )
1124

1125
    mfa_provider = selected_factor["_embedded"]["factor"]["provider"]
1✔
1126
    logger.debug(f"MFA Challenge URL: [{mfa_challenge_url}] headers: {headers}")
1127

1128
    mfa_session_token = mfa_provider_type(
1✔
1129
        config,
1130
        mfa_provider,
1131
        selected_factor,
1132
        mfa_challenge_url,
1133
        primary_auth,
1134
        selected_mfa_option,
1135
        headers,
1136
        payload,
1137
    )
1138

1139
    logger.debug(f"MFA Session Token: [{mfa_session_token}]")
1140
    return mfa_session_token
1✔
1141

1142

1143
def totp_approval(config, selected_mfa_option, headers, mfa_challenge_url, payload, primary_auth):
4✔
1144
    """Handle user mfa options.
1145

1146
    :param config: Config object
1147
    :param selected_mfa_option: Selected MFA option (SMS, push, etc)
1148
    :param headers: headers
1149
    :param mfa_challenge_url: MFA challenge URL
1150
    :param payload: payload
1151
    :param primary_auth: Primary authentication method
1152
    :return: payload data
1153

1154
    """
1155
    logger.debug(f"User MFA options selected: [{selected_mfa_option['factorType']}]")
1156
    if config.okta["mfa_response"] is None:
4✔
1157
        logger.debug("Getting verification code from user.")
1158
        if selected_mfa_option["factorType"] == "question":
4✔
1159
            config.okta["mfa_response"] = user.get_secret_input(
×
1160
                selected_mfa_option["profile"]["questionText"]
1161
            )
1162
        else:
1163
            config.okta["mfa_response"] = user.get_input("Enter your verification code: ")
4✔
1164
        user.add_sensitive_value_to_be_masked(config.okta["mfa_response"])
4✔
1165

1166
    # time to verify the mfa
1167
    payload = {
4✔
1168
        "stateToken": primary_auth["stateToken"],
1169
        "passCode": config.okta["mfa_response"],
1170
    }
1171

1172
    # Using the http_client to make the POST request
1173
    mfa_verify = HTTP_client.post(
4✔
1174
        mfa_challenge_url, json=payload, headers=headers, return_json=True
1175
    )
1176

1177
    if "sessionToken" in mfa_verify:
4✔
1178
        user.add_sensitive_value_to_be_masked(mfa_verify["sessionToken"])
4✔
1179
    logger.debug(f"mfa_verify [{json.dumps(mfa_verify)}]")
1180

1181
    # Clear out any MFA response since it is no longer valid
1182
    config.okta["mfa_response"] = None
4✔
1183

1184
    return mfa_verify
4✔
1185

1186

1187
def push_approval(mfa_challenge_url, payload):
4✔
1188
    """Handle push approval from the user.
1189

1190
    :param mfa_challenge_url: MFA challenge url
1191
    :param payload: payload which needs to be sent
1192
    :return: Session Token if succeeded or terminates if user wait goes 5 min
1193

1194
    """
1195
    logger.debug(f"Push approval with challenge_url:{mfa_challenge_url}")
1196

1197
    user.print("Waiting for an approval from the device...")
4✔
1198
    status = "MFA_CHALLENGE"
4✔
1199
    result = "WAITING"
4✔
1200
    response = {}
4✔
1201
    challenge_displayed = False
4✔
1202

1203
    headers = {"content-type": "application/json", "accept": "application/json"}
4✔
1204

1205
    while status == "MFA_CHALLENGE" and result == "WAITING":
4✔
1206
        response = HTTP_client.post(
4✔
1207
            mfa_challenge_url, json=payload, headers=headers, return_json=True
1208
        )
1209

1210
        if "sessionToken" in response:
4✔
1211
            user.add_sensitive_value_to_be_masked(response["sessionToken"])
4✔
1212

1213
        logger.debug(f"MFA Response:\n{json.dumps(response)}")
1214
        # Retrieve these values from the object, and set a sensible default if they do not
1215
        # exist.
1216
        status = response.get("status", "UNKNOWN")
4✔
1217
        result = response.get("factorResult", "UNKNOWN")
4✔
1218

1219
        # The docs at https://developer.okta.com/docs/reference/api/authn/#verify-push-factor
1220
        # state that the call will return a factorResult in [ SUCCESS, REJECTED, TIMEOUT,
1221
        # WAITING]. However, on success, SUCCESS is not set and we have to rely on the
1222
        # response["status"] instead
1223
        answer = (
4✔
1224
            response.get("_embedded", {})
1225
            .get("factor", {})
1226
            .get("_embedded", {})
1227
            .get("challenge", {})
1228
            .get("correctAnswer", None)
1229
        )
1230
        if answer and not challenge_displayed:
4✔
1231
            # If a Number Challenge response exists, retrieve it from this deeply nested path,
1232
            # otherwise set to None.
1233
            user.print(f"Number Challenge response is {answer}")
4✔
1234
            challenge_displayed = True
4✔
1235
        time.sleep(1)
4✔
1236

1237
    if status == "SUCCESS" and "sessionToken" in response:
4✔
1238
        # noop, we will return the variable later
1239
        pass
4✔
1240
    # Everything else should have a status of "MFA_CHALLENGE", and the result provides a
1241
    # hint on why the challenge failed.
1242
    elif result == "REJECTED":
4✔
1243
        logger.error("The Okta Verify push has been denied.")
4✔
1244
        sys.exit(2)
4✔
1245
    elif result == "TIMEOUT":
4✔
1246
        logger.error("Device approval window has expired.")
4✔
1247
        sys.exit(2)
4✔
1248
    else:
1249
        logger.error(f"Push response type {result} for {status} not implemented.")
4✔
1250
        sys.exit(2)
4✔
1251

1252
    return response
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc