• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

binbashar / leverage / 10762575514

08 Sep 2024 07:34PM UTC coverage: 59.74% (-9.3%) from 69.031%
10762575514

Pull #269

github

Franr
fix test references
Pull Request #269: [BV-195] Run toolbox with the host user

204 of 508 branches covered (40.16%)

Branch coverage included in aggregate %.

63 of 67 new or added lines in 4 files covered. (94.03%)

4 existing lines in 1 file now uncovered.

2458 of 3948 relevant lines covered (62.26%)

0.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.83
/leverage/container.py
1
import json
1✔
2
import os
1✔
3
import re
1✔
4
import webbrowser
1✔
5
from io import BytesIO
1✔
6
from datetime import datetime
1✔
7
from time import sleep
1✔
8

9
import hcl2
1✔
10
from click.exceptions import Exit
1✔
11
import dockerpty
1✔
12
from configupdater import ConfigUpdater
1✔
13
from docker import DockerClient
1✔
14
from docker.errors import APIError
1✔
15
from docker.types import Mount
1✔
16
from typing import Tuple
1✔
17

18
from leverage import logger
1✔
19
from leverage._utils import AwsCredsEntryPoint, CustomEntryPoint, ExitError, ContainerSession
1✔
20
from leverage.modules.auth import refresh_layer_credentials
1✔
21
from leverage.logger import raw_logger
1✔
22
from leverage.logger import get_script_log_level
1✔
23
from leverage.path import PathsHandler
1✔
24
from leverage.conf import load as load_env
1✔
25

26
REGION = (
1✔
27
    r"(.*)"  # project folder
28
    # start region
29
    r"(global|(?:[a-z]{2}-(?:gov-)?"
30
    r"(?:central|north|south|east|west|northeast|northwest|southeast|southwest|secret|topsecret)-[1-4]))"
31
    # end region
32
    r"(.*)"  # layer
33
)
34

35

36
def get_docker_client():
    """Attempt to get a Docker client from the environment configuration. Halt application otherwise.

    Raises:
        Exit: If communication to Docker server could not be established.

    Returns:
        docker.DockerClient: Client for Docker daemon.
    """
    try:
        docker_client = DockerClient.from_env()
        # ping() forces a round trip, so an unreachable daemon is detected here
        docker_client.ping()

    # `Exception` instead of a bare `except:` so that Ctrl-C (KeyboardInterrupt)
    # and SystemExit are not reported as a Docker daemon problem.
    except Exception as exc:
        logger.error(
            "Docker daemon doesn't seem to be responding. "
            "Please check it is up and running correctly before re-running the command."
        )
        raise Exit(1) from exc

    return docker_client
57

58

59
class LeverageContainer:
    """Basic Leverage Container. Holds the minimum information required to run the Docker image that Leverage uses
    to perform its operations. Commands can be issued as interactive via `start` for when live output or user input
    is desired, or they can be simply executed via `exec` to run silently and retrieve the command output.

    NOTE: An aggregation approach to this design should be considered instead of the current inheritance approach.
    """

    LEVERAGE_IMAGE = "binbash/leverage-toolbox"
    SHELL = "/bin/bash"
    CONTAINER_USER = "leverage"

    def __init__(self, client, mounts: tuple = None, env_vars: dict = None):
        """Project related paths are determined and stored. Project configuration is loaded.

        Args:
            client (docker.DockerClient): Client to interact with Docker daemon.
            mounts (tuple, optional): Iterable of (source, target) pairs, each turned into a bind mount.
            env_vars (dict, optional): Environment variables for the container.

        Raises:
            Exit: If `TERRAFORM_IMAGE_TAG` is not set in the loaded configuration.
        """
        self.client = client
        # Load configs
        self.env_conf = load_env()

        self.paths = PathsHandler(self.env_conf, self.CONTAINER_USER)
        self.project = self.paths.project

        # Set image to use
        self.image = self.env_conf.get("TERRAFORM_IMAGE", self.LEVERAGE_IMAGE)
        self.image_tag = self.env_conf.get("TERRAFORM_IMAGE_TAG")
        if not self.image_tag:
            logger.error(
                "No docker image tag defined.\n"
                "Please set `TERRAFORM_IMAGE_TAG` variable in the project's [bold]build.env[/bold] file before running a Leverage command."
            )
            raise Exit(1)

        mounts = [Mount(source=source, target=target, type="bind") for source, target in mounts] if mounts else []
        self.host_config = self.client.api.create_host_config(security_opt=["label:disable"], mounts=mounts)
        self.container_config = {
            "image": f"{self.image}:{self.local_image_tag}",
            "command": "",
            "stdin_open": True,
            "environment": env_vars or {},
            "entrypoint": "",
            # mirror the host's current directory (relative to the project root) inside the container
            "working_dir": f"{self.paths.guest_base_path}/{self.paths.cwd.relative_to(self.paths.root_dir).as_posix()}",
            "host_config": self.host_config,
        }

    @property
    def environment(self):
        """Environment variables stored in the container configuration."""
        return self.container_config["environment"]

    @environment.setter
    def environment(self, value):
        self.container_config["environment"] = value

    @property
    def entrypoint(self):
        """Entrypoint stored in the container configuration."""
        return self.container_config["entrypoint"]

    @entrypoint.setter
    def entrypoint(self, value):
        self.container_config["entrypoint"] = value

    @property
    def mounts(self):
        """Mounts stored in the host configuration of the container."""
        return self.container_config["host_config"]["Mounts"]

    @mounts.setter
    def mounts(self, value):
        self.container_config["host_config"]["Mounts"] = value

    @property
    def region(self):
        """
        Return the region of the layer, extracted from the current working directory path.

        Raises:
            ExitError: If the path does not contain a valid region.
        """
        if matches := re.match(REGION, self.paths.cwd.as_posix()):
            # the region is the second capture group, sitting between
            # the project folders (first group) and the layer (third group)
            return matches.groups()[1]

        raise ExitError(1, f"No valid region could be found at: {self.paths.cwd.as_posix()}")

    @property
    def local_image_tag(self):
        """Tag of the locally built image: the configured tag plus the host group id and user id."""
        return f"{self.image_tag}-{os.getgid()}-{os.getuid()}"

    @property
    def local_image(self) -> BytesIO:
        """Return the local image that will be built, as a file-like object."""
        # NOTE(review): the FROM line always uses binbash/leverage-toolbox, even when
        # TERRAFORM_IMAGE overrides `self.image` -- confirm this is intended.
        return BytesIO(
            """
            ARG IMAGE_TAG
            FROM binbash/leverage-toolbox:$IMAGE_TAG

            ARG UNAME
            ARG UID
            ARG GID
            RUN groupadd -g $GID -o $UNAME
            RUN useradd -m -u $UID -g $GID -o -s /bin/bash $UNAME
            RUN chown -R $UID:$GID /home/leverage
            USER $UNAME
            """.encode(
                "utf-8"
            )
        )

    def ensure_image(self):
        """
        Make sure the required local Docker image is available in the system. If not, build it.
        If an image with the expected local tag already exists, it is used as is (no re-build).

        Raises:
            ExitError: If the build reports an error.
        """
        logger.info(f"Checking for local docker image, tag: {self.local_image_tag}...")
        image_name = f"{self.image}:{self.local_image_tag}"

        # check first if our image is already available locally
        found_image = self.client.api.images(image_name)
        if found_image:
            logger.info("[green]✔ OK[/green]\n")
            return

        logger.info("Image not found, building it...")
        build_args = {
            "IMAGE_TAG": self.image_tag,
            "UNAME": self.CONTAINER_USER,
            "GID": str(os.getgid()),
            "UID": str(os.getuid()),
        }

        stream = self.client.api.build(
            fileobj=self.local_image,
            tag=image_name,
            pull=True,
            buildargs=build_args,
            decode=True,
        )

        # the build output is consumed line by line, looking for the success or error entries
        for line in stream:
            if "stream" in line and line["stream"].startswith("Successfully built"):
                logger.info("[green]✔ OK[/green]\n")
            elif "errorDetail" in line:
                raise ExitError(1, f"Failed building local image: {line['errorDetail']}")

    def _create_container(self, tty, command="", *args):
        """Create the container that will run the command.

        Args:
            tty (bool): Whether the container will run interactively or not.
            command (str, optional): Command to run. Defaults to "".
            *args: Extra arguments, appended to the command separated by spaces.

        Raises:
            Exit: If the container could not be created.

        Returns:
            dict: Reference to the created container.
        """
        command = " ".join([command] + list(args))
        logger.debug(f"[bold cyan]Running command:[/bold cyan] {command}")
        self.container_config["command"] = command
        self.container_config["tty"] = tty

        try:
            return self.client.api.create_container(**self.container_config)

        except APIError as exc:
            # drop the tracebacks so only the error messages get logged
            exc.__traceback__ = None
            if exc.__context__ is not None:
                exc.__context__.__traceback__ = None
            logger.exception("Error creating container:", exc_info=exc)
            raise Exit(1)

    def _run(self, container, run_func):
        """Apply the given run function to the given container, return its outputs and handle container cleanup.

        Args:
            container (dict): Reference to a Docker container.
            run_func (function): Function to apply to the given container.

        Raises:
            Exit: If the Docker API reports an error while running the function.

        Returns:
            any: Whatever the given function returns.
        """
        try:
            return run_func(self.client, container)

        except APIError as exc:
            # drop the tracebacks so only the error messages get logged
            exc.__traceback__ = None
            if exc.__context__ is not None:
                exc.__context__.__traceback__ = None
            logger.exception("Error during container execution:", exc_info=exc)
            # Halt instead of implicitly returning None: callers either unpack the
            # result as (exit_code, output) or treat a falsy exit code as success.
            raise Exit(1)

        finally:
            # always try to remove the container, even if stopping it failed
            try:
                self.client.api.stop(container)
            finally:
                self.client.api.remove_container(container)

    def _start(self, command: str, *args):
        """Create an interactive container, and run command with the given arguments.

        Args:
            command: Command to run.

        Returns:
            int: Execution exit code.
        """
        container = self._create_container(True, command, *args)

        def run_func(client, container):
            dockerpty.start(client=client.api, container=container)
            return client.api.inspect_container(container)["State"]["ExitCode"]

        return self._run(container, run_func)

    def _start_with_output(self, command, *args):
        """
        Same as _start but also returns the outputs (by dumping the logs) of the container.
        """
        container = self._create_container(True, command, *args)

        def run_func(client, container):
            dockerpty.start(client=client.api, container=container)
            exit_code = client.api.inspect_container(container)["State"]["ExitCode"]
            logs = client.api.logs(container).decode("utf-8")
            return exit_code, logs

        return self._run(container, run_func)

    def start(self, command: str, *arguments) -> int:
        """Run command with the given arguments in an interactive container.
        Returns execution exit code.
        """
        return self._start(command, *arguments)

    def _exec(self, command: str, *args) -> Tuple[int, str]:
        """Create a non interactive container and execute command with the given arguments.
        Returns execution exit code and output.
        """
        container = self._create_container(False, command, *args)

        def run_func(client, container):
            client.api.start(container)
            # block until the container finishes, then collect everything it printed
            exit_code = client.api.wait(container)["StatusCode"]
            output = client.api.logs(container).decode("utf-8")
            return exit_code, output

        return self._run(container, run_func)

    def exec(self, command: str, *arguments) -> Tuple[int, str]:
        """Execute command with the given arguments in a container.
        Returns execution exit code and output.
        """
        return self._exec(command, *arguments)

    def docker_logs(self, container):
        """Return the logs of the given container, decoded as UTF-8 text."""
        return self.client.api.logs(container).decode("utf-8")
309

310

311
class SSOContainer(LeverageContainer):
    """Leverage container with the helpers needed to perform the AWS SSO login flow."""

    # SSO scripts
    AWS_SSO_LOGIN_SCRIPT = "/home/leverage/scripts/aws-sso/aws-sso-login.sh"
    AWS_SSO_LOGOUT_SCRIPT = "/home/leverage/scripts/aws-sso/aws-sso-logout.sh"

    # SSO constants
    AWS_SSO_LOGIN_URL = "https://device.sso.{region}.amazonaws.com/?user_code={user_code}"
    AWS_SSO_CODE_WAIT_SECONDS = 2
    AWS_SSO_CODE_ATTEMPTS = 10
    FALLBACK_LINK_MSG = "Opening the browser... if it fails, open this link in your browser:\n{link}"

    def get_sso_access_token(self):
        """Return the `accessToken` value stored in the SSO token file."""
        with open(self.paths.sso_token_file) as token_file:
            return json.load(token_file)["accessToken"]

    @property
    def sso_region_from_main_profile(self):
        """
        Same as get_sso_region but reading the host AWS profiles file directly, without using a container.
        """
        conf = ConfigUpdater()
        conf.read(self.paths.host_aws_profiles_file)
        return conf.get(f"profile {self.project}-sso", "sso_region").value

    def get_sso_code(self, container) -> str:
        """
        Find and return the SSO user code by periodically checking the logs.
        Up until N attempts.

        Raises:
            ExitError: If the code did not show up in the logs after all the attempts.
        """
        marker = "Then enter the code:"
        logger.info("Fetching SSO code...")
        for _ in range(self.AWS_SSO_CODE_ATTEMPTS):
            # pull logs periodically until we find our SSO code
            logs = self.docker_logs(container)
            if marker in logs:
                # the code is expected two lines below the marker
                lines_after_marker = logs.split(marker)[1].split("\n")
                # The marker can be logged before the code line is flushed;
                # in that case keep polling instead of failing or returning an empty code.
                if len(lines_after_marker) > 2 and lines_after_marker[2].strip():
                    return lines_after_marker[2].strip()

            sleep(self.AWS_SSO_CODE_WAIT_SECONDS)

        raise ExitError(1, "Get SSO code timed-out")

    def get_sso_region(self):
        """Return the raw output of querying `sso_region` for the project's SSO profile through `exec`."""
        # TODO: what about using the .region property we have now? that takes the value from the path of the layer
        # NOTE(review): the exit code of the command is ignored -- confirm callers can cope with an error output.
        _, region = self.exec(f"configure get sso_region --profile {self.project}-sso")
        return region

    def sso_login(self) -> int:
        """Run the SSO login script in a container, open the verification link in the browser
        and wait for the login to finish.

        Returns:
            int: Exit code of the login script.
        """
        region = self.get_sso_region()

        with CustomEntryPoint(self, ""):
            container = self._create_container(False, command=self.AWS_SSO_LOGIN_SCRIPT)

        with ContainerSession(self.client, container):
            # once inside this block, the SSO_LOGIN_SCRIPT is being executed in the "background"
            # now let's grab the user code from the logs
            user_code = self.get_sso_code(container)
            # with the user code, we can now autocomplete the url
            link = self.AWS_SSO_LOGIN_URL.format(region=region.strip(), user_code=user_code)
            webbrowser.open_new_tab(link)
            # The SSO code is only valid once: if the browser was able to open it, the fallback link will be invalid
            logger.info(self.FALLBACK_LINK_MSG.format(link=link))
            # now let's wait until the command locking the container resolves itself:
            # aws sso login will wait for the user code
            # once submitted to the browser, the authentication finishes and the lock is released
            exit_code = self.client.api.wait(container)["StatusCode"]
            raw_logger.info(self.docker_logs(container))

        return exit_code
378

379

380
class AWSCLIContainer(SSOContainer):
    """Leverage Container specially tailored to run AWS CLI commands."""

    AWS_CLI_BINARY = "/usr/local/bin/aws"

    def __init__(self, client):
        """Set the environment, entrypoint and mounts required to run the AWS CLI.

        Args:
            client (docker.DockerClient): Client to interact with Docker daemon.
        """
        super().__init__(client)

        self.environment = {
            "COMMON_CONFIG_FILE": self.paths.common_tfvars,
            "ACCOUNT_CONFIG_FILE": self.paths.account_tfvars,
            "BACKEND_CONFIG_FILE": self.paths.backend_tfvars,
            "AWS_SHARED_CREDENTIALS_FILE": f"{self.paths.guest_aws_credentials_dir}/credentials",
            "AWS_CONFIG_FILE": f"{self.paths.guest_aws_credentials_dir}/config",
            "SSO_CACHE_DIR": f"{self.paths.guest_aws_credentials_dir}/sso/cache",
            "SCRIPT_LOG_LEVEL": get_script_log_level(),
        }
        self.entrypoint = self.AWS_CLI_BINARY
        self.mounts = [
            Mount(source=self.paths.root_dir.as_posix(), target=self.paths.guest_base_path, type="bind"),
            Mount(
                source=self.paths.host_aws_credentials_dir.as_posix(),
                target=self.paths.guest_aws_credentials_dir,
                type="bind",
            ),
        ]

        logger.debug(f"[bold cyan]Container configuration:[/bold cyan]\n{json.dumps(self.container_config, indent=2)}")

    def start(self, command, profile=""):
        """Run the command in an interactive container, adding `--profile <profile>` when a profile is given.
        Returns execution exit code.
        """
        args = ["--profile", profile] if profile else []
        return self._start(command, *args)

    # FIXME: we have a context manager for this now, remove this method later!
    def system_start(self, command):
        """Momentarily override the container's default entrypoint. To run arbitrary commands and not only AWS CLI ones."""
        self.entrypoint = ""
        try:
            return self._start(command)
        finally:
            # restore the AWS CLI entrypoint even if the command raised
            self.entrypoint = self.AWS_CLI_BINARY

    def exec(self, command, profile=""):
        """Execute the command in a container, adding `--profile <profile>` when a profile is given.
        Returns execution exit code and output.
        """
        args = ["--profile", profile] if profile else []
        return self._exec(command, *args)

    # FIXME: we have a context manager for this now, remove this method later!
    def system_exec(self, command):
        """Momentarily override the container's default entrypoint. To run arbitrary commands and not only AWS CLI ones."""
        self.entrypoint = ""
        try:
            exit_code, output = self._exec(command)
        finally:
            # restore the AWS CLI entrypoint even if the command raised
            self.entrypoint = self.AWS_CLI_BINARY

        return exit_code, output
433

434

435
class TerraformContainer(SSOContainer):
    """Leverage container specifically tailored to run Terraform commands.
    It handles authentication and some checks regarding where the command is being executed."""

    TF_BINARY = "/bin/terraform"

    TF_MFA_ENTRYPOINT = "/home/leverage/scripts/aws-mfa/aws-mfa-entrypoint.sh"

    def __init__(self, client, mounts=None, env_vars=None):
        """Set the authentication flags, environment, entrypoint and mounts required to run Terraform.

        Args:
            client (docker.DockerClient): Client to interact with Docker daemon.
            mounts (tuple, optional): Iterable of (source, target) pairs, forwarded to the parent class.
            env_vars (dict, optional): Environment variables, forwarded to the parent class.
        """
        super().__init__(client, mounts=mounts, env_vars=env_vars)

        self.paths.assert_running_leverage_project()

        # Set authentication methods
        self.sso_enabled = self.paths.common_conf.get("sso_enabled", False)
        self.mfa_enabled = (
            self.env_conf.get("MFA_ENABLED", "false") == "true"
        )  # TODO: Convert values to bool upon loading

        # SSH AGENT
        ssh_auth_sock = os.getenv("SSH_AUTH_SOCK")

        self.environment.update(
            {
                "COMMON_CONFIG_FILE": self.paths.common_tfvars,
                "ACCOUNT_CONFIG_FILE": self.paths.account_tfvars,
                "BACKEND_CONFIG_FILE": self.paths.backend_tfvars,
                "AWS_SHARED_CREDENTIALS_FILE": f"{self.paths.guest_aws_credentials_dir}/credentials",
                "AWS_CONFIG_FILE": f"{self.paths.guest_aws_credentials_dir}/config",
                "SRC_AWS_SHARED_CREDENTIALS_FILE": f"{self.paths.guest_aws_credentials_dir}/credentials",  # Legacy?
                "SRC_AWS_CONFIG_FILE": f"{self.paths.guest_aws_credentials_dir}/config",  # Legacy?
                "AWS_CACHE_DIR": f"{self.paths.guest_aws_credentials_dir}/cache",
                "SSO_CACHE_DIR": f"{self.paths.guest_aws_credentials_dir}/sso/cache",
                "SCRIPT_LOG_LEVEL": get_script_log_level(),
                "MFA_SCRIPT_LOG_LEVEL": get_script_log_level(),  # Legacy
                "SSH_AUTH_SOCK": "" if ssh_auth_sock is None else "/ssh-agent",
            }
        )
        self.entrypoint = self.TF_BINARY
        extra_mounts = [
            Mount(source=self.paths.root_dir.as_posix(), target=self.paths.guest_base_path, type="bind"),
            Mount(
                source=self.paths.host_aws_credentials_dir.as_posix(),
                target=self.paths.guest_aws_credentials_dir,
                type="bind",
            ),
            Mount(source=(self.paths.home / ".gitconfig").as_posix(), target="/etc/gitconfig", type="bind"),
        ]
        self.mounts.extend(extra_mounts)
        # if you have set the tf plugin cache locally
        if self.paths.tf_cache_dir:
            # then mount it too into the container
            self.environment["TF_PLUGIN_CACHE_DIR"] = self.paths.tf_cache_dir
            # given that terraform use symlinks to point from the .terraform folder into the plugin folder
            # we need to use the same directory inside the container
            # otherwise symlinks will be broken once outside the container
            # which will break terraform usage outside Leverage
            self.mounts.append(Mount(source=self.paths.tf_cache_dir, target=self.paths.tf_cache_dir, type="bind"))
        if ssh_auth_sock is not None:
            # expose the host ssh agent socket at the path announced through SSH_AUTH_SOCK
            self.mounts.append(Mount(source=ssh_auth_sock, target="/ssh-agent", type="bind"))

        self._backend_key = None

        logger.debug(f"[bold cyan]Container configuration:[/bold cyan]\n{json.dumps(self.container_config, indent=2)}")

    def auth_method(self) -> str:
        """
        Return the expected auth method based on the SSO or MFA flags.

        In the case of MFA, we also need to tweak some env variables for AWS credentials.
        Once you are done with authentication, remember to revert the env var changes.

        Returns:
            str: The MFA entrypoint prefix when MFA is the active method, an empty string otherwise.
        """
        if self.sso_enabled:
            self._check_sso_token()
            # sso credentials needs to be refreshed right before we execute our command on the container
            refresh_layer_credentials(self)
        elif self.mfa_enabled:
            self.environment.update(
                {
                    "AWS_SHARED_CREDENTIALS_FILE": self.environment["AWS_SHARED_CREDENTIALS_FILE"].replace(
                        "tmp", ".aws"
                    ),
                    "AWS_CONFIG_FILE": self.environment["AWS_CONFIG_FILE"].replace("tmp", ".aws"),
                }
            )
            return f"{self.TF_MFA_ENTRYPOINT} -- "

        return ""

    @property
    def tf_default_args(self):
        """Array of strings containing all valid config files for layer as parameters for Terraform"""
        common_config_files = [
            f"-var-file={self.paths.guest_config_file(common_file)}"
            for common_file in self.paths.common_config_dir.glob("*.tfvars")
        ]
        account_config_files = [
            f"-var-file={self.paths.guest_config_file(account_file)}"
            for account_file in self.paths.account_config_dir.glob("*.tfvars")
        ]
        return common_config_files + account_config_files

    def enable_mfa(self):
        """Enable Multi-Factor Authentication."""
        self.mfa_enabled = True

    def enable_sso(self):
        """Enable Single Sign-On Authentication."""
        self.sso_enabled = True

    def disable_authentication(self):
        """Disable all authentication."""
        self.mfa_enabled = False
        self.sso_enabled = False

    def _check_sso_token(self):
        """Check for the existence and validity of the SSO token to be used to get credentials.

        Raises:
            Exit: If no token is found for the current account or the token has expired.
        """

        # Adding `token` file name to this function in order to
        # meet the requirement regarding to have just one
        # token file in the sso/cache
        sso_role = self.paths.account_conf.get("sso_role")
        token_file = self.paths.sso_cache / sso_role

        token_files = list(self.paths.sso_cache.glob("*"))
        if not token_files:
            logger.error("No AWS SSO token found. Please log in or configure SSO.")
            raise Exit(1)

        if token_file not in token_files:
            # fall back to the generic `token` file
            sso_role = "token"
            token_file = self.paths.sso_cache / sso_role
            if token_file not in token_files:
                logger.error(
                    "No valid AWS SSO token found for current account.\n"
                    "Please log out and reconfigure SSO before proceeding."
                )
                raise Exit(1)

        # the token is read through the container, with the entrypoint cleared for this one call
        entrypoint = self.entrypoint
        self.entrypoint = ""
        try:
            _, cached_token = self._exec(f"sh -c 'cat $SSO_CACHE_DIR/{sso_role}'")
        finally:
            # restore the entrypoint no matter how the read went
            self.entrypoint = entrypoint

        token = json.loads(cached_token)
        expiry = datetime.strptime(token.get("expiresAt"), "%Y-%m-%dT%H:%M:%SZ")
        renewal = datetime.utcnow()

        if expiry < renewal:
            logger.error(
                "AWS SSO token has expired, please log back in by running [bold]leverage aws sso login[/bold]"
                " to refresh your credentials before re-running the last command."
            )
            raise Exit(1)

    def refresh_credentials(self):
        """Start a container that just echoes a message, under the credentials entrypoint.

        Returns:
            int | None: The exit code when it is non-zero, None otherwise.
        """
        with AwsCredsEntryPoint(self, override_entrypoint=""):
            if exit_code := self._start('echo "Done."'):
                return exit_code

    def start(self, command, *arguments):
        """Run command with the given arguments in an interactive container, under the credentials entrypoint.
        Returns execution exit code.
        """
        with AwsCredsEntryPoint(self, self.entrypoint):
            return self._start(command, *arguments)

    def start_in_layer(self, command, *arguments):
        """Run a command that can only be performed in layer level."""
        self.paths.check_for_layer_location()

        return self.start(command, *arguments)

    def exec(self, command, *arguments):
        """Execute command with the given arguments in a container, under the credentials entrypoint.
        Returns execution exit code and output.
        """
        with AwsCredsEntryPoint(self):
            return self._exec(command, *arguments)

    # FIXME: we have a context manager for this now, remove this method later!
    def system_exec(self, command):
        """Momentarily override the container's default entrypoint. To run arbitrary commands and not only Terraform ones."""
        original_entrypoint = self.entrypoint
        self.entrypoint = ""
        try:
            exit_code, output = self._exec(command)
        finally:
            # restore the entrypoint even if the command raised
            self.entrypoint = original_entrypoint

        return exit_code, output

    def start_shell(self):
        """Launch a shell in the container."""
        if self.mfa_enabled or self.sso_enabled:
            self.paths.check_for_layer_location()

        with AwsCredsEntryPoint(self, override_entrypoint=""):
            self._start(self.SHELL)

    def set_backend_key(self, skip_validation=False):
        """Read the S3 backend key from the layer's config.tf, or generate and write one if missing.

        Args:
            skip_validation (bool, optional): Do not fail when there is no S3 backend block. Defaults to False.

        Raises:
            Exit: If config.tf has no S3 backend block (and validation is on) or it cannot be parsed.
        """
        # Scenarios:
        #
        # scenario    |  s3 backend set   |  s3 key set  |  skip_validation  |  result
        # 0           |  false            |  false       |  false            |  fail
        # 1           |  false            |  false       |  true             |  ok
        # 2           |  true             |  false       |  false/true       |  set the key
        # 3           |  true             |  true        |  false/true       |  read the key
        try:
            config_tf_file = self.paths.cwd / "config.tf"
            config_tf = hcl2.loads(config_tf_file.read_text()) if config_tf_file.exists() else {}
            if (
                "terraform" in config_tf
                and "backend" in config_tf["terraform"][0]
                and "s3" in config_tf["terraform"][0]["backend"][0]
            ):
                if "key" in config_tf["terraform"][0]["backend"][0]["s3"]:
                    backend_key = config_tf["terraform"][0]["backend"][0]["s3"]["key"]
                    self._backend_key = backend_key
                else:
                    # derive the key from the layer path, dropping the `base-` / `tools-` folder prefixes
                    self._backend_key = (
                        f"{self.paths.cwd.relative_to(self.paths.root_dir).as_posix()}/terraform.tfstate".replace(
                            "/base-", "/"
                        ).replace("/tools-", "/")
                    )

                    in_container_file_path = (
                        f"{self.paths.guest_base_path}/{config_tf_file.relative_to(self.paths.root_dir).as_posix()}"
                    )
                    # write the generated key into config.tf from inside the container
                    self.system_exec(
                        "hcledit "
                        f"-f {in_container_file_path} -u"
                        f' attribute append terraform.backend.key "\\"{self._backend_key}\\""'
                    )
            else:
                if not skip_validation:
                    # handled right below, together with the lookups that fail on a malformed file
                    raise KeyError()
        except (KeyError, IndexError):
            logger.error(
                "[red]✘[/red] Malformed [bold]config.tf[/bold] file. Missing Terraform backend block. In some cases you may want to skip this check by using the --skip-validation flag, e.g. the first time you initialize a terraform-backend layer."
            )
            raise Exit(1)
        except Exit:
            # an exit requested by the container helpers is not a parsing problem: let it through untouched
            raise
        except Exception as e:
            logger.error("[red]✘[/red] Malformed [bold]config.tf[/bold] file. Unable to parse.")
            logger.debug(e)
            raise Exit(1)

    @property
    def backend_key(self):
        """Backend key read or generated by `set_backend_key`, None until then."""
        return self._backend_key

    @backend_key.setter
    def backend_key(self, backend_key):
        self._backend_key = backend_key
682

683

684
class TFautomvContainer(TerraformContainer):
    """Leverage Container tailored to run tfautomv commands."""

    TFAUTOMV_CLI_BINARY = "/usr/local/bin/tfautomv"

    def __init__(self, client):
        """Pass the layer's var files to `terraform init` / `terraform plan` and set tfautomv as entrypoint.

        Args:
            client (docker.DockerClient): Client to interact with Docker daemon.
        """
        super().__init__(client)

        # tf_default_args globs the config directories on every access, so evaluate it only once
        var_files = " ".join(self.tf_default_args)
        self.environment["TF_CLI_ARGS_init"] = var_files
        self.environment["TF_CLI_ARGS_plan"] = var_files

        self.entrypoint = self.TFAUTOMV_CLI_BINARY

        logger.debug(f"[bold cyan]Container configuration:[/bold cyan]\n{json.dumps(self.container_config, indent=2)}")

    def start(self, *arguments):
        """Run tfautomv with the given arguments in an interactive container.
        Returns execution exit code.
        """
        with AwsCredsEntryPoint(self):
            # the binary is the entrypoint, so the command itself is left empty
            return self._start("", *arguments)

    def start_in_layer(self, *arguments):
        """Run a command that can only be performed in layer level."""
        self.paths.check_for_layer_location()

        return self.start(*arguments)

    def exec(self, command, *arguments):
        """Execute command with the given arguments in a container.
        Returns execution exit code and output.
        """
        with AwsCredsEntryPoint(self):
            return self._exec(command, *arguments)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc