• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 20101417527

10 Dec 2025 12:31PM UTC coverage: 86.876% (-0.01%) from 86.887%
20101417527

push

github

web-flow
IaC: Enable parity metrics to be captured during release (#13494)

69916 of 80478 relevant lines covered (86.88%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.1
/localstack-core/localstack/services/lambda_/provider.py
1
import base64
1✔
2
import dataclasses
1✔
3
import datetime
1✔
4
import itertools
1✔
5
import json
1✔
6
import logging
1✔
7
import re
1✔
8
import threading
1✔
9
import time
1✔
10
from typing import IO, Any
1✔
11

12
from botocore.exceptions import ClientError
1✔
13

14
from localstack import config
1✔
15
from localstack.aws.api import RequestContext, ServiceException, handler
1✔
16
from localstack.aws.api.lambda_ import (
1✔
17
    AccountLimit,
18
    AccountUsage,
19
    AddLayerVersionPermissionResponse,
20
    AddPermissionRequest,
21
    AddPermissionResponse,
22
    Alias,
23
    AliasConfiguration,
24
    AliasRoutingConfiguration,
25
    AllowedPublishers,
26
    Architecture,
27
    Arn,
28
    Blob,
29
    BlobStream,
30
    CapacityProviderConfig,
31
    CodeSigningConfigArn,
32
    CodeSigningConfigNotFoundException,
33
    CodeSigningPolicies,
34
    CompatibleArchitectures,
35
    CompatibleRuntimes,
36
    Concurrency,
37
    Cors,
38
    CreateCodeSigningConfigResponse,
39
    CreateEventSourceMappingRequest,
40
    CreateFunctionRequest,
41
    CreateFunctionUrlConfigResponse,
42
    DeleteCodeSigningConfigResponse,
43
    DeleteFunctionResponse,
44
    Description,
45
    DestinationConfig,
46
    DurableExecutionName,
47
    EventSourceMappingConfiguration,
48
    FunctionCodeLocation,
49
    FunctionConfiguration,
50
    FunctionEventInvokeConfig,
51
    FunctionName,
52
    FunctionUrlAuthType,
53
    FunctionUrlQualifier,
54
    FunctionVersionLatestPublished,
55
    GetAccountSettingsResponse,
56
    GetCodeSigningConfigResponse,
57
    GetFunctionCodeSigningConfigResponse,
58
    GetFunctionConcurrencyResponse,
59
    GetFunctionRecursionConfigResponse,
60
    GetFunctionResponse,
61
    GetFunctionUrlConfigResponse,
62
    GetLayerVersionPolicyResponse,
63
    GetLayerVersionResponse,
64
    GetPolicyResponse,
65
    GetProvisionedConcurrencyConfigResponse,
66
    InvalidParameterValueException,
67
    InvocationResponse,
68
    InvocationType,
69
    InvokeAsyncResponse,
70
    InvokeMode,
71
    LambdaApi,
72
    LambdaManagedInstancesCapacityProviderConfig,
73
    LastUpdateStatus,
74
    LayerName,
75
    LayerPermissionAllowedAction,
76
    LayerPermissionAllowedPrincipal,
77
    LayersListItem,
78
    LayerVersionArn,
79
    LayerVersionContentInput,
80
    LayerVersionNumber,
81
    LicenseInfo,
82
    ListAliasesResponse,
83
    ListCodeSigningConfigsResponse,
84
    ListEventSourceMappingsResponse,
85
    ListFunctionEventInvokeConfigsResponse,
86
    ListFunctionsByCodeSigningConfigResponse,
87
    ListFunctionsResponse,
88
    ListFunctionUrlConfigsResponse,
89
    ListLayersResponse,
90
    ListLayerVersionsResponse,
91
    ListProvisionedConcurrencyConfigsResponse,
92
    ListTagsResponse,
93
    ListVersionsByFunctionResponse,
94
    LogFormat,
95
    LoggingConfig,
96
    LogType,
97
    MasterRegion,
98
    MaxFunctionEventInvokeConfigListItems,
99
    MaximumEventAgeInSeconds,
100
    MaximumRetryAttempts,
101
    MaxItems,
102
    MaxLayerListItems,
103
    MaxListItems,
104
    MaxProvisionedConcurrencyConfigListItems,
105
    NamespacedFunctionName,
106
    NamespacedStatementId,
107
    NumericLatestPublishedOrAliasQualifier,
108
    OnFailure,
109
    OnSuccess,
110
    OrganizationId,
111
    PackageType,
112
    PositiveInteger,
113
    PreconditionFailedException,
114
    ProvisionedConcurrencyConfigListItem,
115
    ProvisionedConcurrencyConfigNotFoundException,
116
    ProvisionedConcurrencyStatusEnum,
117
    PublishLayerVersionResponse,
118
    PutFunctionCodeSigningConfigResponse,
119
    PutFunctionRecursionConfigResponse,
120
    PutProvisionedConcurrencyConfigResponse,
121
    Qualifier,
122
    RecursiveLoop,
123
    ReservedConcurrentExecutions,
124
    ResourceConflictException,
125
    ResourceNotFoundException,
126
    Runtime,
127
    RuntimeVersionConfig,
128
    SnapStart,
129
    SnapStartApplyOn,
130
    SnapStartOptimizationStatus,
131
    SnapStartResponse,
132
    State,
133
    StatementId,
134
    StateReasonCode,
135
    String,
136
    TaggableResource,
137
    TagKeyList,
138
    Tags,
139
    TenantId,
140
    TracingMode,
141
    UnqualifiedFunctionName,
142
    UpdateCodeSigningConfigResponse,
143
    UpdateEventSourceMappingRequest,
144
    UpdateFunctionCodeRequest,
145
    UpdateFunctionConfigurationRequest,
146
    UpdateFunctionUrlConfigResponse,
147
    VersionWithLatestPublished,
148
)
149
from localstack.aws.api.lambda_ import FunctionVersion as FunctionVersionApi
1✔
150
from localstack.aws.api.lambda_ import ServiceException as LambdaServiceException
1✔
151
from localstack.aws.api.pipes import (
1✔
152
    DynamoDBStreamStartPosition,
153
    KinesisStreamStartPosition,
154
)
155
from localstack.aws.connect import connect_to
1✔
156
from localstack.aws.spec import load_service
1✔
157
from localstack.services.edge import ROUTER
1✔
158
from localstack.services.lambda_ import api_utils
1✔
159
from localstack.services.lambda_ import hooks as lambda_hooks
1✔
160
from localstack.services.lambda_.analytics import (
1✔
161
    FunctionInitializationType,
162
    FunctionOperation,
163
    FunctionStatus,
164
    function_counter,
165
)
166
from localstack.services.lambda_.api_utils import (
1✔
167
    ARCHITECTURES,
168
    STATEMENT_ID_REGEX,
169
    SUBNET_ID_REGEX,
170
    function_locators_from_arn,
171
)
172
from localstack.services.lambda_.event_source_mapping.esm_config_factory import (
1✔
173
    EsmConfigFactory,
174
)
175
from localstack.services.lambda_.event_source_mapping.esm_worker import (
1✔
176
    EsmState,
177
    EsmWorker,
178
)
179
from localstack.services.lambda_.event_source_mapping.esm_worker_factory import (
1✔
180
    EsmWorkerFactory,
181
)
182
from localstack.services.lambda_.event_source_mapping.pipe_utils import get_internal_client
1✔
183
from localstack.services.lambda_.invocation import AccessDeniedException
1✔
184
from localstack.services.lambda_.invocation.execution_environment import (
1✔
185
    EnvironmentStartupTimeoutException,
186
)
187
from localstack.services.lambda_.invocation.lambda_models import (
1✔
188
    AliasRoutingConfig,
189
    CodeSigningConfig,
190
    EventInvokeConfig,
191
    Function,
192
    FunctionResourcePolicy,
193
    FunctionUrlConfig,
194
    FunctionVersion,
195
    ImageConfig,
196
    LambdaEphemeralStorage,
197
    Layer,
198
    LayerPolicy,
199
    LayerPolicyStatement,
200
    LayerVersion,
201
    ProvisionedConcurrencyConfiguration,
202
    RequestEntityTooLargeException,
203
    ResourcePolicy,
204
    UpdateStatus,
205
    ValidationException,
206
    VersionAlias,
207
    VersionFunctionConfiguration,
208
    VersionIdentifier,
209
    VersionState,
210
    VpcConfig,
211
)
212
from localstack.services.lambda_.invocation.lambda_service import (
1✔
213
    LambdaService,
214
    create_image_code,
215
    destroy_code_if_not_used,
216
    lambda_stores,
217
    store_lambda_archive,
218
    store_s3_bucket_archive,
219
)
220
from localstack.services.lambda_.invocation.models import CapacityProvider as CapacityProviderModel
1✔
221
from localstack.services.lambda_.invocation.models import LambdaStore
1✔
222
from localstack.services.lambda_.invocation.runtime_executor import get_runtime_executor
1✔
223
from localstack.services.lambda_.lambda_utils import HINT_LOG
1✔
224
from localstack.services.lambda_.layerfetcher.layer_fetcher import LayerFetcher
1✔
225
from localstack.services.lambda_.provider_utils import (
1✔
226
    LambdaLayerVersionIdentifier,
227
    get_function_version,
228
    get_function_version_from_arn,
229
)
230
from localstack.services.lambda_.runtimes import (
1✔
231
    ALL_RUNTIMES,
232
    DEPRECATED_RUNTIMES,
233
    DEPRECATED_RUNTIMES_UPGRADES,
234
    RUNTIMES_AGGREGATED,
235
    SNAP_START_SUPPORTED_RUNTIMES,
236
    VALID_RUNTIMES,
237
)
238
from localstack.services.lambda_.urlrouter import FunctionUrlRouter
1✔
239
from localstack.services.plugins import ServiceLifecycleHook
1✔
240
from localstack.state import StateVisitor
1✔
241
from localstack.utils.aws.arns import (
1✔
242
    ArnData,
243
    capacity_provider_arn,
244
    extract_resource_from_arn,
245
    extract_service_from_arn,
246
    get_partition,
247
    lambda_event_source_mapping_arn,
248
    parse_arn,
249
)
250
from localstack.utils.aws.client_types import ServicePrincipal
1✔
251
from localstack.utils.bootstrap import is_api_enabled
1✔
252
from localstack.utils.collections import PaginatedList, merge_recursive
1✔
253
from localstack.utils.event_matcher import validate_event_pattern
1✔
254
from localstack.utils.strings import get_random_hex, short_uid, to_bytes, to_str
1✔
255
from localstack.utils.sync import poll_condition
1✔
256
from localstack.utils.urls import localstack_host
1✔
257

258
LOG = logging.getLogger(__name__)
1✔
259

260
CAPACITY_PROVIDER_ARN_NAME = "arn:aws[a-zA-Z-]*:lambda:(eusc-)?[a-z]{2}((-gov)|(-iso([a-z]?)))?-[a-z]+-\\d{1}:\\d{12}:capacity-provider:[a-zA-Z0-9-_]+"
1✔
261
LAMBDA_DEFAULT_TIMEOUT = 3
1✔
262
LAMBDA_DEFAULT_MEMORY_SIZE = 128
1✔
263

264
LAMBDA_TAG_LIMIT_PER_RESOURCE = 50
1✔
265
LAMBDA_LAYERS_LIMIT_PER_FUNCTION = 5
1✔
266

267
TAG_KEY_CUSTOM_URL = "_custom_id_"
1✔
268
# Requirements (from RFC3986 & co): not longer than 63, first char must be
269
# alpha, then alphanumeric or hyphen, except cannot start or end with hyphen
270
TAG_KEY_CUSTOM_URL_VALIDATOR = re.compile(r"^[A-Za-z]([A-Za-z0-9\-]{0,61}[A-Za-z0-9])?$")
1✔
271

272

273
class LambdaProvider(LambdaApi, ServiceLifecycleHook):
1✔
274
    lambda_service: LambdaService
1✔
275
    create_fn_lock: threading.RLock
1✔
276
    create_layer_lock: threading.RLock
1✔
277
    router: FunctionUrlRouter
1✔
278
    esm_workers: dict[str, EsmWorker]
1✔
279
    layer_fetcher: LayerFetcher | None
1✔
280

281
    def __init__(self) -> None:
1✔
282
        self.lambda_service = LambdaService()
1✔
283
        self.create_fn_lock = threading.RLock()
1✔
284
        self.create_layer_lock = threading.RLock()
1✔
285
        self.router = FunctionUrlRouter(ROUTER, self.lambda_service)
1✔
286
        self.esm_workers = {}
1✔
287
        self.layer_fetcher = None
1✔
288
        lambda_hooks.inject_layer_fetcher.run(self)
1✔
289

290
    def accept_state_visitor(self, visitor: StateVisitor):
1✔
291
        visitor.visit(lambda_stores)
×
292

293
    def on_before_state_reset(self):
1✔
294
        self.lambda_service.stop()
×
295

296
    def on_after_state_reset(self):
1✔
297
        self.router.lambda_service = self.lambda_service = LambdaService()
×
298

299
    def on_before_state_load(self):
1✔
300
        self.lambda_service.stop()
×
301

302
    def on_after_state_load(self):
1✔
303
        self.lambda_service = LambdaService()
×
304
        self.router.lambda_service = self.lambda_service
×
305

306
        for account_id, account_bundle in lambda_stores.items():
×
307
            for region_name, state in account_bundle.items():
×
308
                for fn in state.functions.values():
×
309
                    for fn_version in fn.versions.values():
×
310
                        # restore the "Pending" state for every function version and start it
311
                        try:
×
312
                            new_state = VersionState(
×
313
                                state=State.Pending,
314
                                code=StateReasonCode.Creating,
315
                                reason="The function is being created.",
316
                            )
317
                            new_config = dataclasses.replace(fn_version.config, state=new_state)
×
318
                            new_version = dataclasses.replace(fn_version, config=new_config)
×
319
                            fn.versions[fn_version.id.qualifier] = new_version
×
320
                            # TODO: consider skipping this for $LATEST versions of functions with a capacity provider
321
                            self.lambda_service.create_function_version(fn_version).result(
×
322
                                timeout=5
323
                            )
324
                        except Exception:
×
325
                            LOG.warning(
×
326
                                "Failed to restore function version %s",
327
                                fn_version.id.qualified_arn(),
328
                                exc_info=LOG.isEnabledFor(logging.DEBUG),
329
                            )
330
                    # restore provisioned concurrency per function considering both versions and aliases
331
                    for (
×
332
                        provisioned_qualifier,
333
                        provisioned_config,
334
                    ) in fn.provisioned_concurrency_configs.items():
335
                        fn_arn = None
×
336
                        try:
×
337
                            if api_utils.qualifier_is_alias(provisioned_qualifier):
×
338
                                alias = fn.aliases.get(provisioned_qualifier)
×
339
                                resolved_version = fn.versions.get(alias.function_version)
×
340
                                fn_arn = resolved_version.id.qualified_arn()
×
341
                            elif api_utils.qualifier_is_version(provisioned_qualifier):
×
342
                                fn_version = fn.versions.get(provisioned_qualifier)
×
343
                                fn_arn = fn_version.id.qualified_arn()
×
344
                            else:
345
                                raise InvalidParameterValueException(
×
346
                                    "Invalid qualifier type:"
347
                                    " Qualifier can only be an alias or a version for provisioned concurrency."
348
                                )
349

350
                            manager = self.lambda_service.get_lambda_version_manager(fn_arn)
×
351
                            manager.update_provisioned_concurrency_config(
×
352
                                provisioned_config.provisioned_concurrent_executions
353
                            )
354
                        except Exception:
×
355
                            LOG.warning(
×
356
                                "Failed to restore provisioned concurrency %s for function %s",
357
                                provisioned_config,
358
                                fn_arn,
359
                                exc_info=LOG.isEnabledFor(logging.DEBUG),
360
                            )
361

362
                for esm in state.event_source_mappings.values():
×
363
                    # Restores event source workers
364
                    function_arn = esm.get("FunctionArn")
×
365

366
                    # TODO: How do we know the event source is up?
367
                    # A basic poll to see if the mapped Lambda function is active/failed
368
                    if not poll_condition(
×
369
                        lambda: get_function_version_from_arn(function_arn).config.state.state
370
                        in [State.Active, State.Failed],
371
                        timeout=10,
372
                    ):
373
                        LOG.warning(
×
374
                            "Creating ESM for Lambda that is not in running state: %s",
375
                            function_arn,
376
                        )
377

378
                    function_version = get_function_version_from_arn(function_arn)
×
379
                    function_role = function_version.config.role
×
380

381
                    is_esm_enabled = esm.get("State", EsmState.DISABLED) not in (
×
382
                        EsmState.DISABLED,
383
                        EsmState.DISABLING,
384
                    )
385
                    esm_worker = EsmWorkerFactory(
×
386
                        esm, function_role, is_esm_enabled
387
                    ).get_esm_worker()
388

389
                    # Note: a worker is created in the DISABLED state if not enabled
390
                    esm_worker.create()
×
391
                    # TODO: assigning the esm_worker to the dict only works after .create(). Could it cause a race
392
                    #  condition if we get a shutdown here and have a worker thread spawned but not accounted for?
393
                    self.esm_workers[esm_worker.uuid] = esm_worker
×
394

395
    def on_after_init(self):
1✔
396
        self.router.register_routes()
1✔
397
        get_runtime_executor().validate_environment()
1✔
398

399
    def on_before_stop(self) -> None:
1✔
400
        for esm_worker in self.esm_workers.values():
1✔
401
            esm_worker.stop_for_shutdown()
1✔
402

403
        # TODO: should probably unregister routes?
404
        self.lambda_service.stop()
1✔
405

406
    @staticmethod
1✔
407
    def _get_function(function_name: str, account_id: str, region: str) -> Function:
1✔
408
        state = lambda_stores[account_id][region]
1✔
409
        function = state.functions.get(function_name)
1✔
410
        if not function:
1✔
411
            arn = api_utils.unqualified_lambda_arn(
1✔
412
                function_name=function_name,
413
                account=account_id,
414
                region=region,
415
            )
416
            raise ResourceNotFoundException(
1✔
417
                f"Function not found: {arn}",
418
                Type="User",
419
            )
420
        return function
1✔
421

422
    @staticmethod
1✔
423
    def _get_esm(uuid: str, account_id: str, region: str) -> EventSourceMappingConfiguration:
1✔
424
        state = lambda_stores[account_id][region]
1✔
425
        esm = state.event_source_mappings.get(uuid)
1✔
426
        if not esm:
1✔
427
            arn = lambda_event_source_mapping_arn(uuid, account_id, region)
1✔
428
            raise ResourceNotFoundException(
1✔
429
                f"Event source mapping not found: {arn}",
430
                Type="User",
431
            )
432
        return esm
1✔
433

434
    @staticmethod
1✔
435
    def _get_capacity_provider(
1✔
436
        capacity_provider_name: str,
437
        account_id: str,
438
        region: str,
439
        error_msg_template: str = "Capacity provider not found: {}",
440
    ) -> CapacityProviderModel:
441
        state = lambda_stores[account_id][region]
1✔
442
        cp = state.capacity_providers.get(capacity_provider_name)
1✔
443
        if not cp:
1✔
444
            arn = capacity_provider_arn(capacity_provider_name, account_id, region)
1✔
445
            raise ResourceNotFoundException(
1✔
446
                error_msg_template.format(arn),
447
                Type="User",
448
            )
449
        return cp
×
450

451
    @staticmethod
1✔
452
    def _validate_qualifier_expression(qualifier: str) -> None:
1✔
453
        if error_messages := api_utils.validate_qualifier(qualifier):
1✔
454
            raise ValidationException(
×
455
                message=api_utils.construct_validation_exception_message(error_messages)
456
            )
457

458
    @staticmethod
1✔
459
    def _resolve_fn_qualifier(resolved_fn: Function, qualifier: str | None) -> tuple[str, str]:
1✔
460
        """Attempts to resolve a given qualifier and returns a qualifier that exists or
461
        raises an appropriate ResourceNotFoundException.
462

463
        :param resolved_fn: The resolved lambda function
464
        :param qualifier: The qualifier to be resolved or None
465
        :return: Tuple of (resolved qualifier, function arn either qualified or unqualified)"""
466
        function_name = resolved_fn.function_name
1✔
467
        # assuming function versions need to live in the same account and region
468
        account_id = resolved_fn.latest().id.account
1✔
469
        region = resolved_fn.latest().id.region
1✔
470
        fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
1✔
471
        if qualifier is not None:
1✔
472
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
1✔
473
            if api_utils.qualifier_is_alias(qualifier):
1✔
474
                if qualifier not in resolved_fn.aliases:
1✔
475
                    raise ResourceNotFoundException(f"Cannot find alias arn: {fn_arn}", Type="User")
1✔
476
            elif api_utils.qualifier_is_version(qualifier) or qualifier == "$LATEST":
1✔
477
                if qualifier not in resolved_fn.versions:
1✔
478
                    raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")
1✔
479
            else:
480
                # matches qualifier pattern but invalid alias or version
481
                raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")
1✔
482
        resolved_qualifier = qualifier or "$LATEST"
1✔
483
        return resolved_qualifier, fn_arn
1✔
484

485
    @staticmethod
1✔
486
    def _function_revision_id(resolved_fn: Function, resolved_qualifier: str) -> str:
1✔
487
        if api_utils.qualifier_is_alias(resolved_qualifier):
1✔
488
            return resolved_fn.aliases[resolved_qualifier].revision_id
1✔
489
        # Assumes that a non-alias is a version
490
        else:
491
            return resolved_fn.versions[resolved_qualifier].config.revision_id
1✔
492

493
    def _resolve_vpc_id(self, account_id: str, region_name: str, subnet_id: str) -> str:
1✔
494
        ec2_client = connect_to(aws_access_key_id=account_id, region_name=region_name).ec2
1✔
495
        try:
1✔
496
            return ec2_client.describe_subnets(SubnetIds=[subnet_id])["Subnets"][0]["VpcId"]
1✔
497
        except ec2_client.exceptions.ClientError as e:
1✔
498
            code = e.response["Error"]["Code"]
1✔
499
            message = e.response["Error"]["Message"]
1✔
500
            raise InvalidParameterValueException(
1✔
501
                f"Error occurred while DescribeSubnets. EC2 Error Code: {code}. EC2 Error Message: {message}",
502
                Type="User",
503
            )
504

505
    def _build_vpc_config(
1✔
506
        self,
507
        account_id: str,
508
        region_name: str,
509
        vpc_config: dict | None = None,
510
    ) -> VpcConfig | None:
511
        if not vpc_config or not is_api_enabled("ec2"):
1✔
512
            return None
1✔
513

514
        subnet_ids = vpc_config.get("SubnetIds", [])
1✔
515
        if subnet_ids is not None and len(subnet_ids) == 0:
1✔
516
            return VpcConfig(vpc_id="", security_group_ids=[], subnet_ids=[])
1✔
517

518
        subnet_id = subnet_ids[0]
1✔
519
        if not bool(SUBNET_ID_REGEX.match(subnet_id)):
1✔
520
            raise ValidationException(
1✔
521
                f"1 validation error detected: Value '[{subnet_id}]' at 'vpcConfig.subnetIds' failed to satisfy constraint: Member must satisfy constraint: [Member must have length less than or equal to 1024, Member must have length greater than or equal to 0, Member must satisfy regular expression pattern: subnet-[0-9a-z]*]"
522
            )
523

524
        return VpcConfig(
1✔
525
            vpc_id=self._resolve_vpc_id(account_id, region_name, subnet_id),
526
            security_group_ids=vpc_config.get("SecurityGroupIds", []),
527
            subnet_ids=subnet_ids,
528
        )
529

530
    def _create_version_model(
1✔
531
        self,
532
        function_name: str,
533
        region: str,
534
        account_id: str,
535
        description: str | None = None,
536
        revision_id: str | None = None,
537
        code_sha256: str | None = None,
538
        publish_to: FunctionVersionLatestPublished | None = None,
539
        is_active: bool = False,
540
    ) -> tuple[FunctionVersion, bool]:
541
        """
542
        Release a new version to the model if all restrictions are met.
543
        Restrictions:
544
          - CodeSha256, if provided, must equal the current latest version code hash
545
          - RevisionId, if provided, must equal the current latest version revision id
546
          - Some changes have been done to the latest version since last publish
547
        Will return a tuple of the version, and whether the version was published (True) or the latest available version was taken (False).
548
        This can happen if the latest version has not been changed since the last version publish, in this case the last version will be returned.
549

550
        :param function_name: Function name to be published
551
        :param region: Region of the function
552
        :param account_id: Account of the function
553
        :param description: new description of the version (will be the description of the function if missing)
554
        :param revision_id: Revision id, function will raise error if it does not match latest revision id
555
        :param code_sha256: Code sha256, function will raise error if it does not match latest code hash
556
        :return: Tuple of (published version, whether version was released or last released version returned, since nothing changed)
557
        """
558
        current_latest_version = get_function_version(
1✔
559
            function_name=function_name, qualifier="$LATEST", account_id=account_id, region=region
560
        )
561
        if revision_id and current_latest_version.config.revision_id != revision_id:
1✔
562
            raise PreconditionFailedException(
1✔
563
                "The Revision Id provided does not match the latest Revision Id. Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
564
                Type="User",
565
            )
566

567
        # check if code hashes match if they are specified
568
        current_hash = (
1✔
569
            current_latest_version.config.code.code_sha256
570
            if current_latest_version.config.package_type == PackageType.Zip
571
            else current_latest_version.config.image.code_sha256
572
        )
573
        # if the code is a zip package and hot reloaded (hot reloading is currently only supported for zip packagetypes)
574
        # we cannot enforce the codesha256 check
575
        is_hot_reloaded_zip_package = (
1✔
576
            current_latest_version.config.package_type == PackageType.Zip
577
            and current_latest_version.config.code.is_hot_reloading()
578
        )
579
        if code_sha256 and current_hash != code_sha256 and not is_hot_reloaded_zip_package:
1✔
580
            raise InvalidParameterValueException(
1✔
581
                f"CodeSHA256 ({code_sha256}) is different from current CodeSHA256 in $LATEST ({current_hash}). Please try again with the CodeSHA256 in $LATEST.",
582
                Type="User",
583
            )
584

585
        state = lambda_stores[account_id][region]
1✔
586
        function = state.functions.get(function_name)
1✔
587
        changes = {}
1✔
588
        if description is not None:
1✔
589
            changes["description"] = description
1✔
590
        # TODO copy environment instead of restarting one, get rid of all the "Pending"s
591

592
        with function.lock:
1✔
593
            if function.next_version > 1 and (
1✔
594
                prev_version := function.versions.get(str(function.next_version - 1))
595
            ):
596
                if (
1✔
597
                    prev_version.config.internal_revision
598
                    == current_latest_version.config.internal_revision
599
                ):
600
                    return prev_version, False
1✔
601
            # TODO check if there was a change since last version
602
            if publish_to == FunctionVersionLatestPublished.LATEST_PUBLISHED:
1✔
603
                qualifier = "$LATEST.PUBLISHED"
×
604
            else:
605
                qualifier = str(function.next_version)
1✔
606
                function.next_version += 1
1✔
607
            new_id = VersionIdentifier(
1✔
608
                function_name=function_name,
609
                qualifier=qualifier,
610
                region=region,
611
                account=account_id,
612
            )
613

614
            if current_latest_version.config.CapacityProviderConfig:
1✔
615
                # for lambda managed functions, snap start is not supported
616
                snap_start = None
×
617
            else:
618
                apply_on = current_latest_version.config.snap_start["ApplyOn"]
1✔
619
                optimization_status = SnapStartOptimizationStatus.Off
1✔
620
                if apply_on == SnapStartApplyOn.PublishedVersions:
1✔
621
                    optimization_status = SnapStartOptimizationStatus.On
×
622
                snap_start = SnapStartResponse(
1✔
623
                    ApplyOn=apply_on,
624
                    OptimizationStatus=optimization_status,
625
                )
626

627
            last_update = None
1✔
628
            new_state = VersionState(
1✔
629
                state=State.Pending,
630
                code=StateReasonCode.Creating,
631
                reason="The function is being created.",
632
            )
633
            if publish_to == FunctionVersionLatestPublished.LATEST_PUBLISHED:
1✔
634
                last_update = UpdateStatus(
×
635
                    status=LastUpdateStatus.InProgress,
636
                    code="Updating",
637
                    reason="The function is being updated.",
638
                )
639
                if is_active:
×
640
                    new_state = VersionState(state=State.Active)
×
641
            new_version = dataclasses.replace(
1✔
642
                current_latest_version,
643
                config=dataclasses.replace(
644
                    current_latest_version.config,
645
                    last_update=last_update,
646
                    state=new_state,
647
                    snap_start=snap_start,
648
                    **changes,
649
                ),
650
                id=new_id,
651
            )
652
            function.versions[qualifier] = new_version
1✔
653
        return new_version, True
1✔
654

655
    def _publish_version_from_existing_version(
1✔
656
        self,
657
        function_name: str,
658
        region: str,
659
        account_id: str,
660
        description: str | None = None,
661
        revision_id: str | None = None,
662
        code_sha256: str | None = None,
663
        publish_to: FunctionVersionLatestPublished | None = None,
664
    ) -> FunctionVersion:
665
        """
666
        Publish version from an existing, already initialized LATEST
667

668
        :param function_name: Function name
669
        :param region: region
670
        :param account_id: account id
671
        :param description: description
672
        :param revision_id: revision id (check if current version matches)
673
        :param code_sha256: code sha (check if current code matches)
674
        :return: new version
675
        """
676
        is_active = True if publish_to == FunctionVersionLatestPublished.LATEST_PUBLISHED else False
1✔
677
        new_version, changed = self._create_version_model(
1✔
678
            function_name=function_name,
679
            region=region,
680
            account_id=account_id,
681
            description=description,
682
            revision_id=revision_id,
683
            code_sha256=code_sha256,
684
            publish_to=publish_to,
685
            is_active=is_active,
686
        )
687
        if not changed:
1✔
688
            return new_version
1✔
689

690
        if new_version.config.CapacityProviderConfig:
1✔
691
            self.lambda_service.publish_version_async(new_version)
×
692
        else:
693
            self.lambda_service.publish_version(new_version)
1✔
694
        state = lambda_stores[account_id][region]
1✔
695
        function = state.functions.get(function_name)
1✔
696

697
        # Update revision id for $LATEST version
698
        # TODO: re-evaluate data model to prevent this dirty hack just for bumping the revision id
699
        latest_version = function.versions["$LATEST"]
1✔
700
        function.versions["$LATEST"] = dataclasses.replace(
1✔
701
            latest_version, config=dataclasses.replace(latest_version.config)
702
        )
703
        if new_version.config.CapacityProviderConfig:
1✔
704
            # publish_version happens async for functions with a capacity provider.
705
            # Therefore, we return the new_version with State=Pending or LastUpdateStatus=InProgress ($LATEST.PUBLISHED)
706
            return new_version
×
707
        else:
708
            # Regular functions yield an Active state modified during `publish_version` (sync).
709
            # Therefore, we need to get the updated version from the store.
710
            updated_version = function.versions.get(new_version.id.qualifier)
1✔
711
            return updated_version
1✔
712

713
    def _publish_version_with_changes(
1✔
714
        self,
715
        function_name: str,
716
        region: str,
717
        account_id: str,
718
        description: str | None = None,
719
        revision_id: str | None = None,
720
        code_sha256: str | None = None,
721
        publish_to: FunctionVersionLatestPublished | None = None,
722
        is_active: bool = False,
723
    ) -> FunctionVersion:
724
        """
725
        Publish version together with a new latest version (publish on create / update)
726

727
        :param function_name: Function name
728
        :param region: region
729
        :param account_id: account id
730
        :param description: description
731
        :param revision_id: revision id (check if current version matches)
732
        :param code_sha256: code sha (check if current code matches)
733
        :return: new version
734
        """
735
        new_version, changed = self._create_version_model(
1✔
736
            function_name=function_name,
737
            region=region,
738
            account_id=account_id,
739
            description=description,
740
            revision_id=revision_id,
741
            code_sha256=code_sha256,
742
            publish_to=publish_to,
743
            is_active=is_active,
744
        )
745
        if not changed:
1✔
746
            return new_version
×
747
        self.lambda_service.create_function_version(new_version)
1✔
748
        return new_version
1✔
749

750
    @staticmethod
1✔
751
    def _verify_env_variables(env_vars: dict[str, str]):
1✔
752
        dumped_env_vars = json.dumps(env_vars, separators=(",", ":"))
1✔
753
        if (
1✔
754
            len(dumped_env_vars.encode("utf-8"))
755
            > config.LAMBDA_LIMITS_MAX_FUNCTION_ENVVAR_SIZE_BYTES
756
        ):
757
            raise InvalidParameterValueException(
1✔
758
                f"Lambda was unable to configure your environment variables because the environment variables you have provided exceeded the 4KB limit. String measured: {dumped_env_vars}",
759
                Type="User",
760
            )
761

762
    @staticmethod
1✔
763
    def _validate_snapstart(snap_start: SnapStart, runtime: Runtime):
1✔
764
        apply_on = snap_start.get("ApplyOn")
1✔
765
        if apply_on not in [
1✔
766
            SnapStartApplyOn.PublishedVersions,
767
            SnapStartApplyOn.None_,
768
        ]:
769
            raise ValidationException(
1✔
770
                f"1 validation error detected: Value '{apply_on}' at 'snapStart.applyOn' failed to satisfy constraint: Member must satisfy enum value set: [PublishedVersions, None]"
771
            )
772

773
        if runtime not in SNAP_START_SUPPORTED_RUNTIMES:
1✔
774
            raise InvalidParameterValueException(
×
775
                f"{runtime} is not supported for SnapStart enabled functions.", Type="User"
776
            )
777

778
    def _validate_layers(self, new_layers: list[str], region: str, account_id: str):
1✔
779
        if len(new_layers) > LAMBDA_LAYERS_LIMIT_PER_FUNCTION:
1✔
780
            raise InvalidParameterValueException(
1✔
781
                "Cannot reference more than 5 layers.", Type="User"
782
            )
783

784
        visited_layers = {}
1✔
785
        for layer_version_arn in new_layers:
1✔
786
            (
1✔
787
                layer_region,
788
                layer_account_id,
789
                layer_name,
790
                layer_version_str,
791
            ) = api_utils.parse_layer_arn(layer_version_arn)
792
            if layer_version_str is None:
1✔
793
                raise ValidationException(
1✔
794
                    f"1 validation error detected: Value '[{layer_version_arn}]'"
795
                    + " at 'layers' failed to satisfy constraint: Member must satisfy constraint: [Member must have length less than or equal to 2048, Member must have length greater than or equal to 1, Member must satisfy regular expression pattern: "
796
                    + "(arn:(aws[a-zA-Z-]*)?:lambda:(eusc-)?[a-z]{2}((-gov)|(-iso([a-z]?)))?-[a-z]+-\\d{1}:\\d{12}:layer:[a-zA-Z0-9-_]+:[0-9]+)|(arn:[a-zA-Z0-9-]+:lambda:::awslayer:[a-zA-Z0-9-_]+), Member must not be null]",
797
                )
798

799
            state = lambda_stores[layer_account_id][layer_region]
1✔
800
            layer = state.layers.get(layer_name)
1✔
801
            layer_version = None
1✔
802
            if layer is not None:
1✔
803
                layer_version = layer.layer_versions.get(layer_version_str)
1✔
804
            if layer_account_id == account_id:
1✔
805
                if region and layer_region != region:
1✔
806
                    raise InvalidParameterValueException(
1✔
807
                        f"Layers are not in the same region as the function. "
808
                        f"Layers are expected to be in region {region}.",
809
                        Type="User",
810
                    )
811
                if layer is None or layer.layer_versions.get(layer_version_str) is None:
1✔
812
                    raise InvalidParameterValueException(
1✔
813
                        f"Layer version {layer_version_arn} does not exist.", Type="User"
814
                    )
815
            else:  # External layer from other account
816
                # TODO: validate IAM layer policy here, allowing access by default for now and only checking region
817
                if region and layer_region != region:
×
818
                    # TODO: detect user or role from context when IAM users are implemented
819
                    user = "user/localstack-testing"
×
820
                    raise AccessDeniedException(
×
821
                        f"User: arn:{get_partition(region)}:iam::{account_id}:{user} is not authorized to perform: lambda:GetLayerVersion on resource: {layer_version_arn} because no resource-based policy allows the lambda:GetLayerVersion action"
822
                    )
823
                if layer is None or layer_version is None:
×
824
                    # Limitation: cannot fetch external layers when using the same account id as the target layer
825
                    # because we do not want to trigger the layer fetcher for every non-existing layer.
826
                    if self.layer_fetcher is None:
×
827
                        raise NotImplementedError(
828
                            "Fetching shared layers from AWS is a pro feature."
829
                        )
830

831
                    layer = self.layer_fetcher.fetch_layer(layer_version_arn)
×
832
                    if layer is None:
×
833
                        # TODO: detect user or role from context when IAM users are implemented
834
                        user = "user/localstack-testing"
×
835
                        raise AccessDeniedException(
×
836
                            f"User: arn:{get_partition(region)}:iam::{account_id}:{user} is not authorized to perform: lambda:GetLayerVersion on resource: {layer_version_arn} because no resource-based policy allows the lambda:GetLayerVersion action"
837
                        )
838

839
                    # Distinguish between new layer and new layer version
840
                    if layer_version is None:
×
841
                        # Create whole layer from scratch
842
                        state.layers[layer_name] = layer
×
843
                    else:
844
                        # Create layer version if another version of the same layer already exists
845
                        state.layers[layer_name].layer_versions[layer_version_str] = (
×
846
                            layer.layer_versions.get(layer_version_str)
847
                        )
848

849
            # only the first two matches in the array are considered for the error message
850
            layer_arn = ":".join(layer_version_arn.split(":")[:-1])
1✔
851
            if layer_arn in visited_layers:
1✔
852
                conflict_layer_version_arn = visited_layers[layer_arn]
1✔
853
                raise InvalidParameterValueException(
1✔
854
                    f"Two different versions of the same layer are not allowed to be referenced in the same function. {conflict_layer_version_arn} and {layer_version_arn} are versions of the same layer.",
855
                    Type="User",
856
                )
857
            visited_layers[layer_arn] = layer_version_arn
1✔
858

859
    def _validate_capacity_provider_config(
1✔
860
        self, capacity_provider_config: CapacityProviderConfig, context: RequestContext
861
    ):
862
        if not capacity_provider_config.get("LambdaManagedInstancesCapacityProviderConfig"):
×
863
            raise ValidationException(
×
864
                "1 validation error detected: Value null at 'capacityProviderConfig.lambdaManagedInstancesCapacityProviderConfig' failed to satisfy constraint: Member must not be null"
865
            )
866

867
        capacity_provider_arn = capacity_provider_config.get(
×
868
            "LambdaManagedInstancesCapacityProviderConfig", {}
869
        ).get("CapacityProviderArn")
870
        if not capacity_provider_arn:
×
871
            raise ValidationException(
×
872
                "1 validation error detected: Value null at 'capacityProviderConfig.lambdaManagedInstancesCapacityProviderConfig.capacityProviderArn' failed to satisfy constraint: Member must not be null"
873
            )
874

875
        if not re.match(CAPACITY_PROVIDER_ARN_NAME, capacity_provider_arn):
×
876
            raise ValidationException(
×
877
                f"1 validation error detected: Value '{capacity_provider_arn}' at 'capacityProviderConfig.lambdaManagedInstancesCapacityProviderConfig.capacityProviderArn' failed to satisfy constraint: Member must satisfy regular expression pattern: {CAPACITY_PROVIDER_ARN_NAME}"
878
            )
879

880
        capacity_provider_name = capacity_provider_arn.split(":")[-1]
×
881
        self.get_capacity_provider(context, capacity_provider_name)
×
882

883
    @staticmethod
1✔
884
    def map_layers(new_layers: list[str]) -> list[LayerVersion]:
1✔
885
        layers = []
1✔
886
        for layer_version_arn in new_layers:
1✔
887
            region_name, account_id, layer_name, layer_version = api_utils.parse_layer_arn(
1✔
888
                layer_version_arn
889
            )
890
            layer = lambda_stores[account_id][region_name].layers.get(layer_name)
1✔
891
            layer_version = layer.layer_versions.get(layer_version)
1✔
892
            layers.append(layer_version)
1✔
893
        return layers
1✔
894

895
    def get_function_recursion_config(
1✔
896
        self,
897
        context: RequestContext,
898
        function_name: UnqualifiedFunctionName,
899
        **kwargs,
900
    ) -> GetFunctionRecursionConfigResponse:
901
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
902
        function_name = api_utils.get_function_name(function_name, context)
1✔
903
        fn = self._get_function(function_name=function_name, region=region, account_id=account_id)
1✔
904
        return GetFunctionRecursionConfigResponse(RecursiveLoop=fn.recursive_loop)
1✔
905

906
    def put_function_recursion_config(
1✔
907
        self,
908
        context: RequestContext,
909
        function_name: UnqualifiedFunctionName,
910
        recursive_loop: RecursiveLoop,
911
        **kwargs,
912
    ) -> PutFunctionRecursionConfigResponse:
913
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
914
        function_name = api_utils.get_function_name(function_name, context)
1✔
915

916
        fn = self._get_function(function_name=function_name, region=region, account_id=account_id)
1✔
917

918
        allowed_values = list(RecursiveLoop.__members__.values())
1✔
919
        if recursive_loop not in allowed_values:
1✔
920
            raise ValidationException(
1✔
921
                f"1 validation error detected: Value '{recursive_loop}' at 'recursiveLoop' failed to satisfy constraint: "
922
                f"Member must satisfy enum value set: [Terminate, Allow]"
923
            )
924

925
        fn.recursive_loop = recursive_loop
1✔
926
        return PutFunctionRecursionConfigResponse(RecursiveLoop=fn.recursive_loop)
1✔
927

928
    @handler(operation="CreateFunction", expand=False)
1✔
929
    def create_function(
1✔
930
        self,
931
        context: RequestContext,
932
        request: CreateFunctionRequest,
933
    ) -> FunctionConfiguration:
934
        context_region = context.region
1✔
935
        context_account_id = context.account_id
1✔
936

937
        zip_file = request.get("Code", {}).get("ZipFile")
1✔
938
        if zip_file and len(zip_file) > config.LAMBDA_LIMITS_CODE_SIZE_ZIPPED:
1✔
939
            raise RequestEntityTooLargeException(
1✔
940
                f"Zipped size must be smaller than {config.LAMBDA_LIMITS_CODE_SIZE_ZIPPED} bytes"
941
            )
942

943
        if context.request.content_length > config.LAMBDA_LIMITS_CREATE_FUNCTION_REQUEST_SIZE:
1✔
944
            raise RequestEntityTooLargeException(
1✔
945
                f"Request must be smaller than {config.LAMBDA_LIMITS_CREATE_FUNCTION_REQUEST_SIZE} bytes for the CreateFunction operation"
946
            )
947

948
        if architectures := request.get("Architectures"):
1✔
949
            if len(architectures) != 1:
1✔
950
                raise ValidationException(
1✔
951
                    f"1 validation error detected: Value '[{', '.join(architectures)}]' at 'architectures' failed to "
952
                    f"satisfy constraint: Member must have length less than or equal to 1",
953
                )
954
            if architectures[0] not in ARCHITECTURES:
1✔
955
                raise ValidationException(
1✔
956
                    f"1 validation error detected: Value '[{', '.join(architectures)}]' at 'architectures' failed to "
957
                    f"satisfy constraint: Member must satisfy constraint: [Member must satisfy enum value set: "
958
                    f"[x86_64, arm64], Member must not be null]",
959
                )
960

961
        if env_vars := request.get("Environment", {}).get("Variables"):
1✔
962
            self._verify_env_variables(env_vars)
1✔
963

964
        if layers := request.get("Layers", []):
1✔
965
            self._validate_layers(layers, region=context_region, account_id=context_account_id)
1✔
966

967
        if not api_utils.is_role_arn(request.get("Role")):
1✔
968
            raise ValidationException(
1✔
969
                f"1 validation error detected: Value '{request.get('Role')}'"
970
                + " at 'role' failed to satisfy constraint: Member must satisfy regular expression pattern: arn:(aws[a-zA-Z-]*)?:iam::\\d{12}:role/?[a-zA-Z_0-9+=,.@\\-_/]+"
971
            )
972
        if not self.lambda_service.can_assume_role(request.get("Role"), context.region):
1✔
973
            raise InvalidParameterValueException(
×
974
                "The role defined for the function cannot be assumed by Lambda.", Type="User"
975
            )
976
        package_type = request.get("PackageType", PackageType.Zip)
1✔
977
        runtime = request.get("Runtime")
1✔
978
        self._validate_runtime(package_type, runtime)
1✔
979

980
        request_function_name = request.get("FunctionName")
1✔
981

982
        function_name, *_ = api_utils.get_name_and_qualifier(
1✔
983
            function_arn_or_name=request_function_name,
984
            qualifier=None,
985
            context=context,
986
        )
987

988
        if runtime in DEPRECATED_RUNTIMES:
1✔
989
            LOG.warning(
1✔
990
                "The Lambda runtime %s} is deprecated. "
991
                "Please upgrade the runtime for the function %s: "
992
                "https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtimes.html",
993
                runtime,
994
                function_name,
995
            )
996
        if snap_start := request.get("SnapStart"):
1✔
997
            self._validate_snapstart(snap_start, runtime)
1✔
998
        state = lambda_stores[context_account_id][context_region]
1✔
999

1000
        with self.create_fn_lock:
1✔
1001
            if function_name in state.functions:
1✔
1002
                raise ResourceConflictException(f"Function already exist: {function_name}")
×
1003
            fn = Function(function_name=function_name)
1✔
1004
            arn = VersionIdentifier(
1✔
1005
                function_name=function_name,
1006
                qualifier="$LATEST",
1007
                region=context_region,
1008
                account=context_account_id,
1009
            )
1010
            # save function code to s3
1011
            code = None
1✔
1012
            image = None
1✔
1013
            image_config = None
1✔
1014
            runtime_version_config = RuntimeVersionConfig(
1✔
1015
                # Limitation: the runtime id (presumably sha256 of image) is currently hardcoded
1016
                # Potential implementation: provide (cached) sha256 hash of used Docker image
1017
                RuntimeVersionArn=f"arn:{context.partition}:lambda:{context_region}::runtime:8eeff65f6809a3ce81507fe733fe09b835899b99481ba22fd75b5a7338290ec1"
1018
            )
1019
            request_code = request.get("Code")
1✔
1020
            if package_type == PackageType.Zip:
1✔
1021
                # TODO verify if correct combination of code is set
1022
                if zip_file := request_code.get("ZipFile"):
1✔
1023
                    code = store_lambda_archive(
1✔
1024
                        archive_file=zip_file,
1025
                        function_name=function_name,
1026
                        region_name=context_region,
1027
                        account_id=context_account_id,
1028
                    )
1029
                elif s3_bucket := request_code.get("S3Bucket"):
1✔
1030
                    s3_key = request_code["S3Key"]
1✔
1031
                    s3_object_version = request_code.get("S3ObjectVersion")
1✔
1032
                    code = store_s3_bucket_archive(
1✔
1033
                        archive_bucket=s3_bucket,
1034
                        archive_key=s3_key,
1035
                        archive_version=s3_object_version,
1036
                        function_name=function_name,
1037
                        region_name=context_region,
1038
                        account_id=context_account_id,
1039
                    )
1040
                else:
1041
                    raise LambdaServiceException("A ZIP file or S3 bucket is required")
×
1042
            elif package_type == PackageType.Image:
1✔
1043
                image = request_code.get("ImageUri")
1✔
1044
                if not image:
1✔
1045
                    raise LambdaServiceException(
×
1046
                        "An image is required when the package type is set to 'image'"
1047
                    )
1048
                image = create_image_code(image_uri=image)
1✔
1049

1050
                image_config_req = request.get("ImageConfig", {})
1✔
1051
                image_config = ImageConfig(
1✔
1052
                    command=image_config_req.get("Command"),
1053
                    entrypoint=image_config_req.get("EntryPoint"),
1054
                    working_directory=image_config_req.get("WorkingDirectory"),
1055
                )
1056
                # Runtime management controls are not available when providing a custom image
1057
                runtime_version_config = None
1✔
1058

1059
            capacity_provider_config = None
1✔
1060
            memory_size = request.get("MemorySize", LAMBDA_DEFAULT_MEMORY_SIZE)
1✔
1061
            if "CapacityProviderConfig" in request:
1✔
1062
                capacity_provider_config = request["CapacityProviderConfig"]
×
1063
                self._validate_capacity_provider_config(capacity_provider_config, context)
×
1064

1065
                default_config = CapacityProviderConfig(
×
1066
                    LambdaManagedInstancesCapacityProviderConfig=LambdaManagedInstancesCapacityProviderConfig(
1067
                        ExecutionEnvironmentMemoryGiBPerVCpu=2.0,
1068
                        PerExecutionEnvironmentMaxConcurrency=16,
1069
                    )
1070
                )
1071
                capacity_provider_config = merge_recursive(default_config, capacity_provider_config)
×
1072
                memory_size = 2048
×
1073
                if request.get("LoggingConfig", {}).get("LogFormat") == LogFormat.Text:
×
1074
                    raise InvalidParameterValueException(
×
1075
                        'LogLevel is not supported when LogFormat is set to "Text". Remove LogLevel from your request or change the LogFormat to "JSON" and try again.',
1076
                        Type="User",
1077
                    )
1078
            if "LoggingConfig" in request:
1✔
1079
                logging_config = request["LoggingConfig"]
1✔
1080
                LOG.warning(
1✔
1081
                    "Advanced Lambda Logging Configuration is currently mocked "
1082
                    "and will not impact the logging behavior. "
1083
                    "Please create a feature request if needed."
1084
                )
1085

1086
                # when switching to JSON, app and system level log is auto set to INFO
1087
                if logging_config.get("LogFormat", None) == LogFormat.JSON:
1✔
1088
                    logging_config = {
1✔
1089
                        "ApplicationLogLevel": "INFO",
1090
                        "SystemLogLevel": "INFO",
1091
                        "LogGroup": f"/aws/lambda/{function_name}",
1092
                    } | logging_config
1093
                else:
1094
                    logging_config = (
×
1095
                        LoggingConfig(
1096
                            LogFormat=LogFormat.Text, LogGroup=f"/aws/lambda/{function_name}"
1097
                        )
1098
                        | logging_config
1099
                    )
1100

1101
            elif capacity_provider_config:
1✔
1102
                logging_config = LoggingConfig(
×
1103
                    LogFormat=LogFormat.JSON,
1104
                    LogGroup=f"/aws/lambda/{function_name}",
1105
                    ApplicationLogLevel="INFO",
1106
                    SystemLogLevel="INFO",
1107
                )
1108
            else:
1109
                logging_config = LoggingConfig(
1✔
1110
                    LogFormat=LogFormat.Text, LogGroup=f"/aws/lambda/{function_name}"
1111
                )
1112
            snap_start = (
1✔
1113
                None
1114
                if capacity_provider_config
1115
                else SnapStartResponse(
1116
                    ApplyOn=request.get("SnapStart", {}).get("ApplyOn", SnapStartApplyOn.None_),
1117
                    OptimizationStatus=SnapStartOptimizationStatus.Off,
1118
                )
1119
            )
1120
            version = FunctionVersion(
1✔
1121
                id=arn,
1122
                config=VersionFunctionConfiguration(
1123
                    last_modified=api_utils.format_lambda_date(datetime.datetime.now()),
1124
                    description=request.get("Description", ""),
1125
                    role=request["Role"],
1126
                    timeout=request.get("Timeout", LAMBDA_DEFAULT_TIMEOUT),
1127
                    runtime=request.get("Runtime"),
1128
                    memory_size=memory_size,
1129
                    handler=request.get("Handler"),
1130
                    package_type=package_type,
1131
                    environment=env_vars,
1132
                    architectures=request.get("Architectures") or [Architecture.x86_64],
1133
                    tracing_config_mode=request.get("TracingConfig", {}).get(
1134
                        "Mode", TracingMode.PassThrough
1135
                    ),
1136
                    image=image,
1137
                    image_config=image_config,
1138
                    code=code,
1139
                    layers=self.map_layers(layers),
1140
                    internal_revision=short_uid(),
1141
                    ephemeral_storage=LambdaEphemeralStorage(
1142
                        size=request.get("EphemeralStorage", {}).get("Size", 512)
1143
                    ),
1144
                    snap_start=snap_start,
1145
                    runtime_version_config=runtime_version_config,
1146
                    dead_letter_arn=request.get("DeadLetterConfig", {}).get("TargetArn"),
1147
                    vpc_config=self._build_vpc_config(
1148
                        context_account_id, context_region, request.get("VpcConfig")
1149
                    ),
1150
                    state=VersionState(
1151
                        state=State.Pending,
1152
                        code=StateReasonCode.Creating,
1153
                        reason="The function is being created.",
1154
                    ),
1155
                    logging_config=logging_config,
1156
                    # TODO: might need something like **optional_kwargs if None
1157
                    #   -> Test with regular GetFunction (i.e., without a capacity provider)
1158
                    CapacityProviderConfig=capacity_provider_config,
1159
                ),
1160
            )
1161
            version_post_response = None
1✔
1162
            if capacity_provider_config:
1✔
1163
                version_post_response = dataclasses.replace(
×
1164
                    version,
1165
                    config=dataclasses.replace(
1166
                        version.config,
1167
                        last_update=UpdateStatus(status=LastUpdateStatus.Successful),
1168
                        state=VersionState(state=State.ActiveNonInvocable),
1169
                    ),
1170
                )
1171
            fn.versions["$LATEST"] = version_post_response or version
1✔
1172
            state.functions[function_name] = fn
1✔
1173
        initialization_type = (
1✔
1174
            FunctionInitializationType.lambda_managed_instances
1175
            if capacity_provider_config
1176
            else FunctionInitializationType.on_demand
1177
        )
1178
        function_counter.labels(
1✔
1179
            operation=FunctionOperation.create,
1180
            runtime=runtime or "n/a",
1181
            status=FunctionStatus.success,
1182
            invocation_type="n/a",
1183
            package_type=package_type,
1184
            initialization_type=initialization_type,
1185
        )
1186
        # TODO: consider potential other side effects of not having a function version for $LATEST
1187
        # Provisioning happens upon publishing for functions using a capacity provider
1188
        if not capacity_provider_config:
1✔
1189
            self.lambda_service.create_function_version(version)
1✔
1190

1191
        if tags := request.get("Tags"):
1✔
1192
            # This will check whether the function exists.
1193
            self._store_tags(arn.unqualified_arn(), tags)
1✔
1194

1195
        if request.get("Publish"):
1✔
1196
            version = self._publish_version_with_changes(
1✔
1197
                function_name=function_name,
1198
                region=context_region,
1199
                account_id=context_account_id,
1200
                publish_to=request.get("PublishTo"),
1201
            )
1202

1203
        if config.LAMBDA_SYNCHRONOUS_CREATE:
1✔
1204
            # block via retrying until "terminal" condition reached before returning
1205
            if not poll_condition(
×
1206
                lambda: get_function_version(
1207
                    function_name, version.id.qualifier, version.id.account, version.id.region
1208
                ).config.state.state
1209
                in [State.Active, State.ActiveNonInvocable, State.Failed],
1210
                timeout=10,
1211
            ):
1212
                LOG.warning(
×
1213
                    "LAMBDA_SYNCHRONOUS_CREATE is active, but waiting for %s reached timeout.",
1214
                    function_name,
1215
                )
1216

1217
        return api_utils.map_config_out(
1✔
1218
            version, return_qualified_arn=False, return_update_status=False
1219
        )
1220

1221
    def _validate_runtime(self, package_type, runtime):
1✔
1222
        runtimes = ALL_RUNTIMES
1✔
1223
        if config.LAMBDA_RUNTIME_VALIDATION:
1✔
1224
            runtimes = list(itertools.chain(RUNTIMES_AGGREGATED.values()))
1✔
1225

1226
        if package_type == PackageType.Zip and runtime not in runtimes:
1✔
1227
            # deprecated runtimes have different error
1228
            if runtime in DEPRECATED_RUNTIMES:
1✔
1229
                HINT_LOG.info(
1✔
1230
                    "Set env variable LAMBDA_RUNTIME_VALIDATION to 0"
1231
                    " in order to allow usage of deprecated runtimes"
1232
                )
1233
                self._check_for_recomended_migration_target(runtime)
1✔
1234

1235
            raise InvalidParameterValueException(
1✔
1236
                f"Value {runtime} at 'runtime' failed to satisfy constraint: Member must satisfy enum value set: {VALID_RUNTIMES} or be a valid ARN",
1237
                Type="User",
1238
            )
1239

1240
    def _check_for_recomended_migration_target(self, deprecated_runtime):
1✔
1241
        # AWS offers recommended runtime for migration for "newly" deprecated runtimes
1242
        # in order to preserve parity with error messages we need the code bellow
1243
        latest_runtime = DEPRECATED_RUNTIMES_UPGRADES.get(deprecated_runtime)
1✔
1244

1245
        if latest_runtime is not None:
1✔
1246
            LOG.debug(
1✔
1247
                "The Lambda runtime %s is deprecated. Please upgrade to a supported Lambda runtime such as %s.",
1248
                deprecated_runtime,
1249
                latest_runtime,
1250
            )
1251
            raise InvalidParameterValueException(
1✔
1252
                f"The runtime parameter of {deprecated_runtime} is no longer supported for creating or updating AWS Lambda functions. We recommend you use a supported runtime while creating or updating functions.",
1253
                Type="User",
1254
            )
1255

1256
    @handler(operation="UpdateFunctionConfiguration", expand=False)
1✔
1257
    def update_function_configuration(
1✔
1258
        self, context: RequestContext, request: UpdateFunctionConfigurationRequest
1259
    ) -> FunctionConfiguration:
1260
        """updates the $LATEST version of the function"""
1261
        function_name = request.get("FunctionName")
1✔
1262

1263
        # in case we got ARN or partial ARN
1264
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1265
        function_name, qualifier = api_utils.get_name_and_qualifier(function_name, None, context)
1✔
1266
        state = lambda_stores[account_id][region]
1✔
1267

1268
        if function_name not in state.functions:
1✔
1269
            raise ResourceNotFoundException(
×
1270
                f"Function not found: {api_utils.unqualified_lambda_arn(function_name=function_name, region=region, account=account_id)}",
1271
                Type="User",
1272
            )
1273
        function = state.functions[function_name]
1✔
1274

1275
        # TODO: lock modification of latest version
1276
        # TODO: notify service for changes relevant to re-provisioning of $LATEST
1277
        latest_version = function.latest()
1✔
1278
        latest_version_config = latest_version.config
1✔
1279

1280
        revision_id = request.get("RevisionId")
1✔
1281
        if revision_id and revision_id != latest_version.config.revision_id:
1✔
1282
            raise PreconditionFailedException(
1✔
1283
                "The Revision Id provided does not match the latest Revision Id. "
1284
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
1285
                Type="User",
1286
            )
1287

1288
        replace_kwargs = {}
1✔
1289
        if "EphemeralStorage" in request:
1✔
1290
            replace_kwargs["ephemeral_storage"] = LambdaEphemeralStorage(
×
1291
                request.get("EphemeralStorage", {}).get("Size", 512)
1292
            )  # TODO: do defaults here apply as well?
1293

1294
        if "Role" in request:
1✔
1295
            if not api_utils.is_role_arn(request["Role"]):
1✔
1296
                raise ValidationException(
1✔
1297
                    f"1 validation error detected: Value '{request.get('Role')}'"
1298
                    + " at 'role' failed to satisfy constraint: Member must satisfy regular expression pattern: arn:(aws[a-zA-Z-]*)?:iam::\\d{12}:role/?[a-zA-Z_0-9+=,.@\\-_/]+"
1299
                )
1300
            replace_kwargs["role"] = request["Role"]
1✔
1301

1302
        if "Description" in request:
1✔
1303
            replace_kwargs["description"] = request["Description"]
1✔
1304

1305
        if "Timeout" in request:
1✔
1306
            replace_kwargs["timeout"] = request["Timeout"]
1✔
1307

1308
        if "MemorySize" in request:
1✔
1309
            replace_kwargs["memory_size"] = request["MemorySize"]
1✔
1310

1311
        if "DeadLetterConfig" in request:
1✔
1312
            replace_kwargs["dead_letter_arn"] = request.get("DeadLetterConfig", {}).get("TargetArn")
1✔
1313

1314
        if vpc_config := request.get("VpcConfig"):
1✔
1315
            replace_kwargs["vpc_config"] = self._build_vpc_config(account_id, region, vpc_config)
1✔
1316

1317
        if "Handler" in request:
1✔
1318
            replace_kwargs["handler"] = request["Handler"]
1✔
1319

1320
        if "Runtime" in request:
1✔
1321
            runtime = request["Runtime"]
1✔
1322

1323
            if runtime not in ALL_RUNTIMES:
1✔
1324
                raise InvalidParameterValueException(
1✔
1325
                    f"Value {runtime} at 'runtime' failed to satisfy constraint: Member must satisfy enum value set: {VALID_RUNTIMES} or be a valid ARN",
1326
                    Type="User",
1327
                )
1328
            if runtime in DEPRECATED_RUNTIMES:
1✔
1329
                LOG.warning(
×
1330
                    "The Lambda runtime %s is deprecated. "
1331
                    "Please upgrade the runtime for the function %s: "
1332
                    "https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtimes.html",
1333
                    runtime,
1334
                    function_name,
1335
                )
1336
            replace_kwargs["runtime"] = request["Runtime"]
1✔
1337

1338
        if snap_start := request.get("SnapStart"):
1✔
1339
            runtime = replace_kwargs.get("runtime") or latest_version_config.runtime
1✔
1340
            self._validate_snapstart(snap_start, runtime)
1✔
1341
            replace_kwargs["snap_start"] = SnapStartResponse(
1✔
1342
                ApplyOn=snap_start.get("ApplyOn", SnapStartApplyOn.None_),
1343
                OptimizationStatus=SnapStartOptimizationStatus.Off,
1344
            )
1345

1346
        if "Environment" in request:
1✔
1347
            if env_vars := request.get("Environment", {}).get("Variables", {}):
1✔
1348
                self._verify_env_variables(env_vars)
1✔
1349
            replace_kwargs["environment"] = env_vars
1✔
1350

1351
        if "Layers" in request:
1✔
1352
            new_layers = request["Layers"]
1✔
1353
            if new_layers:
1✔
1354
                self._validate_layers(new_layers, region=region, account_id=account_id)
1✔
1355
            replace_kwargs["layers"] = self.map_layers(new_layers)
1✔
1356

1357
        if "ImageConfig" in request:
1✔
1358
            new_image_config = request["ImageConfig"]
1✔
1359
            replace_kwargs["image_config"] = ImageConfig(
1✔
1360
                command=new_image_config.get("Command"),
1361
                entrypoint=new_image_config.get("EntryPoint"),
1362
                working_directory=new_image_config.get("WorkingDirectory"),
1363
            )
1364

1365
        if "LoggingConfig" in request:
1✔
1366
            logging_config = request["LoggingConfig"]
1✔
1367
            LOG.warning(
1✔
1368
                "Advanced Lambda Logging Configuration is currently mocked "
1369
                "and will not impact the logging behavior. "
1370
                "Please create a feature request if needed."
1371
            )
1372

1373
            # when switching to JSON, app and system level log is auto set to INFO
1374
            if logging_config.get("LogFormat", None) == LogFormat.JSON:
1✔
1375
                logging_config = {
1✔
1376
                    "ApplicationLogLevel": "INFO",
1377
                    "SystemLogLevel": "INFO",
1378
                } | logging_config
1379

1380
            last_config = latest_version_config.logging_config
1✔
1381

1382
            # add partial update
1383
            new_logging_config = last_config | logging_config
1✔
1384

1385
            # in case we switched from JSON to Text we need to remove LogLevel keys
1386
            if (
1✔
1387
                new_logging_config.get("LogFormat") == LogFormat.Text
1388
                and last_config.get("LogFormat") == LogFormat.JSON
1389
            ):
1390
                new_logging_config.pop("ApplicationLogLevel", None)
1✔
1391
                new_logging_config.pop("SystemLogLevel", None)
1✔
1392

1393
            replace_kwargs["logging_config"] = new_logging_config
1✔
1394

1395
        if "TracingConfig" in request:
1✔
1396
            new_mode = request.get("TracingConfig", {}).get("Mode")
×
1397
            if new_mode:
×
1398
                replace_kwargs["tracing_config_mode"] = new_mode
×
1399

1400
        if "CapacityProviderConfig" in request:
1✔
1401
            if latest_version.config.CapacityProviderConfig and not request[
×
1402
                "CapacityProviderConfig"
1403
            ].get("LambdaManagedInstancesCapacityProviderConfig"):
1404
                raise ValidationException(
×
1405
                    "1 validation error detected: Value null at 'capacityProviderConfig.lambdaManagedInstancesCapacityProviderConfig' failed to satisfy constraint: Member must not be null"
1406
                )
1407
            if not latest_version.config.CapacityProviderConfig:
×
1408
                raise InvalidParameterValueException(
×
1409
                    "CapacityProviderConfig isn't supported for Lambda Default functions.",
1410
                    Type="User",
1411
                )
1412

1413
        new_latest_version = dataclasses.replace(
1✔
1414
            latest_version,
1415
            config=dataclasses.replace(
1416
                latest_version_config,
1417
                last_modified=api_utils.generate_lambda_date(),
1418
                internal_revision=short_uid(),
1419
                last_update=UpdateStatus(
1420
                    status=LastUpdateStatus.InProgress,
1421
                    code="Creating",
1422
                    reason="The function is being created.",
1423
                ),
1424
                **replace_kwargs,
1425
            ),
1426
        )
1427
        function.versions["$LATEST"] = new_latest_version  # TODO: notify
1✔
1428
        self.lambda_service.update_version(new_version=new_latest_version)
1✔
1429

1430
        return api_utils.map_config_out(new_latest_version)
1✔
1431

1432
    @handler(operation="UpdateFunctionCode", expand=False)
1✔
1433
    def update_function_code(
1✔
1434
        self, context: RequestContext, request: UpdateFunctionCodeRequest
1435
    ) -> FunctionConfiguration:
1436
        """updates the $LATEST version of the function"""
1437
        # only supports normal zip packaging atm
1438
        # if request.get("Publish"):
1439
        #     self.lambda_service.create_function_version()
1440

1441
        function_name = request.get("FunctionName")
1✔
1442
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1443
        function_name, qualifier = api_utils.get_name_and_qualifier(function_name, None, context)
1✔
1444

1445
        store = lambda_stores[account_id][region]
1✔
1446
        if function_name not in store.functions:
1✔
1447
            raise ResourceNotFoundException(
×
1448
                f"Function not found: {api_utils.unqualified_lambda_arn(function_name=function_name, region=region, account=account_id)}",
1449
                Type="User",
1450
            )
1451
        function = store.functions[function_name]
1✔
1452

1453
        revision_id = request.get("RevisionId")
1✔
1454
        if revision_id and revision_id != function.latest().config.revision_id:
1✔
1455
            raise PreconditionFailedException(
1✔
1456
                "The Revision Id provided does not match the latest Revision Id. "
1457
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
1458
                Type="User",
1459
            )
1460

1461
        # TODO verify if correct combination of code is set
1462
        image = None
1✔
1463
        if (
1✔
1464
            request.get("ZipFile") or request.get("S3Bucket")
1465
        ) and function.latest().config.package_type == PackageType.Image:
1466
            raise InvalidParameterValueException(
1✔
1467
                "Please provide ImageUri when updating a function with packageType Image.",
1468
                Type="User",
1469
            )
1470
        elif request.get("ImageUri") and function.latest().config.package_type == PackageType.Zip:
1✔
1471
            raise InvalidParameterValueException(
1✔
1472
                "Please don't provide ImageUri when updating a function with packageType Zip.",
1473
                Type="User",
1474
            )
1475

1476
        if zip_file := request.get("ZipFile"):
1✔
1477
            code = store_lambda_archive(
1✔
1478
                archive_file=zip_file,
1479
                function_name=function_name,
1480
                region_name=region,
1481
                account_id=account_id,
1482
            )
1483
        elif s3_bucket := request.get("S3Bucket"):
1✔
1484
            s3_key = request["S3Key"]
1✔
1485
            s3_object_version = request.get("S3ObjectVersion")
1✔
1486
            code = store_s3_bucket_archive(
1✔
1487
                archive_bucket=s3_bucket,
1488
                archive_key=s3_key,
1489
                archive_version=s3_object_version,
1490
                function_name=function_name,
1491
                region_name=region,
1492
                account_id=account_id,
1493
            )
1494
        elif image := request.get("ImageUri"):
1✔
1495
            code = None
1✔
1496
            image = create_image_code(image_uri=image)
1✔
1497
        else:
1498
            raise LambdaServiceException("A ZIP file, S3 bucket, or image is required")
×
1499

1500
        old_function_version = function.versions.get("$LATEST")
1✔
1501
        replace_kwargs = {"code": code} if code else {"image": image}
1✔
1502

1503
        if architectures := request.get("Architectures"):
1✔
1504
            if len(architectures) != 1:
×
1505
                raise ValidationException(
×
1506
                    f"1 validation error detected: Value '[{', '.join(architectures)}]' at 'architectures' failed to "
1507
                    f"satisfy constraint: Member must have length less than or equal to 1",
1508
                )
1509
            # An empty list of architectures is also forbidden. Further exceptions are tested here for create_function:
1510
            # tests.aws.services.lambda_.test_lambda_api.TestLambdaFunction.test_create_lambda_exceptions
1511
            if architectures[0] not in ARCHITECTURES:
×
1512
                raise ValidationException(
×
1513
                    f"1 validation error detected: Value '[{', '.join(architectures)}]' at 'architectures' failed to "
1514
                    f"satisfy constraint: Member must satisfy constraint: [Member must satisfy enum value set: "
1515
                    f"[x86_64, arm64], Member must not be null]",
1516
                )
1517
            replace_kwargs["architectures"] = architectures
×
1518

1519
        config = dataclasses.replace(
1✔
1520
            old_function_version.config,
1521
            internal_revision=short_uid(),
1522
            last_modified=api_utils.generate_lambda_date(),
1523
            last_update=UpdateStatus(
1524
                status=LastUpdateStatus.InProgress,
1525
                code="Creating",
1526
                reason="The function is being created.",
1527
            ),
1528
            **replace_kwargs,
1529
        )
1530
        function_version = dataclasses.replace(old_function_version, config=config)
1✔
1531
        function.versions["$LATEST"] = function_version
1✔
1532

1533
        self.lambda_service.update_version(new_version=function_version)
1✔
1534
        if request.get("Publish"):
1✔
1535
            function_version = self._publish_version_with_changes(
1✔
1536
                function_name=function_name,
1537
                region=region,
1538
                account_id=account_id,
1539
                # TODO: validations for PublishTo without Publish=True
1540
                publish_to=request.get("PublishTo"),
1541
                is_active=True,
1542
            )
1543
        return api_utils.map_config_out(
1✔
1544
            function_version, return_qualified_arn=bool(request.get("Publish"))
1545
        )
1546

1547
    # TODO: does deleting the latest published version affect the next versions number?
1548
    # TODO: what happens when we call this with a qualifier and a fully qualified ARN? (+ conflicts?)
1549
    # TODO: test different ARN patterns (shorthand ARN?)
1550
    # TODO: test deleting across regions?
1551
    # TODO: test mismatch between context region and region in ARN
1552
    # TODO: test qualifier $LATEST, alias-name and version
1553
    def delete_function(
1✔
1554
        self,
1555
        context: RequestContext,
1556
        function_name: NamespacedFunctionName,
1557
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
1558
        **kwargs,
1559
    ) -> DeleteFunctionResponse:
1560
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1561
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
1562
            function_name, qualifier, context
1563
        )
1564

1565
        if qualifier and api_utils.qualifier_is_alias(qualifier):
1✔
1566
            raise InvalidParameterValueException(
×
1567
                "Deletion of aliases is not currently supported.",
1568
                Type="User",
1569
            )
1570

1571
        store = lambda_stores[account_id][region]
1✔
1572
        if qualifier == "$LATEST":
1✔
1573
            raise InvalidParameterValueException(
1✔
1574
                "$LATEST version cannot be deleted without deleting the function.", Type="User"
1575
            )
1576

1577
        if function_name not in store.functions:
1✔
1578
            e = ResourceNotFoundException(
1✔
1579
                f"Function not found: {api_utils.unqualified_lambda_arn(function_name=function_name, region=region, account=account_id)}",
1580
                Type="User",
1581
            )
1582
            raise e
1✔
1583
        function = store.functions.get(function_name)
1✔
1584

1585
        function_has_capacity_provider = False
1✔
1586
        if qualifier:
1✔
1587
            # delete a version of the function
1588
            version = function.versions.pop(qualifier, None)
1✔
1589
            if version:
1✔
1590
                if version.config.CapacityProviderConfig:
1✔
1591
                    function_has_capacity_provider = True
×
1592
                self.lambda_service.stop_version(version.id.qualified_arn())
1✔
1593
                destroy_code_if_not_used(code=version.config.code, function=function)
1✔
1594
        else:
1595
            # delete the whole function
1596
            # TODO: introduce locking for safe deletion: We could create a new version at the API layer before
1597
            #  the old version gets cleaned up in the internal lambda service.
1598
            function = store.functions.pop(function_name)
1✔
1599
            for version in function.versions.values():
1✔
1600
                # Functions with a capacity provider do NOT have a version manager for $LATEST because only
1601
                # published versions are invokable.
1602
                if version.config.CapacityProviderConfig:
1✔
1603
                    function_has_capacity_provider = True
×
1604
                    if version.id.qualifier == "$LATEST":
×
1605
                        pass
×
1606
                else:
1607
                    self.lambda_service.stop_version(qualified_arn=version.id.qualified_arn())
1✔
1608
                # we can safely destroy the code here
1609
                if version.config.code:
1✔
1610
                    version.config.code.destroy()
1✔
1611

1612
        return DeleteFunctionResponse(StatusCode=202 if function_has_capacity_provider else 204)
1✔
1613

1614
    def list_functions(
1✔
1615
        self,
1616
        context: RequestContext,
1617
        master_region: MasterRegion = None,  # (only relevant for lambda@edge)
1618
        function_version: FunctionVersionApi = None,
1619
        marker: String = None,
1620
        max_items: MaxListItems = None,
1621
        **kwargs,
1622
    ) -> ListFunctionsResponse:
1623
        state = lambda_stores[context.account_id][context.region]
1✔
1624

1625
        if function_version and function_version != FunctionVersionApi.ALL:
1✔
1626
            raise ValidationException(
1✔
1627
                f"1 validation error detected: Value '{function_version}'"
1628
                + " at 'functionVersion' failed to satisfy constraint: Member must satisfy enum value set: [ALL]"
1629
            )
1630

1631
        if function_version == FunctionVersionApi.ALL:
1✔
1632
            # include all versions for all function
1633
            versions = [v for f in state.functions.values() for v in f.versions.values()]
1✔
1634
            return_qualified_arn = True
1✔
1635
        else:
1636
            versions = [f.latest() for f in state.functions.values()]
1✔
1637
            return_qualified_arn = False
1✔
1638

1639
        versions = [
1✔
1640
            api_utils.map_to_list_response(
1641
                api_utils.map_config_out(fc, return_qualified_arn=return_qualified_arn)
1642
            )
1643
            for fc in versions
1644
        ]
1645
        versions = PaginatedList(versions)
1✔
1646
        page, token = versions.get_page(
1✔
1647
            lambda version: version["FunctionArn"],
1648
            marker,
1649
            max_items,
1650
        )
1651
        return ListFunctionsResponse(Functions=page, NextMarker=token)
1✔
1652

1653
    def get_function(
1✔
1654
        self,
1655
        context: RequestContext,
1656
        function_name: NamespacedFunctionName,
1657
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
1658
        **kwargs,
1659
    ) -> GetFunctionResponse:
1660
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1661
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
1662
            function_name, qualifier, context
1663
        )
1664

1665
        fn = lambda_stores[account_id][region].functions.get(function_name)
1✔
1666
        if fn is None:
1✔
1667
            if qualifier is None:
1✔
1668
                raise ResourceNotFoundException(
1✔
1669
                    f"Function not found: {api_utils.unqualified_lambda_arn(function_name, account_id, region)}",
1670
                    Type="User",
1671
                )
1672
            else:
1673
                raise ResourceNotFoundException(
1✔
1674
                    f"Function not found: {api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)}",
1675
                    Type="User",
1676
                )
1677
        alias_name = None
1✔
1678
        if qualifier and api_utils.qualifier_is_alias(qualifier):
1✔
1679
            if qualifier not in fn.aliases:
1✔
1680
                alias_arn = api_utils.qualified_lambda_arn(
1✔
1681
                    function_name, qualifier, account_id, region
1682
                )
1683
                raise ResourceNotFoundException(f"Function not found: {alias_arn}", Type="User")
1✔
1684
            alias_name = qualifier
1✔
1685
            qualifier = fn.aliases[alias_name].function_version
1✔
1686

1687
        version = get_function_version(
1✔
1688
            function_name=function_name,
1689
            qualifier=qualifier,
1690
            account_id=account_id,
1691
            region=region,
1692
        )
1693
        tags = self._get_tags(api_utils.unqualified_lambda_arn(function_name, account_id, region))
1✔
1694
        additional_fields = {}
1✔
1695
        if tags:
1✔
1696
            additional_fields["Tags"] = tags
1✔
1697
        code_location = None
1✔
1698
        if code := version.config.code:
1✔
1699
            code_location = FunctionCodeLocation(
1✔
1700
                Location=code.generate_presigned_url(endpoint_url=config.external_service_url()),
1701
                RepositoryType="S3",
1702
            )
1703
        elif image := version.config.image:
1✔
1704
            code_location = FunctionCodeLocation(
1✔
1705
                ImageUri=image.image_uri,
1706
                RepositoryType=image.repository_type,
1707
                ResolvedImageUri=image.resolved_image_uri,
1708
            )
1709
        concurrency = None
1✔
1710
        if fn.reserved_concurrent_executions:
1✔
1711
            concurrency = Concurrency(
1✔
1712
                ReservedConcurrentExecutions=fn.reserved_concurrent_executions
1713
            )
1714

1715
        return GetFunctionResponse(
1✔
1716
            Configuration=api_utils.map_config_out(
1717
                version, return_qualified_arn=bool(qualifier), alias_name=alias_name
1718
            ),
1719
            Code=code_location,  # TODO
1720
            Concurrency=concurrency,
1721
            **additional_fields,
1722
        )
1723

1724
    def get_function_configuration(
1✔
1725
        self,
1726
        context: RequestContext,
1727
        function_name: NamespacedFunctionName,
1728
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
1729
        **kwargs,
1730
    ) -> FunctionConfiguration:
1731
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1732
        # CAVE: THIS RETURN VALUE IS *NOT* THE SAME AS IN get_function (!) but seems to be only configuration part?
1733
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
1734
            function_name, qualifier, context
1735
        )
1736
        version = get_function_version(
1✔
1737
            function_name=function_name,
1738
            qualifier=qualifier,
1739
            account_id=account_id,
1740
            region=region,
1741
        )
1742
        return api_utils.map_config_out(version, return_qualified_arn=bool(qualifier))
1✔
1743

1744
    def invoke(
1✔
1745
        self,
1746
        context: RequestContext,
1747
        function_name: NamespacedFunctionName,
1748
        invocation_type: InvocationType | None = None,
1749
        log_type: LogType | None = None,
1750
        client_context: String | None = None,
1751
        durable_execution_name: DurableExecutionName | None = None,
1752
        payload: IO[Blob] | None = None,
1753
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
1754
        tenant_id: TenantId | None = None,
1755
        **kwargs,
1756
    ) -> InvocationResponse:
1757
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1758
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
1759
            function_name, qualifier, context
1760
        )
1761

1762
        user_agent = context.request.user_agent.string
1✔
1763

1764
        time_before = time.perf_counter()
1✔
1765
        try:
1✔
1766
            invocation_result = self.lambda_service.invoke(
1✔
1767
                function_name=function_name,
1768
                qualifier=qualifier,
1769
                region=region,
1770
                account_id=account_id,
1771
                invocation_type=invocation_type,
1772
                client_context=client_context,
1773
                request_id=context.request_id,
1774
                trace_context=context.trace_context,
1775
                payload=payload.read() if payload else None,
1776
                user_agent=user_agent,
1777
            )
1778
        except ServiceException:
1✔
1779
            raise
1✔
1780
        except EnvironmentStartupTimeoutException as e:
1✔
1781
            raise LambdaServiceException(
1✔
1782
                f"[{context.request_id}] Timeout while starting up lambda environment for function {function_name}:{qualifier}"
1783
            ) from e
1784
        except Exception as e:
1✔
1785
            LOG.error(
1✔
1786
                "[%s] Error while invoking lambda %s",
1787
                context.request_id,
1788
                function_name,
1789
                exc_info=LOG.isEnabledFor(logging.DEBUG),
1790
            )
1791
            raise LambdaServiceException(
1✔
1792
                f"[{context.request_id}] Internal error while executing lambda {function_name}:{qualifier}. Caused by {type(e).__name__}: {e}"
1793
            ) from e
1794

1795
        if invocation_type == InvocationType.Event:
1✔
1796
            # This happens when invocation type is event
1797
            return InvocationResponse(StatusCode=202)
1✔
1798
        if invocation_type == InvocationType.DryRun:
1✔
1799
            # This happens when invocation type is dryrun
1800
            return InvocationResponse(StatusCode=204)
1✔
1801
        LOG.debug("Lambda invocation duration: %0.2fms", (time.perf_counter() - time_before) * 1000)
1✔
1802

1803
        response = InvocationResponse(
1✔
1804
            StatusCode=200,
1805
            Payload=invocation_result.payload,
1806
            ExecutedVersion=invocation_result.executed_version,
1807
        )
1808

1809
        if invocation_result.is_error:
1✔
1810
            response["FunctionError"] = "Unhandled"
1✔
1811

1812
        if log_type == LogType.Tail:
1✔
1813
            response["LogResult"] = to_str(
1✔
1814
                base64.b64encode(to_bytes(invocation_result.logs)[-4096:])
1815
            )
1816

1817
        return response
1✔
1818

1819
    # Version operations
1820
    def publish_version(
1✔
1821
        self,
1822
        context: RequestContext,
1823
        function_name: FunctionName,
1824
        code_sha256: String | None = None,
1825
        description: Description | None = None,
1826
        revision_id: String | None = None,
1827
        publish_to: FunctionVersionLatestPublished | None = None,
1828
        **kwargs,
1829
    ) -> FunctionConfiguration:
1830
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1831
        function_name = api_utils.get_function_name(function_name, context)
1✔
1832
        new_version = self._publish_version_from_existing_version(
1✔
1833
            function_name=function_name,
1834
            description=description,
1835
            account_id=account_id,
1836
            region=region,
1837
            revision_id=revision_id,
1838
            code_sha256=code_sha256,
1839
            publish_to=publish_to,
1840
        )
1841
        return api_utils.map_config_out(new_version, return_qualified_arn=True)
1✔
1842

1843
    def list_versions_by_function(
1✔
1844
        self,
1845
        context: RequestContext,
1846
        function_name: NamespacedFunctionName,
1847
        marker: String = None,
1848
        max_items: MaxListItems = None,
1849
        **kwargs,
1850
    ) -> ListVersionsByFunctionResponse:
1851
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1852
        function_name = api_utils.get_function_name(function_name, context)
1✔
1853
        function = self._get_function(
1✔
1854
            function_name=function_name, region=region, account_id=account_id
1855
        )
1856
        versions = [
1✔
1857
            api_utils.map_to_list_response(
1858
                api_utils.map_config_out(version=version, return_qualified_arn=True)
1859
            )
1860
            for version in function.versions.values()
1861
        ]
1862
        items = PaginatedList(versions)
1✔
1863
        page, token = items.get_page(
1✔
1864
            lambda item: item,
1865
            marker,
1866
            max_items,
1867
        )
1868
        return ListVersionsByFunctionResponse(Versions=page, NextMarker=token)
1✔
1869

1870
    # Alias
1871

1872
    def _create_routing_config_model(
1✔
1873
        self, routing_config_dict: dict[str, float], function_version: FunctionVersion
1874
    ):
1875
        if len(routing_config_dict) > 1:
1✔
1876
            raise InvalidParameterValueException(
1✔
1877
                "Number of items in AdditionalVersionWeights cannot be greater than 1",
1878
                Type="User",
1879
            )
1880
        # should be exactly one item here, still iterating, might be supported in the future
1881
        for key, value in routing_config_dict.items():
1✔
1882
            if value < 0.0 or value >= 1.0:
1✔
1883
                raise ValidationException(
1✔
1884
                    f"1 validation error detected: Value '{{{key}={value}}}' at 'routingConfig.additionalVersionWeights' failed to satisfy constraint: Map value must satisfy constraint: [Member must have value less than or equal to 1.0, Member must have value greater than or equal to 0.0, Member must not be null]"
1885
                )
1886
            if key == function_version.id.qualifier:
1✔
1887
                raise InvalidParameterValueException(
1✔
1888
                    f"Invalid function version {function_version.id.qualifier}. Function version {function_version.id.qualifier} is already included in routing configuration.",
1889
                    Type="User",
1890
                )
1891
            # check if version target is latest, then no routing config is allowed
1892
            if function_version.id.qualifier == "$LATEST":
1✔
1893
                raise InvalidParameterValueException(
1✔
1894
                    "$LATEST is not supported for an alias pointing to more than 1 version"
1895
                )
1896
            if not api_utils.qualifier_is_version(key):
1✔
1897
                raise ValidationException(
1✔
1898
                    f"1 validation error detected: Value '{{{key}={value}}}' at 'routingConfig.additionalVersionWeights' failed to satisfy constraint: Map keys must satisfy constraint: [Member must have length less than or equal to 1024, Member must have length greater than or equal to 1, Member must satisfy regular expression pattern: [0-9]+]"
1899
                )
1900

1901
            # checking if the version in the config exists
1902
            get_function_version(
1✔
1903
                function_name=function_version.id.function_name,
1904
                qualifier=key,
1905
                region=function_version.id.region,
1906
                account_id=function_version.id.account,
1907
            )
1908
        return AliasRoutingConfig(version_weights=routing_config_dict)
1✔
1909

1910
    def create_alias(
1✔
1911
        self,
1912
        context: RequestContext,
1913
        function_name: FunctionName,
1914
        name: Alias,
1915
        function_version: VersionWithLatestPublished,
1916
        description: Description = None,
1917
        routing_config: AliasRoutingConfiguration = None,
1918
        **kwargs,
1919
    ) -> AliasConfiguration:
1920
        if not api_utils.qualifier_is_alias(name):
1✔
1921
            raise ValidationException(
1✔
1922
                f"1 validation error detected: Value '{name}' at 'name' failed to satisfy constraint: Member must satisfy regular expression pattern: (?!^[0-9]+$)([a-zA-Z0-9-_]+)"
1923
            )
1924

1925
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1926
        function_name = api_utils.get_function_name(function_name, context)
1✔
1927
        target_version = get_function_version(
1✔
1928
            function_name=function_name,
1929
            qualifier=function_version,
1930
            region=region,
1931
            account_id=account_id,
1932
        )
1933
        function = self._get_function(
1✔
1934
            function_name=function_name, region=region, account_id=account_id
1935
        )
1936
        # description is always present, if not specified it's an empty string
1937
        description = description or ""
1✔
1938
        with function.lock:
1✔
1939
            if existing_alias := function.aliases.get(name):
1✔
1940
                raise ResourceConflictException(
1✔
1941
                    f"Alias already exists: {api_utils.map_alias_out(alias=existing_alias, function=function)['AliasArn']}",
1942
                    Type="User",
1943
                )
1944
            # checking if the version exists
1945
            routing_configuration = None
1✔
1946
            if routing_config and (
1✔
1947
                routing_config_dict := routing_config.get("AdditionalVersionWeights")
1948
            ):
1949
                routing_configuration = self._create_routing_config_model(
1✔
1950
                    routing_config_dict, target_version
1951
                )
1952

1953
            alias = VersionAlias(
1✔
1954
                name=name,
1955
                function_version=function_version,
1956
                description=description,
1957
                routing_configuration=routing_configuration,
1958
            )
1959
            function.aliases[name] = alias
1✔
1960
        return api_utils.map_alias_out(alias=alias, function=function)
1✔
1961

1962
    def list_aliases(
1✔
1963
        self,
1964
        context: RequestContext,
1965
        function_name: FunctionName,
1966
        function_version: VersionWithLatestPublished = None,
1967
        marker: String = None,
1968
        max_items: MaxListItems = None,
1969
        **kwargs,
1970
    ) -> ListAliasesResponse:
1971
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1972
        function_name = api_utils.get_function_name(function_name, context)
1✔
1973
        function = self._get_function(
1✔
1974
            function_name=function_name, region=region, account_id=account_id
1975
        )
1976
        aliases = [
1✔
1977
            api_utils.map_alias_out(alias, function)
1978
            for alias in function.aliases.values()
1979
            if function_version is None or alias.function_version == function_version
1980
        ]
1981

1982
        aliases = PaginatedList(aliases)
1✔
1983
        page, token = aliases.get_page(
1✔
1984
            lambda alias: alias["AliasArn"],
1985
            marker,
1986
            max_items,
1987
        )
1988

1989
        return ListAliasesResponse(Aliases=page, NextMarker=token)
1✔
1990

1991
    def delete_alias(
1✔
1992
        self, context: RequestContext, function_name: FunctionName, name: Alias, **kwargs
1993
    ) -> None:
1994
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1995
        function_name = api_utils.get_function_name(function_name, context)
1✔
1996
        function = self._get_function(
1✔
1997
            function_name=function_name, region=region, account_id=account_id
1998
        )
1999
        version_alias = function.aliases.pop(name, None)
1✔
2000

2001
        # cleanup related resources
2002
        if name in function.provisioned_concurrency_configs:
1✔
2003
            function.provisioned_concurrency_configs.pop(name)
1✔
2004

2005
        # TODO: Allow for deactivating/unregistering specific Lambda URLs
2006
        if version_alias and name in function.function_url_configs:
1✔
2007
            url_config = function.function_url_configs.pop(name)
1✔
2008
            LOG.debug(
1✔
2009
                "Stopping aliased Lambda Function URL %s for %s",
2010
                url_config.url,
2011
                url_config.function_name,
2012
            )
2013

2014
    def get_alias(
1✔
2015
        self, context: RequestContext, function_name: FunctionName, name: Alias, **kwargs
2016
    ) -> AliasConfiguration:
2017
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2018
        function_name = api_utils.get_function_name(function_name, context)
1✔
2019
        function = self._get_function(
1✔
2020
            function_name=function_name, region=region, account_id=account_id
2021
        )
2022
        if not (alias := function.aliases.get(name)):
1✔
2023
            raise ResourceNotFoundException(
1✔
2024
                f"Cannot find alias arn: {api_utils.qualified_lambda_arn(function_name=function_name, qualifier=name, region=region, account=account_id)}",
2025
                Type="User",
2026
            )
2027
        return api_utils.map_alias_out(alias=alias, function=function)
1✔
2028

2029
    def update_alias(
1✔
2030
        self,
2031
        context: RequestContext,
2032
        function_name: FunctionName,
2033
        name: Alias,
2034
        function_version: VersionWithLatestPublished = None,
2035
        description: Description = None,
2036
        routing_config: AliasRoutingConfiguration = None,
2037
        revision_id: String = None,
2038
        **kwargs,
2039
    ) -> AliasConfiguration:
2040
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2041
        function_name = api_utils.get_function_name(function_name, context)
1✔
2042
        function = self._get_function(
1✔
2043
            function_name=function_name, region=region, account_id=account_id
2044
        )
2045
        if not (alias := function.aliases.get(name)):
1✔
2046
            fn_arn = api_utils.qualified_lambda_arn(function_name, name, account_id, region)
1✔
2047
            raise ResourceNotFoundException(
1✔
2048
                f"Alias not found: {fn_arn}",
2049
                Type="User",
2050
            )
2051
        if revision_id and alias.revision_id != revision_id:
1✔
2052
            raise PreconditionFailedException(
1✔
2053
                "The Revision Id provided does not match the latest Revision Id. "
2054
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
2055
                Type="User",
2056
            )
2057
        changes = {}
1✔
2058
        if function_version is not None:
1✔
2059
            changes |= {"function_version": function_version}
1✔
2060
        if description is not None:
1✔
2061
            changes |= {"description": description}
1✔
2062
        if routing_config is not None:
1✔
2063
            # if it is an empty dict or AdditionalVersionWeights is empty, set routing config to None
2064
            new_routing_config = None
1✔
2065
            if routing_config_dict := routing_config.get("AdditionalVersionWeights"):
1✔
2066
                new_routing_config = self._create_routing_config_model(routing_config_dict)
×
2067
            changes |= {"routing_configuration": new_routing_config}
1✔
2068
        # even if no changes are done, we have to update revision id for some reason
2069
        old_alias = alias
1✔
2070
        alias = dataclasses.replace(alias, **changes)
1✔
2071
        function.aliases[name] = alias
1✔
2072

2073
        # TODO: signal lambda service that pointer potentially changed
2074
        self.lambda_service.update_alias(old_alias=old_alias, new_alias=alias, function=function)
1✔
2075

2076
        return api_utils.map_alias_out(alias=alias, function=function)
1✔
2077

2078
    # =======================================
2079
    # ======= EVENT SOURCE MAPPINGS =========
2080
    # =======================================
2081
    def check_service_resource_exists(
1✔
2082
        self, service: str, resource_arn: str, function_arn: str, function_role_arn: str
2083
    ):
2084
        """
2085
        Check if the service resource exists and if the function has access to it.
2086

2087
        Raises:
2088
            InvalidParameterValueException: If the service resource does not exist or the function does not have access to it.
2089
        """
2090
        arn = parse_arn(resource_arn)
1✔
2091
        source_client = get_internal_client(
1✔
2092
            arn=resource_arn,
2093
            role_arn=function_role_arn,
2094
            service_principal=ServicePrincipal.lambda_,
2095
            source_arn=function_arn,
2096
        )
2097
        if service in ["sqs", "sqs-fifo"]:
1✔
2098
            try:
1✔
2099
                # AWS uses `GetQueueAttributes` internally to verify the queue existence, but we need the `QueueUrl`
2100
                # which is not given directly. We build out a dummy `QueueUrl` which can be parsed by SQS to return
2101
                # the right value
2102
                queue_name = arn["resource"].split("/")[-1]
1✔
2103
                queue_url = f"http://sqs.{arn['region']}.domain/{arn['account']}/{queue_name}"
1✔
2104
                source_client.get_queue_attributes(QueueUrl=queue_url)
1✔
2105
            except ClientError as e:
1✔
2106
                error_code = e.response["Error"]["Code"]
1✔
2107
                if error_code == "AWS.SimpleQueueService.NonExistentQueue":
1✔
2108
                    raise InvalidParameterValueException(
1✔
2109
                        f"Error occurred while ReceiveMessage. SQS Error Code: {error_code}. SQS Error Message: {e.response['Error']['Message']}",
2110
                        Type="User",
2111
                    )
2112
                raise e
×
2113
        elif service in ["kinesis"]:
1✔
2114
            try:
1✔
2115
                source_client.describe_stream(StreamARN=resource_arn)
1✔
2116
            except ClientError as e:
1✔
2117
                if e.response["Error"]["Code"] == "ResourceNotFoundException":
1✔
2118
                    raise InvalidParameterValueException(
1✔
2119
                        f"Stream not found: {resource_arn}",
2120
                        Type="User",
2121
                    )
2122
                raise e
×
2123
        elif service in ["dynamodb"]:
1✔
2124
            try:
1✔
2125
                source_client.describe_stream(StreamArn=resource_arn)
1✔
2126
            except ClientError as e:
1✔
2127
                if e.response["Error"]["Code"] == "ResourceNotFoundException":
1✔
2128
                    raise InvalidParameterValueException(
1✔
2129
                        f"Stream not found: {resource_arn}",
2130
                        Type="User",
2131
                    )
2132
                raise e
×
2133

2134
    @handler("CreateEventSourceMapping", expand=False)
1✔
2135
    def create_event_source_mapping(
1✔
2136
        self,
2137
        context: RequestContext,
2138
        request: CreateEventSourceMappingRequest,
2139
    ) -> EventSourceMappingConfiguration:
2140
        return self.create_event_source_mapping_v2(context, request)
1✔
2141

2142
    def create_event_source_mapping_v2(
1✔
2143
        self,
2144
        context: RequestContext,
2145
        request: CreateEventSourceMappingRequest,
2146
    ) -> EventSourceMappingConfiguration:
2147
        # Validations
2148
        function_arn, function_name, state, function_version, function_role = (
1✔
2149
            self.validate_event_source_mapping(context, request)
2150
        )
2151

2152
        esm_config = EsmConfigFactory(request, context, function_arn).get_esm_config()
1✔
2153

2154
        # Copy esm_config to avoid a race condition with potential async update in the store
2155
        state.event_source_mappings[esm_config["UUID"]] = esm_config.copy()
1✔
2156
        enabled = request.get("Enabled", True)
1✔
2157
        # TODO: check for potential async race condition update -> think about locking
2158
        esm_worker = EsmWorkerFactory(esm_config, function_role, enabled).get_esm_worker()
1✔
2159
        self.esm_workers[esm_worker.uuid] = esm_worker
1✔
2160
        # TODO: check StateTransitionReason, LastModified, LastProcessingResult (concurrent updates requires locking!)
2161
        if tags := request.get("Tags"):
1✔
2162
            self._store_tags(esm_config.get("EventSourceMappingArn"), tags)
1✔
2163
        esm_worker.create()
1✔
2164
        return esm_config
1✔
2165

2166
    def validate_event_source_mapping(self, context, request):
1✔
2167
        # TODO: test whether stream ARNs are valid sources for Pipes or ESM or whether only DynamoDB table ARNs work
2168
        # TODO: Validate MaxRecordAgeInSeconds (i.e cannot subceed 60s but can be -1) and MaxRetryAttempts parameters.
2169
        # See https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-maximumrecordageinseconds
2170
        is_create_esm_request = context.operation.name == self.create_event_source_mapping.operation
1✔
2171

2172
        if destination_config := request.get("DestinationConfig"):
1✔
2173
            if "OnSuccess" in destination_config:
1✔
2174
                raise InvalidParameterValueException(
1✔
2175
                    "Unsupported DestinationConfig parameter for given event source mapping type.",
2176
                    Type="User",
2177
                )
2178

2179
        service = None
1✔
2180
        if "SelfManagedEventSource" in request:
1✔
2181
            service = "kafka"
×
2182
            if "SourceAccessConfigurations" not in request:
×
2183
                raise InvalidParameterValueException(
×
2184
                    "Required 'sourceAccessConfigurations' parameter is missing.", Type="User"
2185
                )
2186
        if service is None and "EventSourceArn" not in request:
1✔
2187
            raise InvalidParameterValueException("Unrecognized event source.", Type="User")
1✔
2188
        if service is None:
1✔
2189
            service = extract_service_from_arn(request["EventSourceArn"])
1✔
2190

2191
        batch_size = api_utils.validate_and_set_batch_size(service, request.get("BatchSize"))
1✔
2192
        if service in ["dynamodb", "kinesis"]:
1✔
2193
            starting_position = request.get("StartingPosition")
1✔
2194
            if not starting_position:
1✔
2195
                raise InvalidParameterValueException(
1✔
2196
                    "1 validation error detected: Value null at 'startingPosition' failed to satisfy constraint: Member must not be null.",
2197
                    Type="User",
2198
                )
2199

2200
            if starting_position not in KinesisStreamStartPosition.__members__:
1✔
2201
                raise ValidationException(
1✔
2202
                    f"1 validation error detected: Value '{starting_position}' at 'startingPosition' failed to satisfy constraint: Member must satisfy enum value set: [LATEST, AT_TIMESTAMP, TRIM_HORIZON]"
2203
                )
2204
            # AT_TIMESTAMP is not allowed for DynamoDB Streams
2205
            elif (
1✔
2206
                service == "dynamodb"
2207
                and starting_position not in DynamoDBStreamStartPosition.__members__
2208
            ):
2209
                raise InvalidParameterValueException(
1✔
2210
                    f"Unsupported starting position for arn type: {request['EventSourceArn']}",
2211
                    Type="User",
2212
                )
2213

2214
        if service in ["sqs", "sqs-fifo"]:
1✔
2215
            if batch_size > 10 and request.get("MaximumBatchingWindowInSeconds", 0) == 0:
1✔
2216
                raise InvalidParameterValueException(
1✔
2217
                    "Maximum batch window in seconds must be greater than 0 if maximum batch size is greater than 10",
2218
                    Type="User",
2219
                )
2220

2221
        if (filter_criteria := request.get("FilterCriteria")) is not None:
1✔
2222
            for filter_ in filter_criteria.get("Filters", []):
1✔
2223
                pattern_str = filter_.get("Pattern")
1✔
2224
                if not pattern_str or not isinstance(pattern_str, str):
1✔
2225
                    raise InvalidParameterValueException(
×
2226
                        "Invalid filter pattern definition.", Type="User"
2227
                    )
2228

2229
                if not validate_event_pattern(pattern_str):
1✔
2230
                    raise InvalidParameterValueException(
1✔
2231
                        "Invalid filter pattern definition.", Type="User"
2232
                    )
2233

2234
        # Can either have a FunctionName (i.e CreateEventSourceMapping request) or
2235
        # an internal EventSourceMappingConfiguration representation
2236
        request_function_name = request.get("FunctionName") or request.get("FunctionArn")
1✔
2237
        # can be either a partial arn or a full arn for the version/alias
2238
        function_name, qualifier, account, region = function_locators_from_arn(
1✔
2239
            request_function_name
2240
        )
2241
        # TODO: validate `context.region` vs. `region(request["FunctionName"])` vs. `region(request["EventSourceArn"])`
2242
        account = account or context.account_id
1✔
2243
        region = region or context.region
1✔
2244
        state = lambda_stores[account][region]
1✔
2245
        fn = state.functions.get(function_name)
1✔
2246
        if not fn:
1✔
2247
            raise InvalidParameterValueException("Function does not exist", Type="User")
1✔
2248

2249
        if qualifier:
1✔
2250
            # make sure the function version/alias exists
2251
            if api_utils.qualifier_is_alias(qualifier):
1✔
2252
                fn_alias = fn.aliases.get(qualifier)
1✔
2253
                if not fn_alias:
1✔
2254
                    raise Exception("unknown alias")  # TODO: cover via test
×
2255
            elif api_utils.qualifier_is_version(qualifier):
1✔
2256
                fn_version = fn.versions.get(qualifier)
1✔
2257
                if not fn_version:
1✔
2258
                    raise Exception("unknown version")  # TODO: cover via test
×
2259
            elif qualifier == "$LATEST":
1✔
2260
                pass
1✔
2261
            elif qualifier == "$LATEST.PUBLISHED":
×
2262
                if fn.versions.get(qualifier):
×
2263
                    pass
×
2264
            else:
2265
                raise Exception("invalid functionname")  # TODO: cover via test
×
2266
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account, region)
1✔
2267

2268
        else:
2269
            fn_arn = api_utils.unqualified_lambda_arn(function_name, account, region)
1✔
2270

2271
        function_version = get_function_version_from_arn(fn_arn)
1✔
2272
        function_role = function_version.config.role
1✔
2273

2274
        if source_arn := request.get("EventSourceArn"):
1✔
2275
            self.check_service_resource_exists(service, source_arn, fn_arn, function_role)
1✔
2276
        # Check we are validating a CreateEventSourceMapping request
2277
        if is_create_esm_request:
1✔
2278

2279
            def _get_mapping_sources(mapping: dict[str, Any]) -> list[str]:
1✔
2280
                if event_source_arn := mapping.get("EventSourceArn"):
1✔
2281
                    return [event_source_arn]
1✔
2282
                return (
×
2283
                    mapping.get("SelfManagedEventSource", {})
2284
                    .get("Endpoints", {})
2285
                    .get("KAFKA_BOOTSTRAP_SERVERS", [])
2286
                )
2287

2288
            # check for event source duplicates
2289
            # TODO: currently validated for sqs, kinesis, and dynamodb
2290
            service_id = load_service(service).service_id
1✔
2291
            for uuid, mapping in state.event_source_mappings.items():
1✔
2292
                mapping_sources = _get_mapping_sources(mapping)
1✔
2293
                request_sources = _get_mapping_sources(request)
1✔
2294
                if mapping["FunctionArn"] == fn_arn and (
1✔
2295
                    set(mapping_sources).intersection(request_sources)
2296
                ):
2297
                    if service == "sqs":
1✔
2298
                        # *shakes fist at SQS*
2299
                        raise ResourceConflictException(
1✔
2300
                            f'An event source mapping with {service_id} arn (" {mapping["EventSourceArn"]} ") '
2301
                            f'and function (" {function_name} ") already exists. Please update or delete the '
2302
                            f"existing mapping with UUID {uuid}",
2303
                            Type="User",
2304
                        )
2305
                    elif service == "kafka":
1✔
2306
                        if set(mapping["Topics"]).intersection(request["Topics"]):
×
2307
                            raise ResourceConflictException(
×
2308
                                f'An event source mapping with event source ("{",".join(request_sources)}"), '
2309
                                f'function ("{fn_arn}"), '
2310
                                f'topics ("{",".join(request["Topics"])}") already exists. Please update or delete the '
2311
                                f"existing mapping with UUID {uuid}",
2312
                                Type="User",
2313
                            )
2314
                    else:
2315
                        raise ResourceConflictException(
1✔
2316
                            f'The event source arn (" {mapping["EventSourceArn"]} ") and function '
2317
                            f'(" {function_name} ") provided mapping already exists. Please update or delete the '
2318
                            f"existing mapping with UUID {uuid}",
2319
                            Type="User",
2320
                        )
2321
        return fn_arn, function_name, state, function_version, function_role
1✔
2322

2323
    @handler("UpdateEventSourceMapping", expand=False)
1✔
2324
    def update_event_source_mapping(
1✔
2325
        self,
2326
        context: RequestContext,
2327
        request: UpdateEventSourceMappingRequest,
2328
    ) -> EventSourceMappingConfiguration:
2329
        return self.update_event_source_mapping_v2(context, request)
1✔
2330

2331
    def update_event_source_mapping_v2(
1✔
2332
        self,
2333
        context: RequestContext,
2334
        request: UpdateEventSourceMappingRequest,
2335
    ) -> EventSourceMappingConfiguration:
2336
        # TODO: test and implement this properly (quite complex with many validations and limitations!)
2337
        LOG.warning(
1✔
2338
            "Updating Lambda Event Source Mapping is in experimental state and not yet fully tested."
2339
        )
2340
        state = lambda_stores[context.account_id][context.region]
1✔
2341
        request_data = {**request}
1✔
2342
        uuid = request_data.pop("UUID", None)
1✔
2343
        if not uuid:
1✔
2344
            raise ResourceNotFoundException(
×
2345
                "The resource you requested does not exist.", Type="User"
2346
            )
2347
        old_event_source_mapping = state.event_source_mappings.get(uuid)
1✔
2348
        esm_worker = self.esm_workers.get(uuid)
1✔
2349
        if old_event_source_mapping is None or esm_worker is None:
1✔
2350
            raise ResourceNotFoundException(
1✔
2351
                "The resource you requested does not exist.", Type="User"
2352
            )  # TODO: test?
2353

2354
        # normalize values to overwrite
2355
        event_source_mapping = old_event_source_mapping | request_data
1✔
2356

2357
        temp_params = {}  # values only set for the returned response, not saved internally (e.g. transient state)
1✔
2358

2359
        # Validate the newly updated ESM object. We ignore the output here since we only care whether an Exception is raised.
2360
        function_arn, _, _, function_version, function_role = self.validate_event_source_mapping(
1✔
2361
            context, event_source_mapping
2362
        )
2363

2364
        # remove the FunctionName field
2365
        event_source_mapping.pop("FunctionName", None)
1✔
2366

2367
        if function_arn:
1✔
2368
            event_source_mapping["FunctionArn"] = function_arn
1✔
2369

2370
        # Only apply update if the desired state differs
2371
        enabled = request.get("Enabled")
1✔
2372
        if enabled is not None:
1✔
2373
            if enabled and old_event_source_mapping["State"] != EsmState.ENABLED:
1✔
2374
                event_source_mapping["State"] = EsmState.ENABLING
1✔
2375
            # TODO: What happens when trying to update during an update or failed state?!
2376
            elif not enabled and old_event_source_mapping["State"] == EsmState.ENABLED:
1✔
2377
                event_source_mapping["State"] = EsmState.DISABLING
1✔
2378
        else:
2379
            event_source_mapping["State"] = EsmState.UPDATING
1✔
2380

2381
        # To ensure parity, certain responses need to be immediately returned
2382
        temp_params["State"] = event_source_mapping["State"]
1✔
2383

2384
        state.event_source_mappings[uuid] = event_source_mapping
1✔
2385

2386
        # TODO: Currently, we re-create the entire ESM worker. Look into approach with better performance.
2387
        worker_factory = EsmWorkerFactory(
1✔
2388
            event_source_mapping, function_role, request.get("Enabled", esm_worker.enabled)
2389
        )
2390

2391
        # Get a new ESM worker object but do not active it, since the factory holds all logic for creating new worker from configuration.
2392
        updated_esm_worker = worker_factory.get_esm_worker()
1✔
2393
        self.esm_workers[uuid] = updated_esm_worker
1✔
2394

2395
        # We should stop() the worker since the delete() will remove the ESM from the state mapping.
2396
        esm_worker.stop()
1✔
2397
        # This will either create an EsmWorker in the CREATING state if enabled. Otherwise, the DISABLING state is set.
2398
        updated_esm_worker.create()
1✔
2399

2400
        return {**event_source_mapping, **temp_params}
1✔
2401

2402
    def delete_event_source_mapping(
1✔
2403
        self, context: RequestContext, uuid: String, **kwargs
2404
    ) -> EventSourceMappingConfiguration:
2405
        state = lambda_stores[context.account_id][context.region]
1✔
2406
        event_source_mapping = state.event_source_mappings.get(uuid)
1✔
2407
        if not event_source_mapping:
1✔
2408
            raise ResourceNotFoundException(
1✔
2409
                "The resource you requested does not exist.", Type="User"
2410
            )
2411
        esm = state.event_source_mappings[uuid]
1✔
2412
        # TODO: add proper locking
2413
        esm_worker = self.esm_workers.pop(uuid, None)
1✔
2414
        # Asynchronous delete in v2
2415
        if not esm_worker:
1✔
2416
            raise ResourceNotFoundException(
×
2417
                "The resource you requested does not exist.", Type="User"
2418
            )
2419
        esm_worker.delete()
1✔
2420
        return {**esm, "State": EsmState.DELETING}
1✔
2421

2422
    def get_event_source_mapping(
1✔
2423
        self, context: RequestContext, uuid: String, **kwargs
2424
    ) -> EventSourceMappingConfiguration:
2425
        state = lambda_stores[context.account_id][context.region]
1✔
2426
        event_source_mapping = state.event_source_mappings.get(uuid)
1✔
2427
        if not event_source_mapping:
1✔
2428
            raise ResourceNotFoundException(
1✔
2429
                "The resource you requested does not exist.", Type="User"
2430
            )
2431
        esm_worker = self.esm_workers.get(uuid)
1✔
2432
        if not esm_worker:
1✔
2433
            raise ResourceNotFoundException(
×
2434
                "The resource you requested does not exist.", Type="User"
2435
            )
2436
        event_source_mapping["State"] = esm_worker.current_state
1✔
2437
        event_source_mapping["StateTransitionReason"] = esm_worker.state_transition_reason
1✔
2438
        return event_source_mapping
1✔
2439

2440
    def list_event_source_mappings(
1✔
2441
        self,
2442
        context: RequestContext,
2443
        event_source_arn: Arn = None,
2444
        function_name: FunctionName = None,
2445
        marker: String = None,
2446
        max_items: MaxListItems = None,
2447
        **kwargs,
2448
    ) -> ListEventSourceMappingsResponse:
2449
        state = lambda_stores[context.account_id][context.region]
1✔
2450

2451
        esms = state.event_source_mappings.values()
1✔
2452
        # TODO: update and test State and StateTransitionReason for ESM v2
2453

2454
        if event_source_arn:  # TODO: validate pattern
1✔
2455
            esms = [e for e in esms if e.get("EventSourceArn") == event_source_arn]
1✔
2456

2457
        if function_name:
1✔
2458
            esms = [e for e in esms if function_name in e["FunctionArn"]]
1✔
2459

2460
        esms = PaginatedList(esms)
1✔
2461
        page, token = esms.get_page(
1✔
2462
            lambda x: x["UUID"],
2463
            marker,
2464
            max_items,
2465
        )
2466
        return ListEventSourceMappingsResponse(EventSourceMappings=page, NextMarker=token)
1✔
2467

2468
    def get_source_type_from_request(self, request: dict[str, Any]) -> str:
1✔
2469
        if event_source_arn := request.get("EventSourceArn", ""):
×
2470
            service = extract_service_from_arn(event_source_arn)
×
2471
            if service == "sqs" and "fifo" in event_source_arn:
×
2472
                service = "sqs-fifo"
×
2473
            return service
×
2474
        elif request.get("SelfManagedEventSource"):
×
2475
            return "kafka"
×
2476

2477
    # =======================================
2478
    # ============ FUNCTION URLS ============
2479
    # =======================================
2480

2481
    @staticmethod
1✔
2482
    def _validate_qualifier(qualifier: str) -> None:
1✔
2483
        if qualifier == "$LATEST" or (qualifier and api_utils.qualifier_is_version(qualifier)):
1✔
2484
            raise ValidationException(
1✔
2485
                f"1 validation error detected: Value '{qualifier}' at 'qualifier' failed to satisfy constraint: Member must satisfy regular expression pattern: ((?!^\\d+$)^[0-9a-zA-Z-_]+$)"
2486
            )
2487

2488
    @staticmethod
1✔
2489
    def _validate_invoke_mode(invoke_mode: str) -> None:
1✔
2490
        if invoke_mode and invoke_mode not in [InvokeMode.BUFFERED, InvokeMode.RESPONSE_STREAM]:
1✔
2491
            raise ValidationException(
1✔
2492
                f"1 validation error detected: Value '{invoke_mode}' at 'invokeMode' failed to satisfy constraint: Member must satisfy enum value set: [RESPONSE_STREAM, BUFFERED]"
2493
            )
2494
        if invoke_mode == InvokeMode.RESPONSE_STREAM:
1✔
2495
            # TODO should we actually fail for setting RESPONSE_STREAM?
2496
            #  It should trigger InvokeWithResponseStream which is not implemented
2497
            LOG.warning(
1✔
2498
                "The invokeMode 'RESPONSE_STREAM' is not yet supported on LocalStack. The property is only mocked, the execution will still be 'BUFFERED'"
2499
            )
2500

2501
    # TODO: what happens if function state is not active?
2502
    def create_function_url_config(
1✔
2503
        self,
2504
        context: RequestContext,
2505
        function_name: FunctionName,
2506
        auth_type: FunctionUrlAuthType,
2507
        qualifier: FunctionUrlQualifier = None,
2508
        cors: Cors = None,
2509
        invoke_mode: InvokeMode = None,
2510
        **kwargs,
2511
    ) -> CreateFunctionUrlConfigResponse:
2512
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2513
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
2514
            function_name, qualifier, context
2515
        )
2516
        state = lambda_stores[account_id][region]
1✔
2517
        self._validate_qualifier(qualifier)
1✔
2518
        self._validate_invoke_mode(invoke_mode)
1✔
2519

2520
        fn = state.functions.get(function_name)
1✔
2521
        if fn is None:
1✔
2522
            raise ResourceNotFoundException("Function does not exist", Type="User")
1✔
2523

2524
        url_config = fn.function_url_configs.get(qualifier or "$LATEST")
1✔
2525
        if url_config:
1✔
2526
            raise ResourceConflictException(
1✔
2527
                f"Failed to create function url config for [functionArn = {url_config.function_arn}]. Error message:  FunctionUrlConfig exists for this Lambda function",
2528
                Type="User",
2529
            )
2530

2531
        if qualifier and qualifier != "$LATEST" and qualifier not in fn.aliases:
1✔
2532
            raise ResourceNotFoundException("Function does not exist", Type="User")
1✔
2533

2534
        normalized_qualifier = qualifier or "$LATEST"
1✔
2535

2536
        function_arn = (
1✔
2537
            api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
2538
            if qualifier
2539
            else api_utils.unqualified_lambda_arn(function_name, account_id, region)
2540
        )
2541

2542
        custom_id: str | None = None
1✔
2543

2544
        tags = self._get_tags(api_utils.unqualified_lambda_arn(function_name, account_id, region))
1✔
2545
        if TAG_KEY_CUSTOM_URL in tags:
1✔
2546
            # Note: I really wanted to add verification here that the
2547
            # url_id is unique, so we could surface that to the user ASAP.
2548
            # However, it seems like that information isn't available yet,
2549
            # since (as far as I can tell) we call
2550
            # self.router.register_routes() once, in a single shot, for all
2551
            # of the routes -- and we need to verify that it's unique not
2552
            # just for this particular lambda function, but for the entire
2553
            # lambda provider. Therefore... that idea proved non-trivial!
2554
            custom_id_tag_value = (
1✔
2555
                f"{tags[TAG_KEY_CUSTOM_URL]}-{qualifier}" if qualifier else tags[TAG_KEY_CUSTOM_URL]
2556
            )
2557
            if TAG_KEY_CUSTOM_URL_VALIDATOR.match(custom_id_tag_value):
1✔
2558
                custom_id = custom_id_tag_value
1✔
2559

2560
            else:
2561
                # Note: we're logging here instead of raising to prioritize
2562
                # strict parity with AWS over the localstack-only custom_id
2563
                LOG.warning(
1✔
2564
                    "Invalid custom ID tag value for lambda URL (%s=%s). "
2565
                    "Replaced with default (random id)",
2566
                    TAG_KEY_CUSTOM_URL,
2567
                    custom_id_tag_value,
2568
                )
2569

2570
        # The url_id is the subdomain used for the URL we're creating. This
2571
        # is either created randomly (as in AWS), or can be passed as a tag
2572
        # to the lambda itself (localstack-only).
2573
        url_id: str
2574
        if custom_id is None:
1✔
2575
            url_id = api_utils.generate_random_url_id()
1✔
2576
        else:
2577
            url_id = custom_id
1✔
2578

2579
        host_definition = localstack_host(custom_port=config.GATEWAY_LISTEN[0].port)
1✔
2580
        fn.function_url_configs[normalized_qualifier] = FunctionUrlConfig(
1✔
2581
            function_arn=function_arn,
2582
            function_name=function_name,
2583
            cors=cors,
2584
            url_id=url_id,
2585
            url=f"http://{url_id}.lambda-url.{context.region}.{host_definition.host_and_port()}/",  # TODO: https support
2586
            auth_type=auth_type,
2587
            creation_time=api_utils.generate_lambda_date(),
2588
            last_modified_time=api_utils.generate_lambda_date(),
2589
            invoke_mode=invoke_mode,
2590
        )
2591

2592
        # persist and start URL
2593
        # TODO: implement URL invoke
2594
        api_url_config = api_utils.map_function_url_config(
1✔
2595
            fn.function_url_configs[normalized_qualifier]
2596
        )
2597

2598
        return CreateFunctionUrlConfigResponse(
1✔
2599
            FunctionUrl=api_url_config["FunctionUrl"],
2600
            FunctionArn=api_url_config["FunctionArn"],
2601
            AuthType=api_url_config["AuthType"],
2602
            Cors=api_url_config["Cors"],
2603
            CreationTime=api_url_config["CreationTime"],
2604
            InvokeMode=api_url_config["InvokeMode"],
2605
        )
2606

2607
    def get_function_url_config(
1✔
2608
        self,
2609
        context: RequestContext,
2610
        function_name: FunctionName,
2611
        qualifier: FunctionUrlQualifier = None,
2612
        **kwargs,
2613
    ) -> GetFunctionUrlConfigResponse:
2614
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2615
        state = lambda_stores[account_id][region]
1✔
2616

2617
        fn_name, qualifier = api_utils.get_name_and_qualifier(function_name, qualifier, context)
1✔
2618

2619
        self._validate_qualifier(qualifier)
1✔
2620

2621
        resolved_fn = state.functions.get(fn_name)
1✔
2622
        if not resolved_fn:
1✔
2623
            raise ResourceNotFoundException(
1✔
2624
                "The resource you requested does not exist.", Type="User"
2625
            )
2626

2627
        qualifier = qualifier or "$LATEST"
1✔
2628
        url_config = resolved_fn.function_url_configs.get(qualifier)
1✔
2629
        if not url_config:
1✔
2630
            raise ResourceNotFoundException(
1✔
2631
                "The resource you requested does not exist.", Type="User"
2632
            )
2633

2634
        return api_utils.map_function_url_config(url_config)
1✔
2635

2636
    def update_function_url_config(
1✔
2637
        self,
2638
        context: RequestContext,
2639
        function_name: FunctionName,
2640
        qualifier: FunctionUrlQualifier = None,
2641
        auth_type: FunctionUrlAuthType = None,
2642
        cors: Cors = None,
2643
        invoke_mode: InvokeMode = None,
2644
        **kwargs,
2645
    ) -> UpdateFunctionUrlConfigResponse:
2646
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2647
        state = lambda_stores[account_id][region]
1✔
2648

2649
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
2650
            function_name, qualifier, context
2651
        )
2652
        self._validate_qualifier(qualifier)
1✔
2653
        self._validate_invoke_mode(invoke_mode)
1✔
2654

2655
        fn = state.functions.get(function_name)
1✔
2656
        if not fn:
1✔
2657
            raise ResourceNotFoundException("Function does not exist", Type="User")
1✔
2658

2659
        normalized_qualifier = qualifier or "$LATEST"
1✔
2660

2661
        if (
1✔
2662
            api_utils.qualifier_is_alias(normalized_qualifier)
2663
            and normalized_qualifier not in fn.aliases
2664
        ):
2665
            raise ResourceNotFoundException("Function does not exist", Type="User")
1✔
2666

2667
        url_config = fn.function_url_configs.get(normalized_qualifier)
1✔
2668
        if not url_config:
1✔
2669
            raise ResourceNotFoundException(
1✔
2670
                "The resource you requested does not exist.", Type="User"
2671
            )
2672

2673
        changes = {
1✔
2674
            "last_modified_time": api_utils.generate_lambda_date(),
2675
            **({"cors": cors} if cors is not None else {}),
2676
            **({"auth_type": auth_type} if auth_type is not None else {}),
2677
        }
2678

2679
        if invoke_mode:
1✔
2680
            changes["invoke_mode"] = invoke_mode
1✔
2681

2682
        new_url_config = dataclasses.replace(url_config, **changes)
1✔
2683
        fn.function_url_configs[normalized_qualifier] = new_url_config
1✔
2684

2685
        return UpdateFunctionUrlConfigResponse(
1✔
2686
            FunctionUrl=new_url_config.url,
2687
            FunctionArn=new_url_config.function_arn,
2688
            AuthType=new_url_config.auth_type,
2689
            Cors=new_url_config.cors,
2690
            CreationTime=new_url_config.creation_time,
2691
            LastModifiedTime=new_url_config.last_modified_time,
2692
            InvokeMode=new_url_config.invoke_mode,
2693
        )
2694

2695
    def delete_function_url_config(
1✔
2696
        self,
2697
        context: RequestContext,
2698
        function_name: FunctionName,
2699
        qualifier: FunctionUrlQualifier = None,
2700
        **kwargs,
2701
    ) -> None:
2702
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2703
        state = lambda_stores[account_id][region]
1✔
2704

2705
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
2706
            function_name, qualifier, context
2707
        )
2708
        self._validate_qualifier(qualifier)
1✔
2709

2710
        resolved_fn = state.functions.get(function_name)
1✔
2711
        if not resolved_fn:
1✔
2712
            raise ResourceNotFoundException(
1✔
2713
                "The resource you requested does not exist.", Type="User"
2714
            )
2715

2716
        qualifier = qualifier or "$LATEST"
1✔
2717
        url_config = resolved_fn.function_url_configs.get(qualifier)
1✔
2718
        if not url_config:
1✔
2719
            raise ResourceNotFoundException(
1✔
2720
                "The resource you requested does not exist.", Type="User"
2721
            )
2722

2723
        del resolved_fn.function_url_configs[qualifier]
1✔
2724

2725
    def list_function_url_configs(
1✔
2726
        self,
2727
        context: RequestContext,
2728
        function_name: FunctionName,
2729
        marker: String = None,
2730
        max_items: MaxItems = None,
2731
        **kwargs,
2732
    ) -> ListFunctionUrlConfigsResponse:
2733
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2734
        state = lambda_stores[account_id][region]
1✔
2735

2736
        fn_name = api_utils.get_function_name(function_name, context)
1✔
2737
        resolved_fn = state.functions.get(fn_name)
1✔
2738
        if not resolved_fn:
1✔
2739
            raise ResourceNotFoundException("Function does not exist", Type="User")
1✔
2740

2741
        url_configs = [
1✔
2742
            api_utils.map_function_url_config(fn_conf)
2743
            for fn_conf in resolved_fn.function_url_configs.values()
2744
        ]
2745
        url_configs = PaginatedList(url_configs)
1✔
2746
        page, token = url_configs.get_page(
1✔
2747
            lambda url_config: url_config["FunctionArn"],
2748
            marker,
2749
            max_items,
2750
        )
2751
        url_configs = page
1✔
2752
        return ListFunctionUrlConfigsResponse(FunctionUrlConfigs=url_configs, NextMarker=token)
1✔
2753

2754
    # =======================================
2755
    # ============  Permissions  ============
2756
    # =======================================
2757

2758
    @handler("AddPermission", expand=False)
1✔
2759
    def add_permission(
1✔
2760
        self,
2761
        context: RequestContext,
2762
        request: AddPermissionRequest,
2763
    ) -> AddPermissionResponse:
2764
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
2765
            request.get("FunctionName"), request.get("Qualifier"), context
2766
        )
2767

2768
        # validate qualifier
2769
        if qualifier is not None:
1✔
2770
            self._validate_qualifier_expression(qualifier)
1✔
2771
            if qualifier == "$LATEST":
1✔
2772
                raise InvalidParameterValueException(
1✔
2773
                    "We currently do not support adding policies for $LATEST.", Type="User"
2774
                )
2775
        account_id, region = api_utils.get_account_and_region(request.get("FunctionName"), context)
1✔
2776

2777
        resolved_fn = self._get_function(function_name, account_id, region)
1✔
2778
        resolved_qualifier, fn_arn = self._resolve_fn_qualifier(resolved_fn, qualifier)
1✔
2779

2780
        revision_id = request.get("RevisionId")
1✔
2781
        if revision_id:
1✔
2782
            fn_revision_id = self._function_revision_id(resolved_fn, resolved_qualifier)
1✔
2783
            if revision_id != fn_revision_id:
1✔
2784
                raise PreconditionFailedException(
1✔
2785
                    "The Revision Id provided does not match the latest Revision Id. "
2786
                    "Call the GetPolicy API to retrieve the latest Revision Id",
2787
                    Type="User",
2788
                )
2789

2790
        request_sid = request["StatementId"]
1✔
2791
        if not bool(STATEMENT_ID_REGEX.match(request_sid)):
1✔
2792
            raise ValidationException(
1✔
2793
                f"1 validation error detected: Value '{request_sid}' at 'statementId' failed to satisfy constraint: Member must satisfy regular expression pattern: ([a-zA-Z0-9-_]+)"
2794
            )
2795
        # check for an already existing policy and any conflicts in existing statements
2796
        existing_policy = resolved_fn.permissions.get(resolved_qualifier)
1✔
2797
        if existing_policy:
1✔
2798
            if request_sid in [s["Sid"] for s in existing_policy.policy.Statement]:
1✔
2799
                # uniqueness scope: statement id needs to be unique per qualified function ($LATEST, version, or alias)
2800
                # Counterexample: the same sid can exist within $LATEST, version, and alias
2801
                raise ResourceConflictException(
1✔
2802
                    f"The statement id ({request_sid}) provided already exists. Please provide a new statement id, or remove the existing statement.",
2803
                    Type="User",
2804
                )
2805

2806
        permission_statement = api_utils.build_statement(
1✔
2807
            partition=context.partition,
2808
            resource_arn=fn_arn,
2809
            statement_id=request["StatementId"],
2810
            action=request["Action"],
2811
            principal=request["Principal"],
2812
            source_arn=request.get("SourceArn"),
2813
            source_account=request.get("SourceAccount"),
2814
            principal_org_id=request.get("PrincipalOrgID"),
2815
            event_source_token=request.get("EventSourceToken"),
2816
            auth_type=request.get("FunctionUrlAuthType"),
2817
        )
2818
        new_policy = existing_policy
1✔
2819
        if not existing_policy:
1✔
2820
            new_policy = FunctionResourcePolicy(
1✔
2821
                policy=ResourcePolicy(Version="2012-10-17", Id="default", Statement=[])
2822
            )
2823
        new_policy.policy.Statement.append(permission_statement)
1✔
2824
        if not existing_policy:
1✔
2825
            resolved_fn.permissions[resolved_qualifier] = new_policy
1✔
2826

2827
        # Update revision id of alias or version
2828
        # TODO: re-evaluate data model to prevent this dirty hack just for bumping the revision id
2829
        # TODO: does that need a `with function.lock` for atomic updates of the policy + revision_id?
2830
        if api_utils.qualifier_is_alias(resolved_qualifier):
1✔
2831
            resolved_alias = resolved_fn.aliases[resolved_qualifier]
1✔
2832
            resolved_fn.aliases[resolved_qualifier] = dataclasses.replace(resolved_alias)
1✔
2833
        # Assumes that a non-alias is a version
2834
        else:
2835
            resolved_version = resolved_fn.versions[resolved_qualifier]
1✔
2836
            resolved_fn.versions[resolved_qualifier] = dataclasses.replace(
1✔
2837
                resolved_version, config=dataclasses.replace(resolved_version.config)
2838
            )
2839
        return AddPermissionResponse(Statement=json.dumps(permission_statement))
1✔
2840

2841
    def remove_permission(
1✔
2842
        self,
2843
        context: RequestContext,
2844
        function_name: NamespacedFunctionName,
2845
        statement_id: NamespacedStatementId,
2846
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
2847
        revision_id: String | None = None,
2848
        **kwargs,
2849
    ) -> None:
2850
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2851
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
2852
            function_name, qualifier, context
2853
        )
2854
        if qualifier is not None:
1✔
2855
            self._validate_qualifier_expression(qualifier)
1✔
2856

2857
        state = lambda_stores[account_id][region]
1✔
2858
        resolved_fn = state.functions.get(function_name)
1✔
2859
        if resolved_fn is None:
1✔
2860
            fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
1✔
2861
            raise ResourceNotFoundException(f"No policy found for: {fn_arn}", Type="User")
1✔
2862

2863
        resolved_qualifier, _ = self._resolve_fn_qualifier(resolved_fn, qualifier)
1✔
2864
        function_permission = resolved_fn.permissions.get(resolved_qualifier)
1✔
2865
        if not function_permission:
1✔
2866
            raise ResourceNotFoundException(
1✔
2867
                "No policy is associated with the given resource.", Type="User"
2868
            )
2869

2870
        # try to find statement in policy and delete it
2871
        statement = None
1✔
2872
        for s in function_permission.policy.Statement:
1✔
2873
            if s["Sid"] == statement_id:
1✔
2874
                statement = s
1✔
2875
                break
1✔
2876

2877
        if not statement:
1✔
2878
            raise ResourceNotFoundException(
1✔
2879
                f"Statement {statement_id} is not found in resource policy.", Type="User"
2880
            )
2881
        fn_revision_id = self._function_revision_id(resolved_fn, resolved_qualifier)
1✔
2882
        if revision_id and revision_id != fn_revision_id:
1✔
2883
            raise PreconditionFailedException(
×
2884
                "The Revision Id provided does not match the latest Revision Id. "
2885
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
2886
                Type="User",
2887
            )
2888
        function_permission.policy.Statement.remove(statement)
1✔
2889

2890
        # Update revision id for alias or version
2891
        # TODO: re-evaluate data model to prevent this dirty hack just for bumping the revision id
2892
        # TODO: does that need a `with function.lock` for atomic updates of the policy + revision_id?
2893
        if api_utils.qualifier_is_alias(resolved_qualifier):
1✔
2894
            resolved_alias = resolved_fn.aliases[resolved_qualifier]
×
2895
            resolved_fn.aliases[resolved_qualifier] = dataclasses.replace(resolved_alias)
×
2896
        # Assumes that a non-alias is a version
2897
        else:
2898
            resolved_version = resolved_fn.versions[resolved_qualifier]
1✔
2899
            resolved_fn.versions[resolved_qualifier] = dataclasses.replace(
1✔
2900
                resolved_version, config=dataclasses.replace(resolved_version.config)
2901
            )
2902

2903
        # remove the policy as a whole when there's no statement left in it
2904
        if len(function_permission.policy.Statement) == 0:
1✔
2905
            del resolved_fn.permissions[resolved_qualifier]
1✔
2906

2907
    def get_policy(
1✔
2908
        self,
2909
        context: RequestContext,
2910
        function_name: NamespacedFunctionName,
2911
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
2912
        **kwargs,
2913
    ) -> GetPolicyResponse:
2914
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2915
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
2916
            function_name, qualifier, context
2917
        )
2918

2919
        if qualifier is not None:
1✔
2920
            self._validate_qualifier_expression(qualifier)
1✔
2921

2922
        resolved_fn = self._get_function(function_name, account_id, region)
1✔
2923

2924
        resolved_qualifier = qualifier or "$LATEST"
1✔
2925
        function_permission = resolved_fn.permissions.get(resolved_qualifier)
1✔
2926
        if not function_permission:
1✔
2927
            raise ResourceNotFoundException(
1✔
2928
                "The resource you requested does not exist.", Type="User"
2929
            )
2930

2931
        fn_revision_id = None
1✔
2932
        if api_utils.qualifier_is_alias(resolved_qualifier):
1✔
2933
            resolved_alias = resolved_fn.aliases[resolved_qualifier]
1✔
2934
            fn_revision_id = resolved_alias.revision_id
1✔
2935
        # Assumes that a non-alias is a version
2936
        else:
2937
            resolved_version = resolved_fn.versions[resolved_qualifier]
1✔
2938
            fn_revision_id = resolved_version.config.revision_id
1✔
2939

2940
        return GetPolicyResponse(
1✔
2941
            Policy=json.dumps(dataclasses.asdict(function_permission.policy)),
2942
            RevisionId=fn_revision_id,
2943
        )
2944

2945
    # =======================================
2946
    # ========  Code signing config  ========
2947
    # =======================================
2948

2949
    def create_code_signing_config(
1✔
2950
        self,
2951
        context: RequestContext,
2952
        allowed_publishers: AllowedPublishers,
2953
        description: Description | None = None,
2954
        code_signing_policies: CodeSigningPolicies | None = None,
2955
        tags: Tags | None = None,
2956
        **kwargs,
2957
    ) -> CreateCodeSigningConfigResponse:
2958
        account = context.account_id
1✔
2959
        region = context.region
1✔
2960

2961
        state = lambda_stores[account][region]
1✔
2962
        # TODO: can there be duplicates?
2963
        csc_id = f"csc-{get_random_hex(17)}"  # e.g. 'csc-077c33b4c19e26036'
1✔
2964
        csc_arn = f"arn:{context.partition}:lambda:{region}:{account}:code-signing-config:{csc_id}"
1✔
2965
        csc = CodeSigningConfig(
1✔
2966
            csc_id=csc_id,
2967
            arn=csc_arn,
2968
            allowed_publishers=allowed_publishers,
2969
            policies=code_signing_policies,
2970
            last_modified=api_utils.generate_lambda_date(),
2971
            description=description,
2972
        )
2973
        state.code_signing_configs[csc_arn] = csc
1✔
2974
        return CreateCodeSigningConfigResponse(CodeSigningConfig=api_utils.map_csc(csc))
1✔
2975

2976
    def put_function_code_signing_config(
1✔
2977
        self,
2978
        context: RequestContext,
2979
        code_signing_config_arn: CodeSigningConfigArn,
2980
        function_name: NamespacedFunctionName,
2981
        **kwargs,
2982
    ) -> PutFunctionCodeSigningConfigResponse:
2983
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2984
        state = lambda_stores[account_id][region]
1✔
2985
        function_name = api_utils.get_function_name(function_name, context)
1✔
2986

2987
        csc = state.code_signing_configs.get(code_signing_config_arn)
1✔
2988
        if not csc:
1✔
2989
            raise CodeSigningConfigNotFoundException(
1✔
2990
                f"The code signing configuration cannot be found. Check that the provided configuration is not deleted: {code_signing_config_arn}.",
2991
                Type="User",
2992
            )
2993

2994
        fn = state.functions.get(function_name)
1✔
2995
        fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
1✔
2996
        if not fn:
1✔
2997
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")
1✔
2998

2999
        fn.code_signing_config_arn = code_signing_config_arn
1✔
3000
        return PutFunctionCodeSigningConfigResponse(
1✔
3001
            CodeSigningConfigArn=code_signing_config_arn, FunctionName=function_name
3002
        )
3003

3004
    def update_code_signing_config(
1✔
3005
        self,
3006
        context: RequestContext,
3007
        code_signing_config_arn: CodeSigningConfigArn,
3008
        description: Description = None,
3009
        allowed_publishers: AllowedPublishers = None,
3010
        code_signing_policies: CodeSigningPolicies = None,
3011
        **kwargs,
3012
    ) -> UpdateCodeSigningConfigResponse:
3013
        state = lambda_stores[context.account_id][context.region]
1✔
3014
        csc = state.code_signing_configs.get(code_signing_config_arn)
1✔
3015
        if not csc:
1✔
3016
            raise ResourceNotFoundException(
1✔
3017
                f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
3018
            )
3019

3020
        changes = {
1✔
3021
            **(
3022
                {"allowed_publishers": allowed_publishers} if allowed_publishers is not None else {}
3023
            ),
3024
            **({"policies": code_signing_policies} if code_signing_policies is not None else {}),
3025
            **({"description": description} if description is not None else {}),
3026
        }
3027
        new_csc = dataclasses.replace(
1✔
3028
            csc, last_modified=api_utils.generate_lambda_date(), **changes
3029
        )
3030
        state.code_signing_configs[code_signing_config_arn] = new_csc
1✔
3031

3032
        return UpdateCodeSigningConfigResponse(CodeSigningConfig=api_utils.map_csc(new_csc))
1✔
3033

3034
    def get_code_signing_config(
1✔
3035
        self, context: RequestContext, code_signing_config_arn: CodeSigningConfigArn, **kwargs
3036
    ) -> GetCodeSigningConfigResponse:
3037
        state = lambda_stores[context.account_id][context.region]
1✔
3038
        csc = state.code_signing_configs.get(code_signing_config_arn)
1✔
3039
        if not csc:
1✔
3040
            raise ResourceNotFoundException(
1✔
3041
                f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
3042
            )
3043

3044
        return GetCodeSigningConfigResponse(CodeSigningConfig=api_utils.map_csc(csc))
1✔
3045

3046
    def get_function_code_signing_config(
1✔
3047
        self, context: RequestContext, function_name: NamespacedFunctionName, **kwargs
3048
    ) -> GetFunctionCodeSigningConfigResponse:
3049
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3050
        state = lambda_stores[account_id][region]
1✔
3051
        function_name = api_utils.get_function_name(function_name, context)
1✔
3052
        fn = state.functions.get(function_name)
1✔
3053
        fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
1✔
3054
        if not fn:
1✔
3055
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")
1✔
3056

3057
        if fn.code_signing_config_arn:
1✔
3058
            return GetFunctionCodeSigningConfigResponse(
1✔
3059
                CodeSigningConfigArn=fn.code_signing_config_arn, FunctionName=function_name
3060
            )
3061

3062
        return GetFunctionCodeSigningConfigResponse()
1✔
3063

3064
    def delete_function_code_signing_config(
1✔
3065
        self, context: RequestContext, function_name: NamespacedFunctionName, **kwargs
3066
    ) -> None:
3067
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3068
        state = lambda_stores[account_id][region]
1✔
3069
        function_name = api_utils.get_function_name(function_name, context)
1✔
3070
        fn = state.functions.get(function_name)
1✔
3071
        fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
1✔
3072
        if not fn:
1✔
3073
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")
1✔
3074

3075
        fn.code_signing_config_arn = None
1✔
3076

3077
    def delete_code_signing_config(
1✔
3078
        self, context: RequestContext, code_signing_config_arn: CodeSigningConfigArn, **kwargs
3079
    ) -> DeleteCodeSigningConfigResponse:
3080
        state = lambda_stores[context.account_id][context.region]
1✔
3081

3082
        csc = state.code_signing_configs.get(code_signing_config_arn)
1✔
3083
        if not csc:
1✔
3084
            raise ResourceNotFoundException(
1✔
3085
                f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
3086
            )
3087

3088
        del state.code_signing_configs[code_signing_config_arn]
1✔
3089

3090
        return DeleteCodeSigningConfigResponse()
1✔
3091

3092
    def list_code_signing_configs(
1✔
3093
        self,
3094
        context: RequestContext,
3095
        marker: String = None,
3096
        max_items: MaxListItems = None,
3097
        **kwargs,
3098
    ) -> ListCodeSigningConfigsResponse:
3099
        state = lambda_stores[context.account_id][context.region]
1✔
3100

3101
        cscs = [api_utils.map_csc(csc) for csc in state.code_signing_configs.values()]
1✔
3102
        cscs = PaginatedList(cscs)
1✔
3103
        page, token = cscs.get_page(
1✔
3104
            lambda csc: csc["CodeSigningConfigId"],
3105
            marker,
3106
            max_items,
3107
        )
3108
        return ListCodeSigningConfigsResponse(CodeSigningConfigs=page, NextMarker=token)
1✔
3109

3110
    def list_functions_by_code_signing_config(
1✔
3111
        self,
3112
        context: RequestContext,
3113
        code_signing_config_arn: CodeSigningConfigArn,
3114
        marker: String = None,
3115
        max_items: MaxListItems = None,
3116
        **kwargs,
3117
    ) -> ListFunctionsByCodeSigningConfigResponse:
3118
        account = context.account_id
1✔
3119
        region = context.region
1✔
3120

3121
        state = lambda_stores[account][region]
1✔
3122

3123
        if code_signing_config_arn not in state.code_signing_configs:
1✔
3124
            raise ResourceNotFoundException(
1✔
3125
                f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
3126
            )
3127

3128
        fn_arns = [
1✔
3129
            api_utils.unqualified_lambda_arn(fn.function_name, account, region)
3130
            for fn in state.functions.values()
3131
            if fn.code_signing_config_arn == code_signing_config_arn
3132
        ]
3133

3134
        cscs = PaginatedList(fn_arns)
1✔
3135
        page, token = cscs.get_page(
1✔
3136
            lambda x: x,
3137
            marker,
3138
            max_items,
3139
        )
3140
        return ListFunctionsByCodeSigningConfigResponse(FunctionArns=page, NextMarker=token)
1✔
3141

3142
    # =======================================
3143
    # =========  Account Settings   =========
3144
    # =======================================
3145

3146
    # CAVE: these settings & usages are *per* region!
3147
    # Lambda quotas: https://docs.aws.amazon.com/lambda/latest/dg/gettingstarted-limits.html
3148
    def get_account_settings(self, context: RequestContext, **kwargs) -> GetAccountSettingsResponse:
1✔
3149
        state = lambda_stores[context.account_id][context.region]
1✔
3150

3151
        fn_count = 0
1✔
3152
        code_size_sum = 0
1✔
3153
        reserved_concurrency_sum = 0
1✔
3154
        for fn in state.functions.values():
1✔
3155
            fn_count += 1
1✔
3156
            for fn_version in fn.versions.values():
1✔
3157
                # Image-based Lambdas do not have a code attribute and count against the ECR quotas instead
3158
                if fn_version.config.package_type == PackageType.Zip:
1✔
3159
                    code_size_sum += fn_version.config.code.code_size
1✔
3160
            if fn.reserved_concurrent_executions is not None:
1✔
3161
                reserved_concurrency_sum += fn.reserved_concurrent_executions
1✔
3162
            for c in fn.provisioned_concurrency_configs.values():
1✔
3163
                reserved_concurrency_sum += c.provisioned_concurrent_executions
1✔
3164
        for layer in state.layers.values():
1✔
3165
            for layer_version in layer.layer_versions.values():
1✔
3166
                code_size_sum += layer_version.code.code_size
1✔
3167
        return GetAccountSettingsResponse(
1✔
3168
            AccountLimit=AccountLimit(
3169
                TotalCodeSize=config.LAMBDA_LIMITS_TOTAL_CODE_SIZE,
3170
                CodeSizeZipped=config.LAMBDA_LIMITS_CODE_SIZE_ZIPPED,
3171
                CodeSizeUnzipped=config.LAMBDA_LIMITS_CODE_SIZE_UNZIPPED,
3172
                ConcurrentExecutions=config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS,
3173
                UnreservedConcurrentExecutions=config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS
3174
                - reserved_concurrency_sum,
3175
            ),
3176
            AccountUsage=AccountUsage(
3177
                TotalCodeSize=code_size_sum,
3178
                FunctionCount=fn_count,
3179
            ),
3180
        )
3181

3182
    # =======================================
3183
    # ==  Provisioned Concurrency Config   ==
3184
    # =======================================
3185

3186
    def _get_provisioned_config(
1✔
3187
        self, context: RequestContext, function_name: str, qualifier: str
3188
    ) -> ProvisionedConcurrencyConfiguration | None:
3189
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3190
        state = lambda_stores[account_id][region]
1✔
3191
        function_name = api_utils.get_function_name(function_name, context)
1✔
3192
        fn = state.functions.get(function_name)
1✔
3193
        if api_utils.qualifier_is_alias(qualifier):
1✔
3194
            fn_alias = None
1✔
3195
            if fn:
1✔
3196
                fn_alias = fn.aliases.get(qualifier)
1✔
3197
            if fn_alias is None:
1✔
3198
                raise ResourceNotFoundException(
1✔
3199
                    f"Cannot find alias arn: {api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)}",
3200
                    Type="User",
3201
                )
3202
        elif api_utils.qualifier_is_version(qualifier):
1✔
3203
            fn_version = None
1✔
3204
            if fn:
1✔
3205
                fn_version = fn.versions.get(qualifier)
1✔
3206
            if fn_version is None:
1✔
3207
                raise ResourceNotFoundException(
1✔
3208
                    f"Function not found: {api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)}",
3209
                    Type="User",
3210
                )
3211

3212
        return fn.provisioned_concurrency_configs.get(qualifier)
1✔
3213

3214
    def put_provisioned_concurrency_config(
1✔
3215
        self,
3216
        context: RequestContext,
3217
        function_name: FunctionName,
3218
        qualifier: Qualifier,
3219
        provisioned_concurrent_executions: PositiveInteger,
3220
        **kwargs,
3221
    ) -> PutProvisionedConcurrencyConfigResponse:
3222
        if provisioned_concurrent_executions <= 0:
1✔
3223
            raise ValidationException(
1✔
3224
                f"1 validation error detected: Value '{provisioned_concurrent_executions}' at 'provisionedConcurrentExecutions' failed to satisfy constraint: Member must have value greater than or equal to 1"
3225
            )
3226

3227
        if qualifier == "$LATEST":
1✔
3228
            raise InvalidParameterValueException(
1✔
3229
                "Provisioned Concurrency Configs cannot be applied to unpublished function versions.",
3230
                Type="User",
3231
            )
3232
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3233
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
3234
            function_name, qualifier, context
3235
        )
3236
        state = lambda_stores[account_id][region]
1✔
3237
        fn = state.functions.get(function_name)
1✔
3238

3239
        provisioned_config = self._get_provisioned_config(context, function_name, qualifier)
1✔
3240

3241
        if provisioned_config:  # TODO: merge?
1✔
3242
            # TODO: add a test for partial updates (if possible)
3243
            LOG.warning(
1✔
3244
                "Partial update of provisioned concurrency config is currently not supported."
3245
            )
3246

3247
        other_provisioned_sum = sum(
1✔
3248
            [
3249
                provisioned_configs.provisioned_concurrent_executions
3250
                for provisioned_qualifier, provisioned_configs in fn.provisioned_concurrency_configs.items()
3251
                if provisioned_qualifier != qualifier
3252
            ]
3253
        )
3254

3255
        if (
1✔
3256
            fn.reserved_concurrent_executions is not None
3257
            and fn.reserved_concurrent_executions
3258
            < other_provisioned_sum + provisioned_concurrent_executions
3259
        ):
3260
            raise InvalidParameterValueException(
1✔
3261
                "Requested Provisioned Concurrency should not be greater than the reservedConcurrentExecution for function",
3262
                Type="User",
3263
            )
3264

3265
        if provisioned_concurrent_executions > config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS:
1✔
3266
            raise InvalidParameterValueException(
1✔
3267
                f"Specified ConcurrentExecutions for function is greater than account's unreserved concurrency"
3268
                f" [{config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS}]."
3269
            )
3270

3271
        settings = self.get_account_settings(context)
1✔
3272
        unreserved_concurrent_executions = settings["AccountLimit"][
1✔
3273
            "UnreservedConcurrentExecutions"
3274
        ]
3275
        if (
1✔
3276
            unreserved_concurrent_executions - provisioned_concurrent_executions
3277
            < config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY
3278
        ):
3279
            raise InvalidParameterValueException(
1✔
3280
                f"Specified ConcurrentExecutions for function decreases account's UnreservedConcurrentExecution below"
3281
                f" its minimum value of [{config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY}]."
3282
            )
3283

3284
        provisioned_config = ProvisionedConcurrencyConfiguration(
1✔
3285
            provisioned_concurrent_executions, api_utils.generate_lambda_date()
3286
        )
3287
        fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
1✔
3288

3289
        if api_utils.qualifier_is_alias(qualifier):
1✔
3290
            alias = fn.aliases.get(qualifier)
1✔
3291
            resolved_version = fn.versions.get(alias.function_version)
1✔
3292

3293
            if (
1✔
3294
                resolved_version
3295
                and fn.provisioned_concurrency_configs.get(alias.function_version) is not None
3296
            ):
3297
                raise ResourceConflictException(
1✔
3298
                    "Alias can't be used for Provisioned Concurrency configuration on an already Provisioned version",
3299
                    Type="User",
3300
                )
3301
            fn_arn = resolved_version.id.qualified_arn()
1✔
3302
        elif api_utils.qualifier_is_version(qualifier):
1✔
3303
            fn_version = fn.versions.get(qualifier)
1✔
3304

3305
            # TODO: might be useful other places, utilize
3306
            pointing_aliases = []
1✔
3307
            for alias in fn.aliases.values():
1✔
3308
                if (
1✔
3309
                    alias.function_version == qualifier
3310
                    and fn.provisioned_concurrency_configs.get(alias.name) is not None
3311
                ):
3312
                    pointing_aliases.append(alias.name)
1✔
3313
            if pointing_aliases:
1✔
3314
                raise ResourceConflictException(
1✔
3315
                    "Version is pointed by a Provisioned Concurrency alias", Type="User"
3316
                )
3317

3318
            fn_arn = fn_version.id.qualified_arn()
1✔
3319

3320
        manager = self.lambda_service.get_lambda_version_manager(fn_arn)
1✔
3321

3322
        fn.provisioned_concurrency_configs[qualifier] = provisioned_config
1✔
3323

3324
        manager.update_provisioned_concurrency_config(
1✔
3325
            provisioned_config.provisioned_concurrent_executions
3326
        )
3327

3328
        return PutProvisionedConcurrencyConfigResponse(
1✔
3329
            RequestedProvisionedConcurrentExecutions=provisioned_config.provisioned_concurrent_executions,
3330
            AvailableProvisionedConcurrentExecutions=0,
3331
            AllocatedProvisionedConcurrentExecutions=0,
3332
            Status=ProvisionedConcurrencyStatusEnum.IN_PROGRESS,
3333
            # StatusReason=manager.provisioned_state.status_reason,
3334
            LastModified=provisioned_config.last_modified,  # TODO: does change with configuration or also with state changes?
3335
        )
3336

3337
    def get_provisioned_concurrency_config(
1✔
3338
        self, context: RequestContext, function_name: FunctionName, qualifier: Qualifier, **kwargs
3339
    ) -> GetProvisionedConcurrencyConfigResponse:
3340
        if qualifier == "$LATEST":
1✔
3341
            raise InvalidParameterValueException(
1✔
3342
                "The function resource provided must be an alias or a published version.",
3343
                Type="User",
3344
            )
3345
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3346
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
3347
            function_name, qualifier, context
3348
        )
3349

3350
        provisioned_config = self._get_provisioned_config(context, function_name, qualifier)
1✔
3351
        if not provisioned_config:
1✔
3352
            raise ProvisionedConcurrencyConfigNotFoundException(
1✔
3353
                "No Provisioned Concurrency Config found for this function", Type="User"
3354
            )
3355

3356
        # TODO: make this compatible with alias pointer migration on update
3357
        if api_utils.qualifier_is_alias(qualifier):
1✔
3358
            state = lambda_stores[account_id][region]
1✔
3359
            fn = state.functions.get(function_name)
1✔
3360
            alias = fn.aliases.get(qualifier)
1✔
3361
            fn_arn = api_utils.qualified_lambda_arn(
1✔
3362
                function_name, alias.function_version, account_id, region
3363
            )
3364
        else:
3365
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
1✔
3366

3367
        ver_manager = self.lambda_service.get_lambda_version_manager(fn_arn)
1✔
3368

3369
        return GetProvisionedConcurrencyConfigResponse(
1✔
3370
            RequestedProvisionedConcurrentExecutions=provisioned_config.provisioned_concurrent_executions,
3371
            LastModified=provisioned_config.last_modified,
3372
            AvailableProvisionedConcurrentExecutions=ver_manager.provisioned_state.available,
3373
            AllocatedProvisionedConcurrentExecutions=ver_manager.provisioned_state.allocated,
3374
            Status=ver_manager.provisioned_state.status,
3375
            StatusReason=ver_manager.provisioned_state.status_reason,
3376
        )
3377

3378
    def list_provisioned_concurrency_configs(
1✔
3379
        self,
3380
        context: RequestContext,
3381
        function_name: FunctionName,
3382
        marker: String = None,
3383
        max_items: MaxProvisionedConcurrencyConfigListItems = None,
3384
        **kwargs,
3385
    ) -> ListProvisionedConcurrencyConfigsResponse:
3386
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3387
        state = lambda_stores[account_id][region]
1✔
3388

3389
        function_name = api_utils.get_function_name(function_name, context)
1✔
3390
        fn = state.functions.get(function_name)
1✔
3391
        if fn is None:
1✔
3392
            raise ResourceNotFoundException(
1✔
3393
                f"Function not found: {api_utils.unqualified_lambda_arn(function_name, account_id, region)}",
3394
                Type="User",
3395
            )
3396

3397
        configs = []
1✔
3398
        for qualifier, pc_config in fn.provisioned_concurrency_configs.items():
1✔
3399
            if api_utils.qualifier_is_alias(qualifier):
×
3400
                alias = fn.aliases.get(qualifier)
×
3401
                fn_arn = api_utils.qualified_lambda_arn(
×
3402
                    function_name, alias.function_version, account_id, region
3403
                )
3404
            else:
3405
                fn_arn = api_utils.qualified_lambda_arn(
×
3406
                    function_name, qualifier, account_id, region
3407
                )
3408

3409
            manager = self.lambda_service.get_lambda_version_manager(fn_arn)
×
3410

3411
            configs.append(
×
3412
                ProvisionedConcurrencyConfigListItem(
3413
                    FunctionArn=api_utils.qualified_lambda_arn(
3414
                        function_name, qualifier, account_id, region
3415
                    ),
3416
                    RequestedProvisionedConcurrentExecutions=pc_config.provisioned_concurrent_executions,
3417
                    AvailableProvisionedConcurrentExecutions=manager.provisioned_state.available,
3418
                    AllocatedProvisionedConcurrentExecutions=manager.provisioned_state.allocated,
3419
                    Status=manager.provisioned_state.status,
3420
                    StatusReason=manager.provisioned_state.status_reason,
3421
                    LastModified=pc_config.last_modified,
3422
                )
3423
            )
3424

3425
        provisioned_concurrency_configs = configs
1✔
3426
        provisioned_concurrency_configs = PaginatedList(provisioned_concurrency_configs)
1✔
3427
        page, token = provisioned_concurrency_configs.get_page(
1✔
3428
            lambda x: x,
3429
            marker,
3430
            max_items,
3431
        )
3432
        return ListProvisionedConcurrencyConfigsResponse(
1✔
3433
            ProvisionedConcurrencyConfigs=page, NextMarker=token
3434
        )
3435

3436
    def delete_provisioned_concurrency_config(
1✔
3437
        self, context: RequestContext, function_name: FunctionName, qualifier: Qualifier, **kwargs
3438
    ) -> None:
3439
        if qualifier == "$LATEST":
1✔
3440
            raise InvalidParameterValueException(
1✔
3441
                "The function resource provided must be an alias or a published version.",
3442
                Type="User",
3443
            )
3444
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3445
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
3446
            function_name, qualifier, context
3447
        )
3448
        state = lambda_stores[account_id][region]
1✔
3449
        fn = state.functions.get(function_name)
1✔
3450

3451
        provisioned_config = self._get_provisioned_config(context, function_name, qualifier)
1✔
3452
        # delete is idempotent and doesn't actually care about the provisioned concurrency config not existing
3453
        if provisioned_config:
1✔
3454
            fn.provisioned_concurrency_configs.pop(qualifier)
1✔
3455
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
1✔
3456
            manager = self.lambda_service.get_lambda_version_manager(fn_arn)
1✔
3457
            manager.update_provisioned_concurrency_config(0)
1✔
3458

3459
    # =======================================
3460
    # =======  Event Invoke Config   ========
3461
    # =======================================
3462

3463
    # "1 validation error detected: Value 'arn:aws:_-/!lambda:<region>:111111111111:function:<function-name:1>' at 'destinationConfig.onFailure.destination' failed to satisfy constraint: Member must satisfy regular expression pattern: ^$|arn:(aws[a-zA-Z0-9-]*):([a-zA-Z0-9\\-])+:([a-z]{2}((-gov)|(-iso(b?)))?-[a-z]+-\\d{1})?:(\\d{12})?:(.*)"
3464
    # "1 validation error detected: Value 'arn:aws:_-/!lambda:<region>:111111111111:function:<function-name:1>' at 'destinationConfig.onFailure.destination' failed to satisfy constraint: Member must satisfy regular expression pattern: ^$|arn:(aws[a-zA-Z0-9-]*):([a-zA-Z0-9\\-])+:([a-z]2((-gov)|(-iso(b?)))?-[a-z]+-\\d1)?:(\\d12)?:(.*)" ... (expected → actual)
3465

3466
    def _validate_destination_config(
1✔
3467
        self, store: LambdaStore, function_name: str, destination_config: DestinationConfig
3468
    ):
3469
        def _validate_destination_arn(destination_arn) -> bool:
1✔
3470
            if not api_utils.DESTINATION_ARN_PATTERN.match(destination_arn):
1✔
3471
                # technically we shouldn't handle this in the provider
3472
                raise ValidationException(
1✔
3473
                    "1 validation error detected: Value '"
3474
                    + destination_arn
3475
                    + "' at 'destinationConfig.onFailure.destination' failed to satisfy constraint: Member must satisfy regular expression pattern: "
3476
                    + "$|kafka://([^.]([a-zA-Z0-9\\-_.]{0,248}))|arn:(aws[a-zA-Z0-9-]*):([a-zA-Z0-9\\-])+:((eusc-)?[a-z]{2}((-gov)|(-iso([a-z]?)))?-[a-z]+-\\d{1})?:(\\d{12})?:(.*)"
3477
                )
3478

3479
            match destination_arn.split(":")[2]:
1✔
3480
                case "lambda":
1✔
3481
                    fn_parts = api_utils.FULL_FN_ARN_PATTERN.search(destination_arn).groupdict()
1✔
3482
                    if fn_parts:
1✔
3483
                        # check if it exists
3484
                        fn = store.functions.get(fn_parts["function_name"])
1✔
3485
                        if not fn:
1✔
3486
                            raise InvalidParameterValueException(
1✔
3487
                                f"The destination ARN {destination_arn} is invalid.", Type="User"
3488
                            )
3489
                        if fn_parts["function_name"] == function_name:
1✔
3490
                            raise InvalidParameterValueException(
1✔
3491
                                "You can't specify the function as a destination for itself.",
3492
                                Type="User",
3493
                            )
3494
                case "sns" | "sqs" | "events":
1✔
3495
                    pass
1✔
3496
                case _:
1✔
3497
                    return False
1✔
3498
            return True
1✔
3499

3500
        validation_err = False
1✔
3501

3502
        failure_destination = destination_config.get("OnFailure", {}).get("Destination")
1✔
3503
        if failure_destination:
1✔
3504
            validation_err = validation_err or not _validate_destination_arn(failure_destination)
1✔
3505

3506
        success_destination = destination_config.get("OnSuccess", {}).get("Destination")
1✔
3507
        if success_destination:
1✔
3508
            validation_err = validation_err or not _validate_destination_arn(success_destination)
1✔
3509

3510
        if validation_err:
1✔
3511
            on_success_part = (
1✔
3512
                f"OnSuccess(destination={success_destination})" if success_destination else "null"
3513
            )
3514
            on_failure_part = (
1✔
3515
                f"OnFailure(destination={failure_destination})" if failure_destination else "null"
3516
            )
3517
            raise InvalidParameterValueException(
1✔
3518
                f"The provided destination config DestinationConfig(onSuccess={on_success_part}, onFailure={on_failure_part}) is invalid.",
3519
                Type="User",
3520
            )
3521

3522
    def put_function_event_invoke_config(
1✔
3523
        self,
3524
        context: RequestContext,
3525
        function_name: FunctionName,
3526
        qualifier: NumericLatestPublishedOrAliasQualifier = None,
3527
        maximum_retry_attempts: MaximumRetryAttempts = None,
3528
        maximum_event_age_in_seconds: MaximumEventAgeInSeconds = None,
3529
        destination_config: DestinationConfig = None,
3530
        **kwargs,
3531
    ) -> FunctionEventInvokeConfig:
3532
        """
3533
        Destination ARNs can be:
3534
        * SQS arn
3535
        * SNS arn
3536
        * Lambda arn
3537
        * EventBridge arn
3538

3539
        Differences between put_ and update_:
3540
            * put overwrites any existing config
3541
            * update allows changes only single values while keeping the rest of existing ones
3542
            * update fails on non-existing configs
3543

3544
        Differences between destination and DLQ
3545
            * "However, a dead-letter queue is part of a function's version-specific configuration, so it is locked in when you publish a version."
3546
            *  "On-failure destinations also support additional targets and include details about the function's response in the invocation record."
3547

3548
        """
3549
        if (
1✔
3550
            maximum_event_age_in_seconds is None
3551
            and maximum_retry_attempts is None
3552
            and destination_config is None
3553
        ):
3554
            raise InvalidParameterValueException(
1✔
3555
                "You must specify at least one of error handling or destination setting.",
3556
                Type="User",
3557
            )
3558
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3559
        state = lambda_stores[account_id][region]
1✔
3560
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
3561
            function_name, qualifier, context
3562
        )
3563
        fn = state.functions.get(function_name)
1✔
3564
        if not fn or (qualifier and not (qualifier in fn.aliases or qualifier in fn.versions)):
1✔
3565
            raise ResourceNotFoundException("The function doesn't exist.", Type="User")
1✔
3566

3567
        qualifier = qualifier or "$LATEST"
1✔
3568

3569
        # validate and normalize destination config
3570
        if destination_config:
1✔
3571
            self._validate_destination_config(state, function_name, destination_config)
1✔
3572

3573
        destination_config = DestinationConfig(
1✔
3574
            OnSuccess=OnSuccess(
3575
                Destination=(destination_config or {}).get("OnSuccess", {}).get("Destination")
3576
            ),
3577
            OnFailure=OnFailure(
3578
                Destination=(destination_config or {}).get("OnFailure", {}).get("Destination")
3579
            ),
3580
        )
3581

3582
        config = EventInvokeConfig(
1✔
3583
            function_name=function_name,
3584
            qualifier=qualifier,
3585
            maximum_event_age_in_seconds=maximum_event_age_in_seconds,
3586
            maximum_retry_attempts=maximum_retry_attempts,
3587
            last_modified=api_utils.generate_lambda_date(),
3588
            destination_config=destination_config,
3589
        )
3590
        fn.event_invoke_configs[qualifier] = config
1✔
3591

3592
        return FunctionEventInvokeConfig(
1✔
3593
            LastModified=datetime.datetime.strptime(
3594
                config.last_modified, api_utils.LAMBDA_DATE_FORMAT
3595
            ),
3596
            FunctionArn=api_utils.qualified_lambda_arn(
3597
                function_name, qualifier or "$LATEST", account_id, region
3598
            ),
3599
            DestinationConfig=destination_config,
3600
            MaximumEventAgeInSeconds=maximum_event_age_in_seconds,
3601
            MaximumRetryAttempts=maximum_retry_attempts,
3602
        )
3603

3604
    def get_function_event_invoke_config(
1✔
3605
        self,
3606
        context: RequestContext,
3607
        function_name: FunctionName,
3608
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
3609
        **kwargs,
3610
    ) -> FunctionEventInvokeConfig:
3611
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3612
        state = lambda_stores[account_id][region]
1✔
3613
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
3614
            function_name, qualifier, context
3615
        )
3616

3617
        qualifier = qualifier or "$LATEST"
1✔
3618
        fn = state.functions.get(function_name)
1✔
3619
        if not fn:
1✔
3620
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
1✔
3621
            raise ResourceNotFoundException(
1✔
3622
                f"The function {fn_arn} doesn't have an EventInvokeConfig", Type="User"
3623
            )
3624

3625
        config = fn.event_invoke_configs.get(qualifier)
1✔
3626
        if not config:
1✔
3627
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
1✔
3628
            raise ResourceNotFoundException(
1✔
3629
                f"The function {fn_arn} doesn't have an EventInvokeConfig", Type="User"
3630
            )
3631

3632
        return FunctionEventInvokeConfig(
1✔
3633
            LastModified=datetime.datetime.strptime(
3634
                config.last_modified, api_utils.LAMBDA_DATE_FORMAT
3635
            ),
3636
            FunctionArn=api_utils.qualified_lambda_arn(
3637
                function_name, qualifier, account_id, region
3638
            ),
3639
            DestinationConfig=config.destination_config,
3640
            MaximumEventAgeInSeconds=config.maximum_event_age_in_seconds,
3641
            MaximumRetryAttempts=config.maximum_retry_attempts,
3642
        )
3643

3644
    def list_function_event_invoke_configs(
1✔
3645
        self,
3646
        context: RequestContext,
3647
        function_name: FunctionName,
3648
        marker: String = None,
3649
        max_items: MaxFunctionEventInvokeConfigListItems = None,
3650
        **kwargs,
3651
    ) -> ListFunctionEventInvokeConfigsResponse:
3652
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3653
        state = lambda_stores[account_id][region]
1✔
3654
        fn = state.functions.get(function_name)
1✔
3655
        if not fn:
1✔
3656
            raise ResourceNotFoundException("The function doesn't exist.", Type="User")
1✔
3657

3658
        event_invoke_configs = [
1✔
3659
            FunctionEventInvokeConfig(
3660
                LastModified=c.last_modified,
3661
                FunctionArn=api_utils.qualified_lambda_arn(
3662
                    function_name, c.qualifier, account_id, region
3663
                ),
3664
                MaximumEventAgeInSeconds=c.maximum_event_age_in_seconds,
3665
                MaximumRetryAttempts=c.maximum_retry_attempts,
3666
                DestinationConfig=c.destination_config,
3667
            )
3668
            for c in fn.event_invoke_configs.values()
3669
        ]
3670

3671
        event_invoke_configs = PaginatedList(event_invoke_configs)
1✔
3672
        page, token = event_invoke_configs.get_page(
1✔
3673
            lambda x: x["FunctionArn"],
3674
            marker,
3675
            max_items,
3676
        )
3677
        return ListFunctionEventInvokeConfigsResponse(
1✔
3678
            FunctionEventInvokeConfigs=page, NextMarker=token
3679
        )
3680

3681
    def delete_function_event_invoke_config(
1✔
3682
        self,
3683
        context: RequestContext,
3684
        function_name: FunctionName,
3685
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
3686
        **kwargs,
3687
    ) -> None:
3688
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3689
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
3690
            function_name, qualifier, context
3691
        )
3692
        state = lambda_stores[account_id][region]
1✔
3693
        fn = state.functions.get(function_name)
1✔
3694
        resolved_qualifier = qualifier or "$LATEST"
1✔
3695
        fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
1✔
3696
        if not fn:
1✔
3697
            raise ResourceNotFoundException(
1✔
3698
                f"The function {fn_arn} doesn't have an EventInvokeConfig", Type="User"
3699
            )
3700

3701
        config = fn.event_invoke_configs.get(resolved_qualifier)
1✔
3702
        if not config:
1✔
3703
            raise ResourceNotFoundException(
1✔
3704
                f"The function {fn_arn} doesn't have an EventInvokeConfig", Type="User"
3705
            )
3706

3707
        del fn.event_invoke_configs[resolved_qualifier]
1✔
3708

3709
    def update_function_event_invoke_config(
1✔
3710
        self,
3711
        context: RequestContext,
3712
        function_name: FunctionName,
3713
        qualifier: NumericLatestPublishedOrAliasQualifier = None,
3714
        maximum_retry_attempts: MaximumRetryAttempts = None,
3715
        maximum_event_age_in_seconds: MaximumEventAgeInSeconds = None,
3716
        destination_config: DestinationConfig = None,
3717
        **kwargs,
3718
    ) -> FunctionEventInvokeConfig:
3719
        # like put but only update single fields via replace
3720
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3721
        state = lambda_stores[account_id][region]
1✔
3722
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
3723
            function_name, qualifier, context
3724
        )
3725

3726
        if (
1✔
3727
            maximum_event_age_in_seconds is None
3728
            and maximum_retry_attempts is None
3729
            and destination_config is None
3730
        ):
3731
            raise InvalidParameterValueException(
×
3732
                "You must specify at least one of error handling or destination setting.",
3733
                Type="User",
3734
            )
3735

3736
        fn = state.functions.get(function_name)
1✔
3737
        if not fn or (qualifier and not (qualifier in fn.aliases or qualifier in fn.versions)):
1✔
3738
            raise ResourceNotFoundException("The function doesn't exist.", Type="User")
1✔
3739

3740
        qualifier = qualifier or "$LATEST"
1✔
3741

3742
        config = fn.event_invoke_configs.get(qualifier)
1✔
3743
        if not config:
1✔
3744
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
1✔
3745
            raise ResourceNotFoundException(
1✔
3746
                f"The function {fn_arn} doesn't have an EventInvokeConfig", Type="User"
3747
            )
3748

3749
        if destination_config:
1✔
3750
            self._validate_destination_config(state, function_name, destination_config)
×
3751

3752
        optional_kwargs = {
1✔
3753
            k: v
3754
            for k, v in {
3755
                "destination_config": destination_config,
3756
                "maximum_retry_attempts": maximum_retry_attempts,
3757
                "maximum_event_age_in_seconds": maximum_event_age_in_seconds,
3758
            }.items()
3759
            if v is not None
3760
        }
3761

3762
        new_config = dataclasses.replace(
1✔
3763
            config, last_modified=api_utils.generate_lambda_date(), **optional_kwargs
3764
        )
3765
        fn.event_invoke_configs[qualifier] = new_config
1✔
3766

3767
        return FunctionEventInvokeConfig(
1✔
3768
            LastModified=datetime.datetime.strptime(
3769
                new_config.last_modified, api_utils.LAMBDA_DATE_FORMAT
3770
            ),
3771
            FunctionArn=api_utils.qualified_lambda_arn(
3772
                function_name, qualifier or "$LATEST", account_id, region
3773
            ),
3774
            DestinationConfig=new_config.destination_config,
3775
            MaximumEventAgeInSeconds=new_config.maximum_event_age_in_seconds,
3776
            MaximumRetryAttempts=new_config.maximum_retry_attempts,
3777
        )
3778

3779
    # =======================================
3780
    # ======  Layer & Layer Versions  =======
3781
    # =======================================
3782

3783
    @staticmethod
1✔
3784
    def _resolve_layer(
1✔
3785
        layer_name_or_arn: str, context: RequestContext
3786
    ) -> tuple[str, str, str, str | None]:
3787
        """
3788
        Return locator attributes for a given Lambda layer.
3789

3790
        :param layer_name_or_arn: Layer name or ARN
3791
        :param context: Request context
3792
        :return: Tuple of region, account ID, layer name, layer version
3793
        """
3794
        if api_utils.is_layer_arn(layer_name_or_arn):
1✔
3795
            return api_utils.parse_layer_arn(layer_name_or_arn)
1✔
3796

3797
        return context.region, context.account_id, layer_name_or_arn, None
1✔
3798

3799
    def publish_layer_version(
1✔
3800
        self,
3801
        context: RequestContext,
3802
        layer_name: LayerName,
3803
        content: LayerVersionContentInput,
3804
        description: Description | None = None,
3805
        compatible_runtimes: CompatibleRuntimes | None = None,
3806
        license_info: LicenseInfo | None = None,
3807
        compatible_architectures: CompatibleArchitectures | None = None,
3808
        **kwargs,
3809
    ) -> PublishLayerVersionResponse:
3810
        """
3811
        On first use of a LayerName a new layer is created and for each subsequent call with the same LayerName a new version is created.
3812
        Note that there are no $LATEST versions with layers!
3813

3814
        """
3815
        account = context.account_id
1✔
3816
        region = context.region
1✔
3817

3818
        validation_errors = api_utils.validate_layer_runtimes_and_architectures(
1✔
3819
            compatible_runtimes, compatible_architectures
3820
        )
3821
        if validation_errors:
1✔
3822
            raise ValidationException(
1✔
3823
                f"{len(validation_errors)} validation error{'s' if len(validation_errors) > 1 else ''} detected: {'; '.join(validation_errors)}"
3824
            )
3825

3826
        state = lambda_stores[account][region]
1✔
3827
        with self.create_layer_lock:
1✔
3828
            if layer_name not in state.layers:
1✔
3829
                # we don't have a version so create new layer object
3830
                # lock is required to avoid creating two v1 objects for the same name
3831
                layer = Layer(
1✔
3832
                    arn=api_utils.layer_arn(layer_name=layer_name, account=account, region=region)
3833
                )
3834
                state.layers[layer_name] = layer
1✔
3835

3836
        layer = state.layers[layer_name]
1✔
3837
        with layer.next_version_lock:
1✔
3838
            next_version = LambdaLayerVersionIdentifier(
1✔
3839
                account_id=account, region=region, layer_name=layer_name
3840
            ).generate(next_version=layer.next_version)
3841
            # When creating a layer with user defined layer version, it is possible that we
3842
            # create layer versions out of order.
3843
            # ie. a user could replicate layer v2 then layer v1. It is important to always keep the maximum possible
3844
            # value for next layer to avoid overwriting existing versions
3845
            if layer.next_version <= next_version:
1✔
3846
                # We don't need to update layer.next_version if the created version is lower than the "next in line"
3847
                layer.next_version = max(next_version, layer.next_version) + 1
1✔
3848

3849
        # creating a new layer
3850
        if content.get("ZipFile"):
1✔
3851
            code = store_lambda_archive(
1✔
3852
                archive_file=content["ZipFile"],
3853
                function_name=layer_name,
3854
                region_name=region,
3855
                account_id=account,
3856
            )
3857
        else:
3858
            code = store_s3_bucket_archive(
1✔
3859
                archive_bucket=content["S3Bucket"],
3860
                archive_key=content["S3Key"],
3861
                archive_version=content.get("S3ObjectVersion"),
3862
                function_name=layer_name,
3863
                region_name=region,
3864
                account_id=account,
3865
            )
3866

3867
        new_layer_version = LayerVersion(
1✔
3868
            layer_version_arn=api_utils.layer_version_arn(
3869
                layer_name=layer_name,
3870
                account=account,
3871
                region=region,
3872
                version=str(next_version),
3873
            ),
3874
            layer_arn=layer.arn,
3875
            version=next_version,
3876
            description=description or "",
3877
            license_info=license_info,
3878
            compatible_runtimes=compatible_runtimes,
3879
            compatible_architectures=compatible_architectures,
3880
            created=api_utils.generate_lambda_date(),
3881
            code=code,
3882
        )
3883

3884
        layer.layer_versions[str(next_version)] = new_layer_version
1✔
3885

3886
        return api_utils.map_layer_out(new_layer_version)
1✔
3887

3888
    def get_layer_version(
1✔
3889
        self,
3890
        context: RequestContext,
3891
        layer_name: LayerName,
3892
        version_number: LayerVersionNumber,
3893
        **kwargs,
3894
    ) -> GetLayerVersionResponse:
3895
        # TODO: handle layer_name as an ARN
3896

3897
        region_name, account_id, layer_name, _ = LambdaProvider._resolve_layer(layer_name, context)
1✔
3898
        state = lambda_stores[account_id][region_name]
1✔
3899

3900
        layer = state.layers.get(layer_name)
1✔
3901
        if version_number < 1:
1✔
3902
            raise InvalidParameterValueException("Layer Version Cannot be less than 1", Type="User")
1✔
3903
        if layer is None:
1✔
3904
            raise ResourceNotFoundException(
1✔
3905
                "The resource you requested does not exist.", Type="User"
3906
            )
3907
        layer_version = layer.layer_versions.get(str(version_number))
1✔
3908
        if layer_version is None:
1✔
3909
            raise ResourceNotFoundException(
1✔
3910
                "The resource you requested does not exist.", Type="User"
3911
            )
3912
        return api_utils.map_layer_out(layer_version)
1✔
3913

3914
    def get_layer_version_by_arn(
1✔
3915
        self, context: RequestContext, arn: LayerVersionArn, **kwargs
3916
    ) -> GetLayerVersionResponse:
3917
        region_name, account_id, layer_name, layer_version = LambdaProvider._resolve_layer(
1✔
3918
            arn, context
3919
        )
3920

3921
        if not layer_version:
1✔
3922
            raise ValidationException(
1✔
3923
                f"1 validation error detected: Value '{arn}' at 'arn' failed to satisfy constraint: Member must satisfy regular expression pattern: "
3924
                + "(arn:(aws[a-zA-Z-]*)?:lambda:(eusc-)?[a-z]{2}((-gov)|(-iso([a-z]?)))?-[a-z]+-\\d{1}:\\d{12}:layer:[a-zA-Z0-9-_]+:[0-9]+)|(arn:[a-zA-Z0-9-]+:lambda:::awslayer:[a-zA-Z0-9-_]+)"
3925
            )
3926

3927
        store = lambda_stores[account_id][region_name]
1✔
3928
        if not (layers := store.layers.get(layer_name)):
1✔
3929
            raise ResourceNotFoundException(
×
3930
                "The resource you requested does not exist.", Type="User"
3931
            )
3932

3933
        layer_version = layers.layer_versions.get(layer_version)
1✔
3934

3935
        if not layer_version:
1✔
3936
            raise ResourceNotFoundException(
1✔
3937
                "The resource you requested does not exist.", Type="User"
3938
            )
3939

3940
        return api_utils.map_layer_out(layer_version)
1✔
3941

3942
    def list_layers(
1✔
3943
        self,
3944
        context: RequestContext,
3945
        compatible_runtime: Runtime | None = None,
3946
        marker: String | None = None,
3947
        max_items: MaxLayerListItems | None = None,
3948
        compatible_architecture: Architecture | None = None,
3949
        **kwargs,
3950
    ) -> ListLayersResponse:
3951
        validation_errors = []
1✔
3952

3953
        validation_error_arch = api_utils.validate_layer_architecture(compatible_architecture)
1✔
3954
        if validation_error_arch:
1✔
3955
            validation_errors.append(validation_error_arch)
1✔
3956

3957
        validation_error_runtime = api_utils.validate_layer_runtime(compatible_runtime)
1✔
3958
        if validation_error_runtime:
1✔
3959
            validation_errors.append(validation_error_runtime)
1✔
3960

3961
        if validation_errors:
1✔
3962
            raise ValidationException(
1✔
3963
                f"{len(validation_errors)} validation error{'s' if len(validation_errors) > 1 else ''} detected: {';'.join(validation_errors)}"
3964
            )
3965
        # TODO: handle filter: compatible_runtime
3966
        # TODO: handle filter: compatible_architecture
3967

3968
        state = lambda_stores[context.account_id][context.region]
×
3969
        layers = state.layers
×
3970

3971
        # TODO: test how filters behave together with only returning layers here? Does it return the latest "matching" layer, i.e. does it ignore later layer versions that don't match?
3972

3973
        responses: list[LayersListItem] = []
×
3974
        for layer_name, layer in layers.items():
×
3975
            # fetch latest version
3976
            layer_versions = list(layer.layer_versions.values())
×
3977
            sorted(layer_versions, key=lambda x: x.version)
×
3978
            latest_layer_version = layer_versions[-1]
×
3979
            responses.append(
×
3980
                LayersListItem(
3981
                    LayerName=layer_name,
3982
                    LayerArn=layer.arn,
3983
                    LatestMatchingVersion=api_utils.map_layer_out(latest_layer_version),
3984
                )
3985
            )
3986

3987
        responses = PaginatedList(responses)
×
3988
        page, token = responses.get_page(
×
3989
            lambda version: version,
3990
            marker,
3991
            max_items,
3992
        )
3993

3994
        return ListLayersResponse(NextMarker=token, Layers=page)
×
3995

3996
    def list_layer_versions(
1✔
3997
        self,
3998
        context: RequestContext,
3999
        layer_name: LayerName,
4000
        compatible_runtime: Runtime | None = None,
4001
        marker: String | None = None,
4002
        max_items: MaxLayerListItems | None = None,
4003
        compatible_architecture: Architecture | None = None,
4004
        **kwargs,
4005
    ) -> ListLayerVersionsResponse:
4006
        validation_errors = api_utils.validate_layer_runtimes_and_architectures(
1✔
4007
            [compatible_runtime] if compatible_runtime else [],
4008
            [compatible_architecture] if compatible_architecture else [],
4009
        )
4010
        if validation_errors:
1✔
4011
            raise ValidationException(
×
4012
                f"{len(validation_errors)} validation error{'s' if len(validation_errors) > 1 else ''} detected: {';'.join(validation_errors)}"
4013
            )
4014

4015
        region_name, account_id, layer_name, layer_version = LambdaProvider._resolve_layer(
1✔
4016
            layer_name, context
4017
        )
4018
        state = lambda_stores[account_id][region_name]
1✔
4019

4020
        # TODO: Test & handle filter: compatible_runtime
4021
        # TODO: Test & handle filter: compatible_architecture
4022
        all_layer_versions = []
1✔
4023
        layer = state.layers.get(layer_name)
1✔
4024
        if layer is not None:
1✔
4025
            for layer_version in layer.layer_versions.values():
1✔
4026
                all_layer_versions.append(api_utils.map_layer_out(layer_version))
1✔
4027

4028
        all_layer_versions.sort(key=lambda x: x["Version"], reverse=True)
1✔
4029
        all_layer_versions = PaginatedList(all_layer_versions)
1✔
4030
        page, token = all_layer_versions.get_page(
1✔
4031
            lambda version: version["LayerVersionArn"],
4032
            marker,
4033
            max_items,
4034
        )
4035
        return ListLayerVersionsResponse(NextMarker=token, LayerVersions=page)
1✔
4036

4037
    def delete_layer_version(
1✔
4038
        self,
4039
        context: RequestContext,
4040
        layer_name: LayerName,
4041
        version_number: LayerVersionNumber,
4042
        **kwargs,
4043
    ) -> None:
4044
        if version_number < 1:
1✔
4045
            raise InvalidParameterValueException("Layer Version Cannot be less than 1", Type="User")
1✔
4046

4047
        region_name, account_id, layer_name, layer_version = LambdaProvider._resolve_layer(
1✔
4048
            layer_name, context
4049
        )
4050

4051
        store = lambda_stores[account_id][region_name]
1✔
4052
        layer = store.layers.get(layer_name, {})
1✔
4053
        if layer:
1✔
4054
            layer.layer_versions.pop(str(version_number), None)
1✔
4055

4056
    # =======================================
4057
    # =====  Layer Version Permissions  =====
4058
    # =======================================
4059
    # TODO: lock updates that change revision IDs
4060

4061
    def add_layer_version_permission(
1✔
4062
        self,
4063
        context: RequestContext,
4064
        layer_name: LayerName,
4065
        version_number: LayerVersionNumber,
4066
        statement_id: StatementId,
4067
        action: LayerPermissionAllowedAction,
4068
        principal: LayerPermissionAllowedPrincipal,
4069
        organization_id: OrganizationId = None,
4070
        revision_id: String = None,
4071
        **kwargs,
4072
    ) -> AddLayerVersionPermissionResponse:
4073
        # `layer_name` can either be layer name or ARN. It is used to generate error messages.
4074
        # `layer_n` contains the layer name.
4075
        region_name, account_id, layer_n, _ = LambdaProvider._resolve_layer(layer_name, context)
1✔
4076

4077
        if action != "lambda:GetLayerVersion":
1✔
4078
            raise ValidationException(
1✔
4079
                f"1 validation error detected: Value '{action}' at 'action' failed to satisfy constraint: Member must satisfy regular expression pattern: lambda:GetLayerVersion"
4080
            )
4081

4082
        store = lambda_stores[account_id][region_name]
1✔
4083
        layer = store.layers.get(layer_n)
1✔
4084

4085
        layer_version_arn = api_utils.layer_version_arn(
1✔
4086
            layer_name, account_id, region_name, str(version_number)
4087
        )
4088

4089
        if layer is None:
1✔
4090
            raise ResourceNotFoundException(
1✔
4091
                f"Layer version {layer_version_arn} does not exist.", Type="User"
4092
            )
4093
        layer_version = layer.layer_versions.get(str(version_number))
1✔
4094
        if layer_version is None:
1✔
4095
            raise ResourceNotFoundException(
1✔
4096
                f"Layer version {layer_version_arn} does not exist.", Type="User"
4097
            )
4098
        # do we have a policy? if not set one
4099
        if layer_version.policy is None:
1✔
4100
            layer_version.policy = LayerPolicy()
1✔
4101

4102
        if statement_id in layer_version.policy.statements:
1✔
4103
            raise ResourceConflictException(
1✔
4104
                f"The statement id ({statement_id}) provided already exists. Please provide a new statement id, or remove the existing statement.",
4105
                Type="User",
4106
            )
4107

4108
        if revision_id and layer_version.policy.revision_id != revision_id:
1✔
4109
            raise PreconditionFailedException(
1✔
4110
                "The Revision Id provided does not match the latest Revision Id. "
4111
                "Call the GetLayerPolicy API to retrieve the latest Revision Id",
4112
                Type="User",
4113
            )
4114

4115
        statement = LayerPolicyStatement(
1✔
4116
            sid=statement_id, action=action, principal=principal, organization_id=organization_id
4117
        )
4118

4119
        old_statements = layer_version.policy.statements
1✔
4120
        layer_version.policy = dataclasses.replace(
1✔
4121
            layer_version.policy, statements={**old_statements, statement_id: statement}
4122
        )
4123

4124
        return AddLayerVersionPermissionResponse(
1✔
4125
            Statement=json.dumps(
4126
                {
4127
                    "Sid": statement.sid,
4128
                    "Effect": "Allow",
4129
                    "Principal": statement.principal,
4130
                    "Action": statement.action,
4131
                    "Resource": layer_version.layer_version_arn,
4132
                }
4133
            ),
4134
            RevisionId=layer_version.policy.revision_id,
4135
        )
4136

4137
    def remove_layer_version_permission(
1✔
4138
        self,
4139
        context: RequestContext,
4140
        layer_name: LayerName,
4141
        version_number: LayerVersionNumber,
4142
        statement_id: StatementId,
4143
        revision_id: String = None,
4144
        **kwargs,
4145
    ) -> None:
4146
        # `layer_name` can either be layer name or ARN. It is used to generate error messages.
4147
        # `layer_n` contains the layer name.
4148
        region_name, account_id, layer_n, layer_version = LambdaProvider._resolve_layer(
1✔
4149
            layer_name, context
4150
        )
4151

4152
        layer_version_arn = api_utils.layer_version_arn(
1✔
4153
            layer_name, account_id, region_name, str(version_number)
4154
        )
4155

4156
        state = lambda_stores[account_id][region_name]
1✔
4157
        layer = state.layers.get(layer_n)
1✔
4158
        if layer is None:
1✔
4159
            raise ResourceNotFoundException(
1✔
4160
                f"Layer version {layer_version_arn} does not exist.", Type="User"
4161
            )
4162
        layer_version = layer.layer_versions.get(str(version_number))
1✔
4163
        if layer_version is None:
1✔
4164
            raise ResourceNotFoundException(
1✔
4165
                f"Layer version {layer_version_arn} does not exist.", Type="User"
4166
            )
4167

4168
        if revision_id and layer_version.policy.revision_id != revision_id:
1✔
4169
            raise PreconditionFailedException(
1✔
4170
                "The Revision Id provided does not match the latest Revision Id. "
4171
                "Call the GetLayerPolicy API to retrieve the latest Revision Id",
4172
                Type="User",
4173
            )
4174

4175
        if statement_id not in layer_version.policy.statements:
1✔
4176
            raise ResourceNotFoundException(
1✔
4177
                f"Statement {statement_id} is not found in resource policy.", Type="User"
4178
            )
4179

4180
        old_statements = layer_version.policy.statements
1✔
4181
        layer_version.policy = dataclasses.replace(
1✔
4182
            layer_version.policy,
4183
            statements={k: v for k, v in old_statements.items() if k != statement_id},
4184
        )
4185

4186
    def get_layer_version_policy(
1✔
4187
        self,
4188
        context: RequestContext,
4189
        layer_name: LayerName,
4190
        version_number: LayerVersionNumber,
4191
        **kwargs,
4192
    ) -> GetLayerVersionPolicyResponse:
4193
        # `layer_name` can either be layer name or ARN. It is used to generate error messages.
4194
        # `layer_n` contains the layer name.
4195
        region_name, account_id, layer_n, _ = LambdaProvider._resolve_layer(layer_name, context)
1✔
4196

4197
        layer_version_arn = api_utils.layer_version_arn(
1✔
4198
            layer_name, account_id, region_name, str(version_number)
4199
        )
4200

4201
        store = lambda_stores[account_id][region_name]
1✔
4202
        layer = store.layers.get(layer_n)
1✔
4203

4204
        if layer is None:
1✔
4205
            raise ResourceNotFoundException(
1✔
4206
                f"Layer version {layer_version_arn} does not exist.", Type="User"
4207
            )
4208

4209
        layer_version = layer.layer_versions.get(str(version_number))
1✔
4210
        if layer_version is None:
1✔
4211
            raise ResourceNotFoundException(
1✔
4212
                f"Layer version {layer_version_arn} does not exist.", Type="User"
4213
            )
4214

4215
        if layer_version.policy is None:
1✔
4216
            raise ResourceNotFoundException(
1✔
4217
                "No policy is associated with the given resource.", Type="User"
4218
            )
4219

4220
        return GetLayerVersionPolicyResponse(
1✔
4221
            Policy=json.dumps(
4222
                {
4223
                    "Version": layer_version.policy.version,
4224
                    "Id": layer_version.policy.id,
4225
                    "Statement": [
4226
                        {
4227
                            "Sid": ps.sid,
4228
                            "Effect": "Allow",
4229
                            "Principal": ps.principal,
4230
                            "Action": ps.action,
4231
                            "Resource": layer_version.layer_version_arn,
4232
                        }
4233
                        for ps in layer_version.policy.statements.values()
4234
                    ],
4235
                }
4236
            ),
4237
            RevisionId=layer_version.policy.revision_id,
4238
        )
4239

4240
    # =======================================
4241
    # =======  Function Concurrency  ========
4242
    # =======================================
4243
    # (Reserved) function concurrency is scoped to the whole function
4244

4245
    def get_function_concurrency(
1✔
4246
        self, context: RequestContext, function_name: FunctionName, **kwargs
4247
    ) -> GetFunctionConcurrencyResponse:
4248
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
4249
        function_name = api_utils.get_function_name(function_name, context)
1✔
4250
        fn = self._get_function(function_name=function_name, region=region, account_id=account_id)
1✔
4251
        return GetFunctionConcurrencyResponse(
1✔
4252
            ReservedConcurrentExecutions=fn.reserved_concurrent_executions
4253
        )
4254

4255
    def put_function_concurrency(
1✔
4256
        self,
4257
        context: RequestContext,
4258
        function_name: FunctionName,
4259
        reserved_concurrent_executions: ReservedConcurrentExecutions,
4260
        **kwargs,
4261
    ) -> Concurrency:
4262
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
4263

4264
        function_name, qualifier = api_utils.get_name_and_qualifier(function_name, None, context)
1✔
4265
        if qualifier:
1✔
4266
            raise InvalidParameterValueException(
1✔
4267
                "This operation is permitted on Lambda functions only. Aliases and versions do not support this operation. Please specify either a function name or an unqualified function ARN.",
4268
                Type="User",
4269
            )
4270

4271
        store = lambda_stores[account_id][region]
1✔
4272
        fn = store.functions.get(function_name)
1✔
4273
        if not fn:
1✔
4274
            fn_arn = api_utils.qualified_lambda_arn(
1✔
4275
                function_name,
4276
                qualifier="$LATEST",
4277
                account=account_id,
4278
                region=region,
4279
            )
4280
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")
1✔
4281

4282
        settings = self.get_account_settings(context)
1✔
4283
        unreserved_concurrent_executions = settings["AccountLimit"][
1✔
4284
            "UnreservedConcurrentExecutions"
4285
        ]
4286

4287
        # The existing reserved concurrent executions for the same function are already deduced in
4288
        # unreserved_concurrent_executions but must not count because the new one will replace the existing one.
4289
        # Joel tested this behavior manually against AWS (2023-11-28).
4290
        existing_reserved_concurrent_executions = (
1✔
4291
            fn.reserved_concurrent_executions if fn.reserved_concurrent_executions else 0
4292
        )
4293
        if (
1✔
4294
            unreserved_concurrent_executions
4295
            - reserved_concurrent_executions
4296
            + existing_reserved_concurrent_executions
4297
        ) < config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY:
4298
            raise InvalidParameterValueException(
1✔
4299
                f"Specified ReservedConcurrentExecutions for function decreases account's UnreservedConcurrentExecution below its minimum value of [{config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY}]."
4300
            )
4301

4302
        total_provisioned_concurrency = sum(
1✔
4303
            [
4304
                provisioned_configs.provisioned_concurrent_executions
4305
                for provisioned_configs in fn.provisioned_concurrency_configs.values()
4306
            ]
4307
        )
4308
        if total_provisioned_concurrency > reserved_concurrent_executions:
1✔
4309
            raise InvalidParameterValueException(
1✔
4310
                f" ReservedConcurrentExecutions  {reserved_concurrent_executions} should not be lower than function's total provisioned concurrency [{total_provisioned_concurrency}]."
4311
            )
4312

4313
        fn.reserved_concurrent_executions = reserved_concurrent_executions
1✔
4314

4315
        return Concurrency(ReservedConcurrentExecutions=fn.reserved_concurrent_executions)
1✔
4316

4317
    def delete_function_concurrency(
1✔
4318
        self, context: RequestContext, function_name: FunctionName, **kwargs
4319
    ) -> None:
4320
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
4321
        function_name, qualifier = api_utils.get_name_and_qualifier(function_name, None, context)
1✔
4322
        store = lambda_stores[account_id][region]
1✔
4323
        fn = store.functions.get(function_name)
1✔
4324
        fn.reserved_concurrent_executions = None
1✔
4325

4326
    # =======================================
4327
    # ===============  TAGS   ===============
4328
    # =======================================
4329
    # only Function, Event Source Mapping, and Code Signing Config (not currently supported by LocalStack) ARNs an are available for tagging in AWS
4330

4331
    def _get_tags(self, resource: TaggableResource) -> dict[str, str]:
1✔
4332
        state = self.fetch_lambda_store_for_tagging(resource)
1✔
4333
        lambda_adapted_tags = {
1✔
4334
            tag["Key"]: tag["Value"]
4335
            for tag in state.TAGS.list_tags_for_resource(resource).get("Tags")
4336
        }
4337
        return lambda_adapted_tags
1✔
4338

4339
    def _store_tags(self, resource: TaggableResource, tags: dict[str, str]):
1✔
4340
        state = self.fetch_lambda_store_for_tagging(resource)
1✔
4341
        if len(state.TAGS.tags.get(resource, {}) | tags) > LAMBDA_TAG_LIMIT_PER_RESOURCE:
1✔
4342
            raise InvalidParameterValueException(
1✔
4343
                "Number of tags exceeds resource tag limit.", Type="User"
4344
            )
4345

4346
        tag_svc_adapted_tags = [{"Key": key, "Value": value} for key, value in tags.items()]
1✔
4347
        state.TAGS.tag_resource(resource, tag_svc_adapted_tags)
1✔
4348

4349
    def fetch_lambda_store_for_tagging(self, resource: TaggableResource) -> LambdaStore:
1✔
4350
        """
4351
        Takes a resource ARN for a TaggableResource (Lambda Function, Event Source Mapping, Code Signing Config, or Capacity Provider) and returns a corresponding
4352
        LambdaStore for its region and account.
4353

4354
        In addition, this function validates that the ARN is a valid TaggableResource type, and that the TaggableResource exists.
4355

4356
        Raises:
4357
            ValidationException: If the resource ARN is not a full ARN for a TaggableResource.
4358
            ResourceNotFoundException: If the specified resource does not exist.
4359
            InvalidParameterValueException: If the resource ARN is a qualified Lambda Function.
4360
        """
4361

4362
        def _raise_validation_exception():
1✔
4363
            raise ValidationException(
1✔
4364
                f"1 validation error detected: Value '{resource}' at 'resource' failed to satisfy constraint: Member must satisfy regular expression pattern: {api_utils.TAGGABLE_RESOURCE_ARN_PATTERN}"
4365
            )
4366

4367
        # Check whether the ARN we have been passed is correctly formatted
4368
        parsed_resource_arn: ArnData = None
1✔
4369
        try:
1✔
4370
            parsed_resource_arn = parse_arn(resource)
1✔
4371
        except Exception:
1✔
4372
            _raise_validation_exception()
1✔
4373

4374
        # TODO: Should we be checking whether this is a full ARN?
4375
        region, account_id, resource_type = map(
1✔
4376
            parsed_resource_arn.get, ("region", "account", "resource")
4377
        )
4378

4379
        if not all((region, account_id, resource_type)):
1✔
4380
            _raise_validation_exception()
×
4381

4382
        if not (parts := resource_type.split(":")):
1✔
4383
            _raise_validation_exception()
×
4384

4385
        resource_type, resource_identifier, *qualifier = parts
1✔
4386

4387
        # Qualifier validation raises before checking for NotFound
4388
        if qualifier:
1✔
4389
            if resource_type == "function":
1✔
4390
                raise InvalidParameterValueException(
1✔
4391
                    "Tags on function aliases and versions are not supported. Please specify a function ARN.",
4392
                    Type="User",
4393
                )
4394
            _raise_validation_exception()
1✔
4395

4396
        if resource_type == "event-source-mapping":
1✔
4397
            self._get_esm(resource_identifier, account_id, region)
1✔
4398
        elif resource_type == "code-signing-config":
1✔
4399
            raise NotImplementedError("Resource tagging on CSC not yet implemented.")
4400
        elif resource_type == "function":
1✔
4401
            self._get_function(
1✔
4402
                function_name=resource_identifier, account_id=account_id, region=region
4403
            )
4404
        elif resource_type == "capacity-provider":
1✔
4405
            self._get_capacity_provider(resource_identifier, account_id, region)
1✔
4406
        else:
4407
            _raise_validation_exception()
1✔
4408

4409
        # If no exceptions are raised, assume ARN and referenced resource is valid for tag operations
4410
        return lambda_stores[account_id][region]
1✔
4411

4412
    def tag_resource(
1✔
4413
        self, context: RequestContext, resource: TaggableResource, tags: Tags, **kwargs
4414
    ) -> None:
4415
        if not tags:
1✔
4416
            raise InvalidParameterValueException(
1✔
4417
                "An error occurred and the request cannot be processed.", Type="User"
4418
            )
4419
        self._store_tags(resource, tags)
1✔
4420

4421
        if (resource_id := extract_resource_from_arn(resource)) and resource_id.startswith(
1✔
4422
            "function"
4423
        ):
4424
            name, _, account, region = function_locators_from_arn(resource)
1✔
4425
            function = self._get_function(name, account, region)
1✔
4426
            with function.lock:
1✔
4427
                # dirty hack for changed revision id, should reevaluate model to prevent this:
4428
                latest_version = function.versions["$LATEST"]
1✔
4429
                function.versions["$LATEST"] = dataclasses.replace(
1✔
4430
                    latest_version, config=dataclasses.replace(latest_version.config)
4431
                )
4432

4433
    def list_tags(
1✔
4434
        self, context: RequestContext, resource: TaggableResource, **kwargs
4435
    ) -> ListTagsResponse:
4436
        tags = self._get_tags(resource)
1✔
4437
        return ListTagsResponse(Tags=tags)
1✔
4438

4439
    def untag_resource(
1✔
4440
        self, context: RequestContext, resource: TaggableResource, tag_keys: TagKeyList, **kwargs
4441
    ) -> None:
4442
        if not tag_keys:
1✔
4443
            raise ValidationException(
1✔
4444
                "1 validation error detected: Value null at 'tagKeys' failed to satisfy constraint: Member must not be null"
4445
            )  # should probably be generalized a bit
4446

4447
        state = self.fetch_lambda_store_for_tagging(resource)
1✔
4448
        state.TAGS.untag_resource(resource, tag_keys)
1✔
4449

4450
        if (resource_id := extract_resource_from_arn(resource)) and resource_id.startswith(
1✔
4451
            "function"
4452
        ):
4453
            name, _, account, region = function_locators_from_arn(resource)
1✔
4454
            function = self._get_function(name, account, region)
1✔
4455
            # TODO: Potential race condition
4456
            with function.lock:
1✔
4457
                # dirty hack for changed revision id, should reevaluate model to prevent this:
4458
                latest_version = function.versions["$LATEST"]
1✔
4459
                function.versions["$LATEST"] = dataclasses.replace(
1✔
4460
                    latest_version, config=dataclasses.replace(latest_version.config)
4461
                )
4462

4463
    # =======================================
4464
    # =======  LEGACY / DEPRECATED   ========
4465
    # =======================================
4466

4467
    def invoke_async(
1✔
4468
        self,
4469
        context: RequestContext,
4470
        function_name: NamespacedFunctionName,
4471
        invoke_args: IO[BlobStream],
4472
        **kwargs,
4473
    ) -> InvokeAsyncResponse:
4474
        """LEGACY API endpoint. Even AWS heavily discourages its usage."""
4475
        raise NotImplementedError
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc