• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 20649646029

29 Dec 2025 01:37PM UTC coverage: 86.925% (+0.01%) from 86.915%
20649646029

push

github

web-flow
IAM: Update `create_user_with_policy` fixture (#13568)

2 of 5 new or added lines in 1 file covered. (40.0%)

25 existing lines in 3 files now uncovered.

70059 of 80597 relevant lines covered (86.93%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.65
/localstack-core/localstack/services/lambda_/provider.py
1
import base64
1✔
2
import dataclasses
1✔
3
import datetime
1✔
4
import itertools
1✔
5
import json
1✔
6
import logging
1✔
7
import re
1✔
8
import threading
1✔
9
import time
1✔
10
from typing import IO, Any
1✔
11

12
from botocore.exceptions import ClientError
1✔
13

14
from localstack import config
1✔
15
from localstack.aws.api import RequestContext, ServiceException, handler
1✔
16
from localstack.aws.api.lambda_ import (
1✔
17
    AccountLimit,
18
    AccountUsage,
19
    AddLayerVersionPermissionResponse,
20
    AddPermissionRequest,
21
    AddPermissionResponse,
22
    Alias,
23
    AliasConfiguration,
24
    AliasRoutingConfiguration,
25
    AllowedPublishers,
26
    Architecture,
27
    Arn,
28
    Blob,
29
    BlobStream,
30
    CapacityProviderConfig,
31
    CodeSigningConfigArn,
32
    CodeSigningConfigNotFoundException,
33
    CodeSigningPolicies,
34
    CompatibleArchitectures,
35
    CompatibleRuntimes,
36
    Concurrency,
37
    Cors,
38
    CreateCodeSigningConfigResponse,
39
    CreateEventSourceMappingRequest,
40
    CreateFunctionRequest,
41
    CreateFunctionUrlConfigResponse,
42
    DeleteCodeSigningConfigResponse,
43
    DeleteFunctionResponse,
44
    Description,
45
    DestinationConfig,
46
    DurableExecutionName,
47
    EventSourceMappingConfiguration,
48
    FunctionCodeLocation,
49
    FunctionConfiguration,
50
    FunctionEventInvokeConfig,
51
    FunctionName,
52
    FunctionUrlAuthType,
53
    FunctionUrlQualifier,
54
    FunctionVersionLatestPublished,
55
    GetAccountSettingsResponse,
56
    GetCodeSigningConfigResponse,
57
    GetFunctionCodeSigningConfigResponse,
58
    GetFunctionConcurrencyResponse,
59
    GetFunctionRecursionConfigResponse,
60
    GetFunctionResponse,
61
    GetFunctionUrlConfigResponse,
62
    GetLayerVersionPolicyResponse,
63
    GetLayerVersionResponse,
64
    GetPolicyResponse,
65
    GetProvisionedConcurrencyConfigResponse,
66
    InvalidParameterValueException,
67
    InvocationResponse,
68
    InvocationType,
69
    InvokeAsyncResponse,
70
    InvokeMode,
71
    LambdaApi,
72
    LambdaManagedInstancesCapacityProviderConfig,
73
    LastUpdateStatus,
74
    LayerName,
75
    LayerPermissionAllowedAction,
76
    LayerPermissionAllowedPrincipal,
77
    LayersListItem,
78
    LayerVersionArn,
79
    LayerVersionContentInput,
80
    LayerVersionNumber,
81
    LicenseInfo,
82
    ListAliasesResponse,
83
    ListCodeSigningConfigsResponse,
84
    ListEventSourceMappingsResponse,
85
    ListFunctionEventInvokeConfigsResponse,
86
    ListFunctionsByCodeSigningConfigResponse,
87
    ListFunctionsResponse,
88
    ListFunctionUrlConfigsResponse,
89
    ListLayersResponse,
90
    ListLayerVersionsResponse,
91
    ListProvisionedConcurrencyConfigsResponse,
92
    ListTagsResponse,
93
    ListVersionsByFunctionResponse,
94
    LogFormat,
95
    LoggingConfig,
96
    LogType,
97
    MasterRegion,
98
    MaxFunctionEventInvokeConfigListItems,
99
    MaximumEventAgeInSeconds,
100
    MaximumRetryAttempts,
101
    MaxItems,
102
    MaxLayerListItems,
103
    MaxListItems,
104
    MaxProvisionedConcurrencyConfigListItems,
105
    NamespacedFunctionName,
106
    NamespacedStatementId,
107
    NumericLatestPublishedOrAliasQualifier,
108
    OnFailure,
109
    OnSuccess,
110
    OrganizationId,
111
    PackageType,
112
    PositiveInteger,
113
    PreconditionFailedException,
114
    ProvisionedConcurrencyConfigListItem,
115
    ProvisionedConcurrencyConfigNotFoundException,
116
    ProvisionedConcurrencyStatusEnum,
117
    PublishLayerVersionResponse,
118
    PutFunctionCodeSigningConfigResponse,
119
    PutFunctionRecursionConfigResponse,
120
    PutProvisionedConcurrencyConfigResponse,
121
    Qualifier,
122
    RecursiveLoop,
123
    ReservedConcurrentExecutions,
124
    ResourceConflictException,
125
    ResourceNotFoundException,
126
    Runtime,
127
    RuntimeVersionConfig,
128
    SnapStart,
129
    SnapStartApplyOn,
130
    SnapStartOptimizationStatus,
131
    SnapStartResponse,
132
    State,
133
    StatementId,
134
    StateReasonCode,
135
    String,
136
    TaggableResource,
137
    TagKeyList,
138
    Tags,
139
    TenantId,
140
    TracingMode,
141
    UnqualifiedFunctionName,
142
    UpdateCodeSigningConfigResponse,
143
    UpdateEventSourceMappingRequest,
144
    UpdateFunctionCodeRequest,
145
    UpdateFunctionConfigurationRequest,
146
    UpdateFunctionUrlConfigResponse,
147
    VersionWithLatestPublished,
148
)
149
from localstack.aws.api.lambda_ import FunctionVersion as FunctionVersionApi
1✔
150
from localstack.aws.api.lambda_ import ServiceException as LambdaServiceException
1✔
151
from localstack.aws.api.pipes import (
1✔
152
    DynamoDBStreamStartPosition,
153
    KinesisStreamStartPosition,
154
)
155
from localstack.aws.connect import connect_to
1✔
156
from localstack.aws.spec import load_service
1✔
157
from localstack.services.edge import ROUTER
1✔
158
from localstack.services.lambda_ import api_utils
1✔
159
from localstack.services.lambda_ import hooks as lambda_hooks
1✔
160
from localstack.services.lambda_.analytics import (
1✔
161
    FunctionInitializationType,
162
    FunctionOperation,
163
    FunctionStatus,
164
    function_counter,
165
)
166
from localstack.services.lambda_.api_utils import (
1✔
167
    ARCHITECTURES,
168
    STATEMENT_ID_REGEX,
169
    SUBNET_ID_REGEX,
170
    function_locators_from_arn,
171
)
172
from localstack.services.lambda_.event_source_mapping.esm_config_factory import (
1✔
173
    EsmConfigFactory,
174
)
175
from localstack.services.lambda_.event_source_mapping.esm_worker import (
1✔
176
    EsmState,
177
    EsmWorker,
178
)
179
from localstack.services.lambda_.event_source_mapping.esm_worker_factory import (
1✔
180
    EsmWorkerFactory,
181
)
182
from localstack.services.lambda_.event_source_mapping.pipe_utils import get_internal_client
1✔
183
from localstack.services.lambda_.invocation import AccessDeniedException
1✔
184
from localstack.services.lambda_.invocation.execution_environment import (
1✔
185
    EnvironmentStartupTimeoutException,
186
)
187
from localstack.services.lambda_.invocation.lambda_models import (
1✔
188
    AliasRoutingConfig,
189
    CodeSigningConfig,
190
    EventInvokeConfig,
191
    Function,
192
    FunctionResourcePolicy,
193
    FunctionUrlConfig,
194
    FunctionVersion,
195
    ImageConfig,
196
    LambdaEphemeralStorage,
197
    Layer,
198
    LayerPolicy,
199
    LayerPolicyStatement,
200
    LayerVersion,
201
    ProvisionedConcurrencyConfiguration,
202
    RequestEntityTooLargeException,
203
    ResourcePolicy,
204
    UpdateStatus,
205
    ValidationException,
206
    VersionAlias,
207
    VersionFunctionConfiguration,
208
    VersionIdentifier,
209
    VersionState,
210
    VpcConfig,
211
)
212
from localstack.services.lambda_.invocation.lambda_service import (
1✔
213
    LambdaService,
214
    create_image_code,
215
    destroy_code_if_not_used,
216
    lambda_stores,
217
    store_lambda_archive,
218
    store_s3_bucket_archive,
219
)
220
from localstack.services.lambda_.invocation.models import CapacityProvider as CapacityProviderModel
1✔
221
from localstack.services.lambda_.invocation.models import LambdaStore
1✔
222
from localstack.services.lambda_.invocation.runtime_executor import get_runtime_executor
1✔
223
from localstack.services.lambda_.lambda_utils import HINT_LOG
1✔
224
from localstack.services.lambda_.layerfetcher.layer_fetcher import LayerFetcher
1✔
225
from localstack.services.lambda_.provider_utils import (
1✔
226
    LambdaLayerVersionIdentifier,
227
    get_function_version,
228
    get_function_version_from_arn,
229
)
230
from localstack.services.lambda_.runtimes import (
1✔
231
    ALL_RUNTIMES,
232
    DEPRECATED_RUNTIMES,
233
    DEPRECATED_RUNTIMES_UPGRADES,
234
    RUNTIMES_AGGREGATED,
235
    SNAP_START_SUPPORTED_RUNTIMES,
236
    VALID_RUNTIMES,
237
)
238
from localstack.services.lambda_.urlrouter import FunctionUrlRouter
1✔
239
from localstack.services.plugins import ServiceLifecycleHook
1✔
240
from localstack.state import StateVisitor
1✔
241
from localstack.utils.aws.arns import (
1✔
242
    ArnData,
243
    capacity_provider_arn,
244
    extract_resource_from_arn,
245
    extract_service_from_arn,
246
    get_partition,
247
    lambda_event_source_mapping_arn,
248
    parse_arn,
249
)
250
from localstack.utils.aws.client_types import ServicePrincipal
1✔
251
from localstack.utils.bootstrap import is_api_enabled
1✔
252
from localstack.utils.collections import PaginatedList, merge_recursive
1✔
253
from localstack.utils.event_matcher import validate_event_pattern
1✔
254
from localstack.utils.strings import get_random_hex, short_uid, to_bytes, to_str
1✔
255
from localstack.utils.sync import poll_condition
1✔
256
from localstack.utils.urls import localstack_host
1✔
257

258
# Module-level logger named after this module.
LOG = logging.getLogger(__name__)

# Pattern string (not compiled here) describing a Lambda capacity provider ARN.
CAPACITY_PROVIDER_ARN_NAME = "arn:aws[a-zA-Z-]*:lambda:(eusc-)?[a-z]{2}((-gov)|(-iso([a-z]?)))?-[a-z]+-\\d{1}:\\d{12}:capacity-provider:[a-zA-Z0-9-_]+"
# Default function timeout -- presumably in seconds; TODO confirm against the call sites.
LAMBDA_DEFAULT_TIMEOUT = 3
# Default function memory size -- presumably in MB; TODO confirm against the call sites.
LAMBDA_DEFAULT_MEMORY_SIZE = 128

# NOTE(review): judging by the names, these cap the tags per resource and the layers per
# function; the enforcing code is not part of this section.
LAMBDA_TAG_LIMIT_PER_RESOURCE = 50
LAMBDA_LAYERS_LIMIT_PER_FUNCTION = 5

# Tag key constant; values stored under it are presumably validated with the pattern below.
TAG_KEY_CUSTOM_URL = "_custom_id_"
# Requirements (from RFC3986 & co): not longer than 63, first char must be
# alpha, then alphanumeric or hyphen, except cannot start or end with hyphen
TAG_KEY_CUSTOM_URL_VALIDATOR = re.compile(r"^[A-Za-z]([A-Za-z0-9\-]{0,61}[A-Za-z0-9])?$")
1✔
271

272

273
class LambdaProvider(LambdaApi, ServiceLifecycleHook):
1✔
274
    lambda_service: LambdaService
1✔
275
    create_fn_lock: threading.RLock
1✔
276
    create_layer_lock: threading.RLock
1✔
277
    router: FunctionUrlRouter
1✔
278
    esm_workers: dict[str, EsmWorker]
1✔
279
    layer_fetcher: LayerFetcher | None
1✔
280

281
    def __init__(self) -> None:
        """Create the provider's locks, Lambda service, function URL router and ESM registry.

        The layer fetcher starts out unset; the ``inject_layer_fetcher`` hook is run last and
        receives this provider instance.
        """
        # re-entrant locks exposed as `create_fn_lock` / `create_layer_lock`
        self.create_fn_lock = threading.RLock()
        self.create_layer_lock = threading.RLock()

        # the router is handed the very same service instance that the provider keeps
        service = LambdaService()
        self.lambda_service = service
        self.router = FunctionUrlRouter(ROUTER, service)

        # ESM workers are registered here keyed by their UUID
        self.esm_workers = {}
        self.layer_fetcher = None
        lambda_hooks.inject_layer_fetcher.run(self)
1✔
289

290
    def accept_state_visitor(self, visitor: StateVisitor):
        """Let the given state visitor visit the Lambda stores.

        :param visitor: visitor whose ``visit`` method is called with ``lambda_stores``
        """
        stores = lambda_stores
        visitor.visit(stores)
×
292

293
    def on_before_state_reset(self):
        """Stop the current Lambda service before the state is reset."""
        current_service = self.lambda_service
        current_service.stop()
×
295

296
    def on_after_state_reset(self):
        """Create a fresh Lambda service after a state reset and hand it to the URL router."""
        fresh_service = LambdaService()
        # same assignment order as a chained assignment: router first, then the provider
        self.router.lambda_service = fresh_service
        self.lambda_service = fresh_service
×
298

299
    def on_before_state_load(self):
        """Stop the current Lambda service before state is loaded."""
        current_service = self.lambda_service
        current_service.stop()
×
301

302
    def on_after_state_load(self):
        """Rebuild the runtime objects for everything found in the loaded Lambda stores.

        A new ``LambdaService`` replaces the old one (on the provider and on the URL router).
        Then, for every account/region store:

        - each function version is put back into ``Pending`` and re-created through the service,
          except ``$LATEST`` of functions that have a capacity provider config;
        - provisioned concurrency is re-applied for each stored config (alias or version);
        - an ESM worker is created and registered for each stored event source mapping.

        Failures while restoring a function version or a provisioned concurrency config are
        logged as warnings and do not abort the restore.
        """
        self.lambda_service = LambdaService()
        self.router.lambda_service = self.lambda_service

        for _account_id, account_bundle in lambda_stores.items():
            for _region_name, state in account_bundle.items():
                for fn in state.functions.values():
                    for stored_version in fn.versions.values():
                        try:
                            # $LATEST is not invokable for Lambda functions with a capacity provider
                            # and has a different State (i.e., ActiveNonInvokable)
                            skip_restore = (
                                stored_version.config.CapacityProviderConfig
                                and stored_version.id.qualifier == "$LATEST"
                            )
                            if skip_restore:
                                continue

                            # Restore the "Pending" state for the function version and start it
                            pending_state = VersionState(
                                state=State.Pending,
                                code=StateReasonCode.Creating,
                                reason="The function is being created.",
                            )
                            pending_config = dataclasses.replace(
                                stored_version.config, state=pending_state
                            )
                            pending_version = dataclasses.replace(
                                stored_version, config=pending_config
                            )
                            fn.versions[stored_version.id.qualifier] = pending_version
                            # the service is given the version object as it was loaded
                            creation = self.lambda_service.create_function_version(stored_version)
                            creation.result(timeout=5)
                        except Exception:
                            LOG.warning(
                                "Failed to restore function version %s",
                                stored_version.id.qualified_arn(),
                                exc_info=LOG.isEnabledFor(logging.DEBUG),
                            )

                    # restore provisioned concurrency per function considering both versions and aliases
                    for (
                        provisioned_qualifier,
                        provisioned_config,
                    ) in fn.provisioned_concurrency_configs.items():
                        fn_arn = None
                        try:
                            # work out which version key the qualifier points at
                            if api_utils.qualifier_is_alias(provisioned_qualifier):
                                version_key = fn.aliases.get(provisioned_qualifier).function_version
                            elif api_utils.qualifier_is_version(provisioned_qualifier):
                                version_key = provisioned_qualifier
                            else:
                                raise InvalidParameterValueException(
                                    "Invalid qualifier type:"
                                    " Qualifier can only be an alias or a version for provisioned concurrency."
                                )
                            # a missing alias/version surfaces as an exception and is logged below
                            fn_arn = fn.versions.get(version_key).id.qualified_arn()

                            version_manager = self.lambda_service.get_lambda_version_manager(fn_arn)
                            version_manager.update_provisioned_concurrency_config(
                                provisioned_config.provisioned_concurrent_executions
                            )
                        except Exception:
                            LOG.warning(
                                "Failed to restore provisioned concurrency %s for function %s",
                                provisioned_config,
                                fn_arn,
                                exc_info=LOG.isEnabledFor(logging.DEBUG),
                            )

                for esm in state.event_source_mappings.values():
                    # Restores event source workers
                    function_arn = esm.get("FunctionArn")

                    def mapped_function_settled() -> bool:
                        # only called by poll_condition below, within this loop iteration
                        current = get_function_version_from_arn(function_arn).config.state.state
                        return current in (State.Active, State.Failed)

                    # TODO: How do we know the event source is up?
                    # A basic poll to see if the mapped Lambda function is active/failed
                    if not poll_condition(mapped_function_settled, timeout=10):
                        LOG.warning(
                            "Creating ESM for Lambda that is not in running state: %s",
                            function_arn,
                        )

                    function_role = get_function_version_from_arn(function_arn).config.role

                    esm_state = esm.get("State", EsmState.DISABLED)
                    is_esm_enabled = esm_state not in (EsmState.DISABLED, EsmState.DISABLING)
                    worker_factory = EsmWorkerFactory(esm, function_role, is_esm_enabled)
                    esm_worker = worker_factory.get_esm_worker()

                    # Note: a worker is created in the DISABLED state if not enabled
                    esm_worker.create()
                    # TODO: assigning the esm_worker to the dict only works after .create(). Could it cause a race
                    #  condition if we get a shutdown here and have a worker thread spawned but not accounted for?
                    self.esm_workers[esm_worker.uuid] = esm_worker
×
400

401
    def on_after_init(self):
        """Register the function URL routes and validate the runtime executor's environment."""
        self.router.register_routes()
        executor = get_runtime_executor()
        executor.validate_environment()
1✔
404

405
    def on_before_stop(self) -> None:
        """Stop every registered ESM worker for shutdown, then stop the Lambda service."""
        for worker in self.esm_workers.values():
            worker.stop_for_shutdown()

        # TODO: should probably unregister routes?
        self.lambda_service.stop()
1✔
411

412
    @staticmethod
    def _get_function(function_name: str, account_id: str, region: str) -> Function:
        """Look up a function in the store of the given account and region.

        :param function_name: name the function is stored under
        :param account_id: account owning the store
        :param region: region of the store
        :return: the stored function
        :raises ResourceNotFoundException: if no function is stored under that name
        """
        store = lambda_stores[account_id][region]
        if found := store.functions.get(function_name):
            return found

        # the error message carries the unqualified ARN of the missing function
        missing_arn = api_utils.unqualified_lambda_arn(
            function_name=function_name,
            account=account_id,
            region=region,
        )
        raise ResourceNotFoundException(
            f"Function not found: {missing_arn}",
            Type="User",
        )
1✔
427

428
    @staticmethod
    def _get_esm(uuid: str, account_id: str, region: str) -> EventSourceMappingConfiguration:
        """Look up an event source mapping in the store of the given account and region.

        :param uuid: UUID the mapping is stored under
        :param account_id: account owning the store
        :param region: region of the store
        :return: the stored event source mapping
        :raises ResourceNotFoundException: if no mapping is stored under that UUID
        """
        store = lambda_stores[account_id][region]
        if mapping := store.event_source_mappings.get(uuid):
            return mapping

        missing_arn = lambda_event_source_mapping_arn(uuid, account_id, region)
        raise ResourceNotFoundException(
            f"Event source mapping not found: {missing_arn}",
            Type="User",
        )
1✔
439

440
    @staticmethod
    def _get_capacity_provider(
        capacity_provider_name: str,
        account_id: str,
        region: str,
        error_msg_template: str = "Capacity provider not found: {}",
    ) -> CapacityProviderModel:
        """Look up a capacity provider in the store of the given account and region.

        :param capacity_provider_name: name the capacity provider is stored under
        :param account_id: account owning the store
        :param region: region of the store
        :param error_msg_template: ``str.format`` template for the not-found message; it is
            formatted with the capacity provider ARN as its single positional argument
        :return: the stored capacity provider
        :raises ResourceNotFoundException: if no capacity provider is stored under that name
        """
        store = lambda_stores[account_id][region]
        if provider := store.capacity_providers.get(capacity_provider_name):
            return provider

        missing_arn = capacity_provider_arn(capacity_provider_name, account_id, region)
        raise ResourceNotFoundException(
            error_msg_template.format(missing_arn),
            Type="User",
        )
×
456

457
    @staticmethod
    def _validate_qualifier_expression(qualifier: str) -> None:
        """Raise a validation error if ``api_utils.validate_qualifier`` reports any problems.

        :param qualifier: qualifier expression to check
        :raises ValidationException: with a message built from the reported error messages
        """
        problems = api_utils.validate_qualifier(qualifier)
        if not problems:
            return
        raise ValidationException(
            message=api_utils.construct_validation_exception_message(problems)
        )
463

464
    @staticmethod
    def _validate_publish_to(publish_to: str):
        """Accept only ``LATEST_PUBLISHED`` as a ``publishTo`` value.

        :param publish_to: value to check
        :raises ValidationException: if the value is anything other than ``LATEST_PUBLISHED``
        """
        if publish_to == FunctionVersionLatestPublished.LATEST_PUBLISHED:
            return
        raise ValidationException(
            message=f"1 validation error detected: Value '{publish_to}' at 'publishTo' failed to satisfy constraint: Member must satisfy enum value set: [LATEST_PUBLISHED]"
        )
470

471
    @staticmethod
    def _resolve_fn_qualifier(resolved_fn: Function, qualifier: str | None) -> tuple[str, str]:
        """Check that a qualifier exists on the function and return it together with the ARN.

        Without a qualifier, ``$LATEST`` and the unqualified ARN are returned. With a qualifier,
        it must be a known alias, or a known version / ``$LATEST``; the qualified ARN is returned.

        :param resolved_fn: The resolved lambda function
        :param qualifier: The qualifier to be resolved or None
        :return: Tuple of (resolved qualifier, function arn either qualified or unqualified)
        :raises ResourceNotFoundException: if the alias or version does not exist, or the
            qualifier is neither an alias nor a version
        """
        name = resolved_fn.function_name
        # assuming function versions need to live in the same account and region
        account = resolved_fn.latest().id.account
        region_name = resolved_fn.latest().id.region

        if qualifier is None:
            return "$LATEST", api_utils.unqualified_lambda_arn(name, account, region_name)

        qualified_arn = api_utils.qualified_lambda_arn(name, qualifier, account, region_name)
        if api_utils.qualifier_is_alias(qualifier):
            exists = qualifier in resolved_fn.aliases
            not_found_message = f"Cannot find alias arn: {qualified_arn}"
        elif api_utils.qualifier_is_version(qualifier) or qualifier == "$LATEST":
            exists = qualifier in resolved_fn.versions
            not_found_message = f"Function not found: {qualified_arn}"
        else:
            # matches qualifier pattern but invalid alias or version
            exists = False
            not_found_message = f"Function not found: {qualified_arn}"

        if not exists:
            raise ResourceNotFoundException(not_found_message, Type="User")
        return qualifier or "$LATEST", qualified_arn
1✔
497

498
    @staticmethod
    def _function_revision_id(resolved_fn: Function, resolved_qualifier: str) -> str:
        """Return the revision id of the alias or version that the qualifier names.

        :param resolved_fn: function holding the aliases and versions
        :param resolved_qualifier: alias name or version qualifier; it is looked up by key
        :return: revision id of the alias, or of the version's config
        """
        if not api_utils.qualifier_is_alias(resolved_qualifier):
            # Assumes that a non-alias is a version
            version = resolved_fn.versions[resolved_qualifier]
            return version.config.revision_id
        alias = resolved_fn.aliases[resolved_qualifier]
        return alias.revision_id
1✔
505

506
    def _resolve_vpc_id(self, account_id: str, region_name: str, subnet_id: str) -> str:
        """Return the VPC id of a subnet as reported by EC2 ``describe_subnets``.

        :param account_id: account id, passed to the client factory as the access key id
        :param region_name: region of the EC2 client
        :param subnet_id: subnet to describe
        :return: ``VpcId`` of the first subnet in the response
        :raises InvalidParameterValueException: if EC2 answers with a client error; the EC2
            error code and message are embedded in the exception message
        """
        ec2 = connect_to(aws_access_key_id=account_id, region_name=region_name).ec2
        try:
            described = ec2.describe_subnets(SubnetIds=[subnet_id])
            return described["Subnets"][0]["VpcId"]
        except ec2.exceptions.ClientError as e:
            error = e.response["Error"]
            code = error["Code"]
            message = error["Message"]
            raise InvalidParameterValueException(
                f"Error occurred while DescribeSubnets. EC2 Error Code: {code}. EC2 Error Message: {message}",
                Type="User",
            )
517

518
    def _build_vpc_config(
        self,
        account_id: str,
        region_name: str,
        vpc_config: dict | None = None,
    ) -> VpcConfig | None:
        """Turn a request ``VpcConfig`` dict into a ``VpcConfig`` model.

        :param account_id: account used to resolve the VPC id via EC2
        :param region_name: region used to resolve the VPC id via EC2
        :param vpc_config: request dict with optional ``SubnetIds`` / ``SecurityGroupIds``
        :return: ``None`` if no config was given or the ``ec2`` API is not enabled; an empty
            ``VpcConfig`` if there are no subnet ids; otherwise a ``VpcConfig`` whose VPC id
            is resolved from the first subnet
        :raises ValidationException: if the first subnet id does not match ``SUBNET_ID_REGEX``
        :raises InvalidParameterValueException: if EC2 cannot describe the first subnet
        """
        if not vpc_config or not is_api_enabled("ec2"):
            return None

        # `or []` also covers an explicit `"SubnetIds": None`, which would otherwise reach
        # `subnet_ids[0]` below and fail with a TypeError instead of yielding an empty config
        subnet_ids = vpc_config.get("SubnetIds") or []
        if not subnet_ids:
            return VpcConfig(vpc_id="", security_group_ids=[], subnet_ids=[])

        # only the first subnet is validated and used to resolve the VPC
        subnet_id = subnet_ids[0]
        if not SUBNET_ID_REGEX.match(subnet_id):
            raise ValidationException(
                f"1 validation error detected: Value '[{subnet_id}]' at 'vpcConfig.subnetIds' failed to satisfy constraint: Member must satisfy constraint: [Member must have length less than or equal to 1024, Member must have length greater than or equal to 0, Member must satisfy regular expression pattern: subnet-[0-9a-z]*]"
            )

        return VpcConfig(
            vpc_id=self._resolve_vpc_id(account_id, region_name, subnet_id),
            security_group_ids=vpc_config.get("SecurityGroupIds", []),
            subnet_ids=subnet_ids,
        )
542

543
    def _create_version_model(
        self,
        function_name: str,
        region: str,
        account_id: str,
        description: str | None = None,
        revision_id: str | None = None,
        code_sha256: str | None = None,
        publish_to: FunctionVersionLatestPublished | None = None,
        is_active: bool = False,
    ) -> tuple[FunctionVersion, bool]:
        """
        Derive a new version from ``$LATEST`` and store it on the function model.

        Preconditions checked before anything is stored:
          - RevisionId, if provided, must equal the revision id of ``$LATEST``
          - CodeSha256, if provided, must equal the code hash of ``$LATEST`` (not enforced for
            hot-reloaded zip packages)

        If the most recently numbered version has the same internal revision as ``$LATEST``,
        nothing is stored and that version is returned with ``False``. Otherwise the new
        version is stored and returned with ``True``.

        :param function_name: Function name to be published
        :param region: Region of the function
        :param account_id: Account of the function
        :param description: description of the new version; the one of ``$LATEST`` is kept if None
        :param revision_id: expected revision id of ``$LATEST``
        :param code_sha256: expected code hash of ``$LATEST``
        :param publish_to: if ``LATEST_PUBLISHED``, the version is stored under the qualifier
            ``$LATEST.PUBLISHED`` (the version counter is not advanced) and gets an
            ``InProgress`` last-update status
        :param is_active: only considered together with ``publish_to``: the new version then
            starts in state ``Active`` instead of ``Pending``
        :return: Tuple of (version, whether a new version was stored)
        :raises PreconditionFailedException: if ``revision_id`` does not match
        :raises InvalidParameterValueException: if ``code_sha256`` does not match
        """
        latest = get_function_version(
            function_name=function_name, qualifier="$LATEST", account_id=account_id, region=region
        )
        latest_config = latest.config

        if revision_id and latest_config.revision_id != revision_id:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
                Type="User",
            )

        # check if code hashes match if they are specified
        is_zip = latest_config.package_type == PackageType.Zip
        if is_zip:
            current_hash = latest_config.code.code_sha256
        else:
            current_hash = latest_config.image.code_sha256
        # if the code is a zip package and hot reloaded (hot reloading is currently only supported for zip packagetypes)
        # we cannot enforce the codesha256 check
        is_hot_reloaded_zip_package = is_zip and latest_config.code.is_hot_reloading()
        if code_sha256 and current_hash != code_sha256 and not is_hot_reloaded_zip_package:
            raise InvalidParameterValueException(
                f"CodeSHA256 ({code_sha256}) is different from current CodeSHA256 in $LATEST ({current_hash}). Please try again with the CodeSHA256 in $LATEST.",
                Type="User",
            )

        function = lambda_stores[account_id][region].functions.get(function_name)
        # config fields that differ from $LATEST in the new version
        changes = {} if description is None else {"description": description}
        # TODO copy environment instead of restarting one, get rid of all the "Pending"s

        publishing_latest = publish_to == FunctionVersionLatestPublished.LATEST_PUBLISHED

        with function.lock:
            # nothing to publish if the last numbered version already reflects $LATEST
            previous = None
            if function.next_version > 1:
                previous = function.versions.get(str(function.next_version - 1))
            if previous and previous.config.internal_revision == latest_config.internal_revision:
                return previous, False

            # TODO check if there was a change since last version
            if publishing_latest:
                qualifier = "$LATEST.PUBLISHED"
            else:
                qualifier = str(function.next_version)
                function.next_version += 1
            new_id = VersionIdentifier(
                function_name=function_name,
                qualifier=qualifier,
                region=region,
                account=account_id,
            )

            if latest_config.CapacityProviderConfig:
                # for lambda managed functions, snap start is not supported
                snap_start = None
            else:
                apply_on = latest_config.snap_start["ApplyOn"]
                if apply_on == SnapStartApplyOn.PublishedVersions:
                    optimization_status = SnapStartOptimizationStatus.On
                else:
                    optimization_status = SnapStartOptimizationStatus.Off
                snap_start = SnapStartResponse(
                    ApplyOn=apply_on,
                    OptimizationStatus=optimization_status,
                )

            if publishing_latest and is_active:
                new_state = VersionState(state=State.Active)
            else:
                new_state = VersionState(
                    state=State.Pending,
                    code=StateReasonCode.Creating,
                    reason="The function is being created.",
                )
            if publishing_latest:
                last_update = UpdateStatus(
                    status=LastUpdateStatus.InProgress,
                    code="Updating",
                    reason="The function is being updated.",
                )
            else:
                last_update = None

            new_config = dataclasses.replace(
                latest_config,
                last_update=last_update,
                state=new_state,
                snap_start=snap_start,
                **changes,
            )
            new_version = dataclasses.replace(latest, config=new_config, id=new_id)
            function.versions[qualifier] = new_version
        return new_version, True
1✔
667

668
    def _publish_version_from_existing_version(
        self,
        function_name: str,
        region: str,
        account_id: str,
        description: str | None = None,
        revision_id: str | None = None,
        code_sha256: str | None = None,
        publish_to: FunctionVersionLatestPublished | None = None,
    ) -> FunctionVersion:
        """
        Publish a version based on an existing, already initialized $LATEST.

        :param function_name: Function name
        :param region: region
        :param account_id: account id
        :param description: description
        :param revision_id: revision id (check if current version matches)
        :param code_sha256: code sha (check if current code matches)
        :param publish_to: optional publish target; ``is_active=True`` is passed to the
            version model only for ``FunctionVersionLatestPublished.LATEST_PUBLISHED``
        :return: the version model unchanged if nothing changed, the pending model for
            functions with a capacity provider, otherwise the version read back from the store
        """
        is_active = publish_to == FunctionVersionLatestPublished.LATEST_PUBLISHED
        new_version, changed = self._create_version_model(
            function_name=function_name,
            region=region,
            account_id=account_id,
            description=description,
            revision_id=revision_id,
            code_sha256=code_sha256,
            publish_to=publish_to,
            is_active=is_active,
        )
        if not changed:
            return new_version

        # Functions with a capacity provider are published asynchronously, all others synchronously.
        publish = (
            self.lambda_service.publish_version_async
            if new_version.config.CapacityProviderConfig
            else self.lambda_service.publish_version
        )
        publish(new_version)

        function = lambda_stores[account_id][region].functions.get(function_name)

        # Update revision id for $LATEST version
        # TODO: re-evaluate data model to prevent this dirty hack just for bumping the revision id
        current_latest = function.versions["$LATEST"]
        function.versions["$LATEST"] = dataclasses.replace(
            current_latest, config=dataclasses.replace(current_latest.config)
        )

        if new_version.config.CapacityProviderConfig:
            # publish_version happens async for functions with a capacity provider.
            # Therefore, we return the new_version with State=Pending or LastUpdateStatus=InProgress ($LATEST.PUBLISHED)
            return new_version

        # Regular functions yield an Active state modified during `publish_version` (sync).
        # Therefore, we need to get the updated version from the store.
        return function.versions.get(new_version.id.qualifier)
725

726
    def _publish_version_with_changes(
        self,
        function_name: str,
        region: str,
        account_id: str,
        description: str | None = None,
        revision_id: str | None = None,
        code_sha256: str | None = None,
        publish_to: FunctionVersionLatestPublished | None = None,
        is_active: bool = False,
    ) -> FunctionVersion:
        """
        Publish a version together with a new latest version (publish on create / update).

        :param function_name: Function name
        :param region: region
        :param account_id: account id
        :param description: description
        :param revision_id: revision id (check if current version matches)
        :param code_sha256: code sha (check if current code matches)
        :param publish_to: optional publish target, forwarded to the version model
        :param is_active: forwarded to the version model
        :return: new version
        """
        version_model, has_changed = self._create_version_model(
            function_name=function_name,
            region=region,
            account_id=account_id,
            description=description,
            revision_id=revision_id,
            code_sha256=code_sha256,
            publish_to=publish_to,
            is_active=is_active,
        )
        # Only a changed version model needs to be handed to the Lambda service.
        if has_changed:
            self.lambda_service.create_function_version(version_model)
        return version_model
762

763
    @staticmethod
    def _verify_env_variables(env_vars: dict[str, str]):
        """
        Reject environment variables whose serialized form exceeds the configured size limit.

        The variables are measured as the UTF-8 byte length of their compact JSON dump.

        :param env_vars: environment variables of the function
        :raises InvalidParameterValueException: if the measured size is larger than
            ``config.LAMBDA_LIMITS_MAX_FUNCTION_ENVVAR_SIZE_BYTES``
        """
        serialized = json.dumps(env_vars, separators=(",", ":"))
        size_in_bytes = len(serialized.encode("utf-8"))
        if size_in_bytes <= config.LAMBDA_LIMITS_MAX_FUNCTION_ENVVAR_SIZE_BYTES:
            return

        raise InvalidParameterValueException(
            f"Lambda was unable to configure your environment variables because the environment variables you have provided exceeded the 4KB limit. String measured: {serialized}",
            Type="User",
        )
774

775
    @staticmethod
    def _validate_snapstart(snap_start: SnapStart, runtime: Runtime):
        """
        Validate a SnapStart request against the allowed ``ApplyOn`` values and the runtime.

        :param snap_start: SnapStart settings from the request
        :param runtime: runtime of the function
        :raises ValidationException: if ``ApplyOn`` is neither ``PublishedVersions`` nor ``None``
        :raises InvalidParameterValueException: if the runtime is not in
            ``SNAP_START_SUPPORTED_RUNTIMES``
        """
        requested_apply_on = snap_start.get("ApplyOn")
        permitted_apply_on = (SnapStartApplyOn.PublishedVersions, SnapStartApplyOn.None_)
        if requested_apply_on not in permitted_apply_on:
            raise ValidationException(
                f"1 validation error detected: Value '{requested_apply_on}' at 'snapStart.applyOn' failed to satisfy constraint: Member must satisfy enum value set: [PublishedVersions, None]"
            )

        if runtime in SNAP_START_SUPPORTED_RUNTIMES:
            return

        raise InvalidParameterValueException(
            f"{runtime} is not supported for SnapStart enabled functions.", Type="User"
        )
790

791
    def _validate_layers(self, new_layers: list[str], region: str, account_id: str):
        """
        Validate the layer version ARNs referenced by a function.

        Layers of the function's own account must exist in the local store and be in the
        function's region. Layers of other accounts are only region-checked; if they are not
        known locally, they are fetched through ``self.layer_fetcher`` and added to the store.

        :param new_layers: layer version ARNs referenced by the function
        :param region: region of the function (region checks are skipped if falsy)
        :param account_id: account id of the function
        :raises InvalidParameterValueException: too many layers, wrong region, unknown layer
            version, or two versions of the same layer
        :raises ValidationException: if an ARN does not contain a layer version
        :raises AccessDeniedException: for external layers in another region or external layers
            that cannot be fetched
        :raises NotImplementedError: if an external layer must be fetched but no layer fetcher
            is configured
        """
        if len(new_layers) > LAMBDA_LAYERS_LIMIT_PER_FUNCTION:
            raise InvalidParameterValueException(
                "Cannot reference more than 5 layers.", Type="User"
            )

        # Maps the unversioned layer ARN to the first versioned ARN seen for it.
        visited_layers = {}
        for layer_version_arn in new_layers:
            (
                layer_region,
                layer_account_id,
                layer_name,
                layer_version_str,
            ) = api_utils.parse_layer_arn(layer_version_arn)
            if layer_version_str is None:
                raise ValidationException(
                    f"1 validation error detected: Value '[{layer_version_arn}]'"
                    + " at 'layers' failed to satisfy constraint: Member must satisfy constraint: [Member must have length less than or equal to 2048, Member must have length greater than or equal to 1, Member must satisfy regular expression pattern: "
                    + "(arn:(aws[a-zA-Z-]*)?:lambda:(eusc-)?[a-z]{2}((-gov)|(-iso([a-z]?)))?-[a-z]+-\\d{1}:\\d{12}:layer:[a-zA-Z0-9-_]+:[0-9]+)|(arn:[a-zA-Z0-9-]+:lambda:::awslayer:[a-zA-Z0-9-_]+), Member must not be null]",
                )

            state = lambda_stores[layer_account_id][layer_region]
            layer = state.layers.get(layer_name)
            # A missing layer implies a missing layer version.
            layer_version = (
                layer.layer_versions.get(layer_version_str) if layer is not None else None
            )
            if layer_account_id == account_id:
                if region and layer_region != region:
                    raise InvalidParameterValueException(
                        "Layers are not in the same region as the function. "
                        f"Layers are expected to be in region {region}.",
                        Type="User",
                    )
                if layer_version is None:
                    raise InvalidParameterValueException(
                        f"Layer version {layer_version_arn} does not exist.", Type="User"
                    )
            else:  # External layer from other account
                # TODO: validate IAM layer policy here, allowing access by default for now and only checking region
                if region and layer_region != region:
                    # TODO: detect user or role from context when IAM users are implemented
                    user = "user/localstack-testing"
                    raise AccessDeniedException(
                        f"User: arn:{get_partition(region)}:iam::{account_id}:{user} is not authorized to perform: lambda:GetLayerVersion on resource: {layer_version_arn} because no resource-based policy allows the lambda:GetLayerVersion action"
                    )
                if layer_version is None:
                    # Limitation: cannot fetch external layers when using the same account id as the target layer
                    # because we do not want to trigger the layer fetcher for every non-existing layer.
                    if self.layer_fetcher is None:
                        raise NotImplementedError(
                            "Fetching shared layers from AWS is a pro feature."
                        )

                    fetched_layer = self.layer_fetcher.fetch_layer(layer_version_arn)
                    if fetched_layer is None:
                        # TODO: detect user or role from context when IAM users are implemented
                        user = "user/localstack-testing"
                        raise AccessDeniedException(
                            f"User: arn:{get_partition(region)}:iam::{account_id}:{user} is not authorized to perform: lambda:GetLayerVersion on resource: {layer_version_arn} because no resource-based policy allows the lambda:GetLayerVersion action"
                        )

                    # Distinguish between new layer and new layer version. The decision must be
                    # based on whether the layer was known locally *before* fetching; otherwise an
                    # existing layer (and its other versions) would be replaced by the fetched one.
                    if layer is None:
                        # Create whole layer from scratch
                        state.layers[layer_name] = fetched_layer
                    else:
                        # Create layer version if another version of the same layer already exists
                        layer.layer_versions[layer_version_str] = (
                            fetched_layer.layer_versions.get(layer_version_str)
                        )

            # only the first two matches in the array are considered for the error message
            layer_arn = ":".join(layer_version_arn.split(":")[:-1])
            if layer_arn in visited_layers:
                conflict_layer_version_arn = visited_layers[layer_arn]
                raise InvalidParameterValueException(
                    f"Two different versions of the same layer are not allowed to be referenced in the same function. {conflict_layer_version_arn} and {layer_version_arn} are versions of the same layer.",
                    Type="User",
                )
            visited_layers[layer_arn] = layer_version_arn
871

872
    def _validate_capacity_provider_config(
        self, capacity_provider_config: CapacityProviderConfig, context: RequestContext
    ):
        """
        Validate the capacity provider configuration of a function.

        :param capacity_provider_config: capacity provider configuration from the request
        :param context: request context, forwarded to ``get_capacity_provider``
        :raises ValidationException: if the managed instances config or its capacity provider
            ARN is missing, or if the ARN does not match ``CAPACITY_PROVIDER_ARN_NAME``
        """
        managed_instances_config = capacity_provider_config.get(
            "LambdaManagedInstancesCapacityProviderConfig"
        )
        if not managed_instances_config:
            raise ValidationException(
                "1 validation error detected: Value null at 'capacityProviderConfig.lambdaManagedInstancesCapacityProviderConfig' failed to satisfy constraint: Member must not be null"
            )

        provider_arn = managed_instances_config.get("CapacityProviderArn")
        if not provider_arn:
            raise ValidationException(
                "1 validation error detected: Value null at 'capacityProviderConfig.lambdaManagedInstancesCapacityProviderConfig.capacityProviderArn' failed to satisfy constraint: Member must not be null"
            )

        if re.match(CAPACITY_PROVIDER_ARN_NAME, provider_arn) is None:
            raise ValidationException(
                f"1 validation error detected: Value '{provider_arn}' at 'capacityProviderConfig.lambdaManagedInstancesCapacityProviderConfig.capacityProviderArn' failed to satisfy constraint: Member must satisfy regular expression pattern: {CAPACITY_PROVIDER_ARN_NAME}"
            )

        # The provider name is the last colon-separated segment of the ARN. The lookup result
        # is discarded; presumably the call raises for unknown providers (confirm).
        provider_name = provider_arn.rsplit(":", 1)[-1]
        self.get_capacity_provider(context, provider_name)
895

896
    @staticmethod
    def map_layers(new_layers: list[str]) -> list[LayerVersion]:
        """
        Resolve layer version ARNs to the layer versions held in the store.

        No existence check is performed here: an unknown layer fails with an
        ``AttributeError`` and an unknown version resolves to ``None``.

        :param new_layers: layer version ARNs
        :return: stored layer versions, in the order of ``new_layers``
        """
        parsed_arns = (api_utils.parse_layer_arn(arn) for arn in new_layers)
        return [
            lambda_stores[account_id][region_name]
            .layers.get(layer_name)
            .layer_versions.get(version)
            for region_name, account_id, layer_name, version in parsed_arns
        ]
907

908
    def get_function_recursion_config(
        self,
        context: RequestContext,
        function_name: UnqualifiedFunctionName,
        **kwargs,
    ) -> GetFunctionRecursionConfigResponse:
        """
        Return the recursive loop setting stored on a function.

        :param context: request context
        :param function_name: function name or ARN; account and region are derived from it
            together with the context
        :return: response carrying the function's ``recursive_loop`` value
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        resolved_name = api_utils.get_function_name(function_name, context)
        function = self._get_function(
            function_name=resolved_name, region=region, account_id=account_id
        )
        return GetFunctionRecursionConfigResponse(RecursiveLoop=function.recursive_loop)
918

919
    def put_function_recursion_config(
        self,
        context: RequestContext,
        function_name: UnqualifiedFunctionName,
        recursive_loop: RecursiveLoop,
        **kwargs,
    ) -> PutFunctionRecursionConfigResponse:
        """
        Store a new recursive loop setting on a function.

        The function is looked up before the value is validated.

        :param context: request context
        :param function_name: function name or ARN; account and region are derived from it
            together with the context
        :param recursive_loop: new setting, must be a member value of ``RecursiveLoop``
        :return: response carrying the stored ``recursive_loop`` value
        :raises ValidationException: if ``recursive_loop`` is not a valid enum value
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        resolved_name = api_utils.get_function_name(function_name, context)
        function = self._get_function(
            function_name=resolved_name, region=region, account_id=account_id
        )

        if recursive_loop not in RecursiveLoop.__members__.values():
            raise ValidationException(
                f"1 validation error detected: Value '{recursive_loop}' at 'recursiveLoop' failed to satisfy constraint: "
                "Member must satisfy enum value set: [Terminate, Allow]"
            )

        function.recursive_loop = recursive_loop
        return PutFunctionRecursionConfigResponse(RecursiveLoop=function.recursive_loop)
940

941
    @handler(operation="CreateFunction", expand=False)
    def create_function(
        self,
        context: RequestContext,
        request: CreateFunctionRequest,
    ) -> FunctionConfiguration:
        """
        Create a new Lambda function and store its ``$LATEST`` version.

        The request is validated first (code and request size, architectures, environment
        variables, layers, role, runtime, SnapStart, PublishTo). The function is then created
        under ``self.create_fn_lock``, its code is stored, and the ``$LATEST`` version is built
        in ``Pending`` state. For functions with a capacity provider, ``$LATEST`` is stored as
        ``ActiveNonInvocable`` and no function version is created in the Lambda service at this
        point. Tags are stored and a version is published if requested.

        :param context: request context (region, account id, partition, raw request)
        :param request: CreateFunction request
        :return: configuration of the created (or, with ``Publish``, the published) version
        :raises RequestEntityTooLargeException: if the zip file or the request is too large
        :raises ValidationException: for invalid architectures or role ARN
        :raises InvalidParameterValueException: if the role cannot be assumed, or for an
            unsupported logging combination with a capacity provider
        :raises ResourceConflictException: if a function with this name already exists
        :raises LambdaServiceException: if the required code or image is missing
        """
        context_region = context.region
        context_account_id = context.account_id

        zip_file = request.get("Code", {}).get("ZipFile")
        if zip_file and len(zip_file) > config.LAMBDA_LIMITS_CODE_SIZE_ZIPPED:
            raise RequestEntityTooLargeException(
                f"Zipped size must be smaller than {config.LAMBDA_LIMITS_CODE_SIZE_ZIPPED} bytes"
            )

        if context.request.content_length > config.LAMBDA_LIMITS_CREATE_FUNCTION_REQUEST_SIZE:
            raise RequestEntityTooLargeException(
                f"Request must be smaller than {config.LAMBDA_LIMITS_CREATE_FUNCTION_REQUEST_SIZE} bytes for the CreateFunction operation"
            )

        if architectures := request.get("Architectures"):
            if len(architectures) != 1:
                raise ValidationException(
                    f"1 validation error detected: Value '[{', '.join(architectures)}]' at 'architectures' failed to "
                    f"satisfy constraint: Member must have length less than or equal to 1",
                )
            if architectures[0] not in ARCHITECTURES:
                raise ValidationException(
                    f"1 validation error detected: Value '[{', '.join(architectures)}]' at 'architectures' failed to "
                    f"satisfy constraint: Member must satisfy constraint: [Member must satisfy enum value set: "
                    f"[x86_64, arm64], Member must not be null]",
                )

        if env_vars := request.get("Environment", {}).get("Variables"):
            self._verify_env_variables(env_vars)

        if layers := request.get("Layers", []):
            self._validate_layers(layers, region=context_region, account_id=context_account_id)

        if not api_utils.is_role_arn(request.get("Role")):
            raise ValidationException(
                f"1 validation error detected: Value '{request.get('Role')}'"
                + " at 'role' failed to satisfy constraint: Member must satisfy regular expression pattern: arn:(aws[a-zA-Z-]*)?:iam::\\d{12}:role/?[a-zA-Z_0-9+=,.@\\-_/]+"
            )
        if not self.lambda_service.can_assume_role(request.get("Role"), context.region):
            raise InvalidParameterValueException(
                "The role defined for the function cannot be assumed by Lambda.", Type="User"
            )
        package_type = request.get("PackageType", PackageType.Zip)
        runtime = request.get("Runtime")
        self._validate_runtime(package_type, runtime)

        request_function_name = request.get("FunctionName")

        function_name, *_ = api_utils.get_name_and_qualifier(
            function_arn_or_name=request_function_name,
            qualifier=None,
            context=context,
        )

        if runtime in DEPRECATED_RUNTIMES:
            LOG.warning(
                "The Lambda runtime %s is deprecated. "
                "Please upgrade the runtime for the function %s: "
                "https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtimes.html",
                runtime,
                function_name,
            )
        if snap_start := request.get("SnapStart"):
            self._validate_snapstart(snap_start, runtime)
        if publish_to := request.get("PublishTo"):
            self._validate_publish_to(publish_to)
        state = lambda_stores[context_account_id][context_region]

        # The existence check and the registration of the new function happen under one lock.
        with self.create_fn_lock:
            if function_name in state.functions:
                raise ResourceConflictException(f"Function already exist: {function_name}")
            fn = Function(function_name=function_name)
            arn = VersionIdentifier(
                function_name=function_name,
                qualifier="$LATEST",
                region=context_region,
                account=context_account_id,
            )
            # save function code to s3
            code = None
            image = None
            image_config = None
            runtime_version_config = RuntimeVersionConfig(
                # Limitation: the runtime id (presumably sha256 of image) is currently hardcoded
                # Potential implementation: provide (cached) sha256 hash of used Docker image
                RuntimeVersionArn=f"arn:{context.partition}:lambda:{context_region}::runtime:8eeff65f6809a3ce81507fe733fe09b835899b99481ba22fd75b5a7338290ec1"
            )
            request_code = request.get("Code")
            if package_type == PackageType.Zip:
                # TODO verify if correct combination of code is set
                if zip_file := request_code.get("ZipFile"):
                    code = store_lambda_archive(
                        archive_file=zip_file,
                        function_name=function_name,
                        region_name=context_region,
                        account_id=context_account_id,
                    )
                elif s3_bucket := request_code.get("S3Bucket"):
                    s3_key = request_code["S3Key"]
                    s3_object_version = request_code.get("S3ObjectVersion")
                    code = store_s3_bucket_archive(
                        archive_bucket=s3_bucket,
                        archive_key=s3_key,
                        archive_version=s3_object_version,
                        function_name=function_name,
                        region_name=context_region,
                        account_id=context_account_id,
                    )
                else:
                    raise LambdaServiceException("A ZIP file or S3 bucket is required")
            elif package_type == PackageType.Image:
                image = request_code.get("ImageUri")
                if not image:
                    raise LambdaServiceException(
                        "An image is required when the package type is set to 'image'"
                    )
                image = create_image_code(image_uri=image)

                image_config_req = request.get("ImageConfig", {})
                image_config = ImageConfig(
                    command=image_config_req.get("Command"),
                    entrypoint=image_config_req.get("EntryPoint"),
                    working_directory=image_config_req.get("WorkingDirectory"),
                )
                # Runtime management controls are not available when providing a custom image
                runtime_version_config = None

            capacity_provider_config = None
            memory_size = request.get("MemorySize", LAMBDA_DEFAULT_MEMORY_SIZE)
            if "CapacityProviderConfig" in request:
                capacity_provider_config = request["CapacityProviderConfig"]
                self._validate_capacity_provider_config(capacity_provider_config, context)

                # Values from the request take precedence over these defaults.
                default_config = CapacityProviderConfig(
                    LambdaManagedInstancesCapacityProviderConfig=LambdaManagedInstancesCapacityProviderConfig(
                        ExecutionEnvironmentMemoryGiBPerVCpu=2.0,
                        PerExecutionEnvironmentMaxConcurrency=16,
                    )
                )
                capacity_provider_config = merge_recursive(default_config, capacity_provider_config)
                # The requested MemorySize is overridden for functions with a capacity provider.
                memory_size = 2048
                if request.get("LoggingConfig", {}).get("LogFormat") == LogFormat.Text:
                    raise InvalidParameterValueException(
                        'LogLevel is not supported when LogFormat is set to "Text". Remove LogLevel from your request or change the LogFormat to "JSON" and try again.',
                        Type="User",
                    )
            if "LoggingConfig" in request:
                logging_config = request["LoggingConfig"]
                LOG.warning(
                    "Advanced Lambda Logging Configuration is currently mocked "
                    "and will not impact the logging behavior. "
                    "Please create a feature request if needed."
                )

                # when switching to JSON, app and system level log is auto set to INFO
                if logging_config.get("LogFormat", None) == LogFormat.JSON:
                    logging_config = {
                        "ApplicationLogLevel": "INFO",
                        "SystemLogLevel": "INFO",
                        "LogGroup": f"/aws/lambda/{function_name}",
                    } | logging_config
                else:
                    logging_config = (
                        LoggingConfig(
                            LogFormat=LogFormat.Text, LogGroup=f"/aws/lambda/{function_name}"
                        )
                        | logging_config
                    )

            elif capacity_provider_config:
                logging_config = LoggingConfig(
                    LogFormat=LogFormat.JSON,
                    LogGroup=f"/aws/lambda/{function_name}",
                    ApplicationLogLevel="INFO",
                    SystemLogLevel="INFO",
                )
            else:
                logging_config = LoggingConfig(
                    LogFormat=LogFormat.Text, LogGroup=f"/aws/lambda/{function_name}"
                )
            # No SnapStart response is stored for functions with a capacity provider.
            snap_start = (
                None
                if capacity_provider_config
                else SnapStartResponse(
                    ApplyOn=request.get("SnapStart", {}).get("ApplyOn", SnapStartApplyOn.None_),
                    OptimizationStatus=SnapStartOptimizationStatus.Off,
                )
            )
            version = FunctionVersion(
                id=arn,
                config=VersionFunctionConfiguration(
                    last_modified=api_utils.format_lambda_date(datetime.datetime.now()),
                    description=request.get("Description", ""),
                    role=request["Role"],
                    timeout=request.get("Timeout", LAMBDA_DEFAULT_TIMEOUT),
                    runtime=request.get("Runtime"),
                    memory_size=memory_size,
                    handler=request.get("Handler"),
                    package_type=package_type,
                    environment=env_vars,
                    architectures=request.get("Architectures") or [Architecture.x86_64],
                    tracing_config_mode=request.get("TracingConfig", {}).get(
                        "Mode", TracingMode.PassThrough
                    ),
                    image=image,
                    image_config=image_config,
                    code=code,
                    layers=self.map_layers(layers),
                    internal_revision=short_uid(),
                    ephemeral_storage=LambdaEphemeralStorage(
                        size=request.get("EphemeralStorage", {}).get("Size", 512)
                    ),
                    snap_start=snap_start,
                    runtime_version_config=runtime_version_config,
                    dead_letter_arn=request.get("DeadLetterConfig", {}).get("TargetArn"),
                    vpc_config=self._build_vpc_config(
                        context_account_id, context_region, request.get("VpcConfig")
                    ),
                    state=VersionState(
                        state=State.Pending,
                        code=StateReasonCode.Creating,
                        reason="The function is being created.",
                    ),
                    logging_config=logging_config,
                    # TODO: might need something like **optional_kwargs if None
                    #   -> Test with regular GetFunction (i.e., without a capacity provider)
                    CapacityProviderConfig=capacity_provider_config,
                ),
            )
            # With a capacity provider, $LATEST is stored as ActiveNonInvocable with a successful
            # last update, while `version` itself (Pending) is what gets returned below.
            version_post_response = None
            if capacity_provider_config:
                version_post_response = dataclasses.replace(
                    version,
                    config=dataclasses.replace(
                        version.config,
                        last_update=UpdateStatus(status=LastUpdateStatus.Successful),
                        state=VersionState(state=State.ActiveNonInvocable),
                    ),
                )
            fn.versions["$LATEST"] = version_post_response or version
            state.functions[function_name] = fn
        initialization_type = (
            FunctionInitializationType.lambda_managed_instances
            if capacity_provider_config
            else FunctionInitializationType.on_demand
        )
        # NOTE(review): the labeled counter returned here is discarded and nothing is recorded
        # on it in this method; confirm whether an increment call is missing.
        function_counter.labels(
            operation=FunctionOperation.create,
            runtime=runtime or "n/a",
            status=FunctionStatus.success,
            invocation_type="n/a",
            package_type=package_type,
            initialization_type=initialization_type,
        )
        # TODO: consider potential other side effects of not having a function version for $LATEST
        # Provisioning happens upon publishing for functions using a capacity provider
        if not capacity_provider_config:
            self.lambda_service.create_function_version(version)

        if tags := request.get("Tags"):
            # This will check whether the function exists.
            self._store_tags(arn.unqualified_arn(), tags)

        if request.get("Publish"):
            version = self._publish_version_with_changes(
                function_name=function_name,
                region=context_region,
                account_id=context_account_id,
                publish_to=request.get("PublishTo"),
            )

        if config.LAMBDA_SYNCHRONOUS_CREATE:
            # block via retrying until "terminal" condition reached before returning
            if not poll_condition(
                lambda: get_function_version(
                    function_name, version.id.qualifier, version.id.account, version.id.region
                ).config.state.state
                in [State.Active, State.ActiveNonInvocable, State.Failed],
                timeout=10,
            ):
                LOG.warning(
                    "LAMBDA_SYNCHRONOUS_CREATE is active, but waiting for %s reached timeout.",
                    function_name,
                )

        return api_utils.map_config_out(
            version, return_qualified_arn=False, return_update_status=False
        )
1235

1236
    def _validate_runtime(self, package_type, runtime):
        """
        Reject runtimes that are not accepted for Zip-packaged functions.

        Other package types are not checked.

        :param package_type: package type of the function
        :param runtime: requested runtime
        :raises InvalidParameterValueException: if the runtime is not accepted; deprecated
            runtimes with a known upgrade target get a dedicated message
        """
        if config.LAMBDA_RUNTIME_VALIDATION:
            # NOTE(review): chain() over a single iterable does not flatten it, so this is the
            # plain list of the mapping's values. If those values are lists of runtimes,
            # chain.from_iterable() was probably intended; confirm.
            accepted_runtimes = list(itertools.chain(RUNTIMES_AGGREGATED.values()))
        else:
            accepted_runtimes = ALL_RUNTIMES

        if package_type != PackageType.Zip or runtime in accepted_runtimes:
            return

        # deprecated runtimes have different error
        if runtime in DEPRECATED_RUNTIMES:
            HINT_LOG.info(
                "Set env variable LAMBDA_RUNTIME_VALIDATION to 0"
                " in order to allow usage of deprecated runtimes"
            )
            self._check_for_recomended_migration_target(runtime)

        raise InvalidParameterValueException(
            f"Value {runtime} at 'runtime' failed to satisfy constraint: Member must satisfy enum value set: {VALID_RUNTIMES} or be a valid ARN",
            Type="User",
        )
1254

1255
    def _check_for_recomended_migration_target(self, deprecated_runtime):
        """
        Raise the dedicated error for deprecated runtimes that have a known upgrade target.

        AWS offers a recommended runtime for migration for "newly" deprecated runtimes; this
        check exists to preserve parity with those error messages. Runtimes without an entry
        in ``DEPRECATED_RUNTIMES_UPGRADES`` pass through silently.

        :param deprecated_runtime: the deprecated runtime that was requested
        :raises InvalidParameterValueException: if an upgrade target is known for the runtime
        """
        upgrade_target = DEPRECATED_RUNTIMES_UPGRADES.get(deprecated_runtime)
        if upgrade_target is None:
            return

        LOG.debug(
            "The Lambda runtime %s is deprecated. Please upgrade to a supported Lambda runtime such as %s.",
            deprecated_runtime,
            upgrade_target,
        )
        raise InvalidParameterValueException(
            f"The runtime parameter of {deprecated_runtime} is no longer supported for creating or updating AWS Lambda functions. We recommend you use a supported runtime while creating or updating functions.",
            Type="User",
        )
1270

1271
    @handler(operation="UpdateFunctionConfiguration", expand=False)
    def update_function_configuration(
        self, context: RequestContext, request: UpdateFunctionConfigurationRequest
    ) -> FunctionConfiguration:
        """Update the configuration of the function's ``$LATEST`` version.

        Only fields present in ``request`` are changed. They are collected in ``replace_kwargs``
        and applied with a single ``dataclasses.replace`` on the current ``$LATEST`` config, which
        also refreshes ``last_modified``, ``internal_revision`` and ``last_update``. The new
        version replaces ``$LATEST`` in the store and is handed to the lambda service.

        :param context: request context, used to resolve account and region
        :param request: the ``UpdateFunctionConfiguration`` request; ``FunctionName`` may be a
            name, ARN or partial ARN
        :return: the mapped configuration of the new ``$LATEST`` version
        :raises ResourceNotFoundException: if the function is not in the resolved store
        :raises PreconditionFailedException: if ``RevisionId`` is given and does not match the
            revision id of ``$LATEST``
        :raises ValidationException: if ``Role`` is not a role ARN, or ``CapacityProviderConfig``
            lacks ``LambdaManagedInstancesCapacityProviderConfig`` on a function that has one
        :raises InvalidParameterValueException: if ``Runtime`` is not in ``ALL_RUNTIMES``, or
            ``CapacityProviderConfig`` is sent for a function that has none
        """
        function_name = request.get("FunctionName")

        # in case we got ARN or partial ARN
        account_id, region = api_utils.get_account_and_region(function_name, context)
        # the returned qualifier is not used by this handler
        function_name, qualifier = api_utils.get_name_and_qualifier(function_name, None, context)
        state = lambda_stores[account_id][region]

        if function_name not in state.functions:
            raise ResourceNotFoundException(
                f"Function not found: {api_utils.unqualified_lambda_arn(function_name=function_name, region=region, account=account_id)}",
                Type="User",
            )
        function = state.functions[function_name]

        # TODO: lock modification of latest version
        # TODO: notify service for changes relevant to re-provisioning of $LATEST
        latest_version = function.latest()
        latest_version_config = latest_version.config

        # optimistic locking: a supplied RevisionId must match the current $LATEST revision
        revision_id = request.get("RevisionId")
        if revision_id and revision_id != latest_version.config.revision_id:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
                Type="User",
            )

        # config fields to override, keyed by the attribute names of the version config dataclass
        replace_kwargs = {}
        if "EphemeralStorage" in request:
            replace_kwargs["ephemeral_storage"] = LambdaEphemeralStorage(
                request.get("EphemeralStorage", {}).get("Size", 512)
            )  # TODO: do defaults here apply as well?

        if "Role" in request:
            if not api_utils.is_role_arn(request["Role"]):
                raise ValidationException(
                    f"1 validation error detected: Value '{request.get('Role')}'"
                    + " at 'role' failed to satisfy constraint: Member must satisfy regular expression pattern: arn:(aws[a-zA-Z-]*)?:iam::\\d{12}:role/?[a-zA-Z_0-9+=,.@\\-_/]+"
                )
            replace_kwargs["role"] = request["Role"]

        if "Description" in request:
            replace_kwargs["description"] = request["Description"]

        if "Timeout" in request:
            replace_kwargs["timeout"] = request["Timeout"]

        if "MemorySize" in request:
            replace_kwargs["memory_size"] = request["MemorySize"]

        if "DeadLetterConfig" in request:
            # a DeadLetterConfig without TargetArn stores None
            replace_kwargs["dead_letter_arn"] = request.get("DeadLetterConfig", {}).get("TargetArn")

        # an empty/falsy VpcConfig is ignored rather than clearing the existing one
        if vpc_config := request.get("VpcConfig"):
            replace_kwargs["vpc_config"] = self._build_vpc_config(account_id, region, vpc_config)

        if "Handler" in request:
            replace_kwargs["handler"] = request["Handler"]

        if "Runtime" in request:
            runtime = request["Runtime"]

            if runtime not in ALL_RUNTIMES:
                raise InvalidParameterValueException(
                    f"Value {runtime} at 'runtime' failed to satisfy constraint: Member must satisfy enum value set: {VALID_RUNTIMES} or be a valid ARN",
                    Type="User",
                )
            # deprecated runtimes are accepted on update; only a warning is logged
            if runtime in DEPRECATED_RUNTIMES:
                LOG.warning(
                    "The Lambda runtime %s is deprecated. "
                    "Please upgrade the runtime for the function %s: "
                    "https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtimes.html",
                    runtime,
                    function_name,
                )
            replace_kwargs["runtime"] = request["Runtime"]

        if snap_start := request.get("SnapStart"):
            # validate against the runtime from this request if given, else the current one
            runtime = replace_kwargs.get("runtime") or latest_version_config.runtime
            self._validate_snapstart(snap_start, runtime)
            replace_kwargs["snap_start"] = SnapStartResponse(
                ApplyOn=snap_start.get("ApplyOn", SnapStartApplyOn.None_),
                OptimizationStatus=SnapStartOptimizationStatus.Off,
            )

        if "Environment" in request:
            # only non-empty variables are verified; the (possibly empty) value is stored either way
            if env_vars := request.get("Environment", {}).get("Variables", {}):
                self._verify_env_variables(env_vars)
            replace_kwargs["environment"] = env_vars

        if "Layers" in request:
            new_layers = request["Layers"]
            # an empty list skips validation but is still mapped and stored
            if new_layers:
                self._validate_layers(new_layers, region=region, account_id=account_id)
            replace_kwargs["layers"] = self.map_layers(new_layers)

        if "ImageConfig" in request:
            new_image_config = request["ImageConfig"]
            replace_kwargs["image_config"] = ImageConfig(
                command=new_image_config.get("Command"),
                entrypoint=new_image_config.get("EntryPoint"),
                working_directory=new_image_config.get("WorkingDirectory"),
            )

        if "LoggingConfig" in request:
            logging_config = request["LoggingConfig"]
            LOG.warning(
                "Advanced Lambda Logging Configuration is currently mocked "
                "and will not impact the logging behavior. "
                "Please create a feature request if needed."
            )

            # when switching to JSON, app and system level log is auto set to INFO
            # (values given in the request win because they are on the right of the merge)
            if logging_config.get("LogFormat", None) == LogFormat.JSON:
                logging_config = {
                    "ApplicationLogLevel": "INFO",
                    "SystemLogLevel": "INFO",
                } | logging_config

            # NOTE(review): the merge below assumes the stored logging_config is a dict, never None -- confirm
            last_config = latest_version_config.logging_config

            # add partial update
            new_logging_config = last_config | logging_config

            # in case we switched from JSON to Text we need to remove LogLevel keys
            if (
                new_logging_config.get("LogFormat") == LogFormat.Text
                and last_config.get("LogFormat") == LogFormat.JSON
            ):
                new_logging_config.pop("ApplicationLogLevel", None)
                new_logging_config.pop("SystemLogLevel", None)

            replace_kwargs["logging_config"] = new_logging_config

        if "TracingConfig" in request:
            # a TracingConfig without Mode leaves the current mode untouched
            new_mode = request.get("TracingConfig", {}).get("Mode")
            if new_mode:
                replace_kwargs["tracing_config_mode"] = new_mode

        if "CapacityProviderConfig" in request:
            capacity_provider_config = request["CapacityProviderConfig"]
            self._validate_capacity_provider_config(capacity_provider_config, context)

            # NOTE(review): the config is validated here but never added to replace_kwargs, so the
            # stored CapacityProviderConfig is not changed by this handler -- confirm this is intended
            if latest_version.config.CapacityProviderConfig and not request[
                "CapacityProviderConfig"
            ].get("LambdaManagedInstancesCapacityProviderConfig"):
                raise ValidationException(
                    "1 validation error detected: Value null at 'capacityProviderConfig.lambdaManagedInstancesCapacityProviderConfig' failed to satisfy constraint: Member must not be null"
                )
            if not latest_version.config.CapacityProviderConfig:
                raise InvalidParameterValueException(
                    "CapacityProviderConfig isn't supported for Lambda Default functions.",
                    Type="User",
                )

        # build the new $LATEST: same version object with a config carrying the overrides,
        # a fresh internal revision and an in-progress update status
        new_latest_version = dataclasses.replace(
            latest_version,
            config=dataclasses.replace(
                latest_version_config,
                last_modified=api_utils.generate_lambda_date(),
                internal_revision=short_uid(),
                last_update=UpdateStatus(
                    status=LastUpdateStatus.InProgress,
                    code="Creating",
                    reason="The function is being created.",
                ),
                **replace_kwargs,
            ),
        )
        function.versions["$LATEST"] = new_latest_version  # TODO: notify
        self.lambda_service.update_version(new_version=new_latest_version)

        return api_utils.map_config_out(new_latest_version)
1✔
1449

1450
    @handler(operation="UpdateFunctionCode", expand=False)
    def update_function_code(
        self, context: RequestContext, request: UpdateFunctionCodeRequest
    ) -> FunctionConfiguration:
        """Replace the code (or image) of the function's ``$LATEST`` version.

        The code source is taken from ``ZipFile``, else ``S3Bucket``/``S3Key``, else ``ImageUri``.
        The new ``$LATEST`` is stored and handed to the lambda service; when ``Publish`` is set a
        new version is published afterwards and returned with its qualified ARN.

        :param context: request context, used to resolve account and region
        :param request: the ``UpdateFunctionCode`` request
        :return: the mapped configuration of ``$LATEST`` or of the freshly published version
        :raises ResourceNotFoundException: if the function is not in the resolved store
        :raises PreconditionFailedException: if ``RevisionId`` is given and does not match
        :raises InvalidParameterValueException: if the code source does not fit the package type
        :raises ValidationException: if ``Architectures`` is not exactly one known architecture
        :raises LambdaServiceException: if no code source is given at all
        """
        requested_name = request.get("FunctionName")
        account_id, region = api_utils.get_account_and_region(requested_name, context)
        function_name, _ = api_utils.get_name_and_qualifier(requested_name, None, context)

        store = lambda_stores[account_id][region]
        if function_name not in store.functions:
            raise ResourceNotFoundException(
                f"Function not found: {api_utils.unqualified_lambda_arn(function_name=function_name, region=region, account=account_id)}",
                Type="User",
            )
        function = store.functions[function_name]

        revision_id = request.get("RevisionId")
        if revision_id and revision_id != function.latest().config.revision_id:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
                Type="User",
            )

        zip_file = request.get("ZipFile")
        s3_bucket = request.get("S3Bucket")
        image_uri = request.get("ImageUri")

        # TODO verify if correct combination of code is set
        # the code source has to fit the package type of the existing function
        if (zip_file or s3_bucket) and function.latest().config.package_type == PackageType.Image:
            raise InvalidParameterValueException(
                "Please provide ImageUri when updating a function with packageType Image.",
                Type="User",
            )
        if image_uri and function.latest().config.package_type == PackageType.Zip:
            raise InvalidParameterValueException(
                "Please don't provide ImageUri when updating a function with packageType Zip.",
                Type="User",
            )

        publish_to = request.get("PublishTo")
        if publish_to:
            self._validate_publish_to(publish_to)

        # resolve exactly one code source, in order of precedence
        code = None
        image = None
        if zip_file:
            code = store_lambda_archive(
                archive_file=zip_file,
                function_name=function_name,
                region_name=region,
                account_id=account_id,
            )
        elif s3_bucket:
            code = store_s3_bucket_archive(
                archive_bucket=s3_bucket,
                archive_key=request["S3Key"],
                archive_version=request.get("S3ObjectVersion"),
                function_name=function_name,
                region_name=region,
                account_id=account_id,
            )
        elif image_uri:
            image = create_image_code(image_uri=image_uri)
        else:
            raise LambdaServiceException("A ZIP file, S3 bucket, or image is required")

        current_latest = function.versions.get("$LATEST")
        config_changes = {"code": code} if code else {"image": image}

        architectures = request.get("Architectures")
        if architectures:
            if len(architectures) != 1:
                raise ValidationException(
                    f"1 validation error detected: Value '[{', '.join(architectures)}]' at 'architectures' failed to "
                    f"satisfy constraint: Member must have length less than or equal to 1",
                )
            # An empty list of architectures is also forbidden. Further exceptions are tested here for create_function:
            # tests.aws.services.lambda_.test_lambda_api.TestLambdaFunction.test_create_lambda_exceptions
            if architectures[0] not in ARCHITECTURES:
                raise ValidationException(
                    f"1 validation error detected: Value '[{', '.join(architectures)}]' at 'architectures' failed to "
                    f"satisfy constraint: Member must satisfy constraint: [Member must satisfy enum value set: "
                    f"[x86_64, arm64], Member must not be null]",
                )
            config_changes["architectures"] = architectures

        # named updated_config so the imported ``config`` module is not shadowed locally
        updated_config = dataclasses.replace(
            current_latest.config,
            internal_revision=short_uid(),
            last_modified=api_utils.generate_lambda_date(),
            last_update=UpdateStatus(
                status=LastUpdateStatus.InProgress,
                code="Creating",
                reason="The function is being created.",
            ),
            **config_changes,
        )
        function_version = dataclasses.replace(current_latest, config=updated_config)
        function.versions["$LATEST"] = function_version

        self.lambda_service.update_version(new_version=function_version)

        publish_requested = bool(request.get("Publish"))
        if publish_requested:
            function_version = self._publish_version_with_changes(
                function_name=function_name,
                region=region,
                account_id=account_id,
                publish_to=publish_to,
                is_active=True,
            )
        return api_utils.map_config_out(function_version, return_qualified_arn=publish_requested)
1566

1567
    # TODO: does deleting the latest published version affect the next versions number?
1568
    # TODO: what happens when we call this with a qualifier and a fully qualified ARN? (+ conflicts?)
1569
    # TODO: test different ARN patterns (shorthand ARN?)
1570
    # TODO: test deleting across regions?
1571
    # TODO: test mismatch between context region and region in ARN
1572
    # TODO: test qualifier $LATEST, alias-name and version
1573
    def delete_function(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
        **kwargs,
    ) -> DeleteFunctionResponse:
        """Delete a whole function, or a single published version when a qualifier is given.

        :param context: request context, used to resolve account and region
        :param function_name: function name, ARN or partial ARN
        :param qualifier: version to delete; aliases and ``$LATEST`` are rejected
        :return: response with status 202 if a deleted version had a ``CapacityProviderConfig``,
            else 204
        :raises InvalidParameterValueException: if the qualifier is an alias or ``$LATEST``
        :raises ResourceNotFoundException: if the function is not in the resolved store
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )

        if qualifier and api_utils.qualifier_is_alias(qualifier):
            raise InvalidParameterValueException(
                "Deletion of aliases is not currently supported.",
                Type="User",
            )

        store = lambda_stores[account_id][region]
        if qualifier == "$LATEST":
            raise InvalidParameterValueException(
                "$LATEST version cannot be deleted without deleting the function.", Type="User"
            )

        if function_name not in store.functions:
            raise ResourceNotFoundException(
                f"Function not found: {api_utils.unqualified_lambda_arn(function_name=function_name, region=region, account=account_id)}",
                Type="User",
            )

        uses_capacity_provider = False
        if not qualifier:
            # delete the whole function
            # TODO: introduce locking for safe deletion: We could create a new version at the API layer before
            #  the old version gets cleaned up in the internal lambda service.
            removed_function = store.functions.pop(function_name)
            for fn_version in removed_function.versions.values():
                # Functions with a capacity provider do NOT have a version manager for $LATEST because only
                # published versions are invokable.
                # NOTE(review): no version with a capacity provider is stopped here, published ones
                # included -- confirm this is intended.
                if fn_version.config.CapacityProviderConfig:
                    uses_capacity_provider = True
                else:
                    self.lambda_service.stop_version(qualified_arn=fn_version.id.qualified_arn())
                # the function is gone, so its code can safely be destroyed
                if fn_version.config.code:
                    fn_version.config.code.destroy()
        else:
            # delete a single version; an unknown version number is silently ignored
            target_function = store.functions.get(function_name)
            removed_version = target_function.versions.pop(qualifier, None)
            if removed_version:
                uses_capacity_provider = bool(removed_version.config.CapacityProviderConfig)
                self.lambda_service.stop_version(removed_version.id.qualified_arn())
                destroy_code_if_not_used(code=removed_version.config.code, function=target_function)

        return DeleteFunctionResponse(StatusCode=202 if uses_capacity_provider else 204)
1✔
1633

1634
    def list_functions(
        self,
        context: RequestContext,
        master_region: MasterRegion = None,  # (only relevant for lambda@edge)
        function_version: FunctionVersionApi = None,
        marker: String = None,
        max_items: MaxListItems = None,
        **kwargs,
    ) -> ListFunctionsResponse:
        """List the functions of the caller's account and region, one page at a time.

        :param context: request context providing account and region
        :param master_region: accepted but not used here
        :param function_version: ``ALL`` to list every version of every function with qualified
            ARNs; unset to list only ``$LATEST`` with unqualified ARNs
        :param marker: pagination token from a previous call
        :param max_items: page size
        :return: the requested page and, if there is more, a ``NextMarker``
        :raises ValidationException: if ``function_version`` is set to anything but ``ALL``
        """
        state = lambda_stores[context.account_id][context.region]

        include_all_versions = function_version == FunctionVersionApi.ALL
        if function_version and not include_all_versions:
            raise ValidationException(
                f"1 validation error detected: Value '{function_version}'"
                + " at 'functionVersion' failed to satisfy constraint: Member must satisfy enum value set: [ALL]"
            )

        selected_versions = []
        for fn in state.functions.values():
            if include_all_versions:
                # include all versions for all function
                selected_versions.extend(fn.versions.values())
            else:
                selected_versions.append(fn.latest())

        def _arn_token(entry):
            # the function ARN doubles as the pagination token
            return entry["FunctionArn"]

        entries = PaginatedList(
            api_utils.map_to_list_response(
                api_utils.map_config_out(fn_version, return_qualified_arn=include_all_versions)
            )
            for fn_version in selected_versions
        )
        page, next_marker = entries.get_page(_arn_token, marker, max_items)
        return ListFunctionsResponse(Functions=page, NextMarker=next_marker)
1✔
1672

1673
    def get_function(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
        **kwargs,
    ) -> GetFunctionResponse:
        """Return configuration, code location, concurrency and tags of a function version.

        An alias qualifier is resolved to the version it points at; the alias name is still
        passed on to the configuration mapping.

        :param context: request context, used to resolve account and region
        :param function_name: function name, ARN or partial ARN
        :param qualifier: optional version or alias
        :return: response with ``Configuration``, ``Code``, ``Concurrency`` and, if any, ``Tags``
        :raises ResourceNotFoundException: if the function or the alias does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )

        fn = lambda_stores[account_id][region].functions.get(function_name)
        if fn is None:
            # the reported ARN mirrors how the function was addressed
            if qualifier is None:
                missing_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
            else:
                missing_arn = api_utils.qualified_lambda_arn(
                    function_name, qualifier, account_id, region
                )
            raise ResourceNotFoundException(f"Function not found: {missing_arn}", Type="User")

        alias_name = None
        if qualifier and api_utils.qualifier_is_alias(qualifier):
            if qualifier not in fn.aliases:
                alias_arn = api_utils.qualified_lambda_arn(
                    function_name, qualifier, account_id, region
                )
                raise ResourceNotFoundException(f"Function not found: {alias_arn}", Type="User")
            # from here on the qualifier is the version the alias points at
            alias_name = qualifier
            qualifier = fn.aliases[alias_name].function_version

        version = get_function_version(
            function_name=function_name,
            qualifier=qualifier,
            account_id=account_id,
            region=region,
        )
        tags = self._get_tags(api_utils.unqualified_lambda_arn(function_name, account_id, region))

        # zip-based functions expose a presigned URL, image-based ones their image coordinates
        code_location = None
        if version.config.code:
            code_location = FunctionCodeLocation(
                Location=version.config.code.generate_presigned_url(
                    endpoint_url=config.external_service_url()
                ),
                RepositoryType="S3",
            )
        elif version.config.image:
            fn_image = version.config.image
            code_location = FunctionCodeLocation(
                ImageUri=fn_image.image_uri,
                RepositoryType=fn_image.repository_type,
                ResolvedImageUri=fn_image.resolved_image_uri,
            )

        concurrency = (
            Concurrency(ReservedConcurrentExecutions=fn.reserved_concurrent_executions)
            if fn.reserved_concurrent_executions
            else None
        )

        response_fields = {
            "Configuration": api_utils.map_config_out(
                version, return_qualified_arn=bool(qualifier), alias_name=alias_name
            ),
            "Code": code_location,  # TODO
            "Concurrency": concurrency,
        }
        # Tags are only part of the response when the function has any
        if tags:
            response_fields["Tags"] = tags
        return GetFunctionResponse(**response_fields)
1743

1744
    def get_function_configuration(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
        **kwargs,
    ) -> FunctionConfiguration:
        """Return the mapped configuration of a function version.

        :param context: request context, used to resolve account and region
        :param function_name: function name, ARN or partial ARN
        :param qualifier: optional qualifier; when one is resolved the ARN is returned qualified
        :return: the configuration produced by ``api_utils.map_config_out``
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        # CAVE: THIS RETURN VALUE IS *NOT* THE SAME AS IN get_function (!) but seems to be only configuration part?
        resolved_name, resolved_qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        function_version = get_function_version(
            function_name=resolved_name,
            qualifier=resolved_qualifier,
            account_id=account_id,
            region=region,
        )
        return api_utils.map_config_out(
            function_version, return_qualified_arn=bool(resolved_qualifier)
        )
1✔
1763

1764
    def invoke(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        invocation_type: InvocationType | None = None,
        log_type: LogType | None = None,
        client_context: String | None = None,
        durable_execution_name: DurableExecutionName | None = None,
        payload: IO[Blob] | None = None,
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
        tenant_id: TenantId | None = None,
        **kwargs,
    ) -> InvocationResponse:
        """Invoke a function through the lambda service and map the result to an API response.

        ``durable_execution_name`` and ``tenant_id`` are accepted but not used here.

        :param context: request context; supplies request id, trace context and user agent
        :param function_name: function name, ARN or partial ARN
        :param invocation_type: ``Event`` yields a bare 202, ``DryRun`` a bare 204, anything else
            a 200 response carrying payload and executed version
        :param log_type: ``Tail`` adds the base64-encoded last 4096 bytes of the logs
        :param client_context: passed through to the lambda service
        :param payload: stream holding the invocation payload, read in full if given
        :param qualifier: optional version or alias
        :return: the invocation response
        :raises ServiceException: re-raised unchanged when the lambda service raises one
        :raises LambdaServiceException: for environment startup timeouts and any other error
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )

        user_agent = context.request.user_agent.string

        started_at = time.perf_counter()
        try:
            # the payload is read inside the try block so read errors are wrapped as well
            result = self.lambda_service.invoke(
                function_name=function_name,
                qualifier=qualifier,
                region=region,
                account_id=account_id,
                invocation_type=invocation_type,
                client_context=client_context,
                request_id=context.request_id,
                trace_context=context.trace_context,
                payload=payload.read() if payload else None,
                user_agent=user_agent,
            )
        except ServiceException:
            # already an API-level error, pass it through untouched
            raise
        except EnvironmentStartupTimeoutException as timeout_error:
            raise LambdaServiceException(
                f"[{context.request_id}] Timeout while starting up lambda environment for function {function_name}:{qualifier}"
            ) from timeout_error
        except Exception as error:
            LOG.error(
                "[%s] Error while invoking lambda %s",
                context.request_id,
                function_name,
                exc_info=LOG.isEnabledFor(logging.DEBUG),
            )
            raise LambdaServiceException(
                f"[{context.request_id}] Internal error while executing lambda {function_name}:{qualifier}. Caused by {type(error).__name__}: {error}"
            ) from error

        # asynchronous and dry-run invocations carry no payload, only a status code
        if invocation_type == InvocationType.Event:
            return InvocationResponse(StatusCode=202)
        if invocation_type == InvocationType.DryRun:
            return InvocationResponse(StatusCode=204)

        elapsed_ms = (time.perf_counter() - started_at) * 1000
        LOG.debug("Lambda invocation duration: %0.2fms", elapsed_ms)

        response = InvocationResponse(
            StatusCode=200,
            Payload=result.payload,
            ExecutedVersion=result.executed_version,
        )
        if result.is_error:
            response["FunctionError"] = "Unhandled"
        if log_type == LogType.Tail:
            # only the last 4096 bytes of the logs are returned
            log_tail = to_bytes(result.logs)[-4096:]
            response["LogResult"] = to_str(base64.b64encode(log_tail))

        return response
1✔
1838

1839
    # Version operations
1840
    def publish_version(
        self,
        context: RequestContext,
        function_name: FunctionName,
        code_sha256: String | None = None,
        description: Description | None = None,
        revision_id: String | None = None,
        publish_to: FunctionVersionLatestPublished | None = None,
        **kwargs,
    ) -> FunctionConfiguration:
        """Publish a new version of a function from its existing version.

        :param context: request context, used to resolve account and region
        :param function_name: function name, ARN or partial ARN
        :param code_sha256: passed through to the publishing helper
        :param description: passed through to the publishing helper
        :param revision_id: passed through to the publishing helper
        :param publish_to: validated with ``_validate_publish_to`` when given, then passed through
        :return: the mapped configuration of the new version, with a qualified ARN
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        resolved_name = api_utils.get_function_name(function_name, context)

        if publish_to:
            self._validate_publish_to(publish_to)

        published_version = self._publish_version_from_existing_version(
            function_name=resolved_name,
            account_id=account_id,
            region=region,
            description=description,
            revision_id=revision_id,
            code_sha256=code_sha256,
            publish_to=publish_to,
        )
        return api_utils.map_config_out(published_version, return_qualified_arn=True)
1✔
1864

1865
    def list_versions_by_function(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        marker: String = None,
        max_items: MaxListItems = None,
        **kwargs,
    ) -> ListVersionsByFunctionResponse:
        """List all versions of one function, one page at a time.

        :param context: request context, used to resolve account and region
        :param function_name: function name, ARN or partial ARN
        :param marker: pagination token from a previous call
        :param max_items: page size
        :return: the requested page of versions (with qualified ARNs) and, if there is more,
            a ``NextMarker``
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name = api_utils.get_function_name(function_name, context)
        function = self._get_function(
            function_name=function_name, region=region, account_id=account_id
        )
        versions = [
            api_utils.map_to_list_response(
                api_utils.map_config_out(version=version, return_qualified_arn=True)
            )
            for version in function.versions.values()
        ]
        items = PaginatedList(versions)
        page, token = items.get_page(
            # Use the qualified ARN as the pagination token, mirroring list_functions. The
            # previous identity function made the token the whole response dict, which a
            # string marker sent by a client will not equal and which is not a string NextMarker.
            lambda item: item["FunctionArn"],
            marker,
            max_items,
        )
        return ListVersionsByFunctionResponse(Versions=page, NextMarker=token)
1✔
1891

1892
    # Alias
1893

1894
    def _create_routing_config_model(
        self, routing_config_dict: dict[str, float], function_version: FunctionVersion
    ):
        """
        Validate ``AdditionalVersionWeights`` and wrap them in an ``AliasRoutingConfig``.

        :param routing_config_dict: mapping of version qualifier to routing weight
        :param function_version: the version the alias points to
        :return: ``AliasRoutingConfig`` built from ``routing_config_dict``

        Raises:
            InvalidParameterValueException: If more than one weight is given, if the weighted
                version equals the alias target, or if the alias target is ``$LATEST``.
            ValidationException: If a weight is below 0.0 or not below 1.0, or if a key is not
                a version qualifier.
        """
        if len(routing_config_dict) > 1:
            raise InvalidParameterValueException(
                "Number of items in AdditionalVersionWeights cannot be greater than 1",
                Type="User",
            )
        # at most one entry at this point; the loop is kept in case more are supported later
        for weighted_qualifier, weight in routing_config_dict.items():
            if weight < 0.0 or weight >= 1.0:
                raise ValidationException(
                    f"1 validation error detected: Value '{{{weighted_qualifier}={weight}}}' at 'routingConfig.additionalVersionWeights' failed to satisfy constraint: Map value must satisfy constraint: [Member must have value less than or equal to 1.0, Member must have value greater than or equal to 0.0, Member must not be null]"
                )

            target = function_version.id
            if weighted_qualifier == target.qualifier:
                raise InvalidParameterValueException(
                    f"Invalid function version {target.qualifier}. Function version {target.qualifier} is already included in routing configuration.",
                    Type="User",
                )
            # an alias targeting $LATEST must not carry any routing config
            if target.qualifier == "$LATEST":
                raise InvalidParameterValueException(
                    "$LATEST is not supported for an alias pointing to more than 1 version"
                )
            if not api_utils.qualifier_is_version(weighted_qualifier):
                raise ValidationException(
                    f"1 validation error detected: Value '{{{weighted_qualifier}={weight}}}' at 'routingConfig.additionalVersionWeights' failed to satisfy constraint: Map keys must satisfy constraint: [Member must have length less than or equal to 1024, Member must have length greater than or equal to 1, Member must satisfy regular expression pattern: [0-9]+]"
                )

            # look up the weighted version; the result itself is not needed
            get_function_version(
                function_name=target.function_name,
                qualifier=weighted_qualifier,
                region=target.region,
                account_id=target.account,
            )
        return AliasRoutingConfig(version_weights=routing_config_dict)
1✔
1931

1932
    def create_alias(
        self,
        context: RequestContext,
        function_name: FunctionName,
        name: Alias,
        function_version: VersionWithLatestPublished,
        description: Description = None,
        routing_config: AliasRoutingConfiguration = None,
        **kwargs,
    ) -> AliasConfiguration:
        """
        Create a new alias for a function and store it on the function model.

        :param context: request context, used to resolve account, region and function name
        :param function_name: function identifier as passed by the caller
        :param name: name of the alias to create
        :param function_version: version qualifier the alias points to
        :param description: optional description; stored as an empty string when not given
        :param routing_config: optional routing configuration (``AdditionalVersionWeights``)
        :return: the created alias mapped via ``api_utils.map_alias_out``

        Raises:
            ValidationException: If ``name`` is not a valid alias name.
            ResourceConflictException: If an alias with this name already exists.
        """
        if not api_utils.qualifier_is_alias(name):
            raise ValidationException(
                f"1 validation error detected: Value '{name}' at 'name' failed to satisfy constraint: Member must satisfy regular expression pattern: (?!^[0-9]+$)([a-zA-Z0-9-_]+)"
            )

        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name = api_utils.get_function_name(function_name, context)
        # the target version is resolved before the function itself (order kept on purpose)
        target_version = get_function_version(
            function_name=function_name,
            qualifier=function_version,
            region=region,
            account_id=account_id,
        )
        function = self._get_function(
            function_name=function_name, region=region, account_id=account_id
        )
        # the description is always stored; an omitted one becomes an empty string
        if not description:
            description = ""

        with function.lock:
            existing_alias = function.aliases.get(name)
            if existing_alias:
                existing_arn = api_utils.map_alias_out(alias=existing_alias, function=function)[
                    "AliasArn"
                ]
                raise ResourceConflictException(
                    f"Alias already exists: {existing_arn}",
                    Type="User",
                )

            # only build a routing model when non-empty weights were supplied
            version_weights = (
                routing_config.get("AdditionalVersionWeights") if routing_config else None
            )
            routing_configuration = None
            if version_weights:
                routing_configuration = self._create_routing_config_model(
                    version_weights, target_version
                )

            alias = VersionAlias(
                name=name,
                function_version=function_version,
                description=description,
                routing_configuration=routing_configuration,
            )
            function.aliases[name] = alias
        return api_utils.map_alias_out(alias=alias, function=function)
1✔
1983

1984
    def list_aliases(
        self,
        context: RequestContext,
        function_name: FunctionName,
        function_version: VersionWithLatestPublished = None,
        marker: String = None,
        max_items: MaxListItems = None,
        **kwargs,
    ) -> ListAliasesResponse:
        """
        Return one page of the aliases of a function, optionally filtered by target version.

        :param context: request context, used to resolve account, region and function name
        :param function_name: function identifier as passed by the caller
        :param function_version: when given, only aliases pointing to this version are listed
        :param marker: pagination marker forwarded to ``PaginatedList.get_page``
        :param max_items: page size forwarded to ``PaginatedList.get_page``
        :return: response carrying the page in ``Aliases`` and the follow-up token in ``NextMarker``
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name = api_utils.get_function_name(function_name, context)
        function = self._get_function(
            function_name=function_name, region=region, account_id=account_id
        )

        matching_aliases = []
        for candidate in function.aliases.values():
            if function_version is not None and candidate.function_version != function_version:
                continue
            matching_aliases.append(api_utils.map_alias_out(candidate, function))

        # the alias ARN is what get_page receives as the per-item token value
        page, next_marker = PaginatedList(matching_aliases).get_page(
            lambda entry: entry["AliasArn"],
            marker,
            max_items,
        )

        return ListAliasesResponse(Aliases=page, NextMarker=next_marker)
1✔
2012

2013
    def delete_alias(
        self, context: RequestContext, function_name: FunctionName, name: Alias, **kwargs
    ) -> None:
        """
        Remove an alias from a function together with resources keyed by the alias name.

        A missing alias is not an error: the alias is popped with a default of ``None``.

        :param context: request context, used to resolve account, region and function name
        :param function_name: function identifier as passed by the caller
        :param name: name of the alias to delete
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name = api_utils.get_function_name(function_name, context)
        function = self._get_function(
            function_name=function_name, region=region, account_id=account_id
        )
        removed_alias = function.aliases.pop(name, None)

        # cleanup related resources
        function.provisioned_concurrency_configs.pop(name, None)

        # the URL config is only dropped when an alias was actually removed
        if not removed_alias or name not in function.function_url_configs:
            return

        # TODO: Allow for deactivating/unregistering specific Lambda URLs
        removed_url_config = function.function_url_configs.pop(name)
        LOG.debug(
            "Stopping aliased Lambda Function URL %s for %s",
            removed_url_config.url,
            removed_url_config.function_name,
        )
2035

2036
    def get_alias(
        self, context: RequestContext, function_name: FunctionName, name: Alias, **kwargs
    ) -> AliasConfiguration:
        """
        Look up a single alias of a function.

        :param context: request context, used to resolve account, region and function name
        :param function_name: function identifier as passed by the caller
        :param name: name of the alias to return
        :return: the alias mapped via ``api_utils.map_alias_out``

        Raises:
            ResourceNotFoundException: If the function has no alias with this name.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name = api_utils.get_function_name(function_name, context)
        function = self._get_function(
            function_name=function_name, region=region, account_id=account_id
        )

        alias = function.aliases.get(name)
        if not alias:
            missing_arn = api_utils.qualified_lambda_arn(
                function_name=function_name, qualifier=name, region=region, account=account_id
            )
            raise ResourceNotFoundException(
                f"Cannot find alias arn: {missing_arn}",
                Type="User",
            )
        return api_utils.map_alias_out(alias=alias, function=function)
1✔
2050

2051
    def update_alias(
        self,
        context: RequestContext,
        function_name: FunctionName,
        name: Alias,
        function_version: VersionWithLatestPublished = None,
        description: Description = None,
        routing_config: AliasRoutingConfiguration = None,
        revision_id: String = None,
        **kwargs,
    ) -> AliasConfiguration:
        """
        Update an existing alias and notify the Lambda service about the change.

        Only fields that are not ``None`` are changed. The alias object is always replaced,
        even when nothing changed (see the comment on the revision id below).

        :param context: request context, used to resolve account, region and function name
        :param function_name: function identifier as passed by the caller
        :param name: name of the alias to update
        :param function_version: new target version, or ``None`` to keep the current one
        :param description: new description, or ``None`` to keep the current one
        :param routing_config: new routing configuration; an empty configuration (or empty
            ``AdditionalVersionWeights``) clears the routing configuration
        :param revision_id: when given, must match the alias' current revision id
        :return: the updated alias mapped via ``api_utils.map_alias_out``

        Raises:
            ResourceNotFoundException: If the function has no alias with this name.
            PreconditionFailedException: If ``revision_id`` does not match the alias.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name = api_utils.get_function_name(function_name, context)
        function = self._get_function(
            function_name=function_name, region=region, account_id=account_id
        )
        if not (alias := function.aliases.get(name)):
            fn_arn = api_utils.qualified_lambda_arn(function_name, name, account_id, region)
            raise ResourceNotFoundException(
                f"Alias not found: {fn_arn}",
                Type="User",
            )
        if revision_id and alias.revision_id != revision_id:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
                Type="User",
            )
        changes = {}
        if function_version is not None:
            changes |= {"function_version": function_version}
        if description is not None:
            changes |= {"description": description}
        if routing_config is not None:
            # if it is an empty dict or AdditionalVersionWeights is empty, set routing config to None
            new_routing_config = None
            if routing_config_dict := routing_config.get("AdditionalVersionWeights"):
                # The routing config has to be validated against the version the alias points to
                # after this update: the requested version, or the current one if it is unchanged.
                # Previously the version argument was omitted, which raised a TypeError.
                target_qualifier = (
                    function_version if function_version is not None else alias.function_version
                )
                target_version = get_function_version(
                    function_name=function_name,
                    qualifier=target_qualifier,
                    region=region,
                    account_id=account_id,
                )
                new_routing_config = self._create_routing_config_model(
                    routing_config_dict, target_version
                )
            changes |= {"routing_configuration": new_routing_config}
        # even if no changes are done, we have to update revision id for some reason
        old_alias = alias
        alias = dataclasses.replace(alias, **changes)
        function.aliases[name] = alias

        # TODO: signal lambda service that pointer potentially changed
        self.lambda_service.update_alias(old_alias=old_alias, new_alias=alias, function=function)

        return api_utils.map_alias_out(alias=alias, function=function)
1✔
2099

2100
    # =======================================
2101
    # ======= EVENT SOURCE MAPPINGS =========
2102
    # =======================================
2103
    def check_service_resource_exists(
        self, service: str, resource_arn: str, function_arn: str, function_role_arn: str
    ):
        """
        Verify that the event source behind ``resource_arn`` exists, using the function's role.

        Only ``sqs``/``sqs-fifo``, ``kinesis`` and ``dynamodb`` are checked; any other service
        value passes through without a lookup.

        :param service: service identifier of the event source
        :param resource_arn: ARN of the queue or stream to check
        :param function_arn: ARN of the function, passed as source ARN to the internal client
        :param function_role_arn: role ARN the internal client is created with

        Raises:
            InvalidParameterValueException: If the queue or stream is reported as non-existent.
            ClientError: Any other client error is re-raised unchanged.
        """
        arn = parse_arn(resource_arn)
        source_client = get_internal_client(
            arn=resource_arn,
            role_arn=function_role_arn,
            service_principal=ServicePrincipal.lambda_,
            source_arn=function_arn,
        )

        if service in ("sqs", "sqs-fifo"):
            try:
                # AWS uses `GetQueueAttributes` internally to verify the queue existence, but that
                # call needs a `QueueUrl`, which we do not have. A dummy `QueueUrl` is assembled
                # from the ARN parts so that SQS can resolve it to the right queue.
                queue_name = arn["resource"].split("/")[-1]
                source_client.get_queue_attributes(
                    QueueUrl=f"http://sqs.{arn['region']}.domain/{arn['account']}/{queue_name}"
                )
            except ClientError as e:
                error_code = e.response["Error"]["Code"]
                if error_code != "AWS.SimpleQueueService.NonExistentQueue":
                    raise
                raise InvalidParameterValueException(
                    f"Error occurred while ReceiveMessage. SQS Error Code: {error_code}. SQS Error Message: {e.response['Error']['Message']}",
                    Type="User",
                )
        elif service in ("kinesis", "dynamodb"):
            # both services expose `describe_stream`, but spell the ARN parameter differently
            stream_param = "StreamARN" if service == "kinesis" else "StreamArn"
            try:
                source_client.describe_stream(**{stream_param: resource_arn})
            except ClientError as e:
                if e.response["Error"]["Code"] != "ResourceNotFoundException":
                    raise
                raise InvalidParameterValueException(
                    f"Stream not found: {resource_arn}",
                    Type="User",
                )
×
2155

2156
    @handler("CreateEventSourceMapping", expand=False)
    def create_event_source_mapping(
        self,
        context: RequestContext,
        request: CreateEventSourceMappingRequest,
    ) -> EventSourceMappingConfiguration:
        """
        Handle ``CreateEventSourceMapping`` by delegating to ``create_event_source_mapping_v2``.

        :param context: request context of the API call
        :param request: the unexpanded request (the handler is registered with ``expand=False``)
        :return: whatever ``create_event_source_mapping_v2`` returns for this request
        """
        return self.create_event_source_mapping_v2(context, request)
1✔
2163

2164
    def create_event_source_mapping_v2(
        self,
        context: RequestContext,
        request: CreateEventSourceMappingRequest,
    ) -> EventSourceMappingConfiguration:
        """
        Validate the request, store the new event source mapping and start its worker.

        :param context: request context of the API call
        :param request: the ``CreateEventSourceMapping`` request
        :return: the configuration produced by ``EsmConfigFactory`` (a copy is kept in the store)
        """
        # Validations (function name and version from the result are not needed here)
        function_arn, _, state, _, function_role = self.validate_event_source_mapping(
            context, request
        )

        esm_config = EsmConfigFactory(request, context, function_arn).get_esm_config()

        # Store a copy of esm_config to avoid a race condition with potential async update in the store
        state.event_source_mappings[esm_config["UUID"]] = esm_config.copy()

        # TODO: check for potential async race condition update -> think about locking
        worker_factory = EsmWorkerFactory(esm_config, function_role, request.get("Enabled", True))
        esm_worker = worker_factory.get_esm_worker()
        self.esm_workers[esm_worker.uuid] = esm_worker

        # TODO: check StateTransitionReason, LastModified, LastProcessingResult (concurrent updates requires locking!)
        tags = request.get("Tags")
        if tags:
            self._store_tags(esm_config.get("EventSourceMappingArn"), tags)

        esm_worker.create()
        return esm_config
1✔
2187

2188
    def validate_event_source_mapping(self, context, request):
        """
        Validate an event source mapping and resolve the function it targets.

        ``request`` is either a ``CreateEventSourceMapping`` request (carrying ``FunctionName``)
        or an internal event source mapping configuration (carrying ``FunctionArn``).
        The duplicate check at the end only runs when the current operation is
        ``CreateEventSourceMapping``.

        :param context: request context; provides the operation name and the default account/region
        :param request: dict-like request or stored mapping configuration to validate
        :return: tuple ``(fn_arn, function_name, state, function_version, function_role)``

        Raises:
            InvalidParameterValueException: For unsupported or missing parameters, an unknown
                function, or a missing event source (via ``check_service_resource_exists``).
            ValidationException: If ``StartingPosition`` is not a known enum value.
            ResourceConflictException: If an equivalent mapping already exists (create only).
            Exception: For an unknown alias/version or an unrecognized qualifier (see TODOs).
        """
        # TODO: test whether stream ARNs are valid sources for Pipes or ESM or whether only DynamoDB table ARNs work
        # TODO: Validate MaxRecordAgeInSeconds (i.e cannot subceed 60s but can be -1) and MaxRetryAttempts parameters.
        # See https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-maximumrecordageinseconds
        is_create_esm_request = context.operation.name == self.create_event_source_mapping.operation

        if destination_config := request.get("DestinationConfig"):
            if "OnSuccess" in destination_config:
                raise InvalidParameterValueException(
                    "Unsupported DestinationConfig parameter for given event source mapping type.",
                    Type="User",
                )

        # determine the source service: self-managed sources are Kafka, otherwise it is
        # derived from the event source ARN
        service = None
        if "SelfManagedEventSource" in request:
            service = "kafka"
            if "SourceAccessConfigurations" not in request:
                raise InvalidParameterValueException(
                    "Required 'sourceAccessConfigurations' parameter is missing.", Type="User"
                )
        if service is None and "EventSourceArn" not in request:
            raise InvalidParameterValueException("Unrecognized event source.", Type="User")
        if service is None:
            service = extract_service_from_arn(request["EventSourceArn"])

        batch_size = api_utils.validate_and_set_batch_size(service, request.get("BatchSize"))
        if service in ["dynamodb", "kinesis"]:
            starting_position = request.get("StartingPosition")
            if not starting_position:
                raise InvalidParameterValueException(
                    "1 validation error detected: Value null at 'startingPosition' failed to satisfy constraint: Member must not be null.",
                    Type="User",
                )

            if starting_position not in KinesisStreamStartPosition.__members__:
                raise ValidationException(
                    f"1 validation error detected: Value '{starting_position}' at 'startingPosition' failed to satisfy constraint: Member must satisfy enum value set: [LATEST, AT_TIMESTAMP, TRIM_HORIZON]"
                )
            # AT_TIMESTAMP is not allowed for DynamoDB Streams
            elif (
                service == "dynamodb"
                and starting_position not in DynamoDBStreamStartPosition.__members__
            ):
                raise InvalidParameterValueException(
                    f"Unsupported starting position for arn type: {request['EventSourceArn']}",
                    Type="User",
                )

        if service in ["sqs", "sqs-fifo"]:
            if batch_size > 10 and request.get("MaximumBatchingWindowInSeconds", 0) == 0:
                raise InvalidParameterValueException(
                    "Maximum batch window in seconds must be greater than 0 if maximum batch size is greater than 10",
                    Type="User",
                )

        if (filter_criteria := request.get("FilterCriteria")) is not None:
            for filter_ in filter_criteria.get("Filters", []):
                pattern_str = filter_.get("Pattern")
                if not pattern_str or not isinstance(pattern_str, str):
                    raise InvalidParameterValueException(
                        "Invalid filter pattern definition.", Type="User"
                    )

                if not validate_event_pattern(pattern_str):
                    raise InvalidParameterValueException(
                        "Invalid filter pattern definition.", Type="User"
                    )

        # Can either have a FunctionName (i.e CreateEventSourceMapping request) or
        # an internal EventSourceMappingConfiguration representation
        request_function_name = request.get("FunctionName") or request.get("FunctionArn")
        # can be either a partial arn or a full arn for the version/alias
        function_name, qualifier, account, region = function_locators_from_arn(
            request_function_name
        )
        # TODO: validate `context.region` vs. `region(request["FunctionName"])` vs. `region(request["EventSourceArn"])`
        account = account or context.account_id
        region = region or context.region
        state = lambda_stores[account][region]
        fn = state.functions.get(function_name)
        if not fn:
            raise InvalidParameterValueException("Function does not exist", Type="User")

        if qualifier:
            # make sure the function version/alias exists
            if api_utils.qualifier_is_alias(qualifier):
                if not fn.aliases.get(qualifier):
                    raise Exception("unknown alias")  # TODO: cover via test
            elif api_utils.qualifier_is_version(qualifier):
                if not fn.versions.get(qualifier):
                    raise Exception("unknown version")  # TODO: cover via test
            elif qualifier in ("$LATEST", "$LATEST.PUBLISHED"):
                # both pseudo-qualifiers are accepted without any further check
                pass
            else:
                raise Exception("invalid functionname")  # TODO: cover via test
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account, region)

        else:
            fn_arn = api_utils.unqualified_lambda_arn(function_name, account, region)

        function_version = get_function_version_from_arn(fn_arn)
        function_role = function_version.config.role

        if source_arn := request.get("EventSourceArn"):
            self.check_service_resource_exists(service, source_arn, fn_arn, function_role)
        # Check we are validating a CreateEventSourceMapping request
        if is_create_esm_request:

            def _get_mapping_sources(mapping: dict[str, Any]) -> list[str]:
                """Return the source identifiers of a mapping: its ARN or its Kafka bootstrap servers."""
                if event_source_arn := mapping.get("EventSourceArn"):
                    return [event_source_arn]
                return (
                    mapping.get("SelfManagedEventSource", {})
                    .get("Endpoints", {})
                    .get("KAFKA_BOOTSTRAP_SERVERS", [])
                )

            # check for event source duplicates
            # TODO: currently validated for sqs, kinesis, and dynamodb
            service_id = load_service(service).service_id
            # the sources of the request do not change while iterating, so compute them once
            request_sources = _get_mapping_sources(request)
            for uuid, mapping in state.event_source_mappings.items():
                mapping_sources = _get_mapping_sources(mapping)
                if mapping["FunctionArn"] == fn_arn and (
                    set(mapping_sources).intersection(request_sources)
                ):
                    if service == "sqs":
                        # *shakes fist at SQS*
                        raise ResourceConflictException(
                            f'An event source mapping with {service_id} arn (" {mapping["EventSourceArn"]} ") '
                            f'and function (" {function_name} ") already exists. Please update or delete the '
                            f"existing mapping with UUID {uuid}",
                            Type="User",
                        )
                    elif service == "kafka":
                        # Kafka mappings only conflict when they also share a topic
                        if set(mapping["Topics"]).intersection(request["Topics"]):
                            raise ResourceConflictException(
                                f'An event source mapping with event source ("{",".join(request_sources)}"), '
                                f'function ("{fn_arn}"), '
                                f'topics ("{",".join(request["Topics"])}") already exists. Please update or delete the '
                                f"existing mapping with UUID {uuid}",
                                Type="User",
                            )
                    else:
                        raise ResourceConflictException(
                            f'The event source arn (" {mapping["EventSourceArn"]} ") and function '
                            f'(" {function_name} ") provided mapping already exists. Please update or delete the '
                            f"existing mapping with UUID {uuid}",
                            Type="User",
                        )
        return fn_arn, function_name, state, function_version, function_role
1✔
2344

2345
    @handler("UpdateEventSourceMapping", expand=False)
    def update_event_source_mapping(
        self,
        context: RequestContext,
        request: UpdateEventSourceMappingRequest,
    ) -> EventSourceMappingConfiguration:
        """
        Handle ``UpdateEventSourceMapping`` by delegating to ``update_event_source_mapping_v2``.

        :param context: request context of the API call
        :param request: the unexpanded request (the handler is registered with ``expand=False``)
        :return: whatever ``update_event_source_mapping_v2`` returns for this request
        """
        return self.update_event_source_mapping_v2(context, request)
1✔
2352

2353
    def update_event_source_mapping_v2(
        self,
        context: RequestContext,
        request: UpdateEventSourceMappingRequest,
    ) -> EventSourceMappingConfiguration:
        """
        Update a stored event source mapping and replace its worker.

        The request values are merged over the stored mapping, the merged mapping is validated,
        stored, and a new worker is created from it while the previous worker is stopped.

        :param context: request context; selects the store by account id and region
        :param request: the ``UpdateEventSourceMapping`` request, identified by its ``UUID``
        :return: the merged mapping, with ``State`` set to the transitional state of this update

        Raises:
            ResourceNotFoundException: If no ``UUID`` is given, or no mapping or worker exists for it.
        """
        # TODO: test and implement this properly (quite complex with many validations and limitations!)
        LOG.warning(
            "Updating Lambda Event Source Mapping is in experimental state and not yet fully tested."
        )
        state = lambda_stores[context.account_id][context.region]
        # work on a shallow copy so that popping the UUID does not modify the request itself
        request_data = {**request}
        uuid = request_data.pop("UUID", None)
        if not uuid:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )
        old_event_source_mapping = state.event_source_mappings.get(uuid)
        esm_worker = self.esm_workers.get(uuid)
        # both the stored mapping and its worker must exist for an update
        if old_event_source_mapping is None or esm_worker is None:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )  # TODO: test?

        # normalize values to overwrite: request values take precedence over stored ones
        event_source_mapping = old_event_source_mapping | request_data

        temp_params = {}  # values only set for the returned response, not saved internally (e.g. transient state)

        # Validate the newly updated ESM object. Besides raising on invalid input, this yields the
        # resolved function ARN and role that are used below.
        function_arn, _, _, function_version, function_role = self.validate_event_source_mapping(
            context, event_source_mapping
        )

        # remove the FunctionName field
        event_source_mapping.pop("FunctionName", None)

        if function_arn:
            event_source_mapping["FunctionArn"] = function_arn

        # Only apply update if the desired state differs
        enabled = request.get("Enabled")
        if enabled is not None:
            if enabled and old_event_source_mapping["State"] != EsmState.ENABLED:
                event_source_mapping["State"] = EsmState.ENABLING
            # TODO: What happens when trying to update during an update or failed state?!
            elif not enabled and old_event_source_mapping["State"] == EsmState.ENABLED:
                event_source_mapping["State"] = EsmState.DISABLING
            # in every other case the state carried over from the stored mapping is kept
        else:
            # `Enabled` was not part of the request
            event_source_mapping["State"] = EsmState.UPDATING

        # To ensure parity, certain responses need to be immediately returned
        temp_params["State"] = event_source_mapping["State"]

        state.event_source_mappings[uuid] = event_source_mapping

        # TODO: Currently, we re-create the entire ESM worker. Look into approach with better performance.
        # when the request does not set `Enabled`, the previous worker's enabled flag is reused
        worker_factory = EsmWorkerFactory(
            event_source_mapping, function_role, request.get("Enabled", esm_worker.enabled)
        )

        # Get a new ESM worker object but do not activate it, since the factory holds all logic for creating new worker from configuration.
        updated_esm_worker = worker_factory.get_esm_worker()
        self.esm_workers[uuid] = updated_esm_worker

        # We should stop() the worker since the delete() will remove the ESM from the state mapping.
        esm_worker.stop()
        # This will either create an EsmWorker in the CREATING state if enabled. Otherwise, the DISABLING state is set.
        updated_esm_worker.create()

        return {**event_source_mapping, **temp_params}
1✔
2423

2424
    def delete_event_source_mapping(
        self, context: RequestContext, uuid: String, **kwargs
    ) -> EventSourceMappingConfiguration:
        """
        Trigger deletion of an event source mapping through its worker.

        The worker is removed from ``self.esm_workers`` here; this method does not remove the
        mapping from the store itself.

        :param context: request context; selects the store by account id and region
        :param uuid: identifier of the mapping to delete
        :return: a copy of the stored mapping with ``State`` set to ``EsmState.DELETING``

        Raises:
            ResourceNotFoundException: If no mapping or no worker exists for ``uuid``.
        """
        store = lambda_stores[context.account_id][context.region]
        esm = store.event_source_mappings.get(uuid)
        if not esm:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        # TODO: add proper locking
        worker = self.esm_workers.pop(uuid, None)
        if not worker:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        # Asynchronous delete in v2
        worker.delete()

        response = {**esm}
        response["State"] = EsmState.DELETING
        return response
1✔
2443

2444
    def get_event_source_mapping(
        self, context: RequestContext, uuid: String, **kwargs
    ) -> EventSourceMappingConfiguration:
        """
        Return a stored event source mapping, refreshed with the state of its worker.

        The stored mapping is updated in place with the worker's current state and state
        transition reason before it is returned.

        :param context: request context; selects the store by account id and region
        :param uuid: identifier of the mapping to return
        :return: the stored mapping

        Raises:
            ResourceNotFoundException: If no mapping or no worker exists for ``uuid``.
        """
        store = lambda_stores[context.account_id][context.region]

        if not (esm := store.event_source_mappings.get(uuid)):
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )
        if not (worker := self.esm_workers.get(uuid)):
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        esm["State"] = worker.current_state
        esm["StateTransitionReason"] = worker.state_transition_reason
        return esm
1✔
2461

2462
    def list_event_source_mappings(
        self,
        context: RequestContext,
        event_source_arn: Arn = None,
        function_name: FunctionName = None,
        marker: String = None,
        max_items: MaxListItems = None,
        **kwargs,
    ) -> ListEventSourceMappingsResponse:
        """
        Return one page of the stored event source mappings, optionally filtered.

        :param context: request context; selects the store by account id and region
        :param event_source_arn: when given, only mappings with exactly this ``EventSourceArn``
        :param function_name: when given, only mappings whose ``FunctionArn`` contains this string
        :param marker: pagination marker forwarded to ``PaginatedList.get_page``
        :param max_items: page size forwarded to ``PaginatedList.get_page``
        :return: response carrying the page in ``EventSourceMappings`` and the token in ``NextMarker``
        """
        store = lambda_stores[context.account_id][context.region]

        # TODO: update and test State and StateTransitionReason for ESM v2

        def _is_match(esm) -> bool:
            # TODO: validate pattern
            if event_source_arn and esm.get("EventSourceArn") != event_source_arn:
                return False
            # NOTE(review): this is a substring match on the ARN, so one function name can also
            # match functions whose names merely contain it - confirm this is intended
            if function_name and function_name not in esm["FunctionArn"]:
                return False
            return True

        matching = [esm for esm in store.event_source_mappings.values() if _is_match(esm)]

        page, next_marker = PaginatedList(matching).get_page(
            lambda esm: esm["UUID"],
            marker,
            max_items,
        )
        return ListEventSourceMappingsResponse(EventSourceMappings=page, NextMarker=next_marker)
1✔
2489

2490
    def get_source_type_from_request(self, request: dict[str, Any]) -> str | None:
        """Derive the event source type of an event source mapping request.

        :param request: request payload; only ``EventSourceArn`` and
            ``SelfManagedEventSource`` are inspected
        :return: the service extracted from ``EventSourceArn`` (``"sqs-fifo"`` for SQS FIFO
            queues), ``"kafka"`` for a self-managed event source, or ``None`` when the
            request carries neither field
        """
        if event_source_arn := request.get("EventSourceArn", ""):
            service = extract_service_from_arn(event_source_arn)
            # FIFO queue names always end in ".fifo". A plain substring test would also
            # flag standard queues that merely have "fifo" somewhere in their name.
            if service == "sqs" and event_source_arn.endswith(".fifo"):
                return "sqs-fifo"
            return service
        if request.get("SelfManagedEventSource"):
            return "kafka"
        # Neither an ARN nor a self-managed source: the type cannot be determined.
        return None
×
2498

2499
    # =======================================
2500
    # ============ FUNCTION URLS ============
2501
    # =======================================
2502

2503
    @staticmethod
    def _validate_qualifier(qualifier: str) -> None:
        """Reject qualifiers that are not accepted for function URL operations.

        :param qualifier: qualifier to check; falsy values are accepted
        :raises ValidationException: if the qualifier is ``$LATEST``, ``$LATEST.PUBLISHED``,
            or is recognised as a version by ``api_utils.qualifier_is_version``
        """
        names_latest = qualifier in ("$LATEST", "$LATEST.PUBLISHED")
        if not names_latest and not (qualifier and api_utils.qualifier_is_version(qualifier)):
            return

        raise ValidationException(
            f"1 validation error detected: Value '{qualifier}' at 'qualifier' failed to satisfy constraint: Member must satisfy regular expression pattern: ((?!^\\d+$)^[0-9a-zA-Z-_]+$)"
        )
2511

2512
    @staticmethod
    def _validate_invoke_mode(invoke_mode: str) -> None:
        """Check that ``invoke_mode`` is one of the known invoke modes.

        Falsy values are accepted without further checks. ``RESPONSE_STREAM`` is accepted
        as well, but a warning is logged because the execution remains ``BUFFERED``.

        :raises ValidationException: if a non-empty value is not a known invoke mode
        """
        if not invoke_mode:
            return

        known_modes = (InvokeMode.BUFFERED, InvokeMode.RESPONSE_STREAM)
        if invoke_mode not in known_modes:
            raise ValidationException(
                f"1 validation error detected: Value '{invoke_mode}' at 'invokeMode' failed to satisfy constraint: Member must satisfy enum value set: [RESPONSE_STREAM, BUFFERED]"
            )

        if invoke_mode == InvokeMode.RESPONSE_STREAM:
            # TODO should we actually fail for setting RESPONSE_STREAM?
            #  It should trigger InvokeWithResponseStream which is not implemented
            LOG.warning(
                "The invokeMode 'RESPONSE_STREAM' is not yet supported on LocalStack. The property is only mocked, the execution will still be 'BUFFERED'"
            )
2524

2525
    # TODO: what happens if function state is not active?
2526
    def create_function_url_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        auth_type: FunctionUrlAuthType,
        qualifier: FunctionUrlQualifier = None,
        cors: Cors = None,
        invoke_mode: InvokeMode = None,
        **kwargs,
    ) -> CreateFunctionUrlConfigResponse:
        """Create and store a function URL config for a function or one of its aliases.

        The config is stored under the qualifier, or under ``"$LATEST"`` when no qualifier
        is given. The URL subdomain is random unless the function carries a valid
        ``TAG_KEY_CUSTOM_URL`` tag (LocalStack-only feature).

        :raises ValidationException: if the qualifier or invoke mode is invalid
        :raises ResourceNotFoundException: if the function or the referenced alias is unknown
        :raises ResourceConflictException: if a URL config already exists for the qualifier
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        state = lambda_stores[account_id][region]
        self._validate_qualifier(qualifier)
        self._validate_invoke_mode(invoke_mode)

        fn = state.functions.get(function_name)
        if fn is None:
            raise ResourceNotFoundException("Function does not exist", Type="User")

        # Key under which the URL config is stored for this function.
        normalized_qualifier = qualifier or "$LATEST"

        url_config = fn.function_url_configs.get(normalized_qualifier)
        if url_config:
            raise ResourceConflictException(
                f"Failed to create function url config for [functionArn = {url_config.function_arn}]. Error message:  FunctionUrlConfig exists for this Lambda function",
                Type="User",
            )

        if qualifier and qualifier != "$LATEST" and qualifier not in fn.aliases:
            raise ResourceNotFoundException("Function does not exist", Type="User")

        unqualified_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
        if qualifier:
            function_arn = api_utils.qualified_lambda_arn(
                function_name, qualifier, account_id, region
            )
        else:
            function_arn = unqualified_arn

        # The url_id is the subdomain of the URL being created. It is random (as in AWS)
        # unless a valid custom id was supplied as a tag on the lambda (localstack-only).
        url_id: str | None = None
        tags = self._get_tags(unqualified_arn)
        if TAG_KEY_CUSTOM_URL in tags:
            # Note: uniqueness of the custom id is deliberately not verified here. It would
            # have to hold across the entire lambda provider, not just this function, and
            # that information does not seem to be available yet at this point because
            # (as far as can be told) self.router.register_routes() is called once, in a
            # single shot, for all routes. So that check proved non-trivial.
            tag_value = tags[TAG_KEY_CUSTOM_URL]
            custom_id_tag_value = f"{tag_value}-{qualifier}" if qualifier else tag_value
            if TAG_KEY_CUSTOM_URL_VALIDATOR.match(custom_id_tag_value):
                url_id = custom_id_tag_value
            else:
                # Note: logging instead of raising prioritizes strict parity with AWS
                # over the localstack-only custom id.
                LOG.warning(
                    "Invalid custom ID tag value for lambda URL (%s=%s). "
                    "Replaced with default (random id)",
                    TAG_KEY_CUSTOM_URL,
                    custom_id_tag_value,
                )

        if url_id is None:
            url_id = api_utils.generate_random_url_id()

        host_definition = localstack_host(custom_port=config.GATEWAY_LISTEN[0].port)
        new_url_config = FunctionUrlConfig(
            function_arn=function_arn,
            function_name=function_name,
            cors=cors,
            url_id=url_id,
            url=f"http://{url_id}.lambda-url.{context.region}.{host_definition.host_and_port()}/",  # TODO: https support
            auth_type=auth_type,
            creation_time=api_utils.generate_lambda_date(),
            last_modified_time=api_utils.generate_lambda_date(),
            invoke_mode=invoke_mode,
        )

        # persist and start URL
        # TODO: implement URL invoke
        fn.function_url_configs[normalized_qualifier] = new_url_config
        api_url_config = api_utils.map_function_url_config(new_url_config)

        response_fields = (
            "FunctionUrl",
            "FunctionArn",
            "AuthType",
            "Cors",
            "CreationTime",
            "InvokeMode",
        )
        return CreateFunctionUrlConfigResponse(
            **{field: api_url_config[field] for field in response_fields}
        )
2630

2631
    def get_function_url_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: FunctionUrlQualifier = None,
        **kwargs,
    ) -> GetFunctionUrlConfigResponse:
        """Return the URL config stored for a function qualifier (default ``"$LATEST"``).

        :raises ValidationException: if the qualifier is rejected by ``_validate_qualifier``
        :raises ResourceNotFoundException: if the function or its URL config does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]

        fn_name, qualifier = api_utils.get_name_and_qualifier(function_name, qualifier, context)
        self._validate_qualifier(qualifier)

        fn = store.functions.get(fn_name)
        # A missing function and a missing URL config produce the same error.
        url_config = fn.function_url_configs.get(qualifier or "$LATEST") if fn else None
        if not url_config:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        return api_utils.map_function_url_config(url_config)
1✔
2659

2660
    def update_function_url_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: FunctionUrlQualifier = None,
        auth_type: FunctionUrlAuthType = None,
        cors: Cors = None,
        invoke_mode: InvokeMode = None,
        **kwargs,
    ) -> UpdateFunctionUrlConfigResponse:
        """Replace the stored URL config of a function qualifier with an updated copy.

        ``cors`` and ``auth_type`` are applied when they are not ``None``; ``invoke_mode``
        is applied when it is truthy. ``last_modified_time`` is always refreshed.

        :raises ValidationException: if the qualifier or invoke mode is invalid
        :raises ResourceNotFoundException: if the function, the alias, or the URL config
            does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]

        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        self._validate_qualifier(qualifier)
        self._validate_invoke_mode(invoke_mode)

        fn = store.functions.get(function_name)
        if not fn:
            raise ResourceNotFoundException("Function does not exist", Type="User")

        config_key = qualifier or "$LATEST"

        references_unknown_alias = (
            api_utils.qualifier_is_alias(config_key) and config_key not in fn.aliases
        )
        if references_unknown_alias:
            raise ResourceNotFoundException("Function does not exist", Type="User")

        current = fn.function_url_configs.get(config_key)
        if not current:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        changes = {"last_modified_time": api_utils.generate_lambda_date()}
        if cors is not None:
            changes["cors"] = cors
        if auth_type is not None:
            changes["auth_type"] = auth_type
        if invoke_mode:
            changes["invoke_mode"] = invoke_mode

        updated = dataclasses.replace(current, **changes)
        fn.function_url_configs[config_key] = updated

        return UpdateFunctionUrlConfigResponse(
            FunctionUrl=updated.url,
            FunctionArn=updated.function_arn,
            AuthType=updated.auth_type,
            Cors=updated.cors,
            CreationTime=updated.creation_time,
            LastModifiedTime=updated.last_modified_time,
            InvokeMode=updated.invoke_mode,
        )
2718

2719
    def delete_function_url_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: FunctionUrlQualifier = None,
        **kwargs,
    ) -> None:
        """Remove the URL config stored for a function qualifier (default ``"$LATEST"``).

        :raises ValidationException: if the qualifier is rejected by ``_validate_qualifier``
        :raises ResourceNotFoundException: if the function or its URL config does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]

        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        self._validate_qualifier(qualifier)

        fn = store.functions.get(function_name)
        config_key = qualifier or "$LATEST"
        # A missing function and a missing URL config produce the same error.
        if not fn or not fn.function_url_configs.get(config_key):
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        del fn.function_url_configs[config_key]
1✔
2748

2749
    def list_function_url_configs(
        self,
        context: RequestContext,
        function_name: FunctionName,
        marker: String = None,
        max_items: MaxItems = None,
        **kwargs,
    ) -> ListFunctionUrlConfigsResponse:
        """Return one page of the URL configs stored for a function.

        Entries are mapped through ``api_utils.map_function_url_config`` and paginated
        using their ``FunctionArn`` as the page key.

        :raises ResourceNotFoundException: if the function does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]

        fn = store.functions.get(api_utils.get_function_name(function_name, context))
        if not fn:
            raise ResourceNotFoundException("Function does not exist", Type="User")

        mapped_configs = PaginatedList(
            [api_utils.map_function_url_config(conf) for conf in fn.function_url_configs.values()]
        )
        page, next_marker = mapped_configs.get_page(
            lambda url_config: url_config["FunctionArn"],
            marker,
            max_items,
        )
        return ListFunctionUrlConfigsResponse(FunctionUrlConfigs=page, NextMarker=next_marker)
1✔
2777

2778
    # =======================================
2779
    # ============  Permissions  ============
2780
    # =======================================
2781

2782
    @handler("AddPermission", expand=False)
    def add_permission(
        self,
        context: RequestContext,
        request: AddPermissionRequest,
    ) -> AddPermissionResponse:
        """Append a permission statement to the resource policy of a function qualifier.

        A policy is created if the qualifier has none yet. Afterwards the alias or version
        entry is replaced by a copy of itself (see the TODOs below: this is how the
        revision id gets bumped).

        :raises InvalidParameterValueException: if the qualifier is ``$LATEST``
        :raises PreconditionFailedException: if ``RevisionId`` is given and does not match
        :raises ValidationException: if ``StatementId`` does not match ``STATEMENT_ID_REGEX``
        :raises ResourceConflictException: if the statement id already exists in the policy
        """
        requested_function = request.get("FunctionName")
        function_name, qualifier = api_utils.get_name_and_qualifier(
            requested_function, request.get("Qualifier"), context
        )

        # validate qualifier
        if qualifier is not None:
            self._validate_qualifier_expression(qualifier)
            if qualifier == "$LATEST":
                raise InvalidParameterValueException(
                    "We currently do not support adding policies for $LATEST.", Type="User"
                )
        account_id, region = api_utils.get_account_and_region(requested_function, context)

        resolved_fn = self._get_function(function_name, account_id, region)
        resolved_qualifier, fn_arn = self._resolve_fn_qualifier(resolved_fn, qualifier)

        revision_id = request.get("RevisionId")
        if revision_id and revision_id != self._function_revision_id(
            resolved_fn, resolved_qualifier
        ):
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetPolicy API to retrieve the latest Revision Id",
                Type="User",
            )

        request_sid = request["StatementId"]
        if STATEMENT_ID_REGEX.match(request_sid) is None:
            raise ValidationException(
                f"1 validation error detected: Value '{request_sid}' at 'statementId' failed to satisfy constraint: Member must satisfy regular expression pattern: ([a-zA-Z0-9-_]+)"
            )

        # check for an already existing policy and any conflicts in existing statements
        existing_policy = resolved_fn.permissions.get(resolved_qualifier)
        if existing_policy:
            # uniqueness scope: statement id needs to be unique per qualified function ($LATEST, version, or alias)
            # Counterexample: the same sid can exist within $LATEST, version, and alias
            taken_sids = {stmt["Sid"] for stmt in existing_policy.policy.Statement}
            if request_sid in taken_sids:
                raise ResourceConflictException(
                    f"The statement id ({request_sid}) provided already exists. Please provide a new statement id, or remove the existing statement.",
                    Type="User",
                )

        permission_statement = api_utils.build_statement(
            partition=context.partition,
            resource_arn=fn_arn,
            statement_id=request_sid,
            action=request["Action"],
            principal=request["Principal"],
            source_arn=request.get("SourceArn"),
            source_account=request.get("SourceAccount"),
            principal_org_id=request.get("PrincipalOrgID"),
            event_source_token=request.get("EventSourceToken"),
            auth_type=request.get("FunctionUrlAuthType"),
        )

        if existing_policy:
            existing_policy.policy.Statement.append(permission_statement)
        else:
            # First statement for this qualifier: create the policy and register it.
            created_policy = FunctionResourcePolicy(
                policy=ResourcePolicy(Version="2012-10-17", Id="default", Statement=[])
            )
            created_policy.policy.Statement.append(permission_statement)
            resolved_fn.permissions[resolved_qualifier] = created_policy

        # Update revision id of alias or version
        # TODO: re-evaluate data model to prevent this dirty hack just for bumping the revision id
        # TODO: does that need a `with function.lock` for atomic updates of the policy + revision_id?
        if api_utils.qualifier_is_alias(resolved_qualifier):
            alias = resolved_fn.aliases[resolved_qualifier]
            resolved_fn.aliases[resolved_qualifier] = dataclasses.replace(alias)
        else:
            # Assumes that a non-alias is a version
            version = resolved_fn.versions[resolved_qualifier]
            resolved_fn.versions[resolved_qualifier] = dataclasses.replace(
                version, config=dataclasses.replace(version.config)
            )
        return AddPermissionResponse(Statement=json.dumps(permission_statement))
1✔
2864

2865
    def remove_permission(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        statement_id: NamespacedStatementId,
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
        revision_id: String | None = None,
        **kwargs,
    ) -> None:
        """Remove one statement from the resource policy of a function qualifier.

        The alias or version entry is replaced by a copy of itself afterwards (see the
        TODOs below), and the policy is dropped entirely once its last statement is gone.

        :raises ResourceNotFoundException: if the function, its policy, or the statement
            does not exist
        :raises PreconditionFailedException: if ``revision_id`` is given and does not match
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        if qualifier is not None:
            self._validate_qualifier_expression(qualifier)

        store = lambda_stores[account_id][region]
        resolved_fn = store.functions.get(function_name)
        if resolved_fn is None:
            fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
            raise ResourceNotFoundException(f"No policy found for: {fn_arn}", Type="User")

        resolved_qualifier, _ = self._resolve_fn_qualifier(resolved_fn, qualifier)
        function_permission = resolved_fn.permissions.get(resolved_qualifier)
        if not function_permission:
            raise ResourceNotFoundException(
                "No policy is associated with the given resource.", Type="User"
            )

        # try to find statement in policy and delete it
        statements = function_permission.policy.Statement
        target = next((stmt for stmt in statements if stmt["Sid"] == statement_id), None)
        if not target:
            raise ResourceNotFoundException(
                f"Statement {statement_id} is not found in resource policy.", Type="User"
            )

        fn_revision_id = self._function_revision_id(resolved_fn, resolved_qualifier)
        if revision_id and revision_id != fn_revision_id:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
                Type="User",
            )
        statements.remove(target)

        # Update revision id for alias or version
        # TODO: re-evaluate data model to prevent this dirty hack just for bumping the revision id
        # TODO: does that need a `with function.lock` for atomic updates of the policy + revision_id?
        if api_utils.qualifier_is_alias(resolved_qualifier):
            alias = resolved_fn.aliases[resolved_qualifier]
            resolved_fn.aliases[resolved_qualifier] = dataclasses.replace(alias)
        else:
            # Assumes that a non-alias is a version
            version = resolved_fn.versions[resolved_qualifier]
            resolved_fn.versions[resolved_qualifier] = dataclasses.replace(
                version, config=dataclasses.replace(version.config)
            )

        # remove the policy as a whole when there's no statement left in it
        if not function_permission.policy.Statement:
            del resolved_fn.permissions[resolved_qualifier]
1✔
2930

2931
    def get_policy(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
        **kwargs,
    ) -> GetPolicyResponse:
        """Return the resource policy of a function qualifier as JSON plus its revision id.

        Without a qualifier, the policy stored under ``"$LATEST"`` is returned.

        :raises ResourceNotFoundException: if no policy is stored for the qualifier
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )

        if qualifier is not None:
            self._validate_qualifier_expression(qualifier)

        resolved_fn = self._get_function(function_name, account_id, region)

        policy_key = qualifier or "$LATEST"
        function_permission = resolved_fn.permissions.get(policy_key)
        if not function_permission:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        # The revision id lives on the alias itself, or on the config of a version.
        # Assumes that a non-alias is a version
        revision_id = (
            resolved_fn.aliases[policy_key].revision_id
            if api_utils.qualifier_is_alias(policy_key)
            else resolved_fn.versions[policy_key].config.revision_id
        )

        policy_document = json.dumps(dataclasses.asdict(function_permission.policy))
        return GetPolicyResponse(Policy=policy_document, RevisionId=revision_id)
2968

2969
    # =======================================
2970
    # ========  Code signing config  ========
2971
    # =======================================
2972

2973
    def create_code_signing_config(
        self,
        context: RequestContext,
        allowed_publishers: AllowedPublishers,
        description: Description | None = None,
        code_signing_policies: CodeSigningPolicies | None = None,
        tags: Tags | None = None,
        **kwargs,
    ) -> CreateCodeSigningConfigResponse:
        """Create a code signing config with a random id and store it under its ARN.

        Note: ``tags`` is accepted but not used by this implementation.

        :return: response wrapping the new config as mapped by ``api_utils.map_csc``
        """
        account, region = context.account_id, context.region
        store = lambda_stores[account][region]

        # TODO: can there be duplicates?
        csc_id = f"csc-{get_random_hex(17)}"  # e.g. 'csc-077c33b4c19e26036'
        csc_arn = f"arn:{context.partition}:lambda:{region}:{account}:code-signing-config:{csc_id}"

        new_config = CodeSigningConfig(
            csc_id=csc_id,
            arn=csc_arn,
            allowed_publishers=allowed_publishers,
            policies=code_signing_policies,
            last_modified=api_utils.generate_lambda_date(),
            description=description,
        )
        store.code_signing_configs[csc_arn] = new_config

        return CreateCodeSigningConfigResponse(CodeSigningConfig=api_utils.map_csc(new_config))
1✔
2999

3000
    def put_function_code_signing_config(
        self,
        context: RequestContext,
        code_signing_config_arn: CodeSigningConfigArn,
        function_name: NamespacedFunctionName,
        **kwargs,
    ) -> PutFunctionCodeSigningConfigResponse:
        """Attach an existing code signing config to a function.

        The code signing config is checked before the function, so an unknown config is
        reported first when both are missing.

        :raises CodeSigningConfigNotFoundException: if the code signing config is unknown
        :raises ResourceNotFoundException: if the function is unknown
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]
        function_name = api_utils.get_function_name(function_name, context)

        if not store.code_signing_configs.get(code_signing_config_arn):
            raise CodeSigningConfigNotFoundException(
                f"The code signing configuration cannot be found. Check that the provided configuration is not deleted: {code_signing_config_arn}.",
                Type="User",
            )

        target_fn = store.functions.get(function_name)
        fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
        if not target_fn:
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")

        target_fn.code_signing_config_arn = code_signing_config_arn
        return PutFunctionCodeSigningConfigResponse(
            CodeSigningConfigArn=code_signing_config_arn, FunctionName=function_name
        )
3027

3028
    def update_code_signing_config(
        self,
        context: RequestContext,
        code_signing_config_arn: CodeSigningConfigArn,
        description: Description = None,
        allowed_publishers: AllowedPublishers = None,
        code_signing_policies: CodeSigningPolicies = None,
        **kwargs,
    ) -> UpdateCodeSigningConfigResponse:
        """Replace a stored code signing config with an updated copy.

        Only arguments that are not ``None`` overwrite the stored values;
        ``last_modified`` is always refreshed.

        :raises ResourceNotFoundException: if the code signing config is unknown
        """
        store = lambda_stores[context.account_id][context.region]
        current = store.code_signing_configs.get(code_signing_config_arn)
        if not current:
            raise ResourceNotFoundException(
                f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
            )

        requested = {
            "allowed_publishers": allowed_publishers,
            "policies": code_signing_policies,
            "description": description,
        }
        changes = {field: value for field, value in requested.items() if value is not None}

        updated = dataclasses.replace(
            current, last_modified=api_utils.generate_lambda_date(), **changes
        )
        store.code_signing_configs[code_signing_config_arn] = updated

        return UpdateCodeSigningConfigResponse(CodeSigningConfig=api_utils.map_csc(updated))
1✔
3057

3058
    def get_code_signing_config(
        self, context: RequestContext, code_signing_config_arn: CodeSigningConfigArn, **kwargs
    ) -> GetCodeSigningConfigResponse:
        """Return the code signing config stored under the given ARN.

        :raises ResourceNotFoundException: if the code signing config is unknown
        """
        store = lambda_stores[context.account_id][context.region]
        stored_config = store.code_signing_configs.get(code_signing_config_arn)
        if stored_config:
            return GetCodeSigningConfigResponse(CodeSigningConfig=api_utils.map_csc(stored_config))

        raise ResourceNotFoundException(
            f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
        )
1✔
3069

3070
    def get_function_code_signing_config(
        self, context: RequestContext, function_name: NamespacedFunctionName, **kwargs
    ) -> GetFunctionCodeSigningConfigResponse:
        """Return the code signing config ARN attached to a function, if any.

        :return: a response with ``CodeSigningConfigArn`` and ``FunctionName`` when a config
            is attached, otherwise an empty response
        :raises ResourceNotFoundException: if the function is unknown
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]
        function_name = api_utils.get_function_name(function_name, context)

        target_fn = store.functions.get(function_name)
        fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
        if not target_fn:
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")

        attached_arn = target_fn.code_signing_config_arn
        if not attached_arn:
            return GetFunctionCodeSigningConfigResponse()

        return GetFunctionCodeSigningConfigResponse(
            CodeSigningConfigArn=attached_arn, FunctionName=function_name
        )
1✔
3087

3088
    def delete_function_code_signing_config(
        self, context: RequestContext, function_name: NamespacedFunctionName, **kwargs
    ) -> None:
        """Detach the code signing config from a function by resetting its ARN to ``None``.

        :raises ResourceNotFoundException: if the function is unknown
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]
        function_name = api_utils.get_function_name(function_name, context)

        target_fn = store.functions.get(function_name)
        fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
        if not target_fn:
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")

        target_fn.code_signing_config_arn = None
1✔
3100

3101
    def delete_code_signing_config(
        self, context: RequestContext, code_signing_config_arn: CodeSigningConfigArn, **kwargs
    ) -> DeleteCodeSigningConfigResponse:
        """Delete the code signing config stored under the given ARN.

        :raises ResourceNotFoundException: if the code signing config is unknown
        """
        configs = lambda_stores[context.account_id][context.region].code_signing_configs

        if not configs.get(code_signing_config_arn):
            raise ResourceNotFoundException(
                f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
            )

        del configs[code_signing_config_arn]
        return DeleteCodeSigningConfigResponse()
1✔
3115

3116
    def list_code_signing_configs(
        self,
        context: RequestContext,
        marker: String = None,
        max_items: MaxListItems = None,
        **kwargs,
    ) -> ListCodeSigningConfigsResponse:
        """Return one page of the code signing configs of the caller's account and region.

        Entries are mapped through ``api_utils.map_csc`` and paginated using their
        ``CodeSigningConfigId`` as the page key.
        """
        store = lambda_stores[context.account_id][context.region]

        mapped_configs = PaginatedList(
            [api_utils.map_csc(stored) for stored in store.code_signing_configs.values()]
        )
        page, next_marker = mapped_configs.get_page(
            lambda csc: csc["CodeSigningConfigId"],
            marker,
            max_items,
        )
        return ListCodeSigningConfigsResponse(CodeSigningConfigs=page, NextMarker=next_marker)
1✔
3133

3134
    def list_functions_by_code_signing_config(
        self,
        context: RequestContext,
        code_signing_config_arn: CodeSigningConfigArn,
        marker: String = None,
        max_items: MaxListItems = None,
        **kwargs,
    ) -> ListFunctionsByCodeSigningConfigResponse:
        """
        Return one page of unqualified ARNs of the functions using a code signing config.

        :param context: request context providing account id and region
        :param code_signing_config_arn: ARN of the code signing config to look up
        :param marker: pagination marker, compared against the function ARNs themselves
        :param max_items: page size passed on to the paginator
        :return: the page of function ARNs plus the marker for the next page
        :raises ResourceNotFoundException: if the code signing config is not in the store
        """

        def _arn_itself(function_arn):
            # the listed items are plain ARN strings, so each item is its own pagination key
            return function_arn

        account_id, region_name = context.account_id, context.region
        store = lambda_stores[account_id][region_name]

        if code_signing_config_arn not in store.code_signing_configs:
            raise ResourceNotFoundException(
                f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
            )

        matching_arns = []
        for function in store.functions.values():
            if function.code_signing_config_arn == code_signing_config_arn:
                matching_arns.append(
                    api_utils.unqualified_lambda_arn(
                        function.function_name, account_id, region_name
                    )
                )

        page, next_marker = PaginatedList(matching_arns).get_page(_arn_itself, marker, max_items)
        return ListFunctionsByCodeSigningConfigResponse(FunctionArns=page, NextMarker=next_marker)
1✔
3165

3166
    # =======================================
3167
    # =========  Account Settings   =========
3168
    # =======================================
3169

3170
    # CAVE: these settings & usages are *per* region!
3171
    # Lambda quotas: https://docs.aws.amazon.com/lambda/latest/dg/gettingstarted-limits.html
3172
    def get_account_settings(self, context: RequestContext, **kwargs) -> GetAccountSettingsResponse:
        """
        Report the configured Lambda limits and the current usage for the caller's account and region.

        Usage is derived from the store: the number of functions, and the summed code size of
        all Zip-packaged function versions and all layer versions. The unreserved concurrency
        is the configured concurrency limit minus every function's reserved concurrency and
        every provisioned concurrency config.

        :param context: request context providing account id and region
        :return: GetAccountSettingsResponse with ``AccountLimit`` and ``AccountUsage``
        """
        store = lambda_stores[context.account_id][context.region]
        functions = list(store.functions.values())

        # Image-based Lambdas do not have a code attribute and count against the ECR quotas instead
        function_code_size = sum(
            version.config.code.code_size
            for function in functions
            for version in function.versions.values()
            if version.config.package_type == PackageType.Zip
        )
        layer_code_size = sum(
            layer_version.code.code_size
            for layer in store.layers.values()
            for layer_version in layer.layer_versions.values()
        )

        reserved_total = sum(
            function.reserved_concurrent_executions
            for function in functions
            if function.reserved_concurrent_executions is not None
        )
        provisioned_total = sum(
            provisioned.provisioned_concurrent_executions
            for function in functions
            for provisioned in function.provisioned_concurrency_configs.values()
        )
        claimed_concurrency = reserved_total + provisioned_total

        limits = AccountLimit(
            TotalCodeSize=config.LAMBDA_LIMITS_TOTAL_CODE_SIZE,
            CodeSizeZipped=config.LAMBDA_LIMITS_CODE_SIZE_ZIPPED,
            CodeSizeUnzipped=config.LAMBDA_LIMITS_CODE_SIZE_UNZIPPED,
            ConcurrentExecutions=config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS,
            UnreservedConcurrentExecutions=config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS
            - claimed_concurrency,
        )
        usage = AccountUsage(
            TotalCodeSize=function_code_size + layer_code_size,
            FunctionCount=len(functions),
        )
        return GetAccountSettingsResponse(AccountLimit=limits, AccountUsage=usage)
3205

3206
    # =======================================
3207
    # ==  Provisioned Concurrency Config   ==
3208
    # =======================================
3209

3210
    def _get_provisioned_config(
        self, context: RequestContext, function_name: str, qualifier: str
    ) -> ProvisionedConcurrencyConfiguration | None:
        """
        Look up the provisioned concurrency config stored for a function qualifier.

        :param context: request context used to resolve account, region and function name
        :param function_name: name (or ARN) of the function
        :param qualifier: alias name or version the config is keyed by
        :return: the stored config for the qualifier, or None if there is none
        :raises ResourceNotFoundException: if the qualifier is an alias or a version that does
            not exist (this includes the case where the function itself is missing)
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]
        function_name = api_utils.get_function_name(function_name, context)
        function = store.functions.get(function_name)

        if api_utils.qualifier_is_alias(qualifier):
            if not function or function.aliases.get(qualifier) is None:
                raise ResourceNotFoundException(
                    f"Cannot find alias arn: {api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)}",
                    Type="User",
                )
        elif api_utils.qualifier_is_version(qualifier):
            if not function or function.versions.get(qualifier) is None:
                raise ResourceNotFoundException(
                    f"Function not found: {api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)}",
                    Type="User",
                )

        return function.provisioned_concurrency_configs.get(qualifier)
1✔
3237

3238
    def put_provisioned_concurrency_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: Qualifier,
        provisioned_concurrent_executions: PositiveInteger,
        **kwargs,
    ) -> PutProvisionedConcurrencyConfigResponse:
        """
        Store a provisioned concurrency config for an alias or published version and hand the
        requested amount to the version manager.

        The checks below run in a fixed order, which determines the error a caller sees when
        several constraints are violated at once.

        :param context: request context used to resolve account, region and function name
        :param function_name: name (or ARN) of the function
        :param qualifier: alias name or published version; ``$LATEST`` is rejected
        :param provisioned_concurrent_executions: requested amount, must be at least 1
        :return: response echoing the requested amount with status IN_PROGRESS
        :raises ValidationException: if the requested amount is not positive
        :raises InvalidParameterValueException: for ``$LATEST``, or when the request exceeds the
            function's reserved concurrency, the configured concurrency limit, or would push the
            unreserved concurrency below its configured minimum
        :raises ResourceNotFoundException: (via ``_get_provisioned_config``) if the alias or
            version does not exist
        :raises ResourceConflictException: if an alias and the version it points to would both
            carry a provisioned concurrency config
        """
        if provisioned_concurrent_executions <= 0:
            raise ValidationException(
                f"1 validation error detected: Value '{provisioned_concurrent_executions}' at 'provisionedConcurrentExecutions' failed to satisfy constraint: Member must have value greater than or equal to 1"
            )

        if qualifier == "$LATEST":
            raise InvalidParameterValueException(
                "Provisioned Concurrency Configs cannot be applied to unpublished function versions.",
                Type="User",
            )
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        state = lambda_stores[account_id][region]
        fn = state.functions.get(function_name)

        # also validates that the alias/version exists (raises ResourceNotFoundException otherwise)
        provisioned_config = self._get_provisioned_config(context, function_name, qualifier)

        if provisioned_config:  # TODO: merge?
            # TODO: add a test for partial updates (if possible)
            LOG.warning(
                "Partial update of provisioned concurrency config is currently not supported."
            )

        # provisioned concurrency already configured on all *other* qualifiers of this function
        other_provisioned_sum = sum(
            [
                provisioned_configs.provisioned_concurrent_executions
                for provisioned_qualifier, provisioned_configs in fn.provisioned_concurrency_configs.items()
                if provisioned_qualifier != qualifier
            ]
        )

        # function-level check: total provisioned concurrency must fit into the reserved concurrency
        if (
            fn.reserved_concurrent_executions is not None
            and fn.reserved_concurrent_executions
            < other_provisioned_sum + provisioned_concurrent_executions
        ):
            raise InvalidParameterValueException(
                "Requested Provisioned Concurrency should not be greater than the reservedConcurrentExecution for function",
                Type="User",
            )

        # account-level check against the configured overall concurrency limit
        if provisioned_concurrent_executions > config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS:
            raise InvalidParameterValueException(
                f"Specified ConcurrentExecutions for function is greater than account's unreserved concurrency"
                f" [{config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS}]."
            )

        # account-level check: the remaining unreserved concurrency must stay above the minimum
        # NOTE(review): the value from get_account_settings already subtracts every stored
        # provisioned config, presumably including an existing one for this qualifier -- confirm
        settings = self.get_account_settings(context)
        unreserved_concurrent_executions = settings["AccountLimit"][
            "UnreservedConcurrentExecutions"
        ]
        if (
            unreserved_concurrent_executions - provisioned_concurrent_executions
            < config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY
        ):
            raise InvalidParameterValueException(
                f"Specified ConcurrentExecutions for function decreases account's UnreservedConcurrentExecution below"
                f" its minimum value of [{config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY}]."
            )

        provisioned_config = ProvisionedConcurrencyConfiguration(
            provisioned_concurrent_executions, api_utils.generate_lambda_date()
        )
        fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)

        if api_utils.qualifier_is_alias(qualifier):
            # the version manager is looked up by the ARN of the version the alias points to
            alias = fn.aliases.get(qualifier)
            resolved_version = fn.versions.get(alias.function_version)

            if (
                resolved_version
                and fn.provisioned_concurrency_configs.get(alias.function_version) is not None
            ):
                raise ResourceConflictException(
                    "Alias can't be used for Provisioned Concurrency configuration on an already Provisioned version",
                    Type="User",
                )
            fn_arn = resolved_version.id.qualified_arn()
        elif api_utils.qualifier_is_version(qualifier):
            fn_version = fn.versions.get(qualifier)

            # TODO: might be useful other places, utilize
            # collect aliases that point to this version and already have provisioned concurrency
            pointing_aliases = []
            for alias in fn.aliases.values():
                if (
                    alias.function_version == qualifier
                    and fn.provisioned_concurrency_configs.get(alias.name) is not None
                ):
                    pointing_aliases.append(alias.name)
            if pointing_aliases:
                raise ResourceConflictException(
                    "Version is pointed by a Provisioned Concurrency alias", Type="User"
                )

            fn_arn = fn_version.id.qualified_arn()

        # look up the manager before storing the config, so a failing lookup leaves the store untouched
        manager = self.lambda_service.get_lambda_version_manager(fn_arn)

        fn.provisioned_concurrency_configs[qualifier] = provisioned_config

        manager.update_provisioned_concurrency_config(
            provisioned_config.provisioned_concurrent_executions
        )

        return PutProvisionedConcurrencyConfigResponse(
            RequestedProvisionedConcurrentExecutions=provisioned_config.provisioned_concurrent_executions,
            AvailableProvisionedConcurrentExecutions=0,
            AllocatedProvisionedConcurrentExecutions=0,
            Status=ProvisionedConcurrencyStatusEnum.IN_PROGRESS,
            # StatusReason=manager.provisioned_state.status_reason,
            LastModified=provisioned_config.last_modified,  # TODO: does change with configuration or also with state changes?
        )
3360

3361
    def get_provisioned_concurrency_config(
        self, context: RequestContext, function_name: FunctionName, qualifier: Qualifier, **kwargs
    ) -> GetProvisionedConcurrencyConfigResponse:
        """
        Return the provisioned concurrency config of an alias or published version, combined
        with the provisioned state reported by the version manager.

        :param context: request context used to resolve account, region and function name
        :param function_name: name (or ARN) of the function
        :param qualifier: alias name or published version; ``$LATEST`` is rejected
        :raises InvalidParameterValueException: if the qualifier is ``$LATEST``
        :raises ProvisionedConcurrencyConfigNotFoundException: if no config is stored for the qualifier
        """
        if qualifier == "$LATEST":
            raise InvalidParameterValueException(
                "The function resource provided must be an alias or a published version.",
                Type="User",
            )
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )

        stored_config = self._get_provisioned_config(context, function_name, qualifier)
        if not stored_config:
            raise ProvisionedConcurrencyConfigNotFoundException(
                "No Provisioned Concurrency Config found for this function", Type="User"
            )

        # TODO: make this compatible with alias pointer migration on update
        # the version manager is keyed by version ARN, so an alias is resolved to its version first
        version_qualifier = qualifier
        if api_utils.qualifier_is_alias(qualifier):
            function = lambda_stores[account_id][region].functions.get(function_name)
            version_qualifier = function.aliases.get(qualifier).function_version

        version_arn = api_utils.qualified_lambda_arn(
            function_name, version_qualifier, account_id, region
        )
        version_manager = self.lambda_service.get_lambda_version_manager(version_arn)

        return GetProvisionedConcurrencyConfigResponse(
            RequestedProvisionedConcurrentExecutions=stored_config.provisioned_concurrent_executions,
            LastModified=stored_config.last_modified,
            AvailableProvisionedConcurrentExecutions=version_manager.provisioned_state.available,
            AllocatedProvisionedConcurrentExecutions=version_manager.provisioned_state.allocated,
            Status=version_manager.provisioned_state.status,
            StatusReason=version_manager.provisioned_state.status_reason,
        )
3401

3402
    def list_provisioned_concurrency_configs(
        self,
        context: RequestContext,
        function_name: FunctionName,
        marker: String = None,
        max_items: MaxProvisionedConcurrencyConfigListItems = None,
        **kwargs,
    ) -> ListProvisionedConcurrencyConfigsResponse:
        """
        Return one page of the provisioned concurrency configs of a function.

        Each list item combines the stored config with the provisioned state reported by the
        version manager of the version the qualifier refers to.

        :param context: request context used to resolve account, region and function name
        :param function_name: name (or ARN) of the function
        :param marker: pagination marker, compared against each item's ``FunctionArn``
        :param max_items: page size passed on to the paginator
        :return: the page of config list items plus the marker for the next page
        :raises ResourceNotFoundException: if the function is not present in the store
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        state = lambda_stores[account_id][region]

        function_name = api_utils.get_function_name(function_name, context)
        fn = state.functions.get(function_name)
        if fn is None:
            raise ResourceNotFoundException(
                f"Function not found: {api_utils.unqualified_lambda_arn(function_name, account_id, region)}",
                Type="User",
            )

        configs = []
        for qualifier, pc_config in fn.provisioned_concurrency_configs.items():
            # the version manager is looked up by version ARN, so resolve aliases first
            version_qualifier = qualifier
            if api_utils.qualifier_is_alias(qualifier):
                alias = fn.aliases.get(qualifier)
                version_qualifier = alias.function_version
            version_arn = api_utils.qualified_lambda_arn(
                function_name, version_qualifier, account_id, region
            )
            manager = self.lambda_service.get_lambda_version_manager(version_arn)

            configs.append(
                ProvisionedConcurrencyConfigListItem(
                    # the reported ARN keeps the original qualifier (alias name or version)
                    FunctionArn=api_utils.qualified_lambda_arn(
                        function_name, qualifier, account_id, region
                    ),
                    RequestedProvisionedConcurrentExecutions=pc_config.provisioned_concurrent_executions,
                    AvailableProvisionedConcurrentExecutions=manager.provisioned_state.available,
                    AllocatedProvisionedConcurrentExecutions=manager.provisioned_state.allocated,
                    Status=manager.provisioned_state.status,
                    StatusReason=manager.provisioned_state.status_reason,
                    LastModified=pc_config.last_modified,
                )
            )

        # Paginate on the item's FunctionArn: the marker has to be a string that identifies an
        # item. Using the item itself as key would return a whole dict as NextMarker and could
        # never match an incoming string marker.
        page, token = PaginatedList(configs).get_page(
            lambda item: item["FunctionArn"],
            marker,
            max_items,
        )
        return ListProvisionedConcurrencyConfigsResponse(
            ProvisionedConcurrencyConfigs=page, NextMarker=token
        )
3459

3460
    def delete_provisioned_concurrency_config(
        self, context: RequestContext, function_name: FunctionName, qualifier: Qualifier, **kwargs
    ) -> None:
        """
        Remove the provisioned concurrency config of an alias or published version and reset
        the provisioned concurrency of the corresponding version manager to 0.

        Deleting a config that does not exist is a no-op.

        :param context: request context used to resolve account, region and function name
        :param function_name: name (or ARN) of the function
        :param qualifier: alias name or published version; ``$LATEST`` is rejected
        :raises InvalidParameterValueException: if the qualifier is ``$LATEST``
        :raises ResourceNotFoundException: (via ``_get_provisioned_config``) if the alias or
            version does not exist
        """
        if qualifier == "$LATEST":
            raise InvalidParameterValueException(
                "The function resource provided must be an alias or a published version.",
                Type="User",
            )
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        state = lambda_stores[account_id][region]
        fn = state.functions.get(function_name)

        provisioned_config = self._get_provisioned_config(context, function_name, qualifier)
        # delete is idempotent and doesn't actually care about the provisioned concurrency config not existing
        if provisioned_config:
            fn.provisioned_concurrency_configs.pop(qualifier)

            # Resolve an alias to the version it points to before looking up the version
            # manager, the same way put/get/list do; the alias is known to exist here because
            # _get_provisioned_config raises for unknown aliases.
            version_qualifier = qualifier
            if api_utils.qualifier_is_alias(qualifier):
                version_qualifier = fn.aliases.get(qualifier).function_version

            fn_arn = api_utils.qualified_lambda_arn(
                function_name, version_qualifier, account_id, region
            )
            manager = self.lambda_service.get_lambda_version_manager(fn_arn)
            manager.update_provisioned_concurrency_config(0)
1✔
3482

3483
    # =======================================
3484
    # =======  Event Invoke Config   ========
3485
    # =======================================
3486

3487
    # "1 validation error detected: Value 'arn:aws:_-/!lambda:<region>:111111111111:function:<function-name:1>' at 'destinationConfig.onFailure.destination' failed to satisfy constraint: Member must satisfy regular expression pattern: ^$|arn:(aws[a-zA-Z0-9-]*):([a-zA-Z0-9\\-])+:([a-z]{2}((-gov)|(-iso(b?)))?-[a-z]+-\\d{1})?:(\\d{12})?:(.*)"
3488
    # "1 validation error detected: Value 'arn:aws:_-/!lambda:<region>:111111111111:function:<function-name:1>' at 'destinationConfig.onFailure.destination' failed to satisfy constraint: Member must satisfy regular expression pattern: ^$|arn:(aws[a-zA-Z0-9-]*):([a-zA-Z0-9\\-])+:([a-z]2((-gov)|(-iso(b?)))?-[a-z]+-\\d1)?:(\\d12)?:(.*)" ... (expected → actual)
3489

3490
    def _validate_destination_config(
1✔
3491
        self, store: LambdaStore, function_name: str, destination_config: DestinationConfig
3492
    ):
3493
        def _validate_destination_arn(destination_arn) -> bool:
1✔
3494
            if not api_utils.DESTINATION_ARN_PATTERN.match(destination_arn):
1✔
3495
                # technically we shouldn't handle this in the provider
3496
                raise ValidationException(
1✔
3497
                    "1 validation error detected: Value '"
3498
                    + destination_arn
3499
                    + "' at 'destinationConfig.onFailure.destination' failed to satisfy constraint: Member must satisfy regular expression pattern: "
3500
                    + "$|kafka://([^.]([a-zA-Z0-9\\-_.]{0,248}))|arn:(aws[a-zA-Z0-9-]*):([a-zA-Z0-9\\-])+:((eusc-)?[a-z]{2}((-gov)|(-iso([a-z]?)))?-[a-z]+-\\d{1})?:(\\d{12})?:(.*)"
3501
                )
3502

3503
            match destination_arn.split(":")[2]:
1✔
3504
                case "lambda":
1✔
3505
                    fn_parts = api_utils.FULL_FN_ARN_PATTERN.search(destination_arn).groupdict()
1✔
3506
                    if fn_parts:
1✔
3507
                        # check if it exists
3508
                        fn = store.functions.get(fn_parts["function_name"])
1✔
3509
                        if not fn:
1✔
3510
                            raise InvalidParameterValueException(
1✔
3511
                                f"The destination ARN {destination_arn} is invalid.", Type="User"
3512
                            )
3513
                        if fn_parts["function_name"] == function_name:
1✔
3514
                            raise InvalidParameterValueException(
1✔
3515
                                "You can't specify the function as a destination for itself.",
3516
                                Type="User",
3517
                            )
3518
                case "sns" | "sqs" | "events":
1✔
3519
                    pass
1✔
3520
                case _:
1✔
3521
                    return False
1✔
3522
            return True
1✔
3523

3524
        validation_err = False
1✔
3525

3526
        failure_destination = destination_config.get("OnFailure", {}).get("Destination")
1✔
3527
        if failure_destination:
1✔
3528
            validation_err = validation_err or not _validate_destination_arn(failure_destination)
1✔
3529

3530
        success_destination = destination_config.get("OnSuccess", {}).get("Destination")
1✔
3531
        if success_destination:
1✔
3532
            validation_err = validation_err or not _validate_destination_arn(success_destination)
1✔
3533

3534
        if validation_err:
1✔
3535
            on_success_part = (
1✔
3536
                f"OnSuccess(destination={success_destination})" if success_destination else "null"
3537
            )
3538
            on_failure_part = (
1✔
3539
                f"OnFailure(destination={failure_destination})" if failure_destination else "null"
3540
            )
3541
            raise InvalidParameterValueException(
1✔
3542
                f"The provided destination config DestinationConfig(onSuccess={on_success_part}, onFailure={on_failure_part}) is invalid.",
3543
                Type="User",
3544
            )
3545

3546
    def put_function_event_invoke_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: NumericLatestPublishedOrAliasQualifier = None,
        maximum_retry_attempts: MaximumRetryAttempts = None,
        maximum_event_age_in_seconds: MaximumEventAgeInSeconds = None,
        destination_config: DestinationConfig = None,
        **kwargs,
    ) -> FunctionEventInvokeConfig:
        """
        Create or replace the event invoke config of a function qualifier.

        Supported destination ARNs:
        * SQS arn
        * SNS arn
        * Lambda arn
        * EventBridge arn

        put_ vs. update_:
            * put overwrites any existing config
            * update changes only the given values and keeps the rest of the existing config
            * update fails on non-existing configs

        Destination vs. DLQ:
            * "However, a dead-letter queue is part of a function's version-specific configuration, so it is locked in when you publish a version."
            * "On-failure destinations also support additional targets and include details about the function's response in the invocation record."

        :raises InvalidParameterValueException: if none of the three settings is given
        :raises ResourceNotFoundException: if the function or the given qualifier does not exist
        """
        requested_settings = (
            maximum_event_age_in_seconds,
            maximum_retry_attempts,
            destination_config,
        )
        if all(setting is None for setting in requested_settings):
            raise InvalidParameterValueException(
                "You must specify at least one of error handling or destination setting.",
                Type="User",
            )

        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )

        function = store.functions.get(function_name)
        if not function:
            raise ResourceNotFoundException("The function doesn't exist.", Type="User")
        if qualifier and qualifier not in function.aliases and qualifier not in function.versions:
            raise ResourceNotFoundException("The function doesn't exist.", Type="User")

        # an unqualified request targets $LATEST
        qualifier = qualifier or "$LATEST"

        # validate and normalize destination config
        if destination_config:
            self._validate_destination_config(store, function_name, destination_config)

        requested_destinations = destination_config or {}
        destination_config = DestinationConfig(
            OnSuccess=OnSuccess(
                Destination=requested_destinations.get("OnSuccess", {}).get("Destination")
            ),
            OnFailure=OnFailure(
                Destination=requested_destinations.get("OnFailure", {}).get("Destination")
            ),
        )

        invoke_config = EventInvokeConfig(
            function_name=function_name,
            qualifier=qualifier,
            maximum_event_age_in_seconds=maximum_event_age_in_seconds,
            maximum_retry_attempts=maximum_retry_attempts,
            last_modified=api_utils.generate_lambda_date(),
            destination_config=destination_config,
        )
        function.event_invoke_configs[qualifier] = invoke_config

        last_modified = datetime.datetime.strptime(
            invoke_config.last_modified, api_utils.LAMBDA_DATE_FORMAT
        )
        function_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
        return FunctionEventInvokeConfig(
            LastModified=last_modified,
            FunctionArn=function_arn,
            DestinationConfig=destination_config,
            MaximumEventAgeInSeconds=maximum_event_age_in_seconds,
            MaximumRetryAttempts=maximum_retry_attempts,
        )
3627

3628
    def get_function_event_invoke_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
        **kwargs,
    ) -> FunctionEventInvokeConfig:
        """
        Return the event invoke config stored for a function qualifier.

        :param context: request context used to resolve account, region and function name
        :param function_name: name (or ARN) of the function
        :param qualifier: alias or version; an unqualified request targets ``$LATEST``
        :raises ResourceNotFoundException: if the function does not exist or has no event
            invoke config for the qualifier
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        qualifier = qualifier or "$LATEST"

        # a missing function and a missing config are reported with the same error
        function = store.functions.get(function_name)
        invoke_config = function.event_invoke_configs.get(qualifier) if function else None
        function_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
        if not invoke_config:
            raise ResourceNotFoundException(
                f"The function {function_arn} doesn't have an EventInvokeConfig", Type="User"
            )

        last_modified = datetime.datetime.strptime(
            invoke_config.last_modified, api_utils.LAMBDA_DATE_FORMAT
        )
        return FunctionEventInvokeConfig(
            LastModified=last_modified,
            FunctionArn=function_arn,
            DestinationConfig=invoke_config.destination_config,
            MaximumEventAgeInSeconds=invoke_config.maximum_event_age_in_seconds,
            MaximumRetryAttempts=invoke_config.maximum_retry_attempts,
        )
3667

3668
    def list_function_event_invoke_configs(
        self,
        context: RequestContext,
        function_name: FunctionName,
        marker: String = None,
        max_items: MaxFunctionEventInvokeConfigListItems = None,
        **kwargs,
    ) -> ListFunctionEventInvokeConfigsResponse:
        """
        Return one page of the event invoke configs stored for a function.

        :param context: request context used to resolve account, region and function name
        :param function_name: name (or ARN) of the function
        :param marker: pagination marker, compared against each item's ``FunctionArn``
        :param max_items: page size passed on to the paginator
        :return: the page of event invoke configs plus the marker for the next page
        :raises ResourceNotFoundException: if the function is not present in the store
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        state = lambda_stores[account_id][region]
        # Normalize the name like the sibling operations do; the store is keyed by the plain
        # function name, so looking it up with the raw request value would miss the function
        # whenever it is addressed in another form (e.g. by ARN).
        function_name = api_utils.get_function_name(function_name, context)
        fn = state.functions.get(function_name)
        if not fn:
            raise ResourceNotFoundException("The function doesn't exist.", Type="User")

        event_invoke_configs = [
            FunctionEventInvokeConfig(
                LastModified=c.last_modified,
                FunctionArn=api_utils.qualified_lambda_arn(
                    function_name, c.qualifier, account_id, region
                ),
                MaximumEventAgeInSeconds=c.maximum_event_age_in_seconds,
                MaximumRetryAttempts=c.maximum_retry_attempts,
                DestinationConfig=c.destination_config,
            )
            for c in fn.event_invoke_configs.values()
        ]

        event_invoke_configs = PaginatedList(event_invoke_configs)
        page, token = event_invoke_configs.get_page(
            lambda x: x["FunctionArn"],
            marker,
            max_items,
        )
        return ListFunctionEventInvokeConfigsResponse(
            FunctionEventInvokeConfigs=page, NextMarker=token
        )
3704

3705
    def delete_function_event_invoke_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
        **kwargs,
    ) -> None:
        """
        Remove the event invoke config stored for a function qualifier.

        :param context: request context used to resolve account, region and function name
        :param function_name: name (or ARN) of the function
        :param qualifier: alias or version; an unqualified request targets ``$LATEST``
        :raises ResourceNotFoundException: if the function does not exist or has no event
            invoke config for the qualifier
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        store = lambda_stores[account_id][region]
        function = store.functions.get(function_name)
        config_key = qualifier or "$LATEST"
        function_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)

        # a missing function and a missing config are reported with the same error
        if not function or not function.event_invoke_configs.get(config_key):
            raise ResourceNotFoundException(
                f"The function {function_arn} doesn't have an EventInvokeConfig", Type="User"
            )

        del function.event_invoke_configs[config_key]
1✔
3732

3733
    def update_function_event_invoke_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: NumericLatestPublishedOrAliasQualifier = None,
        maximum_retry_attempts: MaximumRetryAttempts = None,
        maximum_event_age_in_seconds: MaximumEventAgeInSeconds = None,
        destination_config: DestinationConfig = None,
        **kwargs,
    ) -> FunctionEventInvokeConfig:
        """
        Patch an existing event invoke config, replacing only the settings that were passed.

        :param context: Request context
        :param function_name: Function name or ARN
        :param qualifier: Optional version or alias; "$LATEST" is used when absent
        :param maximum_retry_attempts: New retry attempts setting, left unchanged when None
        :param maximum_event_age_in_seconds: New event age setting, left unchanged when None
        :param destination_config: New destination config, left unchanged when None
        :return: The event invoke config as stored after the update
        :raises InvalidParameterValueException: if none of the three settings is given
        :raises ResourceNotFoundException: if the function or qualifier is unknown, or no event
            invoke config is stored for the qualifier
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        state = lambda_stores[account_id][region]
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )

        # only settings that were actually provided take part in the update
        candidate_fields = {
            "destination_config": destination_config,
            "maximum_retry_attempts": maximum_retry_attempts,
            "maximum_event_age_in_seconds": maximum_event_age_in_seconds,
        }
        changed_fields = {
            name: value for name, value in candidate_fields.items() if value is not None
        }
        if not changed_fields:
            raise InvalidParameterValueException(
                "You must specify at least one of error handling or destination setting.",
                Type="User",
            )

        fn = state.functions.get(function_name)
        if not fn:
            raise ResourceNotFoundException("The function doesn't exist.", Type="User")
        if qualifier and qualifier not in fn.aliases and qualifier not in fn.versions:
            raise ResourceNotFoundException("The function doesn't exist.", Type="User")

        resolved_qualifier = qualifier or "$LATEST"

        current_config = fn.event_invoke_configs.get(resolved_qualifier)
        if not current_config:
            fn_arn = api_utils.qualified_lambda_arn(
                function_name, resolved_qualifier, account_id, region
            )
            raise ResourceNotFoundException(
                f"The function {fn_arn} doesn't have an EventInvokeConfig", Type="User"
            )

        if destination_config:
            self._validate_destination_config(state, function_name, destination_config)

        updated_config = dataclasses.replace(
            current_config, last_modified=api_utils.generate_lambda_date(), **changed_fields
        )
        fn.event_invoke_configs[resolved_qualifier] = updated_config

        last_modified = datetime.datetime.strptime(
            updated_config.last_modified, api_utils.LAMBDA_DATE_FORMAT
        )
        function_arn = api_utils.qualified_lambda_arn(
            function_name, resolved_qualifier, account_id, region
        )
        return FunctionEventInvokeConfig(
            LastModified=last_modified,
            FunctionArn=function_arn,
            DestinationConfig=updated_config.destination_config,
            MaximumEventAgeInSeconds=updated_config.maximum_event_age_in_seconds,
            MaximumRetryAttempts=updated_config.maximum_retry_attempts,
        )
3802

3803
    # =======================================
3804
    # ======  Layer & Layer Versions  =======
3805
    # =======================================
3806

3807
    @staticmethod
    def _resolve_layer(
        layer_name_or_arn: str, context: RequestContext
    ) -> tuple[str, str, str, str | None]:
        """
        Work out the locator attributes of a Lambda layer from its name or its ARN.

        :param layer_name_or_arn: Layer name or ARN
        :param context: Request context, supplies region and account ID for a plain layer name
        :return: Tuple of region, account ID, layer name, layer version
            (the version is None when a plain layer name is passed)
        """
        if not api_utils.is_layer_arn(layer_name_or_arn):
            # plain layer name: the layer is looked up in the caller's own region and account
            return context.region, context.account_id, layer_name_or_arn, None

        return api_utils.parse_layer_arn(layer_name_or_arn)
1✔
3822

3823
    def publish_layer_version(
        self,
        context: RequestContext,
        layer_name: LayerName,
        content: LayerVersionContentInput,
        description: Description | None = None,
        compatible_runtimes: CompatibleRuntimes | None = None,
        license_info: LicenseInfo | None = None,
        compatible_architectures: CompatibleArchitectures | None = None,
        **kwargs,
    ) -> PublishLayerVersionResponse:
        """
        Publish a new version of a layer.

        The first call for a given LayerName creates the layer itself; every later call with the
        same LayerName adds another version to it.
        Note that there are no $LATEST versions with layers!

        :param context: Request context, supplies the account ID and region of the layer
        :param layer_name: Name under which the layer is stored
        :param content: Layer archive, either inline ("ZipFile") or as an S3 reference
        :param description: Optional description, stored as an empty string when absent
        :param compatible_runtimes: Optional list of compatible runtimes
        :param license_info: Optional license information
        :param compatible_architectures: Optional list of compatible architectures
        :return: The newly stored layer version, mapped to its API representation
        :raises ValidationException: if the runtimes or architectures fail validation
        """
        account = context.account_id
        region = context.region

        validation_errors = api_utils.validate_layer_runtimes_and_architectures(
            compatible_runtimes, compatible_architectures
        )
        if validation_errors:
            plural_suffix = "s" if len(validation_errors) > 1 else ""
            error_details = "; ".join(validation_errors)
            raise ValidationException(
                f"{len(validation_errors)} validation error{plural_suffix} detected: {error_details}"
            )

        state = lambda_stores[account][region]
        with self.create_layer_lock:
            # lock is required to avoid creating two layer objects (and two v1) for the same name
            if layer_name not in state.layers:
                state.layers[layer_name] = Layer(
                    arn=api_utils.layer_arn(layer_name=layer_name, account=account, region=region)
                )

        layer = state.layers[layer_name]
        with layer.next_version_lock:
            version_identifier = LambdaLayerVersionIdentifier(
                account_id=account, region=region, layer_name=layer_name
            )
            next_version = version_identifier.generate(next_version=layer.next_version)
            # With user defined layer versions, versions may be created out of order,
            # ie. a user could replicate layer v2 then layer v1. The counter only ever moves
            # forward so that existing versions are never overwritten; a version below the
            # "next in line" leaves it untouched.
            if next_version >= layer.next_version:
                layer.next_version = next_version + 1

        # store the archive of the new layer version
        inline_archive = content.get("ZipFile")
        if inline_archive:
            code = store_lambda_archive(
                archive_file=inline_archive,
                function_name=layer_name,
                region_name=region,
                account_id=account,
            )
        else:
            code = store_s3_bucket_archive(
                archive_bucket=content["S3Bucket"],
                archive_key=content["S3Key"],
                archive_version=content.get("S3ObjectVersion"),
                function_name=layer_name,
                region_name=region,
                account_id=account,
            )

        version_key = str(next_version)
        version_arn = api_utils.layer_version_arn(
            layer_name=layer_name,
            account=account,
            region=region,
            version=version_key,
        )
        new_layer_version = LayerVersion(
            layer_version_arn=version_arn,
            layer_arn=layer.arn,
            version=next_version,
            description=description or "",
            license_info=license_info,
            compatible_runtimes=compatible_runtimes,
            compatible_architectures=compatible_architectures,
            created=api_utils.generate_lambda_date(),
            code=code,
        )
        layer.layer_versions[version_key] = new_layer_version

        return api_utils.map_layer_out(new_layer_version)
1✔
3911

3912
    def get_layer_version(
        self,
        context: RequestContext,
        layer_name: LayerName,
        version_number: LayerVersionNumber,
        **kwargs,
    ) -> GetLayerVersionResponse:
        """
        Look up a single layer version by layer name (or ARN) and version number.

        :param context: Request context
        :param layer_name: Layer name or ARN, resolved via _resolve_layer
        :param version_number: Version number, must be at least 1
        :return: The layer version mapped to its API representation
        :raises InvalidParameterValueException: if version_number is less than 1
        :raises ResourceNotFoundException: if the layer or the version does not exist
        """
        region_name, account_id, layer_name, _ = LambdaProvider._resolve_layer(layer_name, context)
        layer = lambda_stores[account_id][region_name].layers.get(layer_name)

        if version_number < 1:
            raise InvalidParameterValueException("Layer Version Cannot be less than 1", Type="User")

        # a missing layer and a missing version are reported with the same error
        found_version = None
        if layer is not None:
            found_version = layer.layer_versions.get(str(version_number))
        if found_version is None:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )
        return api_utils.map_layer_out(found_version)
1✔
3937

3938
    def get_layer_version_by_arn(
        self, context: RequestContext, arn: LayerVersionArn, **kwargs
    ) -> GetLayerVersionResponse:
        """
        Look up a layer version by its ARN.

        :param context: Request context
        :param arn: ARN of the layer version; it must carry a version
        :return: The layer version mapped to its API representation
        :raises ValidationException: if no version can be resolved from the ARN
        :raises ResourceNotFoundException: if the layer or the version does not exist
        """
        region_name, account_id, layer_name, version_key = LambdaProvider._resolve_layer(
            arn, context
        )

        if not version_key:
            arn_pattern = "(arn:(aws[a-zA-Z-]*)?:lambda:(eusc-)?[a-z]{2}((-gov)|(-iso([a-z]?)))?-[a-z]+-\\d{1}:\\d{12}:layer:[a-zA-Z0-9-_]+:[0-9]+)|(arn:[a-zA-Z0-9-]+:lambda:::awslayer:[a-zA-Z0-9-_]+)"
            raise ValidationException(
                f"1 validation error detected: Value '{arn}' at 'arn' failed to satisfy constraint: Member must satisfy regular expression pattern: "
                + arn_pattern
            )

        layer = lambda_stores[account_id][region_name].layers.get(layer_name)
        if not layer:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        resolved_version = layer.layer_versions.get(version_key)
        if not resolved_version:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        return api_utils.map_layer_out(resolved_version)
1✔
3965

3966
    def list_layers(
        self,
        context: RequestContext,
        compatible_runtime: Runtime | None = None,
        marker: String | None = None,
        max_items: MaxLayerListItems | None = None,
        compatible_architecture: Architecture | None = None,
        **kwargs,
    ) -> ListLayersResponse:
        """
        List the layers of the caller's account and region together with their latest version.

        :param context: Request context, supplies the account ID and region
        :param compatible_runtime: Runtime filter; validated but not yet applied (see TODO below)
        :param marker: Pagination marker returned by a previous call
        :param max_items: Maximum number of layers per page
        :param compatible_architecture: Architecture filter; validated but not yet applied
        :return: One page of layers and the marker for the next page, if any
        :raises ValidationException: if the runtime or architecture fails validation
        """
        validation_errors = []

        validation_error_arch = api_utils.validate_layer_architecture(compatible_architecture)
        if validation_error_arch:
            validation_errors.append(validation_error_arch)

        validation_error_runtime = api_utils.validate_layer_runtime(compatible_runtime)
        if validation_error_runtime:
            validation_errors.append(validation_error_runtime)

        if validation_errors:
            raise ValidationException(
                f"{len(validation_errors)} validation error{'s' if len(validation_errors) > 1 else ''} detected: {';'.join(validation_errors)}"
            )
        # TODO: handle filter: compatible_runtime
        # TODO: handle filter: compatible_architecture

        state = lambda_stores[context.account_id][context.region]

        # TODO: test how filters behave together with only returning layers here? Does it return the latest "matching" layer, i.e. does it ignore later layer versions that don't match?

        responses: list[LayersListItem] = []
        for layer_name, layer in state.layers.items():
            # delete_layer_version only removes the version entry, so a layer object can remain
            # without any versions; such a layer has nothing to report and is skipped
            if not layer.layer_versions:
                continue
            # versions can be stored out of order, so pick the highest version number explicitly
            latest_layer_version = max(
                layer.layer_versions.values(), key=lambda layer_version: layer_version.version
            )
            responses.append(
                LayersListItem(
                    LayerName=layer_name,
                    LayerArn=layer.arn,
                    LatestMatchingVersion=api_utils.map_layer_out(latest_layer_version),
                )
            )

        # the marker must be a string that identifies an item, so the layer ARN is used as token
        page, token = PaginatedList(responses).get_page(
            lambda layer_item: layer_item["LayerArn"],
            marker,
            max_items,
        )

        return ListLayersResponse(NextMarker=token, Layers=page)
×
4019

4020
    def list_layer_versions(
        self,
        context: RequestContext,
        layer_name: LayerName,
        compatible_runtime: Runtime | None = None,
        marker: String | None = None,
        max_items: MaxLayerListItems | None = None,
        compatible_architecture: Architecture | None = None,
        **kwargs,
    ) -> ListLayerVersionsResponse:
        """
        List the versions of a layer, highest version number first.

        :param context: Request context
        :param layer_name: Layer name or ARN, resolved via _resolve_layer
        :param compatible_runtime: Runtime filter; validated but not yet applied (see TODO below)
        :param marker: Pagination marker returned by a previous call
        :param max_items: Maximum number of versions per page
        :param compatible_architecture: Architecture filter; validated but not yet applied
        :return: One page of layer versions and the marker for the next page, if any;
            an unknown layer yields an empty list
        :raises ValidationException: if the runtime or architecture fails validation
        """
        runtimes_to_check = [compatible_runtime] if compatible_runtime else []
        architectures_to_check = [compatible_architecture] if compatible_architecture else []
        validation_errors = api_utils.validate_layer_runtimes_and_architectures(
            runtimes_to_check, architectures_to_check
        )
        if validation_errors:
            raise ValidationException(
                f"{len(validation_errors)} validation error{'s' if len(validation_errors) > 1 else ''} detected: {';'.join(validation_errors)}"
            )

        region_name, account_id, layer_name, _ = LambdaProvider._resolve_layer(
            layer_name, context
        )
        layer = lambda_stores[account_id][region_name].layers.get(layer_name)

        # TODO: Test & handle filter: compatible_runtime
        # TODO: Test & handle filter: compatible_architecture
        mapped_versions = []
        if layer is not None:
            mapped_versions = [
                api_utils.map_layer_out(stored_version)
                for stored_version in layer.layer_versions.values()
            ]

        newest_first = sorted(mapped_versions, key=lambda x: x["Version"], reverse=True)
        page, token = PaginatedList(newest_first).get_page(
            lambda version: version["LayerVersionArn"],
            marker,
            max_items,
        )
        return ListLayerVersionsResponse(NextMarker=token, LayerVersions=page)
1✔
4060

4061
    def delete_layer_version(
        self,
        context: RequestContext,
        layer_name: LayerName,
        version_number: LayerVersionNumber,
        **kwargs,
    ) -> None:
        """
        Delete one version of a layer; unknown layers and versions are ignored silently.

        :param context: Request context
        :param layer_name: Layer name or ARN, resolved via _resolve_layer
        :param version_number: Version number, must be at least 1
        :raises InvalidParameterValueException: if version_number is less than 1
        """
        if version_number < 1:
            raise InvalidParameterValueException("Layer Version Cannot be less than 1", Type="User")

        region_name, account_id, layer_name, _ = LambdaProvider._resolve_layer(
            layer_name, context
        )

        layer = lambda_stores[account_id][region_name].layers.get(layer_name)
        if not layer:
            return
        # only the version entry is dropped, the layer object itself stays in the store
        layer.layer_versions.pop(str(version_number), None)
1✔
4079

4080
    # =======================================
4081
    # =====  Layer Version Permissions  =====
4082
    # =======================================
4083
    # TODO: lock updates that change revision IDs
4084

4085
    def add_layer_version_permission(
        self,
        context: RequestContext,
        layer_name: LayerName,
        version_number: LayerVersionNumber,
        statement_id: StatementId,
        action: LayerPermissionAllowedAction,
        principal: LayerPermissionAllowedPrincipal,
        organization_id: OrganizationId | None = None,
        revision_id: String | None = None,
        **kwargs,
    ) -> AddLayerVersionPermissionResponse:
        """
        Add a statement to the resource policy of a layer version.

        :param context: Request context
        :param layer_name: Layer name or ARN
        :param version_number: Version of the layer the policy belongs to
        :param statement_id: ID of the new statement, must not exist in the policy yet
        :param action: Must be "lambda:GetLayerVersion"
        :param principal: Principal the statement applies to
        :param organization_id: Optional organization ID stored with the statement
        :param revision_id: If given, must match the current revision ID of the policy
        :return: The added statement as JSON string and the revision ID of the updated policy
        :raises ValidationException: if the action is not "lambda:GetLayerVersion"
        :raises ResourceNotFoundException: if the layer or layer version does not exist
        :raises ResourceConflictException: if the statement ID already exists
        :raises PreconditionFailedException: if revision_id does not match the policy
        """
        # `layer_name` can either be layer name or ARN. It is used to generate error messages.
        # `layer_n` contains the layer name.
        region_name, account_id, layer_n, _ = LambdaProvider._resolve_layer(layer_name, context)

        if action != "lambda:GetLayerVersion":
            raise ValidationException(
                f"1 validation error detected: Value '{action}' at 'action' failed to satisfy constraint: Member must satisfy regular expression pattern: lambda:GetLayerVersion"
            )

        store = lambda_stores[account_id][region_name]
        layer = store.layers.get(layer_n)

        layer_version_arn = api_utils.layer_version_arn(
            layer_name, account_id, region_name, str(version_number)
        )

        if layer is None:
            raise ResourceNotFoundException(
                f"Layer version {layer_version_arn} does not exist.", Type="User"
            )
        layer_version = layer.layer_versions.get(str(version_number))
        if layer_version is None:
            raise ResourceNotFoundException(
                f"Layer version {layer_version_arn} does not exist.", Type="User"
            )

        # Start from the existing policy or from a fresh one. A fresh policy is only attached
        # to the layer version once every check has passed, so a rejected request does not
        # leave an empty policy behind.
        policy = layer_version.policy if layer_version.policy is not None else LayerPolicy()

        if statement_id in policy.statements:
            raise ResourceConflictException(
                f"The statement id ({statement_id}) provided already exists. Please provide a new statement id, or remove the existing statement.",
                Type="User",
            )

        if revision_id and policy.revision_id != revision_id:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetLayerPolicy API to retrieve the latest Revision Id",
                Type="User",
            )

        statement = LayerPolicyStatement(
            sid=statement_id, action=action, principal=principal, organization_id=organization_id
        )

        # the policy is replaced rather than mutated in place
        layer_version.policy = dataclasses.replace(
            policy, statements={**policy.statements, statement_id: statement}
        )

        return AddLayerVersionPermissionResponse(
            Statement=json.dumps(
                {
                    "Sid": statement.sid,
                    "Effect": "Allow",
                    "Principal": statement.principal,
                    "Action": statement.action,
                    "Resource": layer_version.layer_version_arn,
                }
            ),
            RevisionId=layer_version.policy.revision_id,
        )
4160

4161
    def remove_layer_version_permission(
        self,
        context: RequestContext,
        layer_name: LayerName,
        version_number: LayerVersionNumber,
        statement_id: StatementId,
        revision_id: String | None = None,
        **kwargs,
    ) -> None:
        """
        Remove a statement from the resource policy of a layer version.

        :param context: Request context
        :param layer_name: Layer name or ARN
        :param version_number: Version of the layer the policy belongs to
        :param statement_id: ID of the statement to remove
        :param revision_id: If given, must match the current revision ID of the policy
        :raises ResourceNotFoundException: if the layer, the layer version, or the statement
            does not exist (a layer version without any policy has no statements)
        :raises PreconditionFailedException: if revision_id does not match the policy
        """
        # `layer_name` can either be layer name or ARN. It is used to generate error messages.
        # `layer_n` contains the layer name.
        region_name, account_id, layer_n, _ = LambdaProvider._resolve_layer(layer_name, context)

        layer_version_arn = api_utils.layer_version_arn(
            layer_name, account_id, region_name, str(version_number)
        )

        state = lambda_stores[account_id][region_name]
        layer = state.layers.get(layer_n)
        if layer is None:
            raise ResourceNotFoundException(
                f"Layer version {layer_version_arn} does not exist.", Type="User"
            )
        layer_version = layer.layer_versions.get(str(version_number))
        if layer_version is None:
            raise ResourceNotFoundException(
                f"Layer version {layer_version_arn} does not exist.", Type="User"
            )

        policy = layer_version.policy
        if policy is None:
            # No policy was ever attached, so the statement cannot exist. Report that instead
            # of failing with an AttributeError on the missing policy object.
            raise ResourceNotFoundException(
                f"Statement {statement_id} is not found in resource policy.", Type="User"
            )

        if revision_id and policy.revision_id != revision_id:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetLayerPolicy API to retrieve the latest Revision Id",
                Type="User",
            )

        if statement_id not in policy.statements:
            raise ResourceNotFoundException(
                f"Statement {statement_id} is not found in resource policy.", Type="User"
            )

        # the policy is replaced rather than mutated in place
        layer_version.policy = dataclasses.replace(
            policy,
            statements={k: v for k, v in policy.statements.items() if k != statement_id},
        )
4209

4210
    def get_layer_version_policy(
        self,
        context: RequestContext,
        layer_name: LayerName,
        version_number: LayerVersionNumber,
        **kwargs,
    ) -> GetLayerVersionPolicyResponse:
        """
        Return the resource policy attached to a layer version.

        :param context: Request context
        :param layer_name: Layer name or ARN
        :param version_number: Version of the layer the policy belongs to
        :return: The policy serialized as JSON string together with its revision ID
        :raises ResourceNotFoundException: if the layer or layer version does not exist,
            or if no policy is attached to the layer version
        """
        # `layer_name` may be a name or an ARN and is only used for error messages;
        # `layer_n` is the plain layer name used for the lookup.
        region_name, account_id, layer_n, _ = LambdaProvider._resolve_layer(layer_name, context)

        layer_version_arn = api_utils.layer_version_arn(
            layer_name, account_id, region_name, str(version_number)
        )

        layer = lambda_stores[account_id][region_name].layers.get(layer_n)
        if layer is None:
            raise ResourceNotFoundException(
                f"Layer version {layer_version_arn} does not exist.", Type="User"
            )

        layer_version = layer.layer_versions.get(str(version_number))
        if layer_version is None:
            raise ResourceNotFoundException(
                f"Layer version {layer_version_arn} does not exist.", Type="User"
            )

        policy = layer_version.policy
        if policy is None:
            raise ResourceNotFoundException(
                "No policy is associated with the given resource.", Type="User"
            )

        # every stored statement is rendered as an "Allow" on this layer version's ARN
        rendered_statements = []
        for policy_statement in policy.statements.values():
            rendered_statements.append(
                {
                    "Sid": policy_statement.sid,
                    "Effect": "Allow",
                    "Principal": policy_statement.principal,
                    "Action": policy_statement.action,
                    "Resource": layer_version.layer_version_arn,
                }
            )

        policy_document = {
            "Version": policy.version,
            "Id": policy.id,
            "Statement": rendered_statements,
        }
        return GetLayerVersionPolicyResponse(
            Policy=json.dumps(policy_document),
            RevisionId=policy.revision_id,
        )
4263

4264
    # =======================================
4265
    # =======  Function Concurrency  ========
4266
    # =======================================
4267
    # (Reserved) function concurrency is scoped to the whole function
4268

4269
    def get_function_concurrency(
        self, context: RequestContext, function_name: FunctionName, **kwargs
    ) -> GetFunctionConcurrencyResponse:
        """
        Return the reserved concurrency stored for a function.

        :param context: Request context
        :param function_name: Function name or ARN
        :return: Response carrying the function's reserved concurrent executions value
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        resolved_name = api_utils.get_function_name(function_name, context)
        function = self._get_function(
            function_name=resolved_name, region=region, account_id=account_id
        )
        reserved = function.reserved_concurrent_executions
        return GetFunctionConcurrencyResponse(ReservedConcurrentExecutions=reserved)
4278

4279
    def put_function_concurrency(
        self,
        context: RequestContext,
        function_name: FunctionName,
        reserved_concurrent_executions: ReservedConcurrentExecutions,
        **kwargs,
    ) -> Concurrency:
        """
        Set the reserved concurrency of a function.

        :param context: Request context
        :param function_name: Function name or unqualified function ARN
        :param reserved_concurrent_executions: Number of executions to reserve for the function
        :return: The reserved concurrency as stored on the function
        :raises InvalidParameterValueException: if a qualifier is given, if the reservation
            would push the account's unreserved concurrency below its configured minimum, or
            if it is lower than the function's total provisioned concurrency
        :raises ResourceNotFoundException: if the function does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)

        function_name, qualifier = api_utils.get_name_and_qualifier(function_name, None, context)
        if qualifier:
            raise InvalidParameterValueException(
                "This operation is permitted on Lambda functions only. Aliases and versions do not support this operation. Please specify either a function name or an unqualified function ARN.",
                Type="User",
            )

        fn = lambda_stores[account_id][region].functions.get(function_name)
        if not fn:
            fn_arn = api_utils.qualified_lambda_arn(
                function_name,
                qualifier="$LATEST",
                account=account_id,
                region=region,
            )
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")

        account_limits = self.get_account_settings(context)["AccountLimit"]
        unreserved_concurrent_executions = account_limits["UnreservedConcurrentExecutions"]

        # The current reservation of this function is already deducted from the unreserved
        # value, but it will be replaced by the new one and therefore must be added back.
        # Joel tested this behavior manually against AWS (2023-11-28).
        existing_reserved_concurrent_executions = fn.reserved_concurrent_executions or 0
        remaining_unreserved = (
            unreserved_concurrent_executions
            - reserved_concurrent_executions
            + existing_reserved_concurrent_executions
        )
        if remaining_unreserved < config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY:
            raise InvalidParameterValueException(
                f"Specified ReservedConcurrentExecutions for function decreases account's UnreservedConcurrentExecution below its minimum value of [{config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY}]."
            )

        total_provisioned_concurrency = sum(
            provisioned_config.provisioned_concurrent_executions
            for provisioned_config in fn.provisioned_concurrency_configs.values()
        )
        if total_provisioned_concurrency > reserved_concurrent_executions:
            raise InvalidParameterValueException(
                f" ReservedConcurrentExecutions  {reserved_concurrent_executions} should not be lower than function's total provisioned concurrency [{total_provisioned_concurrency}]."
            )

        fn.reserved_concurrent_executions = reserved_concurrent_executions

        return Concurrency(ReservedConcurrentExecutions=fn.reserved_concurrent_executions)
1✔
4340

4341
    def delete_function_concurrency(
        self, context: RequestContext, function_name: FunctionName, **kwargs
    ) -> None:
        """
        Remove the reserved concurrency of a function.

        :param context: Request context
        :param function_name: Function name or ARN
        :raises ResourceNotFoundException: if the function does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, _ = api_utils.get_name_and_qualifier(function_name, None, context)
        store = lambda_stores[account_id][region]
        fn = store.functions.get(function_name)
        if not fn:
            # report an unknown function the same way put_function_concurrency does instead of
            # failing with an AttributeError on None
            fn_arn = api_utils.qualified_lambda_arn(
                function_name,
                qualifier="$LATEST",
                account=account_id,
                region=region,
            )
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")
        fn.reserved_concurrent_executions = None
1✔
4349

4350
    # =======================================
4351
    # ===============  TAGS   ===============
4352
    # =======================================
4353
    # only Function, Event Source Mapping, and Code Signing Config (not currently supported by LocalStack) ARNs are available for tagging in AWS
4354

4355
    def _get_tags(self, resource: TaggableResource) -> dict[str, str]:
        """
        Fetch the tags of a resource as a plain key/value mapping.

        :param resource: ARN of the tagged resource
        :return: Mapping of tag key to tag value
        """
        store = self.fetch_lambda_store_for_tagging(resource)
        # the tag service returns a list of {"Key": ..., "Value": ...} entries under "Tags"
        tag_entries = store.TAGS.list_tags_for_resource(resource).get("Tags")
        return {entry["Key"]: entry["Value"] for entry in tag_entries}
1✔
4362

4363
    def _store_tags(self, resource: TaggableResource, tags: dict[str, str]):
        """
        Add tags to a resource, enforcing the per-resource tag limit.

        :param resource: ARN of the resource to tag
        :param tags: Mapping of tag key to tag value to add
        :raises InvalidParameterValueException: if the existing tags merged with the new ones
            exceed LAMBDA_TAG_LIMIT_PER_RESOURCE
        """
        store = self.fetch_lambda_store_for_tagging(resource)

        # the limit applies to the merged set, so keys that are overwritten count only once
        tags_after_update = store.TAGS.tags.get(resource, {}) | tags
        if len(tags_after_update) > LAMBDA_TAG_LIMIT_PER_RESOURCE:
            raise InvalidParameterValueException(
                "Number of tags exceeds resource tag limit.", Type="User"
            )

        store.TAGS.tag_resource(
            resource, [{"Key": tag_key, "Value": tag_value} for tag_key, tag_value in tags.items()]
        )
1✔
4372

4373
    def fetch_lambda_store_for_tagging(self, resource: TaggableResource) -> LambdaStore:
        """
        Takes a resource ARN for a TaggableResource (Lambda Function, Event Source Mapping, Code Signing Config, or Capacity Provider) and returns a corresponding
        LambdaStore for its region and account.

        In addition, this function validates that the ARN is a valid TaggableResource type, and that the TaggableResource exists.

        Raises:
            ValidationException: If the resource ARN is not a full ARN for a TaggableResource.
            ResourceNotFoundException: If the specified resource does not exist.
            InvalidParameterValueException: If the resource ARN is a qualified Lambda Function.
            NotImplementedError: If the resource ARN refers to a Code Signing Config.
        """

        def _raise_validation_exception():
            raise ValidationException(
                f"1 validation error detected: Value '{resource}' at 'resource' failed to satisfy constraint: Member must satisfy regular expression pattern: {api_utils.TAGGABLE_RESOURCE_ARN_PATTERN}"
            )

        # Check whether the ARN we have been passed is correctly formatted
        parsed_resource_arn: ArnData | None = None
        try:
            parsed_resource_arn = parse_arn(resource)
        except Exception:
            _raise_validation_exception()

        # TODO: Should we be checking whether this is a full ARN?
        region, account_id, resource_type = map(
            parsed_resource_arn.get, ("region", "account", "resource")
        )

        if not all((region, account_id, resource_type)):
            _raise_validation_exception()

        # The resource part must at least consist of "<type>:<identifier>". str.split never
        # returns an empty list, so the length is checked: a bare "<type>" would otherwise
        # fail the unpacking below with a ValueError instead of a validation error.
        parts = resource_type.split(":")
        if len(parts) < 2:
            _raise_validation_exception()

        resource_type, resource_identifier, *qualifier = parts

        # Qualifier validation raises before checking for NotFound
        if qualifier:
            if resource_type == "function":
                raise InvalidParameterValueException(
                    "Tags on function aliases and versions are not supported. Please specify a function ARN.",
                    Type="User",
                )
            _raise_validation_exception()

        # the lookups below are only made to verify that the referenced resource exists
        if resource_type == "event-source-mapping":
            self._get_esm(resource_identifier, account_id, region)
        elif resource_type == "code-signing-config":
            raise NotImplementedError("Resource tagging on CSC not yet implemented.")
        elif resource_type == "function":
            self._get_function(
                function_name=resource_identifier, account_id=account_id, region=region
            )
        elif resource_type == "capacity-provider":
            self._get_capacity_provider(resource_identifier, account_id, region)
        else:
            _raise_validation_exception()

        # If no exceptions are raised, assume ARN and referenced resource is valid for tag operations
        return lambda_stores[account_id][region]
1✔
4435

4436
    def tag_resource(
        self, context: RequestContext, resource: TaggableResource, tags: Tags, **kwargs
    ) -> None:
        """
        Attach the given tags to a taggable Lambda resource.

        The tags are persisted through ``self._store_tags``. If the ARN's resource part starts with
        "function", the function's ``$LATEST`` version object is additionally re-created under the
        function lock.

        Raises:
            InvalidParameterValueException: If ``tags`` is empty or missing.
        """
        if not tags:
            raise InvalidParameterValueException(
                "An error occurred and the request cannot be processed.", Type="User"
            )
        self._store_tags(resource, tags)

        resource_id = extract_resource_from_arn(resource)
        if not (resource_id and resource_id.startswith("function")):
            # Only functions need the version refresh below
            return

        name, _, account, region = function_locators_from_arn(resource)
        function = self._get_function(name, account, region)
        with function.lock:
            # dirty hack for changed revision id, should reevaluate model to prevent this:
            # NOTE(review): presumably copying the config yields a fresh revision id - confirm in the model
            current_latest = function.versions["$LATEST"]
            copied_config = dataclasses.replace(current_latest.config)
            function.versions["$LATEST"] = dataclasses.replace(
                current_latest, config=copied_config
            )
4456

4457
    def list_tags(
        self, context: RequestContext, resource: TaggableResource, **kwargs
    ) -> ListTagsResponse:
        """Return the tags obtained from ``self._get_tags`` for ``resource``, wrapped in a ListTagsResponse."""
        return ListTagsResponse(Tags=self._get_tags(resource))
1✔
4462

4463
    def untag_resource(
        self, context: RequestContext, resource: TaggableResource, tag_keys: TagKeyList, **kwargs
    ) -> None:
        """
        Remove the tags with the given keys from a taggable Lambda resource.

        The resource ARN is validated (and the store resolved) via ``fetch_lambda_store_for_tagging``.
        If the ARN's resource part starts with "function", the function's ``$LATEST`` version object is
        additionally re-created under the function lock.

        Raises:
            ValidationException: If ``tag_keys`` is empty or missing.
        """
        if not tag_keys:
            raise ValidationException(
                "1 validation error detected: Value null at 'tagKeys' failed to satisfy constraint: Member must not be null"
            )  # should probably be generalized a bit

        store = self.fetch_lambda_store_for_tagging(resource)
        store.TAGS.untag_resource(resource, tag_keys)

        resource_id = extract_resource_from_arn(resource)
        if not (resource_id and resource_id.startswith("function")):
            # Only functions need the version refresh below
            return

        name, _, account, region = function_locators_from_arn(resource)
        function = self._get_function(name, account, region)
        # TODO: Potential race condition
        with function.lock:
            # dirty hack for changed revision id, should reevaluate model to prevent this:
            # NOTE(review): presumably copying the config yields a fresh revision id - confirm in the model
            current_latest = function.versions["$LATEST"]
            copied_config = dataclasses.replace(current_latest.config)
            function.versions["$LATEST"] = dataclasses.replace(
                current_latest, config=copied_config
            )
4486

4487
    # =======================================
4488
    # =======  LEGACY / DEPRECATED   ========
4489
    # =======================================
4490

4491
    def invoke_async(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        invoke_args: IO[BlobStream],
        **kwargs,
    ) -> InvokeAsyncResponse:
        """
        LEGACY API endpoint. Even AWS heavily discourages its usage.

        Not supported by this provider: every call raises NotImplementedError.
        """
        raise NotImplementedError()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc