• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

dowjones / tokendito / 7255919053

19 Dec 2023 01:04AM UTC coverage: 91.579% (-0.1%) from 91.689%
7255919053

push

github

web-flow
hide answer, and fix lint errors (#155)

10 of 13 new or added lines in 2 files covered. (76.92%)

1 existing line in 1 file now uncovered.

1392 of 1520 relevant lines covered (91.58%)

3.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.21
/tokendito/user.py
1
# vim: set filetype=python ts=4 sw=4
2
# -*- coding: utf-8 -*-
3
"""Helper module for AWS and Okta configuration, management and data flow."""
4✔
4
import argparse
4✔
5
import builtins
4✔
6
import codecs
4✔
7
import configparser
4✔
8
from datetime import timezone
4✔
9
from getpass import getpass
4✔
10
import json
4✔
11
import logging
4✔
12
import os
4✔
13
from pathlib import Path
4✔
14
from pkgutil import iter_modules
4✔
15
import platform
4✔
16
import re
4✔
17
import sys
4✔
18
from urllib.parse import urlparse
4✔
19

20
from botocore import __version__ as __botocore_version__
4✔
21
from bs4 import __version__ as __bs4_version__  # type: ignore (bs4 does not have PEP 561 support)
4✔
22
from bs4 import BeautifulSoup
4✔
23
import requests
4✔
24
from tokendito import __version__
4✔
25
from tokendito import aws
4✔
26
from tokendito import okta
4✔
27
from tokendito.config import Config
4✔
28
from tokendito.config import config
4✔
29
from tokendito.http_client import HTTP_client
4✔
30

31
# Unfortunately, readline is only available in non-Windows systems. There is no substitution.
# The import is kept only for its side effect, hence the unused-import waiver.
try:
    import readline  # noqa: F401
except ModuleNotFoundError:
    pass

logger = logging.getLogger(__name__)

# Values appended here are replaced with asterisks by MaskLoggerSecret.
mask_items = []
40

41

42
def cmd_interface(args):
    """Tokendito retrieves AWS credentials after authenticating with Okta.

    :param args: command line arguments, as accepted by parse_cli_args()
    :return: None. The process exits with status 1 when the configuration
             cannot be validated or the AWS credentials cannot be verified.
    """
    args = parse_cli_args(args)

    # Early logging, in case the user requests debugging via env/CLI
    setup_early_logging(args)

    # Set some required initial values
    process_options(args)

    # Late logging (default)
    setup_logging(config.user)

    # Validate configuration
    message = validate_configuration(config)
    if message:
        quiet_msg = ""
        if config.user["quiet"] is not False:
            quiet_msg = " to run in quiet mode"
        logger.error(
            f"Could not validate configuration{quiet_msg}: {'. '.join(message)}. "
            "Please check your settings, and try again."
        )
        sys.exit(1)

    # rm trailing / if provided as such so the urls with this as base dont have //
    # (rstrip, not strip: only the end of the URL is meant to be trimmed).
    config.okta["org"] = config.okta["org"].rstrip("/")

    if config.user["use_device_token"]:
        device_token = config.okta["device_token"]
        if device_token:
            HTTP_client.set_device_token(config.okta["org"], device_token)
        else:
            logger.warning(
                f"Device token unavailable for config profile {args.user_config_profile}. "
                "May see multiple MFA requests this time."
            )

    # get authentication and authorization cookies from okta
    okta.access_control(config)
    logger.debug(
        f"""
        about to call discover_tile
        we have client cookies: {HTTP_client.session.cookies}
        """
    )
    if config.okta["tile"]:
        # A tile given by the user carries no label; store it as a (url, label) pair.
        tile_label = ""
        config.okta["tile"] = (config.okta["tile"], tile_label)
    else:
        config.okta["tile"] = discover_tiles(config.okta["org"])

    # Authenticate to AWS roles
    auth_tiles = aws.authenticate_to_roles(config, config.okta["tile"])

    (role_response, role_name) = aws.select_assumeable_role(auth_tiles)

    identity = aws.assert_credentials(role_response=role_response)
    if "Arn" not in identity and "UserId" not in identity:
        logger.error(
            f"There was an error retrieving and verifying AWS credentials: {role_response}"
        )
        sys.exit(1)

    set_profile_name(config, role_name)

    set_local_credentials(
        response=role_response,
        role=config.aws["profile"],
        region=config.aws["region"],
        output=config.aws["output"],
    )

    # Persist the device token reported by the HTTP client for later runs.
    device_token = HTTP_client.get_device_token()
    if config.user["use_device_token"] and device_token:
        logger.info(f"Saving device token to config profile {args.user_config_profile}")
        config.okta["device_token"] = device_token
        update_device_token(config)

    display_selected_role(profile_name=config.aws["profile"], role_response=role_response)
122

123

124
class MaskLoggerSecret(logging.Filter):
    """Masks secrets in logger messages."""

    def __init__(self):
        """Initialize filter."""
        super().__init__()

    def filter(self, record):
        """Replace every registered secret found in the record message.

        Only ``record.msg`` is rewritten; it is converted to ``str`` first when
        it is some other object.

        :param record: logging.LogRecord being processed
        :return: always True, the record is never dropped
        """
        for secret in mask_items:
            if secret is None:
                continue
            needle = str(secret)
            # An empty needle would make str.replace() insert the mask between
            # every single character of the message.
            if not needle:
                continue
            if not isinstance(record.msg, str):
                record.msg = str(record.msg)
            record.msg = record.msg.replace(needle, "*****")
        return True
140

141

142
def parse_cli_args(args):
    """Parse command line arguments.

    Option destinations are prefixed with the configuration section they
    belong to (``user_``, ``okta_``, ``aws_``).

    :param args: list of command line argument strings
    :return: args parse object
    """
    parser = argparse.ArgumentParser(
        prog="tokendito", description="Gets an STS token to use with the AWS CLI and SDK."
    )
    parser.add_argument("--version", action="store_true", help="Displays version and exit")
    parser.add_argument(
        "--configure",
        action="store_true",
        help="Prompt user for configuration parameters",
    )
    parser.add_argument(
        "--username",
        dest="okta_username",
        help="username to log in to Okta. You can "
        "also use the TOKENDITO_OKTA_USERNAME environment variable.",
    )
    parser.add_argument(
        "--password",
        dest="okta_password",
        help="password to log in to Okta. You "
        "can also use the TOKENDITO_OKTA_PASSWORD environment variable.",
    )
    parser.add_argument(
        "--profile",
        dest="user_config_profile",
        default=config.user["config_profile"],
        help="Tokendito configuration profile to use.",
    )
    parser.add_argument(
        "--config-file",
        dest="user_config_file",
        default=config.user["config_file"],
        help=f"Use an alternative configuration file. Defaults to {config.user['config_file']}",
    )
    parser.add_argument(
        "--loglevel",
        "-l",
        # Upper-case the value before it is checked against ``choices``.
        type=str.upper,
        dest="user_loglevel",
        choices=["DEBUG", "INFO", "WARN", "ERROR"],
        help="[DEBUG|INFO|WARN|ERROR], default loglevel is WARNING.",
    )
    parser.add_argument(
        "--log-output-file",
        dest="user_log_output_file",
        help="Optional file to log output to.",
    )
    parser.add_argument("--aws-config-file", help="AWS Configuration file to write to.")
    parser.add_argument(
        "--aws-output",
        help="Sets the output type for the AWS profile.",
    )
    parser.add_argument(
        "--aws-profile",
        help="AWS profile to save as in the credentials file.",
    )
    parser.add_argument(
        "--aws-region",
        help="Sets the region for the AWS profile.",
    )
    parser.add_argument("--aws-role-arn", help="Sets the IAM role.")
    parser.add_argument("--aws-shared-credentials-file", help="AWS credentials file to write to.")

    # An Okta org and an Okta tile cannot be given together.
    okta_me_group = parser.add_mutually_exclusive_group()
    okta_me_group.add_argument(
        "--okta-org",
        dest="okta_org",
        help="Set the Okta Org base URL. This enables role auto-discovery",
    )
    okta_me_group.add_argument(
        "--okta-tile",
        help="Okta tile URL to use.",
    )
    parser.add_argument(
        "--okta-client-id",
        help="""For OIE enabled Orgs this sets the Okta client ID to replace the value
        found by tokendito. It is used in the authorize code flow.""",
    )
    parser.add_argument(
        "--okta-mfa",
        help="Sets the MFA method. You "
        "can also use the TOKENDITO_OKTA_MFA environment variable.",
    )
    parser.add_argument(
        "--okta-mfa-response",
        help="Sets the MFA response to a challenge. You "
        "can also use the TOKENDITO_OKTA_MFA_RESPONSE environment variable.",
    )
    parser.add_argument(
        "--use-device-token",
        dest="user_use_device_token",
        action="store_true",
        default=False,
        help="Use device token across sessions",
    )
    parser.add_argument(
        "--quiet",
        dest="user_quiet",
        action="store_true",
        default=False,
        help="Suppress output",
    )

    parsed_args = parser.parse_args(args)

    return parsed_args
252

253

254
def utc_to_local(utc_dt):
    """Convert UTC time into local time.

    The value is stamped as UTC, whatever tzinfo it carried, and then converted
    to the system's local timezone.

    :param utc_dt: datetime holding a UTC time
    :return: local time as a string formatted "%Y-%m-%d %H:%M:%S %Z".
             The process exits with status 1 if the value cannot be converted.
    """
    try:
        local_time = utc_dt.replace(tzinfo=timezone.utc).astimezone(tz=None)
        local_time = local_time.strftime("%Y-%m-%d %H:%M:%S %Z")
    except (AttributeError, TypeError) as err:
        # TypeError: e.g. a str, whose replace() takes no tzinfo keyword.
        # AttributeError: e.g. None, which has no replace() at all.
        logger.error(f"Could not convert time: {err}")
        sys.exit(1)
    return local_time
267

268

269
def create_directory(dir_name):
    """Create a directory, and any missing parents, on the local machine.

    Nothing happens when the directory already exists.

    :param dir_name: path of the directory to create
    :return: None. The process exits with status 1 if the directory cannot be created.
    """
    try:
        # exist_ok makes a separate "is it already there" check unnecessary.
        os.makedirs(dir_name, exist_ok=True)
    except OSError as error:
        logger.error(f"Cannot continue creating directory: {dir_name}: {error.strerror}")
        sys.exit(1)
277

278

279
def get_submodule_names():
    """Inspect the current module and find any submodules.

    Lists the modules that live in the same directory as this file.

    :return: List of submodule names
    """
    package_dir = Path(__file__).resolve(strict=True).parent
    return [module.name for module in iter_modules([str(package_dir)])]
288

289

290
def setup_early_logging(args):
    """Do a best-effort attempt to enable early logging.

    Starts from the default user settings, applies the TOKENDITO_USER_LOGLEVEL
    and TOKENDITO_USER_LOG_OUTPUT_FILE environment variables, then lets the
    command line values override them.

    :param args: parsed command line arguments (argparse namespace)
    :return: dict with values set
    """
    # Get some sane defaults
    early_logging = config.get_defaults()["user"].copy()

    settings = (
        ("loglevel", "TOKENDITO_USER_LOGLEVEL", "user_loglevel"),
        ("log_output_file", "TOKENDITO_USER_LOG_OUTPUT_FILE", "user_log_output_file"),
    )

    # Environment first, so that the command line wins below.
    for key, env_name, _ in settings:
        if env_name in os.environ:
            early_logging[key] = os.environ[env_name]

    for key, _, attribute in settings:
        value = getattr(args, attribute, None)
        if value:
            early_logging[key] = value

    setup_logging(early_logging)
    return early_logging
311

312

313
def setup_logging(conf):
    """Set logging level.

    Replaces the root logger handlers with a single stream or file handler and
    applies the requested level to one logger per tokendito submodule.

    :param conf: dictionary with config. "log_output_file" selects a file
                 handler, "loglevel" is upper-cased in place and applied.
    :return: effective numeric log level of the first submodule logger
    """

    def add_mask_filter(target):
        """Attach the secret masking filter to target, at most once."""
        if not any(isinstance(item, MaskLoggerSecret) for item in target.filters):
            target.addFilter(MaskLoggerSecret())

    root_logger = logging.getLogger()
    formatter = logging.Formatter(
        fmt="%(asctime)s %(levelname)s |%(name)s %(funcName)s():%(lineno)i| %(message)s"
    )

    if "log_output_file" in conf and conf["log_output_file"]:
        handler = logging.FileHandler(conf["log_output_file"])
    else:
        handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    # Logger filters are skipped for records propagated from child loggers;
    # a handler filter sees every record that is about to be emitted.
    add_mask_filter(handler)

    # Set a reasonable default logging format.
    # This function runs more than once per process: close what is replaced
    # so that an earlier log file does not stay open.
    for stale_handler in list(root_logger.handlers):
        root_logger.removeHandler(stale_handler)
        stale_handler.close()
    root_logger.addHandler(handler)
    add_mask_filter(root_logger)

    # Pre-create a log handler for each submodule
    # with the same format and level. Settings are
    # inherited from the root logger.
    submodules = [f"tokendito.{x}" for x in get_submodule_names()]
    if "loglevel" in conf:
        conf["loglevel"] = conf["loglevel"].upper()
        for submodule in submodules:
            submodule_logger = logging.getLogger(submodule)
            add_mask_filter(submodule_logger)
            try:
                submodule_logger.setLevel(conf["loglevel"])
            except ValueError as err:
                # Unknown level name: fall back to the default level and stop trying.
                root_logger.setLevel(config.get_defaults()["user"]["loglevel"])
                submodule_logger.warning(f"{err}. Plese check your configuration and try again.")
                break

    # getLogger(None) is the root logger, used if no submodule was found.
    reference_name = submodules[0] if submodules else None
    loglevel = logging.getLogger(reference_name).getEffectiveLevel()
    return loglevel
351

352

353
def print(args):
    """Print only if not in quiet mode. Does not affect logging.

    This shadows the builtin for the rest of the module; the real one is
    reached through ``builtins.print``.

    :param args: object to print
    :return: the object received, whether or not it was printed
    """
    if config.user["quiet"] is not True:
        builtins.print(args)
    return args
358

359

360
def select_role_arn(authenticated_tiles):
    """Select the role user wants to pick.

    The configured AWS profile name is matched against the last path element of
    each role ARN; otherwise the configured role ARN is matched in full. When
    neither is configured the user is prompted.

    :param: authenticated_tiles, mapping of authenticated tiles metadata, dict
    :return: user role and associated url, tuple. The process exits with
             status 2 on an ambiguous profile or an unknown role ARN.
    """
    selected_role = None
    wanted_profile = config.aws["profile"]
    wanted_arn = config.aws["role_arn"]

    for url, tile in authenticated_tiles.items():
        logger.debug(f"Select the role user wants to pick [{tile['roles']}]")
        short_names = [role.split("/")[-1] for role in tile["roles"]]
        role_names = dict(zip(short_names, tile["roles"]))

        if short_names.count(wanted_profile) > 1:
            logger.error(
                "There are multiple matches for the profile selected, "
                "please use the --aws-role-arn option to select one"
            )
            sys.exit(2)

        if wanted_profile in role_names:
            selected_role = (role_names[wanted_profile], url)
            logger.debug(f"Using aws_profile env var for role: [{config.aws['profile']}]")
            break
        elif wanted_arn in tile["roles"]:
            selected_role = (wanted_arn, url)
            break

    if selected_role is None:
        if wanted_arn is None:
            selected_role = prompt_role_choices(authenticated_tiles)
        else:
            logger.error(f"User provided rolename does not exist [{config.aws['role_arn']}]")
            sys.exit(2)

    logger.debug(f"Selected role: [{selected_role}]")

    return selected_role
398

399

400
def factor_type_info(factor_type, mfa_option):
    """Get factor info from okta reply.

    :param factor_type: factor type string, e.g. "push" or "sms"
    :param mfa_option: dictionary with a single MFA option
    :return: string with the detail for that factor: "Unknown" when the
             expected field is absent, "Not Presented" for other factor types.
    """
    logger.debug("Choose factor info depending on factor type.")
    factor_info = "Not Presented"
    default_value = "Unknown"

    # Field of the "profile" dictionary that describes each factor type.
    profile_fields = {
        "token": "credentialId",
        "token:software:totp": "credentialId",
        "token:hardware": "credentialId",
        "push": "name",
        "sms": "phoneNumber",
        "call": "phoneNumber",
        "webauthn": "authenticatorName",
        "question": "question",
        "email": "email",
    }
    # These factor types are described by the top level "vendorName" instead.
    vendor_factor_types = ("web", "u2f", "token:hotp")

    if factor_type in profile_fields:
        # "profile" may be missing or null; treat both as an empty profile.
        profile = mfa_option.get("profile") or {}
        factor_info = profile.get(profile_fields[factor_type], default_value)
    elif factor_type in vendor_factor_types:
        factor_info = mfa_option.get("vendorName", default_value)

    # We return the string representation of the value retrieved. There are cases where
    # .get() will retrieve `None` as a value (this is somehow valid). When that happens,
    # the caller function cannot sort the list of values.
    return str(factor_info)
430

431

432
def mfa_option_info(mfa_option):
    """Build a string with the MFA factor information.

    :param mfa_option: dictionary with a single MFA response.
    :return: string with MFA factor info if the option has a "factorType",
             "Not Presented" otherwise.
    """
    logger.debug(f"Building info for: {json.dumps(mfa_option)}")
    if "factorType" not in mfa_option:
        return "Not Presented"
    return factor_type_info(mfa_option["factorType"], mfa_option)
445

446

447
def select_preferred_mfa_index(mfa_options, factor_key="provider", subfactor_key="factorType"):
    """Show all the MFA options to the users.

    :param mfa_options: List of available MFA options
    :param factor_key: key holding the factor name of each option
    :param subfactor_key: key holding the sub-factor name of each option
    :return: MFA option selected index by the user from the output
    """
    logger.debug("Show all the MFA options to the users.")
    logger.debug(json.dumps(mfa_options))
    print("\nSelect your preferred MFA method and press Enter:")

    missing = "Not presented"
    # Build each row once, so column widths and printed values cannot disagree
    # when an option lacks a key.
    rows = [
        (
            str(option.get(factor_key, missing)),
            str(option.get(subfactor_key, missing)),
            mfa_option_info(option),
            option.get("id", missing),
        )
        for option in mfa_options
    ]

    longest_index = len(str(len(mfa_options)))
    longest_factor_name = max((len(row[0]) for row in rows), default=0)
    longest_subfactor_name = max((len(row[1]) for row in rows), default=0)
    factor_info_indent = max((len(row[2]) for row in rows), default=0)

    for i, (provider, mfa, factor_info, factor_id) in enumerate(rows):
        print(
            f"[{i: >{longest_index}}]  "
            f"{provider: <{longest_factor_name}}  "
            f"{mfa: <{longest_subfactor_name}} "
            f"{factor_info: <{factor_info_indent}} "
            f"Id: {factor_id}"
        )

    user_input = collect_integer(len(mfa_options))

    return user_input
478

479

480
def prompt_role_choices(aut_tiles):
    """Ask user to select role.

    Roles are listed grouped by tile label, next to their account alias or,
    when no alias is known, their account ID.

    :param aut_tiles: mapping of authenticated tiles metadata, dict
    :return: user's role and associated url, tuple. The process exits with
             status 1 when there is no role to choose from.
    """
    aliases_mapping = []

    for url, tile in aut_tiles.items():
        logger.debug(f"Getting aliases for {url}")
        alias_table = get_account_aliases(tile["saml"], tile["saml_response_string"])

        for role in tile["roles"]:
            # The account ID is the fifth colon separated field of the role ARN.
            account_id = role.split(":")[4]
            if alias_table:
                # Fall back to the account ID when this account has no alias.
                alias = alias_table.get(account_id, account_id)
            else:
                logger.debug(f"There were no labels in {url}. Using account ID")
                alias = account_id
            aliases_mapping.append((tile["label"], alias, role, url))

    if not aliases_mapping:
        logger.error("There are no roles available to select from.")
        sys.exit(1)

    logger.debug("Ask user to select role")
    print("\nPlease select one of the following:")

    longest_alias = max(len(entry[1]) for entry in aliases_mapping)
    longest_index = len(str(len(aliases_mapping)))
    aliases_mapping = sorted(aliases_mapping)
    print_label = ""

    for i, (label, alias, role, _) in enumerate(aliases_mapping):
        padding_index = longest_index - len(str(i))
        # Print each tile label once, above its group of roles.
        if print_label != label:
            print_label = label
            print(f"\n{label}:")

        print(f"[{i}] {padding_index * ' '}" f"{alias: <{longest_alias}}  {role}")

    user_input = collect_integer(len(aliases_mapping))
    selected_role = (aliases_mapping[user_input][2], aliases_mapping[user_input][3])
    logger.debug(f"Selected role [{user_input}]")

    return selected_role
521

522

523
def display_selected_role(profile_name="", role_response=None):
    """Print details about how to assume role.

    :param profile_name: AWS profile name
    :param role_response: Assume Role response dict; its
                          ["Credentials"]["Expiration"] value is read
    :return: message displayed. The process exits with status 1 when the
             expiration time is not available.
    """
    try:
        # A missing response (None) raises TypeError here and is reported below.
        expiration_time = role_response["Credentials"]["Expiration"]
    except (KeyError, TypeError) as err:
        logger.error(f"Could not retrieve expiration time: {err}")
        sys.exit(1)

    expiration_time_local = utc_to_local(expiration_time)
    msg = (
        f"\nGenerated profile '{profile_name}' in "
        f"{config.aws['shared_credentials_file']}.\n"
        "\nUse profile to authenticate to AWS:\n\t"
        f"aws --profile '{profile_name}' sts get-caller-identity"
        "\nOR\n\t"
        f"export AWS_PROFILE='{profile_name}'\n\n"
        f"Credentials are valid until {expiration_time} ({expiration_time_local})."
    )

    print(msg)
    return msg
550

551

552
def extract_arns(saml):
    """Extract arns from SAML decoded xml.

    Each matched value is a comma separated pair of ARNs. The pair is read as
    "provider,role" unless only the second ARN is a saml-provider, in which
    case it is read as "role,provider".

    :param saml: results saml decoded
    :return: Dict of Role and Provider ARNs, keyed by role ARN
    """
    logger.debug("Decode response string as a SAML decoded value.")

    roles_and_providers = {}
    arn_regex = ">(arn:aws:iam::.*?,arn:aws:iam::.*?)<"
    provider_marker = ":saml-provider/"

    # find all provider and role pairs.
    arns = re.findall(arn_regex, saml)
    logger.debug(f"found ARNs: {arns}")

    # stuff into dict, role is dict key.
    for pair in arns:
        fields = pair.split(",")
        first, second = fields[0], fields[1]
        if provider_marker in second and provider_marker not in first:
            role, provider = first, second
        else:
            provider, role = first, second
        roles_and_providers[role] = provider

    logger.debug(f"Collected ARNs: {roles_and_providers}")

    return roles_and_providers
574

575

576
def validate_okta_org(input_url=None):
    """Validate whether a given URL is a valid AWS Org URL in Okta.

    A valid value is an https URL with a host, an empty or "/" path, and no
    params, query or fragment.

    :param input_url: string
    :return: bool. True if valid, False otherwise
    """
    logger.debug(f"Will try to match '{input_url}' to a valid URL")

    if not input_url:
        logger.debug(f"{input_url} does not look like a valid match.")
        return False

    url = urlparse(input_url)
    logger.debug(f"URL parsed as {url}")
    if (
        url.scheme == "https"
        # "https://" alone parses with an empty host and must be rejected.
        and url.netloc != ""
        and url.path in ("", "/")
        and url.params == ""
        and url.query == ""
        and url.fragment == ""
    ):
        return True

    logger.debug(f"{url} does not look like a valid match.")
    return False
597

598

599
def validate_okta_tile(input_url=None):
    """Validate whether a given URL is a valid AWS tile URL in Okta.

    A valid value is an https URL with a host, whose path is
    /home/amazon_aws/<20 word characters>/<3 digits>.

    :param input_url: string
    :return: bool. True if valid, False otherwise
    """
    logger.debug(f"Will try to match '{input_url}' to a valid URL")

    if not input_url:
        logger.debug(f"{input_url} does not look like a valid match.")
        return False

    url = urlparse(input_url)
    logger.debug(f"URL parsed as {url}")
    # Here, we could also check url.netloc against r'.*\.okta(preview)?\.com$'
    # but Okta allows the usage of custom URLs such as login.acme.com
    if (
        url.scheme == "https"
        and url.netloc != ""
        # fullmatch: unlike "$", it does not tolerate a trailing newline.
        and re.fullmatch(r"/home/amazon_aws/\w{20}/\d{3}", str(url.path)) is not None
    ):
        return True

    logger.debug(f"{url} does not look like a valid match.")
    return False
619

620

621
def get_account_aliases(saml_xml, saml_response_string):
    """Parse AWS SAML page for account aliases.

    The SAML assertion is posted to the action URL of the form found in the
    Okta response, and the resulting page is searched for "Account:" entries.

    :param saml_xml: Decoded saml response from Okta
    :param saml_response_string: response from Okta with saml data
    :return: mapping table of account ids to their aliases, or None when no
             alias can be retrieved. The process exits with status 1 if the
             request itself fails.
    """
    soup = BeautifulSoup(saml_response_string, "html.parser")
    form = soup.find("form")
    action = None
    if form is not None:
        action = form.get("action")  # type: ignore (bs4 does not have PEP 561 support)
    if not action:
        # Without a form action there is nowhere to post the assertion to.
        logger.debug("No form action found in the SAML response, cannot retrieve labels")
        return None
    url = str(action)

    encoded_xml = codecs.encode(saml_xml.encode("utf-8"), "base64")
    try:
        aws_response = HTTP_client.post(url, data={"SAMLResponse": encoded_xml})
    except Exception as request_error:
        logger.error(f"There was an error retrieving the AWS SAML page: \n{request_error}")
        sys.exit(1)

    if "Account: " not in aws_response.text:
        logger.debug("No labels found")
        logger.debug(json.dumps(aws_response.text))
        return None

    soup = BeautifulSoup(aws_response.text, "html.parser")
    account_names = soup.find_all(text=re.compile("Account:"))

    # Entries are read as "Account: <alias> (<account id>)".
    alias_table = {}
    for entry in account_names:
        words = entry.split(" ")
        if len(words) < 2:
            continue
        alias_table[str(words[-1]).strip("()")] = words[1]

    return alias_table
652

653

654
def display_version():
    """Print the versions of tokendito, Python, the OS and key libraries."""
    python_version = platform.python_version()
    uname = platform.uname()
    logger.debug(f"Display version: {__version__}")
    print(
        f"tokendito/{__version__} "
        f"Python/{python_version} "
        f"{uname.system}/{uname.release} "
        f"botocore/{__botocore_version__} "
        f"bs4/{__bs4_version__} "
        f"requests/{requests.__version__}"
    )
667

668

669
def add_sensitive_value_to_be_masked(value, key=None):
    """Add value to be masked from the logs.

    If a key is passed only add it if the key refers to a secret element.

    :param value: value that must not show up in the logs
    :param key: optional name of the configuration element holding the value
    :return: None
    """
    sensitive_keys = ("password", "mfa_response", "sessionToken")
    if key is not None and key not in sensitive_keys:
        return
    # There is nothing to hide in an empty value, and an empty search string
    # would corrupt every message it is replaced in.
    if value is None or value == "":
        return
    if value not in mask_items:
        mask_items.append(value)
675

676

677
def process_ini_file(file, profile):
    """Process options from a ConfigParser ini file.

    Option names are split on their first underscore: "okta_username" becomes
    element "username" of section "okta".

    :param file: filename
    :param profile: profile to read
    :return: Config object with configuration values. The process exits with
             status 2 if the profile cannot be loaded, and with status 1 if
             its values are rejected by Config.
    """
    res = dict()
    # Here, group(1) is the dictionary key, and group(2) the configuration element
    pattern = re.compile(r"^(.*?)_(.*)")

    ini = configparser.RawConfigParser(default_section=config.user["config_profile"])
    try:
        ini.read(file)
        for key, val in ini.items(profile):
            match = pattern.search(key.lower())
            if match:
                section, element = match.groups()
                res.setdefault(section, dict())[element] = val
                add_sensitive_value_to_be_masked(val, element)
    except configparser.Error as err:
        logger.error(f"Could not load profile '{profile}': {str(err)}")
        sys.exit(2)
    logger.debug(f"Found ini directives: {res}")

    try:
        config_ini = Config(**res)

    except (AttributeError, KeyError, ValueError) as err:
        logger.error(
            f"The configuration file {file} in [{profile}] is incorrect: {err}"
            ". Please check your settings and try again."
        )
        sys.exit(1)
    return config_ini
713

714

715
def process_arguments(args):
    """Process command-line arguments.

    Argument names are split on their first underscore into section and
    element; arguments whose section is not a submodule name are ignored.

    :param args: argparse object
    :return: Config object with configuration values. The process exits with
             status 1 if the values are rejected by Config.
    """
    res = dict()
    pattern = re.compile(r"^(.*?)_(.*)")
    # Looked up once: listing the submodules scans the package directory.
    submodules = get_submodule_names()

    for key, val in vars(args).items():
        match = pattern.search(key.lower())
        if not match:
            continue
        section, element = match.groups()
        if section not in submodules:
            continue
        if section not in res:
            res[section] = dict()
        if val:
            res[section][element] = val
            add_sensitive_value_to_be_masked(val, element)
    logger.debug(f"Found arguments: {res}")

    try:
        config_args = Config(**res)

    except (AttributeError, KeyError, ValueError) as err:
        logger.error(
            f"Command line arguments not correct: {err}"
            ". This should not happen, please contact the package maintainers."
        )
        sys.exit(1)
    return config_args
746

747

748
def process_environment(prefix="tokendito"):
    """Process environment variables.

    Variable names are lower-cased and read as <prefix>_<section>_<element>.
    Variables with an empty value are not stored.

    :param prefix: leading part of the variable names to consider
    :return: Config object with configuration values. The process exits with
             status 1 if the values are rejected by Config.
    """
    res = dict()
    # The prefix is matched literally, whatever characters it contains.
    pattern = re.compile(rf"^({re.escape(prefix)})_(.*?)_(.*)")
    # Here, group(1) is the prefix variable, group(2) is the dictionary key,
    # and group(3) the configuration element.
    for key, val in os.environ.items():
        match = pattern.search(key.lower())
        if not match:
            continue
        _, section, element = match.groups()
        if section not in res:
            res[section] = dict()
        if val:
            res[section][element] = val
            add_sensitive_value_to_be_masked(val, element)
    logger.debug(f"Found environment variables: {res}")

    try:
        config_env = Config(**res)

    except (AttributeError, KeyError, ValueError) as err:
        logger.error(
            f"The environment variables are incorrectly set: {err}"
            ". Please check your settings and try again."
        )
        sys.exit(1)
    return config_env
777

778

779
def process_interactive_input(config, skip_password=False):
    """
    Request input interactively for elements that are not present.

    The values collected are also merged into the config object received.

    :param config: Config object with some values set.
    :param skip_password: Whether or not to skip asking the user for a password.
    :returns: Config object holding the values collected interactively, or the
              config received, untouched, in quiet mode.
    """
    # Return quickly if the user attempts to run in quiet (non-interactive) mode.
    if config.user["quiet"] is True:
        logger.debug(f"Skipping interactive config: quiet mode is {config.user['quiet']}")
        return config

    # Reuse interactive config. It will only request the portions needed.
    try:
        details = get_interactive_config(
            tile=config.okta["tile"],
            org=config.okta["org"],
            username=config.okta["username"],
        )
    except (AttributeError, KeyError, ValueError) as err:
        logger.error(f"Interactive arguments are not correct: {err}")
        sys.exit(1)

    # Create a dict that can be passed to Config later
    res = dict(okta=dict())
    # Copy the values set by get_interactive_config
    for element in ("tile", "org", "username"):
        detail_key = f"okta_{element}"
        if detail_key in details:
            res["okta"][element] = details[detail_key]

    password_missing = "password" not in config.okta or config.okta["password"] == ""
    if password_missing and not skip_password:
        logger.debug("No password set, will try to get one interactively")
        res["okta"]["password"] = get_secret_input()
        # Keep the password out of the logs from now on.
        add_sensitive_value_to_be_masked(res["okta"]["password"])

    config_int = Config(**res)
    logger.debug(f"Interactive configuration is: {config_int}")
    config.update(config_int)
    return config_int
822

823

824
def get_interactive_config(tile=None, org=None, username=""):
    """Prompt the user for whichever connection details are still missing.

    :param tile: tile URL already known, if any.
    :param org: Org URL already known, if any.
    :param username: username already known, or the empty string.
    :return: dictionary with values
    """
    logger.debug("Obtain user input for the user.")

    # Keep asking until at least one of the org URL or the tile URL validates.
    while not (validate_okta_org(org) or validate_okta_tile(tile)):
        org = get_org()
        tile = get_tile()

    while username == "":
        username = get_username()

    # Org and tile are optional in the result; the username is always present.
    details = {
        key: value
        for key, value in (("okta_org", org), ("okta_tile", tile))
        if value is not None
    }
    details["okta_username"] = username

    logger.debug(f"Details: {details}")
    return details
4✔
848

849

850
def get_base_url(urlstring):
    """
    Reduce a URL to its ``scheme://netloc`` prefix.

    :param urlstring: url string
    :returns: base URL
    """
    parsed = urlparse(urlstring)
    return f"{parsed.scheme}://{parsed.netloc}"
4✔
860

861

862
def get_org():
    """Ask the user for the Org URL.

    A blank answer stops the prompt. Answers without an ``https://`` prefix
    get one added before validation.

    :return: string with sanitized value, or the empty string.
    """
    message = "Okta Org URL. E.g. https://acme.okta.com/: "
    res = ""

    while True:
        candidate = get_input(prompt=message).strip()
        if candidate == "":
            break
        if not candidate.startswith("https://"):
            candidate = f"https://{candidate}"
        if validate_okta_org(candidate):
            res = candidate
            break
        print("Invalid input, try again.")

    logger.debug(f"Org URL is: {res}")
    return res
4✔
883

884

885
def get_tile():
    """Ask the user for the tile URL.

    A blank answer stops the prompt. Answers without an ``https://`` prefix
    get one added before validation.

    :return: string with sanitized value, or the empty string.
    """
    message = (
        "Okta tile URL. "
        "E.g. https://acme.okta.com/home/amazon_aws/b07384d113edec49eaa6/123: "
    )
    res = ""

    while True:
        candidate = get_input(prompt=message).strip()
        if candidate == "":
            break
        if not candidate.startswith("https://"):
            candidate = f"https://{candidate}"
        if validate_okta_tile(candidate):
            res = candidate
            break
        print("Invalid input, try again.")

    logger.debug(f"App URL is: {res}")
    return res
4✔
908

909

910
def get_username():
    """Ask the user for a username until a non-blank answer is given.

    :return: string with sanitized value.
    """
    message = "Organization username. E.g. jane.doe@acme.com: "

    res = get_input(prompt=message).strip()
    while res == "":
        print("Invalid input, try again.")
        res = get_input(prompt=message).strip()

    logger.debug(f"Username is {res}")
    return res
4✔
926

927

928
def get_secret_input(message=None):
    """Read a secret from the terminal without echoing it.

    Keeps prompting until a non-empty value is entered.

    :param message: prompt to display; getpass's own default prompt is used
        when this is None.
    :return: secret
    """
    logger.debug("get_secret_value")

    tty_assertion()
    secret = ""
    while secret == "":
        secret = getpass() if message is None else getpass(message)
        logger.debug("secret value set interactively")
    return secret
4✔
946

947

948
def get_interactive_profile_name(default):
    """Ask the user for an AWS profile name.

    A blank answer selects ``default``. Any other answer must start with a
    letter and contain only letters, digits, underscores or dashes.

    :param default: value used when the user leaves the answer blank.
    :return: string with sanitized value, or the default string.
    """
    message = f"Enter a profile name or leave blank to use '{default}': "

    while True:
        candidate = get_input(prompt=message).strip()
        if candidate == "":
            res = default
            break
        if re.fullmatch("[a-zA-Z][a-zA-Z0-9_-]*", candidate):
            res = candidate
            break
        print("Invalid input, try again.")

    logger.debug(f"Profile name is: {res}")
    return res
4✔
968

969

970
def set_profile_name(config_obj, role_name):
    """Fill in the AWS profile name when none has been configured.

    :param config_obj: Config object.
    :param role_name: Role name, offered to the user as the default profile name.
    :return: Config object.
    """
    # Only prompt when the profile is unset (None or the empty string).
    if config_obj.aws["profile"] in (None, ""):
        config_obj.aws["profile"] = get_interactive_profile_name(role_name)

    return config_obj
4✔
981

982

983
def update_configuration(config):
    """Write the Okta settings of ``config`` to the configuration file.

    The target file and profile are read from ``config.user``.

    :param config: the current configuration
    :return: None
    """
    logger.debug("Update configuration file on local system.")
    target_file = config.user["config_file"]
    section = config.user["config_profile"]

    okta = config.okta
    # Gather the values that are actually set; these are what gets written to disk.
    contents = {
        f"okta_{name}": okta[name]
        for name in ("org", "tile", "mfa")
        if name in okta and okta[name] is not None
    }
    # The username counts as unset when it is the empty string rather than None.
    if "username" in okta and okta["username"] != "":
        contents["okta_username"] = okta["username"]

    logger.debug(f"Adding {contents} to config file.")
    update_ini(profile=section, ini_file=target_file, **contents)
    logger.info(f"Updated {target_file} with profile {section}")
4✔
1008

1009

1010
def update_device_token(config):
    """Write the Okta device token of ``config`` to the configuration file.

    The target file and profile are read from ``config.user``.

    :param config: the current configuration
    :return: None
    """
    logger.debug("Update configuration file on local system with device token.")
    target_file = config.user["config_file"]
    section = config.user["config_profile"]

    okta = config.okta
    contents = {}
    # The token is only written when it has actually been set.
    if "device_token" in okta and okta["device_token"] is not None:
        contents["okta_device_token"] = okta["device_token"]

    logger.debug(f"Adding {contents} to config file.")
    update_ini(profile=section, ini_file=target_file, **contents)
    logger.info(f"Updated {target_file} with profile {section}")
4✔
1029

1030

1031
def set_local_credentials(response=None, role="default", region="us-east-1", output="json"):
    """Write to local files to insert credentials.

    The credentials go into the file named by ``config.aws["shared_credentials_file"]``
    under the profile ``role``; region and output go into ``config.aws["config_file"]``
    under ``profile <role>``.

    :param response: AWS AssumeRoleWithSaml response
    :param role: the name of the assumed role, used for local profile
    :param region: configured region for aws credential profile
    :param output: configured datatype for aws cli output
    :return: Role name on a successful call.
    """
    try:
        credentials = response["Credentials"]
        aws_access_key_id = credentials["AccessKeyId"]
        aws_secret_access_key = credentials["SecretAccessKey"]
        aws_session_token = credentials["SessionToken"]
    except (KeyError, TypeError) as err:
        # TypeError covers a missing (None) or otherwise non-subscriptable
        # response, so malformed input ends in the same clean exit as a
        # missing key instead of a traceback.
        logger.error(f"Could not retrieve crendentials: {err}")
        sys.exit(1)

    update_ini(
        profile=role,
        ini_file=config.aws["shared_credentials_file"],
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
        aws_session_token=aws_session_token,
    )

    update_ini(
        profile=f"profile {role}",
        ini_file=config.aws["config_file"],
        output=output,
        region=region,
    )

    return role
4✔
1064

1065

1066
def update_ini(profile="", ini_file="", **kwargs):
    """Merge key/value pairs into one section of an INI file.

    The file's existing content is read first, so other sections and keys are
    kept. The process exits with status 1 when reading or writing fails.

    :param profile: Profile name
    :param ini_file: File to write to.
    :param **kwargs: key/value pairs to write to the ini file
    :return: ConfigParser object written
    """
    target_dir = os.path.dirname(ini_file)
    logger.debug(f"Updating: '{ini_file}'")

    create_directory(target_dir)

    parser = configparser.RawConfigParser()
    try:
        parser.read(ini_file, encoding=config.user["encoding"])
        logger.debug(f"Parsed '{ini_file}'")
    except (configparser.Error, OSError) as err:
        logger.error(f"Failed to read '{ini_file}': {err}")
        sys.exit(1)

    if not parser.has_section(profile):
        parser.add_section(profile)

    for option, setting in kwargs.items():
        parser.set(profile, option, setting)

    try:
        with open(ini_file, "w+", encoding=config.user["encoding"]) as handle:
            parser.write(handle)
        logger.debug(f"Wrote {len(kwargs)} keys to '{ini_file}'")
    except (configparser.Error, OSError) as err:
        logger.error(f"Failed to write to '{ini_file}': {err}")
        sys.exit(1)
    return parser
4✔
1101

1102

1103
def check_within_range(user_input, valid_range):
    """Check that a menu selection falls inside ``0 .. valid_range - 1``.

    :param user_input: integer-validated user input.
    :param valid_range: the valid range presented on the user's menu.
    :return range_validation: true or false
    """
    if int(user_input) in range(valid_range):
        return True

    logger.debug(f"Valid range is {valid_range}")
    logger.error("Value is not in within the selection range.")
    return False
4✔
1117

1118

1119
def check_integer(value):
    """Validate integer.

    :param value: value to be validated.
    :return: True when the value is a non-negative integer, false otherwise.
    """
    # isdecimal() rather than isdigit(): isdigit() also accepts characters such
    # as superscript digits, which int() rejects, so callers that convert the
    # validated value would crash with ValueError.
    integer_validation = False
    if str(value).isdecimal():
        integer_validation = True
    else:
        logger.error("Please enter a valid integer.")

    return integer_validation
4✔
1132

1133

1134
def validate_input(value, valid_range):
    """Check that user input is an integer and, optionally, a valid menu choice.

    :param value: user input
    :param valid_range: valid range based on how many menu options available to user.
        A falsy value skips the range check.
    :return: True when the input passes every applicable check, False otherwise.
    """
    if not check_integer(value):
        return False
    if not valid_range:
        return True
    return check_within_range(value, valid_range)
4✔
1144

1145

1146
def tty_assertion():
    """Ensure that a TTY is present.

    Logs an error and exits with status 1 when stdin is missing, unusable, or
    not attached to a terminal.
    """
    # An explicit check instead of an ``assert`` statement: asserts are removed
    # when Python runs with -O, which would silently disable this guard.
    try:
        has_tty = os.isatty(sys.stdin.fileno()) is True
    except (AttributeError, AssertionError, EOFError, OSError, RuntimeError, ValueError):
        # AttributeError: sys.stdin is None. ValueError: stdin has been closed.
        # AssertionError stays in the list so anything that still raises it is
        # handled the same way as before.
        has_tty = False

    if not has_tty:
        logger.error(
            "sys.stdin is not available, and interactive invocation requires stdin to be present. "
            "Please check the --help argument and documentation for more details.",
        )
        sys.exit(1)
4✔
1156

1157

1158
def get_input(prompt="-> "):
    """Prompt on the terminal and return what the user typed.

    A TTY is required; see tty_assertion.

    :param prompt: optional string with prompt.
    :return user_input: raw from user.
    """
    tty_assertion()

    answer = input(f"{prompt}")
    logger.debug(f"User input: {answer}")

    return answer
4✔
1170

1171

1172
def collect_integer(valid_range=0):
    """Prompt until the user enters a valid integer.

    The answer is validated with validate_input and then cast to int.

    :param valid_range: number of menu options available to user.
    :return user_input: validated, casted integer from user.
    """
    while True:
        answer = get_input()
        is_valid = validate_input(answer, valid_range)
        logger.debug(f"User input validation status is {is_valid}")
        if is_valid:
            return int(answer)
4✔
1189

1190

1191
def process_options(args):
    """Build the global configuration from every available source.

    Sources are applied in increasing priority: ini file, environment,
    command line arguments, and finally interactive input. With
    ``args.version`` the version is printed and the process exits; with
    ``args.configure`` the result is written to disk and the process exits.

    :param args: parsed command line arguments.
    """
    if args.version:
        display_version()
        sys.exit(0)

    # The ini file is not read when configuring; an empty Config stands in.
    config_ini = Config()
    if not args.configure:
        config_ini = process_ini_file(args.user_config_file, args.user_config_profile)

    # Later layers override earlier ones: ini file < environment < arguments.
    layers = (config_ini, process_environment(), process_arguments(args))
    for layer in layers:
        config.update(layer)

    # Whatever is still missing is requested from the user, if necessary.
    config.update(process_interactive_input(config, args.configure))

    sanitize_config_values(config)
    logger.debug(f"Final configuration is {config}")

    if args.configure:
        update_configuration(config)
        sys.exit(0)
4✔
1222

1223

1224
def validate_basic_configuration(config):
    """Collect problems with the basic Okta settings.

    :param config: Config element with final configuration.
    :return: message with validation issues.
    """
    okta = config.okta
    message = []

    if not okta["username"]:
        message.append("Username not set")
    if not okta["password"]:
        message.append("Password not set")

    org, tile = okta["org"], okta["tile"]
    if not (org or tile):
        message.append("Either Okta Org or tile URL must be defined")
    if tile and not validate_okta_tile(tile):
        message.append(f"Tile URL {tile} is not valid")
    if org and not validate_okta_org(org):
        message.append(f"Org URL {org} is not valid")
    # When both are given, the tile URL has to start with the org URL.
    if org and tile and not tile.startswith(org):
        message.append(f"Org URL {org} and Tile URL {tile} must be in the same domain")

    return message
4✔
1252

1253

1254
def validate_quiet_configuration(config):
    """Collect problems that prevent running quietly (without prompts).

    Nothing is checked unless ``config.user["quiet"]`` is present and is not
    False. This lives apart from validate_basic_configuration to keep each
    function simple; the two are meant to be used together.

    :param config: Config element with final configuration.
    :return: message with validation issues.
    """
    message = []
    if "quiet" not in config.user or config.user["quiet"] is False:
        return message

    if not config.aws["role_arn"]:
        message.append("AWS role ARN not set")
    if not config.okta["mfa"]:
        message.append("MFA Method not set")
    # The "push" method is the only one that does not need a response.
    if not config.okta["mfa_response"] and config.okta["mfa"] != "push":
        message.append("MFA Response not set")

    return message
4✔
1273

1274

1275
def validate_configuration(config):
    """
    Run every configuration check and gather the reported issues.

    Combines validate_basic_configuration and validate_quiet_configuration,
    in that order.

    :param config: Config element with final configuration.
    :return: message with validation issues.
    """
    basic_issues = validate_basic_configuration(config)
    quiet_issues = validate_quiet_configuration(config)
    return basic_issues + quiet_issues
4✔
1284

1285

1286
def sanitize_config_values(config):
    """Normalise configuration values that may need to be corrected.

    :param config: Config object to adjust
    :returns: modified object.
    """
    # A tile URL takes precedence: the org URL is derived from it.
    if config.okta["tile"]:
        config.okta["org"] = get_base_url(config.okta["tile"])

    # Unsupported output types and regions fall back to the defaults.
    if config.aws["output"] not in aws.get_output_types():
        config.aws["output"] = config.get_defaults()["aws"]["output"]
        logger.warning(f"AWS Output reset to {config.aws['output']}")

    if config.aws["region"] not in aws.get_regions():
        config.aws["region"] = config.get_defaults()["aws"]["region"]
        logger.warning(f"AWS Region reset to {config.aws['region']}")

    # Expand any "~", if given by the user
    path_settings = (
        (config.user, "config_dir"),
        (config.user, "config_file"),
        (config.aws, "config_file"),
        (config.aws, "shared_credentials_file"),
    )
    for section, key in path_settings:
        if key in section:
            section[key] = os.path.expanduser(section[key])

    return config
4✔
1317

1318

1319
def discover_tiles(url):
    """
    Discover aws tile url on user's okta dashboard.

    Queries the tabs endpoint under ``url`` and keeps every tile whose link
    URL contains ``amazon_aws``. The process exits with status 2 when no such
    tile exists.

    :param url: okta org url
    :returns: a ``(linkUrl, label)`` tuple when exactly one AWS tile is found,
        or a set of such tuples when several are found.
    """
    url = f"{url}/api/v1/users/me/home/tabs"
    params = {
        "type": "all",
        "expand": ["items", "items.resource"],
    }
    logger.debug(f"Performing auto-discovery on {url}.")
    logger.debug(f"in discover_tiles we have cookies: {HTTP_client.session.cookies}")
    response_with_tabs = HTTP_client.get(url, params=params)

    tabs = response_with_tabs.json()

    aws_tiles = []
    for tab in tabs:
        for tile in tab["_embedded"]["items"]:
            resource = tile["_embedded"]["resource"]
            if "amazon_aws" in resource["linkUrl"]:
                aws_tiles.append(resource)

    if not aws_tiles:
        logger.error("AWS tile url not found please set url and try again")
        sys.exit(2)

    if len(aws_tiles) > 1:
        tile = {(resource["linkUrl"], resource["label"]) for resource in aws_tiles}
        discovered = len(tile)
    else:
        tile = (aws_tiles[0]["linkUrl"], aws_tiles[0]["label"])
        # A single result is a plain (url, label) tuple; its length is always
        # 2 and says nothing about how many tiles were found.
        discovered = 1
    logger.debug(f"Discovered {discovered} URLs.")

    return tile
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc