• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 21891233725

10 Feb 2026 10:59PM UTC coverage: 86.883% (+0.01%) from 86.871%
21891233725

push

github

web-flow
SFN: Fix Local mock iterations for states without retry (#13693)

Co-authored-by: Hernan <hernaner28@gmail.com>

The Step Functions Local mock configuration was using RetryCount to index numbered mock responses. In case of successful invocations or states without retry configuration this caused all invocations to return the same response ("0") instead of iterating through the sequence ("0", "1", "2", etc.).

RetryCount only increments on actual retry attempts (failures), not on successful invocations. This made the mock iteration feature unusable for testing state transition scenarios with multiple invocations of the same state.

- Added next_local_mock_invocation_number dict to execution environment to track mock invocations.
- Modified get_current_local_mocked_response() to use next_local_mock_invocation_number instead of retry_count.
- Shared next_local_mock_invocation_number between parent and child frames to maintain consistent counting across execution context

An existing `aws.services.stepfunctions.v2.local_mocking.test_base_scenarios.TestBaseScenarios.test_map_state_lambda test` was giving a false positive. It used a mock response with only one mocked invocation and should have failed when 2 invocations were done. Effectively, it started to fail after the fix and mocked response has been adjusted.

Also, adds `test_numbered_mock_responses_multiple_success_invocations` that tests multiple success invocations in a regular top-level state, outside of map configuration.

7 of 7 new or added lines in 1 file covered. (100.0%)

255 existing lines in 12 files now uncovered.

69977 of 80542 relevant lines covered (86.88%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.67
/localstack-core/localstack/services/cloudformation/resource_provider.py
1
from __future__ import annotations
1✔
2

3
import copy
1✔
4
import logging
1✔
5
import re
1✔
6
import time
1✔
7
import uuid
1✔
8
from collections.abc import Callable
1✔
9
from dataclasses import dataclass, field
1✔
10
from enum import Enum, auto
1✔
11
from logging import Logger
1✔
12
from math import ceil
1✔
13
from typing import TYPE_CHECKING, Any, TypedDict, TypeVar
1✔
14

15
import botocore
1✔
16
from botocore.client import BaseClient
1✔
17
from botocore.exceptions import ClientError
1✔
18
from botocore.model import OperationModel
1✔
19
from plux import Plugin, PluginManager
1✔
20

21
from localstack import config
1✔
22
from localstack.aws.connect import InternalClientFactory, ServiceLevelClientFactory
1✔
23
from localstack.services.cloudformation.deployment_utils import (
1✔
24
    check_not_found_exception,
25
    convert_data_types,
26
    fix_account_id_in_arns,
27
    fix_boto_parameters_based_on_report,
28
    remove_none_values,
29
)
30
from localstack.services.cloudformation.engine.quirks import PHYSICAL_RESOURCE_ID_SPECIAL_CASES
1✔
31
from localstack.services.cloudformation.provider_utils import convert_request_kwargs
1✔
32
from localstack.services.cloudformation.service_models import KEY_RESOURCE_STATE
1✔
33

34
PRO_RESOURCE_PROVIDERS = False
1✔
35
try:
1✔
36
    from localstack.pro.core.services.cloudformation.resource_provider import (
1✔
37
        CloudFormationResourceProviderPluginExt,
38
    )
39

40
    PRO_RESOURCE_PROVIDERS = True
×
41
except ImportError:
1✔
42
    pass
1✔
43

44
if TYPE_CHECKING:
1✔
45
    from localstack.services.cloudformation.engine.types import (
×
46
        FuncDetails,
47
        FuncDetailsValue,
48
        ResourceDefinition,
49
    )
50

51
LOG = logging.getLogger(__name__)
1✔
52

53
Properties = TypeVar("Properties")
1✔
54

55
PUBLIC_REGISTRY: dict[str, type[ResourceProvider]] = {}
1✔
56

57
PROVIDER_DEFAULTS = {}  # TODO: remove this after removing patching in -ext
1✔
58

59

60
class OperationStatus(Enum):
1✔
61
    PENDING = auto()
1✔
62
    IN_PROGRESS = auto()
1✔
63
    SUCCESS = auto()
1✔
64
    FAILED = auto()
1✔
65

66

67
@dataclass
1✔
68
class ProgressEvent[Properties]:
1✔
69
    status: OperationStatus
1✔
70
    resource_model: Properties | None = None
1✔
71
    resource_models: list[Properties] | None = None
1✔
72

73
    message: str = ""
1✔
74
    result: str | None = None
1✔
75
    error_code: str | None = None  # TODO: enum
1✔
76
    custom_context: dict = field(default_factory=dict)
1✔
77

78

79
class Credentials(TypedDict):
1✔
80
    accessKeyId: str
1✔
81
    secretAccessKey: str
1✔
82
    sessionToken: str
1✔
83

84

85
class ResourceProviderPayloadRequestData(TypedDict):
1✔
86
    logicalResourceId: str
1✔
87
    resourceProperties: Properties
1✔
88
    previousResourceProperties: Properties | None
1✔
89
    callerCredentials: Credentials
1✔
90
    providerCredentials: Credentials
1✔
91
    systemTags: dict[str, str]
1✔
92
    previousSystemTags: dict[str, str]
1✔
93
    stackTags: dict[str, str]
1✔
94
    previousStackTags: dict[str, str]
1✔
95

96

97
class ResourceProviderPayload(TypedDict):
1✔
98
    callbackContext: dict
1✔
99
    stackId: str
1✔
100
    requestData: ResourceProviderPayloadRequestData
1✔
101
    resourceType: str
1✔
102
    resourceTypeVersion: str
1✔
103
    awsAccountId: str
1✔
104
    bearerToken: str
1✔
105
    region: str
1✔
106
    action: str
1✔
107

108

109
ResourceProperties = TypeVar("ResourceProperties")
1✔
110

111

112
def _handler_provide_client_params(event_name: str, params: dict, model: OperationModel, **kwargs):
1✔
113
    """
114
    A botocore hook handler that will try to convert the passed parameters according to the given operation model
115
    """
116
    return convert_request_kwargs(params, model.input_shape)
1✔
117

118

119
class ConvertingInternalClientFactory(InternalClientFactory):
1✔
120
    def _get_client_post_hook(self, client: BaseClient) -> BaseClient:
1✔
121
        """
122
        Register handlers that modify the passed properties to make them compatible with the API structure
123
        """
124

125
        client.meta.events.register(
1✔
126
            "provide-client-params.*.*", handler=_handler_provide_client_params
127
        )
128

129
        return super()._get_client_post_hook(client)
1✔
130

131

132
_cfn_resource_client_factory = ConvertingInternalClientFactory(use_ssl=config.DISTRIBUTED_MODE)
1✔
133

134

135
def convert_payload(
1✔
136
    stack_name: str, stack_id: str, payload: ResourceProviderPayload
137
) -> ResourceRequest[Properties]:
138
    client_factory = _cfn_resource_client_factory(
1✔
139
        aws_access_key_id=payload["requestData"]["callerCredentials"]["accessKeyId"],
140
        aws_session_token=payload["requestData"]["callerCredentials"]["sessionToken"],
141
        aws_secret_access_key=payload["requestData"]["callerCredentials"]["secretAccessKey"],
142
        region_name=payload["region"],
143
    )
144
    desired_state = payload["requestData"]["resourceProperties"]
1✔
145
    rr = ResourceRequest(
1✔
146
        _original_payload=desired_state,
147
        aws_client_factory=client_factory,
148
        request_token=str(uuid.uuid4()),  # TODO: not actually a UUID
149
        stack_name=stack_name,
150
        stack_id=stack_id,
151
        account_id=payload["awsAccountId"],
152
        region_name=payload["region"],
153
        desired_state=desired_state,
154
        logical_resource_id=payload["requestData"]["logicalResourceId"],
155
        resource_type=payload["resourceType"],
156
        logger=logging.getLogger("abc"),
157
        custom_context=payload["callbackContext"],
158
        action=payload["action"],
159
    )
160

161
    if previous_properties := payload["requestData"].get("previousResourceProperties"):
1✔
162
        rr.previous_state = previous_properties
1✔
163

164
    return rr
1✔
165

166

167
@dataclass
1✔
168
class ResourceRequest[Properties]:
1✔
169
    _original_payload: Properties
1✔
170

171
    aws_client_factory: ServiceLevelClientFactory
1✔
172
    request_token: str
1✔
173
    stack_name: str
1✔
174
    stack_id: str
1✔
175
    account_id: str
1✔
176
    region_name: str
1✔
177
    action: str
1✔
178

179
    desired_state: Properties
1✔
180

181
    logical_resource_id: str
1✔
182
    resource_type: str
1✔
183

184
    logger: Logger
1✔
185

186
    custom_context: dict = field(default_factory=dict)
1✔
187

188
    previous_state: Properties | None = None
1✔
189
    previous_tags: dict[str, str] | None = None
1✔
190
    tags: dict[str, str] = field(default_factory=dict)
1✔
191

192

193
class CloudFormationResourceProviderPlugin(Plugin):
1✔
194
    """
195
    Base class for resource provider plugins.
196
    """
197

198
    namespace = "localstack.cloudformation.resource_providers"
1✔
199

200

201
class ResourceProvider[Properties]:
1✔
202
    """
203
    This provides a base class onto which service-specific resource providers are built.
204
    """
205

206
    SCHEMA: dict
1✔
207

208
    def create(self, request: ResourceRequest[Properties]) -> ProgressEvent[Properties]:
1✔
209
        raise NotImplementedError
210

211
    def update(self, request: ResourceRequest[Properties]) -> ProgressEvent[Properties]:
1✔
212
        raise NotImplementedError
213

214
    def delete(self, request: ResourceRequest[Properties]) -> ProgressEvent[Properties]:
1✔
215
        raise NotImplementedError
216

217
    def read(self, request: ResourceRequest[Properties]) -> ProgressEvent[Properties]:
1✔
218
        raise NotImplementedError
219

220
    def list(self, request: ResourceRequest[Properties]) -> ProgressEvent[Properties]:
1✔
221
        raise NotImplementedError
222

223

224
# legacy helpers
225
def get_resource_type(resource: dict) -> str:
1✔
226
    """this is currently overwritten in PRO to add support for custom resources"""
227
    if isinstance(resource, str):
1✔
228
        raise ValueError(f"Invalid argument: {resource}")
×
229
    try:
1✔
230
        resource_type: str = resource["Type"]
1✔
231

232
        if resource_type.startswith("Custom::"):
1✔
233
            return "AWS::CloudFormation::CustomResource"
×
234
        return resource_type
1✔
235
    except Exception:
×
236
        LOG.warning(
×
237
            "Failed to retrieve resource type %s",
238
            resource.get("Type"),
239
            exc_info=LOG.isEnabledFor(logging.DEBUG) and config.CFN_VERBOSE_ERRORS,
240
        )
241

242

243
def invoke_function(
1✔
244
    account_id: str,
245
    region_name: str,
246
    function: Callable,
247
    params: dict,
248
    resource_type: str,
249
    func_details: FuncDetails,
250
    action_name: str,
251
    resource: Any,
252
) -> Any:
253
    try:
×
254
        LOG.debug(
×
255
            'Request for resource type "%s" in account %s region %s: %s %s',
256
            resource_type,
257
            account_id,
258
            region_name,
259
            func_details["function"],
260
            params,
261
        )
262
        try:
×
263
            result = function(**params)
×
264
        except botocore.exceptions.ParamValidationError as e:
×
265
            # alternatively we could also use the ParamValidator directly
266
            report = e.kwargs.get("report")
×
267
            if not report:
×
268
                raise
×
269

270
            LOG.debug("Converting parameters to allowed types")
×
271
            LOG.debug("Report: %s", report)
×
272
            converted_params = fix_boto_parameters_based_on_report(params, report)
×
273
            LOG.debug("Original parameters:  %s", params)
×
274
            LOG.debug("Converted parameters: %s", converted_params)
×
275

276
            result = function(**converted_params)
×
277
    except Exception as e:
×
278
        if action_name == "Remove" and check_not_found_exception(e, resource_type, resource):
×
279
            return
×
280
        log_method = LOG.warning
×
281
        if config.CFN_VERBOSE_ERRORS:
×
282
            log_method = LOG.exception
×
283
        log_method("Error calling %s with params: %s for resource: %s", function, params, resource)
×
284
        raise e
×
285

286
    return result
×
287

288

289
def get_service_name(resource):
1✔
290
    res_type = resource["Type"]
×
291
    parts = res_type.split("::")
×
292
    if len(parts) == 1:
×
293
        return None
×
294
    if "Cognito::IdentityPool" in res_type:
×
295
        return "cognito-identity"
×
296
    if res_type.endswith("Cognito::UserPool"):
×
297
        return "cognito-idp"
×
298
    if parts[-2] == "Cognito":
×
299
        return "cognito-idp"
×
300
    if parts[-2] == "Elasticsearch":
×
301
        return "es"
×
302
    if parts[-2] == "OpenSearchService":
×
303
        return "opensearch"
×
304
    if parts[-2] == "KinesisFirehose":
×
305
        return "firehose"
×
306
    if parts[-2] == "ResourceGroups":
×
307
        return "resource-groups"
×
308
    if parts[-2] == "CertificateManager":
×
309
        return "acm"
×
310
    if "ElasticLoadBalancing::" in res_type:
×
311
        return "elb"
×
312
    if "ElasticLoadBalancingV2::" in res_type:
×
313
        return "elbv2"
×
314
    if "ApplicationAutoScaling::" in res_type:
×
315
        return "application-autoscaling"
×
316
    if "MSK::" in res_type:
×
317
        return "kafka"
×
318
    if "Timestream::" in res_type:
×
319
        return "timestream-write"
×
320
    return parts[1].lower()
×
321

322

323
def resolve_resource_parameters(
1✔
324
    account_id_: str,
325
    region_name_: str,
326
    stack_name: str,
327
    resource_definition: ResourceDefinition,
328
    resources: dict[str, ResourceDefinition],
329
    resource_id: str,
330
    func_details: FuncDetailsValue,
331
) -> dict | None:
332
    params = func_details.get("parameters") or (
×
333
        lambda account_id, region_name, properties, logical_resource_id, *args, **kwargs: properties
334
    )
335
    resource_props = resource_definition["Properties"] = resource_definition.get("Properties", {})
×
336
    resource_props = dict(resource_props)
×
337
    resource_state = resource_definition.get(KEY_RESOURCE_STATE, {})
×
338
    last_deployed_state = resource_definition.get("_last_deployed_state", {})
×
339

340
    if callable(params):
×
341
        # resolve parameter map via custom function
342
        params = params(
×
343
            account_id_, region_name_, resource_props, resource_id, resource_definition, stack_name
344
        )
345
    else:
346
        # it could be a list like ['param1', 'param2', {'apiCallParamName': 'cfResourcePropName'}]
347
        if isinstance(params, list):
×
348
            _params = {}
×
349
            for param in params:
×
350
                if isinstance(param, dict):
×
351
                    _params.update(param)
×
352
                else:
353
                    _params[param] = param
×
354
            params = _params
×
355

356
        params = dict(params)
×
357
        # TODO(srw): mutably mapping params :(
358
        for param_key, prop_keys in dict(params).items():
×
359
            params.pop(param_key, None)
×
360
            if not isinstance(prop_keys, list):
×
361
                prop_keys = [prop_keys]
×
362
            for prop_key in prop_keys:
×
363
                if callable(prop_key):
×
364
                    prop_value = prop_key(
×
365
                        account_id_,
366
                        region_name_,
367
                        resource_props,
368
                        resource_id,
369
                        resource_definition,
370
                        stack_name,
371
                    )
372
                else:
373
                    prop_value = resource_props.get(
×
374
                        prop_key,
375
                        resource_definition.get(
376
                            prop_key,
377
                            resource_state.get(prop_key, last_deployed_state.get(prop_key)),
378
                        ),
379
                    )
380
                if prop_value is not None:
×
381
                    params[param_key] = prop_value
×
382
                    break
×
383

384
    # this is an indicator that we should skip this resource deployment, and return
385
    if params is None:
×
386
        return
×
387

388
    # FIXME: move this to a single place after template processing is finished
389
    # convert any moto account IDs (123456789012) in ARNs to our format (000000000000)
390
    params = fix_account_id_in_arns(params, account_id_)
×
391
    # convert data types (e.g., boolean strings to bool)
392
    # TODO: this might not be needed anymore
393
    params = convert_data_types(func_details.get("types", {}), params)
×
394
    # remove None values, as they usually raise boto3 errors
395
    params = remove_none_values(params)
×
396

397
    return params
×
398

399

400
class NoResourceProvider(Exception):
1✔
401
    pass
1✔
402

403

404
def resolve_json_pointer[Properties](resource_props: Properties, primary_id_path: str) -> str:
1✔
405
    primary_id_path = primary_id_path.replace("/properties", "")
1✔
406
    parts = [p for p in primary_id_path.split("/") if p]
1✔
407

408
    resolved_part = resource_props.copy()
1✔
409
    for i in range(len(parts)):
1✔
410
        part = parts[i]
1✔
411
        resolved_part = resolved_part.get(part)
1✔
412
        if i == len(parts) - 1:
1✔
413
            # last part
414
            return resolved_part
1✔
415

416
    raise Exception(f"Resource properties is missing field: {part}")
×
417

418

419
class ResourceProviderExecutor:
1✔
420
    """
421
    Point of abstraction between our integration with generic base models, and the new providers.
422
    """
423

424
    def __init__(
1✔
425
        self,
426
        *,
427
        stack_name: str,
428
        stack_id: str,
429
    ):
430
        self.stack_name = stack_name
1✔
431
        self.stack_id = stack_id
1✔
432

433
    def deploy_loop(
1✔
434
        self,
435
        resource_provider: ResourceProvider,
436
        resource: dict,
437
        raw_payload: ResourceProviderPayload,
438
        max_timeout: int = config.CFN_PER_RESOURCE_TIMEOUT,
439
        sleep_time: float = 1,
440
    ) -> ProgressEvent[Properties]:
441
        payload = copy.deepcopy(raw_payload)
1✔
442

443
        max_iterations = max(ceil(max_timeout / sleep_time), 10)
1✔
444

445
        for current_iteration in range(max_iterations):
1✔
446
            resource_type = get_resource_type({"Type": raw_payload["resourceType"]})
1✔
447
            resource["SpecifiedProperties"] = raw_payload["requestData"]["resourceProperties"]
1✔
448

449
            try:
1✔
450
                event = self.execute_action(resource_provider, payload)
1✔
451
            except ClientError:
1✔
452
                LOG.error(
1✔
453
                    "client error invoking '%s' handler for resource '%s' (type '%s')",
454
                    raw_payload["action"],
455
                    raw_payload["requestData"]["logicalResourceId"],
456
                    resource_type,
457
                )
458
                raise
1✔
459

460
            match event.status:
1✔
461
                case OperationStatus.FAILED:
1✔
462
                    return event
1✔
463
                case OperationStatus.SUCCESS:
1✔
464
                    if not hasattr(resource_provider, "SCHEMA"):
1✔
465
                        raise Exception(
×
466
                            "A ResourceProvider should always have a SCHEMA property defined."
467
                        )
468
                    resource_type_schema = resource_provider.SCHEMA
1✔
469
                    if raw_payload["action"] != "Remove":
1✔
470
                        physical_resource_id = (
1✔
471
                            self.extract_physical_resource_id_from_model_with_schema(
472
                                event.resource_model,
473
                                raw_payload["resourceType"],
474
                                resource_type_schema,
475
                            )
476
                        )
477

478
                        resource["PhysicalResourceId"] = physical_resource_id
1✔
479
                        resource["Properties"] = event.resource_model
1✔
480
                        resource["_last_deployed_state"] = copy.deepcopy(event.resource_model)
1✔
481
                    return event
1✔
482
                case OperationStatus.IN_PROGRESS:
1✔
483
                    # update the shared state
484
                    context = {**payload["callbackContext"], **event.custom_context}
1✔
485
                    payload["callbackContext"] = context
1✔
486
                    payload["requestData"]["resourceProperties"] = event.resource_model
1✔
487
                    resource["Properties"] = event.resource_model
1✔
488

489
                    if current_iteration < config.CFN_NO_WAIT_ITERATIONS:
1✔
490
                        pass
1✔
491
                    else:
492
                        time.sleep(sleep_time)
1✔
493

494
                case OperationStatus.PENDING:
×
495
                    # come back to this resource in another iteration
496
                    return event
×
497
                case invalid_status:
×
498
                    raise ValueError(
×
499
                        f"Invalid OperationStatus ({invalid_status}) returned for resource {raw_payload['requestData']['logicalResourceId']} (type {raw_payload['resourceType']})"
500
                    )
501

502
        else:
503
            raise TimeoutError(
×
504
                f"Resource deployment for resource {raw_payload['requestData']['logicalResourceId']} (type {raw_payload['resourceType']}) timed out."
505
            )
506

507
    def execute_action(
1✔
508
        self, resource_provider: ResourceProvider, raw_payload: ResourceProviderPayload
509
    ) -> ProgressEvent[Properties]:
510
        change_type = raw_payload["action"]
1✔
511
        request = convert_payload(
1✔
512
            stack_name=self.stack_name, stack_id=self.stack_id, payload=raw_payload
513
        )
514

515
        match change_type:
1✔
516
            case "Add":
1✔
517
                return resource_provider.create(request)
1✔
518
            case "Dynamic" | "Modify":
1✔
519
                try:
1✔
520
                    return resource_provider.update(request)
1✔
521
                except NotImplementedError:
1✔
522
                    feature_request_url = "https://github.com/localstack/localstack/issues/new?template=feature-request.yml"
1✔
523
                    LOG.warning(
1✔
524
                        'Unable to update resource type "%s", id "%s", '
525
                        "the update operation is not implemented for this resource. "
526
                        "Please consider submitting a feature request at this URL: %s",
527
                        request.resource_type,
528
                        request.logical_resource_id,
529
                        feature_request_url,
530
                    )
531
                    if request.previous_state is None:
1✔
532
                        # this is an issue with our update detection. We should never be in this state.
533
                        request.action = "Add"
×
534
                        return resource_provider.create(request)
×
535

536
                    return ProgressEvent(
1✔
537
                        status=OperationStatus.SUCCESS, resource_model=request.previous_state
538
                    )
539
                except Exception as e:
1✔
540
                    # FIXME: this fallback should be removed after fixing updates in general (order/dependenies)
541
                    # catch-all for any exception that looks like a not found exception
542
                    if check_not_found_exception(e, request.resource_type, request.desired_state):
1✔
543
                        return ProgressEvent(
1✔
544
                            status=OperationStatus.SUCCESS, resource_model=request.previous_state
545
                        )
546

547
                    return ProgressEvent(
×
548
                        status=OperationStatus.FAILED,
549
                        resource_model={},
550
                        message=f"Failed to delete resource with id {request.logical_resource_id} of type {request.resource_type}",
551
                        custom_context={"exception": e},
552
                    )
553
            case "Remove":
1✔
554
                try:
1✔
555
                    return resource_provider.delete(request)
1✔
556
                except Exception as e:
1✔
557
                    # catch-all for any exception that looks like a not found exception
558
                    if check_not_found_exception(e, request.resource_type, request.desired_state):
1✔
559
                        return ProgressEvent(status=OperationStatus.SUCCESS, resource_model={})
1✔
560

561
                    return ProgressEvent(
1✔
562
                        status=OperationStatus.FAILED,
563
                        resource_model={},
564
                        message=f"Failed to delete resource with id {request.logical_resource_id} of type {request.resource_type}",
565
                        custom_context={"exception": e},
566
                    )
UNCOV
567
            case _:
×
568
                raise NotImplementedError(change_type)  # TODO: change error type
569

570
    @staticmethod
1✔
571
    def try_load_resource_provider(resource_type: str) -> ResourceProvider | None:
1✔
572
        # TODO: unify namespace of plugins
573
        if resource_type and resource_type.startswith("Custom"):
1✔
UNCOV
574
            resource_type = "AWS::CloudFormation::CustomResource"
×
575

576
        # 1. try to load pro resource provider
577
        # prioritise pro resource providers
578
        if PRO_RESOURCE_PROVIDERS:
1✔
579
            try:
×
580
                plugin = pro_plugin_manager.load(resource_type)
×
UNCOV
581
                return plugin.factory()
×
582
            except ValueError:
×
583
                # could not find a plugin for that name
584
                pass
×
UNCOV
585
            except Exception:
×
UNCOV
586
                LOG.warning(
×
587
                    "Failed to load PRO resource type %s as a ResourceProvider.",
588
                    resource_type,
589
                    exc_info=LOG.isEnabledFor(logging.DEBUG) and config.CFN_VERBOSE_ERRORS,
590
                )
591

592
        # 2. try to load community resource provider
593
        try:
1✔
594
            plugin = plugin_manager.load(resource_type)
1✔
595
            return plugin.factory()
1✔
596
        except ValueError:
1✔
597
            # could not find a plugin for that name
598
            pass
1✔
599
        except Exception:
1✔
600
            if config.CFN_VERBOSE_ERRORS:
1✔
UNCOV
601
                LOG.warning(
×
602
                    "Failed to load community resource type %s as a ResourceProvider.",
603
                    resource_type,
604
                    exc_info=LOG.isEnabledFor(logging.DEBUG) and config.CFN_VERBOSE_ERRORS,
605
                )
606

607
        # we could not find the resource provider
608
        return None
1✔
609

610
    def extract_physical_resource_id_from_model_with_schema(
1✔
611
        self, resource_model: Properties, resource_type: str, resource_type_schema: dict
612
    ) -> str:
613
        if resource_type in PHYSICAL_RESOURCE_ID_SPECIAL_CASES:
1✔
614
            primary_id_path = PHYSICAL_RESOURCE_ID_SPECIAL_CASES[resource_type]
1✔
615

616
            if "<" in primary_id_path:
1✔
617
                # composite quirk, e.g. something like MyRef|MyName
618
                # try to extract parts
619
                physical_resource_id = primary_id_path
×
620
                find_results = re.findall("<([^>]+)>", primary_id_path)
×
621
                for found_part in find_results:
×
UNCOV
622
                    resolved_part = resolve_json_pointer(resource_model, found_part)
×
UNCOV
623
                    physical_resource_id = physical_resource_id.replace(
×
624
                        f"<{found_part}>", resolved_part
625
                    )
626
            else:
627
                physical_resource_id = resolve_json_pointer(resource_model, primary_id_path)
1✔
628
        else:
629
            primary_id_paths = resource_type_schema["primaryIdentifier"]
1✔
630
            if len(primary_id_paths) > 1:
1✔
631
                # TODO: auto-merge. Verify logic here with AWS
632
                physical_resource_id = "-".join(
1✔
633
                    [resolve_json_pointer(resource_model, pip) for pip in primary_id_paths]
634
                )
635
            else:
636
                physical_resource_id = resolve_json_pointer(resource_model, primary_id_paths[0])
1✔
637

638
        return physical_resource_id
1✔
639

640

641
plugin_manager = PluginManager(CloudFormationResourceProviderPlugin.namespace)
1✔
642
if PRO_RESOURCE_PROVIDERS:
1✔
UNCOV
643
    pro_plugin_manager = PluginManager(CloudFormationResourceProviderPluginExt.namespace)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc