• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 20154439467

11 Dec 2025 04:58PM UTC coverage: 86.873% (+0.006%) from 86.867%
20154439467

push

github

web-flow
SQS: Improve update support for CloudFormation handlers. (#13477)

34 of 34 new or added lines in 4 files covered. (100.0%)

15 existing lines in 5 files now uncovered.

69932 of 80499 relevant lines covered (86.87%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.67
/localstack-core/localstack/services/cloudformation/resource_provider.py
1
from __future__ import annotations
1✔
2

3
import copy
1✔
4
import logging
1✔
5
import re
1✔
6
import time
1✔
7
import uuid
1✔
8
from collections.abc import Callable
1✔
9
from dataclasses import dataclass, field
1✔
10
from enum import Enum, auto
1✔
11
from logging import Logger
1✔
12
from math import ceil
1✔
13
from typing import TYPE_CHECKING, Any, Generic, TypedDict, TypeVar
1✔
14

15
import botocore
1✔
16
from botocore.client import BaseClient
1✔
17
from botocore.exceptions import ClientError
1✔
18
from botocore.model import OperationModel
1✔
19
from plux import Plugin, PluginManager
1✔
20

21
from localstack import config
1✔
22
from localstack.aws.connect import InternalClientFactory, ServiceLevelClientFactory
1✔
23
from localstack.services.cloudformation.deployment_utils import (
1✔
24
    check_not_found_exception,
25
    convert_data_types,
26
    fix_account_id_in_arns,
27
    fix_boto_parameters_based_on_report,
28
    remove_none_values,
29
)
30
from localstack.services.cloudformation.engine.quirks import PHYSICAL_RESOURCE_ID_SPECIAL_CASES
1✔
31
from localstack.services.cloudformation.provider_utils import convert_request_kwargs
1✔
32
from localstack.services.cloudformation.service_models import KEY_RESOURCE_STATE
1✔
33

34
PRO_RESOURCE_PROVIDERS = False
1✔
35
try:
1✔
36
    from localstack.pro.core.services.cloudformation.resource_provider import (
1✔
37
        CloudFormationResourceProviderPluginExt,
38
    )
39

40
    PRO_RESOURCE_PROVIDERS = True
×
41
except ImportError:
1✔
42
    pass
1✔
43

44
if TYPE_CHECKING:
1✔
45
    from localstack.services.cloudformation.engine.types import (
×
46
        FuncDetails,
47
        FuncDetailsValue,
48
        ResourceDefinition,
49
    )
50

51
LOG = logging.getLogger(__name__)
1✔
52

53
Properties = TypeVar("Properties")
1✔
54

55
PUBLIC_REGISTRY: dict[str, type[ResourceProvider]] = {}
1✔
56

57
PROVIDER_DEFAULTS = {}  # TODO: remove this after removing patching in -ext
1✔
58

59

60
class OperationStatus(Enum):
1✔
61
    PENDING = auto()
1✔
62
    IN_PROGRESS = auto()
1✔
63
    SUCCESS = auto()
1✔
64
    FAILED = auto()
1✔
65

66

67
@dataclass
1✔
68
class ProgressEvent(Generic[Properties]):
1✔
69
    status: OperationStatus
1✔
70
    resource_model: Properties | None = None
1✔
71
    resource_models: list[Properties] | None = None
1✔
72

73
    message: str = ""
1✔
74
    result: str | None = None
1✔
75
    error_code: str | None = None  # TODO: enum
1✔
76
    custom_context: dict = field(default_factory=dict)
1✔
77

78

79
class Credentials(TypedDict):
1✔
80
    accessKeyId: str
1✔
81
    secretAccessKey: str
1✔
82
    sessionToken: str
1✔
83

84

85
class ResourceProviderPayloadRequestData(TypedDict):
1✔
86
    logicalResourceId: str
1✔
87
    resourceProperties: Properties
1✔
88
    previousResourceProperties: Properties | None
1✔
89
    callerCredentials: Credentials
1✔
90
    providerCredentials: Credentials
1✔
91
    systemTags: dict[str, str]
1✔
92
    previousSystemTags: dict[str, str]
1✔
93
    stackTags: dict[str, str]
1✔
94
    previousStackTags: dict[str, str]
1✔
95

96

97
class ResourceProviderPayload(TypedDict):
1✔
98
    callbackContext: dict
1✔
99
    stackId: str
1✔
100
    requestData: ResourceProviderPayloadRequestData
1✔
101
    resourceType: str
1✔
102
    resourceTypeVersion: str
1✔
103
    awsAccountId: str
1✔
104
    bearerToken: str
1✔
105
    region: str
1✔
106
    action: str
1✔
107

108

109
ResourceProperties = TypeVar("ResourceProperties")
1✔
110

111

112
def _handler_provide_client_params(event_name: str, params: dict, model: OperationModel, **kwargs):
1✔
113
    """
114
    A botocore hook handler that will try to convert the passed parameters according to the given operation model
115
    """
116
    return convert_request_kwargs(params, model.input_shape)
1✔
117

118

119
class ConvertingInternalClientFactory(InternalClientFactory):
1✔
120
    def _get_client_post_hook(self, client: BaseClient) -> BaseClient:
1✔
121
        """
122
        Register handlers that modify the passed properties to make them compatible with the API structure
123
        """
124

125
        client.meta.events.register(
1✔
126
            "provide-client-params.*.*", handler=_handler_provide_client_params
127
        )
128

129
        return super()._get_client_post_hook(client)
1✔
130

131

132
_cfn_resource_client_factory = ConvertingInternalClientFactory(use_ssl=config.DISTRIBUTED_MODE)
1✔
133

134

135
def convert_payload(
1✔
136
    stack_name: str, stack_id: str, payload: ResourceProviderPayload
137
) -> ResourceRequest[Properties]:
138
    client_factory = _cfn_resource_client_factory(
1✔
139
        aws_access_key_id=payload["requestData"]["callerCredentials"]["accessKeyId"],
140
        aws_session_token=payload["requestData"]["callerCredentials"]["sessionToken"],
141
        aws_secret_access_key=payload["requestData"]["callerCredentials"]["secretAccessKey"],
142
        region_name=payload["region"],
143
    )
144
    desired_state = payload["requestData"]["resourceProperties"]
1✔
145
    rr = ResourceRequest(
1✔
146
        _original_payload=desired_state,
147
        aws_client_factory=client_factory,
148
        request_token=str(uuid.uuid4()),  # TODO: not actually a UUID
149
        stack_name=stack_name,
150
        stack_id=stack_id,
151
        account_id=payload["awsAccountId"],
152
        region_name=payload["region"],
153
        desired_state=desired_state,
154
        logical_resource_id=payload["requestData"]["logicalResourceId"],
155
        resource_type=payload["resourceType"],
156
        logger=logging.getLogger("abc"),
157
        custom_context=payload["callbackContext"],
158
        action=payload["action"],
159
    )
160

161
    if previous_properties := payload["requestData"].get("previousResourceProperties"):
1✔
162
        rr.previous_state = previous_properties
1✔
163

164
    return rr
1✔
165

166

167
@dataclass
1✔
168
class ResourceRequest(Generic[Properties]):
1✔
169
    _original_payload: Properties
1✔
170

171
    aws_client_factory: ServiceLevelClientFactory
1✔
172
    request_token: str
1✔
173
    stack_name: str
1✔
174
    stack_id: str
1✔
175
    account_id: str
1✔
176
    region_name: str
1✔
177
    action: str
1✔
178

179
    desired_state: Properties
1✔
180

181
    logical_resource_id: str
1✔
182
    resource_type: str
1✔
183

184
    logger: Logger
1✔
185

186
    custom_context: dict = field(default_factory=dict)
1✔
187

188
    previous_state: Properties | None = None
1✔
189
    previous_tags: dict[str, str] | None = None
1✔
190
    tags: dict[str, str] = field(default_factory=dict)
1✔
191

192

193
class CloudFormationResourceProviderPlugin(Plugin):
1✔
194
    """
195
    Base class for resource provider plugins.
196
    """
197

198
    namespace = "localstack.cloudformation.resource_providers"
1✔
199

200

201
class ResourceProvider(Generic[Properties]):
1✔
202
    """
203
    This provides a base class onto which service-specific resource providers are built.
204
    """
205

206
    SCHEMA: dict
1✔
207

208
    def create(self, request: ResourceRequest[Properties]) -> ProgressEvent[Properties]:
1✔
209
        raise NotImplementedError
210

211
    def update(self, request: ResourceRequest[Properties]) -> ProgressEvent[Properties]:
1✔
212
        raise NotImplementedError
213

214
    def delete(self, request: ResourceRequest[Properties]) -> ProgressEvent[Properties]:
1✔
215
        raise NotImplementedError
216

217
    def read(self, request: ResourceRequest[Properties]) -> ProgressEvent[Properties]:
1✔
218
        raise NotImplementedError
219

220
    def list(self, request: ResourceRequest[Properties]) -> ProgressEvent[Properties]:
1✔
221
        raise NotImplementedError
222

223

224
# legacy helpers
225
def get_resource_type(resource: dict) -> str:
1✔
226
    """this is currently overwritten in PRO to add support for custom resources"""
227
    if isinstance(resource, str):
1✔
228
        raise ValueError(f"Invalid argument: {resource}")
×
229
    try:
1✔
230
        resource_type: str = resource["Type"]
1✔
231

232
        if resource_type.startswith("Custom::"):
1✔
233
            return "AWS::CloudFormation::CustomResource"
×
234
        return resource_type
1✔
235
    except Exception:
×
236
        LOG.warning(
×
237
            "Failed to retrieve resource type %s",
238
            resource.get("Type"),
239
            exc_info=LOG.isEnabledFor(logging.DEBUG) and config.CFN_VERBOSE_ERRORS,
240
        )
241

242

243
def invoke_function(
1✔
244
    account_id: str,
245
    region_name: str,
246
    function: Callable,
247
    params: dict,
248
    resource_type: str,
249
    func_details: FuncDetails,
250
    action_name: str,
251
    resource: Any,
252
) -> Any:
253
    try:
×
254
        LOG.debug(
×
255
            'Request for resource type "%s" in account %s region %s: %s %s',
256
            resource_type,
257
            account_id,
258
            region_name,
259
            func_details["function"],
260
            params,
261
        )
262
        try:
×
263
            result = function(**params)
×
264
        except botocore.exceptions.ParamValidationError as e:
×
265
            # alternatively we could also use the ParamValidator directly
266
            report = e.kwargs.get("report")
×
267
            if not report:
×
268
                raise
×
269

270
            LOG.debug("Converting parameters to allowed types")
×
271
            LOG.debug("Report: %s", report)
×
272
            converted_params = fix_boto_parameters_based_on_report(params, report)
×
273
            LOG.debug("Original parameters:  %s", params)
×
274
            LOG.debug("Converted parameters: %s", converted_params)
×
275

276
            result = function(**converted_params)
×
277
    except Exception as e:
×
278
        if action_name == "Remove" and check_not_found_exception(e, resource_type, resource):
×
279
            return
×
280
        log_method = LOG.warning
×
281
        if config.CFN_VERBOSE_ERRORS:
×
282
            log_method = LOG.exception
×
283
        log_method("Error calling %s with params: %s for resource: %s", function, params, resource)
×
284
        raise e
×
285

286
    return result
×
287

288

289
def get_service_name(resource):
1✔
290
    res_type = resource["Type"]
×
291
    parts = res_type.split("::")
×
292
    if len(parts) == 1:
×
293
        return None
×
294
    if "Cognito::IdentityPool" in res_type:
×
295
        return "cognito-identity"
×
296
    if res_type.endswith("Cognito::UserPool"):
×
297
        return "cognito-idp"
×
298
    if parts[-2] == "Cognito":
×
299
        return "cognito-idp"
×
300
    if parts[-2] == "Elasticsearch":
×
301
        return "es"
×
302
    if parts[-2] == "OpenSearchService":
×
303
        return "opensearch"
×
304
    if parts[-2] == "KinesisFirehose":
×
305
        return "firehose"
×
306
    if parts[-2] == "ResourceGroups":
×
307
        return "resource-groups"
×
308
    if parts[-2] == "CertificateManager":
×
309
        return "acm"
×
310
    if "ElasticLoadBalancing::" in res_type:
×
311
        return "elb"
×
312
    if "ElasticLoadBalancingV2::" in res_type:
×
313
        return "elbv2"
×
314
    if "ApplicationAutoScaling::" in res_type:
×
315
        return "application-autoscaling"
×
316
    if "MSK::" in res_type:
×
317
        return "kafka"
×
318
    if "Timestream::" in res_type:
×
319
        return "timestream-write"
×
320
    return parts[1].lower()
×
321

322

323
def resolve_resource_parameters(
1✔
324
    account_id_: str,
325
    region_name_: str,
326
    stack_name: str,
327
    resource_definition: ResourceDefinition,
328
    resources: dict[str, ResourceDefinition],
329
    resource_id: str,
330
    func_details: FuncDetailsValue,
331
) -> dict | None:
332
    params = func_details.get("parameters") or (
×
333
        lambda account_id, region_name, properties, logical_resource_id, *args, **kwargs: properties
334
    )
335
    resource_props = resource_definition["Properties"] = resource_definition.get("Properties", {})
×
336
    resource_props = dict(resource_props)
×
337
    resource_state = resource_definition.get(KEY_RESOURCE_STATE, {})
×
338
    last_deployed_state = resource_definition.get("_last_deployed_state", {})
×
339

340
    if callable(params):
×
341
        # resolve parameter map via custom function
342
        params = params(
×
343
            account_id_, region_name_, resource_props, resource_id, resource_definition, stack_name
344
        )
345
    else:
346
        # it could be a list like ['param1', 'param2', {'apiCallParamName': 'cfResourcePropName'}]
347
        if isinstance(params, list):
×
348
            _params = {}
×
349
            for param in params:
×
350
                if isinstance(param, dict):
×
351
                    _params.update(param)
×
352
                else:
353
                    _params[param] = param
×
354
            params = _params
×
355

356
        params = dict(params)
×
357
        # TODO(srw): mutably mapping params :(
358
        for param_key, prop_keys in dict(params).items():
×
359
            params.pop(param_key, None)
×
360
            if not isinstance(prop_keys, list):
×
361
                prop_keys = [prop_keys]
×
362
            for prop_key in prop_keys:
×
363
                if callable(prop_key):
×
364
                    prop_value = prop_key(
×
365
                        account_id_,
366
                        region_name_,
367
                        resource_props,
368
                        resource_id,
369
                        resource_definition,
370
                        stack_name,
371
                    )
372
                else:
373
                    prop_value = resource_props.get(
×
374
                        prop_key,
375
                        resource_definition.get(
376
                            prop_key,
377
                            resource_state.get(prop_key, last_deployed_state.get(prop_key)),
378
                        ),
379
                    )
380
                if prop_value is not None:
×
381
                    params[param_key] = prop_value
×
382
                    break
×
383

384
    # this is an indicator that we should skip this resource deployment, and return
385
    if params is None:
×
386
        return
×
387

388
    # FIXME: move this to a single place after template processing is finished
389
    # convert any moto account IDs (123456789012) in ARNs to our format (000000000000)
390
    params = fix_account_id_in_arns(params, account_id_)
×
391
    # convert data types (e.g., boolean strings to bool)
392
    # TODO: this might not be needed anymore
393
    params = convert_data_types(func_details.get("types", {}), params)
×
394
    # remove None values, as they usually raise boto3 errors
395
    params = remove_none_values(params)
×
396

397
    return params
×
398

399

400
class NoResourceProvider(Exception):
1✔
401
    pass
1✔
402

403

404
def resolve_json_pointer(resource_props: Properties, primary_id_path: str) -> str:
1✔
405
    primary_id_path = primary_id_path.replace("/properties", "")
1✔
406
    parts = [p for p in primary_id_path.split("/") if p]
1✔
407

408
    resolved_part = resource_props.copy()
1✔
409
    for i in range(len(parts)):
1✔
410
        part = parts[i]
1✔
411
        resolved_part = resolved_part.get(part)
1✔
412
        if i == len(parts) - 1:
1✔
413
            # last part
414
            return resolved_part
1✔
415

416
    raise Exception(f"Resource properties is missing field: {part}")
×
417

418

419
class ResourceProviderExecutor:
1✔
420
    """
421
    Point of abstraction between our integration with generic base models, and the new providers.
422
    """
423

424
    def __init__(
1✔
425
        self,
426
        *,
427
        stack_name: str,
428
        stack_id: str,
429
    ):
430
        self.stack_name = stack_name
1✔
431
        self.stack_id = stack_id
1✔
432

433
    def deploy_loop(
1✔
434
        self,
435
        resource_provider: ResourceProvider,
436
        resource: dict,
437
        raw_payload: ResourceProviderPayload,
438
        max_timeout: int = config.CFN_PER_RESOURCE_TIMEOUT,
439
        sleep_time: float = 1,
440
    ) -> ProgressEvent[Properties]:
441
        payload = copy.deepcopy(raw_payload)
1✔
442

443
        max_iterations = max(ceil(max_timeout / sleep_time), 10)
1✔
444

445
        for current_iteration in range(max_iterations):
1✔
446
            resource_type = get_resource_type({"Type": raw_payload["resourceType"]})
1✔
447
            resource["SpecifiedProperties"] = raw_payload["requestData"]["resourceProperties"]
1✔
448

449
            try:
1✔
450
                event = self.execute_action(resource_provider, payload)
1✔
451
            except ClientError:
1✔
452
                LOG.error(
1✔
453
                    "client error invoking '%s' handler for resource '%s' (type '%s')",
454
                    raw_payload["action"],
455
                    raw_payload["requestData"]["logicalResourceId"],
456
                    resource_type,
457
                )
458
                raise
1✔
459

460
            match event.status:
1✔
461
                case OperationStatus.FAILED:
1✔
462
                    return event
1✔
463
                case OperationStatus.SUCCESS:
1✔
464
                    if not hasattr(resource_provider, "SCHEMA"):
1✔
465
                        raise Exception(
×
466
                            "A ResourceProvider should always have a SCHEMA property defined."
467
                        )
468
                    resource_type_schema = resource_provider.SCHEMA
1✔
469
                    if raw_payload["action"] != "Remove":
1✔
470
                        physical_resource_id = (
1✔
471
                            self.extract_physical_resource_id_from_model_with_schema(
472
                                event.resource_model,
473
                                raw_payload["resourceType"],
474
                                resource_type_schema,
475
                            )
476
                        )
477

478
                        resource["PhysicalResourceId"] = physical_resource_id
1✔
479
                        resource["Properties"] = event.resource_model
1✔
480
                        resource["_last_deployed_state"] = copy.deepcopy(event.resource_model)
1✔
481
                    return event
1✔
482
                case OperationStatus.IN_PROGRESS:
1✔
483
                    # update the shared state
484
                    context = {**payload["callbackContext"], **event.custom_context}
1✔
485
                    payload["callbackContext"] = context
1✔
486
                    payload["requestData"]["resourceProperties"] = event.resource_model
1✔
487
                    resource["Properties"] = event.resource_model
1✔
488

489
                    if current_iteration < config.CFN_NO_WAIT_ITERATIONS:
1✔
490
                        pass
1✔
491
                    else:
492
                        time.sleep(sleep_time)
1✔
493

494
                case OperationStatus.PENDING:
×
495
                    # come back to this resource in another iteration
496
                    return event
×
497
                case invalid_status:
×
498
                    raise ValueError(
×
499
                        f"Invalid OperationStatus ({invalid_status}) returned for resource {raw_payload['requestData']['logicalResourceId']} (type {raw_payload['resourceType']})"
500
                    )
501

502
        else:
503
            raise TimeoutError(
×
504
                f"Resource deployment for resource {raw_payload['requestData']['logicalResourceId']} (type {raw_payload['resourceType']}) timed out."
505
            )
506

507
    def execute_action(
1✔
508
        self, resource_provider: ResourceProvider, raw_payload: ResourceProviderPayload
509
    ) -> ProgressEvent[Properties]:
510
        change_type = raw_payload["action"]
1✔
511
        request = convert_payload(
1✔
512
            stack_name=self.stack_name, stack_id=self.stack_id, payload=raw_payload
513
        )
514

515
        match change_type:
1✔
516
            case "Add":
1✔
517
                return resource_provider.create(request)
1✔
518
            case "Dynamic" | "Modify":
1✔
519
                try:
1✔
520
                    return resource_provider.update(request)
1✔
521
                except NotImplementedError:
1✔
522
                    feature_request_url = "https://github.com/localstack/localstack/issues/new?template=feature-request.yml"
1✔
523
                    LOG.warning(
1✔
524
                        'Unable to update resource type "%s", id "%s", '
525
                        "the update operation is not implemented for this resource. "
526
                        "Please consider submitting a feature request at this URL: %s",
527
                        request.resource_type,
528
                        request.logical_resource_id,
529
                        feature_request_url,
530
                    )
531
                    if request.previous_state is None:
1✔
532
                        # this is an issue with our update detection. We should never be in this state.
UNCOV
533
                        request.action = "Add"
×
UNCOV
534
                        return resource_provider.create(request)
×
535

536
                    return ProgressEvent(
1✔
537
                        status=OperationStatus.SUCCESS, resource_model=request.previous_state
538
                    )
539
                except Exception as e:
1✔
540
                    # FIXME: this fallback should be removed after fixing updates in general (order/dependenies)
541
                    # catch-all for any exception that looks like a not found exception
542
                    if check_not_found_exception(e, request.resource_type, request.desired_state):
1✔
543
                        return ProgressEvent(
1✔
544
                            status=OperationStatus.SUCCESS, resource_model=request.previous_state
545
                        )
546

547
                    return ProgressEvent(
×
548
                        status=OperationStatus.FAILED,
549
                        resource_model={},
550
                        message=f"Failed to delete resource with id {request.logical_resource_id} of type {request.resource_type}",
551
                    )
552
            case "Remove":
1✔
553
                try:
1✔
554
                    return resource_provider.delete(request)
1✔
555
                except Exception as e:
1✔
556
                    # catch-all for any exception that looks like a not found exception
557
                    if check_not_found_exception(e, request.resource_type, request.desired_state):
1✔
558
                        return ProgressEvent(status=OperationStatus.SUCCESS, resource_model={})
1✔
559

560
                    return ProgressEvent(
1✔
561
                        status=OperationStatus.FAILED,
562
                        resource_model={},
563
                        message=f"Failed to delete resource with id {request.logical_resource_id} of type {request.resource_type}",
564
                    )
565
            case _:
×
566
                raise NotImplementedError(change_type)  # TODO: change error type
567

568
    @staticmethod
1✔
569
    def try_load_resource_provider(resource_type: str) -> ResourceProvider | None:
1✔
570
        # TODO: unify namespace of plugins
571
        if resource_type and resource_type.startswith("Custom"):
1✔
572
            resource_type = "AWS::CloudFormation::CustomResource"
×
573

574
        # 1. try to load pro resource provider
575
        # prioritise pro resource providers
576
        if PRO_RESOURCE_PROVIDERS:
1✔
577
            try:
×
578
                plugin = pro_plugin_manager.load(resource_type)
×
579
                return plugin.factory()
×
580
            except ValueError:
×
581
                # could not find a plugin for that name
582
                pass
×
583
            except Exception:
×
584
                LOG.warning(
×
585
                    "Failed to load PRO resource type %s as a ResourceProvider.",
586
                    resource_type,
587
                    exc_info=LOG.isEnabledFor(logging.DEBUG) and config.CFN_VERBOSE_ERRORS,
588
                )
589

590
        # 2. try to load community resource provider
591
        try:
1✔
592
            plugin = plugin_manager.load(resource_type)
1✔
593
            return plugin.factory()
1✔
594
        except ValueError:
1✔
595
            # could not find a plugin for that name
596
            pass
1✔
597
        except Exception:
1✔
598
            if config.CFN_VERBOSE_ERRORS:
1✔
599
                LOG.warning(
×
600
                    "Failed to load community resource type %s as a ResourceProvider.",
601
                    resource_type,
602
                    exc_info=LOG.isEnabledFor(logging.DEBUG) and config.CFN_VERBOSE_ERRORS,
603
                )
604

605
        # we could not find the resource provider
606
        return None
1✔
607

608
    def extract_physical_resource_id_from_model_with_schema(
1✔
609
        self, resource_model: Properties, resource_type: str, resource_type_schema: dict
610
    ) -> str:
611
        if resource_type in PHYSICAL_RESOURCE_ID_SPECIAL_CASES:
1✔
612
            primary_id_path = PHYSICAL_RESOURCE_ID_SPECIAL_CASES[resource_type]
1✔
613

614
            if "<" in primary_id_path:
1✔
615
                # composite quirk, e.g. something like MyRef|MyName
616
                # try to extract parts
617
                physical_resource_id = primary_id_path
×
618
                find_results = re.findall("<([^>]+)>", primary_id_path)
×
619
                for found_part in find_results:
×
620
                    resolved_part = resolve_json_pointer(resource_model, found_part)
×
621
                    physical_resource_id = physical_resource_id.replace(
×
622
                        f"<{found_part}>", resolved_part
623
                    )
624
            else:
625
                physical_resource_id = resolve_json_pointer(resource_model, primary_id_path)
1✔
626
        else:
627
            primary_id_paths = resource_type_schema["primaryIdentifier"]
1✔
628
            if len(primary_id_paths) > 1:
1✔
629
                # TODO: auto-merge. Verify logic here with AWS
630
                physical_resource_id = "-".join(
1✔
631
                    [resolve_json_pointer(resource_model, pip) for pip in primary_id_paths]
632
                )
633
            else:
634
                physical_resource_id = resolve_json_pointer(resource_model, primary_id_paths[0])
1✔
635

636
        return physical_resource_id
1✔
637

638

639
plugin_manager = PluginManager(CloudFormationResourceProviderPlugin.namespace)
1✔
640
if PRO_RESOURCE_PROVIDERS:
1✔
641
    pro_plugin_manager = PluginManager(CloudFormationResourceProviderPluginExt.namespace)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc