• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

binbashar / leverage / 11222088369

07 Oct 2024 06:55PM UTC coverage: 59.901% (-9.3%) from 69.151%
11222088369

push

github

web-flow
[BV-195] Run toolbox with the host user (#269)

* chore: building local image + adapting code

* path fixes

* update tests

* make it a package so poetry doesn't complain

* user newer image (that supports groupadd/useradd binaries)

* tweaks

* verify if image already exists locally

* fix coverage path

* ensure_image unit tests

* Feature/support python 3.12 (#279)

* adding python 3.12 in pyproject.toml

* autogenerating python 3.12 in poetry.lock

* keep using scripts from toolbox

* fix test references

* removing ipdb

* merge master

* Bump default Toolbox-version to 1.3.2-0.2.0

---------

Co-authored-by: Exequiel Barrirero <exequielrafaela@users.noreply.github.com>
Co-authored-by: Angelo Fenoglio <angelo.fenoglio@binbash.com.ar>

206 of 504 branches covered (40.87%)

Branch coverage included in aggregate %.

65 of 69 new or added lines in 5 files covered. (94.2%)

4 existing lines in 1 file now uncovered.

2450 of 3930 relevant lines covered (62.34%)

0.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.94
/leverage/container.py
1
import json
1✔
2
import os
1✔
3
import re
1✔
4
import webbrowser
1✔
5
from io import BytesIO
1✔
6
from datetime import datetime
1✔
7
from time import sleep
1✔
8

9
import hcl2
1✔
10
from click.exceptions import Exit
1✔
11
import dockerpty
1✔
12
from configupdater import ConfigUpdater
1✔
13
from docker import DockerClient
1✔
14
from docker.errors import APIError
1✔
15
from docker.types import Mount
1✔
16
from typing import Tuple
1✔
17

18
from leverage import logger
1✔
19
from leverage._utils import AwsCredsEntryPoint, CustomEntryPoint, ExitError, ContainerSession
1✔
20
from leverage.modules.auth import refresh_layer_credentials
1✔
21
from leverage.logger import raw_logger
1✔
22
from leverage.logger import get_script_log_level
1✔
23
from leverage.path import PathsHandler
1✔
24
from leverage.conf import load as load_env
1✔
25

26
REGION = (
1✔
27
    r"(.*)"  # project folder
28
    # start region
29
    r"(global|(?:[a-z]{2}-(?:gov-)?"
30
    r"(?:central|north|south|east|west|northeast|northwest|southeast|southwest|secret|topsecret)-[1-4]))"
31
    # end region
32
    r"(.*)"  # layer
33
)
34

35

36
def get_docker_client():
1✔
37
    """Attempt to get a Docker client from the environment configuration. Halt application otherwise.
38

39
    Raises:
40
        Exit: If communication to Docker server could not be established.
41

42
    Returns:
43
        docker.DockerClient: Client for Docker daemon.
44
    """
45
    try:
×
46
        docker_client = DockerClient.from_env()
×
47
        docker_client.ping()
×
48

49
    except:
×
50
        logger.error(
×
51
            "Docker daemon doesn't seem to be responding. "
52
            "Please check it is up and running correctly before re-running the command."
53
        )
54
        raise Exit(1)
×
55

56
    return docker_client
×
57

58

59
class LeverageContainer:
1✔
60
    """Basic Leverage Container. Holds the minimum information required to run the Docker image that Leverage uses
61
    to perform its operations. Commands can be issued as interactive via `start` for when live output or user input is desired
62
    or the can be simply executed via `exec` to run silently and retrieve the command output.
63

64
    NOTE: An aggregation approach to this design should be considered instead of the current inheritance approach.
65
    """
66

67
    LEVERAGE_IMAGE = "binbash/leverage-toolbox"
1✔
68
    SHELL = "/bin/bash"
1✔
69
    CONTAINER_USER = "leverage"
1✔
70

71
    def __init__(self, client, mounts: tuple = None, env_vars: dict = None):
1✔
72
        """Project related paths are determined and stored. Project configuration is loaded.
73

74
        Args:
75
            client (docker.DockerClient): Client to interact with Docker daemon.
76
        """
77
        self.client = client
1✔
78
        # Load configs
79
        self.env_conf = load_env()
1✔
80

81
        self.paths = PathsHandler(self.env_conf, self.CONTAINER_USER)
1✔
82
        self.project = self.paths.project
1✔
83

84
        # Set image to use
85
        self.image = self.env_conf.get("TERRAFORM_IMAGE", self.LEVERAGE_IMAGE)
1✔
86
        self.image_tag = self.env_conf.get("TERRAFORM_IMAGE_TAG")
1✔
87
        if not self.image_tag:
1✔
88
            logger.error(
×
89
                "No docker image tag defined.\n"
90
                "Please set `TERRAFORM_IMAGE_TAG` variable in the project's [bold]build.env[/bold] file before running a Leverage command."
91
            )
92
            raise Exit(1)
×
93

94
        mounts = [Mount(source=source, target=target, type="bind") for source, target in mounts] if mounts else []
1✔
95
        self.host_config = self.client.api.create_host_config(security_opt=["label=disable"], mounts=mounts)
1✔
96
        self.container_config = {
1✔
97
            "image": f"{self.image}:{self.local_image_tag}",
98
            "command": "",
99
            "stdin_open": True,
100
            "environment": env_vars or {},
101
            "entrypoint": "",
102
            "working_dir": f"{self.paths.guest_base_path}/{self.paths.cwd.relative_to(self.paths.root_dir).as_posix()}",
103
            "host_config": self.host_config,
104
        }
105

106
    @property
1✔
107
    def environment(self):
1✔
108
        return self.container_config["environment"]
1✔
109

110
    @environment.setter
1✔
111
    def environment(self, value):
1✔
112
        self.container_config["environment"] = value
1✔
113

114
    @property
1✔
115
    def entrypoint(self):
1✔
116
        return self.container_config["entrypoint"]
1✔
117

118
    @entrypoint.setter
1✔
119
    def entrypoint(self, value):
1✔
120
        self.container_config["entrypoint"] = value
1✔
121

122
    @property
1✔
123
    def mounts(self):
1✔
124
        return self.container_config["host_config"]["Mounts"]
1✔
125

126
    @mounts.setter
1✔
127
    def mounts(self, value):
1✔
128
        self.container_config["host_config"]["Mounts"] = value
1✔
129

130
    @property
1✔
131
    def region(self):
1✔
132
        """
133
        Return the region of the layer.
134
        """
135
        if matches := re.match(REGION, self.paths.cwd.as_posix()):
1✔
136
            # the region (group 1) is between the projects folders (group 0) and the layers (group 2)
137
            return matches.groups()[1]
1✔
138

139
        raise ExitError(1, f"No valid region could be found at: {self.paths.cwd.as_posix()}")
×
140

141
    @property
1✔
142
    def local_image_tag(self):
1✔
143
        return f"{self.image_tag}-{os.getgid()}-{os.getuid()}"
1✔
144

145
    @property
1✔
146
    def local_image(self) -> BytesIO:
1✔
147
        """Return the local image that will be built, as a file-like object."""
148
        return BytesIO(
1✔
149
            """
150
            ARG IMAGE_TAG
151
            FROM binbash/leverage-toolbox:$IMAGE_TAG
152

153
            ARG UNAME
154
            ARG UID
155
            ARG GID
156
            RUN groupadd -g $GID -o $UNAME
157
            RUN useradd -m -u $UID -g $GID -o -s /bin/bash $UNAME
158
            RUN chown -R $UID:$GID /home/leverage
159
            USER $UNAME
160
            """.encode(
161
                "utf-8"
162
            )
163
        )
164

165
    def ensure_image(self):
1✔
166
        """
167
        Make sure the required local Docker image is available in the system. If not, build it.
168
        If the image already exists, re-build it so changes in the arguments can take effect.
169
        """
170
        logger.info(f"Checking for local docker image, tag: {self.local_image_tag}...")
1✔
171
        image_name = f"{self.image}:{self.local_image_tag}"
1✔
172

173
        # check first is our image is already available locally
174
        found_image = self.client.api.images(f"{self.image}:{self.local_image_tag}")
1✔
175
        if found_image:
1✔
176
            logger.info("[green]✔ OK[/green]\n")
1✔
177
            return
1✔
178

179
        logger.info(f"Image not found, building it...")
1✔
180
        build_args = {
1✔
181
            "IMAGE_TAG": self.image_tag,
182
            "UNAME": self.CONTAINER_USER,
183
            "GID": str(os.getgid()),
184
            "UID": str(os.getuid()),
185
        }
186

187
        stream = self.client.api.build(
1✔
188
            fileobj=self.local_image,
189
            tag=image_name,
190
            pull=True,
191
            buildargs=build_args,
192
            decode=True,
193
        )
194

195
        for line in stream:
1✔
196
            if "stream" in line and line["stream"].startswith("Successfully built"):
1✔
197
                logger.info("[green]✔ OK[/green]\n")
1✔
198
            elif "errorDetail" in line:
1✔
199
                raise ExitError(1, f"Failed building local image: {line['errorDetail']}")
1✔
200

201
    def _create_container(self, tty, command="", *args):
1✔
202
        """Create the container that will run the command.
203

204
        Args:
205
            tty (bool): Whether the container will run interactively or not.
206
            command (str, optional): Command to run. Defaults to "".
207

208
        Raises:
209
            Exit: If the container could not be created.
210

211
        Returns:
212
            dict: Reference to the created container.
213
        """
214
        command = " ".join([command] + list(args))
1✔
215
        logger.debug(f"[bold cyan]Running command:[/bold cyan] {command}")
1✔
216
        self.container_config["command"] = command
1✔
217
        self.container_config["tty"] = tty
1✔
218

219
        try:
1✔
220
            return self.client.api.create_container(**self.container_config)
1✔
221

222
        except APIError as exc:
×
223
            exc.__traceback__ = None
×
224
            exc.__context__.__traceback__ = None
×
225
            logger.exception("Error creating container:", exc_info=exc)
×
226
            raise Exit(1)
×
227

228
    def _run(self, container, run_func):
1✔
229
        """Apply the given run function to the given container, return its outputs and handle container cleanup.
230

231
        Args:
232
            container (dict): Reference to a Docker container.
233
            run_func (function): Function to apply to the given container.
234

235
        Returns:
236
            any: Whatever the given function returns.
237
        """
238
        try:
×
239
            return run_func(self.client, container)
×
240

241
        except APIError as exc:
×
242
            exc.__traceback__ = None
×
243
            exc.__context__.__traceback__ = None
×
244
            logger.exception("Error during container execution:", exc_info=exc)
×
245

246
        finally:
247
            self.client.api.stop(container)
×
248
            self.client.api.remove_container(container)
×
249

250
    def _start(self, command: str, *args):
1✔
251
        """Create an interactive container, and run command with the given arguments.
252

253
        Args:
254
            command: Command to run.
255

256
        Returns:
257
            int: Execution exit code.
258
        """
259
        container = self._create_container(True, command, *args)
1✔
260

261
        def run_func(client, container):
1✔
262
            dockerpty.start(client=client.api, container=container)
×
263
            return client.api.inspect_container(container)["State"]["ExitCode"]
×
264

265
        return self._run(container, run_func)
1✔
266

267
    def _start_with_output(self, command, *args):
1✔
268
        """
269
        Same than _start but also returns the outputs (by dumping the logs) of the container.
270
        """
271
        container = self._create_container(True, command, *args)
×
272

273
        def run_func(client, container):
×
274
            dockerpty.start(client=client.api, container=container)
×
275
            exit_code = client.api.inspect_container(container)["State"]["ExitCode"]
×
276
            logs = client.api.logs(container).decode("utf-8")
×
277
            return exit_code, logs
×
278

279
        return self._run(container, run_func)
×
280

281
    def start(self, command: str, *arguments) -> int:
1✔
282
        """Run command with the given arguments in an interactive container.
283
        Returns execution exit code.
284
        """
285
        return self._start(command, *arguments)
1✔
286

287
    def _exec(self, command: str, *args) -> Tuple[int, str]:
1✔
288
        """Create a non interactive container and execute command with the given arguments.
289
        Returns execution exit code and output.
290
        """
UNCOV
291
        container = self._create_container(False, command, *args)
×
292

UNCOV
293
        def run_func(client, container):
×
294
            client.api.start(container)
×
295
            exit_code = client.api.wait(container)["StatusCode"]
×
296
            output = client.api.logs(container).decode("utf-8")
×
297
            return exit_code, output
×
298

UNCOV
299
        return self._run(container, run_func)
×
300

301
    def exec(self, command: str, *arguments) -> Tuple[int, str]:
1✔
302
        """Execute command with the given arguments in a container.
303
        Returns execution exit code and output.
304
        """
305
        return self._exec(command, *arguments)
×
306

307
    def docker_logs(self, container):
1✔
308
        return self.client.api.logs(container).decode("utf-8")
×
309

310

311
class SSOContainer(LeverageContainer):
1✔
312
    # SSO scripts
313
    AWS_SSO_LOGIN_SCRIPT = "/home/leverage/scripts/aws-sso/aws-sso-login.sh"
1✔
314
    AWS_SSO_LOGOUT_SCRIPT = "/home/leverage/scripts/aws-sso/aws-sso-logout.sh"
1✔
315

316
    # SSO constants
317
    AWS_SSO_LOGIN_URL = "https://device.sso.{region}.amazonaws.com/?user_code={user_code}"
1✔
318
    AWS_SSO_CODE_WAIT_SECONDS = 2
1✔
319
    AWS_SSO_CODE_ATTEMPTS = 10
1✔
320
    FALLBACK_LINK_MSG = "Opening the browser... if it fails, open this link in your browser:\n{link}"
1✔
321

322
    def get_sso_access_token(self):
1✔
UNCOV
323
        with open(self.paths.sso_token_file) as token_file:
×
324
            return json.loads(token_file.read())["accessToken"]
×
325

326
    @property
1✔
327
    def sso_region_from_main_profile(self):
1✔
328
        """
329
        Same than AWSCLIContainer.get_sso_region but without using a container.
330
        """
331
        conf = ConfigUpdater()
1✔
332
        conf.read(self.paths.host_aws_profiles_file)
1✔
333
        return conf.get(f"profile {self.project}-sso", "sso_region").value
1✔
334

335
    def get_sso_code(self, container) -> str:
1✔
336
        """
337
        Find and return the SSO user code by periodically checking the logs.
338
        Up until N attempts.
339
        """
340
        logger.info("Fetching SSO code...")
1✔
341
        for _ in range(self.AWS_SSO_CODE_ATTEMPTS):
1✔
342
            # pull logs periodically until we find our SSO code
343
            logs = self.docker_logs(container)
1✔
344
            if "Then enter the code:" in logs:
1✔
345
                return logs.split("Then enter the code:")[1].split("\n")[2]
1✔
346
            else:
347
                logger.debug(logs)
1✔
348
            sleep(self.AWS_SSO_CODE_WAIT_SECONDS)
1✔
349

350
        raise ExitError(1, "Get SSO code timed-out")
1✔
351

352
    def get_sso_region(self):
1✔
353
        # TODO: what about using the .region property we have now? that takes the value from the path of the layer
NEW
354
        _, region = self.exec(f"configure get sso_region --profile {self.project}-sso")
×
NEW
355
        return region
×
356

357
    def sso_login(self) -> int:
1✔
358
        region = self.get_sso_region()
1✔
359

360
        with CustomEntryPoint(self, "sh -c"):
1✔
361
            container = self._create_container(False, command=self.AWS_SSO_LOGIN_SCRIPT)
1✔
362

363
        with ContainerSession(self.client, container):
1✔
364
            # once inside this block, the SSO_LOGIN_SCRIPT is being executed in the "background"
365
            # now let's grab the user code from the logs
366
            user_code = self.get_sso_code(container)
1✔
367
            # with the user code, we can now autocomplete the url
368
            link = self.AWS_SSO_LOGIN_URL.format(region=region.strip(), user_code=user_code)
1✔
369
            webbrowser.open_new_tab(link)
1✔
370
            # The SSO code is only valid once: if the browser was able to open it, the fallback link will be invalid
371
            logger.info(self.FALLBACK_LINK_MSG.format(link=link))
1✔
372
            # now let's wait until the command locking the container resolve itself:
373
            # aws sso login will wait for the user code
374
            # once submitted to the browser, the authentication finish and the lock is released
375
            exit_code = self.client.api.wait(container)["StatusCode"]
1✔
376
            raw_logger.info(self.docker_logs(container))
1✔
377

378
        return exit_code
1✔
379

380

381
class AWSCLIContainer(SSOContainer):
1✔
382
    """Leverage Container specially tailored to run AWS CLI commands."""
383

384
    AWS_CLI_BINARY = "/usr/local/bin/aws"
1✔
385

386
    def __init__(self, client):
1✔
387
        super().__init__(client)
1✔
388

389
        self.environment = {
1✔
390
            "COMMON_CONFIG_FILE": self.paths.common_tfvars,
391
            "ACCOUNT_CONFIG_FILE": self.paths.account_tfvars,
392
            "BACKEND_CONFIG_FILE": self.paths.backend_tfvars,
393
            "AWS_SHARED_CREDENTIALS_FILE": f"{self.paths.guest_aws_credentials_dir}/credentials",
394
            "AWS_CONFIG_FILE": f"{self.paths.guest_aws_credentials_dir}/config",
395
            "SSO_CACHE_DIR": f"{self.paths.guest_aws_credentials_dir}/sso/cache",
396
            "SCRIPT_LOG_LEVEL": get_script_log_level(),
397
        }
398
        self.entrypoint = self.AWS_CLI_BINARY
1✔
399
        self.mounts = [
1✔
400
            Mount(source=self.paths.root_dir.as_posix(), target=self.paths.guest_base_path, type="bind"),
401
            Mount(
402
                source=self.paths.host_aws_credentials_dir.as_posix(),
403
                target=self.paths.guest_aws_credentials_dir,
404
                type="bind",
405
            ),
406
        ]
407

408
        logger.debug(f"[bold cyan]Container configuration:[/bold cyan]\n{json.dumps(self.container_config, indent=2)}")
1✔
409

410
    def start(self, command, profile=""):
1✔
411
        args = [] if not profile else ["--profile", profile]
×
412
        return self._start(command, *args)
×
413

414
    # FIXME: we have a context manager for this now, remove this method later!
415
    def system_start(self, command):
1✔
416
        """Momentarily override the container's default entrypoint. To run arbitrary commands and not only AWS CLI ones."""
417
        self.entrypoint = ""
×
418
        exit_code = self._start(command)
×
419
        self.entrypoint = self.AWS_CLI_BINARY
×
420
        return exit_code
×
421

422
    def exec(self, command, profile=""):
1✔
423
        args = [] if not profile else ["--profile", profile]
×
424
        return self._exec(command, *args)
×
425

426
    # FIXME: we have a context manager for this now, remove this method later!
427
    def system_exec(self, command):
1✔
428
        """Momentarily override the container's default entrypoint. To run arbitrary commands and not only AWS CLI ones."""
429
        self.entrypoint = ""
×
430
        exit_code, output = self._exec(command)
×
431

432
        self.entrypoint = self.AWS_CLI_BINARY
×
433
        return exit_code, output
×
434

435

436
class TerraformContainer(SSOContainer):
1✔
437
    """Leverage container specifically tailored to run Terraform commands.
438
    It handles authentication and some checks regarding where the command is being executed."""
439

440
    TF_BINARY = "/bin/terraform"
1✔
441

442
    TF_MFA_ENTRYPOINT = "/home/leverage/scripts/aws-mfa/aws-mfa-entrypoint.sh"
1✔
443

444
    def __init__(self, client, mounts=None, env_vars=None):
1✔
445
        super().__init__(client, mounts=mounts, env_vars=env_vars)
1✔
446

447
        self.paths.assert_running_leverage_project()
1✔
448

449
        # Set authentication methods
450
        self.sso_enabled = self.paths.common_conf.get("sso_enabled", False)
1✔
451
        self.mfa_enabled = (
1✔
452
            self.env_conf.get("MFA_ENABLED", "false") == "true"
453
        )  # TODO: Convert values to bool upon loading
454

455
        # SSH AGENT
456
        SSH_AUTH_SOCK = os.getenv("SSH_AUTH_SOCK")
1✔
457

458
        self.environment.update(
1✔
459
            {
460
                "COMMON_CONFIG_FILE": self.paths.common_tfvars,
461
                "ACCOUNT_CONFIG_FILE": self.paths.account_tfvars,
462
                "BACKEND_CONFIG_FILE": self.paths.backend_tfvars,
463
                "AWS_SHARED_CREDENTIALS_FILE": f"{self.paths.guest_aws_credentials_dir}/credentials",
464
                "AWS_CONFIG_FILE": f"{self.paths.guest_aws_credentials_dir}/config",
465
                "SRC_AWS_SHARED_CREDENTIALS_FILE": f"{self.paths.guest_aws_credentials_dir}/credentials",  # Legacy?
466
                "SRC_AWS_CONFIG_FILE": f"{self.paths.guest_aws_credentials_dir}/config",  # Legacy?
467
                "AWS_CACHE_DIR": f"{self.paths.guest_aws_credentials_dir}/cache",
468
                "SSO_CACHE_DIR": f"{self.paths.guest_aws_credentials_dir}/sso/cache",
469
                "SCRIPT_LOG_LEVEL": get_script_log_level(),
470
                "MFA_SCRIPT_LOG_LEVEL": get_script_log_level(),  # Legacy
471
                "SSH_AUTH_SOCK": "" if SSH_AUTH_SOCK is None else "/ssh-agent",
472
            }
473
        )
474
        self.entrypoint = self.TF_BINARY
1✔
475
        extra_mounts = [
1✔
476
            Mount(source=self.paths.root_dir.as_posix(), target=self.paths.guest_base_path, type="bind"),
477
            Mount(
478
                source=self.paths.host_aws_credentials_dir.as_posix(),
479
                target=self.paths.guest_aws_credentials_dir,
480
                type="bind",
481
            ),
482
            Mount(source=(self.paths.home / ".gitconfig").as_posix(), target="/etc/gitconfig", type="bind"),
483
        ]
484
        self.mounts.extend(extra_mounts)
1✔
485
        # if you have set the tf plugin cache locally
486
        if self.paths.tf_cache_dir:
1✔
487
            # then mount it too into the container
488
            self.environment["TF_PLUGIN_CACHE_DIR"] = self.paths.tf_cache_dir
1✔
489
            # given that terraform use symlinks to point from the .terraform folder into the plugin folder
490
            # we need to use the same directory inside the container
491
            # otherwise symlinks will be broken once outside the container
492
            # which will break terraform usage outside Leverage
493
            self.mounts.append(Mount(source=self.paths.tf_cache_dir, target=self.paths.tf_cache_dir, type="bind"))
1✔
494
        if SSH_AUTH_SOCK is not None:
1✔
495
            self.mounts.append(Mount(source=SSH_AUTH_SOCK, target="/ssh-agent", type="bind"))
×
496

497
        self._backend_key = None
1✔
498

499
        logger.debug(f"[bold cyan]Container configuration:[/bold cyan]\n{json.dumps(self.container_config, indent=2)}")
1✔
500

501
    def auth_method(self) -> str:
1✔
502
        """
503
        Return the expected auth method based on the SSO or MFA flags.
504

505
        In the case of MFA, we also need to tweak some env variables for AWS credentials.
506
        Once you are done with authentication, remember to revert the env var changes.
507
        """
508
        if self.sso_enabled:
1✔
509
            self._check_sso_token()
1✔
510
            # sso credentials needs to be refreshed right before we execute our command on the container
511
            refresh_layer_credentials(self)
1✔
512
        elif self.mfa_enabled:
1✔
513
            self.environment.update(
1✔
514
                {
515
                    "AWS_SHARED_CREDENTIALS_FILE": self.environment["AWS_SHARED_CREDENTIALS_FILE"].replace(
516
                        "tmp", ".aws"
517
                    ),
518
                    "AWS_CONFIG_FILE": self.environment["AWS_CONFIG_FILE"].replace("tmp", ".aws"),
519
                }
520
            )
521
            return f"{self.TF_MFA_ENTRYPOINT} -- "
1✔
522

523
        return ""
1✔
524

525
    @property
1✔
526
    def tf_default_args(self):
1✔
527
        """Array of strings containing all valid config files for layer as parameters for Terraform"""
528
        common_config_files = [
×
529
            f"-var-file={self.paths.guest_config_file(common_file)}"
530
            for common_file in self.paths.common_config_dir.glob("*.tfvars")
531
        ]
532
        account_config_files = [
×
533
            f"-var-file={self.paths.guest_config_file(account_file)}"
534
            for account_file in self.paths.account_config_dir.glob("*.tfvars")
535
        ]
536
        return common_config_files + account_config_files
×
537

538
    def enable_mfa(self):
1✔
539
        """Enable Multi-Factor Authentication."""
540
        self.mfa_enabled = True
1✔
541

542
    def enable_sso(self):
1✔
543
        """Enable Single Sign-On Authentication."""
544
        self.sso_enabled = True
1✔
545

546
    def disable_authentication(self):
1✔
547
        """Disable all authentication."""
548
        self.mfa_enabled = False
×
549
        self.sso_enabled = False
×
550

551
    def _check_sso_token(self):
1✔
552
        """Check for the existence and validity of the SSO token to be used to get credentials."""
553

554
        # Adding `token` file name to this function in order to
555
        # meet the requirement regarding to have just one
556
        # token file in the sso/cache
557
        sso_role = self.paths.account_conf.get("sso_role")
×
558
        token_file = self.paths.sso_cache / sso_role
×
559

560
        token_files = list(self.paths.sso_cache.glob("*"))
×
561
        if not token_files:
×
562
            logger.error("No AWS SSO token found. Please log in or configure SSO.")
×
563
            raise Exit(1)
×
564

565
        if token_file not in token_files:
×
566
            sso_role = "token"
×
567
            token_file = self.paths.sso_cache / sso_role
×
568
            if token_file not in token_files:
×
569
                logger.error(
×
570
                    "No valid AWS SSO token found for current account.\n"
571
                    "Please log out and reconfigure SSO before proceeding."
572
                )
573
                raise Exit(1)
×
574

575
        entrypoint = self.entrypoint
×
576
        self.entrypoint = ""
×
577

578
        _, cached_token = self._exec(f"sh -c 'cat $SSO_CACHE_DIR/{sso_role}'")
×
579
        token = json.loads(cached_token)
×
580
        expiry = datetime.strptime(token.get("expiresAt"), "%Y-%m-%dT%H:%M:%SZ")
×
581
        renewal = datetime.utcnow()
×
582

583
        if expiry < renewal:
×
584
            logger.error(
×
585
                "AWS SSO token has expired, please log back in by running [bold]leverage aws sso login[/bold]"
586
                " to refresh your credentials before re-running the last command."
587
            )
588
            raise Exit(1)
×
589

590
        self.entrypoint = entrypoint
×
591

592
    def refresh_credentials(self):
1✔
593
        with AwsCredsEntryPoint(self, override_entrypoint=""):
1✔
594
            if exit_code := self._start('echo "Done."'):
1✔
595
                return exit_code
1✔
596

597
    def start(self, command, *arguments):
1✔
598
        with AwsCredsEntryPoint(self, self.entrypoint):
×
599
            return self._start(command, *arguments)
×
600

601
    def start_in_layer(self, command, *arguments):
1✔
602
        """Run a command that can only be performed in layer level."""
603
        self.paths.check_for_layer_location()
×
604

605
        return self.start(command, *arguments)
×
606

607
    def exec(self, command, *arguments):
1✔
608
        with AwsCredsEntryPoint(self):
×
609
            return self._exec(command, *arguments)
×
610

611
    # FIXME: we have a context manager for this now, remove this method later!
612
    def system_exec(self, command):
1✔
613
        """Momentarily override the container's default entrypoint. To run arbitrary commands and not only AWS CLI ones."""
614
        original_entrypoint = self.entrypoint
×
615
        self.entrypoint = ""
×
616
        exit_code, output = self._exec(command)
×
617

618
        self.entrypoint = original_entrypoint
×
619
        return exit_code, output
×
620

621
    def start_shell(self):
1✔
622
        """Launch a shell in the container."""
623
        if self.mfa_enabled or self.sso_enabled:
1✔
624
            self.paths.check_for_layer_location()
×
625

626
        with AwsCredsEntryPoint(self, override_entrypoint=""):
1✔
627
            self._start(self.SHELL)
1✔
628

629
    def set_backend_key(self, skip_validation=False):
1✔
630
        # Scenarios:
631
        #
632
        # scenario    |  s3 backend set   |  s3 key set  |  skip_validation  |  result
633
        # 0           |  false            |  false       |  false            |  fail
634
        # 1           |  false            |  false       |  true             |  ok
635
        # 2           |  true             |  false       |  false/true       |  set the key
636
        # 3           |  true             |  true        |  false/true       |  read the key
637
        try:
×
638
            config_tf_file = self.paths.cwd / "config.tf"
×
639
            config_tf = hcl2.loads(config_tf_file.read_text()) if config_tf_file.exists() else {}
×
640
            if (
×
641
                "terraform" in config_tf
642
                and "backend" in config_tf["terraform"][0]
643
                and "s3" in config_tf["terraform"][0]["backend"][0]
644
            ):
645
                if "key" in config_tf["terraform"][0]["backend"][0]["s3"]:
×
646
                    backend_key = config_tf["terraform"][0]["backend"][0]["s3"]["key"]
×
647
                    self._backend_key = backend_key
×
648
                else:
649
                    self._backend_key = (
×
650
                        f"{self.paths.cwd.relative_to(self.paths.root_dir).as_posix()}/terraform.tfstate".replace(
651
                            "/base-", "/"
652
                        ).replace("/tools-", "/")
653
                    )
654

655
                    in_container_file_path = (
×
656
                        f"{self.paths.guest_base_path}/{config_tf_file.relative_to(self.paths.root_dir).as_posix()}"
657
                    )
658
                    resp = self.system_exec(
×
659
                        "hcledit "
660
                        f"-f {in_container_file_path} -u"
661
                        f' attribute append terraform.backend.key "\\"{self._backend_key}\\""'
662
                    )
663
            else:
664
                if not skip_validation:
×
665
                    raise KeyError()
×
666
        except (KeyError, IndexError):
×
667
            logger.error(
×
668
                "[red]✘[/red] Malformed [bold]config.tf[/bold] file. Missing Terraform backend block. In some cases you may want to skip this check by using the --skip-validation flag, e.g. the first time you initialize a terraform-backend layer."
669
            )
670
            raise Exit(1)
×
671
        except Exception as e:
×
672
            logger.error("[red]✘[/red] Malformed [bold]config.tf[/bold] file. Unable to parse.")
×
673
            logger.debug(e)
×
674
            raise Exit(1)
×
675

676
    @property
1✔
677
    def backend_key(self):
1✔
678
        return self._backend_key
×
679

680
    @backend_key.setter
1✔
681
    def backend_key(self, backend_key):
1✔
682
        self._backend_key = backend_key
×
683

684

685
class TFautomvContainer(TerraformContainer):
1✔
686
    """Leverage Container tailored to run general commands."""
687

688
    TFAUTOMV_CLI_BINARY = "/usr/local/bin/tfautomv"
1✔
689

690
    def __init__(self, client):
1✔
691
        super().__init__(client)
×
692

693
        self.environment["TF_CLI_ARGS_init"] = " ".join(self.tf_default_args)
×
694
        self.environment["TF_CLI_ARGS_plan"] = " ".join(self.tf_default_args)
×
695

696
        self.entrypoint = self.TFAUTOMV_CLI_BINARY
×
697

698
        logger.debug(f"[bold cyan]Container configuration:[/bold cyan]\n{json.dumps(self.container_config, indent=2)}")
×
699

700
    def start(self, *arguments):
1✔
701
        with AwsCredsEntryPoint(self):
×
702
            return self._start("", *arguments)
×
703

704
    def start_in_layer(self, *arguments):
1✔
705
        """Run a command that can only be performed in layer level."""
706
        self.paths.check_for_layer_location()
×
707

708
        return self.start(*arguments)
×
709

710
    def exec(self, command, *arguments):
1✔
711
        with AwsCredsEntryPoint(self):
×
712
            return self._exec(command, *arguments)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc