• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

binbashar / leverage / 17227204968

26 Aug 2025 03:39AM UTC coverage: 61.145% (+0.007%) from 61.138%
17227204968

Pull #312

github

angelofenoglio
Pythonicism
Pull Request #312: Fix | Remove reference to Terraform in error message

210 of 476 branches covered (44.12%)

Branch coverage included in aggregate %.

1 of 4 new or added lines in 3 files covered. (25.0%)

34 existing lines in 5 files now uncovered.

2610 of 4136 relevant lines covered (63.1%)

0.63 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

54.52
/leverage/modules/credentials.py
1
"""
2
    Credentials managing module.
3
"""
4

5
import csv
1✔
6
import json
1✔
7
import re
1✔
8
from functools import wraps
1✔
9
from pathlib import Path
1✔
10

11
import click
1✔
12
import questionary
1✔
13
from click.exceptions import Exit
1✔
14
from questionary import Choice
1✔
15
from ruamel.yaml import YAML
1✔
16

17
from leverage import __toolbox_version__
1✔
18
from leverage import logger
1✔
19
from leverage._internals import pass_state
1✔
20
from leverage._utils import ExitError
1✔
21
from leverage.container import AWSCLIContainer
1✔
22
from leverage.container import get_docker_client
1✔
23
from leverage.path import NotARepositoryError
1✔
24
from leverage.path import get_global_config_path
1✔
25
from leverage.path import get_project_root_or_current_dir_path
1✔
26

27
# Regexes for general validation
# Project short names are two to four lowercase letters.
PROJECT_SHORT = r"[a-z]{2,4}"
# https://docs.aws.amazon.com/IAM/latest/UserGuide/id_users_create.html#id_users_create_console
# https://docs.aws.amazon.com/IAM/latest/APIReference/API_CreateUser.html#API_CreateUser_RequestParameters
USERNAME = r"[a-zA-Z0-9\+,=\.@\-_]{1,64}"
KEY_ID = r"[A-Z0-9]{20}"
SECRET_KEY = r"[a-zA-Z0-9/\+]{40}"
# `(?:gov-)?` is an optional literal "gov-" segment (e.g. us-gov-west-1). A character class such as
# `[gov-]?` would instead match a single "g", "o", "v" or "-" character.
# The trailing index accepts any single non-zero digit so regions like ap-southeast-4 validate.
REGION = (
    r"[a-z]{2}-(?:gov-)?"
    r"(?:central|north|south|east|west|northeast|northwest|southeast|southwest|secret|topsecret)-[1-9]"
)
ACCOUNT_ID = r"[0-9]{12}"
MFA_SERIAL = rf"arn:aws:iam::{ACCOUNT_ID}:mfa/{USERNAME}"
1✔
39

40
# TODO: Remove these and get them into the global app state
PROJECT_ROOT = get_project_root_or_current_dir_path()
PROJECT_COMMON_TFVARS_FILE = "common.tfvars"

# Directory holding the common tfvars file; when not inside a repository fall back to the current directory.
try:
    _global_config_dir = Path(get_global_config_path())
except NotARepositoryError:
    _global_config_dir = Path.cwd()

PROJECT_COMMON_TFVARS = _global_config_dir / PROJECT_COMMON_TFVARS_FILE
PROJECT_CONFIG = PROJECT_ROOT / "project.yaml"
AWSCLI_CONFIG_DIR = Path.home() / ".aws"
1✔
51

52
# Role settings shared by the credential types that assume OrganizationAccountAccessRole.
_OAAR = {"profile_role": "oaar", "role": "OrganizationAccountAccessRole"}

# Per credentials type: prompt title, suffix used in profile names, and IAM role to assume.
PROFILES = {
    "bootstrap": {"choice_title": "Bootstrap credentials (temporary)", **_OAAR},
    "management": {"choice_title": "Management credentials", **_OAAR},
    "security": {"choice_title": "DevOps credentials", "profile_role": "devops", "role": "DevOps"},
}
69

70

71
def _exit_if_user_cancels_input(question):
1✔
72
    """Prompt user for input, exit application if user cancels it.
73

74
    Args:
75
        question (callable): Question to be asked to user.
76

77
    Raises:
78
        Exit: When user cancels input.
79

80
    Returns:
81
        any: Question return value
82
    """
83

84
    @wraps(question)
1✔
85
    def handle_keyboard_interrupt(*args, **kwargs):
1✔
86
        answer = question(*args, **kwargs)
×
87
        if answer is None:
×
88
            raise Exit(1)
×
89
        return answer
×
90

91
    return handle_keyboard_interrupt
1✔
92

93

94
@_exit_if_user_cancels_input
def _ask_for_short_name():
    """Prompt for project short name.

    The answer must fully match `PROJECT_SHORT`; otherwise the validation message is shown.

    Returns:
        str: Project short name.
    """
    return questionary.text(
        message="Project short name:",
        qmark=">",
        # The validator returns True or the error message to display.
        validate=lambda value: bool(re.fullmatch(PROJECT_SHORT, value))
        or "The project short name should be a two to four letter lowercase word",
    ).ask()
107

108

109
@_exit_if_user_cancels_input
def _ask_for_region():
    """Ask the user for the default region of the credentials.

    The prompt is pre-filled with `us-east-1` and the answer must fully match `REGION`.

    Returns:
        str: Region.
    """

    def _check_region(value):
        # The validator returns True or the error message to display.
        return True if re.fullmatch(REGION, value) else "Invalid region."

    region_prompt = questionary.text(
        message="Credentials default region:",
        qmark=">",
        default="us-east-1",
        validate=_check_region,
    )
    return region_prompt.ask()
122

123

124
@_exit_if_user_cancels_input
def _ask_for_credentials_overwrite(profile, skip_option_title, overwrite_option_title):
    """Ask what to do with credentials that already exist for a profile.

    Args:
        profile (str): Name of the profile being configured.
        skip_option_title (str): Message to display in the `skip` option.
        overwrite_option_title (str): Message to display in the `overwrite` option.

    Returns:
        bool: True to overwrite the current credentials, False to skip their configuration.
    """
    skip_choice = Choice(skip_option_title, value=False, shortcut_key="s", checked=True)
    overwrite_choice = Choice(overwrite_option_title, value=True, shortcut_key="o")

    selection = questionary.select(
        message=f"Credentials already configured for {profile}:",
        qmark=">",
        choices=[skip_choice, overwrite_choice],
        use_shortcuts=True,
    )
    return selection.ask()
146

147

148
@_exit_if_user_cancels_input
def _ask_for_credentials_location():
    """Ask how the programmatic keys will be provided, and for the file path when a file is chosen.

    Returns:
        Path | str: Path to location or `manual` if `Manual` is selected.
            `None` when the prompt yields no answers.
    """

    def _is_existing_file(value):
        # The validator returns True or the error message to display.
        candidate = Path(value).expanduser()
        return (candidate.is_file() and candidate.exists()) or "Path must be an existing file"

    input_type_question = {
        "type": "select",
        "name": "input_type",
        "message": "Select the means by which you'll provide the programmatic keys:",
        "qmark": ">",
        "choices": [
            {"name": "Path to an access keys file obtained from AWS", "value": "path"},
            {"name": "Manually", "value": "manual"},
        ],
    }
    path_question = {
        "type": "path",
        "name": "path",
        "message": "Path to access keys file:",
        "qmark": ">",
        # Only asked when the first answer selected the file option.
        "when": lambda answers_so_far: answers_so_far["input_type"] == "path",
        "validate": _is_existing_file,
    }

    answers = questionary.prompt([input_type_question, path_question])
    if not answers:
        return None

    chosen = answers.get("input_type")
    if chosen == "path":
        return Path(answers.get("path")).expanduser()
    return chosen
×
183

184

185
@_exit_if_user_cancels_input
def _ask_for_credentials():
    """Ask the user to type the access key id and the secret key.

    Returns:
        list: Key ID and Secret Key, in that order. `None` when the prompt yields no answers.
    """

    def _must_match(pattern, error_message):
        # Build a validator that returns True or the error message to display.
        return lambda value: bool(re.fullmatch(pattern, value)) or error_message

    key_id_question = {
        "type": "text",
        "name": "key_id",
        "message": "Key:",
        "qmark": ">",
        "validate": _must_match(KEY_ID, "Invalid Key"),
    }
    secret_key_question = {
        "type": "password",
        "name": "secret_key",
        "message": "Secret:",
        "qmark": ">",
        "validate": _must_match(SECRET_KEY, "Invalid Secret"),
    }

    answers = questionary.prompt([key_id_question, secret_key_question])
    if not answers:
        return None

    return [*answers.values()]
×
214

215

216
# Module-wide handle to the AWS CLI container; assigned by the `credentials` group callback.
AWSCLI = None
1✔
217

218

219
def _load_project_yaml():
    """Load project.yaml file contents.

    Raises:
        Exit: When the file exists but cannot be loaded.

    Returns:
        dict: Parsed contents of `PROJECT_CONFIG`, or an empty dict when the file
            does not exist or holds an empty document.
    """
    if not PROJECT_CONFIG.exists():
        logger.debug("No project config file found.")
        return {}

    try:
        # An empty YAML document loads as None; normalize it to an empty mapping so that
        # callers can always use dict methods on the result.
        return YAML().load(PROJECT_CONFIG) or {}

    except Exception as exc:
        # The traceback is dropped before logging the exception.
        exc.__traceback__ = None
        logger.exception(message="Error loading configuration file.", exc_info=exc)
        raise Exit(1)
×
232

233

234
@click.group()
@pass_state
def credentials(state):
    """Manage AWS cli credentials."""
    # Scenarios on project.yaml, build.env and common.tfvars files:
    # - if the project is new, project.yaml exists and build.env/common.tfvars don't
    # - if the project is already started, project.yaml likely won't exist but build.env or common.tfvars do
    #
    # In any case one of them should exist:
    # - project.yaml exists: create build.env (when missing) with the short name read from project.yaml
    # - otherwise, build.env exists: nothing needs to be created
    # - otherwise: take the project name from common.tfvars, or exit with an error if that is not possible
    #
    # In the common.tfvars-only scenario there is no project name nor TF_IMAGE_TAG available, so the
    # file is read directly, without a container.
    global AWSCLI

    project_config = _load_project_yaml()
    build_env = Path(PROJECT_ROOT) / "build.env"

    if project_config:
        logger.info("Reading info from project.yaml")
        short_name = project_config.get("short_name")
        # fullmatch (unlike match with a `$` anchor) also rejects a trailing newline, and the
        # isinstance check keeps non-string YAML scalars from raising TypeError.
        if not isinstance(short_name, str) or not re.fullmatch(PROJECT_SHORT, short_name):
            logger.error("Invalid or missing project short name in project.yaml file.")
            raise Exit(1)
        if not build_env.exists():
            build_env.write_text(f"PROJECT={short_name}\nTF_IMAGE_TAG={__toolbox_version__}")
    elif not build_env.exists():
        # project_config is empty and build.env does not exist: trying common.tfvars
        try:
            project = None
            with open(PROJECT_COMMON_TFVARS, "r") as tfvars:
                for line in tfvars:
                    found = re.search(r'project = "(.+)"', line)
                    if found:
                        project = found[1]
                        break
            if project is None:
                raise ValueError("No project entry found in common.tfvars")

            logger.info("Reading info from common.tfvars")
            # NOTE(review): the image tag is hard-coded here while the project.yaml branch uses
            # __toolbox_version__ — confirm this is intentional.
            build_env.write_text(f"PROJECT={project}\nTF_IMAGE_TAG=1.1.9")
        except (OSError, ValueError) as e:
            # OSError: file missing/unreadable/unwritable. ValueError: undecodable content or no project entry.
            logger.error(f"Neither project.yaml nor build.env nor common.tfvars files were found. {e}")
            raise Exit(1)
    else:
        logger.info("Reading info from build.env")

    state.container = AWSCLIContainer(get_docker_client())
    state.container.ensure_image()
    # Expose the container to the module-level helpers.
    AWSCLI = state.container
×
297

298

299
def _load_configs_for_credentials():
    """Gather every value needed to configure credentials.

    Values are looked up in project.yaml, the container environment configuration and the
    common Terraform configuration; the short name and primary region are asked to the user
    when none of those sources provides them.

    Raises:
        Exit: If the user cancels one of the prompts.

    Returns:
        dict: Values needed to configure a credential and update the files accordingly.
    """
    logger.info("Loading configuration file.")
    project_config = _load_project_yaml()

    logger.info("Loading project environment configuration file.")
    env_config = AWSCLI.env_conf

    logger.info("Loading tf common configuration.")
    terraform_config = AWSCLI.paths.common_conf

    short_name = (
        project_config.get("short_name")
        or env_config.get("PROJECT")
        or terraform_config.get("project")
        or _ask_for_short_name()
    )
    project_name = project_config.get("project_name") or terraform_config.get("project_long")

    # region_primary was added in refarch v1
    # for v2 it was replaced by region at project level
    if "region_primary" in project_config or "region_primary" in terraform_config:
        region_key = "region_primary"
    else:
        region_key = "region"
    primary_region = project_config.get(region_key) or terraform_config.get(region_key) or _ask_for_region()
    secondary_region = terraform_config.get("region_secondary")

    # Accounts defined in Terraform code take priority
    terraform_accounts = terraform_config.get("accounts", {})
    accounts = []
    if terraform_accounts:
        for account_name, account_info in terraform_accounts.items():
            accounts.append(
                {"name": account_name, "email": account_info.get("email"), "id": account_info.get("id")}
            )
    # Add accounts not found in terraform code
    accounts.extend(
        account
        for account in project_config.get("organization", {}).get("accounts", [])
        if account.get("name") not in terraform_accounts
    )

    return {
        "short_name": short_name,
        "project_name": project_name,
        "primary_region": primary_region,
        "secondary_region": secondary_region,
        "organization": {"accounts": accounts},
        "mfa_enabled": env_config.get("MFA_ENABLED", "false"),
    }
1✔
359

360

361
def _profile_is_configured(profile):
    """Tell whether the given profile is already configured.

    Runs `configure list` for the profile through the AWS CLI container and
    reports success when the exit code is falsy.

    Args:
        profile (str): Profile to check.

    Returns:
        bool: Whether the profile was already configured or not.
    """
    status, _output = AWSCLI.exec("configure list", profile)
    return not status
×
373

374

375
def _extract_credentials(file):
    """Extract AWS credentials from given file. Print message and quit application if file is malformed.
    Access Keys files have the form:
       Access key ID,Secret access key
       AKUDKXXXXXXXXXXXXXXX,examplesecreteLkyvWWjxi29dJ63Geo1Ggl956b

    Args:
        file (Path): Credentials file as obtained from AWS Console.

    Raises:
        Exit: When file content does not conform to expected form.

    Returns:
        str, str: Key ID, Secret Key
    """
    # utf-8-sig strips a leading byte order mark (which would otherwise become part of the first
    # header name); newline="" is what the csv module asks for when reading files.
    with open(file, encoding="utf-8-sig", newline="") as access_keys_file:
        try:
            # The default keeps an empty or header-only file from leaking StopIteration.
            keys = next(csv.DictReader(access_keys_file), None)

        except csv.Error:
            click.echo("\nMalformed access keys file\n")
            raise Exit(1)

    if keys is None:
        click.echo("\nMalformed access keys file\n")
        raise Exit(1)

    try:
        access_key_id = keys["Access key ID"]
        secret_access_key = keys["Secret access key"]

    except KeyError:
        click.echo("\nFields for keys not found in access keys file\n")
        raise Exit(1)

    # A short data row makes DictReader fill the missing column with None, hence the `or ""`.
    # fullmatch rejects values that merely start with a well-formed key.
    if not re.fullmatch(KEY_ID, access_key_id or "") or not re.fullmatch(SECRET_KEY, secret_access_key or ""):
        click.echo("\nMalformed keys in access keys file\n")
        raise Exit(1)

    return access_key_id, secret_access_key
1✔
411

412

413
def _backup_file(filename):
1✔
414
    """Create backup of a credential file using docker image.
415

416
    Args:
417
        filename (str): File to backup, either `config` or `credentials`
418
    """
419
    credential_files_env_vars = {"config": "AWS_CONFIG_FILE", "credentials": "AWS_SHARED_CREDENTIALS_FILE"}
×
420
    env_var = credential_files_env_vars.get(filename)
×
421

422
    AWSCLI.system_exec(f"sh -c 'cp ${env_var} \"${{{env_var}}}.bkp\"'")
×
423

424

425
def configure_credentials(profile, file=None, make_backup=False):
    """Store the access keys for a profile in the AWS cli `credentials` file.

    The keys are read from `file` when given; otherwise the user is asked whether to
    provide a file or type them manually. Optionally backs up the credentials file first.

    Args:
        profile (str): Name of the profile to configure.
        file (Path, optional): Credentials file. Defaults to None.
        make_backup (bool, optional): Whether to make a backup of the credentials file. Defaults to False.

    Raises:
        ExitError: When the AWS CLI fails to set one of the values.
    """
    source = file or _ask_for_credentials_location()

    if source is None or source == "manual":
        key_id, secret_key = _ask_for_credentials()
    else:
        key_id, secret_key = _extract_credentials(source)

    if make_backup:
        logger.info("Backing up credentials file.")
        _backup_file("credentials")

    settings = (
        ("aws_access_key_id", key_id),
        ("aws_secret_access_key", secret_key),
    )
    for setting, setting_value in settings:
        exit_code, output = AWSCLI.exec(f"configure set {setting} {setting_value}", profile)
        if exit_code:
            raise ExitError(exit_code, f"AWS CLI error: {output}")
1✔
451

452

453
def _credentials_are_valid(profile):
    """Tell whether the credentials of the given profile are valid.

    Validity is probed with `sts get-caller-identity`. For invalid credentials the command
    is expected to finish with:
    Exit code:
        255
    Error message:
        An error occurred (InvalidClientTokenId) when calling the GetCallerIdentity operation:
        The security token included in the request is invalid.

    Args:
        profile (str): Name of profile for which credentials must be checked.

    Returns:
        bool: Whether the credentials are valid.
    """
    error_code, output = AWSCLI.exec("sts get-caller-identity", profile)

    if error_code == 255:
        return False
    return "InvalidClientTokenId" not in output
1✔
471

472

473
def _get_management_account_id(profile):
    """Fetch the management account id by asking the AWS cli for the caller identity.

    Args:
        profile (str): Name of profile to configure.

    Raises:
        ExitError: When the AWS CLI command fails.

    Returns:
        str: Management account id.
    """
    exit_code, output = AWSCLI.exec("--output json sts get-caller-identity", profile)
    if exit_code:
        raise ExitError(exit_code, f"AWS CLI error: {output}")

    return json.loads(output)["Account"]
1✔
488

489

490
def _get_organization_accounts(profile, project_name):
    """List the organization accounts, stripping the `<project_name>-` prefix from their names.

    Args:
        profile (str): Credentials profile.
        project_name (str): Full name of the project.

    Returns:
        dict: Mapping of organization accounts names to ids. Empty when the AWS CLI command fails.
    """
    exit_code, output = AWSCLI.exec("--output json organizations list-accounts", profile)
    if exit_code:
        return {}

    prefix = f"{project_name}-"

    def _without_prefix(name):
        return name[len(prefix) :] if name.startswith(prefix) else name

    return {_without_prefix(account["Name"]): account["Id"] for account in json.loads(output)["Accounts"]}
1✔
515

516

517
def _get_mfa_serial(profile):
    """Fetch the serial of the MFA device attached to the given profile credentials.

    Args:
        profile (str): Name of profile.

    Raises:
        ExitError: When the AWS CLI command fails.

    Returns:
        str: MFA device serial, or an empty string when no listed device has a matching serial.
    """
    exit_code, output = AWSCLI.exec("--output json iam list-mfa-devices", profile)
    if exit_code:
        raise ExitError(exit_code, f"AWS CLI error: {output}")

    # Either zero or one MFA device should be configured for either `management` or `security` accounts users.
    # Just for safety, and because we only support VirtualMFA devices, we check that the `SerialNumber` is an `arn`
    # https://docs.aws.amazon.com/IAM/latest/APIReference/API_MFADevice.html
    for device in json.loads(output)["MFADevices"]:
        serial = device["SerialNumber"]
        if re.fullmatch(MFA_SERIAL, serial):
            return serial

    return ""
542

543

544
def configure_profile(profile, values):
    """Write the given settings for a profile into the AWS cli `config` file.

    Each setting is applied with its own `configure set` call.

    Args:
        profile (str): Profile name.
        values (dict): Mapping of values to be set in the profile.

    Raises:
        ExitError: When the AWS CLI fails to set one of the values.
    """
    logger.info(f"\tConfiguring profile [bold]{profile}[/bold]")

    for setting in values:
        exit_code, output = AWSCLI.exec(f"configure set {setting} {values[setting]}", profile)
        if not exit_code:
            continue
        raise ExitError(exit_code, f"AWS CLI error: {output}")
×
556

557

558
def configure_accounts_profiles(profile, region, organization_accounts, project_accounts, fetch_mfa_device):
    """Create the assumable-role profiles for every account, backing up the previous `config` file.

    Args:
        profile(str): Name of the profile to configure, in the form `<short name>-<type>`.
        region (str): Region.
        organization_accounts (dict): Name and id of all accounts in the organization.
        project_accounts (list): Name and email of all accounts in project configuration file.
        fetch_mfa_device (bool): Whether to fetch MFA device for profiles.

    Raises:
        ExitError: When an MFA device is requested but none is found for the user.
    """
    short_name, _type = profile.split("-")

    mfa_serial = ""
    if fetch_mfa_device:
        logger.info("Fetching MFA device serial.")
        mfa_serial = _get_mfa_serial(profile)
        if not mfa_serial:
            raise ExitError(1, "No MFA device found for user.")

    account_profiles = {}
    for account in project_accounts:
        account_name = account["name"]

        # DevOps roles do not have permission over management account
        is_management_for_security = "security" in profile and account_name == "management"
        if is_management_for_security:
            continue

        # TODO: Add remaining profiles for remaining accounts declared in code if enough information is available
        # Ids fetched from the organization win over the ones declared in the project configuration.
        account_id = organization_accounts.get(account_name, account.get("id"))
        if account_id is None:
            continue

        role_name = PROFILES[_type]["role"]
        settings = {
            "output": "json",
            "region": region,
            "role_arn": f"arn:aws:iam::{account_id}:role/{role_name}",
            "source_profile": profile,
        }
        if mfa_serial:
            settings["mfa_serial"] = mfa_serial

        # A profile identifier looks like `le-security-oaar`
        profile_role = PROFILES[_type]["profile_role"]
        account_profiles[f"{short_name}-{account_name}-{profile_role}"] = settings

    logger.info("Backing up account profiles file.")
    _backup_file("config")

    for identifier in account_profiles:
        configure_profile(identifier, account_profiles[identifier])
1✔
605

606

607
def _update_account_ids(config):
    """Update accounts ids in global configuration file.
    It updates both `[account name]_account_id` and `accounts` variables.
    This last one maintaining the format:
    ```
    accounts = {
      account_name = {
        email = account_email,
        id = account_id
      }
    }
    ```
    Nothing is done when the common tfvars file does not exist.

    Args:
        config (dict): Project configuration values.
    """
    if not PROJECT_COMMON_TFVARS.exists():
        return

    container_base_dir = f"/{config['project_name']}/config"
    container_common_tfvars_file = f"{container_base_dir}/{PROJECT_COMMON_TFVARS_FILE}"

    accs = []
    for account in config["organization"]["accounts"]:
        # Look the fields up by key: accounts coming from project.yaml may have no `id` yet,
        # and positional unpacking of `.values()` would depend on key count and order.
        acc_name = account.get("name")
        acc_email = account.get("email")
        acc_id = account.get("id")

        acc = [f'\n    email = "{acc_email}"']
        if acc_id:
            AWSCLI.system_exec(
                "hcledit "
                f"-f {container_common_tfvars_file} -u"
                f' attribute set {acc_name}_account_id "\\"{acc_id}\\""'
            )

            acc.append(f'    id = "{acc_id}"')
        acc = ",\n".join(acc)

        accs.append(f"\n  {acc_name} = {{{acc}\n  }}")

    accs = ",".join(accs)
    accs = f"{{{accs}\n}}"

    AWSCLI.system_exec("hcledit " f"-f {container_common_tfvars_file} -u" f" attribute set accounts '{accs}'")
1✔
650

651

652
def mutually_exclusive(context, param, value):
    """Callback for command options --overwrite-existing-credentials and --skip-access-keys-setup mutual exclusivity verification.

    Args:
        context (click.Context): Invocation context; its `params` hold the options parsed so far.
        param (click.Parameter): Option being processed.
        value (bool): Value given for the option.

    Raises:
        click.BadOptionUsage: When the option is set and its counterpart was set too.

    Returns:
        bool: The unchanged option value.
    """
    me_option = {
        "overwrite_existing_credentials": "skip_access_keys_setup",
        "skip_access_keys_setup": "overwrite_existing_credentials",
    }
    counterpart = me_option[param.name]

    if value and context.params.get(counterpart, False):
        raise click.BadOptionUsage(
            # `option_name` is the option's name, not the Parameter object itself.
            option_name=param.name,
            message=(
                f"Option {param.opts[0]} is mutually exclusive"
                f" with option --{counterpart.replace('_', '-')}."
            ),
            ctx=context,
        )

    return value
×
670

671

672
@credentials.command()
@click.option(
    "--type",
    type=click.Choice(["BOOTSTRAP", "MANAGEMENT", "SECURITY"], case_sensitive=False),
    required=True,
    help="Type of credentials to set.",
)
@click.option(
    "--credentials-file", type=click.Path(exists=True, path_type=Path), help="Path to AWS cli credentials file."
)
@click.option("--fetch-mfa-device", is_flag=True, help="Fetch MFA device configured for user.")
@click.option(
    "--overwrite-existing-credentials",
    is_flag=True,
    callback=mutually_exclusive,
    help="Overwrite existing credentials if already configured.\nMutually exclusive with --skip-access-keys-setup.",
)
@click.option(
    "--skip-access-keys-setup",
    is_flag=True,
    callback=mutually_exclusive,
    help=(
        "Skip access keys configuration. Continue on with assumable roles setup.\n"
        "Mutually exclusive with --overwrite-existing-credentials."
    ),
)
@click.option("--skip-assumable-roles-setup", is_flag=True, help="Don't configure the accounts assumable roles.")
# TODO: Add --override-role-name parameter for non-default roles in accounts
def configure(
    type,
    credentials_file,
    fetch_mfa_device,
    overwrite_existing_credentials,
    skip_access_keys_setup,
    skip_assumable_roles_setup,
):
    """Configure credentials for the project.

    It can handle the credentials required for the initial deployment of the project (BOOTSTRAP),
    a management user (MANAGEMENT) or a devops/secops user (SECURITY).
    """
    if skip_access_keys_setup and skip_assumable_roles_setup:
        logger.info("Nothing to do. Exiting.")
        return

    config_values = _load_configs_for_credentials()

    credentials_type = type.lower()
    short_name = config_values.get("short_name")
    profile = f"{short_name}-{credentials_type}"

    already_configured = _profile_is_configured(profile=profile)

    # Only ask what to do with existing credentials when no flag already settled it.
    if already_configured and not skip_access_keys_setup and not overwrite_existing_credentials:
        if skip_assumable_roles_setup:
            title_extra = ""
        else:
            title_extra = " Continue on with assumable roles setup."

        overwrite_existing_credentials = _ask_for_credentials_overwrite(
            profile=profile,
            skip_option_title=f"Skip credentials configuration.{title_extra}",
            overwrite_option_title="Overwrite current credentials. Backups will be made.",
        )

    if not skip_access_keys_setup and (overwrite_existing_credentials or not already_configured):
        logger.info(f"Configuring [bold]{credentials_type}[/bold] credentials.")
        configure_credentials(profile, credentials_file, make_backup=already_configured)
        credentials_path = (AWSCLI_CONFIG_DIR / short_name / "credentials").as_posix()
        logger.info(f"[bold]{credentials_type.capitalize()} credentials configured in:[/bold] {credentials_path}")

        if not _credentials_are_valid(profile):
            logger.error(f"Invalid {profile} credentials. Please check the given keys.")
            return

    accounts = config_values.get("organization", {}).get("accounts", False)

    # First time configuring bootstrap credentials
    if credentials_type == "bootstrap" and not already_configured:
        management_account = None
        for account in accounts:
            if account["name"] == "management":
                management_account = account
                break

        if management_account:
            logger.info("Fetching management account id.")
            management_account["id"] = _get_management_account_id(profile=profile)

            project_config_file = _load_project_yaml()
            if project_config_file and "accounts" in project_config_file.get("organization", {}):
                project_config_file["organization"]["accounts"] = accounts

                logger.info("Updating project configuration file.")
                YAML().dump(data=project_config_file, stream=PROJECT_CONFIG)

        # Assumable roles are never configured on the first bootstrap run.
        skip_assumable_roles_setup = True

    # Security credentials don't have permission to access organization information,
    # so borrow the first configured profile that does.
    profile_for_organization = profile
    if credentials_type == "security":
        for type_with_permission in ("management", "bootstrap"):
            candidate_profile = f"{short_name}-{type_with_permission}"
            if _profile_is_configured(candidate_profile):
                profile_for_organization = candidate_profile
                break

    if not skip_assumable_roles_setup:
        logger.info("Attempting to fetch organization accounts.")
        organization_accounts = _get_organization_accounts(profile_for_organization, config_values.get("project_name"))
        logger.debug(f"Organization Accounts fetched: {organization_accounts}")

        if organization_accounts or accounts:
            logger.info("Configuring assumable roles.")

            configure_accounts_profiles(
                profile, config_values["primary_region"], organization_accounts, accounts, fetch_mfa_device
            )
            config_path = (AWSCLI_CONFIG_DIR / short_name / "config").as_posix()
            logger.info(f"[bold]Account profiles configured in:[/bold] {config_path}")

            for account in accounts:
                try:  # Some account may not already be created
                    fetched_id = organization_accounts[account["name"]]
                except KeyError:
                    continue
                account["id"] = fetched_id

        else:
            logger.info(
                "No organization has been created yet or no accounts were configured.\n"
                "Skipping assumable roles configuration."
            )
    else:
        logger.info("Skipping assumable roles configuration.")

    _update_account_ids(config=config_values)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc