• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

binbashar / leverage / 10565638072

pending completion
10565638072

Pull #278

github

web-flow
Merge 8485bfcd5 into d366287d3
Pull Request #278: Feature | adding python 3.12 in pyproject.toml

Source File
Press 'n' to go to next uncovered line, 'b' for previous

54.3
/leverage/modules/credentials.py
1
"""
2
    Credentials managing module.
3
"""
4
import csv
2✔
5
import json
2✔
6
import re
2✔
7
from functools import wraps
2✔
8
from pathlib import Path
2✔
9

10
import click
2✔
11
import questionary
2✔
12
from click.exceptions import Exit
2✔
13
from questionary import Choice
2✔
14
from ruamel.yaml import YAML
2✔
15

16
from leverage import __toolbox_version__
2✔
17
from leverage import logger
2✔
18
from leverage._internals import pass_state
2✔
19
from leverage._utils import ExitError
2✔
20
from leverage.container import AWSCLIContainer
2✔
21
from leverage.container import get_docker_client
2✔
22
from leverage.path import NotARepositoryError
2✔
23
from leverage.path import get_global_config_path
2✔
24
from leverage.path import get_project_root_or_current_dir_path
2✔
25

26
# Regexes for general validation
27
PROJECT_SHORT = r"[a-z]{2,4}"
2✔
28
USERNAME = r"[a-zA-Z0-9\+,=\.@\-_]{1,64}"  # https://docs.aws.amazon.com/IAM/latest/UserGuide/id_users_create.html#id_users_create_console
2✔
29
# https://docs.aws.amazon.com/IAM/latest/APIReference/API_CreateUser.html#API_CreateUser_RequestParameters
30
KEY_ID = r"[A-Z0-9]{20}"
2✔
31
SECRET_KEY = r"[a-zA-Z0-9/\+]{40}"
2✔
32
REGION = (
2✔
33
    r"[a-z]{2}-[gov-]?"
34
    r"(?:central|north|south|east|west|northeast|northwest|southeast|southwest|secret|topsecret)-[1-3]"
35
)
36
ACCOUNT_ID = r"[0-9]{12}"
2✔
37
MFA_SERIAL = rf"arn:aws:iam::{ACCOUNT_ID}:mfa/{USERNAME}"
2✔
38

39
# TODO: Remove these and get them into the global app state
40
PROJECT_ROOT = get_project_root_or_current_dir_path()
2✔
41
try:
2✔
42
    PROJECT_COMMON_TFVARS = Path(get_global_config_path())
2✔
43
except NotARepositoryError:
×
44
    PROJECT_COMMON_TFVARS = Path.cwd()
×
45

46
PROJECT_COMMON_TFVARS_FILE = "common.tfvars"
2✔
47
PROJECT_COMMON_TFVARS = PROJECT_COMMON_TFVARS / PROJECT_COMMON_TFVARS_FILE
2✔
48
PROJECT_CONFIG = PROJECT_ROOT / "project.yaml"
2✔
49
AWSCLI_CONFIG_DIR = Path.home() / ".aws"
2✔
50

51
PROFILES = {
2✔
52
    "bootstrap": {
53
        "choice_title": "Bootstrap credentials (temporary)",
54
        "profile_role": "oaar",
55
        "role": "OrganizationAccountAccessRole",
56
    },
57
    "management": {
58
        "choice_title": "Management credentials",
59
        "profile_role": "oaar",
60
        "role": "OrganizationAccountAccessRole",
61
    },
62
    "security": {
63
        "choice_title": "DevOps credentials",
64
        "profile_role": "devops",
65
        "role": "DevOps",
66
    },
67
}
68

69

70
def _exit_if_user_cancels_input(question):
2✔
71
    """Prompt user for input, exit application if user cancels it.
72

73
    Args:
74
        question (callable): Question to be asked to user.
75

76
    Raises:
77
        Exit: When user cancels input.
78

79
    Returns:
80
        any: Question return value
81
    """
82

83
    @wraps(question)
2✔
84
    def handle_keyboard_interrupt(*args, **kwargs):
2✔
85
        answer = question(*args, **kwargs)
×
86
        if answer is None:
×
87
            raise Exit(1)
×
88
        return answer
×
89

90
    return handle_keyboard_interrupt
2✔
91

92

93
@_exit_if_user_cancels_input
2✔
94
def _ask_for_short_name():
2✔
95
    """Prompt for project short name.
96

97
    Returns:
98
        str: Project short name.
99
    """
100
    return questionary.text(
×
101
        message="Project short name:",
102
        qmark=">",
103
        validate=lambda value: bool(re.fullmatch(PROJECT_SHORT, value))
104
        or "The project short name should be a two letter lowercase word",
105
    ).ask()
106

107

108
@_exit_if_user_cancels_input
2✔
109
def _ask_for_region():
2✔
110
    """Prompt for region.
111

112
    Returns:
113
        str: Region.
114
    """
115
    return questionary.text(
×
116
        message="Credentials default region:",
117
        qmark=">",
118
        default="us-east-1",
119
        validate=lambda value: bool(re.fullmatch(REGION, value)) or "Invalid region.",
120
    ).ask()
121

122

123
@_exit_if_user_cancels_input
2✔
124
def _ask_for_credentials_overwrite(profile, skip_option_title, overwrite_option_title):
2✔
125
    """Prompt user with options regarding already existing credentials, whether to
126
    skip their configuration or overwrite them.
127

128
    Args:
129
        profile (str): Name of the profile being configured.
130
        skip_option_title (str): Message to display in the `skip` option.
131
        overwrite_option_title (str): Message to display in the `overwrite` option.
132

133
    Returns:
134
        bool: Whether to overwrite the current credentials or not.
135
    """
136
    return questionary.select(
×
137
        message=f"Credentials already configured for {profile}:",
138
        qmark=">",
139
        choices=[
140
            Choice(skip_option_title, value=False, shortcut_key="s", checked=True),
141
            Choice(overwrite_option_title, value=True, shortcut_key="o"),
142
        ],
143
        use_shortcuts=True,
144
    ).ask()
145

146

147
@_exit_if_user_cancels_input
2✔
148
def _ask_for_credentials_location():
2✔
149
    """Prompt for credential input method and location if path is selected.
150

151
    Returns:
152
        Path | str: Path to location or `manual` if `Manual` is selected.
153
    """
154
    location = questionary.prompt(
×
155
        [
156
            {
157
                "type": "select",
158
                "name": "input_type",
159
                "message": "Select the means by which you'll provide the programatic keys:",
160
                "qmark": ">",
161
                "choices": [
162
                    {"name": "Path to an access keys file obtained from AWS", "value": "path"},
163
                    {"name": "Manually", "value": "manual"},
164
                ],
165
            },
166
            {
167
                "type": "path",
168
                "name": "path",
169
                "message": "Path to access keys file:",
170
                "qmark": ">",
171
                "when": lambda qs: qs["input_type"] == "path",
172
                "validate": lambda value: (Path(value).expanduser().is_file() and Path(value).expanduser().exists())
173
                or "Path must be an existing file",
174
            },
175
        ]
176
    )
177
    if not location:
×
178
        return
×
179

180
    input_type = location.get("input_type")
×
181
    return Path(location.get("path")).expanduser() if input_type == "path" else input_type
×
182

183

184
@_exit_if_user_cancels_input
2✔
185
def _ask_for_credentials():
2✔
186
    """Prompt for key id and secret keys.
187

188
    Returns:
189
        str, str: Kei ID, Secret Key
190
    """
191
    credentials = questionary.prompt(
×
192
        [
193
            {
194
                "type": "text",
195
                "name": "key_id",
196
                "message": "Key:",
197
                "qmark": ">",
198
                "validate": lambda value: bool(re.fullmatch(KEY_ID, value)) or "Invalid Key",
199
            },
200
            {
201
                "type": "password",
202
                "name": "secret_key",
203
                "message": "Secret:",
204
                "qmark": ">",
205
                "validate": lambda value: bool(re.fullmatch(SECRET_KEY, value)) or "Invalid Secret",
206
            },
207
        ]
208
    )
209
    if not credentials:
×
210
        return
×
211

212
    return list(credentials.values())
×
213

214

215
AWSCLI = None
2✔
216

217

218
def _load_project_yaml():
2✔
219
    """Load project.yaml file contents."""
220
    if not PROJECT_CONFIG.exists():
×
221
        logger.debug("No project config file found.")
×
222
        return {}
×
223

224
    try:
×
225
        return YAML().load(PROJECT_CONFIG)
×
226

227
    except Exception as exc:
×
228
        exc.__traceback__ = None
×
229
        logger.exception(message="Error loading configuration file.", exc_info=exc)
×
230
        raise Exit(1)
×
231

232

233
@click.group()
2✔
234
@pass_state
2✔
235
def credentials(state):
2✔
236
    """Manage AWS cli credentials."""
237

238
    """
239
    Scenarios on project.yaml, build.env and common.tfvars files:
240
    - if project is new project.yaml exists and build.env/common.tfvars don't
241
    - if project is already started is likely project.yaml won't exist but build.env or common.tfvars do
242

243
    In any case one of them should exist. E.g.
244

245
    if project.yaml exists:
246
        create build.env with the short_name got from project.yaml
247
    else:
248
        if build.env exists:
249
            continue
250
        else:
251
            if common.tfvars
252
            raise an exception
253

254
    If we reached the only common.tfvars scenario, we have no project name nor TERRAFORM_IMAGE_TAG.
255
    So the best chance is to read the common.tfvars directly without a conatiner, e.g. with sed or grep
256
    """
257
    project_config = _load_project_yaml()
×
258
    build_env = Path(f"{PROJECT_ROOT}/build.env")
×
259

260
    if project_config != {}:
×
261
        logger.info("Reading info from project.yaml")
×
262
        # project_config is not empty
263
        short_name = project_config.get("short_name")
×
264
        if short_name is None or not re.match("^[a-z]{2,4}$", short_name):
×
265
            logger.error("Invalid or missing project short name in project.yaml file.")
×
266
            raise Exit(1)
×
267
        if not build_env.exists():
×
268
            build_env.write_text(f"PROJECT={short_name}\nTERRAFORM_IMAGE_TAG={__toolbox_version__}")
×
269
    elif not build_env.exists():
×
270
        # project_config is not empty
271
        # and build.env does not exist
272
        # trying common.tfvars
273
        try:
×
274
            found = False
×
275
            with open(PROJECT_COMMON_TFVARS, "r") as file:
×
276
                r = r'project = "(.+)"'
×
277
                for line in file:
×
278
                    g = re.search(r, line)
×
279
                    if g:
×
280
                        found = True
×
281
                        logger.info("Reading info from common.tfvars")
×
282
                        build_env.write_text(f"PROJECT={g[1]}\nTERRAFORM_IMAGE_TAG=1.1.9")
×
283
                        break
×
284
            if not found:
×
285
                raise Exception("Config file not found")
×
286
        except Exception as e:
×
287
            logger.error(f"Neither project.yaml nor build.env nor common.tfvars files were found. {e}")
×
288
            raise Exit(1)
×
289
    else:
290
        logger.info("Reading info from build.env")
×
291

292
    state.container = AWSCLIContainer(get_docker_client())
×
293
    state.container.ensure_image()
×
294
    global AWSCLI
295
    AWSCLI = state.container
×
296

297

298
def _load_configs_for_credentials():
2✔
299
    """Load all required values to configure credentials.
300

301
    Raises:
302
        Exit: If no project has been already initialized in the system.
303

304
    Returns:
305
        dict: Values needed to configure a credential and update the files accordingly.
306
    """
307
    logger.info("Loading configuration file.")
2✔
308
    project_config = _load_project_yaml()
2✔
309

310
    logger.info("Loading project environment configuration file.")
2✔
311
    env_config = AWSCLI.env_conf
2✔
312

313
    terraform_config = {}
2✔
314
    logger.info("Loading Terraform common configuration.")
2✔
315
    terraform_config = AWSCLI.paths.common_conf
2✔
316

317
    config_values = {}
2✔
318
    config_values["short_name"] = (
2✔
319
        project_config.get("short_name")
320
        or env_config.get("PROJECT")
321
        or terraform_config.get("project")
322
        or _ask_for_short_name()
323
    )
324
    config_values["project_name"] = project_config.get("project_name") or terraform_config.get("project_long")
2✔
325

326
    # region_primary was added in refarch v1
327
    # for v2 it was replaced by region at project level
328
    region_primary = "region_primary"
2✔
329
    if not "region_primary" in project_config and not "region_primary" in terraform_config:
2✔
330
        region_primary = "region"
2✔
331
    config_values["primary_region"] = (
2✔
332
        project_config.get(region_primary) or terraform_config.get(region_primary) or _ask_for_region()
333
    )
334
    config_values["secondary_region"] = terraform_config.get("region_secondary")
2✔
335

336
    config_values["organization"] = {"accounts": []}
2✔
337
    # Accounts defined in Terraform code take priority
338
    terraform_accounts = terraform_config.get("accounts", {})
2✔
339
    if terraform_accounts:
2✔
340
        config_values["organization"]["accounts"].extend(
2✔
341
            [
342
                {"name": account_name, "email": account_info.get("email"), "id": account_info.get("id")}
343
                for account_name, account_info in terraform_accounts.items()
344
            ]
345
        )
346
    # Add accounts not found in terraform code
347
    project_accounts = [
2✔
348
        account
349
        for account in project_config.get("organization", {}).get("accounts", [])
350
        if account.get("name") not in terraform_accounts
351
    ]
352
    if project_accounts:
2✔
353
        config_values["organization"]["accounts"].extend(project_accounts)
2✔
354

355
    config_values["mfa_enabled"] = env_config.get("MFA_ENABLED", "false")
2✔
356

357
    return config_values
2✔
358

359

360
def _profile_is_configured(profile):
2✔
361
    """Check if given profile is already configured.
362

363
    Args:
364
        profile (str): Profile to check.
365

366
    Returns:
367
        bool: Whether the profile was already configured or not.
368
    """
369
    exit_code, _ = AWSCLI.exec("configure list", profile)
×
370

371
    return not exit_code
×
372

373

374
def _extract_credentials(file):
2✔
375
    """Extract AWS credentials from given file. Print message and quit application if file is malformed.
376
    Access Keys files have the form:
377
       Access key ID,Secret access key
378
       AKUDKXXXXXXXXXXXXXXX,examplesecreteLkyvWWjxi29dJ63Geo1Ggl956b
379

380
    Args:
381
        file (Path): Credentials file as obtained from AWS Console.
382

383
    Raises:
384
        Exit: When file content does not conform to expected form.
385

386
    Returns:
387
        str, str: Key ID, Secret Key
388
    """
389
    with open(file) as access_keys_file:
2✔
390
        try:
2✔
391
            keys = next(csv.DictReader(access_keys_file))
2✔
392

393
        except csv.Error:
×
394
            click.echo("\nMalformed access keys file\n")
×
395
            raise Exit(1)
×
396

397
    try:
2✔
398
        access_key_id = keys["Access key ID"]
2✔
399
        secret_access_key = keys["Secret access key"]
2✔
400

401
    except KeyError:
×
402
        click.echo("\nFields for keys not found in access keys file\n")
×
403
        raise Exit(1)
×
404

405
    if not re.match(KEY_ID, access_key_id) or not re.match(SECRET_KEY, secret_access_key):
2✔
406
        click.echo("\nMalformed keys in access keys file\n")
×
407
        raise Exit(1)
×
408

409
    return access_key_id, secret_access_key
2✔
410

411

412
def _backup_file(filename):
2✔
413
    """Create backup of a credential file using docker image.
414

415
    Args:
416
        filename (str): File to backup, either `config` or `credentials`
417
    """
418
    credential_files_env_vars = {"config": "AWS_CONFIG_FILE", "credentials": "AWS_SHARED_CREDENTIALS_FILE"}
×
419
    env_var = credential_files_env_vars.get(filename)
×
420

421
    AWSCLI.system_exec(f"sh -c 'cp ${env_var} \"${{{env_var}}}.bkp\"'")
×
422

423

424
def configure_credentials(profile, file=None, make_backup=False):
2✔
425
    """Set credentials in `credentials` file for AWS cli. Make backup if required.
426

427
    Args:
428
        profile (str): Name of the profile to configure.
429
        file (Path, optional): Credentials file. Defaults to None.
430
        make_backup (bool, optional): Whether to make a backup of the credentials file. Defaults to False.
431
    """
432
    file = file or _ask_for_credentials_location()
2✔
433

434
    if file is not None and file != "manual":
2✔
435
        key_id, secret_key = _extract_credentials(file)
2✔
436

437
    else:
438
        key_id, secret_key = _ask_for_credentials()
2✔
439

440
    if make_backup:
2✔
441
        logger.info("Backing up credentials file.")
2✔
442
        _backup_file("credentials")
2✔
443

444
    values = {"aws_access_key_id": key_id, "aws_secret_access_key": secret_key}
2✔
445

446
    for key, value in values.items():
2✔
447
        exit_code, output = AWSCLI.exec(f"configure set {key} {value}", profile)
2✔
448
        if exit_code:
2✔
449
            raise ExitError(exit_code, f"AWS CLI error: {output}")
2✔
450

451

452
def _credentials_are_valid(profile):
2✔
453
    """Check if credentials for given profile are valid.
454
    If credentials are invalid, the command output will be as follows:
455
    Exit code:
456
        255
457
    Error message:
458
        An error occurred (InvalidClientTokenId) when calling the GetCallerIdentity operation:
459
        The security token included in the request is invalid.
460

461
    Args:
462
        profile (str): Name of profile for which credentials must be checked.
463

464
    Returns:
465
        bool: Whether the credentials are valid.
466
    """
467
    error_code, output = AWSCLI.exec("sts get-caller-identity", profile)
2✔
468

469
    return error_code != 255 and "InvalidClientTokenId" not in output
2✔
470

471

472
def _get_management_account_id(profile):
2✔
473
    """Get management account id through AWS cli.
474

475
    Args:
476
        profile (str): Name of profile to configure.
477

478
    Returns:
479
        str: Management account id.
480
    """
481
    exit_code, caller_identity = AWSCLI.exec("--output json sts get-caller-identity", profile)
2✔
482
    if exit_code:
2✔
483
        raise ExitError(exit_code, f"AWS CLI error: {caller_identity}")
2✔
484

485
    caller_identity = json.loads(caller_identity)
2✔
486
    return caller_identity["Account"]
2✔
487

488

489
def _get_organization_accounts(profile, project_name):
2✔
490
    """Get organization accounts names and ids. Removing the prefixed project name from the account names.
491

492
    Args:
493
        profile (str): Credentials profile.
494
        project_name (str): Full name of the project.
495

496
    Returns:
497
        dict: Mapping of organization accounts names to ids.
498
    """
499
    exit_code, organization_accounts = AWSCLI.exec("--output json organizations list-accounts", profile)
2✔
500

501
    if exit_code:
2✔
502
        return {}
2✔
503

504
    organization_accounts = json.loads(organization_accounts)["Accounts"]
2✔
505

506
    prefix = f"{project_name}-"
2✔
507
    accounts = {}
2✔
508
    for account in organization_accounts:
2✔
509
        name = account["Name"]
2✔
510
        name = name[len(prefix) :] if name.startswith(prefix) else name
2✔
511
        accounts[name] = account["Id"]
2✔
512

513
    return accounts
2✔
514

515

516
def _get_mfa_serial(profile):
2✔
517
    """Get MFA serial for the given profile credentials.
518

519
    Args:
520
        profile (str): Name of profile.
521

522
    Returns:
523
        str: MFA device serial.
524
    """
525
    exit_code, mfa_devices = AWSCLI.exec("--output json iam list-mfa-devices", profile)
2✔
526
    if exit_code:
2✔
527
        raise ExitError(exit_code, f"AWS CLI error: {mfa_devices}")
2✔
528
    mfa_devices = json.loads(mfa_devices)
2✔
529

530
    # Either zero or one MFA device should be configured for either `management` or `security` accounts users.
531
    # Just for safety, and because we only support VirtualMFA devices, we check that the `SerialNumber` is an `arn`
532
    # https://docs.aws.amazon.com/IAM/latest/APIReference/API_MFADevice.html
533
    return next(
2✔
534
        (
535
            device["SerialNumber"]
536
            for device in mfa_devices["MFADevices"]
537
            if re.fullmatch(MFA_SERIAL, device["SerialNumber"])
538
        ),
539
        "",
540
    )
541

542

543
def configure_profile(profile, values):
2✔
544
    """Set profile in `config` file for AWS cli.
545

546
    Args:
547
        profile (str): Profile name.
548
        values (dict): Mapping of values to be set in the profile.
549
    """
550
    logger.info(f"\tConfiguring profile [bold]{profile}[/bold]")
×
551
    for key, value in values.items():
×
552
        exit_code, output = AWSCLI.exec(f"configure set {key} {value}", profile)
×
553
        if exit_code:
×
554
            raise ExitError(exit_code, f"AWS CLI error: {output}")
×
555

556

557
def configure_accounts_profiles(profile, region, organization_accounts, project_accounts, fetch_mfa_device):
2✔
558
    """Set up the required profiles for all accounts to be used with AWS cli. Backup previous profiles.
559

560
    Args:
561
        profile(str): Name of the profile to configure.
562
        region (str): Region.
563
        organization_accounts (dict): Name and id of all accounts in the organization.
564
        project_accounts (list): Name and email of all accounts in project configuration file.
565
        fetch_mfa_device (bool): Whether to fetch MFA device for profiles.
566
    """
567
    short_name, _type = profile.split("-")
2✔
568

569
    mfa_serial = ""
2✔
570
    if fetch_mfa_device:
2✔
571
        logger.info("Fetching MFA device serial.")
2✔
572
        mfa_serial = _get_mfa_serial(profile)
2✔
573
        if not mfa_serial:
2✔
574
            raise ExitError(1, "No MFA device found for user.")
2✔
575

576
    account_profiles = {}
2✔
577
    for account in project_accounts:
2✔
578
        account_name = account["name"]
2✔
579
        # DevOps roles do not have permission over management account
580
        if "security" in profile and account_name == "management":
2✔
581
            continue
×
582

583
        # TODO: Add remaining profiles for remaining accounts declared in code if enough information is available
584
        account_id = organization_accounts.get(account_name, account.get("id"))
2✔
585
        if account_id is None:
2✔
586
            continue
×
587

588
        account_profile = {
2✔
589
            "output": "json",
590
            "region": region,
591
            "role_arn": f"arn:aws:iam::{account_id}:role/{PROFILES[_type]['role']}",
592
            "source_profile": profile,
593
        }
594
        if mfa_serial:
2✔
595
            account_profile["mfa_serial"] = mfa_serial
2✔
596
        # A profile identifier looks like `le-security-oaar`
597
        account_profiles[f"{short_name}-{account_name}-{PROFILES[_type]['profile_role']}"] = account_profile
2✔
598

599
    logger.info("Backing up account profiles file.")
2✔
600
    _backup_file("config")
2✔
601

602
    for profile_identifier, profile_values in account_profiles.items():
2✔
603
        configure_profile(profile_identifier, profile_values)
2✔
604

605

606
def _update_account_ids(config):
2✔
607
    """Update accounts ids in global configuration file.
608
    It updates both `[account name]_account_id` and `accounts` variables.
609
    This last one maintaning the format:
610
    ```
611
    account = {
612
      account_name = {
613
        email = account_email,
614
        id = account_id
615
      }
616
    }
617
    ```
618

619
    Args:
620
        config (dict): Project configuration values.
621
    """
622
    if not PROJECT_COMMON_TFVARS.exists():
2✔
623
        return
×
624

625
    container_base_dir = f"/{config['project_name']}/config"
2✔
626
    container_common_tfvars_file = f"{container_base_dir}/{PROJECT_COMMON_TFVARS_FILE}"
2✔
627

628
    accs = []
2✔
629
    for account in config["organization"]["accounts"]:
2✔
630
        acc_name, acc_email, acc_id = account.values()
2✔
631

632
        acc = [f'\n    email = "{acc_email}"']
2✔
633
        if acc_id:
2✔
634
            AWSCLI.system_exec(
2✔
635
                "hcledit "
636
                f"-f {container_common_tfvars_file} -u"
637
                f' attribute set {acc_name}_account_id "\\"{acc_id}\\""'
638
            )
639

640
            acc.append(f'    id = "{acc_id}"')
2✔
641
        acc = ",\n".join(acc)
2✔
642

643
        accs.append(f"\n  {acc_name} = {{{acc}\n  }}")
2✔
644

645
    accs = ",".join(accs)
2✔
646
    accs = f"{{{accs}\n}}"
2✔
647

648
    AWSCLI.system_exec("hcledit " f"-f {container_common_tfvars_file} -u" f" attribute set accounts '{accs}'")
2✔
649

650

651
def mutually_exclusive(context, param, value):
2✔
652
    """Callback for command options --overwrite-existing-credentials and --skip-access-keys-setup mutual exclusivity verification."""
653
    me_option = {
×
654
        "overwrite_existing_credentials": "skip_access_keys_setup",
655
        "skip_access_keys_setup": "overwrite_existing_credentials",
656
    }
657

658
    if value and context.params.get(me_option[param.name], False):
×
659
        raise click.BadOptionUsage(
×
660
            option_name=param,
661
            message=(
662
                f"Option {param.opts[0]} is mutually exclusive"
663
                f" with option --{me_option[param.name].replace('_', '-')}."
664
            ),
665
            ctx=context,
666
        )
667

668
    return value
×
669

670

671
@credentials.command()
2✔
672
@click.option(
2✔
673
    "--type",
674
    type=click.Choice(["BOOTSTRAP", "MANAGEMENT", "SECURITY"], case_sensitive=False),
675
    required=True,
676
    help="Type of credentials to set.",
677
)
678
@click.option(
2✔
679
    "--credentials-file", type=click.Path(exists=True, path_type=Path), help="Path to AWS cli credentials file."
680
)
681
@click.option("--fetch-mfa-device", is_flag=True, help="Fetch MFA device configured for user.")
2✔
682
@click.option(
2✔
683
    "--overwrite-existing-credentials",
684
    is_flag=True,
685
    callback=mutually_exclusive,
686
    help=(
687
        "Overwrite existing credentials if already configured.\n" "Mutually exclusive with --skip-access-keys-setup."
688
    ),
689
)
690
@click.option(
2✔
691
    "--skip-access-keys-setup",
692
    is_flag=True,
693
    callback=mutually_exclusive,
694
    help=(
695
        "Skip access keys configuration. Continue on with assumable roles setup.\n"
696
        "Mutually exclusive with --overwrite-existing-credentials."
697
    ),
698
)
699
@click.option("--skip-assumable-roles-setup", is_flag=True, help="Don't configure the accounts assumable roles.")
2✔
700
# TODO: Add --override-role-name parameter for non-default roles in accounts
701
def configure(
2✔
702
    type,
703
    credentials_file,
704
    fetch_mfa_device,
705
    overwrite_existing_credentials,
706
    skip_access_keys_setup,
707
    skip_assumable_roles_setup,
708
):
709
    """Configure credentials for the project.
710

711
    It can handle the credentials required for the initial deployment of the project (BOOTSTRAP),
712
    a management user (MANAGEMENT) or a devops/secops user (SECURITY).
713
    """
714
    if skip_access_keys_setup and skip_assumable_roles_setup:
×
715
        logger.info("Nothing to do. Exiting.")
×
716
        return
×
717

718
    config_values = _load_configs_for_credentials()
×
719

720
    type = type.lower()
×
721
    short_name = config_values.get("short_name")
×
722
    profile = f"{short_name}-{type}"
×
723

724
    already_configured = _profile_is_configured(profile=profile)
×
725
    if already_configured and not (skip_access_keys_setup or overwrite_existing_credentials):
×
726
        title_extra = "" if skip_assumable_roles_setup else " Continue on with assumable roles setup."
×
727

728
        overwrite_existing_credentials = _ask_for_credentials_overwrite(
×
729
            profile=profile,
730
            skip_option_title=f"Skip credentials configuration.{title_extra}",
731
            overwrite_option_title="Overwrite current credentials. Backups will be made.",
732
        )
733

734
    do_configure_credentials = (
×
735
        False if skip_access_keys_setup else overwrite_existing_credentials or not already_configured
736
    )
737

738
    if do_configure_credentials:
×
739
        logger.info(f"Configuring [bold]{type}[/bold] credentials.")
×
740
        configure_credentials(profile, credentials_file, make_backup=already_configured)
×
741
        logger.info(
×
742
            f"[bold]{type.capitalize()} credentials configured in:[/bold]"
743
            f" {(AWSCLI_CONFIG_DIR / short_name / 'credentials').as_posix()}"
744
        )
745

746
        if not _credentials_are_valid(profile):
×
747
            logger.error(f"Invalid {profile} credentials. Please check the given keys.")
×
748
            return
×
749

750
    accounts = config_values.get("organization", {}).get("accounts", False)
×
751
    # First time configuring bootstrap credentials
752
    if type == "bootstrap" and not already_configured:
×
753
        management_account = next((account for account in accounts if account["name"] == "management"), None)
×
754

755
        if management_account:
×
756
            logger.info("Fetching management account id.")
×
757
            management_account_id = _get_management_account_id(profile=profile)
×
758
            management_account["id"] = management_account_id
×
759

760
            project_config_file = _load_project_yaml()
×
761
            if project_config_file and "accounts" in project_config_file.get("organization", {}):
×
762
                project_config_file["organization"]["accounts"] = accounts
×
763

764
                logger.info("Updating project configuration file.")
×
765
                YAML().dump(data=project_config_file, stream=PROJECT_CONFIG)
×
766

767
        skip_assumable_roles_setup = True
×
768

769
    profile_for_organization = profile
×
770
    # Security credentials don't have permission to access organization information
771
    if type == "security":
×
772
        for type_with_permission in ("management", "bootstrap"):
×
773
            profile_to_check = f"{short_name}-{type_with_permission}"
×
774

775
            if _profile_is_configured(profile_to_check):
×
776
                profile_for_organization = profile_to_check
×
777
                break
×
778

779
    if skip_assumable_roles_setup:
×
780
        logger.info("Skipping assumable roles configuration.")
×
781

782
    else:
783
        logger.info("Attempting to fetch organization accounts.")
×
784
        organization_accounts = _get_organization_accounts(profile_for_organization, config_values.get("project_name"))
×
785
        logger.debug(f"Organization Accounts fetched: {organization_accounts}")
×
786

787
        if organization_accounts or accounts:
×
788
            logger.info("Configuring assumable roles.")
×
789

790
            configure_accounts_profiles(
×
791
                profile, config_values["primary_region"], organization_accounts, accounts, fetch_mfa_device
792
            )
793
            logger.info(
×
794
                f"[bold]Account profiles configured in:[/bold]"
795
                f" {(AWSCLI_CONFIG_DIR / short_name / 'config').as_posix()}"
796
            )
797

798
            for account in accounts:
×
799
                try:  # Some account may not already be created
×
800
                    account["id"] = organization_accounts[account["name"]]
×
801
                except KeyError:
×
802
                    continue
×
803

804
        else:
805
            logger.info(
×
806
                "No organization has been created yet or no accounts were configured.\n"
807
                "Skipping assumable roles configuration."
808
            )
809

810
    _update_account_ids(config=config_values)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc