• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 16820655284

07 Aug 2025 05:03PM UTC coverage: 86.841% (-0.05%) from 86.892%
16820655284

push

github

web-flow
CFNV2: support CDK bootstrap and deployment (#12967)

32 of 38 new or added lines in 5 files covered. (84.21%)

2013 existing lines in 125 files now uncovered.

66606 of 76699 relevant lines covered (86.84%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.13
/localstack-core/localstack/services/cloudformation/engine/transformers.py
1
import copy
1✔
2
import json
1✔
3
import logging
1✔
4
import os
1✔
5
import re
1✔
6
from collections.abc import Callable
1✔
7
from copy import deepcopy
1✔
8
from dataclasses import dataclass
1✔
9
from typing import Any
1✔
10

11
import boto3
1✔
12
from botocore.exceptions import ClientError
1✔
13
from samtranslator.translator.transform import transform as transform_sam
1✔
14

15
from localstack.aws.api import CommonServiceException
1✔
16
from localstack.aws.connect import connect_to
1✔
17
from localstack.services.cloudformation.engine.policy_loader import create_policy_loader
1✔
18
from localstack.services.cloudformation.engine.template_deployer import resolve_refs_recursively
1✔
19
from localstack.services.cloudformation.engine.validations import ValidationError
1✔
20
from localstack.services.cloudformation.stores import get_cloudformation_store
1✔
21
from localstack.utils import testutil
1✔
22
from localstack.utils.objects import recurse_object
1✔
23
from localstack.utils.strings import long_uid
1✔
24

25
LOG = logging.getLogger(__name__)
1✔
26

27
SERVERLESS_TRANSFORM = "AWS::Serverless-2016-10-31"
1✔
28
EXTENSIONS_TRANSFORM = "AWS::LanguageExtensions"
1✔
29
SECRETSMANAGER_TRANSFORM = "AWS::SecretsManager-2020-07-23"
1✔
30

31
TransformResult = dict | str
1✔
32

33

34
@dataclass
1✔
35
class ResolveRefsRecursivelyContext:
1✔
36
    account_id: str
1✔
37
    region_name: str
1✔
38
    stack_name: str
1✔
39
    resources: dict
1✔
40
    mappings: dict
1✔
41
    conditions: dict
1✔
42
    parameters: dict
1✔
43

44
    def resolve(self, value: Any) -> Any:
1✔
45
        return resolve_refs_recursively(
1✔
46
            self.account_id,
47
            self.region_name,
48
            self.stack_name,
49
            self.resources,
50
            self.mappings,
51
            self.conditions,
52
            self.parameters,
53
            value,
54
        )
55

56

57
class Transformer:
1✔
58
    """Abstract class for Fn::Transform intrinsic functions"""
59

60
    def transform(self, account_id: str, region_name: str, parameters: dict) -> TransformResult:
1✔
61
        """Apply the transformer to the given parameters and return the modified construct"""
62

63

64
class AwsIncludeTransformer(Transformer):
1✔
65
    """Implements the 'AWS::Include' transform intrinsic function"""
66

67
    def transform(self, account_id: str, region_name: str, parameters: dict) -> TransformResult:
1✔
68
        from localstack.services.cloudformation.engine.template_preparer import parse_template
1✔
69

70
        location = parameters.get("Location")
1✔
71
        if location and location.startswith("s3://"):
1✔
72
            s3_client = connect_to(aws_access_key_id=account_id, region_name=region_name).s3
1✔
73
            bucket, _, path = location.removeprefix("s3://").partition("/")
1✔
74
            try:
1✔
75
                content = testutil.download_s3_object(s3_client, bucket, path)
1✔
76
            except ClientError:
×
77
                LOG.error("client error downloading S3 object '%s/%s'", bucket, path)
×
UNCOV
78
                raise
×
79
            content = parse_template(content)
1✔
80
            return content
1✔
81
        else:
82
            LOG.warning("Unexpected Location parameter for AWS::Include transformer: %s", location)
×
UNCOV
83
        return parameters
×
84

85

86
# maps transformer names to implementing classes
87
transformers: dict[str, type] = {"AWS::Include": AwsIncludeTransformer}
1✔
88

89

90
def apply_intrinsic_transformations(
1✔
91
    account_id: str,
92
    region_name: str,
93
    template: dict,
94
    stack_name: str,
95
    resources: dict,
96
    mappings: dict,
97
    conditions: dict[str, bool],
98
    stack_parameters: dict,
99
) -> dict:
100
    """Resolve constructs using the 'Fn::Transform' intrinsic function."""
101

102
    def _visit(obj, path, **_):
1✔
103
        if isinstance(obj, dict) and "Fn::Transform" in obj:
1✔
104
            transform = (
1✔
105
                obj["Fn::Transform"]
106
                if isinstance(obj["Fn::Transform"], dict)
107
                else {"Name": obj["Fn::Transform"]}
108
            )
109
            transform_name = transform.get("Name")
1✔
110
            transformer_class = transformers.get(transform_name)
1✔
111
            macro_store = get_cloudformation_store(account_id, region_name).macros
1✔
112
            parameters = transform.get("Parameters") or {}
1✔
113
            parameters = resolve_refs_recursively(
1✔
114
                account_id,
115
                region_name,
116
                stack_name,
117
                resources,
118
                mappings,
119
                conditions,
120
                stack_parameters,
121
                parameters,
122
            )
123
            if transformer_class:
1✔
124
                transformer = transformer_class()
1✔
125
                transformed = transformer.transform(account_id, region_name, parameters)
1✔
126
                obj_copy = deepcopy(obj)
1✔
127
                obj_copy.pop("Fn::Transform")
1✔
128
                obj_copy.update(transformed)
1✔
129
                return obj_copy
1✔
130

131
            elif transform_name in macro_store:
1✔
132
                obj_copy = deepcopy(obj)
1✔
133
                obj_copy.pop("Fn::Transform")
1✔
134
                result = execute_macro(
1✔
135
                    account_id, region_name, obj_copy, transform, stack_parameters, parameters, True
136
                )
137
                return result
1✔
138
            else:
UNCOV
139
                LOG.warning(
×
140
                    "Unsupported transform function '%s' used in %s", transform_name, stack_name
141
                )
142
        return obj
1✔
143

144
    return recurse_object(template, _visit)
1✔
145

146

147
def apply_global_transformations(
1✔
148
    account_id: str,
149
    region_name: str,
150
    template: dict,
151
    stack_name: str,
152
    resources: dict,
153
    mappings: dict,
154
    conditions: dict[str, bool],
155
    stack_parameters: dict,
156
) -> dict:
157
    processed_template = deepcopy(template)
1✔
158
    transformations = format_template_transformations_into_list(
1✔
159
        processed_template.get("Transform", [])
160
    )
161
    for transformation in transformations:
1✔
162
        transformation_parameters = resolve_refs_recursively(
1✔
163
            account_id,
164
            region_name,
165
            stack_name,
166
            resources,
167
            mappings,
168
            conditions,
169
            stack_parameters,
170
            transformation.get("Parameters", {}),
171
        )
172

173
        if not isinstance(transformation["Name"], str):
1✔
174
            # TODO this should be done during template validation
175
            raise CommonServiceException(
1✔
176
                code="ValidationError",
177
                status_code=400,
178
                message="Key Name of transform definition must be a string.",
179
                sender_fault=True,
180
            )
181
        elif transformation["Name"] == SERVERLESS_TRANSFORM:
1✔
182
            processed_template = apply_serverless_transformation(
1✔
183
                account_id, region_name, processed_template, stack_parameters
184
            )
185
        elif transformation["Name"] == EXTENSIONS_TRANSFORM:
1✔
186
            resolve_context = ResolveRefsRecursivelyContext(
1✔
187
                account_id,
188
                region_name,
189
                stack_name,
190
                resources,
191
                mappings,
192
                conditions,
193
                stack_parameters,
194
            )
195

196
            processed_template = apply_language_extensions_transform(
1✔
197
                processed_template,
198
                resolve_context,
199
            )
200
        elif transformation["Name"] == SECRETSMANAGER_TRANSFORM:
1✔
201
            # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/transform-aws-secretsmanager.html
UNCOV
202
            LOG.warning("%s is not yet supported. Ignoring.", SECRETSMANAGER_TRANSFORM)
×
203
        else:
204
            processed_template = execute_macro(
1✔
205
                account_id,
206
                region_name,
207
                parsed_template=template,
208
                macro=transformation,
209
                stack_parameters=stack_parameters,
210
                transformation_parameters=transformation_parameters,
211
            )
212

213
    return processed_template
1✔
214

215

216
def format_template_transformations_into_list(transforms: list | dict | str) -> list[dict]:
1✔
217
    """
218
    The value of the Transform attribute can be:
219
     - a transformation name
220
     - an object like {Name: transformation, Parameters:{}}
221
     - a list a list of names of the transformations to apply
222
     - a list of objects defining a transformation
223
     so the objective of this function is to normalize the list of transformations to apply into a list of transformation objects
224
    """
225
    formatted_transformations = []
1✔
226
    if isinstance(transforms, str):
1✔
227
        formatted_transformations.append({"Name": transforms})
1✔
228

229
    if isinstance(transforms, dict):
1✔
230
        formatted_transformations.append(transforms)
1✔
231

232
    if isinstance(transforms, list):
1✔
233
        for transformation in transforms:
1✔
234
            if isinstance(transformation, str):
1✔
235
                formatted_transformations.append({"Name": transformation})
1✔
236
            if isinstance(transformation, dict):
1✔
237
                formatted_transformations.append(transformation)
1✔
238

239
    return formatted_transformations
1✔
240

241

242
def execute_macro(
1✔
243
    account_id: str,
244
    region_name: str,
245
    parsed_template: dict,
246
    macro: dict,
247
    stack_parameters: dict,
248
    transformation_parameters: dict,
249
    is_intrinsic=False,
250
) -> str:
251
    macro_definition = get_cloudformation_store(account_id, region_name).macros.get(macro["Name"])
1✔
252
    if not macro_definition:
1✔
UNCOV
253
        raise FailedTransformationException(
×
254
            macro["Name"], f"Transformation {macro['Name']} is not supported."
255
        )
256

257
    formatted_stack_parameters = {}
1✔
258
    for key, value in stack_parameters.items():
1✔
259
        # TODO: we want to support other types of parameters
260
        if value.get("ParameterType") == "CommaDelimitedList":
1✔
261
            formatted_stack_parameters[key] = value.get("ParameterValue").split(",")
1✔
262
        else:
263
            formatted_stack_parameters[key] = value.get("ParameterValue")
1✔
264

265
    transformation_id = f"{account_id}::{macro['Name']}"
1✔
266
    event = {
1✔
267
        "region": region_name,
268
        "accountId": account_id,
269
        "fragment": parsed_template,
270
        "transformId": transformation_id,
271
        "params": transformation_parameters,
272
        "requestId": long_uid(),
273
        "templateParameterValues": formatted_stack_parameters,
274
    }
275

276
    client = connect_to(aws_access_key_id=account_id, region_name=region_name).lambda_
1✔
277
    try:
1✔
278
        invocation = client.invoke(
1✔
279
            FunctionName=macro_definition["FunctionName"], Payload=json.dumps(event)
280
        )
281
    except ClientError:
×
UNCOV
282
        LOG.error(
×
283
            "client error executing lambda function '%s' with payload '%s'",
284
            macro_definition["FunctionName"],
285
            json.dumps(event),
286
        )
UNCOV
287
        raise
×
288
    if invocation.get("StatusCode") != 200 or invocation.get("FunctionError") == "Unhandled":
1✔
289
        raise FailedTransformationException(
1✔
290
            transformation=macro["Name"],
291
            message=f"Received malformed response from transform {transformation_id}. Rollback requested by user.",
292
        )
293
    result = json.loads(invocation["Payload"].read())
1✔
294

295
    if result.get("status") != "success":
1✔
296
        error_message = result.get("errorMessage")
1✔
297
        message = (
1✔
298
            f"Transform {transformation_id} failed with: {error_message}. Rollback requested by user."
299
            if error_message
300
            else f"Transform {transformation_id} failed without an error message.. Rollback requested by user."
301
        )
302
        raise FailedTransformationException(transformation=macro["Name"], message=message)
1✔
303

304
    if not isinstance(result.get("fragment"), dict) and not is_intrinsic:
1✔
305
        raise FailedTransformationException(
1✔
306
            transformation=macro["Name"],
307
            message="Template format error: unsupported structure.. Rollback requested by user.",
308
        )
309

310
    return result.get("fragment")
1✔
311

312

313
def apply_language_extensions_transform(
1✔
314
    template: dict,
315
    resolve_context: ResolveRefsRecursivelyContext,
316
) -> dict:
317
    """
318
    Resolve language extensions constructs
319
    """
320

321
    def _visit(obj, path, **_):
1✔
322
        # Fn::ForEach
323
        # TODO: can this be used in non-resource positions?
324
        if isinstance(obj, dict) and any("Fn::ForEach" in key for key in obj):
1✔
325
            newobj = {}
1✔
326
            for key in obj:
1✔
327
                if "Fn::ForEach" not in key:
1✔
328
                    newobj[key] = obj[key]
1✔
329
                    continue
1✔
330

331
                new_entries = expand_fn_foreach(obj[key], resolve_context)
1✔
332
                newobj.update(**new_entries)
1✔
333
            return newobj
1✔
334
        # Fn::Length
335
        elif isinstance(obj, dict) and "Fn::Length" in obj:
1✔
336
            value = obj["Fn::Length"]
1✔
337
            if isinstance(value, dict):
1✔
338
                value = resolve_context.resolve(value)
1✔
339

340
            if isinstance(value, list):
1✔
341
                # TODO: what if one of the elements was AWS::NoValue?
342
                # no conversion required
343
                return len(value)
1✔
344
            elif isinstance(value, str):
×
345
                length = len(value.split(","))
×
346
                return length
×
UNCOV
347
            return obj
×
348
        elif isinstance(obj, dict) and "Fn::ToJsonString" in obj:
1✔
349
            # TODO: is the default representation ok here?
350
            return json.dumps(obj["Fn::ToJsonString"], default=str, separators=(",", ":"))
1✔
351

352
            # reference
353
        return obj
1✔
354

355
    return recurse_object(template, _visit)
1✔
356

357

358
def expand_fn_foreach(
1✔
359
    foreach_defn: list,
360
    resolve_context: ResolveRefsRecursivelyContext,
361
    extra_replace_mapping: dict | None = None,
362
) -> dict:
363
    if len(foreach_defn) != 3:
1✔
UNCOV
364
        raise ValidationError(
×
365
            f"Fn::ForEach: invalid number of arguments, expected 3 got {len(foreach_defn)}"
366
        )
367
    output = {}
1✔
368
    iteration_name, iteration_value, template = foreach_defn
1✔
369
    if not isinstance(iteration_name, str):
1✔
UNCOV
370
        raise ValidationError(
×
371
            f"Fn::ForEach: incorrect type for iteration name '{iteration_name}', expected str"
372
        )
373
    if isinstance(iteration_value, dict):
1✔
374
        # we have a reference
375
        if "Ref" in iteration_value:
1✔
376
            iteration_value = resolve_context.resolve(iteration_value)
1✔
377
        else:
378
            raise NotImplementedError(
379
                f"Fn::Transform: intrinsic {iteration_value} not supported in this position yet"
380
            )
381
    if not isinstance(iteration_value, list):
1✔
UNCOV
382
        raise ValidationError(
×
383
            f"Fn::ForEach: incorrect type for iteration variables '{iteration_value}', expected list"
384
        )
385

386
    if not isinstance(template, dict):
1✔
UNCOV
387
        raise ValidationError(
×
388
            f"Fn::ForEach: incorrect type for template '{template}', expected dict"
389
        )
390

391
    # TODO: locations other than resources
392
    replace_template_value = "${" + iteration_name + "}"
1✔
393
    for variable in iteration_value:
1✔
394
        # there might be multiple children, which could themselves be a `Fn::ForEach` call
395
        for logical_resource_id_template in template:
1✔
396
            if logical_resource_id_template.startswith("Fn::ForEach"):
1✔
397
                result = expand_fn_foreach(
1✔
398
                    template[logical_resource_id_template],
399
                    resolve_context,
400
                    {iteration_name: variable},
401
                )
402
                output.update(**result)
1✔
403
                continue
1✔
404

405
            if replace_template_value not in logical_resource_id_template:
1✔
UNCOV
406
                raise ValidationError("Fn::ForEach: no placeholder in logical resource id")
×
407

408
            def gen_visit(variable: str) -> Callable:
1✔
409
                def _visit(obj: Any, path: Any):
1✔
410
                    if isinstance(obj, dict) and "Ref" in obj:
1✔
411
                        ref_variable = obj["Ref"]
1✔
412
                        if ref_variable == iteration_name:
1✔
413
                            return variable
1✔
414
                    elif isinstance(obj, dict) and "Fn::Sub" in obj:
1✔
415
                        arguments = recurse_object(obj["Fn::Sub"], _visit)
1✔
416
                        if isinstance(arguments, str):
1✔
417
                            # simple case
418
                            # TODO: can this reference anything outside of the template?
419
                            result = arguments
1✔
420
                            variables_found = re.findall("\\${([^}]+)}", arguments)
1✔
421
                            for var in variables_found:
1✔
422
                                if var == iteration_name:
1✔
423
                                    result = result.replace(f"${{{var}}}", variable)
1✔
424
                            return result
1✔
425
                        else:
426
                            raise NotImplementedError
427
                    elif isinstance(obj, dict) and "Fn::Join" in obj:
1✔
428
                        # first visit arguments
429
                        arguments = recurse_object(
1✔
430
                            obj["Fn::Join"],
431
                            _visit,
432
                        )
433
                        separator, items = arguments
1✔
434
                        return separator.join(items)
1✔
435
                    return obj
1✔
436

437
                return _visit
1✔
438

439
            logical_resource_id = logical_resource_id_template.replace(
1✔
440
                replace_template_value, variable
441
            )
442
            for key, value in (extra_replace_mapping or {}).items():
1✔
443
                logical_resource_id = logical_resource_id.replace("${" + key + "}", value)
1✔
444
            resource_body = copy.deepcopy(template[logical_resource_id_template])
1✔
445
            body = recurse_object(resource_body, gen_visit(variable))
1✔
446
            output[logical_resource_id] = body
1✔
447

448
    return output
1✔
449

450

451
def apply_serverless_transformation(
1✔
452
    account_id: str, region_name: str, parsed_template: dict, template_parameters: dict
453
) -> str | None:
454
    """only returns string when parsing SAM template, otherwise None"""
455
    # TODO: we might also want to override the access key ID to account ID
456
    region_before = os.environ.get("AWS_DEFAULT_REGION")
1✔
457
    if boto3.session.Session().region_name is None:
1✔
458
        os.environ["AWS_DEFAULT_REGION"] = region_name
1✔
459
    loader = create_policy_loader()
1✔
460
    simplified_parameters = {
1✔
461
        k: v.get("ResolvedValue") or v["ParameterValue"] for k, v in template_parameters.items()
462
    }
463

464
    try:
1✔
465
        transformed = transform_sam(parsed_template, simplified_parameters, loader)
1✔
466
        return transformed
1✔
467
    except Exception as e:
×
UNCOV
468
        raise FailedTransformationException(transformation=SERVERLESS_TRANSFORM, message=str(e))
×
469
    finally:
470
        # Note: we need to fix boto3 region, otherwise AWS SAM transformer fails
471
        os.environ.pop("AWS_DEFAULT_REGION", None)
1✔
472
        if region_before is not None:
1✔
UNCOV
473
            os.environ["AWS_DEFAULT_REGION"] = region_before
×
474

475

476
class FailedTransformationException(Exception):
1✔
477
    transformation: str
1✔
478
    msg: str
1✔
479

480
    def __init__(self, transformation: str, message: str = ""):
1✔
481
        self.transformation = transformation
1✔
482
        self.message = message
1✔
483
        super().__init__(self.message)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc