• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 20565403496

29 Dec 2025 05:11AM UTC coverage: 84.103% (-2.8%) from 86.921%
20565403496

Pull #13567

github

web-flow
Merge 4816837a5 into 2417384aa
Pull Request #13567: Update ASF APIs

67166 of 79862 relevant lines covered (84.1%)

0.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

51.91
/localstack-core/localstack/services/cloudformation/engine/transformers.py
1
import copy
1✔
2
import json
1✔
3
import logging
1✔
4
import os
1✔
5
import re
1✔
6
from collections.abc import Callable
1✔
7
from copy import deepcopy
1✔
8
from dataclasses import dataclass
1✔
9
from typing import Any
1✔
10

11
import boto3
1✔
12
from botocore.exceptions import ClientError
1✔
13
from samtranslator.translator.transform import transform as transform_sam
1✔
14

15
from localstack.aws.api import CommonServiceException
1✔
16
from localstack.aws.connect import connect_to
1✔
17
from localstack.services.cloudformation.engine.parameters import StackParameter
1✔
18
from localstack.services.cloudformation.engine.policy_loader import create_policy_loader
1✔
19
from localstack.services.cloudformation.engine.template_deployer import resolve_refs_recursively
1✔
20
from localstack.services.cloudformation.engine.validations import ValidationError
1✔
21
from localstack.services.cloudformation.stores import get_cloudformation_store
1✔
22
from localstack.utils import testutil
1✔
23
from localstack.utils.objects import recurse_object
1✔
24
from localstack.utils.strings import long_uid
1✔
25

26
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)

# Names of the global (template-level) transforms that are special-cased in
# apply_global_transformations; any other name is dispatched to a registered macro.
SERVERLESS_TRANSFORM = "AWS::Serverless-2016-10-31"
EXTENSIONS_TRANSFORM = "AWS::LanguageExtensions"
SECRETSMANAGER_TRANSFORM = "AWS::SecretsManager-2020-07-23"

# Declared return type of Transformer.transform implementations.
TransformResult = dict | str
33

34

35
@dataclass
class ResolveRefsRecursivelyContext:
    """Bundle of the stack state that ``resolve_refs_recursively`` is called with.

    Holding the seven leading arguments in one object lets callers resolve
    arbitrary values without threading each argument through every function.
    """

    account_id: str
    region_name: str
    stack_name: str
    resources: dict
    mappings: dict
    conditions: dict
    parameters: dict[str, StackParameter]

    def resolve(self, value: Any) -> Any:
        """Resolve ``value`` via ``resolve_refs_recursively`` using the stored stack state."""
        # the order below is the positional order expected by resolve_refs_recursively
        stack_state = (
            self.account_id,
            self.region_name,
            self.stack_name,
            self.resources,
            self.mappings,
            self.conditions,
            self.parameters,
        )
        return resolve_refs_recursively(*stack_state, value)
56

57

58
class Transformer:
    """Abstract class for Fn::Transform intrinsic functions.

    Nothing enforces the abstractness: the class can be instantiated, and the
    base ``transform`` simply returns ``None``. Subclasses are expected to
    override ``transform``.
    """

    def transform(self, account_id: str, region_name: str, parameters: dict) -> TransformResult:
        """Apply the transformer to the given parameters and return the modified construct.

        :param account_id: account the transform is executed for
        :param region_name: region the transform is executed in
        :param parameters: the ``Parameters`` of the ``Fn::Transform`` declaration
        :return: the construct that replaces the ``Fn::Transform`` declaration
        """
63

64

65
class AwsIncludeTransformer(Transformer):
    """Implements the 'AWS::Include' transform intrinsic function"""

    def transform(self, account_id: str, region_name: str, parameters: dict) -> TransformResult:
        """Download the snippet referenced by ``Location`` from S3 and return it parsed.

        Only ``s3://bucket/key`` locations are handled. For any other (or missing)
        location a warning is logged and ``parameters`` is returned unchanged.

        :raises ClientError: if the S3 object cannot be downloaded (logged, then re-raised)
        """
        from localstack.services.cloudformation.engine.template_preparer import parse_template

        location = parameters.get("Location")
        if not (location and location.startswith("s3://")):
            LOG.warning("Unexpected Location parameter for AWS::Include transformer: %s", location)
            return parameters

        s3_client = connect_to(aws_access_key_id=account_id, region_name=region_name).s3
        # everything up to the first "/" is the bucket, the remainder is the object key
        bucket, _, path = location.removeprefix("s3://").partition("/")
        try:
            raw_snippet = testutil.download_s3_object(s3_client, bucket, path)
        except ClientError:
            LOG.error("client error downloading S3 object '%s/%s'", bucket, path)
            raise
        return parse_template(raw_snippet)
85

86

87
# maps transformer names to implementing classes
# (looked up by the "Name" of an Fn::Transform declaration in apply_intrinsic_transformations;
# names that are not listed here are looked up in the macro store instead)
transformers: dict[str, type] = {"AWS::Include": AwsIncludeTransformer}
89

90

91
def apply_intrinsic_transformations(
    account_id: str,
    region_name: str,
    template: dict,
    stack_name: str,
    resources: dict,
    mappings: dict,
    conditions: dict[str, bool],
    stack_parameters: dict,
) -> dict:
    """Resolve constructs using the 'Fn::Transform' intrinsic function.

    Walks ``template`` with ``recurse_object`` and, for every dict carrying an
    ``Fn::Transform`` key, either runs the built-in transformer registered under
    that name, or executes the macro of that name from the CloudFormation store.
    Unknown transform names are logged and the node is left untouched.
    """

    def _visit(obj, path, **_):
        # only dicts that declare an Fn::Transform are of interest
        if not isinstance(obj, dict) or "Fn::Transform" not in obj:
            return obj

        # the declaration is either a bare name or a {"Name": ..., "Parameters": ...} dict
        declaration = obj["Fn::Transform"]
        transform = declaration if isinstance(declaration, dict) else {"Name": declaration}

        transform_name = transform.get("Name")
        transformer_class = transformers.get(transform_name)
        macro_store = get_cloudformation_store(account_id, region_name).macros
        parameters = resolve_refs_recursively(
            account_id,
            region_name,
            stack_name,
            resources,
            mappings,
            conditions,
            stack_parameters,
            transform.get("Parameters") or {},
        )

        if transformer_class:
            # built-in transformer: merge its result into the sibling keys of the declaration
            transformed = transformer_class().transform(account_id, region_name, parameters)
            merged = deepcopy(obj)
            merged.pop("Fn::Transform")
            merged.update(transformed)
            return merged

        if transform_name in macro_store:
            # user-defined macro: the fragment is the node without the declaration itself
            fragment = deepcopy(obj)
            fragment.pop("Fn::Transform")
            return execute_macro(
                account_id, region_name, fragment, transform, stack_parameters, parameters, True
            )

        LOG.warning("Unsupported transform function '%s' used in %s", transform_name, stack_name)
        return obj

    return recurse_object(template, _visit)
146

147

148
def apply_global_transformations(
    account_id: str,
    region_name: str,
    template: dict,
    stack_name: str,
    resources: dict,
    mappings: dict,
    conditions: dict[str, bool],
    stack_parameters: dict,
) -> dict:
    """Apply the transformations listed in the template-level ``Transform`` section.

    The transformations are applied serially, in the order they are declared,
    each one operating on the output of the previous one. ``template`` itself is
    never modified; a processed deep copy is returned.

    :raises CommonServiceException: (``ValidationError``) if a transform definition
        has a missing or non-string ``Name``
    :raises FailedTransformationException: if a SAM transformation or a macro fails
    """
    processed_template = deepcopy(template)
    transformations = format_template_transformations_into_list(
        processed_template.get("Transform", [])
    )
    for transformation in transformations:
        # .get() so that a definition without "Name" is reported as a validation
        # error instead of surfacing as a KeyError
        transformation_name = transformation.get("Name")
        if not isinstance(transformation_name, str):
            # TODO this should be done during template validation
            raise CommonServiceException(
                code="ValidationError",
                status_code=400,
                message="Key Name of transform definition must be a string.",
                sender_fault=True,
            )

        transformation_parameters = resolve_refs_recursively(
            account_id,
            region_name,
            stack_name,
            resources,
            mappings,
            conditions,
            stack_parameters,
            transformation.get("Parameters", {}),
        )

        if transformation_name == SERVERLESS_TRANSFORM:
            processed_template = apply_serverless_transformation(
                account_id, region_name, processed_template, stack_parameters
            )
        elif transformation_name == EXTENSIONS_TRANSFORM:
            resolve_context = ResolveRefsRecursivelyContext(
                account_id,
                region_name,
                stack_name,
                resources,
                mappings,
                conditions,
                stack_parameters,
            )

            processed_template = apply_language_extensions_transform(
                processed_template,
                resolve_context,
            )
        elif transformation_name == SECRETSMANAGER_TRANSFORM:
            # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/transform-aws-secretsmanager.html
            LOG.warning("%s is not yet supported. Ignoring.", SECRETSMANAGER_TRANSFORM)
        else:
            # hand the macro the template as processed so far; passing the original
            # `template` here would silently discard every earlier transformation
            processed_template = execute_macro(
                account_id,
                region_name,
                parsed_template=processed_template,
                macro=transformation,
                stack_parameters=stack_parameters,
                transformation_parameters=transformation_parameters,
            )

    return processed_template
215

216

217
def format_template_transformations_into_list(transforms: list | dict | str) -> list[dict]:
1✔
218
    """
219
    The value of the Transform attribute can be:
220
     - a transformation name
221
     - an object like {Name: transformation, Parameters:{}}
222
     - a list a list of names of the transformations to apply
223
     - a list of objects defining a transformation
224
     so the objective of this function is to normalize the list of transformations to apply into a list of transformation objects
225
    """
226
    formatted_transformations = []
×
227
    if isinstance(transforms, str):
×
228
        formatted_transformations.append({"Name": transforms})
×
229

230
    if isinstance(transforms, dict):
×
231
        formatted_transformations.append(transforms)
×
232

233
    if isinstance(transforms, list):
×
234
        for transformation in transforms:
×
235
            if isinstance(transformation, str):
×
236
                formatted_transformations.append({"Name": transformation})
×
237
            if isinstance(transformation, dict):
×
238
                formatted_transformations.append(transformation)
×
239

240
    return formatted_transformations
×
241

242

243
def execute_macro(
    account_id: str,
    region_name: str,
    parsed_template: dict,
    macro: dict,
    stack_parameters: dict,
    transformation_parameters: dict,
    is_intrinsic=False,
) -> Any:
    """Invoke the Lambda function backing a CloudFormation macro and return its fragment.

    :param parsed_template: the fragment handed to the macro (``fragment`` in the event)
    :param macro: transform definition; its ``Name`` selects the macro in the store
    :param stack_parameters: stack parameters, exposed to the macro as
        ``templateParameterValues`` (``CommaDelimitedList`` strings are split on ",")
    :param transformation_parameters: the resolved ``Parameters`` of the transform
        (``params`` in the event)
    :param is_intrinsic: when true, the returned fragment may be of any type;
        otherwise it must be a dict
    :return: the ``fragment`` of the macro response
    :raises FailedTransformationException: if the macro is unknown, the invocation
        fails, the response is malformed or reports a non-success status
    :raises ClientError: if the Lambda invocation itself errors (logged, then re-raised)
    """
    macro_name = macro["Name"]
    macro_definition = get_cloudformation_store(account_id, region_name).macros.get(macro_name)
    if not macro_definition:
        raise FailedTransformationException(
            macro_name, f"Transformation {macro_name} is not supported."
        )

    formatted_stack_parameters = {}
    for key, value in stack_parameters.items():
        # TODO: we want to support other types of parameters
        parameter_value = value.get("ParameterValue")
        if value.get("ParameterType") == "CommaDelimitedList" and isinstance(parameter_value, str):
            formatted_stack_parameters[key] = parameter_value.split(",")
        else:
            formatted_stack_parameters[key] = parameter_value

    transformation_id = f"{account_id}::{macro_name}"
    event = {
        "region": region_name,
        "accountId": account_id,
        "fragment": parsed_template,
        "transformId": transformation_id,
        "params": transformation_parameters,
        "requestId": long_uid(),
        "templateParameterValues": formatted_stack_parameters,
    }
    # serialize once; the same payload is used for the invocation and the error log
    payload = json.dumps(event)

    client = connect_to(aws_access_key_id=account_id, region_name=region_name).lambda_
    try:
        invocation = client.invoke(FunctionName=macro_definition["FunctionName"], Payload=payload)
    except ClientError:
        LOG.error(
            "client error executing lambda function '%s' with payload '%s'",
            macro_definition["FunctionName"],
            payload,
        )
        raise

    malformed_response_message = (
        f"Received malformed response from transform {transformation_id}. "
        "Rollback requested by user."
    )
    if invocation.get("StatusCode") != 200 or invocation.get("FunctionError") == "Unhandled":
        raise FailedTransformationException(
            transformation=macro_name, message=malformed_response_message
        )

    result = json.loads(invocation["Payload"].read())
    if not isinstance(result, dict):
        # e.g. the function returned null, a string or a list: without this guard
        # the .get() calls below would fail with an AttributeError
        raise FailedTransformationException(
            transformation=macro_name, message=malformed_response_message
        )

    if result.get("status") != "success":
        error_message = result.get("errorMessage")
        message = (
            f"Transform {transformation_id} failed with: {error_message}. Rollback requested by user."
            if error_message
            else f"Transform {transformation_id} failed without an error message.. Rollback requested by user."
        )
        raise FailedTransformationException(transformation=macro_name, message=message)

    fragment = result.get("fragment")
    # template-level macros must produce a template (dict); intrinsic ones may return any value
    if not isinstance(fragment, dict) and not is_intrinsic:
        raise FailedTransformationException(
            transformation=macro_name,
            message="Template format error: unsupported structure.. Rollback requested by user.",
        )

    return fragment
313

314

315
def apply_language_extensions_transform(
    template: dict,
    resolve_context: ResolveRefsRecursivelyContext,
) -> dict:
    """
    Resolve language extensions constructs

    Handles ``Fn::ForEach`` (expanded into its generated entries), ``Fn::Length``
    (replaced by the number of elements) and ``Fn::ToJsonString`` (replaced by a
    compact JSON string). All other nodes are returned unchanged.
    """

    def _visit(obj, path, **_):
        if not isinstance(obj, dict):
            return obj

        # Fn::ForEach
        # TODO: can this be used in non-resource positions?
        if any("Fn::ForEach" in key for key in obj):
            expanded = {}
            for key, entry in obj.items():
                if "Fn::ForEach" in key:
                    # replace the loop declaration by the entries it generates
                    expanded.update(**expand_fn_foreach(entry, resolve_context))
                else:
                    expanded[key] = entry
            return expanded

        # Fn::Length
        if "Fn::Length" in obj:
            value = obj["Fn::Length"]
            if isinstance(value, dict):
                # an intrinsic/reference: resolve it first
                value = resolve_context.resolve(value)

            if isinstance(value, list):
                # TODO: what if one of the elements was AWS::NoValue?
                # no conversion required
                return len(value)
            if isinstance(value, str):
                # a string is counted as a comma-delimited list
                return len(value.split(","))
            return obj

        # Fn::ToJsonString
        if "Fn::ToJsonString" in obj:
            # TODO: is the default representation ok here?
            return json.dumps(obj["Fn::ToJsonString"], default=str, separators=(",", ":"))

        return obj

    return recurse_object(template, _visit)
358

359

360
def expand_fn_foreach(
    foreach_defn: list,
    resolve_context: ResolveRefsRecursivelyContext,
    extra_replace_mapping: dict | None = None,
) -> dict:
    """Expand one ``Fn::ForEach`` definition into the entries it generates.

    :param foreach_defn: the three-element definition
        ``[iteration name, iteration values, output template]``. The iteration
        values are either a list or a ``Ref`` that resolves to one.
    :param resolve_context: used to resolve a ``Ref`` given as iteration values
    :param extra_replace_mapping: loop variables of the enclosing ``Fn::ForEach``
        levels (name -> current value); their ``${name}`` placeholders are
        substituted in the generated logical ids
    :return: mapping of generated logical id -> body with the loop variable substituted
    :raises ValidationError: if the definition is malformed or a logical id
        template lacks the ``${iteration name}`` placeholder
    :raises NotImplementedError: for intrinsics other than ``Ref`` as iteration
        values, and for ``Fn::Sub`` declarations that are not a plain string
    """
    if len(foreach_defn) != 3:
        raise ValidationError(
            f"Fn::ForEach: invalid number of arguments, expected 3 got {len(foreach_defn)}"
        )
    iteration_name, iteration_value, template = foreach_defn
    if not isinstance(iteration_name, str):
        raise ValidationError(
            f"Fn::ForEach: incorrect type for iteration name '{iteration_name}', expected str"
        )
    if isinstance(iteration_value, dict):
        # we have a reference
        if "Ref" in iteration_value:
            iteration_value = resolve_context.resolve(iteration_value)
        else:
            raise NotImplementedError(
                f"Fn::ForEach: intrinsic {iteration_value} not supported in this position yet"
            )
    if not isinstance(iteration_value, list):
        raise ValidationError(
            f"Fn::ForEach: incorrect type for iteration variables '{iteration_value}', expected list"
        )

    if not isinstance(template, dict):
        raise ValidationError(
            f"Fn::ForEach: incorrect type for template '{template}', expected dict"
        )

    def gen_visit(variable: str) -> Callable:
        """Build a visitor that substitutes the loop variable with ``variable``."""

        def _visit(obj: Any, path: Any):
            if isinstance(obj, dict) and "Ref" in obj:
                if obj["Ref"] == iteration_name:
                    return variable
            elif isinstance(obj, dict) and "Fn::Sub" in obj:
                arguments = recurse_object(obj["Fn::Sub"], _visit)
                if isinstance(arguments, str):
                    # simple case
                    # TODO: can this reference anything outside of the template?
                    result = arguments
                    for var in re.findall(r"\${([^}]+)}", arguments):
                        # only the loop variable is substituted, other placeholders are kept
                        if var == iteration_name:
                            result = result.replace(f"${{{var}}}", variable)
                    return result
                raise NotImplementedError(
                    "Fn::ForEach: Fn::Sub with a non-string argument is not supported yet"
                )
            elif isinstance(obj, dict) and "Fn::Join" in obj:
                # first visit arguments
                separator, items = recurse_object(obj["Fn::Join"], _visit)
                return separator.join(items)
            return obj

        return _visit

    outer_replacements = extra_replace_mapping or {}
    # TODO: locations other than resources
    replace_template_value = "${" + iteration_name + "}"
    output = {}
    for variable in iteration_value:
        visit = gen_visit(variable)
        # there might be multiple children, which could themselves be a `Fn::ForEach` call
        for logical_resource_id_template in template:
            if logical_resource_id_template.startswith("Fn::ForEach"):
                # forward the variables of ALL enclosing loops, not just this one,
                # so that placeholders keep resolving at any nesting depth
                result = expand_fn_foreach(
                    template[logical_resource_id_template],
                    resolve_context,
                    {**outer_replacements, iteration_name: variable},
                )
                output.update(**result)
                continue

            if replace_template_value not in logical_resource_id_template:
                raise ValidationError("Fn::ForEach: no placeholder in logical resource id")

            logical_resource_id = logical_resource_id_template.replace(
                replace_template_value, variable
            )
            for key, value in outer_replacements.items():
                logical_resource_id = logical_resource_id.replace("${" + key + "}", value)
            # deep copy: the same template body is reused for every iteration value
            resource_body = copy.deepcopy(template[logical_resource_id_template])
            output[logical_resource_id] = recurse_object(resource_body, visit)

    return output
451

452

453
def apply_serverless_transformation(
    account_id: str, region_name: str, parsed_template: dict, template_parameters: dict
) -> dict:
    """Run the AWS SAM translator over ``parsed_template`` and return its result.

    :param account_id: currently unused (see TODO below)
    :param region_name: region exported as ``AWS_DEFAULT_REGION`` for the duration of
        the call when the default boto3 session has no region configured
    :param parsed_template: the template to translate
    :param template_parameters: stack parameters; each is reduced to its
        ``ResolvedValue`` or, if that is missing/empty, its ``ParameterValue``
    :return: the value returned by the SAM translator
    :raises FailedTransformationException: if the SAM translator raises any exception
    """
    # TODO: we might also want to override the access key ID to account ID
    region_before = os.environ.get("AWS_DEFAULT_REGION")
    if boto3.session.Session().region_name is None:
        os.environ["AWS_DEFAULT_REGION"] = region_name
    loader = create_policy_loader()
    simplified_parameters = {
        k: v.get("ResolvedValue") or v["ParameterValue"] for k, v in template_parameters.items()
    }

    try:
        return transform_sam(parsed_template, simplified_parameters, loader)
    except Exception as e:
        # chain explicitly so the original SAM error stays attached as the cause
        raise FailedTransformationException(
            transformation=SERVERLESS_TRANSFORM, message=str(e)
        ) from e
    finally:
        # Note: we need to fix boto3 region, otherwise AWS SAM transformer fails
        # restore the environment exactly as it was before the call
        os.environ.pop("AWS_DEFAULT_REGION", None)
        if region_before is not None:
            os.environ["AWS_DEFAULT_REGION"] = region_before
476

477

478
class FailedTransformationException(Exception):
    """Raised when a template transformation (SAM, macro, ...) cannot be applied.

    :ivar transformation: name of the transformation that failed
    :ivar message: human-readable failure reason; also the exception's ``str()``
    """

    transformation: str
    message: str

    def __init__(self, transformation: str, message: str = ""):
        self.transformation = transformation
        self.message = message
        super().__init__(self.message)

    @property
    def msg(self) -> str:
        """Alias of ``message``.

        The class used to declare a ``msg`` attribute that was never assigned;
        the alias makes that declared name usable instead of raising AttributeError.
        """
        return self.message
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc