• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 17029583506

15 Aug 2025 04:45AM UTC coverage: 86.902% (+0.006%) from 86.896%
17029583506

push

github

web-flow
CFNV2: handle AWS::NoValue (#13000)

3 of 4 new or added lines in 2 files covered. (75.0%)

88 existing lines in 7 files now uncovered.

66940 of 77029 relevant lines covered (86.9%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.17
/localstack-core/localstack/services/cloudformation/engine/transformers.py
1
import copy
1✔
2
import json
1✔
3
import logging
1✔
4
import os
1✔
5
import re
1✔
6
from collections.abc import Callable
1✔
7
from copy import deepcopy
1✔
8
from dataclasses import dataclass
1✔
9
from typing import Any
1✔
10

11
import boto3
1✔
12
from botocore.exceptions import ClientError
1✔
13
from samtranslator.translator.transform import transform as transform_sam
1✔
14

15
from localstack.aws.api import CommonServiceException
1✔
16
from localstack.aws.connect import connect_to
1✔
17
from localstack.services.cloudformation.engine.policy_loader import create_policy_loader
1✔
18
from localstack.services.cloudformation.engine.template_deployer import resolve_refs_recursively
1✔
19
from localstack.services.cloudformation.engine.validations import ValidationError
1✔
20
from localstack.services.cloudformation.stores import get_cloudformation_store
1✔
21
from localstack.utils import testutil
1✔
22
from localstack.utils.objects import recurse_object
1✔
23
from localstack.utils.strings import long_uid
1✔
24

25
LOG = logging.getLogger(__name__)
1✔
26

27
SERVERLESS_TRANSFORM = "AWS::Serverless-2016-10-31"
1✔
28
EXTENSIONS_TRANSFORM = "AWS::LanguageExtensions"
1✔
29
SECRETSMANAGER_TRANSFORM = "AWS::SecretsManager-2020-07-23"
1✔
30

31
TransformResult = dict | str
1✔
32

33

34
@dataclass
1✔
35
class ResolveRefsRecursivelyContext:
1✔
36
    account_id: str
1✔
37
    region_name: str
1✔
38
    stack_name: str
1✔
39
    resources: dict
1✔
40
    mappings: dict
1✔
41
    conditions: dict
1✔
42
    parameters: dict
1✔
43

44
    def resolve(self, value: Any) -> Any:
1✔
45
        return resolve_refs_recursively(
1✔
46
            self.account_id,
47
            self.region_name,
48
            self.stack_name,
49
            self.resources,
50
            self.mappings,
51
            self.conditions,
52
            self.parameters,
53
            value,
54
        )
55

56

57
class Transformer:
1✔
58
    """Abstract class for Fn::Transform intrinsic functions"""
59

60
    def transform(self, account_id: str, region_name: str, parameters: dict) -> TransformResult:
1✔
61
        """Apply the transformer to the given parameters and return the modified construct"""
62

63

64
class AwsIncludeTransformer(Transformer):
1✔
65
    """Implements the 'AWS::Include' transform intrinsic function"""
66

67
    def transform(self, account_id: str, region_name: str, parameters: dict) -> TransformResult:
1✔
68
        from localstack.services.cloudformation.engine.template_preparer import parse_template
1✔
69

70
        location = parameters.get("Location")
1✔
71
        if location and location.startswith("s3://"):
1✔
72
            s3_client = connect_to(aws_access_key_id=account_id, region_name=region_name).s3
1✔
73
            bucket, _, path = location.removeprefix("s3://").partition("/")
1✔
74
            try:
1✔
75
                content = testutil.download_s3_object(s3_client, bucket, path)
1✔
76
            except ClientError:
×
77
                LOG.error("client error downloading S3 object '%s/%s'", bucket, path)
×
78
                raise
×
79
            content = parse_template(content)
1✔
80
            return content
1✔
81
        else:
82
            LOG.warning("Unexpected Location parameter for AWS::Include transformer: %s", location)
×
83
        return parameters
×
84

85

86
# maps transformer names to implementing classes
87
transformers: dict[str, type] = {"AWS::Include": AwsIncludeTransformer}
1✔
88

89

90
def apply_intrinsic_transformations(
1✔
91
    account_id: str,
92
    region_name: str,
93
    template: dict,
94
    stack_name: str,
95
    resources: dict,
96
    mappings: dict,
97
    conditions: dict[str, bool],
98
    stack_parameters: dict,
99
) -> dict:
100
    """Resolve constructs using the 'Fn::Transform' intrinsic function."""
101

102
    def _visit(obj, path, **_):
1✔
103
        if isinstance(obj, dict) and "Fn::Transform" in obj:
1✔
104
            transform = (
1✔
105
                obj["Fn::Transform"]
106
                if isinstance(obj["Fn::Transform"], dict)
107
                else {"Name": obj["Fn::Transform"]}
108
            )
109
            transform_name = transform.get("Name")
1✔
110
            transformer_class = transformers.get(transform_name)
1✔
111
            macro_store = get_cloudformation_store(account_id, region_name).macros
1✔
112
            parameters = transform.get("Parameters") or {}
1✔
113
            parameters = resolve_refs_recursively(
1✔
114
                account_id,
115
                region_name,
116
                stack_name,
117
                resources,
118
                mappings,
119
                conditions,
120
                stack_parameters,
121
                parameters,
122
            )
123
            if transformer_class:
1✔
124
                transformer = transformer_class()
1✔
125
                transformed = transformer.transform(account_id, region_name, parameters)
1✔
126
                obj_copy = deepcopy(obj)
1✔
127
                obj_copy.pop("Fn::Transform")
1✔
128
                obj_copy.update(transformed)
1✔
129
                return obj_copy
1✔
130

131
            elif transform_name in macro_store:
1✔
132
                obj_copy = deepcopy(obj)
1✔
133
                obj_copy.pop("Fn::Transform")
1✔
134
                result = execute_macro(
1✔
135
                    account_id, region_name, obj_copy, transform, stack_parameters, parameters, True
136
                )
137
                return result
1✔
138
            else:
139
                LOG.warning(
×
140
                    "Unsupported transform function '%s' used in %s", transform_name, stack_name
141
                )
142
        return obj
1✔
143

144
    return recurse_object(template, _visit)
1✔
145

146

147
def apply_global_transformations(
1✔
148
    account_id: str,
149
    region_name: str,
150
    template: dict,
151
    stack_name: str,
152
    resources: dict,
153
    mappings: dict,
154
    conditions: dict[str, bool],
155
    stack_parameters: dict,
156
) -> dict:
157
    processed_template = deepcopy(template)
1✔
158
    transformations = format_template_transformations_into_list(
1✔
159
        processed_template.get("Transform", [])
160
    )
161
    for transformation in transformations:
1✔
162
        transformation_parameters = resolve_refs_recursively(
1✔
163
            account_id,
164
            region_name,
165
            stack_name,
166
            resources,
167
            mappings,
168
            conditions,
169
            stack_parameters,
170
            transformation.get("Parameters", {}),
171
        )
172

173
        if not isinstance(transformation["Name"], str):
1✔
174
            # TODO this should be done during template validation
175
            raise CommonServiceException(
1✔
176
                code="ValidationError",
177
                status_code=400,
178
                message="Key Name of transform definition must be a string.",
179
                sender_fault=True,
180
            )
181
        elif transformation["Name"] == SERVERLESS_TRANSFORM:
1✔
182
            processed_template = apply_serverless_transformation(
1✔
183
                account_id, region_name, processed_template, stack_parameters
184
            )
185
        elif transformation["Name"] == EXTENSIONS_TRANSFORM:
1✔
186
            resolve_context = ResolveRefsRecursivelyContext(
1✔
187
                account_id,
188
                region_name,
189
                stack_name,
190
                resources,
191
                mappings,
192
                conditions,
193
                stack_parameters,
194
            )
195

196
            processed_template = apply_language_extensions_transform(
1✔
197
                processed_template,
198
                resolve_context,
199
            )
200
        elif transformation["Name"] == SECRETSMANAGER_TRANSFORM:
1✔
201
            # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/transform-aws-secretsmanager.html
202
            LOG.warning("%s is not yet supported. Ignoring.", SECRETSMANAGER_TRANSFORM)
×
203
        else:
204
            processed_template = execute_macro(
1✔
205
                account_id,
206
                region_name,
207
                parsed_template=template,
208
                macro=transformation,
209
                stack_parameters=stack_parameters,
210
                transformation_parameters=transformation_parameters,
211
            )
212

213
    return processed_template
1✔
214

215

216
def format_template_transformations_into_list(transforms: list | dict | str) -> list[dict]:
1✔
217
    """
218
    The value of the Transform attribute can be:
219
     - a transformation name
220
     - an object like {Name: transformation, Parameters:{}}
221
     - a list a list of names of the transformations to apply
222
     - a list of objects defining a transformation
223
     so the objective of this function is to normalize the list of transformations to apply into a list of transformation objects
224
    """
225
    formatted_transformations = []
1✔
226
    if isinstance(transforms, str):
1✔
227
        formatted_transformations.append({"Name": transforms})
1✔
228

229
    if isinstance(transforms, dict):
1✔
230
        formatted_transformations.append(transforms)
1✔
231

232
    if isinstance(transforms, list):
1✔
233
        for transformation in transforms:
1✔
234
            if isinstance(transformation, str):
1✔
235
                formatted_transformations.append({"Name": transformation})
1✔
236
            if isinstance(transformation, dict):
1✔
237
                formatted_transformations.append(transformation)
1✔
238

239
    return formatted_transformations
1✔
240

241

242
def execute_macro(
1✔
243
    account_id: str,
244
    region_name: str,
245
    parsed_template: dict,
246
    macro: dict,
247
    stack_parameters: dict,
248
    transformation_parameters: dict,
249
    is_intrinsic=False,
250
) -> str:
251
    macro_definition = get_cloudformation_store(account_id, region_name).macros.get(macro["Name"])
1✔
252
    if not macro_definition:
1✔
253
        raise FailedTransformationException(
×
254
            macro["Name"], f"Transformation {macro['Name']} is not supported."
255
        )
256

257
    formatted_stack_parameters = {}
1✔
258
    for key, value in stack_parameters.items():
1✔
259
        # TODO: we want to support other types of parameters
260
        parameter_value = value.get("ParameterValue")
1✔
261
        if value.get("ParameterType") == "CommaDelimitedList" and isinstance(parameter_value, str):
1✔
262
            formatted_stack_parameters[key] = parameter_value.split(",")
1✔
263
        else:
264
            formatted_stack_parameters[key] = parameter_value
1✔
265

266
    transformation_id = f"{account_id}::{macro['Name']}"
1✔
267
    event = {
1✔
268
        "region": region_name,
269
        "accountId": account_id,
270
        "fragment": parsed_template,
271
        "transformId": transformation_id,
272
        "params": transformation_parameters,
273
        "requestId": long_uid(),
274
        "templateParameterValues": formatted_stack_parameters,
275
    }
276

277
    client = connect_to(aws_access_key_id=account_id, region_name=region_name).lambda_
1✔
278
    try:
1✔
279
        invocation = client.invoke(
1✔
280
            FunctionName=macro_definition["FunctionName"], Payload=json.dumps(event)
281
        )
282
    except ClientError:
×
UNCOV
283
        LOG.error(
×
284
            "client error executing lambda function '%s' with payload '%s'",
285
            macro_definition["FunctionName"],
286
            json.dumps(event),
287
        )
UNCOV
288
        raise
×
289
    if invocation.get("StatusCode") != 200 or invocation.get("FunctionError") == "Unhandled":
1✔
290
        raise FailedTransformationException(
1✔
291
            transformation=macro["Name"],
292
            message=f"Received malformed response from transform {transformation_id}. Rollback requested by user.",
293
        )
294
    result = json.loads(invocation["Payload"].read())
1✔
295

296
    if result.get("status") != "success":
1✔
297
        error_message = result.get("errorMessage")
1✔
298
        message = (
1✔
299
            f"Transform {transformation_id} failed with: {error_message}. Rollback requested by user."
300
            if error_message
301
            else f"Transform {transformation_id} failed without an error message.. Rollback requested by user."
302
        )
303
        raise FailedTransformationException(transformation=macro["Name"], message=message)
1✔
304

305
    if not isinstance(result.get("fragment"), dict) and not is_intrinsic:
1✔
306
        raise FailedTransformationException(
1✔
307
            transformation=macro["Name"],
308
            message="Template format error: unsupported structure.. Rollback requested by user.",
309
        )
310

311
    return result.get("fragment")
1✔
312

313

314
def apply_language_extensions_transform(
1✔
315
    template: dict,
316
    resolve_context: ResolveRefsRecursivelyContext,
317
) -> dict:
318
    """
319
    Resolve language extensions constructs
320
    """
321

322
    def _visit(obj, path, **_):
1✔
323
        # Fn::ForEach
324
        # TODO: can this be used in non-resource positions?
325
        if isinstance(obj, dict) and any("Fn::ForEach" in key for key in obj):
1✔
326
            newobj = {}
1✔
327
            for key in obj:
1✔
328
                if "Fn::ForEach" not in key:
1✔
329
                    newobj[key] = obj[key]
1✔
330
                    continue
1✔
331

332
                new_entries = expand_fn_foreach(obj[key], resolve_context)
1✔
333
                newobj.update(**new_entries)
1✔
334
            return newobj
1✔
335
        # Fn::Length
336
        elif isinstance(obj, dict) and "Fn::Length" in obj:
1✔
337
            value = obj["Fn::Length"]
1✔
338
            if isinstance(value, dict):
1✔
339
                value = resolve_context.resolve(value)
1✔
340

341
            if isinstance(value, list):
1✔
342
                # TODO: what if one of the elements was AWS::NoValue?
343
                # no conversion required
344
                return len(value)
1✔
345
            elif isinstance(value, str):
×
346
                length = len(value.split(","))
×
347
                return length
×
UNCOV
348
            return obj
×
349
        elif isinstance(obj, dict) and "Fn::ToJsonString" in obj:
1✔
350
            # TODO: is the default representation ok here?
351
            return json.dumps(obj["Fn::ToJsonString"], default=str, separators=(",", ":"))
1✔
352

353
            # reference
354
        return obj
1✔
355

356
    return recurse_object(template, _visit)
1✔
357

358

359
def expand_fn_foreach(
1✔
360
    foreach_defn: list,
361
    resolve_context: ResolveRefsRecursivelyContext,
362
    extra_replace_mapping: dict | None = None,
363
) -> dict:
364
    if len(foreach_defn) != 3:
1✔
UNCOV
365
        raise ValidationError(
×
366
            f"Fn::ForEach: invalid number of arguments, expected 3 got {len(foreach_defn)}"
367
        )
368
    output = {}
1✔
369
    iteration_name, iteration_value, template = foreach_defn
1✔
370
    if not isinstance(iteration_name, str):
1✔
UNCOV
371
        raise ValidationError(
×
372
            f"Fn::ForEach: incorrect type for iteration name '{iteration_name}', expected str"
373
        )
374
    if isinstance(iteration_value, dict):
1✔
375
        # we have a reference
376
        if "Ref" in iteration_value:
1✔
377
            iteration_value = resolve_context.resolve(iteration_value)
1✔
378
        else:
379
            raise NotImplementedError(
380
                f"Fn::Transform: intrinsic {iteration_value} not supported in this position yet"
381
            )
382
    if not isinstance(iteration_value, list):
1✔
UNCOV
383
        raise ValidationError(
×
384
            f"Fn::ForEach: incorrect type for iteration variables '{iteration_value}', expected list"
385
        )
386

387
    if not isinstance(template, dict):
1✔
UNCOV
388
        raise ValidationError(
×
389
            f"Fn::ForEach: incorrect type for template '{template}', expected dict"
390
        )
391

392
    # TODO: locations other than resources
393
    replace_template_value = "${" + iteration_name + "}"
1✔
394
    for variable in iteration_value:
1✔
395
        # there might be multiple children, which could themselves be a `Fn::ForEach` call
396
        for logical_resource_id_template in template:
1✔
397
            if logical_resource_id_template.startswith("Fn::ForEach"):
1✔
398
                result = expand_fn_foreach(
1✔
399
                    template[logical_resource_id_template],
400
                    resolve_context,
401
                    {iteration_name: variable},
402
                )
403
                output.update(**result)
1✔
404
                continue
1✔
405

406
            if replace_template_value not in logical_resource_id_template:
1✔
UNCOV
407
                raise ValidationError("Fn::ForEach: no placeholder in logical resource id")
×
408

409
            def gen_visit(variable: str) -> Callable:
1✔
410
                def _visit(obj: Any, path: Any):
1✔
411
                    if isinstance(obj, dict) and "Ref" in obj:
1✔
412
                        ref_variable = obj["Ref"]
1✔
413
                        if ref_variable == iteration_name:
1✔
414
                            return variable
1✔
415
                    elif isinstance(obj, dict) and "Fn::Sub" in obj:
1✔
416
                        arguments = recurse_object(obj["Fn::Sub"], _visit)
1✔
417
                        if isinstance(arguments, str):
1✔
418
                            # simple case
419
                            # TODO: can this reference anything outside of the template?
420
                            result = arguments
1✔
421
                            variables_found = re.findall("\\${([^}]+)}", arguments)
1✔
422
                            for var in variables_found:
1✔
423
                                if var == iteration_name:
1✔
424
                                    result = result.replace(f"${{{var}}}", variable)
1✔
425
                            return result
1✔
426
                        else:
427
                            raise NotImplementedError
428
                    elif isinstance(obj, dict) and "Fn::Join" in obj:
1✔
429
                        # first visit arguments
430
                        arguments = recurse_object(
1✔
431
                            obj["Fn::Join"],
432
                            _visit,
433
                        )
434
                        separator, items = arguments
1✔
435
                        return separator.join(items)
1✔
436
                    return obj
1✔
437

438
                return _visit
1✔
439

440
            logical_resource_id = logical_resource_id_template.replace(
1✔
441
                replace_template_value, variable
442
            )
443
            for key, value in (extra_replace_mapping or {}).items():
1✔
444
                logical_resource_id = logical_resource_id.replace("${" + key + "}", value)
1✔
445
            resource_body = copy.deepcopy(template[logical_resource_id_template])
1✔
446
            body = recurse_object(resource_body, gen_visit(variable))
1✔
447
            output[logical_resource_id] = body
1✔
448

449
    return output
1✔
450

451

452
def apply_serverless_transformation(
1✔
453
    account_id: str, region_name: str, parsed_template: dict, template_parameters: dict
454
) -> str | None:
455
    """only returns string when parsing SAM template, otherwise None"""
456
    # TODO: we might also want to override the access key ID to account ID
457
    region_before = os.environ.get("AWS_DEFAULT_REGION")
1✔
458
    if boto3.session.Session().region_name is None:
1✔
459
        os.environ["AWS_DEFAULT_REGION"] = region_name
1✔
460
    loader = create_policy_loader()
1✔
461
    simplified_parameters = {
1✔
462
        k: v.get("ResolvedValue") or v["ParameterValue"] for k, v in template_parameters.items()
463
    }
464

465
    try:
1✔
466
        transformed = transform_sam(parsed_template, simplified_parameters, loader)
1✔
467
        return transformed
1✔
468
    except Exception as e:
×
UNCOV
469
        raise FailedTransformationException(transformation=SERVERLESS_TRANSFORM, message=str(e))
×
470
    finally:
471
        # Note: we need to fix boto3 region, otherwise AWS SAM transformer fails
472
        os.environ.pop("AWS_DEFAULT_REGION", None)
1✔
473
        if region_before is not None:
1✔
UNCOV
474
            os.environ["AWS_DEFAULT_REGION"] = region_before
×
475

476

477
class FailedTransformationException(Exception):
1✔
478
    transformation: str
1✔
479
    msg: str
1✔
480

481
    def __init__(self, transformation: str, message: str = ""):
1✔
482
        self.transformation = transformation
1✔
483
        self.message = message
1✔
484
        super().__init__(self.message)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc