• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 16981563750

14 Aug 2025 10:49PM UTC coverage: 86.896% (+0.04%) from 86.852%
16981563750

push

github

web-flow
add support for Fn::Tranform in CFnV2 (#12966)

Co-authored-by: Simon Walker <simon.walker@localstack.cloud>

181 of 195 new or added lines in 6 files covered. (92.82%)

348 existing lines in 22 files now uncovered.

66915 of 77006 relevant lines covered (86.9%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.94
/localstack-core/localstack/services/cloudformation/engine/v2/change_set_model_transform.py
1
import copy
1✔
2
import json
1✔
3
import logging
1✔
4
import os
1✔
5
import re
1✔
6
from typing import Any, Final, TypedDict
1✔
7

8
import boto3
1✔
9
import jsonpath_ng
1✔
10
from botocore.exceptions import ClientError
1✔
11
from samtranslator.translator.transform import transform as transform_sam
1✔
12

13
from localstack.aws.connect import connect_to
1✔
14
from localstack.services.cloudformation.engine.policy_loader import create_policy_loader
1✔
15
from localstack.services.cloudformation.engine.template_preparer import parse_template
1✔
16
from localstack.services.cloudformation.engine.transformers import (
1✔
17
    FailedTransformationException,
18
)
19
from localstack.services.cloudformation.engine.v2.change_set_model import (
1✔
20
    ChangeType,
21
    FnTransform,
22
    Maybe,
23
    NodeGlobalTransform,
24
    NodeIntrinsicFunction,
25
    NodeIntrinsicFunctionFnTransform,
26
    NodeProperties,
27
    NodeResources,
28
    NodeTransform,
29
    Nothing,
30
    Scope,
31
    is_nothing,
32
)
33
from localstack.services.cloudformation.engine.v2.change_set_model_preproc import (
1✔
34
    ChangeSetModelPreproc,
35
    PreprocEntityDelta,
36
    PreprocProperties,
37
)
38
from localstack.services.cloudformation.engine.validations import ValidationError
1✔
39
from localstack.services.cloudformation.stores import get_cloudformation_store
1✔
40
from localstack.services.cloudformation.v2.entities import ChangeSet
1✔
41
from localstack.services.cloudformation.v2.types import engine_parameter_value
1✔
42
from localstack.utils import testutil
1✔
43
from localstack.utils.strings import long_uid
1✔
44

45
LOG = logging.getLogger(__name__)
1✔
46

47
SERVERLESS_TRANSFORM = "AWS::Serverless-2016-10-31"
1✔
48
EXTENSIONS_TRANSFORM = "AWS::LanguageExtensions"
1✔
49
SECRETSMANAGER_TRANSFORM = "AWS::SecretsManager-2020-07-23"
1✔
50
INCLUDE_TRANSFORM = "AWS::Include"
1✔
51

52
_SCOPE_TRANSFORM_TEMPLATE_OUTCOME: Final[Scope] = Scope("TRANSFORM_TEMPLATE_OUTCOME")
1✔
53

54

55
# TODO: evaluate the use of subtypes to represent and validate types of transforms
56
class GlobalTransform:
1✔
57
    name: str
1✔
58
    parameters: Maybe[dict]
1✔
59

60
    def __init__(self, name: str, parameters: Maybe[dict]):
1✔
61
        self.name = name
1✔
62
        self.parameters = parameters
1✔
63

64

65
class TransformPreprocParameter(TypedDict):
1✔
66
    # TODO: expand
67
    ParameterKey: str
1✔
68
    ParameterValue: Any
1✔
69
    ParameterType: str | None
1✔
70

71

72
class ChangeSetModelTransform(ChangeSetModelPreproc):
1✔
73
    _before_parameters: Final[dict]
1✔
74
    _after_parameters: Final[dict]
1✔
75
    _before_template: Final[Maybe[dict]]
1✔
76
    _after_template: Final[Maybe[dict]]
1✔
77

78
    def __init__(
1✔
79
        self,
80
        change_set: ChangeSet,
81
        before_parameters: dict,
82
        after_parameters: dict,
83
        before_template: dict | None,
84
        after_template: dict | None,
85
    ):
86
        super().__init__(change_set=change_set)
1✔
87
        self._before_parameters = before_parameters
1✔
88
        self._after_parameters = after_parameters
1✔
89
        self._before_template = before_template or Nothing
1✔
90
        self._after_template = after_template or Nothing
1✔
91

92
    # Ported from v1:
93
    @staticmethod
1✔
94
    def _apply_global_serverless_transformation(
1✔
95
        region_name: str, template: dict, parameters: dict
96
    ) -> dict:
97
        """only returns string when parsing SAM template, otherwise None"""
98
        # TODO: we might also want to override the access key ID to account ID
99
        region_before = os.environ.get("AWS_DEFAULT_REGION")
1✔
100
        if boto3.session.Session().region_name is None:
1✔
101
            os.environ["AWS_DEFAULT_REGION"] = region_name
1✔
102
        loader = create_policy_loader()
1✔
103
        # The following transformation function can carry out in-place changes ensure this cannot occur.
104
        template = copy.deepcopy(template)
1✔
105
        parameters = copy.deepcopy(parameters)
1✔
106
        try:
1✔
107
            transformed = transform_sam(template, parameters, loader)
1✔
108
            return transformed
1✔
109
        except Exception as e:
×
110
            raise FailedTransformationException(transformation=SERVERLESS_TRANSFORM, message=str(e))
×
111
        finally:
112
            # Note: we need to fix boto3 region, otherwise AWS SAM transformer fails
113
            os.environ.pop("AWS_DEFAULT_REGION", None)
1✔
114
            if region_before is not None:
1✔
115
                os.environ["AWS_DEFAULT_REGION"] = region_before
×
116

117
    def _compute_include_transform(self, parameters: dict, fragment: dict) -> dict:
1✔
118
        location = parameters.get("Location")
1✔
119
        if not location or not location.startswith("s3://"):
1✔
120
            raise FailedTransformationException(
×
121
                transformation=INCLUDE_TRANSFORM,
122
                message="Unexpected Location parameter for AWS::Include transformer: %s" % location,
123
            )
124

125
        s3_client = connect_to(
1✔
126
            aws_access_key_id=self._change_set.account_id, region_name=self._change_set.region_name
127
        ).s3
128
        bucket, _, path = location.removeprefix("s3://").partition("/")
1✔
129
        try:
1✔
130
            content = testutil.download_s3_object(s3_client, bucket, path)
1✔
131
        except ClientError:
×
132
            raise FailedTransformationException(
×
133
                transformation=INCLUDE_TRANSFORM,
134
                message="Error downloading S3 object '%s/%s'" % (bucket, path),
135
            )
136
        try:
1✔
137
            template_to_include = parse_template(content)
1✔
138
        except Exception as e:
×
139
            raise FailedTransformationException(transformation=INCLUDE_TRANSFORM, message=str(e))
×
140

141
        return {**fragment, **template_to_include}
1✔
142

143
    def _apply_global_transform(
1✔
144
        self, global_transform: GlobalTransform, template: dict, parameters: dict
145
    ) -> dict:
146
        transform_name = global_transform.name
1✔
147
        if transform_name == EXTENSIONS_TRANSFORM:
1✔
148
            # Applied lazily in downstream tasks (see ChangeSetModelPreproc).
149
            transformed_template = template
1✔
150
        elif transform_name == SERVERLESS_TRANSFORM:
1✔
151
            transformed_template = self._apply_global_serverless_transformation(
1✔
152
                region_name=self._change_set.region_name,
153
                template=template,
154
                parameters=parameters,
155
            )
156
        elif transform_name == SECRETSMANAGER_TRANSFORM:
1✔
157
            # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/transform-aws-secretsmanager.html
158
            LOG.warning("%s is not yet supported. Ignoring.", SECRETSMANAGER_TRANSFORM)
×
159
            transformed_template = template
×
160
        elif transform_name == INCLUDE_TRANSFORM:
1✔
161
            transformed_template = self._compute_include_transform(
1✔
162
                parameters=global_transform.parameters,
163
                fragment=template,
164
            )
165
        else:
166
            transformed_template = self._invoke_macro(
1✔
167
                name=global_transform.name,
168
                parameters=global_transform.parameters
169
                if not is_nothing(global_transform.parameters)
170
                else {},
171
                fragment=template,
172
                allow_string=False,
173
            )
174
        return transformed_template
1✔
175

176
    def transform(self) -> tuple[dict, dict]:
1✔
177
        self._setup_runtime_cache()
1✔
178

179
        node_template = self._change_set.update_model.node_template
1✔
180

181
        parameters_delta = self.visit_node_parameters(node_template.parameters)
1✔
182
        parameters_before = parameters_delta.before
1✔
183
        parameters_after = parameters_delta.after
1✔
184

185
        self.visit_node_resources(node_template.resources)
1✔
186

187
        transform_delta: PreprocEntityDelta[list[GlobalTransform], list[GlobalTransform]] = (
1✔
188
            self.visit_node_transform(node_template.transform)
189
        )
190
        transform_before: Maybe[list[GlobalTransform]] = transform_delta.before
1✔
191
        transform_after: Maybe[list[GlobalTransform]] = transform_delta.after
1✔
192

193
        transformed_before_template = self._before_template
1✔
194
        if transform_before and not is_nothing(self._before_template):
1✔
195
            if _SCOPE_TRANSFORM_TEMPLATE_OUTCOME in self._before_cache:
1✔
196
                transformed_before_template = self._before_cache[_SCOPE_TRANSFORM_TEMPLATE_OUTCOME]
1✔
197
            else:
198
                for before_global_transform in transform_before:
1✔
199
                    if not is_nothing(before_global_transform.name):
1✔
200
                        transformed_before_template = self._apply_global_transform(
1✔
201
                            global_transform=before_global_transform,
202
                            parameters=parameters_before,
203
                            template=transformed_before_template,
204
                        )
205

206
                # Macro transformations won't remove the transform from the template
207
                if "Transform" in transformed_before_template:
1✔
208
                    transformed_before_template.pop("Transform")
1✔
209
                self._before_cache[_SCOPE_TRANSFORM_TEMPLATE_OUTCOME] = transformed_before_template
1✔
210

211
        transformed_after_template = self._after_template
1✔
212
        if transform_after and not is_nothing(self._after_template):
1✔
213
            transformed_after_template = self._after_template
1✔
214
            for after_global_transform in transform_after:
1✔
215
                if not is_nothing(after_global_transform.name):
1✔
216
                    transformed_after_template = self._apply_global_transform(
1✔
217
                        global_transform=after_global_transform,
218
                        parameters=parameters_after,
219
                        template=transformed_after_template,
220
                    )
221
            # Macro transformations won't remove the transform from the template
222
            if "Transform" in transformed_after_template:
1✔
223
                transformed_after_template.pop("Transform")
1✔
224
            self._after_cache[_SCOPE_TRANSFORM_TEMPLATE_OUTCOME] = transformed_after_template
1✔
225

226
        self._save_runtime_cache()
1✔
227

228
        return transformed_before_template, transformed_after_template
1✔
229

230
    def visit_node_global_transform(
1✔
231
        self, node_global_transform: NodeGlobalTransform
232
    ) -> PreprocEntityDelta[GlobalTransform, GlobalTransform]:
233
        change_type = node_global_transform.change_type
1✔
234

235
        name_delta = self.visit(node_global_transform.name)
1✔
236
        parameters_delta = self.visit(node_global_transform.parameters)
1✔
237

238
        before = Nothing
1✔
239
        if change_type != ChangeType.CREATED:
1✔
240
            before = GlobalTransform(name=name_delta.before, parameters=parameters_delta.before)
1✔
241
        after = Nothing
1✔
242
        if change_type != ChangeType.REMOVED:
1✔
243
            after = GlobalTransform(name=name_delta.after, parameters=parameters_delta.after)
1✔
244
        return PreprocEntityDelta(before=before, after=after)
1✔
245

246
    def visit_node_transform(
1✔
247
        self, node_transform: NodeTransform
248
    ) -> PreprocEntityDelta[list[GlobalTransform], list[GlobalTransform]]:
249
        change_type = node_transform.change_type
1✔
250
        before = [] if change_type != ChangeType.CREATED else Nothing
1✔
251
        after = [] if change_type != ChangeType.REMOVED else Nothing
1✔
252
        for change_set_entity in node_transform.global_transforms:
1✔
253
            if not isinstance(change_set_entity.name.value, str):
1✔
254
                raise ValidationError("Key Name of transform definition must be a string.")
1✔
255

256
            delta: PreprocEntityDelta[GlobalTransform, GlobalTransform] = self.visit(
1✔
257
                change_set_entity=change_set_entity
258
            )
259
            delta_before = delta.before
1✔
260
            delta_after = delta.after
1✔
261
            if not is_nothing(before) and not is_nothing(delta_before):
1✔
262
                before.append(delta_before)
1✔
263
            if not is_nothing(after) and not is_nothing(delta_after):
1✔
264
                after.append(delta_after)
1✔
265
        return PreprocEntityDelta(before=before, after=after)
1✔
266

267
    def _compute_fn_transform(
1✔
268
        self, macro_definition: Any, siblings: Any, allow_string: False
269
    ) -> Any:
270
        def _normalize_transform(obj):
1✔
271
            transforms = []
1✔
272

273
            if isinstance(obj, str):
1✔
274
                transforms.append({"Name": obj, "Parameters": {}})
1✔
275

276
            if isinstance(obj, dict):
1✔
277
                transforms.append(obj)
1✔
278

279
            if isinstance(obj, list):
1✔
280
                for v in obj:
1✔
281
                    if isinstance(v, str):
1✔
NEW
282
                        transforms.append({"Name": v, "Parameters": {}})
×
283

284
                    if isinstance(v, dict):
1✔
285
                        if not v.get("Parameters"):
1✔
NEW
286
                            v["Parameters"] = {}
×
287
                        transforms.append(v)
1✔
288

289
            return transforms
1✔
290

291
        normalized_transforms = _normalize_transform(macro_definition)
1✔
292
        transform_output = copy.deepcopy(siblings)
1✔
293
        for transform in normalized_transforms:
1✔
294
            transform_name = transform["Name"]
1✔
295
            if transform_name == INCLUDE_TRANSFORM:
1✔
296
                transform_output = self._compute_include_transform(
1✔
297
                    parameters=transform["Parameters"], fragment=transform_output
298
                )
299
            else:
300
                transform_output: dict | str = self._invoke_macro(
1✔
301
                    fragment=transform_output,
302
                    name=transform["Name"],
303
                    parameters=transform.get("Parameters", {}),
304
                    allow_string=allow_string,
305
                )
306

307
        if isinstance(transform_output, dict) and FnTransform in transform_output:
1✔
308
            transform_output.pop(FnTransform)
1✔
309

310
        return transform_output
1✔
311

312
    def _replace_at_jsonpath(self, template: dict, path: str, result: Any):
1✔
313
        pattern = jsonpath_ng.parse(path)
1✔
314
        result_template = pattern.update(template, result)
1✔
315

316
        return result_template
1✔
317

318
    def visit_node_intrinsic_function_fn_transform(
1✔
319
        self, node_intrinsic_function: NodeIntrinsicFunctionFnTransform
320
    ) -> PreprocEntityDelta:
321
        arguments_delta = self.visit(node_intrinsic_function.arguments)
1✔
322
        parent_json_path = node_intrinsic_function.scope.parent.jsonpath
1✔
323

324
        # Only when a FnTransform is used as Property value the macro function is allowed to return a str
325
        property_value_regex = r"\.(Properties)"
1✔
326
        allow_string = False
1✔
327
        if re.search(property_value_regex, parent_json_path):
1✔
328
            allow_string = True
1✔
329

330
        if not is_nothing(arguments_delta.before):
1✔
NEW
331
            before = self._compute_fn_transform(
×
332
                arguments_delta.before,
333
                node_intrinsic_function.before_siblings,
334
                allow_string=allow_string,
335
            )
NEW
336
            updated_before_template = self._replace_at_jsonpath(
×
337
                self._before_template, parent_json_path, before
338
            )
NEW
339
            self._after_cache[_SCOPE_TRANSFORM_TEMPLATE_OUTCOME] = updated_before_template
×
340
        else:
341
            before = Nothing
1✔
342

343
        if not is_nothing(arguments_delta.after):
1✔
344
            after = self._compute_fn_transform(
1✔
345
                arguments_delta.after,
346
                node_intrinsic_function.after_siblings,
347
                allow_string=allow_string,
348
            )
349
            updated_after_template = self._replace_at_jsonpath(
1✔
350
                self._after_template, parent_json_path, after
351
            )
352
            self._after_cache[_SCOPE_TRANSFORM_TEMPLATE_OUTCOME] = updated_after_template
1✔
353
        else:
NEW
354
            after = Nothing
×
355

356
        self._save_runtime_cache()
1✔
357
        return PreprocEntityDelta(before=before, after=after)
1✔
358

359
    def visit_node_properties(
1✔
360
        self, node_properties: NodeProperties
361
    ) -> PreprocEntityDelta[PreprocProperties, PreprocProperties]:
362
        if not is_nothing(node_properties.fn_transform):
1✔
363
            self.visit_node_intrinsic_function_fn_transform(node_properties.fn_transform)
1✔
364

365
        return super().visit_node_properties(node_properties=node_properties)
1✔
366

367
    def visit_node_resources(self, node_resources: NodeResources) -> PreprocEntityDelta:
1✔
368
        if not is_nothing(node_resources.fn_transform):
1✔
369
            self.visit_node_intrinsic_function_fn_transform(
1✔
370
                node_intrinsic_function=node_resources.fn_transform
371
            )
372

373
        return super().visit_node_resources(node_resources=node_resources)
1✔
374

375
    def _invoke_macro(self, name: str, parameters: dict, fragment: dict, allow_string=False):
1✔
376
        account_id = self._change_set.account_id
1✔
377
        region_name = self._change_set.region_name
1✔
378
        macro_definition = get_cloudformation_store(
1✔
379
            account_id=account_id, region_name=region_name
380
        ).macros.get(name)
381

382
        if not macro_definition:
1✔
NEW
383
            raise FailedTransformationException(name, f"Transformation {name} is not supported.")
×
384

385
        simplified_parameters = {}
1✔
386
        if resolved_parameters := self._change_set.resolved_parameters:
1✔
387
            for key, resolved_parameter in resolved_parameters.items():
1✔
388
                final_value = engine_parameter_value(resolved_parameter)
1✔
389
                simplified_parameters[key] = (
1✔
390
                    final_value.split(",")
391
                    if resolved_parameter["type_"] == "CommaDelimitedList"
392
                    else final_value
393
                )
394

395
        transformation_id = f"{account_id}::{name}"
1✔
396
        event = {
1✔
397
            "region": region_name,
398
            "accountId": account_id,
399
            "fragment": fragment,
400
            "transformId": transformation_id,
401
            "params": parameters,
402
            "requestId": long_uid(),
403
            "templateParameterValues": simplified_parameters,
404
        }
405

406
        client = connect_to(aws_access_key_id=account_id, region_name=region_name).lambda_
1✔
407
        try:
1✔
408
            invocation = client.invoke(
1✔
409
                FunctionName=macro_definition["FunctionName"], Payload=json.dumps(event)
410
            )
NEW
411
        except ClientError:
×
NEW
412
            LOG.error(
×
413
                "client error executing lambda function '%s' with payload '%s'",
414
                macro_definition["FunctionName"],
415
                json.dumps(event),
416
            )
NEW
417
            raise
×
418
        if invocation.get("StatusCode") != 200 or invocation.get("FunctionError") == "Unhandled":
1✔
419
            raise FailedTransformationException(
1✔
420
                transformation=name,
421
                message=f"Received malformed response from transform {transformation_id}. Rollback requested by user.",
422
            )
423
        result = json.loads(invocation["Payload"].read())
1✔
424

425
        if result.get("status") != "success":
1✔
426
            error_message = result.get("errorMessage")
1✔
427
            message = (
1✔
428
                f"Transform {transformation_id} failed with: {error_message}. Rollback requested by user."
429
                if error_message
430
                else f"Transform {transformation_id} failed without an error message.. Rollback requested by user."
431
            )
432
            raise FailedTransformationException(transformation=name, message=message)
1✔
433

434
        if not isinstance(result.get("fragment"), dict) and not allow_string:
1✔
435
            raise FailedTransformationException(
1✔
436
                transformation=name,
437
                message="Template format error: unsupported structure.. Rollback requested by user.",
438
            )
439

440
        return result.get("fragment")
1✔
441

442
    def visit_node_intrinsic_function_fn_get_att(
1✔
443
        self, node_intrinsic_function: NodeIntrinsicFunction
444
    ) -> PreprocEntityDelta:
445
        return self.visit(node_intrinsic_function.arguments)
1✔
446

447
    def visit_node_intrinsic_function_fn_sub(
1✔
448
        self, node_intrinsic_function: NodeIntrinsicFunction
449
    ) -> PreprocEntityDelta:
450
        try:
1✔
451
            # If an argument is a Parameter it should be resolved, any other case, ignore it
452
            return super().visit_node_intrinsic_function_fn_sub(node_intrinsic_function)
1✔
453
        except RuntimeError:
1✔
454
            return self.visit(node_intrinsic_function.arguments)
1✔
455

456
    def visit_node_intrinsic_function_fn_split(
1✔
457
        self, node_intrinsic_function: NodeIntrinsicFunction
458
    ) -> PreprocEntityDelta:
459
        try:
1✔
460
            # If an argument is a Parameter it should be resolved, any other case, ignore it
461
            return super().visit_node_intrinsic_function_fn_split(node_intrinsic_function)
1✔
462
        except RuntimeError:
1✔
463
            return self.visit(node_intrinsic_function.arguments)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc