• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 0e75e353-4494-4b65-a715-a78cf84b0b94

02 Apr 2025 01:40PM UTC coverage: 86.857% (+0.05%) from 86.807%
0e75e353-4494-4b65-a715-a78cf84b0b94

push

circleci

web-flow
CFn: WIP POC v2 executor (#12396)

Co-authored-by: Marco Edoardo Palma <64580864+MEPalma@users.noreply.github.com>

36 of 112 new or added lines in 6 files covered. (32.14%)

144 existing lines in 9 files now uncovered.

63556 of 73173 relevant lines covered (86.86%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.25
/localstack-core/localstack/services/cloudformation/engine/template_deployer.py
1
import base64
1✔
2
import json
1✔
3
import logging
1✔
4
import re
1✔
5
import traceback
1✔
6
import uuid
1✔
7
from typing import Optional
1✔
8

9
from botocore.exceptions import ClientError
1✔
10

11
from localstack import config
1✔
12
from localstack.aws.connect import connect_to
1✔
13
from localstack.constants import INTERNAL_AWS_SECRET_ACCESS_KEY
1✔
14
from localstack.services.cloudformation.deployment_utils import (
1✔
15
    PLACEHOLDER_AWS_NO_VALUE,
16
    get_action_name_for_resource_change,
17
    remove_none_values,
18
)
19
from localstack.services.cloudformation.engine.changes import ChangeConfig, ResourceChange
1✔
20
from localstack.services.cloudformation.engine.entities import Stack, StackChangeSet
1✔
21
from localstack.services.cloudformation.engine.parameters import StackParameter
1✔
22
from localstack.services.cloudformation.engine.quirks import VALID_GETATT_PROPERTIES
1✔
23
from localstack.services.cloudformation.engine.resource_ordering import (
1✔
24
    order_changes,
25
    order_resources,
26
)
27
from localstack.services.cloudformation.engine.template_utils import (
1✔
28
    AWS_URL_SUFFIX,
29
    fn_equals_type_conversion,
30
    get_deps_for_resource,
31
)
32
from localstack.services.cloudformation.resource_provider import (
1✔
33
    Credentials,
34
    OperationStatus,
35
    ProgressEvent,
36
    ResourceProviderExecutor,
37
    ResourceProviderPayload,
38
    get_resource_type,
39
)
40
from localstack.services.cloudformation.service_models import (
1✔
41
    DependencyNotYetSatisfied,
42
)
43
from localstack.services.cloudformation.stores import exports_map, find_stack
1✔
44
from localstack.utils.aws.arns import get_partition
1✔
45
from localstack.utils.functions import prevent_stack_overflow
1✔
46
from localstack.utils.json import clone_safe
1✔
47
from localstack.utils.strings import to_bytes, to_str
1✔
48
from localstack.utils.threads import start_worker_thread
1✔
49

50
from localstack.services.cloudformation.models import *  # noqa: F401, F403, isort:skip
1✔
51
from localstack.utils.urls import localstack_host
1✔
52

53
# action names for resource changes
ACTION_CREATE = "create"
ACTION_DELETE = "delete"

# Matches API Gateway invoke URLs:
#   group 1: scheme and everything up to and including ".execute-api."
#   (non-capturing): two or three dash-terminated segments followed by a digit (e.g. "us-east-1")
#   group 2: the domain suffix ("amazonaws.com" or AWS_URL_SUFFIX)
#   group 3: the path after the optional slash
REGEX_OUTPUT_APIGATEWAY = re.compile(
    rf"^(https?://.+\.execute-api\.)(?:[^-]+-){{2,3}}\d\.(amazonaws\.com|{AWS_URL_SUFFIX})/?(.*)$"
)
# Matches dynamic references of the form {{resolve:<service>:<reference-key>}}
#   group 1: the service name (everything up to the first colon)
#   group 2: the remaining reference key
REGEX_DYNAMIC_REF = re.compile("{{resolve:([^:]+):(.+)}}")

# module-level logger
LOG = logging.getLogger(__name__)

# list of static attribute references to be replaced in {'Fn::Sub': '...'} strings
STATIC_REFS = ["AWS::Region", "AWS::Partition", "AWS::StackName", "AWS::AccountId"]

# Mock value for unsupported type references
MOCK_REFERENCE = "unknown"
1✔
68

69

70
class NoStackUpdates(Exception):
    """Raised when a stack update would not perform any action (which is not allowed)."""
1✔
74

75

76
# ---------------------
77
# CF TEMPLATE HANDLING
78
# ---------------------
79

80

81
def get_attr_from_model_instance(
    resource: dict,
    attribute_name: str,
    resource_type: str,
    resource_id: str,
    attribute_sub_name: Optional[str] = None,
) -> str:
    """
    Look up the value of an ``Fn::GetAtt`` attribute in the properties of a resource.

    :param resource: the resource entry; must contain ``PhysicalResourceId`` and may
        contain ``Properties``
    :param attribute_name: name of the attribute, optionally dotted (``Aa.Bb``)
    :param resource_type: resource type, used to look up the allowed attributes in
        ``VALID_GETATT_PROPERTIES``
    :param resource_id: logical id of the resource (only used for log messages)
    :param attribute_sub_name: optional key to select from the attribute value
    :return: the attribute value, ``MOCK_REFERENCE`` for unsupported resources, or
        ``None`` if the attribute cannot be found (callers in this file treat ``None``
        as "dependency not yet satisfied")
    :raises Exception: if ``VALID_GETATT_PROPERTIES`` has an entry for the resource type
        and the attribute is not listed in it
    """
    if resource["PhysicalResourceId"] == MOCK_REFERENCE:
        LOG.warning(
            "Attribute '%s' requested from unsupported resource with id %s",
            attribute_name,
            resource_id,
        )
        return MOCK_REFERENCE

    # tolerate an explicit `"Properties": None` entry instead of failing on `.get` below
    properties = resource.get("Properties") or {}
    # if there's no entry in VALID_GETATT_PROPERTIES for the resource type we still default to "open" and accept anything
    valid_atts = VALID_GETATT_PROPERTIES.get(resource_type)
    if valid_atts is not None and attribute_name not in valid_atts:
        LOG.warning(
            "Invalid attribute in Fn::GetAtt for %s:  | %s.%s",
            resource_type,
            resource_id,
            attribute_name,
        )
        raise Exception(
            f"Resource type {resource_type} does not support attribute {{{attribute_name}}}"
        )  # TODO: check CFn behavior via snapshot

    attribute_candidate = properties.get(attribute_name)
    if attribute_sub_name:
        # the parent attribute may not be set (yet): report "not found" instead of
        # failing with an AttributeError on None
        if attribute_candidate is None:
            return None
        return attribute_candidate.get(attribute_sub_name)

    if "." in attribute_name:
        # was used for legacy, but keeping it since it might have to work for a custom resource as well
        if attribute_candidate:
            return attribute_candidate

        # some resources (e.g. ElastiCache) have their readOnly attributes defined as Aa.Bb but the property is named AaBb
        if attribute_candidate := properties.get(attribute_name.replace(".", "")):
            return attribute_candidate

        # accessing nested properties
        attribute = properties
        # TODO: the attribute fetching below is a temporary workaround for the dependency resolution.
        #  It is caused by trying to access the resource attribute that has not been deployed yet.
        #  This should be a hard error.
        for part in attribute_name.split("."):
            if attribute is None:
                return None
            attribute = attribute.get(part)
        return attribute

    # If we couldn't find the attribute, this is actually an irrecoverable error.
    # After the resource has a state of CREATE_COMPLETE, all attributes should already be set.
    # TODO: raise an exception here instead of returning None
    #  ("Failed to resolve attribute for Fn::GetAtt in <type>: <id>.<attribute>"),
    #  and check CFn behavior via snapshot
    return attribute_candidate
1✔
142

143

144
def resolve_ref(
1✔
145
    account_id: str,
146
    region_name: str,
147
    stack_name: str,
148
    resources: dict,
149
    parameters: dict[str, StackParameter],
150
    ref: str,
151
):
152
    """
153
    ref always needs to be a static string
154
    ref can be one of these:
155
    1. a pseudo-parameter (e.g. AWS::Region)
156
    2. a parameter
157
    3. the id of a resource (PhysicalResourceId
158
    """
159
    # pseudo parameter
160
    if ref == "AWS::Region":
1✔
161
        return region_name
1✔
162
    if ref == "AWS::Partition":
1✔
163
        return get_partition(region_name)
1✔
164
    if ref == "AWS::StackName":
1✔
165
        return stack_name
1✔
166
    if ref == "AWS::StackId":
1✔
167
        stack = find_stack(account_id, region_name, stack_name)
1✔
168
        if not stack:
1✔
169
            raise ValueError(f"No stack {stack_name} found")
×
170
        return stack.stack_id
1✔
171
    if ref == "AWS::AccountId":
1✔
172
        return account_id
1✔
173
    if ref == "AWS::NoValue":
1✔
174
        return PLACEHOLDER_AWS_NO_VALUE
1✔
175
    if ref == "AWS::NotificationARNs":
1✔
176
        # TODO!
177
        return {}
×
178
    if ref == "AWS::URLSuffix":
1✔
179
        return AWS_URL_SUFFIX
1✔
180

181
    # parameter
182
    if parameter := parameters.get(ref):
1✔
183
        parameter_type: str = parameter["ParameterType"]
1✔
184
        parameter_value = parameter.get("ResolvedValue") or parameter.get("ParameterValue")
1✔
185

186
        if "CommaDelimitedList" in parameter_type or parameter_type.startswith("List<"):
1✔
187
            return [p.strip() for p in parameter_value.split(",")]
1✔
188
        else:
189
            return parameter_value
1✔
190

191
    # resource
192
    resource = resources.get(ref)
1✔
193
    if not resource:
1✔
194
        raise Exception(
×
195
            f"Resource target for `Ref {ref}` could not be found. Is there a resource with name {ref} in your stack?"
196
        )
197

198
    return resources[ref].get("PhysicalResourceId")
1✔
199

200

201
# Using a @prevent_stack_overflow decorator here to avoid infinite recursion
202
# in case we load stack exports that have circular dependencies (see issue 3438)
203
# TODO: Potentially think about a better approach in the future
204
@prevent_stack_overflow(match_parameters=True)
def resolve_refs_recursively(
    account_id: str,
    region_name: str,
    stack_name: str,
    resources: dict,
    mappings: dict,
    conditions: dict[str, bool],
    parameters: dict,
    value,
):
    """
    Resolve ``value`` via ``_resolve_refs_recursively`` and post-process string results.

    Two LocalStack-specific patches are applied if the resolved result is a string:

    1. API Gateway invoke URLs (see ``REGEX_OUTPUT_APIGATEWAY``) are rebuilt as
       ``<prefix><domain>:<port>/<path>`` with the port of ``localstack_host()``, unless
       the string is listed in ``config.CFN_STRING_REPLACEMENT_DENY_LIST``.
    2. Dynamic references (``{{resolve:<service>:<key>}}``) for the services ``ssm``,
       ``ssm-secure`` and ``secretsmanager`` are replaced by the referenced value.
       Other services only produce a warning and the string is returned unchanged.

    :return: the resolved value; non-string results are returned as is
    :raises ClientError: if looking up the SSM parameter or the secret fails
    :raises DependencyNotYetSatisfied: if the requested JSON key is missing in the secret
    """
    result = _resolve_refs_recursively(
        account_id, region_name, stack_name, resources, mappings, conditions, parameters, value
    )

    # localstack specific patches (only relevant for strings)
    if not isinstance(result, str):
        return result

    # we're trying to filter constructed API urls here (e.g. via Join in the template)
    api_match = REGEX_OUTPUT_APIGATEWAY.match(result)
    if api_match:
        if result in config.CFN_STRING_REPLACEMENT_DENY_LIST:
            return result
        prefix = api_match[1]
        host = api_match[2]
        path = api_match[3]
        port = localstack_host().port
        return f"{prefix}{host}:{port}/{path}"

    # basic dynamic reference support
    # see: https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/dynamic-references.html
    # technically there are more restrictions for each of these services but checking each of these
    # isn't really necessary for the current level of emulation
    dynamic_ref_match = REGEX_DYNAMIC_REF.match(result)
    if not dynamic_ref_match:
        return result

    service_name = dynamic_ref_match[1]
    reference_key = dynamic_ref_match[2]

    # only these 3 services are supported for dynamic references right now
    if service_name in ("ssm", "ssm-secure"):
        ssm_client = connect_to(aws_access_key_id=account_id, region_name=region_name).ssm
        # secure parameters are requested with decryption, plain ones without extra arguments
        ssm_kwargs = {"WithDecryption": True} if service_name == "ssm-secure" else {}
        try:
            return ssm_client.get_parameter(Name=reference_key, **ssm_kwargs)["Parameter"]["Value"]
        except ClientError as e:
            LOG.error("client error accessing SSM parameter '%s': %s", reference_key, e)
            raise

    if service_name == "secretsmanager":
        # reference key needs to be parsed further
        # because {{resolve:secretsmanager:secret-id:secret-string:json-key:version-stage:version-id}}
        # we match for "secret-id:secret-string:json-key:version-stage:version-id"
        # where
        #   secret-id can either be the secret name or the full ARN of the secret
        #   secret-string *must* be SecretString
        #   all other values are optional
        secret_id = reference_key
        json_key = version_stage = version_id = None
        # partition on the full ":SecretString:" separator so that secret names which merely
        # contain "SecretString" are not mistaken for the extended syntax
        head, separator, options = reference_key.partition(":SecretString:")
        if separator:
            secret_id = head
            # json-key, version-stage and version-id are optional.
            json_key, version_stage, version_id = f"{options}::".split(":")[:3]
        elif reference_key.endswith(":SecretString"):
            # "secret-id:SecretString" without any of the optional parts
            secret_id = reference_key.removesuffix(":SecretString")

        kwargs = {}  # optional args for get_secret_value
        if version_id:
            kwargs["VersionId"] = version_id
        if version_stage:
            kwargs["VersionStage"] = version_stage

        secretsmanager_client = connect_to(
            aws_access_key_id=account_id, region_name=region_name
        ).secretsmanager
        try:
            secret_value = secretsmanager_client.get_secret_value(SecretId=secret_id, **kwargs)[
                "SecretString"
            ]
        except ClientError as e:
            # the message has two placeholders, so the exception must be passed as well
            LOG.error("client error while trying to access key '%s': %s", secret_id, e)
            raise

        if not json_key:
            return secret_value

        json_secret = json.loads(secret_value)
        if json_key not in json_secret:
            raise DependencyNotYetSatisfied(
                resource_ids=secret_id,
                message=f"Key {json_key} is not yet available in secret {secret_id}.",
            )
        return json_secret[json_key]

    LOG.warning("Unsupported service for dynamic parameter: service_name=%s", service_name)
    return result
1✔
307

308

309
@prevent_stack_overflow(match_parameters=True)
def _resolve_refs_recursively(
    account_id: str,
    region_name: str,
    stack_name: str,
    resources: dict,
    mappings: dict,
    conditions: dict,
    parameters: dict,
    value: dict | list | str | bytes | None,
):
    """
    Resolve ``Ref`` and intrinsic functions contained in ``value``.

    * A dict with the single key ``Ref`` is resolved via ``resolve_ref``.
    * A dict with a single key whose part after the last ``::`` (compared
      case-insensitively) is one of GetAtt, Join, Sub, FindInMap, ImportValue, If,
      Condition, Not, And, Or, Equals, Select, Split, GetAZs or Base64 is evaluated
      as that function.
    * Any other dict has its values resolved in place and is returned.
    * Lists are resolved element by element into a new list; elements that resolve to
      ``PLACEHOLDER_AWS_NO_VALUE`` are dropped.
    * All other values are returned unchanged.

    :raises DependencyNotYetSatisfied: if a Ref / GetAtt / Sub target cannot be resolved (yet)
    :raises KeyError: if ``Fn::If`` or ``Condition`` names an unknown condition
    :raises Exception: for unresolvable ``Fn::Join`` values or unknown ``Fn::FindInMap`` keys
    """

    def _resolve(item):
        # shorthand: resolve `item` within the same stack context as the current call
        return resolve_refs_recursively(
            account_id,
            region_name,
            stack_name,
            resources,
            mappings,
            conditions,
            parameters,
            item,
        )

    if isinstance(value, dict):
        keys_list = list(value.keys())
        is_single_key = len(keys_list) == 1
        stripped_fn_lower = keys_list[0].lower().split("::")[-1] if is_single_key else None
        # arguments of the (potential) intrinsic function
        fn_args = value[keys_list[0]] if is_single_key else None

        # process special operators
        if keys_list == ["Ref"]:
            ref = resolve_ref(
                account_id, region_name, stack_name, resources, parameters, value["Ref"]
            )
            if ref is None:
                msg = 'Unable to resolve Ref for resource "%s" (yet)' % value["Ref"]
                LOG.debug("%s - %s", msg, resources.get(value["Ref"]) or set(resources.keys()))

                raise DependencyNotYetSatisfied(resource_ids=value["Ref"], message=msg)

            return _resolve(ref)

        if stripped_fn_lower == "getatt":
            # both the "Resource.Attribute" string form and the list form are accepted
            attr_ref = fn_args.split(".") if isinstance(fn_args, str) else fn_args
            resource_logical_id = attr_ref[0]
            attribute_name = attr_ref[1]
            attribute_sub_name = attr_ref[2] if len(attr_ref) > 2 else None

            # the attribute name can be a Ref
            attribute_name = _resolve(attribute_name)
            resource = resources.get(resource_logical_id)

            resource_type = get_resource_type(resource)
            resolved_getatt = get_attr_from_model_instance(
                resource,
                attribute_name,
                resource_type,
                resource_logical_id,
                attribute_sub_name,
            )

            # TODO: we should check the deployment state and not try to GetAtt from a resource that is still IN_PROGRESS or hasn't started yet.
            if resolved_getatt is None:
                raise DependencyNotYetSatisfied(
                    resource_ids=resource_logical_id,
                    message=f"Could not resolve attribute '{attribute_name}' on resource '{resource_logical_id}'",
                )

            return resolved_getatt

        if stripped_fn_lower == "join":
            join_values = fn_args[1]

            # this can actually be another ref that produces a list as output
            if isinstance(join_values, dict):
                join_values = _resolve(join_values)

            # resolve reference in the items list
            assert isinstance(join_values, list)
            join_values = _resolve(join_values)

            none_values = [v for v in join_values if v is None]
            if none_values:
                LOG.warning(
                    "Cannot resolve Fn::Join '%s' due to null values: '%s'", value, join_values
                )
                raise Exception(
                    f"Cannot resolve CF Fn::Join {value} due to null values: {join_values}"
                )
            return fn_args[0].join([str(v) for v in join_values])

        if stripped_fn_lower == "sub":
            item_to_sub = fn_args

            attr_refs = {r: {"Ref": r} for r in STATIC_REFS}
            if not isinstance(item_to_sub, list):
                item_to_sub = [item_to_sub, {}]
            result = item_to_sub[0]
            # merge into a new dict so the variable map of the template is not mutated
            substitutions = {**item_to_sub[1], **attr_refs}

            for key, val in substitutions.items():
                resolved_val = _resolve(val)

                if isinstance(resolved_val, (list, dict, tuple)):
                    # We don't have access to the resource that's a dependency in this case,
                    # so do the best we can with the resource ids
                    raise DependencyNotYetSatisfied(
                        resource_ids=key, message=f"Could not resolve {val} to terminal value type"
                    )
                result = result.replace("${%s}" % key, str(resolved_val))

            # resolve placeholders
            result = resolve_placeholders_in_string(
                account_id,
                region_name,
                result,
                stack_name,
                resources,
                mappings,
                conditions,
                parameters,
            )
            return result

        if stripped_fn_lower == "findinmap":
            # "Fn::FindInMap"
            mapping_id = fn_args[0]

            if isinstance(mapping_id, dict) and "Ref" in mapping_id:
                # TODO: ??
                mapping_id = resolve_ref(
                    account_id, region_name, stack_name, resources, parameters, mapping_id["Ref"]
                )

            selected_map = mappings.get(mapping_id)
            if not selected_map:
                raise Exception(
                    f"Cannot find Mapping with ID {mapping_id} for Fn::FindInMap: {fn_args} {list(resources.keys())}"
                    # TODO: verify
                )

            first_level_attribute = _resolve(fn_args[1])

            if first_level_attribute not in selected_map:
                raise Exception(
                    f"Cannot find map key '{first_level_attribute}' in mapping '{mapping_id}'"
                )
            first_level_mapping = selected_map[first_level_attribute]

            second_level_attribute = fn_args[2]
            if not isinstance(second_level_attribute, str):
                second_level_attribute = _resolve(second_level_attribute)
            if second_level_attribute not in first_level_mapping:
                raise Exception(
                    f"Cannot find map key '{second_level_attribute}' in mapping '{mapping_id}' under key '{first_level_attribute}'"
                )

            return first_level_mapping[second_level_attribute]

        if stripped_fn_lower == "importvalue":
            import_value_key = _resolve(fn_args)
            exports = exports_map(account_id, region_name)
            stack_export = exports.get(import_value_key) or {}
            if not stack_export.get("Value"):
                LOG.info(
                    'Unable to find export "%s" in stack "%s", existing export names: %s',
                    import_value_key,
                    stack_name,
                    list(exports.keys()),
                )
                return None
            return stack_export["Value"]

        if stripped_fn_lower == "if":
            # keep the condition name separate from its evaluated value, so that the
            # messages below name the missing condition instead of printing "None"
            condition_name, option1, option2 = fn_args
            condition = conditions.get(condition_name)
            if condition is None:
                LOG.warning(
                    "Cannot find condition '%s' in conditions mapping: '%s'",
                    condition_name,
                    conditions.keys(),
                )
                raise KeyError(
                    f"Cannot find condition '{condition_name}' in conditions mapping: '{conditions.keys()}'"
                )

            return _resolve(option1 if condition else option2)

        if stripped_fn_lower == "condition":
            # FIXME: this should only allow strings, no evaluation should be performed here
            #   see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/intrinsic-function-reference-condition.html
            key = fn_args
            result = conditions.get(key)
            if result is None:
                LOG.warning("Cannot find key '%s' in conditions: '%s'", key, conditions.keys())
                raise KeyError(f"Cannot find key '{key}' in conditions: '{conditions.keys()}'")
            return result

        if stripped_fn_lower == "not":
            condition = _resolve(fn_args[0])
            return not condition

        if stripped_fn_lower in ["and", "or"]:
            # the operands get their own name: rebinding `conditions` here would pass the
            # operand list on as the conditions mapping of the nested calls
            operands = fn_args
            results = [_resolve(operand) for operand in operands]
            return all(results) if stripped_fn_lower == "and" else any(results)

        if stripped_fn_lower == "equals":
            operand1, operand2 = fn_args
            operand1 = _resolve(operand1)
            operand2 = _resolve(operand2)
            # TODO: investigate type coercion here
            return fn_equals_type_conversion(operand1) == fn_equals_type_conversion(operand2)

        if stripped_fn_lower == "select":
            index, values = fn_args
            index = _resolve(index)
            values = _resolve(values)
            try:
                return values[index]
            except TypeError:
                # the index may be given as a string
                return values[int(index)]

        if stripped_fn_lower == "split":
            delimiter, string = fn_args
            delimiter = _resolve(delimiter)
            string = _resolve(string)
            return string.split(delimiter)

        if stripped_fn_lower == "getazs":
            # an empty argument falls back to the region of the stack
            region = _resolve(fn_args) or region_name

            ec2_client = connect_to(aws_access_key_id=account_id, region_name=region).ec2
            try:
                get_availability_zones = ec2_client.describe_availability_zones()[
                    "AvailabilityZones"
                ]
            except ClientError:
                LOG.error("client error describing availability zones")
                raise

            return [az["ZoneName"] for az in get_availability_zones]

        if stripped_fn_lower == "base64":
            value_to_encode = _resolve(fn_args)
            return to_str(base64.b64encode(to_bytes(value_to_encode)))

        # not an intrinsic function: resolve all values of the dict in place
        # (iterating over a copy, since the dict is modified while looping)
        for key, val in dict(value).items():
            value[key] = _resolve(val)

    if isinstance(value, list):
        # in some cases, intrinsic functions are passed in as, e.g., `[['Fn::Sub', '${MyRef}']]`
        if len(value) == 1 and isinstance(value[0], list) and len(value[0]) == 2:
            inner_list = value[0]
            if str(inner_list[0]).lower().startswith("fn::"):
                return _resolve({inner_list[0]: inner_list[1]})

        # remove _aws_no_value_ from resulting references
        clean_list = []
        for item in value:
            temp_value = _resolve(item)
            if not (isinstance(temp_value, str) and temp_value == PLACEHOLDER_AWS_NO_VALUE):
                clean_list.append(temp_value)
        value = clean_list

    return value
1✔
774

775

776
def resolve_placeholders_in_string(
    account_id: str,
    region_name: str,
    result,
    stack_name: str,
    resources: dict,
    mappings: dict,
    conditions: dict[str, bool],
    parameters: dict,
):
    """
    Resolve individual Fn::Sub variable replacements

    Variables can be template parameter names, resource logical IDs, resource attributes, or a variable in a key-value map

    Every ``${...}`` placeholder found in ``result`` is replaced, and the resulting string is then
    converted to ``int`` or ``float`` if it is purely numeric. Strings equal to ``account_id``, or
    consisting of digits with the same length as ``account_id``, are returned unchanged.

    :param account_id: account ID, passed on to the ref resolution helpers
    :param region_name: region name, passed on to the ref resolution helpers
    :param result: the string containing the ``${...}`` placeholders
    :param stack_name: stack name, passed on to the ref resolution helpers
    :param resources: mapping of logical resource ID to resource details
    :param mappings: template mappings, passed on to ``resolve_refs_recursively``
    :param conditions: resolved conditions, passed on to ``resolve_refs_recursively``
    :param parameters: mapping of parameter name to parameter details
        (``ParameterType``, ``ResolvedValue`` / ``ParameterValue``)
    :return: the substituted string, or an ``int`` / ``float`` if the substituted string is numeric
    :raises DependencyNotYetSatisfied: if a placeholder cannot be resolved (yet)
    """

    def _validate_result_type(value: str):
        # account-ID-like values are kept as strings
        # (presumably so that they are not turned into numbers - confirm)
        is_another_account_id = value.isdigit() and len(value) == len(account_id)
        if value == account_id or is_another_account_id:
            return value

        if value.isdigit():
            try:
                return int(value)
            except ValueError:
                # str.isdigit() also accepts characters (e.g. superscript digits) that int()
                # rejects; such values are not numbers and are returned unchanged
                return value

        try:
            return float(value)
        except ValueError:
            return value

    def _replace(match):
        ref_expression = match.group(1)
        logical_resource_id, has_attribute, attr_name = ref_expression.partition(".")
        if has_attribute:
            # Resource attributes specified => Use GetAtt to resolve
            resolved = get_attr_from_model_instance(
                resources[logical_resource_id],
                attr_name,
                get_resource_type(resources[logical_resource_id]),
                logical_resource_id,
            )
            if resolved is None:
                raise DependencyNotYetSatisfied(
                    resource_ids=logical_resource_id,
                    message=f"Unable to resolve attribute ref {ref_expression}",
                )
            return resolved if isinstance(resolved, str) else str(resolved)

        # no "." in the expression => the whole expression is a single name
        name = ref_expression
        if name in resources or name.startswith("AWS::"):
            # Logical resource ID or parameter name specified => Use Ref for lookup
            ref_value = resolve_ref(
                account_id, region_name, stack_name, resources, parameters, name
            )
            if ref_value is None:
                raise DependencyNotYetSatisfied(
                    resource_ids=name,
                    message=f"Unable to resolve attribute ref {ref_expression}",
                )
            # TODO: is this valid?
            # make sure we resolve any functions/placeholders in the extracted string
            ref_value = resolve_refs_recursively(
                account_id,
                region_name,
                stack_name,
                resources,
                mappings,
                conditions,
                parameters,
                ref_value,
            )
            # make sure we convert the result to string
            # TODO: do this more systematically
            return "" if ref_value is None else str(ref_value)

        if name in parameters:
            parameter = parameters[name]
            parameter_type: str = parameter["ParameterType"]
            parameter_value = parameter.get("ResolvedValue") or parameter.get("ParameterValue")

            if parameter_type in ["CommaDelimitedList"] or parameter_type.startswith("List<"):
                # NOTE(review): re.sub() requires the replacement callable to return a string,
                # so returning a list here presumably fails - confirm the intended behaviour
                return [p.strip() for p in parameter_value.split(",")]
            if parameter_type == "Number":
                return str(parameter_value)
            return parameter_value

        raise DependencyNotYetSatisfied(
            resource_ids=name,
            message=f"Unable to resolve attribute ref {ref_expression}",
        )

    regex = r"\$\{([^\}]+)\}"
    result = re.sub(regex, _replace, result)
    return _validate_result_type(result)
1✔
876

877

878
def evaluate_resource_condition(conditions: dict[str, bool], resource: dict) -> bool:
    """
    Look up the value of the condition attached to a resource.

    :param conditions: mapping of condition name to its resolved value
    :param resource: resource definition, optionally carrying a ``Condition`` entry
    :return: True if the resource has no (or an empty) ``Condition`` entry or the named
        condition is not present in ``conditions``; otherwise the value stored for it
    """
    condition_name = resource.get("Condition")
    if not condition_name:
        return True
    return conditions.get(condition_name, True)
1✔
882

883

884
# -----------------------
885
# MAIN TEMPLATE DEPLOYER
886
# -----------------------
887

888

889
class TemplateDeployer:
1✔
890
    def __init__(self, account_id: str, region_name: str, stack):
1✔
891
        self.stack = stack
1✔
892
        self.account_id = account_id
1✔
893
        self.region_name = region_name
1✔
894

895
    @property
1✔
896
    def resources(self):
1✔
897
        return self.stack.resources
1✔
898

899
    @property
1✔
900
    def mappings(self):
1✔
901
        return self.stack.mappings
×
902

903
    @property
1✔
904
    def stack_name(self):
1✔
905
        return self.stack.stack_name
×
906

907
    # ------------------
908
    # MAIN ENTRY POINTS
909
    # ------------------
910

911
    def deploy_stack(self):
1✔
912
        self.stack.set_stack_status("CREATE_IN_PROGRESS")
1✔
913
        try:
1✔
914
            self.apply_changes(
1✔
915
                self.stack,
916
                self.stack,
917
                initialize=True,
918
                action="CREATE",
919
            )
920
        except Exception as e:
×
921
            log_method = LOG.info
×
922
            if config.CFN_VERBOSE_ERRORS:
×
923
                log_method = LOG.exception
×
924
            log_method("Unable to create stack %s: %s", self.stack.stack_name, e)
×
925
            self.stack.set_stack_status("CREATE_FAILED")
×
926
            raise
×
927

928
    def apply_change_set(self, change_set: StackChangeSet):
1✔
929
        action = (
1✔
930
            "UPDATE"
931
            if change_set.stack.status in {"CREATE_COMPLETE", "UPDATE_COMPLETE"}
932
            else "CREATE"
933
        )
934
        change_set.stack.set_stack_status(f"{action}_IN_PROGRESS")
1✔
935
        # update parameters on parent stack
936
        change_set.stack.set_resolved_parameters(change_set.resolved_parameters)
1✔
937
        # update conditions on parent stack
938
        change_set.stack.set_resolved_stack_conditions(change_set.resolved_conditions)
1✔
939

940
        # update attributes that the stack inherits from the changeset
941
        change_set.stack.metadata["Capabilities"] = change_set.metadata.get("Capabilities")
1✔
942

943
        try:
1✔
944
            self.apply_changes(
1✔
945
                change_set.stack,
946
                change_set,
947
                action=action,
948
            )
949
        except Exception as e:
×
950
            LOG.info(
×
951
                "Unable to apply change set %s: %s", change_set.metadata.get("ChangeSetName"), e
952
            )
953
            change_set.metadata["Status"] = f"{action}_FAILED"
×
954
            self.stack.set_stack_status(f"{action}_FAILED")
×
955
            raise
×
956

957
    def update_stack(self, new_stack):
1✔
958
        self.stack.set_stack_status("UPDATE_IN_PROGRESS")
1✔
959
        # apply changes
960
        self.apply_changes(self.stack, new_stack, action="UPDATE")
1✔
961
        self.stack.set_time_attribute("LastUpdatedTime")
1✔
962

963
    # ----------------------------
964
    # DEPENDENCY RESOLUTION UTILS
965
    # ----------------------------
966

967
    def is_deployed(self, resource):
1✔
968
        return self.stack.resource_states.get(resource["LogicalResourceId"], {}).get(
1✔
969
            "ResourceStatus"
970
        ) in [
971
            "CREATE_COMPLETE",
972
            "UPDATE_COMPLETE",
973
        ]
974

975
    def all_resource_dependencies_satisfied(self, resource) -> bool:
1✔
976
        unsatisfied = self.get_unsatisfied_dependencies(resource)
×
977
        return not unsatisfied
×
978

979
    def get_unsatisfied_dependencies(self, resource):
1✔
980
        res_deps = self.get_resource_dependencies(
×
981
            resource
982
        )  # the output here is currently a set of merged IDs from both resources and parameters
983
        parameter_deps = {d for d in res_deps if d in self.stack.resolved_parameters}
×
984
        resource_deps = res_deps.difference(parameter_deps)
×
985
        res_deps_mapped = {v: self.stack.resources.get(v) for v in resource_deps}
×
986
        return self.get_unsatisfied_dependencies_for_resources(res_deps_mapped, resource)
×
987

988
    def get_unsatisfied_dependencies_for_resources(
1✔
989
        self, resources, depending_resource=None, return_first=True
990
    ):
991
        result = {}
×
992
        for resource_id, resource in resources.items():
×
993
            if not resource:
×
994
                raise Exception(
×
995
                    f"Resource '{resource_id}' not found in stack {self.stack.stack_name}"
996
                )
997
            if not self.is_deployed(resource):
×
998
                LOG.debug(
×
999
                    "Dependency for resource %s not yet deployed: %s %s",
1000
                    depending_resource,
1001
                    resource_id,
1002
                    resource,
1003
                )
1004
                result[resource_id] = resource
×
1005
                if return_first:
×
1006
                    break
×
1007
        return result
×
1008

1009
    def get_resource_dependencies(self, resource: dict) -> set[str]:
        """
        Return the IDs the given resource depends on.

        The dependencies are computed by ``get_deps_for_resource`` from the resource's entry in
        the original template, together with the resolved stack conditions.

        :param resource: resource definition; its ``LogicalResourceId`` selects the template entry
        """
        # Note: using the original, unmodified template here to preserve Ref's ...
        original_resources = self.stack.template_original["Resources"]
        original_definition = original_resources[resource["LogicalResourceId"]]
        return get_deps_for_resource(original_definition, self.stack.resolved_conditions)
×
1017

1018
    # -----------------
1019
    # DEPLOYMENT UTILS
1020
    # -----------------
1021

1022
    def init_resource_status(self, resources=None, stack=None, action="CREATE"):
1✔
1023
        resources = resources or self.resources
1✔
1024
        stack = stack or self.stack
1✔
1025
        for resource_id, resource in resources.items():
1✔
1026
            stack.set_resource_status(resource_id, f"{action}_IN_PROGRESS")
1✔
1027

1028
    def get_change_config(
        self, action: str, resource: dict, change_set_id: Optional[str] = None
    ) -> ChangeConfig:
        """
        Build the change entry describing ``action`` for the given resource.

        :param action: the change action; for ``Modify`` a ``Replacement`` of ``"False"`` is added
        :param resource: resource definition; ``Type`` is required, the logical and physical
            resource IDs are copied if present
        :param change_set_id: currently not used
        :return: a ``ChangeConfig`` of type ``Resource`` wrapping the ``ResourceChange``
        """
        resource_change = ResourceChange(
            Action=action,
            # TODO(srw): how can the resource not contain a logical resource id?
            LogicalResourceId=resource.get("LogicalResourceId"),
            PhysicalResourceId=resource.get("PhysicalResourceId"),
            ResourceType=resource["Type"],
            # TODO ChangeSetId is only set for *nested* change sets
            Scope=[],  # TODO
            Details=[],  # TODO
        )
        change = ChangeConfig(Type="Resource", ResourceChange=resource_change)
        if action == "Modify":
            change["ResourceChange"]["Replacement"] = "False"
        return change
1✔
1052

1053
    def resource_config_differs(self, resource_new):
1✔
1054
        """Return whether the given resource properties differ from the existing config (for stack updates)."""
1055
        # TODO: this is broken for default fields and result_handler property modifications when they're added to the properties in the model
1056
        resource_id = resource_new["LogicalResourceId"]
1✔
1057
        resource_old = self.resources[resource_id]
1✔
1058
        props_old = resource_old.get("SpecifiedProperties", {})
1✔
1059
        props_new = resource_new["Properties"]
1✔
1060
        ignored_keys = ["LogicalResourceId", "PhysicalResourceId"]
1✔
1061
        old_keys = set(props_old.keys()) - set(ignored_keys)
1✔
1062
        new_keys = set(props_new.keys()) - set(ignored_keys)
1✔
1063
        if old_keys != new_keys:
1✔
1064
            return True
1✔
1065
        for key in old_keys:
1✔
1066
            if props_old[key] != props_new[key]:
1✔
1067
                return True
1✔
1068
        old_status = self.stack.resource_states.get(resource_id) or {}
1✔
1069
        previous_state = (
1✔
1070
            old_status.get("PreviousResourceStatus") or old_status.get("ResourceStatus") or ""
1071
        )
1072
        if old_status and "DELETE" in previous_state:
1✔
1073
            return True
×
1074

1075
    # TODO: ?
1076
    def merge_properties(self, resource_id: str, old_stack, new_stack) -> None:
        """
        Merge the definition of a resource from ``new_stack`` into ``old_stack`` (in place).

        - attributes other than ``Properties`` are only added if the old definition lacks them
        - the old ``Properties`` end up with exactly the keys and values of the new ``Properties``
        - the resource's entry in the original template of ``old_stack`` is replaced by the one
          from ``new_stack``

        :param resource_id: logical ID of the resource to merge
        :param old_stack: stack whose ``template`` and ``template_original`` are modified
        :param new_stack: stack providing the new resource definition
        """
        old_resources = old_stack.template["Resources"]
        new_resource = new_stack.template["Resources"][resource_id]

        # missing (or empty) old definitions are replaced by a fresh dict
        merged = old_resources.get(resource_id) or {}
        old_resources[resource_id] = merged

        for attribute, new_value in new_resource.items():
            if attribute != "Properties":
                merged.setdefault(attribute, new_value)

        # the existing properties dict is updated in place first ...
        current_props = merged.get("Properties", {})
        merged["Properties"] = current_props
        new_props = new_resource["Properties"]
        current_props.update(new_props)

        # ... and then replaced by a copy without the properties that are gone in the new definition
        merged["Properties"] = {
            name: value for name, value in current_props.items() if name in new_props
        }

        # overwrite original template entirely
        new_original_resources = new_stack.template_original["Resources"]
        old_stack.template_original["Resources"][resource_id] = new_original_resources[resource_id]
1099

1100
    def construct_changes(
1✔
1101
        self,
1102
        existing_stack,
1103
        new_stack,
1104
        # TODO: remove initialize argument from here, and determine action based on resource status
1105
        initialize: Optional[bool] = False,
1106
        change_set_id=None,
1107
        append_to_changeset: Optional[bool] = False,
1108
        filter_unchanged_resources: Optional[bool] = False,
1109
    ) -> list[ChangeConfig]:
1110
        old_resources = existing_stack.template["Resources"]
1✔
1111
        new_resources = new_stack.template["Resources"]
1✔
1112
        deletes = [val for key, val in old_resources.items() if key not in new_resources]
1✔
1113
        adds = [val for key, val in new_resources.items() if initialize or key not in old_resources]
1✔
1114
        modifies = [
1✔
1115
            val for key, val in new_resources.items() if not initialize and key in old_resources
1116
        ]
1117

1118
        changes = []
1✔
1119
        for action, items in (("Remove", deletes), ("Add", adds), ("Modify", modifies)):
1✔
1120
            for item in items:
1✔
1121
                item["Properties"] = item.get("Properties", {})
1✔
1122
                if (
1✔
1123
                    not filter_unchanged_resources  # TODO: find out purpose of this
1124
                    or action != "Modify"
1125
                    or self.resource_config_differs(item)
1126
                ):
1127
                    change = self.get_change_config(action, item, change_set_id=change_set_id)
1✔
1128
                    changes.append(change)
1✔
1129

1130
        # append changes to change set
1131
        if append_to_changeset and isinstance(new_stack, StackChangeSet):
1✔
1132
            new_stack.changes.extend(changes)
1✔
1133

1134
        return changes
1✔
1135

1136
    def apply_changes(
1✔
1137
        self,
1138
        existing_stack: Stack,
1139
        new_stack: StackChangeSet,
1140
        change_set_id: Optional[str] = None,
1141
        initialize: Optional[bool] = False,
1142
        action: Optional[str] = None,
1143
    ):
1144
        old_resources = existing_stack.template["Resources"]
1✔
1145
        new_resources = new_stack.template["Resources"]
1✔
1146
        action = action or "CREATE"
1✔
1147
        # TODO: this seems wrong, not every resource here will be in an UPDATE_IN_PROGRESS state? (only the ones that will actually be updated)
1148
        self.init_resource_status(old_resources, action="UPDATE")
1✔
1149

1150
        # apply parameter changes to existing stack
1151
        # self.apply_parameter_changes(existing_stack, new_stack)
1152

1153
        # construct changes
1154
        changes = self.construct_changes(
1✔
1155
            existing_stack,
1156
            new_stack,
1157
            initialize=initialize,
1158
            change_set_id=change_set_id,
1159
        )
1160

1161
        # check if we have actual changes in the stack, and prepare properties
1162
        contains_changes = False
1✔
1163
        for change in changes:
1✔
1164
            res_action = change["ResourceChange"]["Action"]
1✔
1165
            resource = new_resources.get(change["ResourceChange"]["LogicalResourceId"])
1✔
1166
            #  FIXME: we need to resolve refs before diffing to detect if for example a parameter causes the change or not
1167
            #   unfortunately this would currently cause issues because we might not be able to resolve everything yet
1168
            # resource = resolve_refs_recursively(
1169
            #     self.stack_name,
1170
            #     self.resources,
1171
            #     self.mappings,
1172
            #     self.stack.resolved_conditions,
1173
            #     self.stack.resolved_parameters,
1174
            #     resource,
1175
            # )
1176
            if res_action in ["Add", "Remove"] or self.resource_config_differs(resource):
1✔
1177
                contains_changes = True
1✔
1178
            if res_action in ["Modify", "Add"]:
1✔
1179
                # mutating call that overwrites resource properties with new properties and overwrites the template in old stack with new template
1180
                self.merge_properties(resource["LogicalResourceId"], existing_stack, new_stack)
1✔
1181
        if not contains_changes:
1✔
1182
            raise NoStackUpdates("No updates are to be performed.")
1✔
1183

1184
        # merge stack outputs and conditions
1185
        existing_stack.outputs.update(new_stack.outputs)
1✔
1186
        existing_stack.conditions.update(new_stack.conditions)
1✔
1187

1188
        # TODO: ideally the entire template has to be replaced, but tricky at this point
1189
        existing_stack.template["Metadata"] = new_stack.template.get("Metadata")
1✔
1190
        existing_stack.template_body = new_stack.template_body
1✔
1191

1192
        # start deployment loop
1193
        return self.apply_changes_in_loop(
1✔
1194
            changes, existing_stack, action=action, new_stack=new_stack
1195
        )
1196

1197
    def apply_changes_in_loop(
        self,
        changes: list[ChangeConfig],
        stack: Stack,
        action: Optional[str] = None,
        new_stack=None,
    ):
        """
        Deploy the given changes in a background worker thread.

        The worker runs ``do_apply_changes_in_loop`` and afterwards sets the stack status to
        ``<action>_COMPLETE``, or to ``<action>_FAILED`` (with the error as status reason) if an
        exception was raised. If ``new_stack`` is a ``StackChangeSet``, its ``Status``,
        ``ExecutionStatus`` and ``StatusReason`` metadata are updated accordingly.

        :param changes: the changes to deploy
        :param stack: the stack the changes are applied to
        :param action: stack action used as prefix of the final stack status
        :param new_stack: optional change set whose metadata is updated once the worker finished
        :return: the result of ``start_worker_thread``
        """

        def _run(*args):
            failure_reason = None
            try:
                self.do_apply_changes_in_loop(changes, stack)
            except Exception as e:
                # include the traceback only if verbose CloudFormation errors are enabled
                log_method = LOG.exception if config.CFN_VERBOSE_ERRORS else LOG.debug
                log_method(
                    'Error applying changes for CloudFormation stack "%s": %s %s',
                    stack.stack_name,
                    e,
                    traceback.format_exc(),
                )
                status = f"{action}_FAILED"
                failure_reason = str(e)
            else:
                status = f"{action}_COMPLETE"

            stack.set_stack_status(status, failure_reason)
            if not isinstance(new_stack, StackChangeSet):
                return

            failed = "FAILED" in status
            outcome = "failed" if failed else "succeeded"
            new_stack.metadata["Status"] = status
            new_stack.metadata["ExecutionStatus"] = (
                "EXECUTE_FAILED" if failed else "EXECUTE_COMPLETE"
            )
            new_stack.metadata["StatusReason"] = failure_reason or f"Deployment {outcome}"

        # run deployment in background loop, to avoid client network timeouts
        return start_worker_thread(_run)
1✔
1231

1232
    def prepare_should_deploy_change(
        self, resource_id: str, change: ResourceChange, stack, new_resources: dict
    ) -> bool:
        """
        Prepare a resource for deployment and decide whether its change should be applied.

        Resources whose condition evaluates to false are skipped. Otherwise
        ``resolve_refs_recursively`` is run on the resource definition, and for "Add" / "Modify"
        changes the current deployment state is cached under the ``_deployed`` key of the
        resource change.

        :param resource_id: logical ID used to look up the resource in ``new_resources``
        :param change: change entry; its ``ResourceChange`` item is read and updated
        :param stack: stack providing the resolved conditions, parameters, resources and mappings
        :param new_resources: mapping of logical resource ID to resource definition
        :return: False if the resource condition is false, or if an "Add" change targets an
            already deployed resource; True otherwise
        """
        resource = new_resources[resource_id]
        res_change = change["ResourceChange"]
        action = res_change["Action"]

        # check resource condition, if present
        if not evaluate_resource_condition(stack.resolved_conditions, resource):
            LOG.debug(
                'Skipping deployment of "%s", as resource condition evaluates to false', resource_id
            )
            return False

        # resolve refs in resource details
        resolve_refs_recursively(
            self.account_id,
            self.region_name,
            stack.stack_name,
            stack.resources,
            stack.mappings,
            stack.resolved_conditions,
            stack.resolved_parameters,
            resource,
        )

        if action not in ("Add", "Modify"):
            return True

        already_deployed = self.is_deployed(resource)
        # TODO: Attaching the cached _deployed info here, as we should not change the "Add"/"Modify" attribute
        #  here, which is used further down the line to determine the resource action CREATE/UPDATE. This is a
        #  temporary workaround for now - to be refactored once we introduce proper stack resource state models.
        res_change["_deployed"] = already_deployed
        # only an "Add" for a resource that is already deployed is skipped
        return not (already_deployed and action == "Add")
×
1274

1275
    # Stack is needed here
1276
    def apply_change(self, change: ChangeConfig, stack: Stack) -> None:
        """
        Apply a single resource change via the resource provider of the resource type.

        If no resource provider can be loaded for the resource type, the resource gets
        ``MOCK_REFERENCE`` as physical resource ID and the change is treated as successful.
        On success the resource status is set to ``<action>_COMPLETE`` and the resource
        properties are replaced by the resource model of the final progress event.

        :param change: the change to apply
        :param stack: the stack holding the resource
        :raises DependencyNotYetSatisfied: if the provider reports the operation as pending
        :raises Exception: if the provider reports a failure (the resource status is set to
            ``<action>_FAILED`` first), or if it returns an unexpected status
        """
        change_details = change["ResourceChange"]
        action = change_details["Action"]
        resource_id = change_details["LogicalResourceId"]
        resource = stack.resources[resource_id]

        # TODO: this should not be needed as resources are filtered out if the
        # condition evaluates to False.
        if not evaluate_resource_condition(stack.resolved_conditions, resource):
            return

        # remove AWS::NoValue entries
        if resource_props := resource.get("Properties"):
            resource["Properties"] = remove_none_values(resource_props)

        executor = self.create_resource_provider_executor()
        resource_provider_payload = self.create_resource_provider_payload(
            action, logical_resource_id=resource_id
        )

        resource_provider = executor.try_load_resource_provider(get_resource_type(resource))
        if resource_provider is None:
            resource["PhysicalResourceId"] = MOCK_REFERENCE
            progress_event = ProgressEvent(OperationStatus.SUCCESS, resource_model={})
        else:
            # add in-progress event
            resource_status = f"{get_action_name_for_resource_change(action)}_IN_PROGRESS"
            physical_resource_id = None
            if action in ("Modify", "Remove"):
                # TODO: can the last deployed state be missing?
                previous_state = (
                    self.resources[resource_id].get("_last_deployed_state")
                    or self.resources[resource_id]["Properties"]
                )
                physical_resource_id = executor.extract_physical_resource_id_from_model_with_schema(
                    resource_model=previous_state,
                    resource_type=resource["Type"],
                    resource_type_schema=resource_provider.SCHEMA,
                )
            stack.add_stack_event(
                resource_id=resource_id,
                physical_res_id=physical_resource_id,
                status=resource_status,
            )

            # perform the deploy
            progress_event = executor.deploy_loop(
                resource_provider, resource, resource_provider_payload
            )

        # TODO: clean up the surrounding loop (do_apply_changes_in_loop) so that the responsibilities are clearer
        stack_action = get_action_name_for_resource_change(action)
        final_status = progress_event.status
        if final_status == OperationStatus.FAILED:
            stack.set_resource_status(
                resource_id,
                f"{stack_action}_FAILED",
                status_reason=progress_event.message or "",
            )
            # TODO: remove exception raising here?
            # TODO: fix request token
            raise Exception(
                f'Resource handler returned message: "{progress_event.message}" (RequestToken: 10c10335-276a-33d3-5c07-018b684c3d26, HandlerErrorCode: InvalidRequest){progress_event.error_code}'
            )
        elif final_status == OperationStatus.SUCCESS:
            stack.set_resource_status(resource_id, f"{stack_action}_COMPLETE")
        elif final_status == OperationStatus.PENDING:
            # signal to the main loop that we should come back to this resource in the future
            raise DependencyNotYetSatisfied(
                resource_ids=[], message="Resource dependencies not yet satisfied"
            )
        elif final_status == OperationStatus.IN_PROGRESS:
            raise Exception("Resource deployment loop should not finish in this state")
        else:
            raise Exception(f"Unknown operation status: {final_status}")

        # TODO: this is probably already done in executor, try removing this
        resource["Properties"] = progress_event.resource_model
1✔
1355

1356
    def create_resource_provider_executor(self) -> ResourceProviderExecutor:
        """Create a ``ResourceProviderExecutor`` for the name and ID of the current stack."""
        stack = self.stack
        return ResourceProviderExecutor(stack_name=stack.stack_name, stack_id=stack.stack_id)
1361

1362
    def create_resource_provider_payload(
        self, action: str, logical_resource_id: str
    ) -> ResourceProviderPayload:
        """
        Assemble the payload that is handed to a resource provider for the given resource.

        :param action: the change action, stored under the ``action`` key of the payload
        :param logical_resource_id: logical ID of the resource in ``self.resources``
        :return: the payload; its request data carries the current resource properties, the
            ``_last_deployed_state`` of the resource (if any) as previous properties, and the
            credentials
        """
        # FIXME: use proper credentials
        creds: Credentials = {
            "accessKeyId": self.account_id,
            "secretAccessKey": INTERNAL_AWS_SECRET_ACCESS_KEY,
            "sessionToken": "",
        }
        resource = self.resources[logical_resource_id]

        payload: ResourceProviderPayload = {
            "awsAccountId": self.account_id,
            "callbackContext": {},
            "stackId": self.stack.stack_name,
            "resourceType": resource["Type"],
            "resourceTypeVersion": "000000",
            # TODO: not actually a UUID
            "bearerToken": str(uuid.uuid4()),
            "region": self.region_name,
            "action": action,
            "requestData": {
                "logicalResourceId": logical_resource_id,
                "resourceProperties": resource["Properties"],
                "previousResourceProperties": resource.get("_last_deployed_state"),  # TODO
                # the same credentials are used for the caller and the provider
                "callerCredentials": creds,
                "providerCredentials": creds,
                "systemTags": {},
                "previousSystemTags": {},
                "stackTags": {},
                "previousStackTags": {},
            },
        }
        return payload
1✔
1396

1397
    def delete_stack(self):
1✔
1398
        if not self.stack:
1✔
1399
            return
×
1400
        self.stack.set_stack_status("DELETE_IN_PROGRESS")
1✔
1401
        stack_resources = list(self.stack.resources.values())
1✔
1402
        resources = {r["LogicalResourceId"]: clone_safe(r) for r in stack_resources}
1✔
1403
        original_resources = self.stack.template_original["Resources"]
1✔
1404

1405
        # TODO: what is this doing?
1406
        for key, resource in resources.items():
1✔
1407
            resource["Properties"] = resource.get(
1✔
1408
                "Properties", clone_safe(resource)
1409
            )  # TODO: why is there a fallback?
1410
            resource["ResourceType"] = get_resource_type(resource)
1✔
1411

1412
        ordered_resource_ids = list(
1✔
1413
            order_resources(
1414
                resources=original_resources,
1415
                resolved_conditions=self.stack.resolved_conditions,
1416
                resolved_parameters=self.stack.resolved_parameters,
1417
                reverse=True,
1418
            ).keys()
1419
        )
1420
        for i, resource_id in enumerate(ordered_resource_ids):
1✔
1421
            resource = resources[resource_id]
1✔
1422
            try:
1✔
1423
                # TODO: cache condition value in resource details on deployment and use cached value here
1424
                if not evaluate_resource_condition(
1✔
1425
                    self.stack.resolved_conditions,
1426
                    resource,
1427
                ):
1428
                    continue
1✔
1429

1430
                executor = self.create_resource_provider_executor()
1✔
1431
                resource_provider_payload = self.create_resource_provider_payload(
1✔
1432
                    "Remove", logical_resource_id=resource_id
1433
                )
1434
                LOG.debug(
1✔
1435
                    'Handling "Remove" for resource "%s" (%s/%s) type "%s"',
1436
                    resource_id,
1437
                    i + 1,
1438
                    len(resources),
1439
                    resource["ResourceType"],
1440
                )
1441
                resource_provider = executor.try_load_resource_provider(get_resource_type(resource))
1✔
1442
                if resource_provider is not None:
1✔
1443
                    event = executor.deploy_loop(
1✔
1444
                        resource_provider, resource, resource_provider_payload
1445
                    )
1446
                else:
1447
                    event = ProgressEvent(OperationStatus.SUCCESS, resource_model={})
1✔
1448
                match event.status:
1✔
1449
                    case OperationStatus.SUCCESS:
1✔
1450
                        self.stack.set_resource_status(resource_id, "DELETE_COMPLETE")
1✔
1451
                    case OperationStatus.PENDING:
1✔
1452
                        # the resource is still being deleted, specifically the provider has
1453
                        # signalled that the deployment loop should skip this resource this
1454
                        # time and come back to it later, likely due to unmet child
1455
                        # resources still existing because we don't delete things in the
1456
                        # correct order yet.
UNCOV
1457
                        continue
×
1458
                    case OperationStatus.FAILED:
1✔
1459
                        LOG.exception(
1✔
1460
                            "Failed to delete resource with id %s. Reason: %s",
1461
                            resource_id,
1462
                            event.message or "unknown",
1463
                        )
UNCOV
1464
                    case OperationStatus.IN_PROGRESS:
×
1465
                        # the resource provider executor should not return this state, so
1466
                        # this state is a programming error
UNCOV
1467
                        raise Exception(
×
1468
                            "Programming error: ResourceProviderExecutor cannot return IN_PROGRESS"
1469
                        )
UNCOV
1470
                    case other_status:
×
UNCOV
1471
                        raise Exception(f"Use of unsupported status found: {other_status}")
×
1472

1473
            except Exception as e:
1✔
1474
                LOG.exception(
1✔
1475
                    "Failed to delete resource with id %s. Final exception: %s",
1476
                    resource_id,
1477
                    e,
1478
                )
1479

1480
        # update status
1481
        self.stack.set_stack_status("DELETE_COMPLETE")
1✔
1482
        self.stack.set_time_attribute("DeletionTime")
1✔
1483

1484
    def do_apply_changes_in_loop(self, changes: list[ChangeConfig], stack: Stack) -> list:
1✔
1485
        # apply changes in a retry loop, to resolve resource dependencies and converge to the target state
1486
        changes_done = []
1✔
1487
        new_resources = stack.resources
1✔
1488

1489
        sorted_changes = order_changes(
1✔
1490
            given_changes=changes,
1491
            resources=new_resources,
1492
            resolved_conditions=stack.resolved_conditions,
1493
            resolved_parameters=stack.resolved_parameters,
1494
        )
1495
        for change_idx, change in enumerate(sorted_changes):
1✔
1496
            res_change = change["ResourceChange"]
1✔
1497
            action = res_change["Action"]
1✔
1498
            is_add_or_modify = action in ["Add", "Modify"]
1✔
1499
            resource_id = res_change["LogicalResourceId"]
1✔
1500

1501
            # TODO: do resolve_refs_recursively once here
1502
            try:
1✔
1503
                if is_add_or_modify:
1✔
1504
                    should_deploy = self.prepare_should_deploy_change(
1✔
1505
                        resource_id, change, stack, new_resources
1506
                    )
1507
                    LOG.debug(
1✔
1508
                        'Handling "%s" for resource "%s" (%s/%s) type "%s" (should_deploy=%s)',
1509
                        action,
1510
                        resource_id,
1511
                        change_idx + 1,
1512
                        len(changes),
1513
                        res_change["ResourceType"],
1514
                        should_deploy,
1515
                    )
1516
                    if not should_deploy:
1✔
1517
                        stack_action = get_action_name_for_resource_change(action)
1✔
1518
                        stack.set_resource_status(resource_id, f"{stack_action}_COMPLETE")
1✔
1519
                        continue
1✔
1520
                elif action == "Remove":
1✔
1521
                    should_remove = self.prepare_should_deploy_change(
1✔
1522
                        resource_id, change, stack, new_resources
1523
                    )
1524
                    if not should_remove:
1✔
UNCOV
1525
                        continue
×
1526
                    LOG.debug(
1✔
1527
                        'Handling "%s" for resource "%s" (%s/%s) type "%s"',
1528
                        action,
1529
                        resource_id,
1530
                        change_idx + 1,
1531
                        len(changes),
1532
                        res_change["ResourceType"],
1533
                    )
1534
                self.apply_change(change, stack=stack)
1✔
1535
                changes_done.append(change)
1✔
1536
            except Exception as e:
1✔
1537
                status_action = {
1✔
1538
                    "Add": "CREATE",
1539
                    "Modify": "UPDATE",
1540
                    "Dynamic": "UPDATE",
1541
                    "Remove": "DELETE",
1542
                }[action]
1543
                stack.add_stack_event(
1✔
1544
                    resource_id=resource_id,
1545
                    physical_res_id=new_resources[resource_id].get("PhysicalResourceId"),
1546
                    status=f"{status_action}_FAILED",
1547
                    status_reason=str(e),
1548
                )
1549
                if config.CFN_VERBOSE_ERRORS:
1✔
UNCOV
1550
                    LOG.exception("Failed to deploy resource %s, stack deploy failed", resource_id)
×
1551
                raise
1✔
1552

1553
        # clean up references to deleted resources in stack
1554
        deletes = [c for c in changes_done if c["ResourceChange"]["Action"] == "Remove"]
1✔
1555
        for delete in deletes:
1✔
1556
            stack.template["Resources"].pop(delete["ResourceChange"]["LogicalResourceId"], None)
1✔
1557

1558
        # resolve outputs
1559
        stack.resolved_outputs = resolve_outputs(self.account_id, self.region_name, stack)
1✔
1560

1561
        return changes_done
1✔
1562

1563

1564
# FIXME: resolve_refs_recursively should not be needed, the resources themselves should have those values available already
1565
def resolve_outputs(account_id: str, region_name: str, stack) -> list[dict]:
    """
    Build the list of resolved outputs for the given stack.

    Outputs for which ``evaluate_resource_condition`` returns a falsy value are left out.
    If resolving an output fails, its ``OutputValue`` stays ``None`` and the error is logged
    at debug level, unless ``config.CFN_VERBOSE_ERRORS`` is set, in which case the error is
    re-raised.

    :param account_id: account ID passed through to ``resolve_refs_recursively``
    :param region_name: region name passed through to ``resolve_refs_recursively``
    :param stack: stack whose ``outputs`` are resolved
    :return: one dict per included output, with the keys ``OutputKey``, ``OutputValue``,
        ``Description`` and ``ExportName``
    """

    def _resolve(value):
        # stack attributes are looked up on every call, not captured once up front
        return resolve_refs_recursively(
            account_id,
            region_name,
            stack.stack_name,
            stack.resources,
            stack.mappings,
            stack.resolved_conditions,
            stack.resolved_parameters,
            value,
        )

    resolved = []
    for output_key, output in stack.outputs.items():
        if not evaluate_resource_condition(stack.resolved_conditions, output):
            continue

        output_value = None
        try:
            # NOTE(review): the return value is discarded and "Value" is read back from the
            # output definition afterwards - presumably it is resolved in place; confirm
            _resolve(output)
            output_value = output["Value"]
        except Exception as e:
            if config.CFN_VERBOSE_ERRORS:
                raise  # unresolvable outputs cause a stack failure
            LOG.debug("Unable to resolve references in stack outputs: %s - %s", output, e)

        # the export name is resolved outside the try block, so failures here always propagate
        export_name = _resolve((output.get("Export") or {}).get("Name"))
        resolved.append(
            {
                "OutputKey": output_key,
                "OutputValue": output_value,
                "Description": output.get("Description"),
                "ExportName": export_name,
            }
        )
    return resolved
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc