• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

binbashar / leverage / 13227561880

09 Feb 2025 04:26PM UTC coverage: 60.209% (+0.04%) from 60.173%
13227561880

Pull #296

github

Franr
fix test
Pull Request #296: BL-294 | Catch and handle HCL parsing errors

184 of 448 branches covered (41.07%)

Branch coverage included in aggregate %.

14 of 18 new or added lines in 3 files covered. (77.78%)

183 existing lines in 10 files now uncovered.

2464 of 3950 relevant lines covered (62.38%)

0.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

36.84
/leverage/modules/terraform.py
1
import re
1✔
2
from pathlib import Path
1✔
3
from typing import Sequence
1✔
4

5
import click
1✔
6
from click.exceptions import Exit
1✔
7

8
from leverage import logger
1✔
9
from leverage._internals import pass_container, pass_state
1✔
10
from leverage._utils import ExitError, parse_tf_file
1✔
11
from leverage.container import TerraformContainer
1✔
12
from leverage.container import get_docker_client
1✔
13
from leverage.modules.utils import env_var_option, mount_option, auth_mfa, auth_sso
1✔
14

15
# Pattern for path components treated as "region" directories: the literal
# "global", or a region-style name such as "us-east-1" or "us-gov-west-1".
# NOTE(review): the pattern has no end anchor and this file applies it with
# re.match, so only the beginning of the component is checked.
REGION = (
    r"global"
    r"|(?:[a-z]{2}-(?:gov-)?"
    r"(?:central|north|south|east|west|northeast|northwest|southeast|southwest|secret|topsecret)"
    r"-[1-4])"
)
19

20

21
# ###########################################################################
22
# CREATE THE TERRAFORM GROUP
23
# ###########################################################################
24
@click.group()
@mount_option
@env_var_option
@pass_state
def terraform(state, env_var, mount):
    """Run Terraform commands in a custom containerized environment that provides extra functionality when interacting
    with your cloud provider such as handling multi factor authentication for you.
    All terraform subcommands that receive extra args will pass the given strings as is to their corresponding Terraform
    counterparts in the container. For example as in `leverage terraform apply -auto-approve` or
    `leverage terraform init -reconfigure`
    """
    # The docstring above doubles as the CLI help text, so it is left untouched.
    # A truthy env_var is converted to a dict; a falsy one is forwarded as is.
    env_vars = dict(env_var) if env_var else env_var

    # Build the container once for the whole group and store it on the shared state.
    state.container = TerraformContainer(get_docker_client(), mounts=mount, env_vars=env_vars)
    state.container.ensure_image()
40

41

42
# Context settings shared by the subcommands that forward extra `args` to
# Terraform, so options the CLI does not declare are not rejected up front.
CONTEXT_SETTINGS = dict(ignore_unknown_options=True)
43

44
# ###########################################################################
45
# CREATE THE TERRAFORM GROUP'S COMMANDS
46
# ###########################################################################
47
#
48
# --layers is an ordered, comma-separated list of layer names
49
# The layer names are the paths of those layers relative to the current directory
50
# e.g. if CLI is called from /home/user/project/management and this is the tree:
51
# home
52
# ├── user
53
# │   └── project
54
# │       └── management
55
# │           ├── global
56
# │           |   └── security-base
57
# │           |   └── sso
58
# │           └── us-east-1
59
# │               └── terraform-backend
60
#
61
# Then all three layers can be initialized as follows:
62
# leverage tf init --layers us-east-1/terraform-backend,global/security-base,global/sso
63
#
64
# It is an ordered list because the layers will be visited in the same order they were
65
# supplied.
66
#
67
# Reusable `--layers` option applied to init, plan, apply, output and destroy.
# It defaults to an empty string, which the handler treats as "no layers given".
layers_option = click.option(
    "--layers",
    default="",
    type=str,
    help="Layers to apply the action to. (an ordered, comma-separated list of layer names)",
)
73

74

75
@terraform.command(context_settings=CONTEXT_SETTINGS)
@click.option("--skip-validation", is_flag=True, help="Skip layout validation.")
@layers_option
@click.argument("args", nargs=-1)
@pass_container
@click.pass_context
def init(context, tf: TerraformContainer, skip_validation, layers, args):
    """
    Initialize this layer.
    """
    # `context` and `tf` are supplied by the decorators and are not used here;
    # the shared handler resolves the layers and runs `_init` on each of them.
    executor = _init
    invoke_for_all_commands(layers, executor, args, skip_validation)
86

87

88
@terraform.command(context_settings=CONTEXT_SETTINGS)
@layers_option
@click.argument("args", nargs=-1)
@pass_container
@click.pass_context
def plan(context, tf, layers, args):
    """Generate an execution plan for this layer."""
    # `context` and `tf` come from the decorators and are unused; the shared
    # handler runs `_plan` on every selected layer.
    executor = _plan
    invoke_for_all_commands(layers, executor, args)
96

97

98
@terraform.command(context_settings=CONTEXT_SETTINGS)
@layers_option
@click.argument("args", nargs=-1)
@pass_container
@click.pass_context
def apply(context, tf, layers, args):
    """Build or change the infrastructure in this layer."""
    # `context` and `tf` come from the decorators and are unused; the shared
    # handler runs `_apply` on every selected layer.
    executor = _apply
    invoke_for_all_commands(layers, executor, args)
106

107

108
@terraform.command(context_settings=CONTEXT_SETTINGS)
@layers_option
@click.argument("args", nargs=-1)
@pass_container
@click.pass_context
def output(context, tf, layers, args):
    """Show all output variables of this layer."""
    # `context` and `tf` come from the decorators and are unused; the shared
    # handler runs `_output` on every selected layer.
    executor = _output
    invoke_for_all_commands(layers, executor, args)
116

117

118
@terraform.command(context_settings=CONTEXT_SETTINGS)
@layers_option
@click.argument("args", nargs=-1)
@pass_container
@click.pass_context
def destroy(context, tf, layers, args):
    """Destroy infrastructure in this layer."""
    # `context` and `tf` come from the decorators and are unused; the shared
    # handler runs `_destroy` on every selected layer.
    executor = _destroy
    invoke_for_all_commands(layers, executor, args)
126

127

128
@terraform.command()
@pass_container
def version(tf):
    """Print version."""
    # Authentication is switched off before running `terraform version`.
    tf.disable_authentication()
    subcommand = "version"
    tf.start(subcommand)
134

135

136
@terraform.command()
@auth_mfa
@auth_sso
@pass_container
def shell(tf, mfa, sso):
    """Open a shell into the Terraform container in this layer."""
    # Start with authentication disabled, then turn on whichever methods were
    # requested. SSO is handled before MFA, as in the original ordering.
    tf.disable_authentication()

    for requested, enable in ((sso, tf.enable_sso), (mfa, tf.enable_mfa)):
        if requested:
            enable()

    tf.start_shell()
150

151

152
@terraform.command("format", context_settings=CONTEXT_SETTINGS)
@click.argument("args", nargs=-1)
@pass_container
def _format(tf, args):
    """Check if all files meet the canonical format and rewrite them accordingly."""
    # Always run recursively: append the flag unless the caller already passed it.
    if "-recursive" not in args:
        args = (*args, "-recursive")

    tf.disable_authentication()
    tf.start("fmt", *args)
160

161

162
@terraform.command()
@pass_container
def validate(tf):
    """Validate code of the current directory. Previous initialization might be needed."""
    # Authentication is switched off before running `terraform validate`.
    tf.disable_authentication()
    subcommand = "validate"
    tf.start(subcommand)
168

169

170
@terraform.command("validate-layout")
@pass_container
def validate_layout(tf):
    """Validate layer conforms to Leverage convention."""
    # The backend key is set first because the layout validation reads it.
    tf.set_backend_key()
    is_valid = _validate_layout()
    return is_valid
176

177

178
@terraform.command("import")
@click.argument("address")
@click.argument("_id", metavar="ID")
@pass_container
def _import(tf, address, _id):
    """Import a resource."""
    # A non-zero exit code from the container run is propagated to the CLI.
    if exit_code := tf.start_in_layer("import", *tf.tf_default_args, address, _id):
        raise Exit(exit_code)
188

189

190
@terraform.command("refresh-credentials")
@pass_container
def refresh_credentials(tf):
    """Refresh the AWS credentials used on the current layer."""
    tf.paths.check_for_layer_location()

    # A non-zero exit code from the refresh is propagated to the CLI.
    exit_code = tf.refresh_credentials()
    if exit_code:
        raise Exit(exit_code)
197

198

199
# ###########################################################################
200
# HANDLER FOR MANAGING THE BASE COMMANDS (init, plan, apply, destroy, output)
201
# ###########################################################################
202
@pass_container
def invoke_for_all_commands(tf, layers, command, args, skip_validation=True):
    """
    Run a base command (init, plan, apply, output, destroy) on one or more layers.

    All layers are checked first (directory exists, backend key set, optional
    layout validation); only then is the command executed on each layer, in the
    order the layers were supplied.

    Args:
        tf: Terraform container (supplied by the `pass_container` decorator).
        layers: comma separated value of relative layer path
            e.g.: global/security_audit,us-east-1/tf-backend
            An empty string means "no layers given".
        command: executor to call for each layer; it is invoked as `command(args=args)`.
        args: extra arguments forwarded untouched to `command`.
        skip_validation: forwarded to `set_backend_key` and to the layer validation.

    Returns:
        list: the layer paths the command was invoked on.

    Raises:
        ExitError: when `--layers` is used inside a layer, is missing outside of
            one, or the command is run outside a layer/account directory.
        Exit: when a layer directory does not exist or a layer fails validation.
    """

    # convert layers from string to list
    requested_layers = layers.split(",") if layers else []

    # based on the location type manage the layers parameter
    location_type = tf.paths.get_location_type()
    if location_type == "layer":
        if requested_layers:
            # running on a layer but --layers was set
            raise ExitError(1, "Can not set [bold]--layers[/bold] inside a layer.")
        # running on a layer
        layers = [tf.paths.cwd]
    elif location_type in ("account", "layers-group"):
        if not requested_layers:
            # running on an account but --layers was not set
            raise ExitError(1, "[bold]--layers[/bold] has to be set.")
        # running on an account with --layers set
        layers = [tf.paths.cwd / layer_name for layer_name in requested_layers]
    else:
        # running outside a layer and account
        raise ExitError(1, "This command has to be run inside a layer or account directory.")

    # remember the current location so it can be restored after each layer
    original_location = tf.paths.cwd
    original_working_dir = tf.container_config["working_dir"]

    # validate each layer before calling the execute command
    for layer in layers:
        logger.debug(f"Checking for layer {layer}...")
        # change to current dir and set it in the container
        tf.paths.cwd = layer
        try:
            # check layers existence
            if not layer.is_dir():
                logger.error(f"Directory [red]{layer}[/red] does not exist or is not a directory\n")
                raise Exit(1)

            # set the s3 key
            tf.set_backend_key(skip_validation)

            # validate layer
            validate_for_all_commands(layer, skip_validation=skip_validation)
        finally:
            # always change back to the original dir, even when a check fails
            tf.paths.cwd = original_location

    # invoke the command on every (already validated) layer
    for layer in layers:
        if len(layers) > 1:
            logger.info(f"Invoking command for layer {layer}...")

        # change to current dir and set it in the container
        tf.paths.cwd = layer
        try:
            # set the working dir
            working_dir = f"{tf.paths.guest_base_path}/{tf.paths.cwd.relative_to(tf.paths.root_dir).as_posix()}"
            tf.container_config["working_dir"] = working_dir

            # execute the actual command
            command(args=args)
        finally:
            # always restore the original dir and working dir, even when the command fails
            tf.paths.cwd = original_location
            tf.container_config["working_dir"] = original_working_dir

    return layers
280

281

282
def validate_for_all_commands(layer, skip_validation=False):
    """
    Run the Leverage layout validation for a layer unless it is skipped.

    Args:
        layer: a full layer directory (only used in the debug message here)
        skip_validation: when true, the layout validation is not run at all

    Raises:
        Exit: with code 1 when the layout validation reports an invalid layer.
    """

    logger.debug(f"Checking layer {layer}...")

    # Nothing else to do when validation is skipped or the layout is valid.
    if skip_validation or _validate_layout():
        return

    logger.error(
        "Layer configuration doesn't seem to be valid. Exiting.\n"
        "If you are sure your configuration is actually correct "
        "you may skip this validation using the --skip-validation flag."
    )
    raise Exit(1)
299

300

301
# ###########################################################################
302
# BASE COMMAND EXECUTORS
303
# ###########################################################################
304
@pass_container
def _init(tf, args):
    """Initialize this layer.

    Any user supplied backend configuration is dropped and replaced with the
    layer's own backend tfvars file.

    Args:
        tf: Terraform container (supplied by the `pass_container` decorator).
        args: extra arguments to forward to `terraform init`.

    Raises:
        Exit: with the command's exit code when it is non-zero.
    """

    # Drop both forms of the option: "-backend-config=VALUE" and the pair
    # "-backend-config", "VALUE". The previous arg is looked up in the whole
    # sequence (`args`), not inside the current string.
    args = [
        arg
        for index, arg in enumerate(args)
        if not arg.startswith("-backend-config")
        and not (index > 0 and args[index - 1] == "-backend-config")
    ]
    args.append(f"-backend-config={tf.paths.backend_tfvars}")

    tf.paths.check_for_layer_location()

    exit_code = tf.start_in_layer("init", *args)
    if exit_code:
        raise Exit(exit_code)
320

321

322
@pass_container
def _plan(tf, args):
    """Run `plan` in the layer with the default Terraform args followed by `args`.

    Raises:
        Exit: with the command's exit code when it is non-zero.
    """
    if exit_code := tf.start_in_layer("plan", *tf.tf_default_args, *args):
        raise Exit(exit_code)
329

330

331
def has_a_plan_file(args: Sequence[str]) -> bool:
    """Determine whether the list of arguments has a plan file at the end.

    Terraform apply options that take a value come either as two arguments
    ("-target ADDRESS", "-var 'NAME=value'") or as a single one
    ("-target=ADDRESS", "-var='NAME=value'"). There are also value-less flags
    with the form "-flag".
    We just need to know if there is or not a plan file as a last argument to
    decide if we prepend our default terraform arguments or not.

    Cases to consider:
     Args                                | Plan file present
    -------------------------------------|-------------------
     ()                                  | False
     ("-flag")                           | False
     ("-var=value")                      | False
     ("plan_file")                       | True
     (..., "-var", "value")              | False
     (..., "-flag", "plan_file")         | True
     (..., "-var=value", "plan_file")    | True
     (..., "-var", "value", "plan_file") | True

    Args:
        args: the arguments given to the apply command.

    Returns:
        True when the last argument is considered a plan file, False otherwise.
    """

    # Valid 'terraform apply' flags that take no value:
    # https://developer.hashicorp.com/terraform/cli/commands/apply
    value_less_flags = {
        "-destroy",
        "-refresh-only",
        "-detailed-exitcode",
        "-auto-approve",
        "-compact-warnings",
        "-json",
        "-no-color",
    }

    # No arguments, or the last one is an option: no plan file.
    if not args or args[-1].startswith("-"):
        return False

    # A single non-option argument is the plan file.
    if len(args) == 1:
        return True

    second_last = args[-2]
    if not second_last.startswith("-"):
        return True

    # The last argument follows an option: it is a plan file only when that
    # option cannot consume it as its value ("-opt=value" or a value-less flag).
    return "=" in second_last or second_last in value_less_flags
376

377

378
@pass_container
def _apply(tf, args: Sequence[str]) -> None:
    """Run `apply` in the layer, adding the default args only when no plan file is given.

    Raises:
        Exit: with the command's exit code when it is non-zero.
    """
    # The default Terraform arguments are left out when a plan file is passed.
    if has_a_plan_file(args):
        default_args = []
    else:
        default_args = tf.tf_default_args
    logger.debug(f"Default args passed to apply command: {default_args}")

    exit_code = tf.start_in_layer("apply", *default_args, *args)
    if not exit_code:
        return

    logger.error(f"Command execution failed with exit code: {exit_code}")
    raise Exit(exit_code)
389

390

391
@pass_container
def _output(tf, args):
    """Show all output variables of this layer.

    Args:
        tf: Terraform container (supplied by the `pass_container` decorator).
        args: extra arguments to forward to `terraform output`.

    Raises:
        Exit: with the command's exit code when it is non-zero, consistent with
            the other command executors in this module.
    """
    # Do not swallow failures: propagate a non-zero exit code to the CLI.
    if exit_code := tf.start_in_layer("output", *args):
        raise Exit(exit_code)
395

396

397
@pass_container
def _destroy(tf, args):
    """Run `destroy` in the layer with the default Terraform args followed by `args`.

    Raises:
        Exit: with the command's exit code when it is non-zero.
    """
    if exit_code := tf.start_in_layer("destroy", *tf.tf_default_args, *args):
        raise Exit(exit_code)
404

405

406
# ###########################################################################
407
# MISC FUNCTIONS
408
# ###########################################################################
409
def _make_layer_backend_key(cwd, account_dir, account_name):
    """Create the expected backend key candidates for a layer.

    Starting from the layer path relative to the account directory, several
    accepted variants are generated: with and without a leading region
    directory, with the "base-"/"tools-" prefix removed, with the
    "tf-"/"terraform-" prefix swapped, and with a "-dr" suffix on the first
    non-region component.

    Args:
        cwd (pathlib.Path): Current Working Directory (Layer Directory)
        account_dir (pathlib.Path): Account Directory
        account_name (str): Account Name

    Returns:
        list of lists: Backend bucket key parts, each one starting with the account name
    """

    def strip_layer_prefix(part):
        # Remove only the leading "base-" / "tools-"; later occurrences of the
        # same text inside the name are part of the layer name and are kept.
        for prefix in ("base-", "tools-"):
            if part.startswith(prefix):
                return part[len(prefix):]
        return part

    layer_path = cwd.relative_to(account_dir).as_posix().split("/")

    # Check region directory to keep retro compat: when the first component is
    # a region, accept the key both without and with it.
    if re.match(REGION, layer_path[0]):
        candidates = [layer_path[1:], layer_path]
    else:
        candidates = [layer_path]

    # Remove layer name prefix
    candidates = [[strip_layer_prefix(part) for part in candidate] for candidate in candidates]

    # Check for tf/terraform variants: swap the prefix of the first component
    # that carries one, and accept both spellings.
    candidates_retrocomp = []
    for candidate in candidates:
        candidates_retrocomp.append(candidate)
        for idx, part in enumerate(candidate):
            if part.startswith("tf-"):
                swapped = "terraform-" + part[len("tf-"):]
            elif part.startswith("terraform-"):
                swapped = "tf-" + part[len("terraform-"):]
            else:
                continue
            variant = candidate.copy()
            variant[idx] = swapped
            candidates_retrocomp.append(variant)
            break

    # Add a "-dr" variant: the suffix goes on the first non-region component only.
    candidates_with_dr = []
    for candidate in candidates_retrocomp:
        candidates_with_dr.append(candidate)
        dr_variant = []
        suffix = "-dr"
        for part in candidate:
            if re.match(REGION, part):
                dr_variant.append(part)
            else:
                dr_variant.append(part + suffix)
                suffix = ""
        candidates_with_dr.append(dr_variant)

    return [[account_name, *candidate] for candidate in candidates_with_dr]
475

476

477
@pass_container
def _validate_layout(tf: TerraformContainer):
    """Check that the current layer follows the expected layout.

    The checks are: the account config defines `environment`, the backend key
    is one of the expected ones for the layer, and the profile, bucket and
    dynamodb table in backend.tfvars start with the expected prefix. A mismatch
    between the account directory name and the environment name only produces
    a warning.

    Args:
        tf: Terraform container (supplied by the `pass_container` decorator).

    Returns:
        bool: True when every check passed, False otherwise.

    Raises:
        Exit: with code 1 when the account config has no `environment` entry.
    """
    tf.paths.check_for_layer_location()

    # Check for `environment = <account name>` in account.tfvars
    account_name = tf.paths.account_conf.get("environment")
    logger.info("Checking environment name definition in [bold]account.tfvars[/bold]...")
    if account_name is None:
        logger.error("[red]✘ FAILED[/red]\n")
        raise Exit(1)
    logger.info("[green]✔ OK[/green]\n")

    # Check if account directory name matches with environment name.
    # `.name` is used rather than `.stem` so that a directory name containing
    # a dot is compared in full instead of being cut at its last dot.
    account_dir_name = tf.paths.account_dir.name
    if account_dir_name != account_name:
        logger.warning(
            "[yellow]‼[/yellow] Account directory name does not match environment name.\n"
            f"  Expected [bold]{account_name}[/bold], found [bold]{account_dir_name}[/bold]\n"
        )

    backend_key = tf.backend_key.split("/")

    # Flag to report layout validity
    valid_layout = True

    # Check backend bucket key
    expected_backend_keys = _make_layer_backend_key(tf.paths.cwd, tf.paths.account_dir, account_name)
    logger.info("Checking backend key...")
    logger.info(f"Found: '{'/'.join(backend_key)}'")
    # The last component is dropped before comparing: the expected keys only
    # hold the directory parts (the message below appends terraform.tfstate).
    backend_key = backend_key[:-1]

    if backend_key in expected_backend_keys:
        logger.info("[green]✔ OK[/green]\n")
    else:
        exp_message = [f"{'/'.join(x)}/terraform.tfstate" for x in expected_backend_keys]
        logger.info(f"Expected one of: {';'.join(exp_message)}")
        logger.error("[red]✘ FAILED[/red]\n")
        valid_layout = False

    # A missing backend.tfvars is treated as an empty configuration, so every
    # field check below fails instead of raising.
    backend_tfvars = Path(tf.paths.local_backend_tfvars)
    backend_tfvars = parse_tf_file(backend_tfvars) if backend_tfvars.exists() else {}

    logger.info("Checking [bold]backend.tfvars[/bold]:\n")
    names_prefix = f"{tf.project}-{account_name}"
    names_prefix_bootstrap = f"{tf.project}-bootstrap"

    # Check profile, bucket and dynamo table names:
    for field in ("profile", "bucket", "dynamodb_table"):
        logger.info(f"Checking if {field.replace('_', ' ')} starts with {names_prefix}...")
        value = backend_tfvars.get(field, "")
        # Only the profile may also use the bootstrap prefix.
        if field == "profile":
            accepted_prefixes = (names_prefix, names_prefix_bootstrap)
        else:
            accepted_prefixes = (names_prefix,)

        if value.startswith(accepted_prefixes):
            logger.info("[green]✔ OK[/green]\n")
        else:
            logger.error("[red]✘ FAILED[/red]\n")
            valid_layout = False

    return valid_layout
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc