• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

binbashar / leverage / 10565875170

26 Aug 2024 07:18PM UTC coverage: 59.767% (-9.3%) from 69.031%
10565875170

Pull #269

github

web-flow
Feature/support python 3.12 (#279)

* adding python 3.12 in pyproject.toml

* autogenerating python 3.12 in poetry.lock
Pull Request #269: [BV-195] Run toolbox with the host user

204 of 508 branches covered (40.16%)

Branch coverage included in aggregate %.

67 of 71 new or added lines in 4 files covered. (94.37%)

5 existing lines in 1 file now uncovered.

2464 of 3956 relevant lines covered (62.29%)

0.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.99
/leverage/container.py
1
import json
1✔
2
import os
1✔
3
import re
1✔
4
import webbrowser
1✔
5
from io import BytesIO
1✔
6
from pathlib import Path
1✔
7
from datetime import datetime
1✔
8
from time import sleep
1✔
9

10
import hcl2
1✔
11
from click.exceptions import Exit
1✔
12
import dockerpty
1✔
13
from configupdater import ConfigUpdater
1✔
14
from docker import DockerClient
1✔
15
from docker.errors import APIError
1✔
16
from docker.types import Mount
1✔
17
from typing import Tuple
1✔
18

19
from leverage import logger
1✔
20
from leverage._utils import AwsCredsEntryPoint, CustomEntryPoint, ExitError, ContainerSession
1✔
21
from leverage.modules.auth import refresh_layer_credentials
1✔
22
from leverage.logger import raw_logger
1✔
23
from leverage.logger import get_script_log_level
1✔
24
from leverage.path import PathsHandler
1✔
25
from leverage.conf import load as load_env
1✔
26

27
REGION = (
1✔
28
    r"(.*)"  # project folder
29
    # start region
30
    r"(global|(?:[a-z]{2}-(?:gov-)?"
31
    r"(?:central|north|south|east|west|northeast|northwest|southeast|southwest|secret|topsecret)-[1-4]))"
32
    # end region
33
    r"(.*)"  # layer
34
)
35

36

37
def get_docker_client():
1✔
38
    """Attempt to get a Docker client from the environment configuration. Halt application otherwise.
39

40
    Raises:
41
        Exit: If communication to Docker server could not be established.
42

43
    Returns:
44
        docker.DockerClient: Client for Docker daemon.
45
    """
46
    try:
×
47
        docker_client = DockerClient.from_env()
×
48
        docker_client.ping()
×
49

50
    except:
×
51
        logger.error(
×
52
            "Docker daemon doesn't seem to be responding. "
53
            "Please check it is up and running correctly before re-running the command."
54
        )
55
        raise Exit(1)
×
56

57
    return docker_client
×
58

59

60
class LeverageContainer:
1✔
61
    """Basic Leverage Container. Holds the minimum information required to run the Docker image that Leverage uses
62
    to perform its operations. Commands can be issued as interactive via `start` for when live output or user input is desired
63
    or the can be simply executed via `exec` to run silently and retrieve the command output.
64

65
    NOTE: An aggregation approach to this design should be considered instead of the current inheritance approach.
66
    """
67

68
    LEVERAGE_IMAGE = "binbash/leverage-toolbox"
1✔
69
    SHELL = "/bin/bash"
1✔
70
    CONTAINER_USER = "leverage"
1✔
71

72
    def __init__(self, client, mounts: tuple = None, env_vars: dict = None):
1✔
73
        """Project related paths are determined and stored. Project configuration is loaded.
74

75
        Args:
76
            client (docker.DockerClient): Client to interact with Docker daemon.
77
        """
78
        self.client = client
1✔
79
        # Load configs
80
        self.env_conf = load_env()
1✔
81

82
        self.paths = PathsHandler(self.env_conf, self.CONTAINER_USER)
1✔
83
        self.project = self.paths.project
1✔
84

85
        # Set image to use
86
        self.image = self.env_conf.get("TERRAFORM_IMAGE", self.LEVERAGE_IMAGE)
1✔
87
        self.image_tag = self.env_conf.get("TERRAFORM_IMAGE_TAG")
1✔
88
        if not self.image_tag:
1✔
89
            logger.error(
×
90
                "No docker image tag defined.\n"
91
                "Please set `TERRAFORM_IMAGE_TAG` variable in the project's [bold]build.env[/bold] file before running a Leverage command."
92
            )
93
            raise Exit(1)
×
94

95
        mounts = [Mount(source=source, target=target, type="bind") for source, target in mounts] if mounts else []
1✔
96
        self.host_config = self.client.api.create_host_config(security_opt=["label:disable"], mounts=mounts)
1✔
97
        self.container_config = {
1✔
98
            "image": f"{self.image}:{self.local_image_tag}",
99
            "command": "",
100
            "stdin_open": True,
101
            "environment": env_vars or {},
102
            "entrypoint": "",
103
            "working_dir": f"{self.paths.guest_base_path}/{self.paths.cwd.relative_to(self.paths.root_dir).as_posix()}",
104
            "host_config": self.host_config,
105
        }
106

107
    @property
1✔
108
    def environment(self):
1✔
109
        return self.container_config["environment"]
1✔
110

111
    @environment.setter
1✔
112
    def environment(self, value):
1✔
113
        self.container_config["environment"] = value
1✔
114

115
    @property
1✔
116
    def entrypoint(self):
1✔
117
        return self.container_config["entrypoint"]
1✔
118

119
    @entrypoint.setter
1✔
120
    def entrypoint(self, value):
1✔
121
        self.container_config["entrypoint"] = value
1✔
122

123
    @property
1✔
124
    def mounts(self):
1✔
125
        return self.container_config["host_config"]["Mounts"]
1✔
126

127
    @mounts.setter
1✔
128
    def mounts(self, value):
1✔
UNCOV
129
        self.container_config["host_config"]["Mounts"] = value
×
130

131
    @property
1✔
132
    def region(self):
1✔
133
        """
134
        Return the region of the layer.
135
        """
136
        if matches := re.match(REGION, self.paths.cwd.as_posix()):
1✔
137
            # the region (group 1) is between the projects folders (group 0) and the layers (group 2)
138
            return matches.groups()[1]
1✔
139

140
        raise ExitError(1, f"No valid region could be found at: {self.paths.cwd.as_posix()}")
×
141

142
    @property
1✔
143
    def local_image_tag(self):
1✔
144
        return f"{self.image_tag}-{os.getgid()}-{os.getuid()}"
1✔
145

146
    @property
1✔
147
    def local_image(self) -> BytesIO:
1✔
148
        """Return the local image that will be built, as a file-like object."""
149
        return BytesIO(
1✔
150
            """
151
            ARG IMAGE_TAG
152
            FROM binbash/leverage-toolbox:$IMAGE_TAG
153

154
            ARG UNAME
155
            ARG UID
156
            ARG GID
157
            RUN groupadd -g $GID -o $UNAME
158
            RUN useradd -m -u $UID -g $GID -o -s /bin/bash $UNAME
159
            USER $UNAME
160
            """.encode(
161
                "utf-8"
162
            )
163
        )
164

165
    def ensure_image(self):
1✔
166
        """
167
        Make sure the required local Docker image is available in the system. If not, build it.
168
        If the image already exists, re-build it so changes in the arguments can take effect.
169
        """
170
        logger.info(f"Checking for local docker image, tag: {self.local_image_tag}...")
1✔
171
        image_name = f"{self.image}:{self.local_image_tag}"
1✔
172

173
        # check first is our image is already available locally
174
        found_image = self.client.api.images(f"{self.image}:{self.local_image_tag}")
1✔
175
        if found_image:
1✔
176
            logger.info("[green]✔ OK[/green]\n")
1✔
177
            return
1✔
178

179
        logger.info(f"Image not found, building it...")
1✔
180
        build_args = {
1✔
181
            "IMAGE_TAG": self.image_tag,
182
            "UNAME": self.CONTAINER_USER,
183
            "GID": str(os.getgid()),
184
            "UID": str(os.getuid()),
185
        }
186

187
        stream = self.client.api.build(
1✔
188
            fileobj=self.local_image,
189
            tag=image_name,
190
            pull=True,
191
            buildargs=build_args,
192
            decode=True,
193
        )
194

195
        for line in stream:
1✔
196
            if "stream" in line and line["stream"].startswith("Successfully built"):
1✔
197
                logger.info("[green]✔ OK[/green]\n")
1✔
198
            elif "errorDetail" in line:
1✔
199
                raise ExitError(1, f"Failed building local image: {line['errorDetail']}")
1✔
200

201
    def _create_container(self, tty, command="", *args):
1✔
202
        """Create the container that will run the command.
203

204
        Args:
205
            tty (bool): Whether the container will run interactively or not.
206
            command (str, optional): Command to run. Defaults to "".
207

208
        Raises:
209
            Exit: If the container could not be created.
210

211
        Returns:
212
            dict: Reference to the created container.
213
        """
214
        command = " ".join([command] + list(args))
1✔
215
        logger.debug(f"[bold cyan]Running command:[/bold cyan] {command}")
1✔
216
        self.container_config["command"] = command
1✔
217
        self.container_config["tty"] = tty
1✔
218

219
        try:
1✔
220
            return self.client.api.create_container(**self.container_config)
1✔
221

222
        except APIError as exc:
×
223
            exc.__traceback__ = None
×
224
            exc.__context__.__traceback__ = None
×
225
            logger.exception("Error creating container:", exc_info=exc)
×
226
            raise Exit(1)
×
227

228
    def _run(self, container, run_func):
1✔
229
        """Apply the given run function to the given container, return its outputs and handle container cleanup.
230

231
        Args:
232
            container (dict): Reference to a Docker container.
233
            run_func (function): Function to apply to the given container.
234

235
        Returns:
236
            any: Whatever the given function returns.
237
        """
238
        try:
×
239
            return run_func(self.client, container)
×
240

241
        except APIError as exc:
×
242
            exc.__traceback__ = None
×
243
            exc.__context__.__traceback__ = None
×
244
            logger.exception("Error during container execution:", exc_info=exc)
×
245

246
        finally:
247
            self.client.api.stop(container)
×
248
            self.client.api.remove_container(container)
×
249

250
    def _start(self, command: str, *args):
1✔
251
        """Create an interactive container, and run command with the given arguments.
252

253
        Args:
254
            command: Command to run.
255

256
        Returns:
257
            int: Execution exit code.
258
        """
259
        container = self._create_container(True, command, *args)
1✔
260

261
        def run_func(client, container):
1✔
262
            dockerpty.start(client=client.api, container=container)
×
263
            return client.api.inspect_container(container)["State"]["ExitCode"]
×
264

265
        return self._run(container, run_func)
1✔
266

267
    def _start_with_output(self, command, *args):
1✔
268
        """
269
        Same than _start but also returns the outputs (by dumping the logs) of the container.
270
        """
271
        container = self._create_container(True, command, *args)
×
272

273
        def run_func(client, container):
×
274
            dockerpty.start(client=client.api, container=container)
×
275
            exit_code = client.api.inspect_container(container)["State"]["ExitCode"]
×
276
            logs = client.api.logs(container).decode("utf-8")
×
277
            return exit_code, logs
×
278

279
        return self._run(container, run_func)
×
280

281
    def start(self, command: str, *arguments) -> int:
1✔
282
        """Run command with the given arguments in an interactive container.
283
        Returns execution exit code.
284
        """
285
        return self._start(command, *arguments)
1✔
286

287
    def _exec(self, command: str, *args) -> Tuple[int, str]:
1✔
288
        """Create a non interactive container and execute command with the given arguments.
289
        Returns execution exit code and output.
290
        """
UNCOV
291
        container = self._create_container(False, command, *args)
×
292

UNCOV
293
        def run_func(client, container):
×
294
            client.api.start(container)
×
295
            exit_code = client.api.wait(container)["StatusCode"]
×
296
            output = client.api.logs(container).decode("utf-8")
×
297
            return exit_code, output
×
298

UNCOV
299
        return self._run(container, run_func)
×
300

301
    def exec(self, command: str, *arguments) -> Tuple[int, str]:
1✔
302
        """Execute command with the given arguments in a container.
303
        Returns execution exit code and output.
304
        """
305
        return self._exec(command, *arguments)
×
306

307
    def docker_logs(self, container):
1✔
308
        return self.client.api.logs(container).decode("utf-8")
×
309

310

311
class SSOContainer(LeverageContainer):
1✔
312
    # SSO scripts
313
    AWS_SSO_LOGIN_SCRIPT = "/opt/scripts/aws-sso/aws-sso-login.sh"
1✔
314
    AWS_SSO_LOGOUT_SCRIPT = "/opt/scripts/aws-sso/aws-sso-logout.sh"
1✔
315

316
    # SSO constants
317
    AWS_SSO_LOGIN_URL = "https://device.sso.{region}.amazonaws.com/?user_code={user_code}"
1✔
318
    AWS_SSO_CODE_WAIT_SECONDS = 2
1✔
319
    AWS_SSO_CODE_ATTEMPTS = 10
1✔
320
    FALLBACK_LINK_MSG = "Opening the browser... if it fails, open this link in your browser:\n{link}"
1✔
321

322
    def __init__(self, client, mounts=None, env_vars=None):
1✔
323
        super().__init__(client, mounts=mounts, env_vars=env_vars)
1✔
324
        self.mounts.extend(
1✔
325
            [
326
                Mount(source=(Path(__file__).parent / "scripts").as_posix(), target="/opt/scripts", type="bind"),
327
            ]
328
        )
329

330
    def get_sso_access_token(self):
1✔
UNCOV
331
        with open(self.paths.sso_token_file) as token_file:
×
332
            return json.loads(token_file.read())["accessToken"]
×
333

334
    @property
1✔
335
    def sso_region_from_main_profile(self):
1✔
336
        """
337
        Same than AWSCLIContainer.get_sso_region but without using a container.
338
        """
339
        conf = ConfigUpdater()
1✔
340
        conf.read(self.paths.host_aws_profiles_file)
1✔
341
        return conf.get(f"profile {self.project}-sso", "sso_region").value
1✔
342

343
    def get_sso_code(self, container) -> str:
1✔
344
        """
345
        Find and return the SSO user code by periodically checking the logs.
346
        Up until N attempts.
347
        """
348
        logger.info("Fetching SSO code...")
1✔
349
        for _ in range(self.AWS_SSO_CODE_ATTEMPTS):
1✔
350
            # pull logs periodically until we find our SSO code
351
            logs = self.docker_logs(container)
1✔
352
            if "Then enter the code:" in logs:
1✔
353
                return logs.split("Then enter the code:")[1].split("\n")[2]
1✔
354

355
            sleep(self.AWS_SSO_CODE_WAIT_SECONDS)
1✔
356

357
        raise ExitError(1, "Get SSO code timed-out")
1✔
358

359
    def get_sso_region(self):
1✔
360
        # TODO: what about using the .region property we have now? that takes the value from the path of the layer
NEW
361
        _, region = self.exec(f"configure get sso_region --profile {self.project}-sso")
×
NEW
362
        return region
×
363

364
    def sso_login(self) -> int:
1✔
365
        region = self.get_sso_region()
1✔
366

367
        with CustomEntryPoint(self, ""):
1✔
368
            container = self._create_container(False, command=self.AWS_SSO_LOGIN_SCRIPT)
1✔
369

370
        with ContainerSession(self.client, container):
1✔
371
            # once inside this block, the SSO_LOGIN_SCRIPT is being executed in the "background"
372
            # now let's grab the user code from the logs
373
            user_code = self.get_sso_code(container)
1✔
374
            # with the user code, we can now autocomplete the url
375
            link = self.AWS_SSO_LOGIN_URL.format(region=region.strip(), user_code=user_code)
1✔
376
            webbrowser.open_new_tab(link)
1✔
377
            # The SSO code is only valid once: if the browser was able to open it, the fallback link will be invalid
378
            logger.info(self.FALLBACK_LINK_MSG.format(link=link))
1✔
379
            # now let's wait until the command locking the container resolve itself:
380
            # aws sso login will wait for the user code
381
            # once submitted to the browser, the authentication finish and the lock is released
382
            exit_code = self.client.api.wait(container)["StatusCode"]
1✔
383
            raw_logger.info(self.docker_logs(container))
1✔
384

385
        return exit_code
1✔
386

387

388
class AWSCLIContainer(SSOContainer):
1✔
389
    """Leverage Container specially tailored to run AWS CLI commands."""
390

391
    AWS_CLI_BINARY = "/usr/local/bin/aws"
1✔
392

393
    def __init__(self, client):
1✔
394
        super().__init__(client)
1✔
395

396
        self.environment = {
1✔
397
            "COMMON_CONFIG_FILE": self.paths.common_tfvars,
398
            "ACCOUNT_CONFIG_FILE": self.paths.account_tfvars,
399
            "BACKEND_CONFIG_FILE": self.paths.backend_tfvars,
400
            "AWS_SHARED_CREDENTIALS_FILE": f"{self.paths.guest_aws_credentials_dir}/credentials",
401
            "AWS_CONFIG_FILE": f"{self.paths.guest_aws_credentials_dir}/config",
402
            "SSO_CACHE_DIR": f"{self.paths.guest_aws_credentials_dir}/sso/cache",
403
            "SCRIPT_LOG_LEVEL": get_script_log_level(),
404
        }
405
        self.entrypoint = self.AWS_CLI_BINARY
1✔
406
        self.mounts.extend(
1✔
407
            [
408
                Mount(source=self.paths.root_dir.as_posix(), target=self.paths.guest_base_path, type="bind"),
409
                Mount(
410
                    source=self.paths.host_aws_credentials_dir.as_posix(),
411
                    target=self.paths.guest_aws_credentials_dir,
412
                    type="bind",
413
                ),
414
            ]
415
        )
416

417
        logger.debug(f"[bold cyan]Container configuration:[/bold cyan]\n{json.dumps(self.container_config, indent=2)}")
1✔
418

419
    def start(self, command, profile=""):
1✔
420
        args = [] if not profile else ["--profile", profile]
×
421
        return self._start(command, *args)
×
422

423
    # FIXME: we have a context manager for this now, remove this method later!
424
    def system_start(self, command):
1✔
425
        """Momentarily override the container's default entrypoint. To run arbitrary commands and not only AWS CLI ones."""
426
        self.entrypoint = ""
×
427
        exit_code = self._start(command)
×
428
        self.entrypoint = self.AWS_CLI_BINARY
×
429
        return exit_code
×
430

431
    def exec(self, command, profile=""):
1✔
432
        args = [] if not profile else ["--profile", profile]
×
433
        return self._exec(command, *args)
×
434

435
    # FIXME: we have a context manager for this now, remove this method later!
436
    def system_exec(self, command):
1✔
437
        """Momentarily override the container's default entrypoint. To run arbitrary commands and not only AWS CLI ones."""
438
        self.entrypoint = ""
×
439
        exit_code, output = self._exec(command)
×
440

441
        self.entrypoint = self.AWS_CLI_BINARY
×
442
        return exit_code, output
×
443

444

445
class TerraformContainer(SSOContainer):
1✔
446
    """Leverage container specifically tailored to run Terraform commands.
447
    It handles authentication and some checks regarding where the command is being executed."""
448

449
    TF_BINARY = "/bin/terraform"
1✔
450

451
    TF_MFA_ENTRYPOINT = "/opt/scripts/aws-mfa/aws-mfa-entrypoint.sh"
1✔
452

453
    def __init__(self, client, mounts=None, env_vars=None):
1✔
454
        super().__init__(client, mounts=mounts, env_vars=env_vars)
1✔
455

456
        self.paths.assert_running_leverage_project()
1✔
457

458
        # Set authentication methods
459
        self.sso_enabled = self.paths.common_conf.get("sso_enabled", False)
1✔
460
        self.mfa_enabled = (
1✔
461
            self.env_conf.get("MFA_ENABLED", "false") == "true"
462
        )  # TODO: Convert values to bool upon loading
463

464
        # SSH AGENT
465
        SSH_AUTH_SOCK = os.getenv("SSH_AUTH_SOCK")
1✔
466

467
        self.environment.update(
1✔
468
            {
469
                "COMMON_CONFIG_FILE": self.paths.common_tfvars,
470
                "ACCOUNT_CONFIG_FILE": self.paths.account_tfvars,
471
                "BACKEND_CONFIG_FILE": self.paths.backend_tfvars,
472
                "AWS_SHARED_CREDENTIALS_FILE": f"{self.paths.guest_aws_credentials_dir}/credentials",
473
                "AWS_CONFIG_FILE": f"{self.paths.guest_aws_credentials_dir}/config",
474
                "SRC_AWS_SHARED_CREDENTIALS_FILE": f"{self.paths.guest_aws_credentials_dir}/credentials",  # Legacy?
475
                "SRC_AWS_CONFIG_FILE": f"{self.paths.guest_aws_credentials_dir}/config",  # Legacy?
476
                "AWS_CACHE_DIR": f"{self.paths.guest_aws_credentials_dir}/cache",
477
                "SSO_CACHE_DIR": f"{self.paths.guest_aws_credentials_dir}/sso/cache",
478
                "SCRIPT_LOG_LEVEL": get_script_log_level(),
479
                "MFA_SCRIPT_LOG_LEVEL": get_script_log_level(),  # Legacy
480
                "SSH_AUTH_SOCK": "" if SSH_AUTH_SOCK is None else "/ssh-agent",
481
            }
482
        )
483
        self.entrypoint = self.TF_BINARY
1✔
484
        extra_mounts = [
1✔
485
            Mount(source=self.paths.root_dir.as_posix(), target=self.paths.guest_base_path, type="bind"),
486
            Mount(
487
                source=self.paths.host_aws_credentials_dir.as_posix(),
488
                target=self.paths.guest_aws_credentials_dir,
489
                type="bind",
490
            ),
491
            Mount(source=(self.paths.home / ".gitconfig").as_posix(), target="/etc/gitconfig", type="bind"),
492
        ]
493
        self.mounts.extend(extra_mounts)
1✔
494
        # if you have set the tf plugin cache locally
495
        if self.paths.tf_cache_dir:
1✔
496
            # then mount it too into the container
497
            self.environment["TF_PLUGIN_CACHE_DIR"] = self.paths.tf_cache_dir
1✔
498
            # given that terraform use symlinks to point from the .terraform folder into the plugin folder
499
            # we need to use the same directory inside the container
500
            # otherwise symlinks will be broken once outside the container
501
            # which will break terraform usage outside Leverage
502
            self.mounts.append(Mount(source=self.paths.tf_cache_dir, target=self.paths.tf_cache_dir, type="bind"))
1✔
503
        if SSH_AUTH_SOCK is not None:
1✔
504
            self.mounts.append(Mount(source=SSH_AUTH_SOCK, target="/ssh-agent", type="bind"))
×
505

506
        self._backend_key = None
1✔
507

508
        logger.debug(f"[bold cyan]Container configuration:[/bold cyan]\n{json.dumps(self.container_config, indent=2)}")
1✔
509

510
    def auth_method(self) -> str:
1✔
511
        """
512
        Return the expected auth method based on the SSO or MFA flags.
513

514
        In the case of MFA, we also need to tweak some env variables for AWS credentials.
515
        Once you are done with authentication, remember to revert the env var changes.
516
        """
517
        if self.sso_enabled:
1✔
518
            self._check_sso_token()
1✔
519
            # sso credentials needs to be refreshed right before we execute our command on the container
520
            refresh_layer_credentials(self)
1✔
521
        elif self.mfa_enabled:
1✔
522
            self.environment.update(
1✔
523
                {
524
                    "AWS_SHARED_CREDENTIALS_FILE": self.environment["AWS_SHARED_CREDENTIALS_FILE"].replace(
525
                        "tmp", ".aws"
526
                    ),
527
                    "AWS_CONFIG_FILE": self.environment["AWS_CONFIG_FILE"].replace("tmp", ".aws"),
528
                }
529
            )
530
            return f"{self.TF_MFA_ENTRYPOINT} -- "
1✔
531

532
        return ""
1✔
533

534
    @property
1✔
535
    def tf_default_args(self):
1✔
536
        """Array of strings containing all valid config files for layer as parameters for Terraform"""
537
        common_config_files = [
×
538
            f"-var-file={self.paths.guest_config_file(common_file)}"
539
            for common_file in self.paths.common_config_dir.glob("*.tfvars")
540
        ]
541
        account_config_files = [
×
542
            f"-var-file={self.paths.guest_config_file(account_file)}"
543
            for account_file in self.paths.account_config_dir.glob("*.tfvars")
544
        ]
545
        return common_config_files + account_config_files
×
546

547
    def enable_mfa(self):
1✔
548
        """Enable Multi-Factor Authentication."""
549
        self.mfa_enabled = True
1✔
550

551
    def enable_sso(self):
1✔
552
        """Enable Single Sign-On Authentication."""
553
        self.sso_enabled = True
1✔
554

555
    def disable_authentication(self):
1✔
556
        """Disable all authentication."""
557
        self.mfa_enabled = False
×
558
        self.sso_enabled = False
×
559

560
    def _check_sso_token(self):
1✔
561
        """Check for the existence and validity of the SSO token to be used to get credentials."""
562

563
        # Adding `token` file name to this function in order to
564
        # meet the requirement regarding to have just one
565
        # token file in the sso/cache
566
        sso_role = self.paths.account_conf.get("sso_role")
×
567
        token_file = self.paths.sso_cache / sso_role
×
568

569
        token_files = list(self.paths.sso_cache.glob("*"))
×
570
        if not token_files:
×
571
            logger.error("No AWS SSO token found. Please log in or configure SSO.")
×
572
            raise Exit(1)
×
573

574
        if token_file not in token_files:
×
575
            sso_role = "token"
×
576
            token_file = self.paths.sso_cache / sso_role
×
577
            if token_file not in token_files:
×
578
                logger.error(
×
579
                    "No valid AWS SSO token found for current account.\n"
580
                    "Please log out and reconfigure SSO before proceeding."
581
                )
582
                raise Exit(1)
×
583

584
        entrypoint = self.entrypoint
×
585
        self.entrypoint = ""
×
586

587
        _, cached_token = self._exec(f"sh -c 'cat $SSO_CACHE_DIR/{sso_role}'")
×
588
        token = json.loads(cached_token)
×
589
        expiry = datetime.strptime(token.get("expiresAt"), "%Y-%m-%dT%H:%M:%SZ")
×
590
        renewal = datetime.utcnow()
×
591

592
        if expiry < renewal:
×
593
            logger.error(
×
594
                "AWS SSO token has expired, please log back in by running [bold]leverage aws sso login[/bold]"
595
                " to refresh your credentials before re-running the last command."
596
            )
597
            raise Exit(1)
×
598

599
        self.entrypoint = entrypoint
×
600

601
    def refresh_credentials(self):
1✔
602
        with AwsCredsEntryPoint(self, override_entrypoint=""):
1✔
603
            if exit_code := self._start('echo "Done."'):
1✔
604
                return exit_code
1✔
605

606
    def start(self, command, *arguments):
1✔
607
        with AwsCredsEntryPoint(self, self.entrypoint):
×
608
            return self._start(command, *arguments)
×
609

610
    def start_in_layer(self, command, *arguments):
1✔
611
        """Run a command that can only be performed in layer level."""
612
        self.paths.check_for_layer_location()
×
613

614
        return self.start(command, *arguments)
×
615

616
    def exec(self, command, *arguments):
1✔
617
        with AwsCredsEntryPoint(self):
×
618
            return self._exec(command, *arguments)
×
619

620
    # FIXME: we have a context manager for this now, remove this method later!
621
    def system_exec(self, command):
1✔
622
        """Momentarily override the container's default entrypoint. To run arbitrary commands and not only AWS CLI ones."""
623
        original_entrypoint = self.entrypoint
×
624
        self.entrypoint = ""
×
625
        exit_code, output = self._exec(command)
×
626

627
        self.entrypoint = original_entrypoint
×
628
        return exit_code, output
×
629

630
    def start_shell(self):
1✔
631
        """Launch a shell in the container."""
632
        if self.mfa_enabled or self.sso_enabled:
1✔
633
            self.paths.check_for_layer_location()
×
634

635
        with AwsCredsEntryPoint(self, override_entrypoint=""):
1✔
636
            self._start(self.SHELL)
1✔
637

638
    def set_backend_key(self, skip_validation=False):
1✔
639
        # Scenarios:
640
        #
641
        # scenario    |  s3 backend set   |  s3 key set  |  skip_validation  |  result
642
        # 0           |  false            |  false       |  false            |  fail
643
        # 1           |  false            |  false       |  true             |  ok
644
        # 2           |  true             |  false       |  false/true       |  set the key
645
        # 3           |  true             |  true        |  false/true       |  read the key
646
        try:
×
647
            config_tf_file = self.paths.cwd / "config.tf"
×
648
            config_tf = hcl2.loads(config_tf_file.read_text()) if config_tf_file.exists() else {}
×
649
            if (
×
650
                "terraform" in config_tf
651
                and "backend" in config_tf["terraform"][0]
652
                and "s3" in config_tf["terraform"][0]["backend"][0]
653
            ):
654
                if "key" in config_tf["terraform"][0]["backend"][0]["s3"]:
×
655
                    backend_key = config_tf["terraform"][0]["backend"][0]["s3"]["key"]
×
656
                    self._backend_key = backend_key
×
657
                else:
658
                    self._backend_key = (
×
659
                        f"{self.paths.cwd.relative_to(self.paths.root_dir).as_posix()}/terraform.tfstate".replace(
660
                            "/base-", "/"
661
                        ).replace("/tools-", "/")
662
                    )
663

664
                    in_container_file_path = (
×
665
                        f"{self.paths.guest_base_path}/{config_tf_file.relative_to(self.paths.root_dir).as_posix()}"
666
                    )
667
                    resp = self.system_exec(
×
668
                        "hcledit "
669
                        f"-f {in_container_file_path} -u"
670
                        f' attribute append terraform.backend.key "\\"{self._backend_key}\\""'
671
                    )
672
            else:
673
                if not skip_validation:
×
674
                    raise KeyError()
×
675
        except (KeyError, IndexError):
×
676
            logger.error(
×
677
                "[red]✘[/red] Malformed [bold]config.tf[/bold] file. Missing Terraform backend block. In some cases you may want to skip this check by using the --skip-validation flag, e.g. the first time you initialize a terraform-backend layer."
678
            )
679
            raise Exit(1)
×
680
        except Exception as e:
×
681
            logger.error("[red]✘[/red] Malformed [bold]config.tf[/bold] file. Unable to parse.")
×
682
            logger.debug(e)
×
683
            raise Exit(1)
×
684

685
    @property
1✔
686
    def backend_key(self):
1✔
687
        return self._backend_key
×
688

689
    @backend_key.setter
1✔
690
    def backend_key(self, backend_key):
1✔
691
        self._backend_key = backend_key
×
692

693

694
class TFautomvContainer(TerraformContainer):
1✔
695
    """Leverage Container tailored to run general commands."""
696

697
    TFAUTOMV_CLI_BINARY = "/usr/local/bin/tfautomv"
1✔
698

699
    def __init__(self, client):
1✔
700
        super().__init__(client)
×
701

702
        self.environment["TF_CLI_ARGS_init"] = " ".join(self.tf_default_args)
×
703
        self.environment["TF_CLI_ARGS_plan"] = " ".join(self.tf_default_args)
×
704

705
        self.entrypoint = self.TFAUTOMV_CLI_BINARY
×
706

707
        logger.debug(f"[bold cyan]Container configuration:[/bold cyan]\n{json.dumps(self.container_config, indent=2)}")
×
708

709
    def start(self, *arguments):
1✔
710
        with AwsCredsEntryPoint(self):
×
711
            return self._start("", *arguments)
×
712

713
    def start_in_layer(self, *arguments):
1✔
714
        """Run a command that can only be performed in layer level."""
715
        self.paths.check_for_layer_location()
×
716

717
        return self.start(*arguments)
×
718

719
    def exec(self, command, *arguments):
1✔
720
        with AwsCredsEntryPoint(self):
×
721
            return self._exec(command, *arguments)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc