• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 17086927072

19 Aug 2025 10:02PM UTC coverage: 86.889% (+0.01%) from 86.875%
17086927072

push

github

web-flow
APIGW: fix TestInvokeMethod path logic (#13030)

4 of 23 new or added lines in 1 file covered. (17.39%)

264 existing lines in 17 files now uncovered.

67018 of 77131 relevant lines covered (86.89%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.44
/localstack-core/localstack/services/cloudformation/engine/v2/change_set_model_transform.py
1
import copy
1✔
2
import json
1✔
3
import logging
1✔
4
import os
1✔
5
import re
1✔
6
from typing import Any, Final, TypedDict
1✔
7

8
import boto3
1✔
9
import jsonpath_ng
1✔
10
from botocore.exceptions import ClientError
1✔
11
from samtranslator.translator.transform import transform as transform_sam
1✔
12

13
from localstack.aws.connect import connect_to
1✔
14
from localstack.services.cloudformation.engine.policy_loader import create_policy_loader
1✔
15
from localstack.services.cloudformation.engine.template_preparer import parse_template
1✔
16
from localstack.services.cloudformation.engine.transformers import (
1✔
17
    FailedTransformationException,
18
)
19
from localstack.services.cloudformation.engine.v2.change_set_model import (
1✔
20
    ChangeType,
21
    FnTransform,
22
    Maybe,
23
    NodeGlobalTransform,
24
    NodeIntrinsicFunction,
25
    NodeIntrinsicFunctionFnTransform,
26
    NodeProperties,
27
    NodeResource,
28
    NodeResources,
29
    NodeTransform,
30
    Nothing,
31
    Scope,
32
    is_nothing,
33
)
34
from localstack.services.cloudformation.engine.v2.change_set_model_preproc import (
1✔
35
    ChangeSetModelPreproc,
36
    PreprocEntityDelta,
37
    PreprocProperties,
38
)
39
from localstack.services.cloudformation.engine.validations import ValidationError
1✔
40
from localstack.services.cloudformation.stores import get_cloudformation_store
1✔
41
from localstack.services.cloudformation.v2.entities import ChangeSet
1✔
42
from localstack.services.cloudformation.v2.types import engine_parameter_value
1✔
43
from localstack.utils import testutil
1✔
44
from localstack.utils.strings import long_uid
1✔
45

46
LOG = logging.getLogger(__name__)
1✔
47

48
SERVERLESS_TRANSFORM = "AWS::Serverless-2016-10-31"
1✔
49
EXTENSIONS_TRANSFORM = "AWS::LanguageExtensions"
1✔
50
SECRETSMANAGER_TRANSFORM = "AWS::SecretsManager-2020-07-23"
1✔
51
INCLUDE_TRANSFORM = "AWS::Include"
1✔
52

53
_SCOPE_TRANSFORM_TEMPLATE_OUTCOME: Final[Scope] = Scope("TRANSFORM_TEMPLATE_OUTCOME")
1✔
54

55

56
# TODO: evaluate the use of subtypes to represent and validate types of transforms
57
class GlobalTransform:
1✔
58
    name: str
1✔
59
    parameters: Maybe[dict]
1✔
60

61
    def __init__(self, name: str, parameters: Maybe[dict]):
1✔
62
        self.name = name
1✔
63
        self.parameters = parameters
1✔
64

65

66
class TransformPreprocParameter(TypedDict):
1✔
67
    # TODO: expand
68
    ParameterKey: str
1✔
69
    ParameterValue: Any
1✔
70
    ParameterType: str | None
1✔
71

72

73
class ChangeSetModelTransform(ChangeSetModelPreproc):
1✔
74
    _before_parameters: Final[dict]
1✔
75
    _after_parameters: Final[dict]
1✔
76
    _before_template: Final[Maybe[dict]]
1✔
77
    _after_template: Final[Maybe[dict]]
1✔
78

79
    def __init__(
1✔
80
        self,
81
        change_set: ChangeSet,
82
        before_parameters: dict,
83
        after_parameters: dict,
84
        before_template: dict | None,
85
        after_template: dict | None,
86
    ):
87
        super().__init__(change_set=change_set)
1✔
88
        self._before_parameters = before_parameters
1✔
89
        self._after_parameters = after_parameters
1✔
90
        self._before_template = before_template or Nothing
1✔
91
        self._after_template = after_template or Nothing
1✔
92

93
    # Ported from v1:
94
    @staticmethod
1✔
95
    def _apply_global_serverless_transformation(
1✔
96
        region_name: str, template: dict, parameters: dict
97
    ) -> dict:
98
        """only returns string when parsing SAM template, otherwise None"""
99
        # TODO: we might also want to override the access key ID to account ID
100
        region_before = os.environ.get("AWS_DEFAULT_REGION")
1✔
101
        if boto3.session.Session().region_name is None:
1✔
102
            os.environ["AWS_DEFAULT_REGION"] = region_name
1✔
103
        loader = create_policy_loader()
1✔
104
        # The following transformation function can carry out in-place changes ensure this cannot occur.
105
        template = copy.deepcopy(template)
1✔
106
        parameters = copy.deepcopy(parameters)
1✔
107
        try:
1✔
108
            transformed = transform_sam(template, parameters, loader)
1✔
109
            return transformed
1✔
110
        except Exception as e:
×
UNCOV
111
            raise FailedTransformationException(transformation=SERVERLESS_TRANSFORM, message=str(e))
×
112
        finally:
113
            # Note: we need to fix boto3 region, otherwise AWS SAM transformer fails
114
            os.environ.pop("AWS_DEFAULT_REGION", None)
1✔
115
            if region_before is not None:
1✔
UNCOV
116
                os.environ["AWS_DEFAULT_REGION"] = region_before
×
117

118
    def _compute_include_transform(self, parameters: dict, fragment: dict) -> dict:
1✔
119
        location = parameters.get("Location")
1✔
120
        if not location or not location.startswith("s3://"):
1✔
UNCOV
121
            raise FailedTransformationException(
×
122
                transformation=INCLUDE_TRANSFORM,
123
                message="Unexpected Location parameter for AWS::Include transformer: %s" % location,
124
            )
125

126
        s3_client = connect_to(
1✔
127
            aws_access_key_id=self._change_set.account_id, region_name=self._change_set.region_name
128
        ).s3
129
        bucket, _, path = location.removeprefix("s3://").partition("/")
1✔
130
        try:
1✔
131
            content = testutil.download_s3_object(s3_client, bucket, path)
1✔
132
        except ClientError:
×
UNCOV
133
            raise FailedTransformationException(
×
134
                transformation=INCLUDE_TRANSFORM,
135
                message="Error downloading S3 object '%s/%s'" % (bucket, path),
136
            )
137
        try:
1✔
138
            template_to_include = parse_template(content)
1✔
139
        except Exception as e:
×
UNCOV
140
            raise FailedTransformationException(transformation=INCLUDE_TRANSFORM, message=str(e))
×
141

142
        return {**fragment, **template_to_include}
1✔
143

144
    def _apply_global_transform(
1✔
145
        self, global_transform: GlobalTransform, template: dict, parameters: dict
146
    ) -> dict:
147
        transform_name = global_transform.name
1✔
148
        if transform_name == EXTENSIONS_TRANSFORM:
1✔
149
            # Applied lazily in downstream tasks (see ChangeSetModelPreproc).
150
            transformed_template = template
1✔
151
        elif transform_name == SERVERLESS_TRANSFORM:
1✔
152
            transformed_template = self._apply_global_serverless_transformation(
1✔
153
                region_name=self._change_set.region_name,
154
                template=template,
155
                parameters=parameters,
156
            )
157
        elif transform_name == SECRETSMANAGER_TRANSFORM:
1✔
158
            # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/transform-aws-secretsmanager.html
159
            LOG.warning("%s is not yet supported. Ignoring.", SECRETSMANAGER_TRANSFORM)
×
UNCOV
160
            transformed_template = template
×
161
        elif transform_name == INCLUDE_TRANSFORM:
1✔
162
            transformed_template = self._compute_include_transform(
1✔
163
                parameters=global_transform.parameters,
164
                fragment=template,
165
            )
166
        else:
167
            transformed_template = self._invoke_macro(
1✔
168
                name=global_transform.name,
169
                parameters=global_transform.parameters
170
                if not is_nothing(global_transform.parameters)
171
                else {},
172
                fragment=template,
173
                allow_string=False,
174
            )
175
        return transformed_template
1✔
176

177
    def transform(self) -> tuple[dict, dict]:
1✔
178
        self._setup_runtime_cache()
1✔
179

180
        node_template = self._change_set.update_model.node_template
1✔
181

182
        parameters_delta = self.visit_node_parameters(node_template.parameters)
1✔
183
        parameters_before = parameters_delta.before
1✔
184
        parameters_after = parameters_delta.after
1✔
185

186
        self.visit_node_resources(node_template.resources)
1✔
187

188
        transform_delta: PreprocEntityDelta[list[GlobalTransform], list[GlobalTransform]] = (
1✔
189
            self.visit_node_transform(node_template.transform)
190
        )
191
        transform_before: Maybe[list[GlobalTransform]] = transform_delta.before
1✔
192
        transform_after: Maybe[list[GlobalTransform]] = transform_delta.after
1✔
193

194
        transformed_before_template = self._before_template
1✔
195
        if transform_before and not is_nothing(self._before_template):
1✔
196
            if _SCOPE_TRANSFORM_TEMPLATE_OUTCOME in self._before_cache:
1✔
197
                transformed_before_template = self._before_cache[_SCOPE_TRANSFORM_TEMPLATE_OUTCOME]
1✔
198
            else:
199
                for before_global_transform in transform_before:
1✔
200
                    if not is_nothing(before_global_transform.name):
1✔
UNCOV
201
                        transformed_before_template = self._apply_global_transform(
×
202
                            global_transform=before_global_transform,
203
                            parameters=parameters_before,
204
                            template=transformed_before_template,
205
                        )
206

207
                # Macro transformations won't remove the transform from the template
208
                if "Transform" in transformed_before_template:
1✔
UNCOV
209
                    transformed_before_template.pop("Transform")
×
210
                self._before_cache[_SCOPE_TRANSFORM_TEMPLATE_OUTCOME] = transformed_before_template
1✔
211

212
        transformed_after_template = self._after_template
1✔
213
        if transform_after and not is_nothing(self._after_template):
1✔
214
            transformed_after_template = self._after_template
1✔
215
            for after_global_transform in transform_after:
1✔
216
                if not is_nothing(after_global_transform.name):
1✔
217
                    transformed_after_template = self._apply_global_transform(
1✔
218
                        global_transform=after_global_transform,
219
                        parameters=parameters_after,
220
                        template=transformed_after_template,
221
                    )
222
            # Macro transformations won't remove the transform from the template
223
            if "Transform" in transformed_after_template:
1✔
224
                transformed_after_template.pop("Transform")
1✔
225
            self._after_cache[_SCOPE_TRANSFORM_TEMPLATE_OUTCOME] = transformed_after_template
1✔
226

227
        self._save_runtime_cache()
1✔
228

229
        return transformed_before_template, transformed_after_template
1✔
230

231
    def visit_node_global_transform(
1✔
232
        self, node_global_transform: NodeGlobalTransform
233
    ) -> PreprocEntityDelta[GlobalTransform, GlobalTransform]:
234
        change_type = node_global_transform.change_type
1✔
235

236
        name_delta = self.visit(node_global_transform.name)
1✔
237
        parameters_delta = self.visit(node_global_transform.parameters)
1✔
238

239
        before = Nothing
1✔
240
        if change_type != ChangeType.CREATED:
1✔
241
            before = GlobalTransform(name=name_delta.before, parameters=parameters_delta.before)
1✔
242
        after = Nothing
1✔
243
        if change_type != ChangeType.REMOVED:
1✔
244
            after = GlobalTransform(name=name_delta.after, parameters=parameters_delta.after)
1✔
245
        return PreprocEntityDelta(before=before, after=after)
1✔
246

247
    def visit_node_transform(
1✔
248
        self, node_transform: NodeTransform
249
    ) -> PreprocEntityDelta[list[GlobalTransform], list[GlobalTransform]]:
250
        change_type = node_transform.change_type
1✔
251
        before = [] if change_type != ChangeType.CREATED else Nothing
1✔
252
        after = [] if change_type != ChangeType.REMOVED else Nothing
1✔
253
        for change_set_entity in node_transform.global_transforms:
1✔
254
            if not isinstance(change_set_entity.name.value, str):
1✔
255
                raise ValidationError("Key Name of transform definition must be a string.")
1✔
256

257
            delta: PreprocEntityDelta[GlobalTransform, GlobalTransform] = self.visit(
1✔
258
                change_set_entity=change_set_entity
259
            )
260
            delta_before = delta.before
1✔
261
            delta_after = delta.after
1✔
262
            if not is_nothing(before) and not is_nothing(delta_before):
1✔
263
                before.append(delta_before)
1✔
264
            if not is_nothing(after) and not is_nothing(delta_after):
1✔
265
                after.append(delta_after)
1✔
266
        return PreprocEntityDelta(before=before, after=after)
1✔
267

268
    def _compute_fn_transform(
1✔
269
        self, macro_definition: Any, siblings: Any, allow_string: False
270
    ) -> Any:
271
        def _normalize_transform(obj):
1✔
272
            transforms = []
1✔
273

274
            if isinstance(obj, str):
1✔
275
                transforms.append({"Name": obj, "Parameters": {}})
1✔
276

277
            if isinstance(obj, dict):
1✔
278
                transforms.append(obj)
1✔
279

280
            if isinstance(obj, list):
1✔
281
                for v in obj:
1✔
282
                    if isinstance(v, str):
1✔
UNCOV
283
                        transforms.append({"Name": v, "Parameters": {}})
×
284

285
                    if isinstance(v, dict):
1✔
286
                        if not v.get("Parameters"):
1✔
UNCOV
287
                            v["Parameters"] = {}
×
288
                        transforms.append(v)
1✔
289

290
            return transforms
1✔
291

292
        normalized_transforms = _normalize_transform(macro_definition)
1✔
293
        transform_output = copy.deepcopy(siblings)
1✔
294
        for transform in normalized_transforms:
1✔
295
            transform_name = transform["Name"]
1✔
296
            if transform_name == INCLUDE_TRANSFORM:
1✔
297
                transform_output = self._compute_include_transform(
1✔
298
                    parameters=transform["Parameters"], fragment=transform_output
299
                )
300
            else:
301
                transform_output: dict | str = self._invoke_macro(
1✔
302
                    fragment=transform_output,
303
                    name=transform["Name"],
304
                    parameters=transform.get("Parameters", {}),
305
                    allow_string=allow_string,
306
                )
307

308
        if isinstance(transform_output, dict) and FnTransform in transform_output:
1✔
309
            transform_output.pop(FnTransform)
1✔
310

311
        return transform_output
1✔
312

313
    def _replace_at_jsonpath(self, template: dict, path: str, result: Any):
1✔
314
        pattern = jsonpath_ng.parse(path)
1✔
315
        result_template = pattern.update(template, result)
1✔
316

317
        return result_template
1✔
318

319
    def visit_node_intrinsic_function_fn_transform(
1✔
320
        self, node_intrinsic_function: NodeIntrinsicFunctionFnTransform
321
    ) -> PreprocEntityDelta:
322
        arguments_delta = self.visit(node_intrinsic_function.arguments)
1✔
323
        parent_json_path = node_intrinsic_function.scope.parent.jsonpath
1✔
324

325
        # Only when a FnTransform is used as Property value the macro function is allowed to return a str
326
        property_value_regex = r"\.(Properties)"
1✔
327
        allow_string = False
1✔
328
        if re.search(property_value_regex, parent_json_path):
1✔
329
            allow_string = True
1✔
330

331
        if not is_nothing(arguments_delta.before):
1✔
UNCOV
332
            before = self._compute_fn_transform(
×
333
                arguments_delta.before,
334
                node_intrinsic_function.before_siblings,
335
                allow_string=allow_string,
336
            )
UNCOV
337
            updated_before_template = self._replace_at_jsonpath(
×
338
                self._before_template, parent_json_path, before
339
            )
UNCOV
340
            self._after_cache[_SCOPE_TRANSFORM_TEMPLATE_OUTCOME] = updated_before_template
×
341
        else:
342
            before = Nothing
1✔
343

344
        if not is_nothing(arguments_delta.after):
1✔
345
            after = self._compute_fn_transform(
1✔
346
                arguments_delta.after,
347
                node_intrinsic_function.after_siblings,
348
                allow_string=allow_string,
349
            )
350
            updated_after_template = self._replace_at_jsonpath(
1✔
351
                self._after_template, parent_json_path, after
352
            )
353
            self._after_cache[_SCOPE_TRANSFORM_TEMPLATE_OUTCOME] = updated_after_template
1✔
354
        else:
UNCOV
355
            after = Nothing
×
356

357
        self._save_runtime_cache()
1✔
358
        return PreprocEntityDelta(before=before, after=after)
1✔
359

360
    def visit_node_properties(
1✔
361
        self, node_properties: NodeProperties
362
    ) -> PreprocEntityDelta[PreprocProperties, PreprocProperties]:
363
        if not is_nothing(node_properties.fn_transform):
1✔
364
            self.visit_node_intrinsic_function_fn_transform(node_properties.fn_transform)
1✔
365

366
        return super().visit_node_properties(node_properties=node_properties)
1✔
367

368
    def visit_node_resource(self, node_resource: NodeResource) -> PreprocEntityDelta:
1✔
369
        if not is_nothing(node_resource.fn_transform):
1✔
370
            self.visit_node_intrinsic_function_fn_transform(
1✔
371
                node_intrinsic_function=node_resource.fn_transform
372
            )
373

374
        return super().visit_node_resource(node_resource)
1✔
375

376
    def visit_node_resources(self, node_resources: NodeResources) -> PreprocEntityDelta:
1✔
377
        if not is_nothing(node_resources.fn_transform):
1✔
378
            self.visit_node_intrinsic_function_fn_transform(
1✔
379
                node_intrinsic_function=node_resources.fn_transform
380
            )
381

382
        return super().visit_node_resources(node_resources=node_resources)
1✔
383

384
    def _invoke_macro(self, name: str, parameters: dict, fragment: dict, allow_string=False):
1✔
385
        account_id = self._change_set.account_id
1✔
386
        region_name = self._change_set.region_name
1✔
387
        macro_definition = get_cloudformation_store(
1✔
388
            account_id=account_id, region_name=region_name
389
        ).macros.get(name)
390

391
        if not macro_definition:
1✔
UNCOV
392
            raise FailedTransformationException(name, f"Transformation {name} is not supported.")
×
393

394
        simplified_parameters = {}
1✔
395
        if resolved_parameters := self._change_set.resolved_parameters:
1✔
396
            for key, resolved_parameter in resolved_parameters.items():
1✔
397
                final_value = engine_parameter_value(resolved_parameter)
1✔
398
                simplified_parameters[key] = (
1✔
399
                    final_value.split(",")
400
                    if resolved_parameter["type_"] == "CommaDelimitedList"
401
                    else final_value
402
                )
403

404
        transformation_id = f"{account_id}::{name}"
1✔
405
        event = {
1✔
406
            "region": region_name,
407
            "accountId": account_id,
408
            "fragment": fragment,
409
            "transformId": transformation_id,
410
            "params": parameters,
411
            "requestId": long_uid(),
412
            "templateParameterValues": simplified_parameters,
413
        }
414

415
        client = connect_to(aws_access_key_id=account_id, region_name=region_name).lambda_
1✔
416
        try:
1✔
417
            invocation = client.invoke(
1✔
418
                FunctionName=macro_definition["FunctionName"], Payload=json.dumps(event)
419
            )
UNCOV
420
        except ClientError:
×
UNCOV
421
            LOG.error(
×
422
                "client error executing lambda function '%s' with payload '%s'",
423
                macro_definition["FunctionName"],
424
                json.dumps(event),
425
            )
UNCOV
426
            raise
×
427
        if invocation.get("StatusCode") != 200 or invocation.get("FunctionError") == "Unhandled":
1✔
428
            raise FailedTransformationException(
1✔
429
                transformation=name,
430
                message=f"Received malformed response from transform {transformation_id}. Rollback requested by user.",
431
            )
432
        result = json.loads(invocation["Payload"].read())
1✔
433

434
        if result.get("status") != "success":
1✔
435
            error_message = result.get("errorMessage")
1✔
436
            message = (
1✔
437
                f"Transform {transformation_id} failed with: {error_message}. Rollback requested by user."
438
                if error_message
439
                else f"Transform {transformation_id} failed without an error message.. Rollback requested by user."
440
            )
441
            raise FailedTransformationException(transformation=name, message=message)
1✔
442

443
        if not isinstance(result.get("fragment"), dict) and not allow_string:
1✔
444
            raise FailedTransformationException(
1✔
445
                transformation=name,
446
                message="Template format error: unsupported structure.. Rollback requested by user.",
447
            )
448

449
        return result.get("fragment")
1✔
450

451
    def visit_node_intrinsic_function_fn_get_att(
1✔
452
        self, node_intrinsic_function: NodeIntrinsicFunction
453
    ) -> PreprocEntityDelta:
454
        return self.visit(node_intrinsic_function.arguments)
1✔
455

456
    def visit_node_intrinsic_function_fn_sub(
1✔
457
        self, node_intrinsic_function: NodeIntrinsicFunction
458
    ) -> PreprocEntityDelta:
459
        try:
1✔
460
            # If an argument is a Parameter it should be resolved, any other case, ignore it
461
            return super().visit_node_intrinsic_function_fn_sub(node_intrinsic_function)
1✔
462
        except RuntimeError:
1✔
463
            return self.visit(node_intrinsic_function.arguments)
1✔
464

465
    def visit_node_intrinsic_function_fn_split(
1✔
466
        self, node_intrinsic_function: NodeIntrinsicFunction
467
    ) -> PreprocEntityDelta:
468
        try:
1✔
469
            # If an argument is a Parameter it should be resolved, any other case, ignore it
470
            return super().visit_node_intrinsic_function_fn_split(node_intrinsic_function)
1✔
471
        except RuntimeError:
1✔
472
            return self.visit(node_intrinsic_function.arguments)
1✔
473

474
    def visit_node_intrinsic_function_fn_select(
1✔
475
        self, node_intrinsic_function: NodeIntrinsicFunction
476
    ) -> PreprocEntityDelta:
477
        try:
1✔
478
            # If an argument is a Parameter it should be resolved, any other case, ignore it
479
            return super().visit_node_intrinsic_function_fn_select(node_intrinsic_function)
1✔
480
        except RuntimeError:
1✔
481
            return self.visit(node_intrinsic_function.arguments)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc