• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

binbashar / leverage / 13227561880

09 Feb 2025 04:26PM UTC coverage: 60.209% (+0.04%) from 60.173%
13227561880

Pull #296

github

Franr
fix test
Pull Request #296: BL-294 | Catch and handle HCL parsing errors

184 of 448 branches covered (41.07%)

Branch coverage included in aggregate %.

14 of 18 new or added lines in 3 files covered. (77.78%)

183 existing lines in 10 files now uncovered.

2464 of 3950 relevant lines covered (62.38%)

0.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.12
/leverage/modules/auth.py
1
import time
1✔
2
from pathlib import Path
1✔
3
from configparser import NoSectionError, NoOptionError
1✔
4

5
import boto3
1✔
6
from botocore.exceptions import ClientError
1✔
7
from configupdater import ConfigUpdater
1✔
8

9
from leverage import logger
1✔
10
from leverage._utils import key_finder, ExitError, get_or_create_section, parse_tf_file
1✔
11

12

13
class SkipProfile(Exception):
    """Raised to signal that a profile reference should be skipped rather than processed."""
1✔
15

16

17
def get_layer_profile(raw_profile: str, config_updater: ConfigUpdater, tf_profile: str, project: str):
    """
    Resolve a raw "profile" reference into the data needed to request its credentials.

    The "${var.profile}" and "${var.project}" placeholders of <raw_profile> are replaced by
    <tf_profile> and <project>; the account name is whatever remains after dropping the first
    and the last dash-separated segments of the resulting string.

    Returns a tuple (account_id, account_name, sso_role, layer_profile), where account_id and
    sso_role are read from the "profile <project>-sso-<account_name>" section of <config_updater>.

    Raises SkipProfile if <raw_profile> references a local variable, and ExitError (code 40)
    if the SSO profile section, or one of its required options, is not present.
    """
    if "local." in raw_profile:
        # ignore values referencing to local variables
        # we will search for profiles directly in locals.tf instead
        raise SkipProfile

    # if it is exactly that variable, we already know the layer profile is tf_profile
    layer_profile = tf_profile if raw_profile == "${var.profile}" else None

    # replace variables with their corresponding values
    raw = raw_profile.replace("${var.profile}", tf_profile).replace("${var.project}", project)

    # the project and the role are at the beginning and end of the string
    _, *account_name, _ = raw.split("-")
    account_name = "-".join(account_name)
    logger.info(f"Attempting to get temporary credentials for {account_name} account.")

    sso_profile = f"{project}-sso-{account_name}"
    sso_section = f"profile {sso_profile}"
    # if profile wasn't configured during configuration step
    # it means we do not have permissions for the role in the account
    try:
        account_id = config_updater.get(sso_section, "account_id").value
        sso_role = config_updater.get(sso_section, "role_name").value
    except (NoSectionError, NoOptionError):
        # a section lacking account_id/role_name is as unusable as a missing one,
        # so both cases end in the same error instead of leaking a raw NoOptionError
        raise ExitError(40, f"Missing {sso_profile} permission for account {account_name}.")

    # if we are processing a profile from a different layer, we need to build it
    layer_profile = layer_profile or f"{project}-{account_name}-{sso_role.lower()}"

    return account_id, account_name, sso_role, layer_profile
1✔
47

48

49
def update_config_section(updater: ConfigUpdater, layer_profile: str, data: dict):
    """
    Set every key/value pair of <data> on the <layer_profile> section of <updater>.

    The section is obtained through get_or_create_section, and updater.update_file()
    is called once after all the values were set.
    """
    target_section = get_or_create_section(updater, layer_profile)
    for option, option_value in data.items():
        target_section.set(option, option_value)

    updater.update_file()
1✔
58

59

60
def get_profiles(cli):
    """
    Collect the AWS profile references of the current layer from its tf files.

    Returns a tuple (tf_profile, raw_profiles): the "profile" value found in the file at
    cli.paths.local_backend_tfvars, and the set of "profile" values found in config.tf
    and locals.tf.
    """
    found_profiles = set()
    # both files are looked up by bare name, i.e. relative to the current directory
    for tf_name in ("config.tf", "locals.tf"):
        try:
            parsed = parse_tf_file(Path(tf_name))
        except FileNotFoundError:
            # this file is not present on the layer, move on to the next one
            continue

        # gather every "profile" reference of the file, leaving lookup references out
        # (those profiles are caught from locals.tf instead)
        found_profiles |= set(key_finder(parsed, "profile", "lookup"))

    # the profile value from <layer>/config/backend.tfvars
    tf_profile = parse_tf_file(cli.paths.local_backend_tfvars)["profile"]

    return tf_profile, found_profiles
1✔
81

82

83
def refresh_layer_credentials(cli):
    """
    Renew the temporary AWS credentials of every profile referenced by the current layer.

    For each raw profile returned by get_profiles(), the cached "expiration" option of the
    matching "profile <layer_profile>" section is checked. When it is missing, or less than
    30 minutes away, new role credentials are requested through the SSO client; the new
    expiration is written to the file at cli.paths.host_aws_profiles_file and the keys to the
    file at cli.paths.host_aws_credentials_file.

    Raises ExitError (code 40) if the role cannot be assumed (access denied / forbidden) or,
    via get_layer_profile, if the SSO profile is not configured. Any other ClientError is
    propagated unchanged.
    """
    tf_profile, raw_profiles = get_profiles(cli)
    config_updater = ConfigUpdater()
    config_updater.read(cli.paths.host_aws_profiles_file)

    client = boto3.client("sso", region_name=cli.sso_region_from_main_profile)
    for raw in raw_profiles:
        try:
            account_id, account_name, sso_role, layer_profile = get_layer_profile(
                raw,
                config_updater,
                tf_profile,
                cli.project,
            )
        except SkipProfile:
            continue

        # check if credentials need to be renewed
        try:
            # the stored value is divided by 1000 to compare it against time.time() (seconds);
            # presumably it is expressed in milliseconds
            expiration = int(config_updater.get(f"profile {layer_profile}", "expiration").value) / 1000
        except (NoSectionError, NoOptionError):
            # first time using this profile, skip into the credential's retrieval step
            logger.debug("No cached credentials found.")
        else:
            # we reduce the validity 30 minutes, to avoid expiration over long-standing tasks
            renewal = time.time() + (30 * 60)
            logger.debug(f"Token expiration time: {expiration}")
            logger.debug(f"Token renewal time: {renewal}")
            if renewal < expiration:
                # still valid, nothing to do with this profile!
                logger.info("Using already configured temporary credentials.")
                continue

        # retrieve credentials
        logger.debug(f"Retrieving role credentials for {sso_role}...")
        try:
            credentials = client.get_role_credentials(
                roleName=sso_role,
                accountId=account_id,
                accessToken=cli.get_sso_access_token(),
            )["roleCredentials"]
        except ClientError as error:
            if error.response["Error"]["Code"] in ("AccessDeniedException", "ForbiddenException"):
                raise ExitError(
                    40,
                    f"User does not have permission to assume role [bold]{sso_role}[/bold]"
                    " in this account.\nPlease check with your administrator or try"
                    " running [bold]leverage aws configure sso[/bold].",
                )
            # any other error must not be swallowed: carrying on would leave `credentials`
            # unbound, or holding the values retrieved for the previous profile
            raise

        # update expiration on aws/<project>/config
        logger.info(f"Writing {layer_profile} profile")
        update_config_section(
            config_updater,
            f"profile {layer_profile}",
            data={
                "expiration": credentials["expiration"],
            },
        )
        # write credentials on aws/<project>/credentials (create the file if it doesn't exist first)
        creds_path = Path(cli.paths.host_aws_credentials_file)
        creds_path.touch(exist_ok=True)
        credentials_updater = ConfigUpdater()
        credentials_updater.read(cli.paths.host_aws_credentials_file)

        update_config_section(
            credentials_updater,
            layer_profile,
            data={
                "aws_access_key_id": credentials["accessKeyId"],
                "aws_secret_access_key": credentials["secretAccessKey"],
                "aws_session_token": credentials["sessionToken"],
            },
        )
        logger.info(f"Credentials for {account_name} account written successfully.")
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc