• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / beb06ec3-fc0e-4100-b8f7-fbc8389f86d7

08 May 2025 05:15PM UTC coverage: 86.63% (-0.03%) from 86.66%
beb06ec3-fc0e-4100-b8f7-fbc8389f86d7

push

circleci

web-flow
CFn v2: Skip media type assertion (#12597)

64324 of 74251 relevant lines covered (86.63%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.94
/localstack-core/localstack/services/cloudformation/provider.py
1
import copy
1✔
2
import json
1✔
3
import logging
1✔
4
import re
1✔
5
from collections import defaultdict
1✔
6
from copy import deepcopy
1✔
7

8
from localstack.aws.api import CommonServiceException, RequestContext, handler
1✔
9
from localstack.aws.api.cloudformation import (
1✔
10
    AlreadyExistsException,
11
    CallAs,
12
    ChangeSetNameOrId,
13
    ChangeSetNotFoundException,
14
    ChangeSetType,
15
    ClientRequestToken,
16
    CloudformationApi,
17
    CreateChangeSetInput,
18
    CreateChangeSetOutput,
19
    CreateStackInput,
20
    CreateStackInstancesInput,
21
    CreateStackInstancesOutput,
22
    CreateStackOutput,
23
    CreateStackSetInput,
24
    CreateStackSetOutput,
25
    DeleteChangeSetOutput,
26
    DeleteStackInstancesInput,
27
    DeleteStackInstancesOutput,
28
    DeleteStackSetOutput,
29
    DeletionMode,
30
    DescribeChangeSetOutput,
31
    DescribeStackEventsOutput,
32
    DescribeStackResourceOutput,
33
    DescribeStackResourcesOutput,
34
    DescribeStackSetOperationOutput,
35
    DescribeStackSetOutput,
36
    DescribeStacksOutput,
37
    DisableRollback,
38
    EnableTerminationProtection,
39
    ExecuteChangeSetOutput,
40
    ExecutionStatus,
41
    ExportName,
42
    GetTemplateOutput,
43
    GetTemplateSummaryInput,
44
    GetTemplateSummaryOutput,
45
    IncludePropertyValues,
46
    InsufficientCapabilitiesException,
47
    InvalidChangeSetStatusException,
48
    ListChangeSetsOutput,
49
    ListExportsOutput,
50
    ListImportsOutput,
51
    ListStackInstancesInput,
52
    ListStackInstancesOutput,
53
    ListStackResourcesOutput,
54
    ListStackSetsInput,
55
    ListStackSetsOutput,
56
    ListStacksOutput,
57
    ListTypesInput,
58
    ListTypesOutput,
59
    LogicalResourceId,
60
    NextToken,
61
    Parameter,
62
    PhysicalResourceId,
63
    RegisterTypeInput,
64
    RegisterTypeOutput,
65
    RegistryType,
66
    RetainExceptOnCreate,
67
    RetainResources,
68
    RoleARN,
69
    StackName,
70
    StackNameOrId,
71
    StackSetName,
72
    StackStatus,
73
    StackStatusFilter,
74
    TemplateParameter,
75
    TemplateStage,
76
    TypeSummary,
77
    UpdateStackInput,
78
    UpdateStackOutput,
79
    UpdateStackSetInput,
80
    UpdateStackSetOutput,
81
    UpdateTerminationProtectionOutput,
82
    ValidateTemplateInput,
83
    ValidateTemplateOutput,
84
)
85
from localstack.aws.connect import connect_to
1✔
86
from localstack.services.cloudformation import api_utils
1✔
87
from localstack.services.cloudformation.engine import parameters as param_resolver
1✔
88
from localstack.services.cloudformation.engine import template_deployer, template_preparer
1✔
89
from localstack.services.cloudformation.engine.entities import (
1✔
90
    Stack,
91
    StackChangeSet,
92
    StackInstance,
93
    StackSet,
94
)
95
from localstack.services.cloudformation.engine.parameters import mask_no_echo, strip_parameter_type
1✔
96
from localstack.services.cloudformation.engine.resource_ordering import (
1✔
97
    NoResourceInStack,
98
    order_resources,
99
)
100
from localstack.services.cloudformation.engine.template_deployer import (
1✔
101
    NoStackUpdates,
102
)
103
from localstack.services.cloudformation.engine.template_utils import resolve_stack_conditions
1✔
104
from localstack.services.cloudformation.engine.transformers import (
1✔
105
    FailedTransformationException,
106
)
107
from localstack.services.cloudformation.engine.validations import (
1✔
108
    DEFAULT_TEMPLATE_VALIDATIONS,
109
    ValidationError,
110
)
111
from localstack.services.cloudformation.resource_provider import (
1✔
112
    PRO_RESOURCE_PROVIDERS,
113
    ResourceProvider,
114
)
115
from localstack.services.cloudformation.stores import (
1✔
116
    cloudformation_stores,
117
    find_active_stack_by_name_or_id,
118
    find_change_set,
119
    find_stack,
120
    find_stack_by_id,
121
    get_cloudformation_store,
122
)
123
from localstack.state import StateVisitor
1✔
124
from localstack.utils.collections import (
1✔
125
    remove_attributes,
126
    select_attributes,
127
    select_from_typed_dict,
128
)
129
from localstack.utils.json import clone
1✔
130
from localstack.utils.strings import long_uid, short_uid
1✔
131

132
LOG = logging.getLogger(__name__)
1✔
133

134
ARN_CHANGESET_REGEX = re.compile(
1✔
135
    r"arn:(aws|aws-us-gov|aws-cn):cloudformation:[-a-zA-Z0-9]+:\d{12}:changeSet/[a-zA-Z][-a-zA-Z0-9]*/[-a-zA-Z0-9:/._+]+"
136
)
137
ARN_STACK_REGEX = re.compile(
1✔
138
    r"arn:(aws|aws-us-gov|aws-cn):cloudformation:[-a-zA-Z0-9]+:\d{12}:stack/[a-zA-Z][-a-zA-Z0-9]*/[-a-zA-Z0-9:/._+]+"
139
)
140

141

142
def clone_stack_params(stack_params):
1✔
143
    try:
1✔
144
        return clone(stack_params)
1✔
145
    except Exception as e:
×
146
        LOG.info("Unable to clone stack parameters: %s", e)
×
147
        return stack_params
×
148

149

150
def find_stack_instance(stack_set: StackSet, account: str, region: str):
1✔
151
    for instance in stack_set.stack_instances:
1✔
152
        if instance.metadata["Account"] == account and instance.metadata["Region"] == region:
1✔
153
            return instance
1✔
154
    return None
×
155

156

157
def stack_not_found_error(stack_name: str):
1✔
158
    # FIXME
159
    raise ValidationError("Stack with id %s does not exist" % stack_name)
×
160

161

162
def not_found_error(message: str):
1✔
163
    # FIXME
164
    raise ResourceNotFoundException(message)
×
165

166

167
class ResourceNotFoundException(CommonServiceException):
1✔
168
    def __init__(self, message=None):
1✔
169
        super().__init__("ResourceNotFoundException", status_code=404, message=message)
×
170

171

172
class InternalFailure(CommonServiceException):
1✔
173
    def __init__(self, message=None):
1✔
174
        super().__init__("InternalFailure", status_code=500, message=message, sender_fault=False)
×
175

176

177
class CloudformationProvider(CloudformationApi):
1✔
178
    def _stack_status_is_active(self, stack_status: str) -> bool:
1✔
179
        return stack_status not in [StackStatus.DELETE_COMPLETE]
1✔
180

181
    def accept_state_visitor(self, visitor: StateVisitor):
1✔
182
        visitor.visit(cloudformation_stores)
×
183

184
    @handler("CreateStack", expand=False)
1✔
185
    def create_stack(self, context: RequestContext, request: CreateStackInput) -> CreateStackOutput:
1✔
186
        # TODO: test what happens when both TemplateUrl and Body are specified
187
        state = get_cloudformation_store(context.account_id, context.region)
1✔
188

189
        stack_name = request.get("StackName")
1✔
190

191
        # get stacks by name
192
        active_stack_candidates = [
1✔
193
            s
194
            for s in state.stacks.values()
195
            if s.stack_name == stack_name and self._stack_status_is_active(s.status)
196
        ]
197

198
        # TODO: fix/implement this code path
199
        #   this needs more investigation how Cloudformation handles it (e.g. normal stack create or does it create a separate changeset?)
200
        # REVIEW_IN_PROGRESS is another special status
201
        # in this case existing changesets are set to obsolete and the stack is created
202
        # review_stack_candidates = [s for s in stack_candidates if s.status == StackStatus.REVIEW_IN_PROGRESS]
203
        # if review_stack_candidates:
204
        # set changesets to obsolete
205
        # for cs in review_stack_candidates[0].change_sets:
206
        #     cs.execution_status = ExecutionStatus.OBSOLETE
207

208
        if active_stack_candidates:
1✔
209
            raise AlreadyExistsException(f"Stack [{stack_name}] already exists")
1✔
210

211
        template_body = request.get("TemplateBody") or ""
1✔
212
        if len(template_body) > 51200:
1✔
213
            raise ValidationError(
1✔
214
                f"1 validation error detected: Value '{request['TemplateBody']}' at 'templateBody' "
215
                "failed to satisfy constraint: Member must have length less than or equal to 51200"
216
            )
217
        api_utils.prepare_template_body(request)  # TODO: avoid mutating request directly
1✔
218

219
        template = template_preparer.parse_template(request["TemplateBody"])
1✔
220

221
        stack_name = template["StackName"] = request.get("StackName")
1✔
222
        if api_utils.validate_stack_name(stack_name) is False:
1✔
223
            raise ValidationError(
×
224
                f"1 validation error detected: Value '{stack_name}' at 'stackName' failed to satisfy constraint:\
225
                Member must satisfy regular expression pattern: [a-zA-Z][-a-zA-Z0-9]*|arn:[-a-zA-Z0-9:/._+]*"
226
            )
227

228
        if (
1✔
229
            "CAPABILITY_AUTO_EXPAND" not in request.get("Capabilities", [])
230
            and "Transform" in template.keys()
231
        ):
232
            raise InsufficientCapabilitiesException(
1✔
233
                "Requires capabilities : [CAPABILITY_AUTO_EXPAND]"
234
            )
235

236
        # resolve stack parameters
237
        new_parameters = param_resolver.convert_stack_parameters_to_dict(request.get("Parameters"))
1✔
238
        parameter_declarations = param_resolver.extract_stack_parameter_declarations(template)
1✔
239
        resolved_parameters = param_resolver.resolve_parameters(
1✔
240
            account_id=context.account_id,
241
            region_name=context.region,
242
            parameter_declarations=parameter_declarations,
243
            new_parameters=new_parameters,
244
            old_parameters={},
245
        )
246

247
        # handle conditions
248
        stack = Stack(context.account_id, context.region, request, template)
1✔
249

250
        try:
1✔
251
            template = template_preparer.transform_template(
1✔
252
                context.account_id,
253
                context.region,
254
                template,
255
                stack.stack_name,
256
                stack.resources,
257
                stack.mappings,
258
                {},  # TODO
259
                resolved_parameters,
260
            )
261
        except FailedTransformationException as e:
1✔
262
            stack.add_stack_event(
1✔
263
                stack.stack_name,
264
                stack.stack_id,
265
                status="ROLLBACK_IN_PROGRESS",
266
                status_reason=e.message,
267
            )
268
            stack.set_stack_status("ROLLBACK_COMPLETE")
1✔
269
            state.stacks[stack.stack_id] = stack
1✔
270
            return CreateStackOutput(StackId=stack.stack_id)
1✔
271

272
        # perform basic static analysis on the template
273
        for validation_fn in DEFAULT_TEMPLATE_VALIDATIONS:
1✔
274
            validation_fn(template)
×
275

276
        stack = Stack(context.account_id, context.region, request, template)
1✔
277

278
        # resolve conditions
279
        raw_conditions = template.get("Conditions", {})
1✔
280
        resolved_stack_conditions = resolve_stack_conditions(
1✔
281
            account_id=context.account_id,
282
            region_name=context.region,
283
            conditions=raw_conditions,
284
            parameters=resolved_parameters,
285
            mappings=stack.mappings,
286
            stack_name=stack_name,
287
        )
288
        stack.set_resolved_stack_conditions(resolved_stack_conditions)
1✔
289

290
        stack.set_resolved_parameters(resolved_parameters)
1✔
291
        stack.template_body = template_body
1✔
292
        state.stacks[stack.stack_id] = stack
1✔
293
        LOG.debug(
1✔
294
            'Creating stack "%s" with %s resources ...',
295
            stack.stack_name,
296
            len(stack.template_resources),
297
        )
298
        deployer = template_deployer.TemplateDeployer(context.account_id, context.region, stack)
1✔
299
        try:
1✔
300
            deployer.deploy_stack()
1✔
301
        except Exception as e:
×
302
            stack.set_stack_status("CREATE_FAILED")
×
303
            msg = 'Unable to create stack "%s": %s' % (stack.stack_name, e)
×
304
            LOG.exception("%s")
×
305
            raise ValidationError(msg) from e
×
306

307
        return CreateStackOutput(StackId=stack.stack_id)
1✔
308

309
    @handler("DeleteStack")
1✔
310
    def delete_stack(
1✔
311
        self,
312
        context: RequestContext,
313
        stack_name: StackName,
314
        retain_resources: RetainResources = None,
315
        role_arn: RoleARN = None,
316
        client_request_token: ClientRequestToken = None,
317
        deletion_mode: DeletionMode = None,
318
        **kwargs,
319
    ) -> None:
320
        stack = find_active_stack_by_name_or_id(context.account_id, context.region, stack_name)
1✔
321
        if not stack:
1✔
322
            # aws will silently ignore invalid stack names - we should do the same
323
            return
1✔
324
        deployer = template_deployer.TemplateDeployer(context.account_id, context.region, stack)
1✔
325
        deployer.delete_stack()
1✔
326

327
    @handler("UpdateStack", expand=False)
1✔
328
    def update_stack(
1✔
329
        self,
330
        context: RequestContext,
331
        request: UpdateStackInput,
332
    ) -> UpdateStackOutput:
333
        stack_name = request.get("StackName")
1✔
334
        stack = find_stack(context.account_id, context.region, stack_name)
1✔
335
        if not stack:
1✔
336
            return not_found_error(f'Unable to update non-existing stack "{stack_name}"')
×
337

338
        api_utils.prepare_template_body(request)
1✔
339
        template = template_preparer.parse_template(request["TemplateBody"])
1✔
340

341
        if (
1✔
342
            "CAPABILITY_AUTO_EXPAND" not in request.get("Capabilities", [])
343
            and "Transform" in template.keys()
344
        ):
345
            raise InsufficientCapabilitiesException(
×
346
                "Requires capabilities : [CAPABILITY_AUTO_EXPAND]"
347
            )
348

349
        new_parameters: dict[str, Parameter] = param_resolver.convert_stack_parameters_to_dict(
1✔
350
            request.get("Parameters")
351
        )
352
        parameter_declarations = param_resolver.extract_stack_parameter_declarations(template)
1✔
353
        resolved_parameters = param_resolver.resolve_parameters(
1✔
354
            account_id=context.account_id,
355
            region_name=context.region,
356
            parameter_declarations=parameter_declarations,
357
            new_parameters=new_parameters,
358
            old_parameters=stack.resolved_parameters,
359
        )
360

361
        resolved_stack_conditions = resolve_stack_conditions(
1✔
362
            account_id=context.account_id,
363
            region_name=context.region,
364
            conditions=template.get("Conditions", {}),
365
            parameters=resolved_parameters,
366
            mappings=template.get("Mappings", {}),
367
            stack_name=stack_name,
368
        )
369

370
        raw_new_template = copy.deepcopy(template)
1✔
371
        try:
1✔
372
            template = template_preparer.transform_template(
1✔
373
                context.account_id,
374
                context.region,
375
                template,
376
                stack.stack_name,
377
                stack.resources,
378
                stack.mappings,
379
                resolved_stack_conditions,
380
                resolved_parameters,
381
            )
382
            processed_template = copy.deepcopy(
1✔
383
                template
384
            )  # copying it here since it's being mutated somewhere downstream
385
        except FailedTransformationException as e:
×
386
            stack.add_stack_event(
×
387
                stack.stack_name,
388
                stack.stack_id,
389
                status="ROLLBACK_IN_PROGRESS",
390
                status_reason=e.message,
391
            )
392
            stack.set_stack_status("ROLLBACK_COMPLETE")
×
393
            return CreateStackOutput(StackId=stack.stack_id)
×
394

395
        # perform basic static analysis on the template
396
        for validation_fn in DEFAULT_TEMPLATE_VALIDATIONS:
1✔
397
            validation_fn(template)
×
398

399
        # update the template
400
        stack.template_original = template
1✔
401

402
        deployer = template_deployer.TemplateDeployer(context.account_id, context.region, stack)
1✔
403
        # TODO: there shouldn't be a "new" stack on update
404
        new_stack = Stack(
1✔
405
            context.account_id, context.region, request, template, request["TemplateBody"]
406
        )
407
        new_stack.set_resolved_parameters(resolved_parameters)
1✔
408
        stack.set_resolved_parameters(resolved_parameters)
1✔
409
        stack.set_resolved_stack_conditions(resolved_stack_conditions)
1✔
410
        try:
1✔
411
            deployer.update_stack(new_stack)
1✔
412
        except NoStackUpdates as e:
1✔
413
            stack.set_stack_status("UPDATE_COMPLETE")
1✔
414
            if raw_new_template != processed_template:
1✔
415
                # processed templates seem to never return an exception here
416
                return UpdateStackOutput(StackId=stack.stack_id)
1✔
417
            raise ValidationError(str(e))
1✔
418
        except Exception as e:
×
419
            stack.set_stack_status("UPDATE_FAILED")
×
420
            msg = f'Unable to update stack "{stack_name}": {e}'
×
421
            LOG.exception("%s", msg)
×
422
            raise ValidationError(msg) from e
×
423

424
        return UpdateStackOutput(StackId=stack.stack_id)
1✔
425

426
    @handler("DescribeStacks")
1✔
427
    def describe_stacks(
1✔
428
        self,
429
        context: RequestContext,
430
        stack_name: StackName = None,
431
        next_token: NextToken = None,
432
        **kwargs,
433
    ) -> DescribeStacksOutput:
434
        # TODO: test & implement pagination
435
        state = get_cloudformation_store(context.account_id, context.region)
1✔
436

437
        if stack_name:
1✔
438
            if ARN_STACK_REGEX.match(stack_name):
1✔
439
                # we can get the stack directly since we index the store by ARN/stackID
440
                stack = state.stacks.get(stack_name)
1✔
441
                stacks = [stack.describe_details()] if stack else []
1✔
442
            else:
443
                # otherwise we have to find the active stack with the given name
444
                stack_candidates: list[Stack] = [
1✔
445
                    s for stack_arn, s in state.stacks.items() if s.stack_name == stack_name
446
                ]
447
                active_stack_candidates = [
1✔
448
                    s for s in stack_candidates if self._stack_status_is_active(s.status)
449
                ]
450
                stacks = [s.describe_details() for s in active_stack_candidates]
1✔
451
        else:
452
            # return all active stacks
453
            stack_list = list(state.stacks.values())
1✔
454
            stacks = [
1✔
455
                s.describe_details() for s in stack_list if self._stack_status_is_active(s.status)
456
            ]
457

458
        if stack_name and not stacks:
1✔
459
            raise ValidationError(f"Stack with id {stack_name} does not exist")
1✔
460

461
        return DescribeStacksOutput(Stacks=stacks)
1✔
462

463
    @handler("ListStacks")
1✔
464
    def list_stacks(
1✔
465
        self,
466
        context: RequestContext,
467
        next_token: NextToken = None,
468
        stack_status_filter: StackStatusFilter = None,
469
        **kwargs,
470
    ) -> ListStacksOutput:
471
        state = get_cloudformation_store(context.account_id, context.region)
×
472

473
        stacks = [
×
474
            s.describe_details()
475
            for s in state.stacks.values()
476
            if not stack_status_filter or s.status in stack_status_filter
477
        ]
478

479
        attrs = [
×
480
            "StackId",
481
            "StackName",
482
            "TemplateDescription",
483
            "CreationTime",
484
            "LastUpdatedTime",
485
            "DeletionTime",
486
            "StackStatus",
487
            "StackStatusReason",
488
            "ParentId",
489
            "RootId",
490
            "DriftInformation",
491
        ]
492
        stacks = [select_attributes(stack, attrs) for stack in stacks]
×
493
        return ListStacksOutput(StackSummaries=stacks)
×
494

495
    @handler("GetTemplate")
1✔
496
    def get_template(
1✔
497
        self,
498
        context: RequestContext,
499
        stack_name: StackName = None,
500
        change_set_name: ChangeSetNameOrId = None,
501
        template_stage: TemplateStage = None,
502
        **kwargs,
503
    ) -> GetTemplateOutput:
504
        if change_set_name:
1✔
505
            stack = find_change_set(
×
506
                context.account_id, context.region, stack_name=stack_name, cs_name=change_set_name
507
            )
508
        else:
509
            stack = find_stack(context.account_id, context.region, stack_name)
1✔
510
        if not stack:
1✔
511
            return stack_not_found_error(stack_name)
×
512

513
        if template_stage == TemplateStage.Processed and "Transform" in stack.template_body:
1✔
514
            copy_template = clone(stack.template_original)
1✔
515
            copy_template.pop("ChangeSetName", None)
1✔
516
            copy_template.pop("StackName", None)
1✔
517
            for resource in copy_template.get("Resources", {}).values():
1✔
518
                resource.pop("LogicalResourceId", None)
1✔
519
            template_body = json.dumps(copy_template)
1✔
520
        else:
521
            template_body = stack.template_body
1✔
522

523
        return GetTemplateOutput(
1✔
524
            TemplateBody=template_body,
525
            StagesAvailable=[TemplateStage.Original, TemplateStage.Processed],
526
        )
527

528
    @handler("GetTemplateSummary", expand=False)
1✔
529
    def get_template_summary(
1✔
530
        self,
531
        context: RequestContext,
532
        request: GetTemplateSummaryInput,
533
    ) -> GetTemplateSummaryOutput:
534
        stack_name = request.get("StackName")
1✔
535

536
        if stack_name:
1✔
537
            stack = find_stack(context.account_id, context.region, stack_name)
1✔
538
            if not stack:
1✔
539
                return stack_not_found_error(stack_name)
×
540
            template = stack.template
1✔
541
        else:
542
            api_utils.prepare_template_body(request)
1✔
543
            template = template_preparer.parse_template(request["TemplateBody"])
1✔
544
            request["StackName"] = "tmp-stack"
1✔
545
            stack = Stack(context.account_id, context.region, request, template)
1✔
546

547
        result: GetTemplateSummaryOutput = stack.describe_details()
1✔
548

549
        # build parameter declarations
550
        result["Parameters"] = list(
1✔
551
            param_resolver.extract_stack_parameter_declarations(template).values()
552
        )
553

554
        id_summaries = defaultdict(list)
1✔
555
        for resource_id, resource in stack.template_resources.items():
1✔
556
            res_type = resource["Type"]
1✔
557
            id_summaries[res_type].append(resource_id)
1✔
558

559
        result["ResourceTypes"] = list(id_summaries.keys())
1✔
560
        result["ResourceIdentifierSummaries"] = [
1✔
561
            {"ResourceType": key, "LogicalResourceIds": values}
562
            for key, values in id_summaries.items()
563
        ]
564
        result["Metadata"] = stack.template.get("Metadata")
1✔
565
        result["Version"] = stack.template.get("AWSTemplateFormatVersion", "2010-09-09")
1✔
566
        # these do not appear in the output
567
        result.pop("Capabilities", None)
1✔
568

569
        return select_from_typed_dict(GetTemplateSummaryOutput, result)
1✔
570

571
    def update_termination_protection(
1✔
572
        self,
573
        context: RequestContext,
574
        enable_termination_protection: EnableTerminationProtection,
575
        stack_name: StackNameOrId,
576
        **kwargs,
577
    ) -> UpdateTerminationProtectionOutput:
578
        stack = find_stack(context.account_id, context.region, stack_name)
1✔
579
        if not stack:
1✔
580
            raise ValidationError(f"Stack '{stack_name}' does not exist.")
×
581
        stack.metadata["EnableTerminationProtection"] = enable_termination_protection
1✔
582
        return UpdateTerminationProtectionOutput(StackId=stack.stack_id)
1✔
583

584
    @handler("CreateChangeSet", expand=False)
1✔
585
    def create_change_set(
1✔
586
        self, context: RequestContext, request: CreateChangeSetInput
587
    ) -> CreateChangeSetOutput:
588
        state = get_cloudformation_store(context.account_id, context.region)
1✔
589

590
        req_params = request
1✔
591
        change_set_type = req_params.get("ChangeSetType", "UPDATE")
1✔
592
        stack_name = req_params.get("StackName")
1✔
593
        change_set_name = req_params.get("ChangeSetName")
1✔
594
        template_body = req_params.get("TemplateBody")
1✔
595
        # s3 or secretsmanager url
596
        template_url = req_params.get("TemplateURL")
1✔
597

598
        # validate and resolve template
599
        if template_body and template_url:
1✔
600
            raise ValidationError(
×
601
                "Specify exactly one of 'TemplateBody' or 'TemplateUrl'"
602
            )  # TODO: check proper message
603

604
        if not template_body and not template_url:
1✔
605
            raise ValidationError(
×
606
                "Specify exactly one of 'TemplateBody' or 'TemplateUrl'"
607
            )  # TODO: check proper message
608

609
        api_utils.prepare_template_body(
1✔
610
            req_params
611
        )  # TODO: function has too many unclear responsibilities
612
        if not template_body:
1✔
613
            template_body = req_params[
×
614
                "TemplateBody"
615
            ]  # should then have been set by prepare_template_body
616
        template = template_preparer.parse_template(req_params["TemplateBody"])
1✔
617

618
        del req_params["TemplateBody"]  # TODO: stop mutating req_params
1✔
619
        template["StackName"] = stack_name
1✔
620
        # TODO: validate with AWS what this is actually doing?
621
        template["ChangeSetName"] = change_set_name
1✔
622

623
        # this is intentionally not in a util yet. Let's first see how the different operations deal with these before generalizing
624
        # handle ARN stack_name here (not valid for initial CREATE, since stack doesn't exist yet)
625
        if ARN_STACK_REGEX.match(stack_name):
1✔
626
            if not (stack := state.stacks.get(stack_name)):
1✔
627
                raise ValidationError(f"Stack '{stack_name}' does not exist.")
×
628
        else:
629
            # stack name specified, so fetch the stack by name
630
            stack_candidates: list[Stack] = [
1✔
631
                s for stack_arn, s in state.stacks.items() if s.stack_name == stack_name
632
            ]
633
            active_stack_candidates = [
1✔
634
                s for s in stack_candidates if self._stack_status_is_active(s.status)
635
            ]
636

637
            # on a CREATE an empty Stack should be generated if we didn't find an active one
638
            if not active_stack_candidates and change_set_type == ChangeSetType.CREATE:
1✔
639
                empty_stack_template = dict(template)
1✔
640
                empty_stack_template["Resources"] = {}
1✔
641
                req_params_copy = clone_stack_params(req_params)
1✔
642
                stack = Stack(
1✔
643
                    context.account_id,
644
                    context.region,
645
                    req_params_copy,
646
                    empty_stack_template,
647
                    template_body=template_body,
648
                )
649
                state.stacks[stack.stack_id] = stack
1✔
650
                stack.set_stack_status("REVIEW_IN_PROGRESS")
1✔
651
            else:
652
                if not active_stack_candidates:
1✔
653
                    raise ValidationError(f"Stack '{stack_name}' does not exist.")
1✔
654
                stack = active_stack_candidates[0]
1✔
655

656
        # TODO: test if rollback status is allowed as well
657
        if (
1✔
658
            change_set_type == ChangeSetType.CREATE
659
            and stack.status != StackStatus.REVIEW_IN_PROGRESS
660
        ):
661
            raise ValidationError(
1✔
662
                f"Stack [{stack_name}] already exists and cannot be created again with the changeSet [{change_set_name}]."
663
            )
664

665
        old_parameters: dict[str, Parameter] = {}
1✔
666
        match change_set_type:
1✔
667
            case ChangeSetType.UPDATE:
1✔
668
                # add changeset to existing stack
669
                old_parameters = {
1✔
670
                    k: mask_no_echo(strip_parameter_type(v))
671
                    for k, v in stack.resolved_parameters.items()
672
                }
673
            case ChangeSetType.IMPORT:
1✔
674
                raise NotImplementedError()  # TODO: implement importing resources
675
            case ChangeSetType.CREATE:
1✔
676
                pass
1✔
677
            case _:
×
678
                msg = (
×
679
                    f"1 validation error detected: Value '{change_set_type}' at 'changeSetType' failed to satisfy "
680
                    f"constraint: Member must satisfy enum value set: [IMPORT, UPDATE, CREATE] "
681
                )
682
                raise ValidationError(msg)
×
683

684
        # resolve parameters
685
        new_parameters: dict[str, Parameter] = param_resolver.convert_stack_parameters_to_dict(
1✔
686
            request.get("Parameters")
687
        )
688
        parameter_declarations = param_resolver.extract_stack_parameter_declarations(template)
1✔
689
        resolved_parameters = param_resolver.resolve_parameters(
1✔
690
            account_id=context.account_id,
691
            region_name=context.region,
692
            parameter_declarations=parameter_declarations,
693
            new_parameters=new_parameters,
694
            old_parameters=old_parameters,
695
        )
696

697
        # TODO: remove this when fixing Stack.resources and transformation order
698
        #   currently we need to create a stack with existing resources + parameters so that resolve refs recursively in here will work.
699
        #   The correct way to do it would be at a later stage anyway just like a normal intrinsic function
700
        req_params_copy = clone_stack_params(req_params)
1✔
701
        temp_stack = Stack(context.account_id, context.region, req_params_copy, template)
1✔
702
        temp_stack.set_resolved_parameters(resolved_parameters)
1✔
703

704
        # TODO: everything below should be async
705
        # apply template transformations
706
        transformed_template = template_preparer.transform_template(
1✔
707
            context.account_id,
708
            context.region,
709
            template,
710
            stack_name=temp_stack.stack_name,
711
            resources=temp_stack.resources,
712
            mappings=temp_stack.mappings,
713
            conditions={},  # TODO: we don't have any resolved conditions yet at this point but we need the conditions because of the samtranslator...
714
            resolved_parameters=resolved_parameters,
715
        )
716

717
        # perform basic static analysis on the template
718
        for validation_fn in DEFAULT_TEMPLATE_VALIDATIONS:
1✔
719
            validation_fn(template)
×
720

721
        # create change set for the stack and apply changes
722
        change_set = StackChangeSet(
1✔
723
            context.account_id, context.region, stack, req_params, transformed_template
724
        )
725
        # only set parameters for the changeset, then switch to stack on execute_change_set
726
        change_set.set_resolved_parameters(resolved_parameters)
1✔
727
        change_set.template_body = template_body
1✔
728

729
        # TODO: evaluate conditions
730
        raw_conditions = transformed_template.get("Conditions", {})
1✔
731
        resolved_stack_conditions = resolve_stack_conditions(
1✔
732
            account_id=context.account_id,
733
            region_name=context.region,
734
            conditions=raw_conditions,
735
            parameters=resolved_parameters,
736
            mappings=temp_stack.mappings,
737
            stack_name=stack_name,
738
        )
739
        change_set.set_resolved_stack_conditions(resolved_stack_conditions)
1✔
740

741
        # a bit gross but use the template ordering to validate missing resources
742
        try:
1✔
743
            order_resources(
1✔
744
                transformed_template["Resources"],
745
                resolved_parameters=resolved_parameters,
746
                resolved_conditions=resolved_stack_conditions,
747
            )
748
        except NoResourceInStack as e:
1✔
749
            raise ValidationError(str(e)) from e
1✔
750

751
        deployer = template_deployer.TemplateDeployer(
1✔
752
            context.account_id, context.region, change_set
753
        )
754
        changes = deployer.construct_changes(
1✔
755
            stack,
756
            change_set,
757
            change_set_id=change_set.change_set_id,
758
            append_to_changeset=True,
759
            filter_unchanged_resources=True,
760
        )
761
        stack.change_sets.append(change_set)
1✔
762
        if not changes:
1✔
763
            change_set.metadata["Status"] = "FAILED"
1✔
764
            change_set.metadata["ExecutionStatus"] = "UNAVAILABLE"
1✔
765
            change_set.metadata["StatusReason"] = (
1✔
766
                "The submitted information didn't contain changes. Submit different information to create a change set."
767
            )
768
        else:
769
            change_set.metadata["Status"] = (
1✔
770
                "CREATE_COMPLETE"  # technically for some time this should first be CREATE_PENDING
771
            )
772
            change_set.metadata["ExecutionStatus"] = (
1✔
773
                "AVAILABLE"  # technically for some time this should first be UNAVAILABLE
774
            )
775

776
        return CreateChangeSetOutput(StackId=change_set.stack_id, Id=change_set.change_set_id)
1✔
777

778
    @handler("DescribeChangeSet")
1✔
779
    def describe_change_set(
1✔
780
        self,
781
        context: RequestContext,
782
        change_set_name: ChangeSetNameOrId,
783
        stack_name: StackNameOrId = None,
784
        next_token: NextToken = None,
785
        include_property_values: IncludePropertyValues = None,
786
        **kwargs,
787
    ) -> DescribeChangeSetOutput:
788
        # TODO add support for include_property_values
789
        # only relevant if change_set_name isn't an ARN
790
        if not ARN_CHANGESET_REGEX.match(change_set_name):
1✔
791
            if not stack_name:
1✔
792
                raise ValidationError(
×
793
                    "StackName must be specified if ChangeSetName is not specified as an ARN."
794
                )
795

796
            stack = find_stack(context.account_id, context.region, stack_name)
1✔
797
            if not stack:
1✔
798
                raise ValidationError(f"Stack [{stack_name}] does not exist")
1✔
799

800
        change_set = find_change_set(
1✔
801
            context.account_id, context.region, change_set_name, stack_name=stack_name
802
        )
803
        if not change_set:
1✔
804
            raise ChangeSetNotFoundException(f"ChangeSet [{change_set_name}] does not exist")
1✔
805

806
        attrs = [
1✔
807
            "ChangeSetType",
808
            "StackStatus",
809
            "LastUpdatedTime",
810
            "DisableRollback",
811
            "EnableTerminationProtection",
812
            "Transform",
813
        ]
814
        result = remove_attributes(deepcopy(change_set.metadata), attrs)
1✔
815
        # TODO: replace this patch with a better solution
816
        result["Parameters"] = [
1✔
817
            mask_no_echo(strip_parameter_type(p)) for p in result.get("Parameters", [])
818
        ]
819
        return result
1✔
820

821
    @handler("DeleteChangeSet")
1✔
822
    def delete_change_set(
1✔
823
        self,
824
        context: RequestContext,
825
        change_set_name: ChangeSetNameOrId,
826
        stack_name: StackNameOrId = None,
827
        **kwargs,
828
    ) -> DeleteChangeSetOutput:
829
        # only relevant if change_set_name isn't an ARN
830
        if not ARN_CHANGESET_REGEX.match(change_set_name):
1✔
831
            if not stack_name:
1✔
832
                raise ValidationError(
1✔
833
                    "StackName must be specified if ChangeSetName is not specified as an ARN."
834
                )
835

836
            stack = find_stack(context.account_id, context.region, stack_name)
1✔
837
            if not stack:
1✔
838
                raise ValidationError(f"Stack [{stack_name}] does not exist")
1✔
839

840
        change_set = find_change_set(
1✔
841
            context.account_id, context.region, change_set_name, stack_name=stack_name
842
        )
843
        if not change_set:
1✔
844
            raise ChangeSetNotFoundException(f"ChangeSet [{change_set_name}] does not exist")
1✔
845
        change_set.stack.change_sets = [
1✔
846
            cs
847
            for cs in change_set.stack.change_sets
848
            if change_set_name not in (cs.change_set_name, cs.change_set_id)
849
        ]
850
        return DeleteChangeSetOutput()
1✔
851

852
    @handler("ExecuteChangeSet")
1✔
853
    def execute_change_set(
1✔
854
        self,
855
        context: RequestContext,
856
        change_set_name: ChangeSetNameOrId,
857
        stack_name: StackNameOrId = None,
858
        client_request_token: ClientRequestToken = None,
859
        disable_rollback: DisableRollback = None,
860
        retain_except_on_create: RetainExceptOnCreate = None,
861
        **kwargs,
862
    ) -> ExecuteChangeSetOutput:
863
        change_set = find_change_set(
1✔
864
            context.account_id,
865
            context.region,
866
            change_set_name,
867
            stack_name=stack_name,
868
            active_only=True,
869
        )
870
        if not change_set:
1✔
871
            raise ChangeSetNotFoundException(f"ChangeSet [{change_set_name}] does not exist")
×
872
        if change_set.metadata.get("ExecutionStatus") != ExecutionStatus.AVAILABLE:
1✔
873
            LOG.debug("Change set %s not in execution status 'AVAILABLE'", change_set_name)
1✔
874
            raise InvalidChangeSetStatusException(
1✔
875
                f"ChangeSet [{change_set.metadata['ChangeSetId']}] cannot be executed in its current status of [{change_set.metadata.get('Status')}]"
876
            )
877
        stack_name = change_set.stack.stack_name
1✔
878
        LOG.debug(
1✔
879
            'Executing change set "%s" for stack "%s" with %s resources ...',
880
            change_set_name,
881
            stack_name,
882
            len(change_set.template_resources),
883
        )
884
        deployer = template_deployer.TemplateDeployer(
1✔
885
            context.account_id, context.region, change_set.stack
886
        )
887
        try:
1✔
888
            deployer.apply_change_set(change_set)
1✔
889
            change_set.stack.metadata["ChangeSetId"] = change_set.change_set_id
1✔
890
        except NoStackUpdates:
×
891
            # TODO: parity-check if this exception should be re-raised or swallowed
892
            raise ValidationError("No updates to be performed for stack change set")
×
893

894
        return ExecuteChangeSetOutput()
1✔
895

896
    @handler("ListChangeSets")
1✔
897
    def list_change_sets(
1✔
898
        self,
899
        context: RequestContext,
900
        stack_name: StackNameOrId,
901
        next_token: NextToken = None,
902
        **kwargs,
903
    ) -> ListChangeSetsOutput:
904
        stack = find_stack(context.account_id, context.region, stack_name)
×
905
        if not stack:
×
906
            return not_found_error(f'Unable to find stack "{stack_name}"')
×
907
        result = [cs.metadata for cs in stack.change_sets]
×
908
        return ListChangeSetsOutput(Summaries=result)
×
909

910
    @handler("ListExports")
1✔
911
    def list_exports(
1✔
912
        self, context: RequestContext, next_token: NextToken = None, **kwargs
913
    ) -> ListExportsOutput:
914
        state = get_cloudformation_store(context.account_id, context.region)
1✔
915
        return ListExportsOutput(Exports=state.exports)
1✔
916

917
    @handler("ListImports")
1✔
918
    def list_imports(
1✔
919
        self,
920
        context: RequestContext,
921
        export_name: ExportName,
922
        next_token: NextToken = None,
923
        **kwargs,
924
    ) -> ListImportsOutput:
925
        state = get_cloudformation_store(context.account_id, context.region)
×
926

927
        importing_stack_names = []
×
928
        for stack in state.stacks.values():
×
929
            if export_name in stack.imports:
×
930
                importing_stack_names.append(stack.stack_name)
×
931

932
        return ListImportsOutput(Imports=importing_stack_names)
×
933

934
    @handler("DescribeStackEvents")
1✔
935
    def describe_stack_events(
1✔
936
        self,
937
        context: RequestContext,
938
        stack_name: StackName = None,
939
        next_token: NextToken = None,
940
        **kwargs,
941
    ) -> DescribeStackEventsOutput:
942
        if stack_name is None:
1✔
943
            raise ValidationError(
1✔
944
                "1 validation error detected: Value null at 'stackName' failed to satisfy constraint: Member must not be null"
945
            )
946

947
        stack = find_active_stack_by_name_or_id(context.account_id, context.region, stack_name)
1✔
948
        if not stack:
1✔
949
            stack = find_stack_by_id(
1✔
950
                account_id=context.account_id, region_name=context.region, stack_id=stack_name
951
            )
952
        if not stack:
1✔
953
            raise ValidationError(f"Stack [{stack_name}] does not exist")
1✔
954
        return DescribeStackEventsOutput(StackEvents=stack.events)
1✔
955

956
    @handler("DescribeStackResource")
1✔
957
    def describe_stack_resource(
1✔
958
        self,
959
        context: RequestContext,
960
        stack_name: StackName,
961
        logical_resource_id: LogicalResourceId,
962
        **kwargs,
963
    ) -> DescribeStackResourceOutput:
964
        stack = find_stack(context.account_id, context.region, stack_name)
1✔
965

966
        if not stack:
1✔
967
            return stack_not_found_error(stack_name)
×
968

969
        try:
1✔
970
            details = stack.resource_status(logical_resource_id)
1✔
971
        except Exception as e:
1✔
972
            if "Unable to find details" in str(e):
1✔
973
                raise ValidationError(
1✔
974
                    f"Resource {logical_resource_id} does not exist for stack {stack_name}"
975
                )
976
            raise
×
977

978
        return DescribeStackResourceOutput(StackResourceDetail=details)
1✔
979

980
    @handler("DescribeStackResources")
1✔
981
    def describe_stack_resources(
1✔
982
        self,
983
        context: RequestContext,
984
        stack_name: StackName = None,
985
        logical_resource_id: LogicalResourceId = None,
986
        physical_resource_id: PhysicalResourceId = None,
987
        **kwargs,
988
    ) -> DescribeStackResourcesOutput:
989
        if physical_resource_id and stack_name:
1✔
990
            raise ValidationError("Cannot specify both StackName and PhysicalResourceId")
×
991
        # TODO: filter stack by PhysicalResourceId!
992
        stack = find_stack(context.account_id, context.region, stack_name)
1✔
993
        if not stack:
1✔
994
            return stack_not_found_error(stack_name)
×
995
        statuses = [
1✔
996
            res_status
997
            for res_id, res_status in stack.resource_states.items()
998
            if logical_resource_id in [res_id, None]
999
        ]
1000
        for status in statuses:
1✔
1001
            status.setdefault("DriftInformation", {"StackResourceDriftStatus": "NOT_CHECKED"})
1✔
1002
        return DescribeStackResourcesOutput(StackResources=statuses)
1✔
1003

1004
    @handler("ListStackResources")
1✔
1005
    def list_stack_resources(
1✔
1006
        self, context: RequestContext, stack_name: StackName, next_token: NextToken = None, **kwargs
1007
    ) -> ListStackResourcesOutput:
1008
        result = self.describe_stack_resources(context, stack_name)
1✔
1009

1010
        resources = deepcopy(result.get("StackResources", []))
1✔
1011
        for resource in resources:
1✔
1012
            attrs = ["StackName", "StackId", "Timestamp", "PreviousResourceStatus"]
1✔
1013
            remove_attributes(resource, attrs)
1✔
1014

1015
        return ListStackResourcesOutput(StackResourceSummaries=resources)
1✔
1016

1017
    @handler("ValidateTemplate", expand=False)
1✔
1018
    def validate_template(
1✔
1019
        self, context: RequestContext, request: ValidateTemplateInput
1020
    ) -> ValidateTemplateOutput:
1021
        try:
1✔
1022
            # TODO implement actual validation logic
1023
            template_body = api_utils.get_template_body(request)
1✔
1024
            valid_template = json.loads(template_preparer.template_to_json(template_body))
1✔
1025

1026
            parameters = [
1✔
1027
                TemplateParameter(
1028
                    ParameterKey=k,
1029
                    DefaultValue=v.get("Default", ""),
1030
                    NoEcho=v.get("NoEcho", False),
1031
                    Description=v.get("Description", ""),
1032
                )
1033
                for k, v in valid_template.get("Parameters", {}).items()
1034
            ]
1035

1036
            return ValidateTemplateOutput(
1✔
1037
                Description=valid_template.get("Description"), Parameters=parameters
1038
            )
1039
        except Exception as e:
1✔
1040
            LOG.exception("Error validating template")
1✔
1041
            raise ValidationError("Template Validation Error") from e
1✔
1042

1043
    # =======================================
1044
    # =============  Stack Set  =============
1045
    # =======================================
1046

1047
    @handler("CreateStackSet", expand=False)
1✔
1048
    def create_stack_set(
1✔
1049
        self, context: RequestContext, request: CreateStackSetInput
1050
    ) -> CreateStackSetOutput:
1051
        state = get_cloudformation_store(context.account_id, context.region)
1✔
1052
        stack_set = StackSet(request)
1✔
1053
        stack_set_id = f"{stack_set.stack_set_name}:{long_uid()}"
1✔
1054
        stack_set.metadata["StackSetId"] = stack_set_id
1✔
1055
        state.stack_sets[stack_set_id] = stack_set
1✔
1056

1057
        return CreateStackSetOutput(StackSetId=stack_set_id)
1✔
1058

1059
    @handler("DescribeStackSetOperation")
1✔
1060
    def describe_stack_set_operation(
1✔
1061
        self,
1062
        context: RequestContext,
1063
        stack_set_name: StackSetName,
1064
        operation_id: ClientRequestToken,
1065
        call_as: CallAs = None,
1066
        **kwargs,
1067
    ) -> DescribeStackSetOperationOutput:
1068
        state = get_cloudformation_store(context.account_id, context.region)
1✔
1069

1070
        set_name = stack_set_name
1✔
1071

1072
        stack_set = [sset for sset in state.stack_sets.values() if sset.stack_set_name == set_name]
1✔
1073
        if not stack_set:
1✔
1074
            return not_found_error(f'Unable to find stack set "{set_name}"')
×
1075
        stack_set = stack_set[0]
1✔
1076
        result = stack_set.operations.get(operation_id)
1✔
1077
        if not result:
1✔
1078
            LOG.debug(
×
1079
                'Unable to find operation ID "%s" for stack set "%s" in list: %s',
1080
                operation_id,
1081
                set_name,
1082
                list(stack_set.operations.keys()),
1083
            )
1084
            return not_found_error(
×
1085
                f'Unable to find operation ID "{operation_id}" for stack set "{set_name}"'
1086
            )
1087

1088
        return DescribeStackSetOperationOutput(StackSetOperation=result)
1✔
1089

1090
    @handler("DescribeStackSet")
1✔
1091
    def describe_stack_set(
1✔
1092
        self,
1093
        context: RequestContext,
1094
        stack_set_name: StackSetName,
1095
        call_as: CallAs = None,
1096
        **kwargs,
1097
    ) -> DescribeStackSetOutput:
1098
        state = get_cloudformation_store(context.account_id, context.region)
×
1099
        result = [
×
1100
            sset.metadata
1101
            for sset in state.stack_sets.values()
1102
            if sset.stack_set_name == stack_set_name
1103
        ]
1104
        if not result:
×
1105
            return not_found_error(f'Unable to find stack set "{stack_set_name}"')
×
1106

1107
        return DescribeStackSetOutput(StackSet=result[0])
×
1108

1109
    @handler("ListStackSets", expand=False)
1✔
1110
    def list_stack_sets(
1✔
1111
        self, context: RequestContext, request: ListStackSetsInput
1112
    ) -> ListStackSetsOutput:
1113
        state = get_cloudformation_store(context.account_id, context.region)
×
1114
        result = [sset.metadata for sset in state.stack_sets.values()]
×
1115
        return ListStackSetsOutput(Summaries=result)
×
1116

1117
    @handler("UpdateStackSet", expand=False)
1✔
1118
    def update_stack_set(
1✔
1119
        self, context: RequestContext, request: UpdateStackSetInput
1120
    ) -> UpdateStackSetOutput:
1121
        state = get_cloudformation_store(context.account_id, context.region)
×
1122
        set_name = request.get("StackSetName")
×
1123
        stack_set = [sset for sset in state.stack_sets.values() if sset.stack_set_name == set_name]
×
1124
        if not stack_set:
×
1125
            return not_found_error(f'Stack set named "{set_name}" does not exist')
×
1126
        stack_set = stack_set[0]
×
1127
        stack_set.metadata.update(request)
×
1128
        op_id = request.get("OperationId") or short_uid()
×
1129
        operation = {
×
1130
            "OperationId": op_id,
1131
            "StackSetId": stack_set.metadata["StackSetId"],
1132
            "Action": "UPDATE",
1133
            "Status": "SUCCEEDED",
1134
        }
1135
        stack_set.operations[op_id] = operation
×
1136
        return UpdateStackSetOutput(OperationId=op_id)
×
1137

1138
    @handler("DeleteStackSet")
1✔
1139
    def delete_stack_set(
1✔
1140
        self,
1141
        context: RequestContext,
1142
        stack_set_name: StackSetName,
1143
        call_as: CallAs = None,
1144
        **kwargs,
1145
    ) -> DeleteStackSetOutput:
1146
        state = get_cloudformation_store(context.account_id, context.region)
1✔
1147
        stack_set = [
1✔
1148
            sset for sset in state.stack_sets.values() if sset.stack_set_name == stack_set_name
1149
        ]
1150

1151
        if not stack_set:
1✔
1152
            return not_found_error(f'Stack set named "{stack_set_name}" does not exist')
×
1153

1154
        # TODO: add a check for remaining stack instances
1155

1156
        for instance in stack_set[0].stack_instances:
1✔
1157
            deployer = template_deployer.TemplateDeployer(
×
1158
                context.account_id, context.region, instance.stack
1159
            )
1160
            deployer.delete_stack()
×
1161
        return DeleteStackSetOutput()
1✔
1162

1163
    @handler("CreateStackInstances", expand=False)
1✔
1164
    def create_stack_instances(
1✔
1165
        self,
1166
        context: RequestContext,
1167
        request: CreateStackInstancesInput,
1168
    ) -> CreateStackInstancesOutput:
1169
        state = get_cloudformation_store(context.account_id, context.region)
1✔
1170

1171
        set_name = request.get("StackSetName")
1✔
1172
        stack_set = [sset for sset in state.stack_sets.values() if sset.stack_set_name == set_name]
1✔
1173

1174
        if not stack_set:
1✔
1175
            return not_found_error(f'Stack set named "{set_name}" does not exist')
×
1176

1177
        stack_set = stack_set[0]
1✔
1178
        op_id = request.get("OperationId") or short_uid()
1✔
1179
        sset_meta = stack_set.metadata
1✔
1180
        accounts = request["Accounts"]
1✔
1181
        regions = request["Regions"]
1✔
1182

1183
        stacks_to_await = []
1✔
1184
        for account in accounts:
1✔
1185
            for region in regions:
1✔
1186
                # deploy new stack
1187
                LOG.debug(
1✔
1188
                    'Deploying instance for stack set "%s" in account: %s region %s',
1189
                    set_name,
1190
                    account,
1191
                    region,
1192
                )
1193
                cf_client = connect_to(aws_access_key_id=account, region_name=region).cloudformation
1✔
1194
                kwargs = select_attributes(sset_meta, ["TemplateBody"]) or select_attributes(
1✔
1195
                    sset_meta, ["TemplateURL"]
1196
                )
1197
                stack_name = f"sset-{set_name}-{account}"
1✔
1198

1199
                # skip creation of existing stacks
1200
                if find_stack(context.account_id, context.region, stack_name):
1✔
1201
                    continue
1✔
1202

1203
                result = cf_client.create_stack(StackName=stack_name, **kwargs)
1✔
1204
                stacks_to_await.append((stack_name, account, region))
1✔
1205
                # store stack instance
1206
                instance = {
1✔
1207
                    "StackSetId": sset_meta["StackSetId"],
1208
                    "OperationId": op_id,
1209
                    "Account": account,
1210
                    "Region": region,
1211
                    "StackId": result["StackId"],
1212
                    "Status": "CURRENT",
1213
                    "StackInstanceStatus": {"DetailedStatus": "SUCCEEDED"},
1214
                }
1215
                instance = StackInstance(instance)
1✔
1216
                stack_set.stack_instances.append(instance)
1✔
1217

1218
        # wait for completion of stack
1219
        for stack_name, account_id, region_name in stacks_to_await:
1✔
1220
            client = connect_to(
1✔
1221
                aws_access_key_id=account_id, region_name=region_name
1222
            ).cloudformation
1223
            client.get_waiter("stack_create_complete").wait(StackName=stack_name)
1✔
1224

1225
        # record operation
1226
        operation = {
1✔
1227
            "OperationId": op_id,
1228
            "StackSetId": stack_set.metadata["StackSetId"],
1229
            "Action": "CREATE",
1230
            "Status": "SUCCEEDED",
1231
        }
1232
        stack_set.operations[op_id] = operation
1✔
1233

1234
        return CreateStackInstancesOutput(OperationId=op_id)
1✔
1235

1236
    @handler("ListStackInstances", expand=False)
1✔
1237
    def list_stack_instances(
1✔
1238
        self,
1239
        context: RequestContext,
1240
        request: ListStackInstancesInput,
1241
    ) -> ListStackInstancesOutput:
1242
        set_name = request.get("StackSetName")
×
1243
        state = get_cloudformation_store(context.account_id, context.region)
×
1244
        stack_set = [sset for sset in state.stack_sets.values() if sset.stack_set_name == set_name]
×
1245
        if not stack_set:
×
1246
            return not_found_error(f'Stack set named "{set_name}" does not exist')
×
1247

1248
        stack_set = stack_set[0]
×
1249
        result = [inst.metadata for inst in stack_set.stack_instances]
×
1250
        return ListStackInstancesOutput(Summaries=result)
×
1251

1252
    @handler("DeleteStackInstances", expand=False)
1✔
1253
    def delete_stack_instances(
1✔
1254
        self,
1255
        context: RequestContext,
1256
        request: DeleteStackInstancesInput,
1257
    ) -> DeleteStackInstancesOutput:
1258
        op_id = request.get("OperationId") or short_uid()
1✔
1259

1260
        accounts = request["Accounts"]
1✔
1261
        regions = request["Regions"]
1✔
1262

1263
        state = get_cloudformation_store(context.account_id, context.region)
1✔
1264
        stack_sets = state.stack_sets.values()
1✔
1265

1266
        set_name = request.get("StackSetName")
1✔
1267
        stack_set = next((sset for sset in stack_sets if sset.stack_set_name == set_name), None)
1✔
1268

1269
        if not stack_set:
1✔
1270
            return not_found_error(f'Stack set named "{set_name}" does not exist')
×
1271

1272
        for account in accounts:
1✔
1273
            for region in regions:
1✔
1274
                instance = find_stack_instance(stack_set, account, region)
1✔
1275
                if instance:
1✔
1276
                    stack_set.stack_instances.remove(instance)
1✔
1277

1278
        # record operation
1279
        operation = {
1✔
1280
            "OperationId": op_id,
1281
            "StackSetId": stack_set.metadata["StackSetId"],
1282
            "Action": "DELETE",
1283
            "Status": "SUCCEEDED",
1284
        }
1285
        stack_set.operations[op_id] = operation
1✔
1286

1287
        return DeleteStackInstancesOutput(OperationId=op_id)
1✔
1288

1289
    @handler("RegisterType", expand=False)
1✔
1290
    def register_type(
1✔
1291
        self,
1292
        context: RequestContext,
1293
        request: RegisterTypeInput,
1294
    ) -> RegisterTypeOutput:
1295
        return RegisterTypeOutput()
×
1296

1297
    def list_types(
1✔
1298
        self, context: RequestContext, request: ListTypesInput, **kwargs
1299
    ) -> ListTypesOutput:
1300
        def is_list_overridden(child_class, parent_class):
×
1301
            if hasattr(child_class, "list"):
×
1302
                import inspect
×
1303

1304
                child_method = child_class.list
×
1305
                parent_method = parent_class.list
×
1306
                return inspect.unwrap(child_method) is not inspect.unwrap(parent_method)
×
1307
            return False
×
1308

1309
        def get_listable_types_summaries(plugin_manager):
×
1310
            plugins = plugin_manager.list_names()
×
1311
            type_summaries = []
×
1312
            for plugin in plugins:
×
1313
                type_summary = TypeSummary(
×
1314
                    Type=RegistryType.RESOURCE,
1315
                    TypeName=plugin,
1316
                )
1317
                provider = plugin_manager.load(plugin)
×
1318
                if is_list_overridden(provider.factory, ResourceProvider):
×
1319
                    type_summaries.append(type_summary)
×
1320
            return type_summaries
×
1321

1322
        from localstack.services.cloudformation.resource_provider import (
×
1323
            plugin_manager,
1324
        )
1325

1326
        type_summaries = get_listable_types_summaries(plugin_manager)
×
1327
        if PRO_RESOURCE_PROVIDERS:
×
1328
            from localstack.services.cloudformation.resource_provider import (
×
1329
                pro_plugin_manager,
1330
            )
1331

1332
            type_summaries.extend(get_listable_types_summaries(pro_plugin_manager))
×
1333

1334
        return ListTypesOutput(TypeSummaries=type_summaries)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc