• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 20218772397

12 Dec 2025 11:29AM UTC coverage: 86.875% (+0.002%) from 86.873%
20218772397

push

github

web-flow
dev: support executing kubernetes dev setup on command execution (#13509)

69935 of 80501 relevant lines covered (86.87%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.05
/localstack-core/localstack/services/lambda_/provider.py
1
import base64
1✔
2
import dataclasses
1✔
3
import datetime
1✔
4
import itertools
1✔
5
import json
1✔
6
import logging
1✔
7
import re
1✔
8
import threading
1✔
9
import time
1✔
10
from typing import IO, Any
1✔
11

12
from botocore.exceptions import ClientError
1✔
13

14
from localstack import config
1✔
15
from localstack.aws.api import RequestContext, ServiceException, handler
1✔
16
from localstack.aws.api.lambda_ import (
1✔
17
    AccountLimit,
18
    AccountUsage,
19
    AddLayerVersionPermissionResponse,
20
    AddPermissionRequest,
21
    AddPermissionResponse,
22
    Alias,
23
    AliasConfiguration,
24
    AliasRoutingConfiguration,
25
    AllowedPublishers,
26
    Architecture,
27
    Arn,
28
    Blob,
29
    BlobStream,
30
    CapacityProviderConfig,
31
    CodeSigningConfigArn,
32
    CodeSigningConfigNotFoundException,
33
    CodeSigningPolicies,
34
    CompatibleArchitectures,
35
    CompatibleRuntimes,
36
    Concurrency,
37
    Cors,
38
    CreateCodeSigningConfigResponse,
39
    CreateEventSourceMappingRequest,
40
    CreateFunctionRequest,
41
    CreateFunctionUrlConfigResponse,
42
    DeleteCodeSigningConfigResponse,
43
    DeleteFunctionResponse,
44
    Description,
45
    DestinationConfig,
46
    DurableExecutionName,
47
    EventSourceMappingConfiguration,
48
    FunctionCodeLocation,
49
    FunctionConfiguration,
50
    FunctionEventInvokeConfig,
51
    FunctionName,
52
    FunctionUrlAuthType,
53
    FunctionUrlQualifier,
54
    FunctionVersionLatestPublished,
55
    GetAccountSettingsResponse,
56
    GetCodeSigningConfigResponse,
57
    GetFunctionCodeSigningConfigResponse,
58
    GetFunctionConcurrencyResponse,
59
    GetFunctionRecursionConfigResponse,
60
    GetFunctionResponse,
61
    GetFunctionUrlConfigResponse,
62
    GetLayerVersionPolicyResponse,
63
    GetLayerVersionResponse,
64
    GetPolicyResponse,
65
    GetProvisionedConcurrencyConfigResponse,
66
    InvalidParameterValueException,
67
    InvocationResponse,
68
    InvocationType,
69
    InvokeAsyncResponse,
70
    InvokeMode,
71
    LambdaApi,
72
    LambdaManagedInstancesCapacityProviderConfig,
73
    LastUpdateStatus,
74
    LayerName,
75
    LayerPermissionAllowedAction,
76
    LayerPermissionAllowedPrincipal,
77
    LayersListItem,
78
    LayerVersionArn,
79
    LayerVersionContentInput,
80
    LayerVersionNumber,
81
    LicenseInfo,
82
    ListAliasesResponse,
83
    ListCodeSigningConfigsResponse,
84
    ListEventSourceMappingsResponse,
85
    ListFunctionEventInvokeConfigsResponse,
86
    ListFunctionsByCodeSigningConfigResponse,
87
    ListFunctionsResponse,
88
    ListFunctionUrlConfigsResponse,
89
    ListLayersResponse,
90
    ListLayerVersionsResponse,
91
    ListProvisionedConcurrencyConfigsResponse,
92
    ListTagsResponse,
93
    ListVersionsByFunctionResponse,
94
    LogFormat,
95
    LoggingConfig,
96
    LogType,
97
    MasterRegion,
98
    MaxFunctionEventInvokeConfigListItems,
99
    MaximumEventAgeInSeconds,
100
    MaximumRetryAttempts,
101
    MaxItems,
102
    MaxLayerListItems,
103
    MaxListItems,
104
    MaxProvisionedConcurrencyConfigListItems,
105
    NamespacedFunctionName,
106
    NamespacedStatementId,
107
    NumericLatestPublishedOrAliasQualifier,
108
    OnFailure,
109
    OnSuccess,
110
    OrganizationId,
111
    PackageType,
112
    PositiveInteger,
113
    PreconditionFailedException,
114
    ProvisionedConcurrencyConfigListItem,
115
    ProvisionedConcurrencyConfigNotFoundException,
116
    ProvisionedConcurrencyStatusEnum,
117
    PublishLayerVersionResponse,
118
    PutFunctionCodeSigningConfigResponse,
119
    PutFunctionRecursionConfigResponse,
120
    PutProvisionedConcurrencyConfigResponse,
121
    Qualifier,
122
    RecursiveLoop,
123
    ReservedConcurrentExecutions,
124
    ResourceConflictException,
125
    ResourceNotFoundException,
126
    Runtime,
127
    RuntimeVersionConfig,
128
    SnapStart,
129
    SnapStartApplyOn,
130
    SnapStartOptimizationStatus,
131
    SnapStartResponse,
132
    State,
133
    StatementId,
134
    StateReasonCode,
135
    String,
136
    TaggableResource,
137
    TagKeyList,
138
    Tags,
139
    TenantId,
140
    TracingMode,
141
    UnqualifiedFunctionName,
142
    UpdateCodeSigningConfigResponse,
143
    UpdateEventSourceMappingRequest,
144
    UpdateFunctionCodeRequest,
145
    UpdateFunctionConfigurationRequest,
146
    UpdateFunctionUrlConfigResponse,
147
    VersionWithLatestPublished,
148
)
149
from localstack.aws.api.lambda_ import FunctionVersion as FunctionVersionApi
1✔
150
from localstack.aws.api.lambda_ import ServiceException as LambdaServiceException
1✔
151
from localstack.aws.api.pipes import (
1✔
152
    DynamoDBStreamStartPosition,
153
    KinesisStreamStartPosition,
154
)
155
from localstack.aws.connect import connect_to
1✔
156
from localstack.aws.spec import load_service
1✔
157
from localstack.services.edge import ROUTER
1✔
158
from localstack.services.lambda_ import api_utils
1✔
159
from localstack.services.lambda_ import hooks as lambda_hooks
1✔
160
from localstack.services.lambda_.analytics import (
1✔
161
    FunctionInitializationType,
162
    FunctionOperation,
163
    FunctionStatus,
164
    function_counter,
165
)
166
from localstack.services.lambda_.api_utils import (
1✔
167
    ARCHITECTURES,
168
    STATEMENT_ID_REGEX,
169
    SUBNET_ID_REGEX,
170
    function_locators_from_arn,
171
)
172
from localstack.services.lambda_.event_source_mapping.esm_config_factory import (
1✔
173
    EsmConfigFactory,
174
)
175
from localstack.services.lambda_.event_source_mapping.esm_worker import (
1✔
176
    EsmState,
177
    EsmWorker,
178
)
179
from localstack.services.lambda_.event_source_mapping.esm_worker_factory import (
1✔
180
    EsmWorkerFactory,
181
)
182
from localstack.services.lambda_.event_source_mapping.pipe_utils import get_internal_client
1✔
183
from localstack.services.lambda_.invocation import AccessDeniedException
1✔
184
from localstack.services.lambda_.invocation.execution_environment import (
1✔
185
    EnvironmentStartupTimeoutException,
186
)
187
from localstack.services.lambda_.invocation.lambda_models import (
1✔
188
    AliasRoutingConfig,
189
    CodeSigningConfig,
190
    EventInvokeConfig,
191
    Function,
192
    FunctionResourcePolicy,
193
    FunctionUrlConfig,
194
    FunctionVersion,
195
    ImageConfig,
196
    LambdaEphemeralStorage,
197
    Layer,
198
    LayerPolicy,
199
    LayerPolicyStatement,
200
    LayerVersion,
201
    ProvisionedConcurrencyConfiguration,
202
    RequestEntityTooLargeException,
203
    ResourcePolicy,
204
    UpdateStatus,
205
    ValidationException,
206
    VersionAlias,
207
    VersionFunctionConfiguration,
208
    VersionIdentifier,
209
    VersionState,
210
    VpcConfig,
211
)
212
from localstack.services.lambda_.invocation.lambda_service import (
1✔
213
    LambdaService,
214
    create_image_code,
215
    destroy_code_if_not_used,
216
    lambda_stores,
217
    store_lambda_archive,
218
    store_s3_bucket_archive,
219
)
220
from localstack.services.lambda_.invocation.models import CapacityProvider as CapacityProviderModel
1✔
221
from localstack.services.lambda_.invocation.models import LambdaStore
1✔
222
from localstack.services.lambda_.invocation.runtime_executor import get_runtime_executor
1✔
223
from localstack.services.lambda_.lambda_utils import HINT_LOG
1✔
224
from localstack.services.lambda_.layerfetcher.layer_fetcher import LayerFetcher
1✔
225
from localstack.services.lambda_.provider_utils import (
1✔
226
    LambdaLayerVersionIdentifier,
227
    get_function_version,
228
    get_function_version_from_arn,
229
)
230
from localstack.services.lambda_.runtimes import (
1✔
231
    ALL_RUNTIMES,
232
    DEPRECATED_RUNTIMES,
233
    DEPRECATED_RUNTIMES_UPGRADES,
234
    RUNTIMES_AGGREGATED,
235
    SNAP_START_SUPPORTED_RUNTIMES,
236
    VALID_RUNTIMES,
237
)
238
from localstack.services.lambda_.urlrouter import FunctionUrlRouter
1✔
239
from localstack.services.plugins import ServiceLifecycleHook
1✔
240
from localstack.state import StateVisitor
1✔
241
from localstack.utils.aws.arns import (
1✔
242
    ArnData,
243
    capacity_provider_arn,
244
    extract_resource_from_arn,
245
    extract_service_from_arn,
246
    get_partition,
247
    lambda_event_source_mapping_arn,
248
    parse_arn,
249
)
250
from localstack.utils.aws.client_types import ServicePrincipal
1✔
251
from localstack.utils.bootstrap import is_api_enabled
1✔
252
from localstack.utils.collections import PaginatedList, merge_recursive
1✔
253
from localstack.utils.event_matcher import validate_event_pattern
1✔
254
from localstack.utils.strings import get_random_hex, short_uid, to_bytes, to_str
1✔
255
from localstack.utils.sync import poll_condition
1✔
256
from localstack.utils.urls import localstack_host
1✔
257

258
# Module-level logger for this provider module.
LOG = logging.getLogger(__name__)

# Regex source (not compiled here) describing a Lambda capacity provider ARN.
CAPACITY_PROVIDER_ARN_NAME = r"arn:aws[a-zA-Z-]*:lambda:(eusc-)?[a-z]{2}((-gov)|(-iso([a-z]?)))?-[a-z]+-\d{1}:\d{12}:capacity-provider:[a-zA-Z0-9-_]+"

# Function defaults (presumably seconds and MB respectively -- confirm against the API spec).
LAMBDA_DEFAULT_TIMEOUT = 3
LAMBDA_DEFAULT_MEMORY_SIZE = 128

# Upper bounds enforced per resource / per function.
LAMBDA_TAG_LIMIT_PER_RESOURCE = 50
LAMBDA_LAYERS_LIMIT_PER_FUNCTION = 5

# Tag key carrying a user-chosen custom URL id.
TAG_KEY_CUSTOM_URL = "_custom_id_"
# Requirements (from RFC3986 & co): not longer than 63, first char must be
# alpha, then alphanumeric or hyphen, except cannot start or end with hyphen
TAG_KEY_CUSTOM_URL_VALIDATOR = re.compile(
    r"^[A-Za-z]([A-Za-z0-9\-]{0,61}[A-Za-z0-9])?$"
)
1✔
271

272

273
class LambdaProvider(LambdaApi, ServiceLifecycleHook):
1✔
274
    lambda_service: LambdaService
1✔
275
    create_fn_lock: threading.RLock
1✔
276
    create_layer_lock: threading.RLock
1✔
277
    router: FunctionUrlRouter
1✔
278
    esm_workers: dict[str, EsmWorker]
1✔
279
    layer_fetcher: LayerFetcher | None
1✔
280

281
    def __init__(self) -> None:
        """Create the provider's service, router, locks and (empty) worker registry."""
        # Re-entrant locks; one for function creation, one for layer creation.
        self.create_fn_lock = threading.RLock()
        self.create_layer_lock = threading.RLock()

        # Event source mapping workers, keyed by worker uuid.
        self.esm_workers = {}

        # The function URL router is bound to the same service instance.
        service = LambdaService()
        self.lambda_service = service
        self.router = FunctionUrlRouter(ROUTER, service)

        # No layer fetcher by default; the hook below receives the provider and runs last,
        # after all attributes are initialised.
        self.layer_fetcher = None
        lambda_hooks.inject_layer_fetcher.run(self)
1✔
289

290
    def accept_state_visitor(self, visitor: StateVisitor):
        """Let the given state visitor visit the Lambda stores.

        :param visitor: visitor whose ``visit`` is called with ``lambda_stores``
        """
        visitor.visit(lambda_stores)
×
292

293
    def on_before_state_reset(self):
        """Stop the current Lambda service before the state is reset."""
        service = self.lambda_service
        service.stop()
×
295

296
    def on_after_state_reset(self):
        """Install a fresh Lambda service on both the provider and the URL router."""
        fresh_service = LambdaService()
        self.router.lambda_service = fresh_service
        self.lambda_service = fresh_service
×
298

299
    def on_before_state_load(self):
        """Stop the current Lambda service before persisted state is loaded."""
        service = self.lambda_service
        service.stop()
×
301

302
    def on_after_state_load(self):
        """Rebuild the runtime side of the Lambda service after state has been loaded.

        A new ``LambdaService`` replaces the previous one and is handed to the function URL
        router. Then, for every account/region store in ``lambda_stores``:

        1. each function's versions are re-created and its provisioned concurrency
           configs are re-applied (function by function),
        2. afterwards, a worker is created for every event source mapping of that store.

        Failures while restoring a single function version or a single provisioned
        concurrency config are logged as warnings and do not abort the restore.
        """
        self.lambda_service = LambdaService()
        self.router.lambda_service = self.lambda_service

        for _account_id, account_bundle in lambda_stores.items():
            for _region_name, state in account_bundle.items():
                for fn in state.functions.values():
                    self._restore_function_versions_after_load(fn)
                    self._restore_provisioned_concurrency_after_load(fn)

                for esm in state.event_source_mappings.values():
                    self._restore_esm_worker_after_load(esm)

    def _restore_function_versions_after_load(self, fn: Function) -> None:
        """Reset every restorable version of ``fn`` to "Pending" and start it again."""
        for fn_version in fn.versions.values():
            try:
                # $LATEST is not invokable for Lambda functions with a capacity provider
                # and has a different State (i.e., ActiveNonInvokable)
                is_capacity_provider_latest = (
                    fn_version.config.CapacityProviderConfig
                    and fn_version.id.qualifier == "$LATEST"
                )
                if is_capacity_provider_latest:
                    continue

                # Restore the "Pending" state for the function version and start it
                pending_state = VersionState(
                    state=State.Pending,
                    code=StateReasonCode.Creating,
                    reason="The function is being created.",
                )
                new_config = dataclasses.replace(fn_version.config, state=pending_state)
                new_version = dataclasses.replace(fn_version, config=new_config)
                # Overwrites an existing key only, so the dict being iterated keeps its size.
                fn.versions[fn_version.id.qualifier] = new_version
                # NOTE(review): the loaded ``fn_version`` (not ``new_version``) is handed to
                # the service, exactly as before -- confirm this is intended.
                self.lambda_service.create_function_version(fn_version).result(timeout=5)
            except Exception:
                LOG.warning(
                    "Failed to restore function version %s",
                    fn_version.id.qualified_arn(),
                    exc_info=LOG.isEnabledFor(logging.DEBUG),
                )

    def _restore_provisioned_concurrency_after_load(self, fn: Function) -> None:
        """Re-apply the provisioned concurrency configs of ``fn`` (versions and aliases)."""
        for (
            provisioned_qualifier,
            provisioned_config,
        ) in fn.provisioned_concurrency_configs.items():
            # Stays None in the log message if resolving the qualifier fails.
            fn_arn = None
            try:
                if api_utils.qualifier_is_alias(provisioned_qualifier):
                    # An alias is resolved to the version it points to.
                    alias = fn.aliases.get(provisioned_qualifier)
                    resolved_version = fn.versions.get(alias.function_version)
                    fn_arn = resolved_version.id.qualified_arn()
                elif api_utils.qualifier_is_version(provisioned_qualifier):
                    fn_version = fn.versions.get(provisioned_qualifier)
                    fn_arn = fn_version.id.qualified_arn()
                else:
                    raise InvalidParameterValueException(
                        "Invalid qualifier type:"
                        " Qualifier can only be an alias or a version for provisioned concurrency."
                    )

                manager = self.lambda_service.get_lambda_version_manager(fn_arn)
                manager.update_provisioned_concurrency_config(
                    provisioned_config.provisioned_concurrent_executions
                )
            except Exception:
                LOG.warning(
                    "Failed to restore provisioned concurrency %s for function %s",
                    provisioned_config,
                    fn_arn,
                    exc_info=LOG.isEnabledFor(logging.DEBUG),
                )

    def _restore_esm_worker_after_load(self, esm: EventSourceMappingConfiguration) -> None:
        """Create and register the worker for one restored event source mapping."""
        function_arn = esm.get("FunctionArn")

        # TODO: How do we know the event source is up?
        # A basic poll to see if the mapped Lambda function is active/failed
        if not poll_condition(
            lambda: get_function_version_from_arn(function_arn).config.state.state
            in [State.Active, State.Failed],
            timeout=10,
        ):
            LOG.warning(
                "Creating ESM for Lambda that is not in running state: %s",
                function_arn,
            )

        function_version = get_function_version_from_arn(function_arn)
        function_role = function_version.config.role

        # A mapping without a "State" entry is treated as disabled.
        is_esm_enabled = esm.get("State", EsmState.DISABLED) not in (
            EsmState.DISABLED,
            EsmState.DISABLING,
        )
        esm_worker = EsmWorkerFactory(esm, function_role, is_esm_enabled).get_esm_worker()

        # Note: a worker is created in the DISABLED state if not enabled
        esm_worker.create()
        # TODO: assigning the esm_worker to the dict only works after .create(). Could it cause a race
        #  condition if we get a shutdown here and have a worker thread spawned but not accounted for?
        self.esm_workers[esm_worker.uuid] = esm_worker
×
400

401
    def on_after_init(self):
        """Register the function URL routes, then validate the runtime executor environment."""
        self.router.register_routes()
        runtime_executor = get_runtime_executor()
        runtime_executor.validate_environment()
1✔
404

405
    def on_before_stop(self) -> None:
        """Stop all registered event source mapping workers, then the Lambda service."""
        for worker in self.esm_workers.values():
            worker.stop_for_shutdown()

        # TODO: should probably unregister routes?
        service = self.lambda_service
        service.stop()
1✔
411

412
    @staticmethod
    def _get_function(function_name: str, account_id: str, region: str) -> Function:
        """Look up a function in the store of the given account and region.

        :param function_name: key of the function in the store's ``functions`` mapping
        :param account_id: account whose store is consulted
        :param region: region whose store is consulted
        :return: the stored function
        :raises ResourceNotFoundException: if the store has no (truthy) entry for the name
        """
        store = lambda_stores[account_id][region]
        if found := store.functions.get(function_name):
            return found

        missing_arn = api_utils.unqualified_lambda_arn(
            function_name=function_name,
            account=account_id,
            region=region,
        )
        raise ResourceNotFoundException(
            f"Function not found: {missing_arn}",
            Type="User",
        )
1✔
427

428
    @staticmethod
    def _get_esm(uuid: str, account_id: str, region: str) -> EventSourceMappingConfiguration:
        """Look up an event source mapping in the store of the given account and region.

        :param uuid: key of the mapping in the store's ``event_source_mappings``
        :param account_id: account whose store is consulted
        :param region: region whose store is consulted
        :return: the stored event source mapping configuration
        :raises ResourceNotFoundException: if the store has no (truthy) entry for the uuid
        """
        store = lambda_stores[account_id][region]
        if mapping := store.event_source_mappings.get(uuid):
            return mapping

        missing_arn = lambda_event_source_mapping_arn(uuid, account_id, region)
        raise ResourceNotFoundException(
            f"Event source mapping not found: {missing_arn}",
            Type="User",
        )
1✔
439

440
    @staticmethod
    def _get_capacity_provider(
        capacity_provider_name: str,
        account_id: str,
        region: str,
        error_msg_template: str = "Capacity provider not found: {}",
    ) -> CapacityProviderModel:
        """Look up a capacity provider in the store of the given account and region.

        :param capacity_provider_name: key of the provider in the store's ``capacity_providers``
        :param account_id: account whose store is consulted
        :param region: region whose store is consulted
        :param error_msg_template: ``str.format`` template for the not-found message; it is
            formatted with the capacity provider ARN as its single positional argument
        :return: the stored capacity provider
        :raises ResourceNotFoundException: if the store has no (truthy) entry for the name
        """
        store = lambda_stores[account_id][region]
        if provider := store.capacity_providers.get(capacity_provider_name):
            return provider

        missing_arn = capacity_provider_arn(capacity_provider_name, account_id, region)
        raise ResourceNotFoundException(
            error_msg_template.format(missing_arn),
            Type="User",
        )
×
456

457
    @staticmethod
    def _validate_qualifier_expression(qualifier: str) -> None:
        """Raise if ``api_utils.validate_qualifier`` reports problems for the qualifier.

        :param qualifier: qualifier expression to validate
        :raises ValidationException: with a message built from the reported error messages
        """
        problems = api_utils.validate_qualifier(qualifier)
        if not problems:
            return
        raise ValidationException(
            message=api_utils.construct_validation_exception_message(problems)
        )
463

464
    @staticmethod
    def _resolve_fn_qualifier(resolved_fn: Function, qualifier: str | None) -> tuple[str, str]:
        """Resolve a qualifier against an already resolved function.

        Without a qualifier, ``$LATEST`` and the unqualified function ARN are returned.
        With a qualifier, it must be an alias present in the function's aliases, or a
        version / ``$LATEST`` present in the function's versions.

        :param resolved_fn: The resolved lambda function
        :param qualifier: The qualifier to be resolved or None
        :return: Tuple of (resolved qualifier, function arn either qualified or unqualified)
        :raises ResourceNotFoundException: if the qualifier does not resolve to an existing
            alias or version of the function
        """
        name = resolved_fn.function_name
        # assuming function versions need to live in the same account and region
        latest_id = resolved_fn.latest().id
        account_id, region = latest_id.account, latest_id.region

        if qualifier is None:
            return "$LATEST", api_utils.unqualified_lambda_arn(name, account_id, region)

        qualified_arn = api_utils.qualified_lambda_arn(name, qualifier, account_id, region)

        if api_utils.qualifier_is_alias(qualifier):
            if qualifier not in resolved_fn.aliases:
                raise ResourceNotFoundException(
                    f"Cannot find alias arn: {qualified_arn}", Type="User"
                )
        else:
            # Anything else must look like a version (or be $LATEST) and actually exist;
            # a qualifier that matches the pattern but is neither is reported the same way.
            looks_like_version = (
                api_utils.qualifier_is_version(qualifier) or qualifier == "$LATEST"
            )
            if not looks_like_version or qualifier not in resolved_fn.versions:
                raise ResourceNotFoundException(
                    f"Function not found: {qualified_arn}", Type="User"
                )

        return qualifier or "$LATEST", qualified_arn
1✔
490

491
    @staticmethod
    def _function_revision_id(resolved_fn: Function, resolved_qualifier: str) -> str:
        """Return the revision id of the alias or version the qualifier refers to.

        :param resolved_fn: function owning the alias / version
        :param resolved_qualifier: qualifier that exists on the function
        :return: the alias' ``revision_id``, or the version config's ``revision_id``
        """
        if not api_utils.qualifier_is_alias(resolved_qualifier):
            # Assumes that a non-alias is a version
            version = resolved_fn.versions[resolved_qualifier]
            return version.config.revision_id

        alias = resolved_fn.aliases[resolved_qualifier]
        return alias.revision_id
1✔
498

499
    def _resolve_vpc_id(self, account_id: str, region_name: str, subnet_id: str) -> str:
        """Resolve the VPC id of a subnet through the EC2 ``DescribeSubnets`` API.

        :param account_id: account used (as access key id) for the EC2 client
        :param region_name: region of the EC2 client
        :param subnet_id: id of the subnet to look up
        :return: the ``VpcId`` of the first subnet in the response
        :raises InvalidParameterValueException: if EC2 answers with a client error; the
            EC2 error code and message are embedded in the message and the original
            error is attached as the exception's cause
        """
        ec2_client = connect_to(aws_access_key_id=account_id, region_name=region_name).ec2
        try:
            return ec2_client.describe_subnets(SubnetIds=[subnet_id])["Subnets"][0]["VpcId"]
        except ec2_client.exceptions.ClientError as e:
            code = e.response["Error"]["Code"]
            message = e.response["Error"]["Message"]
            # Chain explicitly so the traceback shows the EC2 error as the direct cause.
            raise InvalidParameterValueException(
                f"Error occurred while DescribeSubnets. EC2 Error Code: {code}. EC2 Error Message: {message}",
                Type="User",
            ) from e
510

511
    def _build_vpc_config(
        self,
        account_id: str,
        region_name: str,
        vpc_config: dict | None = None,
    ) -> VpcConfig | None:
        """Build the internal ``VpcConfig`` model from a request's VPC config dict.

        :param account_id: account used to resolve the VPC id
        :param region_name: region used to resolve the VPC id
        :param vpc_config: dict read for ``SubnetIds`` and ``SecurityGroupIds``, or None
        :return: ``None`` if no VPC config is given or the ``ec2`` API is not enabled;
            an empty ``VpcConfig`` if there are no subnet ids; otherwise a ``VpcConfig``
            whose VPC id is resolved from the first subnet id
        :raises ValidationException: if the first subnet id does not match ``SUBNET_ID_REGEX``
        :raises InvalidParameterValueException: if resolving the VPC id fails
        """
        if not vpc_config or not is_api_enabled("ec2"):
            return None

        # An explicit ``"SubnetIds": None`` is handled like an empty list. Previously it
        # passed the emptiness check and failed on ``subnet_ids[0]`` with a TypeError.
        subnet_ids = vpc_config.get("SubnetIds") or []
        if not subnet_ids:
            return VpcConfig(vpc_id="", security_group_ids=[], subnet_ids=[])

        # Only the first subnet id is validated and used to resolve the VPC.
        subnet_id = subnet_ids[0]
        if not SUBNET_ID_REGEX.match(subnet_id):
            raise ValidationException(
                f"1 validation error detected: Value '[{subnet_id}]' at 'vpcConfig.subnetIds' failed to satisfy constraint: Member must satisfy constraint: [Member must have length less than or equal to 1024, Member must have length greater than or equal to 0, Member must satisfy regular expression pattern: subnet-[0-9a-z]*]"
            )

        return VpcConfig(
            vpc_id=self._resolve_vpc_id(account_id, region_name, subnet_id),
            security_group_ids=vpc_config.get("SecurityGroupIds", []),
            subnet_ids=subnet_ids,
        )
535

536
    def _create_version_model(
        self,
        function_name: str,
        region: str,
        account_id: str,
        description: str | None = None,
        revision_id: str | None = None,
        code_sha256: str | None = None,
        publish_to: FunctionVersionLatestPublished | None = None,
        is_active: bool = False,
    ) -> tuple[FunctionVersion, bool]:
        """
        Derive a new version from ``$LATEST`` and store it in the function model.

        Preconditions:
          - RevisionId, if provided, must equal the revision id of ``$LATEST``
          - CodeSha256, if provided, must equal the code hash of ``$LATEST``
            (not enforced for hot-reloaded zip packages)
        If the most recently numbered version has the same internal revision as ``$LATEST``,
        nothing is created and that version is returned together with ``False``.

        :param function_name: Function name to be published
        :param region: Region of the function
        :param account_id: Account of the function
        :param description: description of the new version (the one of ``$LATEST`` is kept if None)
        :param revision_id: Revision id that has to match the one of ``$LATEST``
        :param code_sha256: Code sha256 that has to match the one of ``$LATEST``
        :param publish_to: if ``LATEST_PUBLISHED``, the version is stored under the qualifier
            ``$LATEST.PUBLISHED`` (with an in-progress last update status) instead of the
            next version number
        :param is_active: only considered when publishing to ``LATEST_PUBLISHED``; the new
            version then starts in state ``Active`` instead of ``Pending``
        :return: Tuple of (version, True if a new version was stored / False if the previous
            version was returned because nothing changed)
        :raises PreconditionFailedException: if the revision id does not match
        :raises InvalidParameterValueException: if the code hash does not match
        """
        latest = get_function_version(
            function_name=function_name, qualifier="$LATEST", account_id=account_id, region=region
        )
        if revision_id and latest.config.revision_id != revision_id:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
                Type="User",
            )

        # check if code hashes match if they are specified
        if latest.config.package_type == PackageType.Zip:
            current_hash = latest.config.code.code_sha256
        else:
            current_hash = latest.config.image.code_sha256
        # if the code is a zip package and hot reloaded (hot reloading is currently only supported for zip packagetypes)
        # we cannot enforce the codesha256 check
        is_hot_reloaded_zip_package = (
            latest.config.package_type == PackageType.Zip
            and latest.config.code.is_hot_reloading()
        )
        hash_mismatch = bool(code_sha256) and current_hash != code_sha256
        if hash_mismatch and not is_hot_reloaded_zip_package:
            raise InvalidParameterValueException(
                f"CodeSHA256 ({code_sha256}) is different from current CodeSHA256 in $LATEST ({current_hash}). Please try again with the CodeSHA256 in $LATEST.",
                Type="User",
            )

        function = lambda_stores[account_id][region].functions.get(function_name)
        overrides = {} if description is None else {"description": description}
        to_latest_published = publish_to == FunctionVersionLatestPublished.LATEST_PUBLISHED
        # TODO copy environment instead of restarting one, get rid of all the "Pending"s

        with function.lock:
            # Nothing changed since the last numbered version -> hand that one back.
            if function.next_version > 1:
                prev_version = function.versions.get(str(function.next_version - 1))
                if (
                    prev_version
                    and prev_version.config.internal_revision == latest.config.internal_revision
                ):
                    return prev_version, False
            # TODO check if there was a change since last version
            if to_latest_published:
                qualifier = "$LATEST.PUBLISHED"
            else:
                # Numbered versions consume the function's version counter.
                qualifier = str(function.next_version)
                function.next_version += 1
            version_id = VersionIdentifier(
                function_name=function_name,
                qualifier=qualifier,
                region=region,
                account=account_id,
            )

            if latest.config.CapacityProviderConfig:
                # for lambda managed functions, snap start is not supported
                snap_start = None
            else:
                apply_on = latest.config.snap_start["ApplyOn"]
                snap_start = SnapStartResponse(
                    ApplyOn=apply_on,
                    OptimizationStatus=(
                        SnapStartOptimizationStatus.On
                        if apply_on == SnapStartApplyOn.PublishedVersions
                        else SnapStartOptimizationStatus.Off
                    ),
                )

            if to_latest_published and is_active:
                version_state = VersionState(state=State.Active)
            else:
                version_state = VersionState(
                    state=State.Pending,
                    code=StateReasonCode.Creating,
                    reason="The function is being created.",
                )
            last_update = (
                UpdateStatus(
                    status=LastUpdateStatus.InProgress,
                    code="Updating",
                    reason="The function is being updated.",
                )
                if to_latest_published
                else None
            )

            new_config = dataclasses.replace(
                latest.config,
                last_update=last_update,
                state=version_state,
                snap_start=snap_start,
                **overrides,
            )
            new_version = dataclasses.replace(latest, config=new_config, id=version_id)
            function.versions[qualifier] = new_version
            return new_version, True
1✔
660

661
    def _publish_version_from_existing_version(
        self,
        function_name: str,
        region: str,
        account_id: str,
        description: str | None = None,
        revision_id: str | None = None,
        code_sha256: str | None = None,
        publish_to: FunctionVersionLatestPublished | None = None,
    ) -> FunctionVersion:
        """
        Publish version from an existing, already initialized LATEST

        :param function_name: Function name
        :param region: region
        :param account_id: account id
        :param description: description
        :param revision_id: revision id (check if current version matches)
        :param code_sha256: code sha (check if current code matches)
        :param publish_to: forwarded to the version model; ``LATEST_PUBLISHED`` additionally
            makes the new version start as active
        :return: new version, or the previous version if nothing changed
        """
        to_latest_published = publish_to == FunctionVersionLatestPublished.LATEST_PUBLISHED
        new_version, changed = self._create_version_model(
            function_name=function_name,
            region=region,
            account_id=account_id,
            description=description,
            revision_id=revision_id,
            code_sha256=code_sha256,
            publish_to=publish_to,
            is_active=to_latest_published,
        )
        if not changed:
            return new_version

        uses_capacity_provider = new_version.config.CapacityProviderConfig
        if uses_capacity_provider:
            self.lambda_service.publish_version_async(new_version)
        else:
            self.lambda_service.publish_version(new_version)

        function = lambda_stores[account_id][region].functions.get(function_name)

        # Update revision id for $LATEST version
        # TODO: re-evaluate data model to prevent this dirty hack just for bumping the revision id
        latest_version = function.versions["$LATEST"]
        refreshed_config = dataclasses.replace(latest_version.config)
        function.versions["$LATEST"] = dataclasses.replace(latest_version, config=refreshed_config)

        if uses_capacity_provider:
            # publish_version happens async for functions with a capacity provider.
            # Therefore, we return the new_version with State=Pending or LastUpdateStatus=InProgress ($LATEST.PUBLISHED)
            return new_version

        # Regular functions yield an Active state modified during `publish_version` (sync).
        # Therefore, we need to get the updated version from the store.
        return function.versions.get(new_version.id.qualifier)
1✔
718

719
    def _publish_version_with_changes(
        self,
        function_name: str,
        region: str,
        account_id: str,
        description: str | None = None,
        revision_id: str | None = None,
        code_sha256: str | None = None,
        publish_to: FunctionVersionLatestPublished | None = None,
        is_active: bool = False,
    ) -> FunctionVersion:
        """
        Publish a version alongside a freshly written latest version (publish on create / update).

        Every argument is passed through unchanged to ``_create_version_model``.

        :param function_name: Function name
        :param region: region
        :param account_id: account id
        :param description: description
        :param revision_id: revision id (check if current version matches)
        :param code_sha256: code sha (check if current code matches)
        :param publish_to: forwarded to the version model creation
        :param is_active: forwarded to the version model creation
        :return: the version model; it is handed to the Lambda service only when the model
            creation reported a change
        """
        version_model, has_changed = self._create_version_model(
            function_name=function_name,
            region=region,
            account_id=account_id,
            description=description,
            revision_id=revision_id,
            code_sha256=code_sha256,
            publish_to=publish_to,
            is_active=is_active,
        )
        if has_changed:
            self.lambda_service.create_function_version(version_model)
        return version_model
755

756
    @staticmethod
    def _verify_env_variables(env_vars: dict[str, str]):
        """
        Reject environment variables whose compact JSON form is too large.

        The variables are serialized without whitespace and the UTF-8 byte length of that string
        is compared against ``config.LAMBDA_LIMITS_MAX_FUNCTION_ENVVAR_SIZE_BYTES``.

        :param env_vars: environment variables of the function
        :raises InvalidParameterValueException: if the serialized size exceeds the limit
        """
        serialized = json.dumps(env_vars, separators=(",", ":"))
        size_in_bytes = len(serialized.encode("utf-8"))
        if size_in_bytes <= config.LAMBDA_LIMITS_MAX_FUNCTION_ENVVAR_SIZE_BYTES:
            return

        raise InvalidParameterValueException(
            f"Lambda was unable to configure your environment variables because the environment variables you have provided exceeded the 4KB limit. String measured: {serialized}",
            Type="User",
        )
767

768
    @staticmethod
    def _validate_snapstart(snap_start: SnapStart, runtime: Runtime):
        """
        Check a SnapStart setting against the accepted values and the function runtime.

        :param snap_start: SnapStart settings; only its ``ApplyOn`` entry is inspected
        :param runtime: runtime of the function
        :raises ValidationException: if ``ApplyOn`` is neither ``PublishedVersions`` nor ``None``
        :raises InvalidParameterValueException: if the runtime is not in
            ``SNAP_START_SUPPORTED_RUNTIMES``
        """
        accepted_apply_on = (SnapStartApplyOn.PublishedVersions, SnapStartApplyOn.None_)
        apply_on = snap_start.get("ApplyOn")
        if apply_on not in accepted_apply_on:
            raise ValidationException(
                f"1 validation error detected: Value '{apply_on}' at 'snapStart.applyOn' failed to satisfy constraint: Member must satisfy enum value set: [PublishedVersions, None]"
            )

        if runtime in SNAP_START_SUPPORTED_RUNTIMES:
            return

        raise InvalidParameterValueException(
            f"{runtime} is not supported for SnapStart enabled functions.", Type="User"
        )
783

784
    def _validate_layers(self, new_layers: list[str], region: str, account_id: str):
        """
        Validate the layer version ARNs referenced by a function.

        For every ARN the method checks that it carries a version, that the layer lives in the
        function's region (when ``region`` is set), and that the layer version can be resolved:
        layers of the same account must already be in the store, layers of other accounts are
        fetched through ``self.layer_fetcher`` when they are not stored yet and are then added to
        the store. Finally, two versions of the same layer must not be referenced together.

        :param new_layers: layer version ARNs to validate
        :param region: region of the function; region checks are skipped if falsy
        :param account_id: account id of the function
        :raises InvalidParameterValueException: too many layers, wrong region or missing layer
            version (same account), or two versions of the same layer
        :raises ValidationException: if an ARN does not contain a layer version
        :raises AccessDeniedException: external layer in another region, or the fetcher returned
            nothing for it
        :raises NotImplementedError: external layer needs fetching but no fetcher is configured
        """

        def _access_denied(denied_layer_version_arn: str) -> AccessDeniedException:
            # Build the shared "not authorized" error used for external layers.
            # TODO: detect user or role from context when IAM users are implemented
            user = "user/localstack-testing"
            return AccessDeniedException(
                f"User: arn:{get_partition(region)}:iam::{account_id}:{user} is not authorized to perform: lambda:GetLayerVersion on resource: {denied_layer_version_arn} because no resource-based policy allows the lambda:GetLayerVersion action"
            )

        if len(new_layers) > LAMBDA_LAYERS_LIMIT_PER_FUNCTION:
            raise InvalidParameterValueException(
                "Cannot reference more than 5 layers.", Type="User"
            )

        visited_layers = {}
        for layer_version_arn in new_layers:
            (
                layer_region,
                layer_account_id,
                layer_name,
                layer_version_str,
            ) = api_utils.parse_layer_arn(layer_version_arn)
            if layer_version_str is None:
                raise ValidationException(
                    f"1 validation error detected: Value '[{layer_version_arn}]'"
                    + " at 'layers' failed to satisfy constraint: Member must satisfy constraint: [Member must have length less than or equal to 2048, Member must have length greater than or equal to 1, Member must satisfy regular expression pattern: "
                    + "(arn:(aws[a-zA-Z-]*)?:lambda:(eusc-)?[a-z]{2}((-gov)|(-iso([a-z]?)))?-[a-z]+-\\d{1}:\\d{12}:layer:[a-zA-Z0-9-_]+:[0-9]+)|(arn:[a-zA-Z0-9-]+:lambda:::awslayer:[a-zA-Z0-9-_]+), Member must not be null]",
                )

            state = lambda_stores[layer_account_id][layer_region]
            # `layer` / `layer_version` describe what is ALREADY stored; both stay None if unknown.
            layer = state.layers.get(layer_name)
            layer_version = None
            if layer is not None:
                layer_version = layer.layer_versions.get(layer_version_str)
            if layer_account_id == account_id:
                if region and layer_region != region:
                    raise InvalidParameterValueException(
                        f"Layers are not in the same region as the function. "
                        f"Layers are expected to be in region {region}.",
                        Type="User",
                    )
                # layer_version is None both when the layer and when only the version is missing
                if layer_version is None:
                    raise InvalidParameterValueException(
                        f"Layer version {layer_version_arn} does not exist.", Type="User"
                    )
            else:  # External layer from other account
                # TODO: validate IAM layer policy here, allowing access by default for now and only checking region
                if region and layer_region != region:
                    raise _access_denied(layer_version_arn)
                if layer_version is None:
                    # Limitation: cannot fetch external layers when using the same account id as the target layer
                    # because we do not want to trigger the layer fetcher for every non-existing layer.
                    if self.layer_fetcher is None:
                        raise NotImplementedError(
                            "Fetching shared layers from AWS is a pro feature."
                        )

                    fetched_layer = self.layer_fetcher.fetch_layer(layer_version_arn)
                    if fetched_layer is None:
                        raise _access_denied(layer_version_arn)

                    # Distinguish between new layer and new layer version. The decision must be
                    # based on the stored layer: the stored version is always missing here, so
                    # testing it would replace an existing layer and drop its other versions.
                    if layer is None:
                        # Create whole layer from scratch
                        state.layers[layer_name] = fetched_layer
                    else:
                        # Create layer version if another version of the same layer already exists
                        layer.layer_versions[layer_version_str] = (
                            fetched_layer.layer_versions.get(layer_version_str)
                        )

            # only the first two matches in the array are considered for the error message
            layer_arn = ":".join(layer_version_arn.split(":")[:-1])
            if layer_arn in visited_layers:
                conflict_layer_version_arn = visited_layers[layer_arn]
                raise InvalidParameterValueException(
                    f"Two different versions of the same layer are not allowed to be referenced in the same function. {conflict_layer_version_arn} and {layer_version_arn} are versions of the same layer.",
                    Type="User",
                )
            visited_layers[layer_arn] = layer_version_arn
864

865
    def _validate_capacity_provider_config(
        self, capacity_provider_config: CapacityProviderConfig, context: RequestContext
    ):
        """
        Validate the capacity provider settings of a function request.

        Requires a non-empty ``LambdaManagedInstancesCapacityProviderConfig`` entry with a
        ``CapacityProviderArn`` matching ``CAPACITY_PROVIDER_ARN_NAME``, then looks the provider
        up by the last ``:``-separated segment of that ARN via ``get_capacity_provider``.

        :param capacity_provider_config: capacity provider settings from the request
        :param context: request context, passed on to the provider lookup
        :raises ValidationException: if a required member is missing or the ARN is malformed
        """
        managed_instances_config = capacity_provider_config.get(
            "LambdaManagedInstancesCapacityProviderConfig"
        )
        if not managed_instances_config:
            raise ValidationException(
                "1 validation error detected: Value null at 'capacityProviderConfig.lambdaManagedInstancesCapacityProviderConfig' failed to satisfy constraint: Member must not be null"
            )

        capacity_provider_arn = managed_instances_config.get("CapacityProviderArn")
        if not capacity_provider_arn:
            raise ValidationException(
                "1 validation error detected: Value null at 'capacityProviderConfig.lambdaManagedInstancesCapacityProviderConfig.capacityProviderArn' failed to satisfy constraint: Member must not be null"
            )

        arn_is_well_formed = re.match(CAPACITY_PROVIDER_ARN_NAME, capacity_provider_arn)
        if not arn_is_well_formed:
            raise ValidationException(
                f"1 validation error detected: Value '{capacity_provider_arn}' at 'capacityProviderConfig.lambdaManagedInstancesCapacityProviderConfig.capacityProviderArn' failed to satisfy constraint: Member must satisfy regular expression pattern: {CAPACITY_PROVIDER_ARN_NAME}"
            )

        # The provider name is whatever follows the final colon of the ARN.
        *_, capacity_provider_name = capacity_provider_arn.split(":")
        self.get_capacity_provider(context, capacity_provider_name)
888

889
    @staticmethod
    def map_layers(new_layers: list[str]) -> list[LayerVersion]:
        """
        Look up the stored layer version for each given layer version ARN.

        :param new_layers: layer version ARNs; each layer is expected to be present in the store
        :return: the looked-up layer versions, in the order of ``new_layers``
        """
        resolved_versions = []
        for arn in new_layers:
            layer_region, layer_account, name, version_key = api_utils.parse_layer_arn(arn)
            stored_layer = lambda_stores[layer_account][layer_region].layers.get(name)
            resolved_versions.append(stored_layer.layer_versions.get(version_key))
        return resolved_versions
900

901
    def get_function_recursion_config(
        self,
        context: RequestContext,
        function_name: UnqualifiedFunctionName,
        **kwargs,
    ) -> GetFunctionRecursionConfigResponse:
        """
        Return the recursive-loop setting stored on a function.

        :param context: request context
        :param function_name: function name as given in the request
        :return: response carrying the function's ``recursive_loop`` value
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        resolved_name = api_utils.get_function_name(function_name, context)
        function = self._get_function(
            function_name=resolved_name, region=region, account_id=account_id
        )
        return GetFunctionRecursionConfigResponse(RecursiveLoop=function.recursive_loop)
911

912
    def put_function_recursion_config(
        self,
        context: RequestContext,
        function_name: UnqualifiedFunctionName,
        recursive_loop: RecursiveLoop,
        **kwargs,
    ) -> PutFunctionRecursionConfigResponse:
        """
        Store a new recursive-loop setting on a function.

        The function is looked up before the value is validated, so the lookup runs first.

        :param context: request context
        :param function_name: function name as given in the request
        :param recursive_loop: new setting; must be a member value of ``RecursiveLoop``
        :return: response carrying the stored ``recursive_loop`` value
        :raises ValidationException: if ``recursive_loop`` is not a known member value
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        resolved_name = api_utils.get_function_name(function_name, context)
        function = self._get_function(
            function_name=resolved_name, region=region, account_id=account_id
        )

        if recursive_loop not in tuple(RecursiveLoop.__members__.values()):
            raise ValidationException(
                f"1 validation error detected: Value '{recursive_loop}' at 'recursiveLoop' failed to satisfy constraint: "
                f"Member must satisfy enum value set: [Terminate, Allow]"
            )

        function.recursive_loop = recursive_loop
        return PutFunctionRecursionConfigResponse(RecursiveLoop=function.recursive_loop)
933

934
    @handler(operation="CreateFunction", expand=False)
    def create_function(
        self,
        context: RequestContext,
        request: CreateFunctionRequest,
    ) -> FunctionConfiguration:
        """
        Create a new function with its ``$LATEST`` version.

        Steps, in order:
        1. Validate the request (size limits, architectures, environment variables, layers,
           role, runtime, SnapStart).
        2. Under ``create_fn_lock``: reject an already existing function name, store the code
           (zip upload, S3 archive or image), build the ``$LATEST`` version model and register
           the function in the store.
        3. Outside the lock: create the function version in the Lambda service (skipped for
           functions with a capacity provider), store tags, and optionally publish a version.
        4. If ``config.LAMBDA_SYNCHRONOUS_CREATE`` is set, poll until the version reaches a
           terminal state or the timeout is hit.

        :param context: request context (region, account id, partition, raw request)
        :param request: CreateFunction request
        :return: configuration of the created version (or of the published one, if ``Publish``
            is set)
        :raises RequestEntityTooLargeException: zipped code or whole request exceeds the limits
        :raises ValidationException: invalid architectures or role ARN
        :raises InvalidParameterValueException: role cannot be assumed, or an unsupported
            logging configuration is combined with a capacity provider
        :raises ResourceConflictException: a function with that name already exists
        :raises LambdaServiceException: required code location (zip, S3 bucket or image) is missing
        """
        context_region = context.region
        context_account_id = context.account_id

        zip_file = request.get("Code", {}).get("ZipFile")
        if zip_file and len(zip_file) > config.LAMBDA_LIMITS_CODE_SIZE_ZIPPED:
            raise RequestEntityTooLargeException(
                f"Zipped size must be smaller than {config.LAMBDA_LIMITS_CODE_SIZE_ZIPPED} bytes"
            )

        if context.request.content_length > config.LAMBDA_LIMITS_CREATE_FUNCTION_REQUEST_SIZE:
            raise RequestEntityTooLargeException(
                f"Request must be smaller than {config.LAMBDA_LIMITS_CREATE_FUNCTION_REQUEST_SIZE} bytes for the CreateFunction operation"
            )

        if architectures := request.get("Architectures"):
            if len(architectures) != 1:
                raise ValidationException(
                    f"1 validation error detected: Value '[{', '.join(architectures)}]' at 'architectures' failed to "
                    f"satisfy constraint: Member must have length less than or equal to 1",
                )
            if architectures[0] not in ARCHITECTURES:
                raise ValidationException(
                    f"1 validation error detected: Value '[{', '.join(architectures)}]' at 'architectures' failed to "
                    f"satisfy constraint: Member must satisfy constraint: [Member must satisfy enum value set: "
                    f"[x86_64, arm64], Member must not be null]",
                )

        if env_vars := request.get("Environment", {}).get("Variables"):
            self._verify_env_variables(env_vars)

        if layers := request.get("Layers", []):
            self._validate_layers(layers, region=context_region, account_id=context_account_id)

        if not api_utils.is_role_arn(request.get("Role")):
            raise ValidationException(
                f"1 validation error detected: Value '{request.get('Role')}'"
                + " at 'role' failed to satisfy constraint: Member must satisfy regular expression pattern: arn:(aws[a-zA-Z-]*)?:iam::\\d{12}:role/?[a-zA-Z_0-9+=,.@\\-_/]+"
            )
        if not self.lambda_service.can_assume_role(request.get("Role"), context.region):
            raise InvalidParameterValueException(
                "The role defined for the function cannot be assumed by Lambda.", Type="User"
            )
        package_type = request.get("PackageType", PackageType.Zip)
        runtime = request.get("Runtime")
        self._validate_runtime(package_type, runtime)

        request_function_name = request.get("FunctionName")

        function_name, *_ = api_utils.get_name_and_qualifier(
            function_arn_or_name=request_function_name,
            qualifier=None,
            context=context,
        )

        if runtime in DEPRECATED_RUNTIMES:
            # The format string previously contained a stray "}" after the runtime placeholder.
            LOG.warning(
                "The Lambda runtime %s is deprecated. "
                "Please upgrade the runtime for the function %s: "
                "https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtimes.html",
                runtime,
                function_name,
            )
        if snap_start := request.get("SnapStart"):
            self._validate_snapstart(snap_start, runtime)
        state = lambda_stores[context_account_id][context_region]

        # The existence check and the registration of the new function happen under one lock.
        with self.create_fn_lock:
            if function_name in state.functions:
                raise ResourceConflictException(f"Function already exist: {function_name}")
            fn = Function(function_name=function_name)
            arn = VersionIdentifier(
                function_name=function_name,
                qualifier="$LATEST",
                region=context_region,
                account=context_account_id,
            )
            # save function code to s3
            code = None
            image = None
            image_config = None
            runtime_version_config = RuntimeVersionConfig(
                # Limitation: the runtime id (presumably sha256 of image) is currently hardcoded
                # Potential implementation: provide (cached) sha256 hash of used Docker image
                RuntimeVersionArn=f"arn:{context.partition}:lambda:{context_region}::runtime:8eeff65f6809a3ce81507fe733fe09b835899b99481ba22fd75b5a7338290ec1"
            )
            request_code = request.get("Code")
            if package_type == PackageType.Zip:
                # TODO verify if correct combination of code is set
                if zip_file := request_code.get("ZipFile"):
                    code = store_lambda_archive(
                        archive_file=zip_file,
                        function_name=function_name,
                        region_name=context_region,
                        account_id=context_account_id,
                    )
                elif s3_bucket := request_code.get("S3Bucket"):
                    s3_key = request_code["S3Key"]
                    s3_object_version = request_code.get("S3ObjectVersion")
                    code = store_s3_bucket_archive(
                        archive_bucket=s3_bucket,
                        archive_key=s3_key,
                        archive_version=s3_object_version,
                        function_name=function_name,
                        region_name=context_region,
                        account_id=context_account_id,
                    )
                else:
                    raise LambdaServiceException("A ZIP file or S3 bucket is required")
            elif package_type == PackageType.Image:
                image = request_code.get("ImageUri")
                if not image:
                    raise LambdaServiceException(
                        "An image is required when the package type is set to 'image'"
                    )
                image = create_image_code(image_uri=image)

                image_config_req = request.get("ImageConfig", {})
                image_config = ImageConfig(
                    command=image_config_req.get("Command"),
                    entrypoint=image_config_req.get("EntryPoint"),
                    working_directory=image_config_req.get("WorkingDirectory"),
                )
                # Runtime management controls are not available when providing a custom image
                runtime_version_config = None

            capacity_provider_config = None
            memory_size = request.get("MemorySize", LAMBDA_DEFAULT_MEMORY_SIZE)
            if "CapacityProviderConfig" in request:
                capacity_provider_config = request["CapacityProviderConfig"]
                self._validate_capacity_provider_config(capacity_provider_config, context)

                # Request values take precedence over these defaults in the merge below.
                default_config = CapacityProviderConfig(
                    LambdaManagedInstancesCapacityProviderConfig=LambdaManagedInstancesCapacityProviderConfig(
                        ExecutionEnvironmentMemoryGiBPerVCpu=2.0,
                        PerExecutionEnvironmentMaxConcurrency=16,
                    )
                )
                capacity_provider_config = merge_recursive(default_config, capacity_provider_config)
                # With a capacity provider, the requested MemorySize is replaced by a fixed value.
                memory_size = 2048
                if request.get("LoggingConfig", {}).get("LogFormat") == LogFormat.Text:
                    raise InvalidParameterValueException(
                        'LogLevel is not supported when LogFormat is set to "Text". Remove LogLevel from your request or change the LogFormat to "JSON" and try again.',
                        Type="User",
                    )
            if "LoggingConfig" in request:
                logging_config = request["LoggingConfig"]
                LOG.warning(
                    "Advanced Lambda Logging Configuration is currently mocked "
                    "and will not impact the logging behavior. "
                    "Please create a feature request if needed."
                )

                # when switching to JSON, app and system level log is auto set to INFO
                if logging_config.get("LogFormat", None) == LogFormat.JSON:
                    logging_config = {
                        "ApplicationLogLevel": "INFO",
                        "SystemLogLevel": "INFO",
                        "LogGroup": f"/aws/lambda/{function_name}",
                    } | logging_config
                else:
                    logging_config = (
                        LoggingConfig(
                            LogFormat=LogFormat.Text, LogGroup=f"/aws/lambda/{function_name}"
                        )
                        | logging_config
                    )

            elif capacity_provider_config:
                logging_config = LoggingConfig(
                    LogFormat=LogFormat.JSON,
                    LogGroup=f"/aws/lambda/{function_name}",
                    ApplicationLogLevel="INFO",
                    SystemLogLevel="INFO",
                )
            else:
                logging_config = LoggingConfig(
                    LogFormat=LogFormat.Text, LogGroup=f"/aws/lambda/{function_name}"
                )
            # No SnapStart response is stored for functions with a capacity provider.
            snap_start = (
                None
                if capacity_provider_config
                else SnapStartResponse(
                    ApplyOn=request.get("SnapStart", {}).get("ApplyOn", SnapStartApplyOn.None_),
                    OptimizationStatus=SnapStartOptimizationStatus.Off,
                )
            )
            version = FunctionVersion(
                id=arn,
                config=VersionFunctionConfiguration(
                    last_modified=api_utils.format_lambda_date(datetime.datetime.now()),
                    description=request.get("Description", ""),
                    role=request["Role"],
                    timeout=request.get("Timeout", LAMBDA_DEFAULT_TIMEOUT),
                    runtime=request.get("Runtime"),
                    memory_size=memory_size,
                    handler=request.get("Handler"),
                    package_type=package_type,
                    environment=env_vars,
                    architectures=request.get("Architectures") or [Architecture.x86_64],
                    tracing_config_mode=request.get("TracingConfig", {}).get(
                        "Mode", TracingMode.PassThrough
                    ),
                    image=image,
                    image_config=image_config,
                    code=code,
                    layers=self.map_layers(layers),
                    internal_revision=short_uid(),
                    ephemeral_storage=LambdaEphemeralStorage(
                        size=request.get("EphemeralStorage", {}).get("Size", 512)
                    ),
                    snap_start=snap_start,
                    runtime_version_config=runtime_version_config,
                    dead_letter_arn=request.get("DeadLetterConfig", {}).get("TargetArn"),
                    vpc_config=self._build_vpc_config(
                        context_account_id, context_region, request.get("VpcConfig")
                    ),
                    state=VersionState(
                        state=State.Pending,
                        code=StateReasonCode.Creating,
                        reason="The function is being created.",
                    ),
                    logging_config=logging_config,
                    # TODO: might need something like **optional_kwargs if None
                    #   -> Test with regular GetFunction (i.e., without a capacity provider)
                    CapacityProviderConfig=capacity_provider_config,
                ),
            )
            # For capacity-provider functions the STORED $LATEST differs from the Pending
            # `version` that is returned to the caller below.
            version_post_response = None
            if capacity_provider_config:
                version_post_response = dataclasses.replace(
                    version,
                    config=dataclasses.replace(
                        version.config,
                        last_update=UpdateStatus(status=LastUpdateStatus.Successful),
                        state=VersionState(state=State.ActiveNonInvocable),
                    ),
                )
            fn.versions["$LATEST"] = version_post_response or version
            state.functions[function_name] = fn
        initialization_type = (
            FunctionInitializationType.lambda_managed_instances
            if capacity_provider_config
            else FunctionInitializationType.on_demand
        )
        # NOTE(review): the object returned by labels(...) is discarded here — confirm whether
        # the counter needs an explicit increment call for this creation to be recorded.
        function_counter.labels(
            operation=FunctionOperation.create,
            runtime=runtime or "n/a",
            status=FunctionStatus.success,
            invocation_type="n/a",
            package_type=package_type,
            initialization_type=initialization_type,
        )
        # TODO: consider potential other side effects of not having a function version for $LATEST
        # Provisioning happens upon publishing for functions using a capacity provider
        if not capacity_provider_config:
            self.lambda_service.create_function_version(version)

        if tags := request.get("Tags"):
            # This will check whether the function exists.
            self._store_tags(arn.unqualified_arn(), tags)

        if request.get("Publish"):
            version = self._publish_version_with_changes(
                function_name=function_name,
                region=context_region,
                account_id=context_account_id,
                publish_to=request.get("PublishTo"),
            )

        if config.LAMBDA_SYNCHRONOUS_CREATE:
            # block via retrying until "terminal" condition reached before returning
            if not poll_condition(
                lambda: get_function_version(
                    function_name, version.id.qualifier, version.id.account, version.id.region
                ).config.state.state
                in [State.Active, State.ActiveNonInvocable, State.Failed],
                timeout=10,
            ):
                LOG.warning(
                    "LAMBDA_SYNCHRONOUS_CREATE is active, but waiting for %s reached timeout.",
                    function_name,
                )

        return api_utils.map_config_out(
            version, return_qualified_arn=False, return_update_status=False
        )
1226

1227
    def _validate_runtime(self, package_type, runtime):
        """
        Reject runtimes that are not accepted for Zip-packaged functions.

        The accepted set is ``ALL_RUNTIMES``, or the runtimes listed in ``RUNTIMES_AGGREGATED``
        when ``config.LAMBDA_RUNTIME_VALIDATION`` is enabled. Other package types are not checked.

        :param package_type: package type of the function
        :param runtime: runtime requested for the function
        :raises InvalidParameterValueException: if the runtime is not accepted; deprecated
            runtimes with a known upgrade target raise a dedicated message instead
        """
        runtimes = ALL_RUNTIMES
        if config.LAMBDA_RUNTIME_VALIDATION:
            # Flatten the per-group runtime collections into one list. chain() with a single
            # argument only iterates the groups themselves, so a runtime name never matched.
            # NOTE(review): assumes every value of RUNTIMES_AGGREGATED is a collection of
            # runtimes — confirm against its definition.
            runtimes = list(itertools.chain.from_iterable(RUNTIMES_AGGREGATED.values()))

        if package_type == PackageType.Zip and runtime not in runtimes:
            # deprecated runtimes have different error
            if runtime in DEPRECATED_RUNTIMES:
                HINT_LOG.info(
                    "Set env variable LAMBDA_RUNTIME_VALIDATION to 0"
                    " in order to allow usage of deprecated runtimes"
                )
                # Raises its own error if an upgrade target is known for this runtime.
                self._check_for_recomended_migration_target(runtime)

            raise InvalidParameterValueException(
                f"Value {runtime} at 'runtime' failed to satisfy constraint: Member must satisfy enum value set: {VALID_RUNTIMES} or be a valid ARN",
                Type="User",
            )
1245

1246
    def _check_for_recomended_migration_target(self, deprecated_runtime):
        """
        Raise a dedicated error for deprecated runtimes that have a known upgrade target.

        AWS suggests a replacement runtime for "newly" deprecated runtimes; this check exists to
        keep parity with those error messages. Nothing happens if ``DEPRECATED_RUNTIMES_UPGRADES``
        has no entry for the runtime.

        :param deprecated_runtime: the deprecated runtime that was requested
        :raises InvalidParameterValueException: if an upgrade target is known for the runtime
        """
        upgrade_target = DEPRECATED_RUNTIMES_UPGRADES.get(deprecated_runtime)
        if upgrade_target is None:
            return

        LOG.debug(
            "The Lambda runtime %s is deprecated. Please upgrade to a supported Lambda runtime such as %s.",
            deprecated_runtime,
            upgrade_target,
        )
        raise InvalidParameterValueException(
            f"The runtime parameter of {deprecated_runtime} is no longer supported for creating or updating AWS Lambda functions. We recommend you use a supported runtime while creating or updating functions.",
            Type="User",
        )
1261

1262
    @handler(operation="UpdateFunctionConfiguration", expand=False)
    def update_function_configuration(
        self, context: RequestContext, request: UpdateFunctionConfigurationRequest
    ) -> FunctionConfiguration:
        """
        Apply a configuration change to the ``$LATEST`` version of a function.

        Only members present in ``request`` are changed. The current ``$LATEST`` version is
        copied with the collected overrides, stored on the function and handed to the lambda
        service via ``update_version``.

        :param context: request context, used to resolve the account and region
        :param request: the raw ``UpdateFunctionConfiguration`` request
        :return: the mapped configuration of the new ``$LATEST`` version
        :raises ResourceNotFoundException: if the function is not present in the store
        :raises PreconditionFailedException: if ``RevisionId`` is set and does not match
        :raises ValidationException: if ``Role`` is not a role ARN, or the capacity provider
            config lacks ``LambdaManagedInstancesCapacityProviderConfig``
        :raises InvalidParameterValueException: if ``Runtime`` is unknown, or a capacity provider
            config is sent for a function that has none

        The validation helpers called along the way may raise additional errors.
        """
        # the name may arrive as a plain name, an ARN or a partial ARN
        name_or_arn = request.get("FunctionName")
        account_id, region = api_utils.get_account_and_region(name_or_arn, context)
        function_name, _ = api_utils.get_name_and_qualifier(name_or_arn, None, context)
        state = lambda_stores[account_id][region]

        if function_name not in state.functions:
            function_arn = api_utils.unqualified_lambda_arn(
                function_name=function_name, region=region, account=account_id
            )
            raise ResourceNotFoundException(f"Function not found: {function_arn}", Type="User")
        function = state.functions[function_name]

        # TODO: lock modification of latest version
        # TODO: notify service for changes relevant to re-provisioning of $LATEST
        latest_version = function.latest()
        latest_version_config = latest_version.config

        expected_revision = request.get("RevisionId")
        if expected_revision and expected_revision != latest_version_config.revision_id:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
                Type="User",
            )

        # overrides for the config dataclass, filled only for members present in the request
        replace_kwargs = {}

        if "EphemeralStorage" in request:
            # TODO: do defaults here apply as well?
            storage_size = request["EphemeralStorage"].get("Size", 512)
            replace_kwargs["ephemeral_storage"] = LambdaEphemeralStorage(storage_size)

        if "Role" in request:
            role = request["Role"]
            if not api_utils.is_role_arn(role):
                raise ValidationException(
                    f"1 validation error detected: Value '{role}'"
                    + " at 'role' failed to satisfy constraint: Member must satisfy regular expression pattern: arn:(aws[a-zA-Z-]*)?:iam::\\d{12}:role/?[a-zA-Z_0-9+=,.@\\-_/]+"
                )
            replace_kwargs["role"] = role

        # members that are copied into the config without any validation
        for request_key, config_field in (
            ("Description", "description"),
            ("Timeout", "timeout"),
            ("MemorySize", "memory_size"),
            ("Handler", "handler"),
        ):
            if request_key in request:
                replace_kwargs[config_field] = request[request_key]

        if "DeadLetterConfig" in request:
            replace_kwargs["dead_letter_arn"] = request["DeadLetterConfig"].get("TargetArn")

        vpc_config = request.get("VpcConfig")
        if vpc_config:
            replace_kwargs["vpc_config"] = self._build_vpc_config(account_id, region, vpc_config)

        if "Runtime" in request:
            requested_runtime = request["Runtime"]
            if requested_runtime not in ALL_RUNTIMES:
                raise InvalidParameterValueException(
                    f"Value {requested_runtime} at 'runtime' failed to satisfy constraint: Member must satisfy enum value set: {VALID_RUNTIMES} or be a valid ARN",
                    Type="User",
                )
            # deprecated runtimes are still accepted on update, but the user gets a warning
            if requested_runtime in DEPRECATED_RUNTIMES:
                LOG.warning(
                    "The Lambda runtime %s is deprecated. "
                    "Please upgrade the runtime for the function %s: "
                    "https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtimes.html",
                    requested_runtime,
                    function_name,
                )
            replace_kwargs["runtime"] = requested_runtime

        snap_start = request.get("SnapStart")
        if snap_start:
            # validate against the runtime of this very request, falling back to the stored one
            effective_runtime = replace_kwargs.get("runtime") or latest_version_config.runtime
            self._validate_snapstart(snap_start, effective_runtime)
            replace_kwargs["snap_start"] = SnapStartResponse(
                ApplyOn=snap_start.get("ApplyOn", SnapStartApplyOn.None_),
                OptimizationStatus=SnapStartOptimizationStatus.Off,
            )

        if "Environment" in request:
            env_vars = request["Environment"].get("Variables", {})
            if env_vars:
                self._verify_env_variables(env_vars)
            replace_kwargs["environment"] = env_vars

        if "Layers" in request:
            requested_layers = request["Layers"]
            if requested_layers:
                self._validate_layers(requested_layers, region=region, account_id=account_id)
            replace_kwargs["layers"] = self.map_layers(requested_layers)

        if "ImageConfig" in request:
            requested_image_config = request["ImageConfig"]
            replace_kwargs["image_config"] = ImageConfig(
                command=requested_image_config.get("Command"),
                entrypoint=requested_image_config.get("EntryPoint"),
                working_directory=requested_image_config.get("WorkingDirectory"),
            )

        if "LoggingConfig" in request:
            requested_logging = request["LoggingConfig"]
            LOG.warning(
                "Advanced Lambda Logging Configuration is currently mocked "
                "and will not impact the logging behavior. "
                "Please create a feature request if needed."
            )

            # switching to JSON defaults both log levels to INFO unless the request sets them
            if requested_logging.get("LogFormat", None) == LogFormat.JSON:
                requested_logging = {
                    "ApplicationLogLevel": "INFO",
                    "SystemLogLevel": "INFO",
                    **requested_logging,
                }

            previous_logging = latest_version_config.logging_config
            # partial update: requested keys win over the stored ones
            merged_logging = previous_logging | requested_logging

            # going from JSON back to Text drops the log level keys
            switched_to_text = (
                merged_logging.get("LogFormat") == LogFormat.Text
                and previous_logging.get("LogFormat") == LogFormat.JSON
            )
            if switched_to_text:
                for level_key in ("ApplicationLogLevel", "SystemLogLevel"):
                    merged_logging.pop(level_key, None)

            replace_kwargs["logging_config"] = merged_logging

        if "TracingConfig" in request:
            if tracing_mode := request["TracingConfig"].get("Mode"):
                replace_kwargs["tracing_config_mode"] = tracing_mode

        if "CapacityProviderConfig" in request:
            # only validated here; nothing from this member is written into the config
            if not latest_version_config.CapacityProviderConfig:
                raise InvalidParameterValueException(
                    "CapacityProviderConfig isn't supported for Lambda Default functions.",
                    Type="User",
                )
            if not request["CapacityProviderConfig"].get(
                "LambdaManagedInstancesCapacityProviderConfig"
            ):
                raise ValidationException(
                    "1 validation error detected: Value null at 'capacityProviderConfig.lambdaManagedInstancesCapacityProviderConfig' failed to satisfy constraint: Member must not be null"
                )

        updated_config = dataclasses.replace(
            latest_version_config,
            last_modified=api_utils.generate_lambda_date(),
            internal_revision=short_uid(),
            last_update=UpdateStatus(
                status=LastUpdateStatus.InProgress,
                code="Creating",
                reason="The function is being created.",
            ),
            **replace_kwargs,
        )
        new_latest_version = dataclasses.replace(latest_version, config=updated_config)
        function.versions["$LATEST"] = new_latest_version  # TODO: notify
        self.lambda_service.update_version(new_version=new_latest_version)

        return api_utils.map_config_out(new_latest_version)
1✔
1437

1438
    @handler(operation="UpdateFunctionCode", expand=False)
    def update_function_code(
        self, context: RequestContext, request: UpdateFunctionCodeRequest
    ) -> FunctionConfiguration:
        """
        Replace the code (or image) of the ``$LATEST`` version of a function.

        The new code comes from ``ZipFile``, from ``S3Bucket``/``S3Key`` or from ``ImageUri``,
        checked in that order. With ``Publish`` set, a new version is published afterwards and
        the response carries the qualified ARN.

        :param context: request context, used to resolve the account and region
        :param request: the raw ``UpdateFunctionCode`` request
        :return: the mapped configuration of the updated (or newly published) version
        :raises ResourceNotFoundException: if the function is not present in the store
        :raises PreconditionFailedException: if ``RevisionId`` is set and does not match
        :raises InvalidParameterValueException: if the code source does not fit the package type
        :raises ValidationException: if ``Architectures`` has more than one or an unknown entry
        :raises LambdaServiceException: if the request contains no code source at all
        """
        name_or_arn = request.get("FunctionName")
        account_id, region = api_utils.get_account_and_region(name_or_arn, context)
        function_name, _ = api_utils.get_name_and_qualifier(name_or_arn, None, context)

        store = lambda_stores[account_id][region]
        if function_name not in store.functions:
            function_arn = api_utils.unqualified_lambda_arn(
                function_name=function_name, region=region, account=account_id
            )
            raise ResourceNotFoundException(f"Function not found: {function_arn}", Type="User")
        function = store.functions[function_name]

        expected_revision = request.get("RevisionId")
        if expected_revision and expected_revision != function.latest().config.revision_id:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
                Type="User",
            )

        # TODO verify if correct combination of code is set
        wants_archive = bool(request.get("ZipFile") or request.get("S3Bucket"))
        if wants_archive and function.latest().config.package_type == PackageType.Image:
            raise InvalidParameterValueException(
                "Please provide ImageUri when updating a function with packageType Image.",
                Type="User",
            )
        elif request.get("ImageUri") and function.latest().config.package_type == PackageType.Zip:
            raise InvalidParameterValueException(
                "Please don't provide ImageUri when updating a function with packageType Zip.",
                Type="User",
            )

        # exactly one of `code` / `image` is populated below
        code = None
        image = None
        if zip_file := request.get("ZipFile"):
            code = store_lambda_archive(
                archive_file=zip_file,
                function_name=function_name,
                region_name=region,
                account_id=account_id,
            )
        elif s3_bucket := request.get("S3Bucket"):
            code = store_s3_bucket_archive(
                archive_bucket=s3_bucket,
                archive_key=request["S3Key"],
                archive_version=request.get("S3ObjectVersion"),
                function_name=function_name,
                region_name=region,
                account_id=account_id,
            )
        elif image_uri := request.get("ImageUri"):
            image = create_image_code(image_uri=image_uri)
        else:
            raise LambdaServiceException("A ZIP file, S3 bucket, or image is required")

        previous_version = function.versions.get("$LATEST")
        replace_kwargs = {"code": code} if code else {"image": image}

        architectures = request.get("Architectures")
        if architectures:
            listed = ", ".join(architectures)
            if len(architectures) != 1:
                raise ValidationException(
                    f"1 validation error detected: Value '[{listed}]' at 'architectures' failed to "
                    "satisfy constraint: Member must have length less than or equal to 1",
                )
            # An empty list of architectures is also forbidden. Further exceptions are tested here for create_function:
            # tests.aws.services.lambda_.test_lambda_api.TestLambdaFunction.test_create_lambda_exceptions
            if architectures[0] not in ARCHITECTURES:
                raise ValidationException(
                    f"1 validation error detected: Value '[{listed}]' at 'architectures' failed to "
                    "satisfy constraint: Member must satisfy constraint: [Member must satisfy enum value set: "
                    "[x86_64, arm64], Member must not be null]",
                )
            replace_kwargs["architectures"] = architectures

        # named `updated_config` so the module-level `config` import is not shadowed
        updated_config = dataclasses.replace(
            previous_version.config,
            internal_revision=short_uid(),
            last_modified=api_utils.generate_lambda_date(),
            last_update=UpdateStatus(
                status=LastUpdateStatus.InProgress,
                code="Creating",
                reason="The function is being created.",
            ),
            **replace_kwargs,
        )
        function_version = dataclasses.replace(previous_version, config=updated_config)
        function.versions["$LATEST"] = function_version

        self.lambda_service.update_version(new_version=function_version)

        publish = request.get("Publish")
        if publish:
            function_version = self._publish_version_with_changes(
                function_name=function_name,
                region=region,
                account_id=account_id,
                # TODO: validations for PublishTo without Publish=True
                publish_to=request.get("PublishTo"),
                is_active=True,
            )
        return api_utils.map_config_out(function_version, return_qualified_arn=bool(publish))
1552

1553
    # TODO: does deleting the latest published version affect the next versions number?
1554
    # TODO: what happens when we call this with a qualifier and a fully qualified ARN? (+ conflicts?)
1555
    # TODO: test different ARN patterns (shorthand ARN?)
1556
    # TODO: test deleting across regions?
1557
    # TODO: test mismatch between context region and region in ARN
1558
    # TODO: test qualifier $LATEST, alias-name and version
1559
    def delete_function(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
        **kwargs,
    ) -> DeleteFunctionResponse:
        """
        Delete a single published version (when a qualifier is given) or the whole function.

        :param context: request context, used to resolve the account and region
        :param function_name: function name, ARN or partial ARN
        :param qualifier: optional version qualifier; aliases and ``$LATEST`` are rejected
        :return: response with status 202 if a deleted version had a capacity provider
            config, otherwise 204
        :raises InvalidParameterValueException: if the qualifier is an alias or ``$LATEST``
        :raises ResourceNotFoundException: if the function is not present in the store
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )

        if qualifier and api_utils.qualifier_is_alias(qualifier):
            raise InvalidParameterValueException(
                "Deletion of aliases is not currently supported.",
                Type="User",
            )

        store = lambda_stores[account_id][region]
        if qualifier == "$LATEST":
            raise InvalidParameterValueException(
                "$LATEST version cannot be deleted without deleting the function.", Type="User"
            )

        if function_name not in store.functions:
            function_arn = api_utils.unqualified_lambda_arn(
                function_name=function_name, region=region, account=account_id
            )
            raise ResourceNotFoundException(f"Function not found: {function_arn}", Type="User")
        function = store.functions.get(function_name)

        function_has_capacity_provider = False
        if qualifier:
            # delete one version only; an unknown qualifier is silently ignored
            removed_version = function.versions.pop(qualifier, None)
            if removed_version:
                if removed_version.config.CapacityProviderConfig:
                    function_has_capacity_provider = True
                self.lambda_service.stop_version(removed_version.id.qualified_arn())
                destroy_code_if_not_used(code=removed_version.config.code, function=function)
        else:
            # delete the whole function
            # TODO: introduce locking for safe deletion: We could create a new version at the API layer before
            #  the old version gets cleaned up in the internal lambda service.
            function = store.functions.pop(function_name)
            for version in function.versions.values():
                # Functions with a capacity provider do NOT have a version manager for $LATEST because only
                # published versions are invokable.
                # NOTE(review): no version carrying a capacity provider config is stopped here,
                # published ones included - confirm that this is intended.
                if version.config.CapacityProviderConfig:
                    function_has_capacity_provider = True
                else:
                    self.lambda_service.stop_version(qualified_arn=version.id.qualified_arn())
                # the function is gone, so its code can be destroyed unconditionally
                if version.config.code:
                    version.config.code.destroy()

        status_code = 202 if function_has_capacity_provider else 204
        return DeleteFunctionResponse(StatusCode=status_code)
1✔
1619

1620
    def list_functions(
        self,
        context: RequestContext,
        master_region: MasterRegion = None,  # (only relevant for lambda@edge)
        function_version: FunctionVersionApi = None,
        marker: String = None,
        max_items: MaxListItems = None,
        **kwargs,
    ) -> ListFunctionsResponse:
        """
        List the functions of the calling account and region, one page at a time.

        :param context: request context providing the account and region
        :param master_region: accepted but not used by this implementation
        :param function_version: ``ALL`` lists every version of every function with qualified
            ARNs; when omitted only ``$LATEST`` of each function is listed
        :param marker: pagination marker from a previous response
        :param max_items: maximum number of entries per page
        :return: the requested page and, if there is more, the next marker
        :raises ValidationException: if ``function_version`` is set to anything but ``ALL``
        """
        state = lambda_stores[context.account_id][context.region]

        if function_version and function_version != FunctionVersionApi.ALL:
            raise ValidationException(
                f"1 validation error detected: Value '{function_version}'"
                + " at 'functionVersion' failed to satisfy constraint: Member must satisfy enum value set: [ALL]"
            )

        include_all_versions = function_version == FunctionVersionApi.ALL
        if include_all_versions:
            selected = [
                version
                for function in state.functions.values()
                for version in function.versions.values()
            ]
        else:
            selected = [function.latest() for function in state.functions.values()]

        entries = PaginatedList(
            api_utils.map_to_list_response(
                api_utils.map_config_out(version, return_qualified_arn=include_all_versions)
            )
            for version in selected
        )
        # the function ARN of an entry doubles as its pagination token
        page, token = entries.get_page(
            lambda entry: entry["FunctionArn"],
            marker,
            max_items,
        )
        return ListFunctionsResponse(Functions=page, NextMarker=token)
1✔
1658

1659
    def get_function(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
        **kwargs,
    ) -> GetFunctionResponse:
        """
        Return configuration, code location, concurrency and tags of a function version.

        An alias qualifier is resolved to the version the alias points to.

        :param context: request context, used to resolve the account and region
        :param function_name: function name, ARN or partial ARN
        :param qualifier: optional version or alias
        :return: the response; ``Tags`` is only included if the function has tags
        :raises ResourceNotFoundException: if the function or the alias does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )

        fn = lambda_stores[account_id][region].functions.get(function_name)
        if fn is None:
            # the ARN in the message is qualified only if the caller supplied a qualifier
            if qualifier is None:
                missing_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
            else:
                missing_arn = api_utils.qualified_lambda_arn(
                    function_name, qualifier, account_id, region
                )
            raise ResourceNotFoundException(f"Function not found: {missing_arn}", Type="User")

        alias_name = None
        if qualifier and api_utils.qualifier_is_alias(qualifier):
            if qualifier not in fn.aliases:
                alias_arn = api_utils.qualified_lambda_arn(
                    function_name, qualifier, account_id, region
                )
                raise ResourceNotFoundException(f"Function not found: {alias_arn}", Type="User")
            # remember the alias and continue with the version it points to
            alias_name = qualifier
            qualifier = fn.aliases[alias_name].function_version

        version = get_function_version(
            function_name=function_name,
            qualifier=qualifier,
            account_id=account_id,
            region=region,
        )

        tags = self._get_tags(api_utils.unqualified_lambda_arn(function_name, account_id, region))
        additional_fields = {"Tags": tags} if tags else {}

        version_config = version.config
        code_location = None
        if version_config.code:
            presigned_url = version_config.code.generate_presigned_url(
                endpoint_url=config.external_service_url()
            )
            code_location = FunctionCodeLocation(Location=presigned_url, RepositoryType="S3")
        elif version_config.image:
            image = version_config.image
            code_location = FunctionCodeLocation(
                ImageUri=image.image_uri,
                RepositoryType=image.repository_type,
                ResolvedImageUri=image.resolved_image_uri,
            )

        concurrency = None
        if fn.reserved_concurrent_executions:
            concurrency = Concurrency(
                ReservedConcurrentExecutions=fn.reserved_concurrent_executions
            )

        configuration = api_utils.map_config_out(
            version, return_qualified_arn=bool(qualifier), alias_name=alias_name
        )
        return GetFunctionResponse(
            Configuration=configuration,
            Code=code_location,  # TODO
            Concurrency=concurrency,
            **additional_fields,
        )
1729

1730
    def get_function_configuration(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
        **kwargs,
    ) -> FunctionConfiguration:
        """
        Return the configuration of a function version.

        CAVE: the return value is *not* the same as the one of ``get_function``; it seems to be
        only the configuration part.

        :param context: request context, used to resolve the account and region
        :param function_name: function name, ARN or partial ARN
        :param qualifier: optional qualifier; the ARN in the result is qualified if one is set
        :return: the mapped configuration of the resolved version
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        resolved_name, resolved_qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        version = get_function_version(
            function_name=resolved_name,
            qualifier=resolved_qualifier,
            account_id=account_id,
            region=region,
        )
        return api_utils.map_config_out(version, return_qualified_arn=bool(resolved_qualifier))
1✔
1749

1750
    def invoke(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        invocation_type: InvocationType | None = None,
        log_type: LogType | None = None,
        client_context: String | None = None,
        durable_execution_name: DurableExecutionName | None = None,
        payload: IO[Blob] | None = None,
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
        tenant_id: TenantId | None = None,
        **kwargs,
    ) -> InvocationResponse:
        """
        Invoke a function through the lambda service and map the result to an API response.

        ``durable_execution_name`` and ``tenant_id`` are accepted but not forwarded to the
        lambda service by this method.

        :param context: request context (request id, trace context, user agent, account/region)
        :param function_name: function name, ARN or partial ARN
        :param invocation_type: ``Event`` answers with 202 and ``DryRun`` with 204, both without
            payload; anything else answers with 200 and the invocation result
        :param log_type: with ``Tail`` the last 4096 bytes of the logs are returned base64-encoded
        :param client_context: passed through to the lambda service
        :param payload: readable stream with the invocation payload
        :param qualifier: optional version or alias
        :return: the invocation response
        :raises ServiceException: re-raised unchanged when raised by the lambda service
        :raises LambdaServiceException: for an environment startup timeout or any other error
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )

        user_agent = context.request.user_agent.string
        request_id = context.request_id

        started_at = time.perf_counter()
        try:
            invocation_result = self.lambda_service.invoke(
                function_name=function_name,
                qualifier=qualifier,
                region=region,
                account_id=account_id,
                invocation_type=invocation_type,
                client_context=client_context,
                request_id=request_id,
                trace_context=context.trace_context,
                # read inside the try block so that stream errors are wrapped as well
                payload=payload.read() if payload else None,
                user_agent=user_agent,
            )
        except ServiceException:
            # already an API-level error, hand it to the caller untouched
            raise
        except EnvironmentStartupTimeoutException as timeout_error:
            raise LambdaServiceException(
                f"[{context.request_id}] Timeout while starting up lambda environment for function {function_name}:{qualifier}"
            ) from timeout_error
        except Exception as e:
            # the stack trace is only attached when debug logging is enabled
            LOG.error(
                "[%s] Error while invoking lambda %s",
                request_id,
                function_name,
                exc_info=LOG.isEnabledFor(logging.DEBUG),
            )
            raise LambdaServiceException(
                f"[{context.request_id}] Internal error while executing lambda {function_name}:{qualifier}. Caused by {type(e).__name__}: {e}"
            ) from e

        # asynchronous and dry-run invocations have no result to report
        if invocation_type == InvocationType.Event:
            return InvocationResponse(StatusCode=202)
        if invocation_type == InvocationType.DryRun:
            return InvocationResponse(StatusCode=204)

        elapsed_ms = (time.perf_counter() - started_at) * 1000
        LOG.debug("Lambda invocation duration: %0.2fms", elapsed_ms)

        response = InvocationResponse(
            StatusCode=200,
            Payload=invocation_result.payload,
            ExecutedVersion=invocation_result.executed_version,
        )

        if invocation_result.is_error:
            response["FunctionError"] = "Unhandled"

        if log_type == LogType.Tail:
            log_tail = to_bytes(invocation_result.logs)[-4096:]
            response["LogResult"] = to_str(base64.b64encode(log_tail))

        return response
1✔
1824

1825
    # Version operations
1826
    def publish_version(
        self,
        context: RequestContext,
        function_name: FunctionName,
        code_sha256: String | None = None,
        description: Description | None = None,
        revision_id: String | None = None,
        publish_to: FunctionVersionLatestPublished | None = None,
        **kwargs,
    ) -> FunctionConfiguration:
        """
        Publish a new version of a function from its existing version.

        All the work is delegated to ``_publish_version_from_existing_version``; this method
        only resolves the account, region and function name and maps the result.

        :param context: request context, used to resolve the account and region
        :param function_name: function name, ARN or partial ARN
        :param code_sha256: passed through to the publishing helper
        :param description: passed through to the publishing helper
        :param revision_id: passed through to the publishing helper
        :param publish_to: passed through to the publishing helper
        :return: the mapped configuration of the new version, with a qualified ARN
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        resolved_name = api_utils.get_function_name(function_name, context)
        published = self._publish_version_from_existing_version(
            function_name=resolved_name,
            description=description,
            account_id=account_id,
            region=region,
            revision_id=revision_id,
            code_sha256=code_sha256,
            publish_to=publish_to,
        )
        return api_utils.map_config_out(published, return_qualified_arn=True)
1✔
1848

1849
    def list_versions_by_function(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        marker: String = None,
        max_items: MaxListItems = None,
        **kwargs,
    ) -> ListVersionsByFunctionResponse:
        """
        List all versions of a single function, one page at a time.

        :param context: request context, used to resolve the account and region
        :param function_name: function name, ARN or partial ARN
        :param marker: pagination marker from a previous response
        :param max_items: maximum number of entries per page
        :return: the requested page of version configurations and, if there is more, the
            next marker
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name = api_utils.get_function_name(function_name, context)
        function = self._get_function(
            function_name=function_name, region=region, account_id=account_id
        )
        versions = [
            api_utils.map_to_list_response(
                api_utils.map_config_out(version=version, return_qualified_arn=True)
            )
            for version in function.versions.values()
        ]
        items = PaginatedList(versions)
        # Use the qualified function ARN as the pagination token, exactly like list_functions
        # does. The token ends up in NextMarker and is matched against the string `marker`,
        # so it has to be a string that is unique per version rather than the whole entry.
        page, token = items.get_page(
            lambda item: item["FunctionArn"],
            marker,
            max_items,
        )
        return ListVersionsByFunctionResponse(Versions=page, NextMarker=token)
1✔
1875

1876
    # Alias
1877

1878
    def _create_routing_config_model(
        self, routing_config_dict: dict[str, float], function_version: FunctionVersion
    ):
        """
        Validate the additional version weights of an alias and wrap them in a model.

        :param routing_config_dict: mapping of version qualifier to routing weight
        :param function_version: the version the alias itself points to
        :return: an ``AliasRoutingConfig`` holding ``routing_config_dict``
        :raises InvalidParameterValueException: if more than one weight is given, if the weighted
            version equals the alias target, or if the alias target is ``$LATEST``
        :raises ValidationException: if a weight is below 0.0 or not below 1.0, or if a key is
            not a version qualifier

        The lookup of the weighted version may raise if that version does not exist.
        """
        if len(routing_config_dict) > 1:
            raise InvalidParameterValueException(
                "Number of items in AdditionalVersionWeights cannot be greater than 1",
                Type="User",
            )

        # at most one entry gets here; the loop is kept in case more become supported
        for weighted_version, weight in routing_config_dict.items():
            target_qualifier = function_version.id.qualifier

            if weight >= 1.0 or weight < 0.0:
                raise ValidationException(
                    f"1 validation error detected: Value '{{{weighted_version}={weight}}}' at 'routingConfig.additionalVersionWeights' failed to satisfy constraint: Map value must satisfy constraint: [Member must have value less than or equal to 1.0, Member must have value greater than or equal to 0.0, Member must not be null]"
                )
            if weighted_version == target_qualifier:
                raise InvalidParameterValueException(
                    f"Invalid function version {target_qualifier}. Function version {target_qualifier} is already included in routing configuration.",
                    Type="User",
                )
            # an alias that targets $LATEST must not carry any routing config
            if target_qualifier == "$LATEST":
                raise InvalidParameterValueException(
                    "$LATEST is not supported for an alias pointing to more than 1 version"
                )
            if not api_utils.qualifier_is_version(weighted_version):
                raise ValidationException(
                    f"1 validation error detected: Value '{{{weighted_version}={weight}}}' at 'routingConfig.additionalVersionWeights' failed to satisfy constraint: Map keys must satisfy constraint: [Member must have length less than or equal to 1024, Member must have length greater than or equal to 1, Member must satisfy regular expression pattern: [0-9]+]"
                )

            # called only to check that the weighted version exists; the result is not needed
            version_id = function_version.id
            get_function_version(
                function_name=version_id.function_name,
                qualifier=weighted_version,
                region=version_id.region,
                account_id=version_id.account,
            )

        return AliasRoutingConfig(version_weights=routing_config_dict)
1✔
1915

1916
    def create_alias(
        self,
        context: RequestContext,
        function_name: FunctionName,
        name: Alias,
        function_version: VersionWithLatestPublished,
        description: Description = None,
        routing_config: AliasRoutingConfiguration = None,
        **kwargs,
    ) -> AliasConfiguration:
        """
        Create a new alias for a function version.

        Args:
            context: Request context, used to resolve account and region.
            function_name: Function name or ARN.
            name: Name of the alias to create.
            function_version: Version the alias should point to.
            description: Optional description; stored as an empty string when omitted.
            routing_config: Optional routing configuration with ``AdditionalVersionWeights``.

        Returns:
            The API representation of the newly created alias.

        Raises:
            ValidationException: If ``name`` is not a valid alias name.
            ResourceConflictException: If an alias with this name already exists.
        """
        if not api_utils.qualifier_is_alias(name):
            raise ValidationException(
                f"1 validation error detected: Value '{name}' at 'name' failed to satisfy constraint: Member must satisfy regular expression pattern: (?!^[0-9]+$)([a-zA-Z0-9-_]+)"
            )

        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name = api_utils.get_function_name(function_name, context)
        # Resolving the target version first doubles as the check that it exists.
        target_version = get_function_version(
            function_name=function_name,
            qualifier=function_version,
            region=region,
            account_id=account_id,
        )
        function = self._get_function(
            function_name=function_name, region=region, account_id=account_id
        )
        # The description is always present; an unspecified one becomes an empty string.
        description = description or ""

        with function.lock:
            existing_alias = function.aliases.get(name)
            if existing_alias:
                existing_arn = api_utils.map_alias_out(alias=existing_alias, function=function)[
                    "AliasArn"
                ]
                raise ResourceConflictException(
                    f"Alias already exists: {existing_arn}",
                    Type="User",
                )

            # Build a routing model only when non-empty weights were supplied.
            version_weights = (routing_config or {}).get("AdditionalVersionWeights")
            if version_weights:
                routing_configuration = self._create_routing_config_model(
                    version_weights, target_version
                )
            else:
                routing_configuration = None

            alias = VersionAlias(
                name=name,
                function_version=function_version,
                description=description,
                routing_configuration=routing_configuration,
            )
            function.aliases[name] = alias

        return api_utils.map_alias_out(alias=alias, function=function)
1967

1968
    def list_aliases(
        self,
        context: RequestContext,
        function_name: FunctionName,
        function_version: VersionWithLatestPublished = None,
        marker: String = None,
        max_items: MaxListItems = None,
        **kwargs,
    ) -> ListAliasesResponse:
        """
        List the aliases of a function, optionally restricted to one target version.

        Args:
            context: Request context, used to resolve account and region.
            function_name: Function name or ARN.
            function_version: If given, only aliases pointing to this version are returned.
            marker: Pagination marker from a previous call.
            max_items: Maximum number of aliases per page.

        Returns:
            One page of alias configurations plus the marker for the next page.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name = api_utils.get_function_name(function_name, context)
        function = self._get_function(
            function_name=function_name, region=region, account_id=account_id
        )

        alias_configs = []
        for alias in function.aliases.values():
            if function_version is not None and alias.function_version != function_version:
                continue
            alias_configs.append(api_utils.map_alias_out(alias, function))

        # The alias ARN is the key function passed to the paginator.
        page, token = PaginatedList(alias_configs).get_page(
            lambda alias_config: alias_config["AliasArn"],
            marker,
            max_items,
        )
        return ListAliasesResponse(Aliases=page, NextMarker=token)
1996

1997
    def delete_alias(
        self, context: RequestContext, function_name: FunctionName, name: Alias, **kwargs
    ) -> None:
        """
        Delete an alias and the resources keyed by its name.

        The provisioned concurrency config stored under ``name`` is always dropped. The function
        URL config stored under ``name`` is dropped only if the alias actually existed. Deleting
        an unknown alias raises nothing here.

        Args:
            context: Request context, used to resolve account and region.
            function_name: Function name or ARN.
            name: Name of the alias to delete.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name = api_utils.get_function_name(function_name, context)
        function = self._get_function(
            function_name=function_name, region=region, account_id=account_id
        )
        removed_alias = function.aliases.pop(name, None)

        # cleanup related resources
        function.provisioned_concurrency_configs.pop(name, None)

        if not removed_alias or name not in function.function_url_configs:
            return

        # TODO: Allow for deactivating/unregistering specific Lambda URLs
        url_config = function.function_url_configs.pop(name)
        LOG.debug(
            "Stopping aliased Lambda Function URL %s for %s",
            url_config.url,
            url_config.function_name,
        )
2019

2020
    def get_alias(
        self, context: RequestContext, function_name: FunctionName, name: Alias, **kwargs
    ) -> AliasConfiguration:
        """
        Return the configuration of a single alias.

        Args:
            context: Request context, used to resolve account and region.
            function_name: Function name or ARN.
            name: Name of the alias to look up.

        Returns:
            The API representation of the alias.

        Raises:
            ResourceNotFoundException: If the function has no alias called ``name``.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name = api_utils.get_function_name(function_name, context)
        function = self._get_function(
            function_name=function_name, region=region, account_id=account_id
        )

        alias = function.aliases.get(name)
        if alias:
            return api_utils.map_alias_out(alias=alias, function=function)

        missing_arn = api_utils.qualified_lambda_arn(
            function_name=function_name, qualifier=name, region=region, account=account_id
        )
        raise ResourceNotFoundException(
            f"Cannot find alias arn: {missing_arn}",
            Type="User",
        )
2034

2035
    def update_alias(
        self,
        context: RequestContext,
        function_name: FunctionName,
        name: Alias,
        function_version: VersionWithLatestPublished = None,
        description: Description = None,
        routing_config: AliasRoutingConfiguration = None,
        revision_id: String = None,
        **kwargs,
    ) -> AliasConfiguration:
        """
        Update the target version, description and/or routing configuration of an alias.

        Only parameters that are not ``None`` are applied. A ``routing_config`` that is empty or
        has empty ``AdditionalVersionWeights`` clears the routing configuration.

        Args:
            context: Request context, used to resolve account and region.
            function_name: Function name or ARN.
            name: Name of the alias to update.
            function_version: New target version, if it should change.
            description: New description, if it should change.
            routing_config: New routing configuration, if it should change.
            revision_id: If given, the update is applied only when it matches the alias' revision.

        Returns:
            The API representation of the updated alias.

        Raises:
            ResourceNotFoundException: If the alias does not exist.
            PreconditionFailedException: If ``revision_id`` does not match the alias' revision.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name = api_utils.get_function_name(function_name, context)
        function = self._get_function(
            function_name=function_name, region=region, account_id=account_id
        )
        if not (alias := function.aliases.get(name)):
            fn_arn = api_utils.qualified_lambda_arn(function_name, name, account_id, region)
            raise ResourceNotFoundException(
                f"Alias not found: {fn_arn}",
                Type="User",
            )
        if revision_id and alias.revision_id != revision_id:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
                Type="User",
            )

        changes = {}
        if function_version is not None:
            changes["function_version"] = function_version
        if description is not None:
            changes["description"] = description
        if routing_config is not None:
            # if it is an empty dict or AdditionalVersionWeights is empty, set routing config to None
            new_routing_config = None
            if routing_config_dict := routing_config.get("AdditionalVersionWeights"):
                # The weights are validated against the version the alias will point to after
                # this update: the new target if one was given, else the current one.
                target_version = get_function_version(
                    function_name=function_name,
                    qualifier=changes.get("function_version", alias.function_version),
                    region=region,
                    account_id=account_id,
                )
                new_routing_config = self._create_routing_config_model(
                    routing_config_dict, target_version
                )
            changes["routing_configuration"] = new_routing_config

        # even if no changes are done, we have to update revision id for some reason
        old_alias = alias
        alias = dataclasses.replace(alias, **changes)
        function.aliases[name] = alias

        # TODO: signal lambda service that pointer potentially changed
        self.lambda_service.update_alias(old_alias=old_alias, new_alias=alias, function=function)

        return api_utils.map_alias_out(alias=alias, function=function)
2083

2084
    # =======================================
2085
    # ======= EVENT SOURCE MAPPINGS =========
2086
    # =======================================
2087
    def check_service_resource_exists(
        self, service: str, resource_arn: str, function_arn: str, function_role_arn: str
    ):
        """
        Check if the service resource exists and if the function has access to it.

        The resource is probed through a client obtained with the function's role. Only
        ``sqs``/``sqs-fifo``, ``kinesis`` and ``dynamodb`` are probed; for any other service
        nothing is checked.

        Args:
            service: Service identifier of the event source.
            resource_arn: ARN of the queue or stream to probe.
            function_arn: ARN of the function, passed as the source ARN of the client.
            function_role_arn: Role ARN used to obtain the client.

        Raises:
            InvalidParameterValueException: If the service resource does not exist or the function does not have access to it.
        """
        parsed_arn = parse_arn(resource_arn)
        client = get_internal_client(
            arn=resource_arn,
            role_arn=function_role_arn,
            service_principal=ServicePrincipal.lambda_,
            source_arn=function_arn,
        )

        if service in ("sqs", "sqs-fifo"):
            # AWS uses `GetQueueAttributes` internally to verify the queue existence, but we need
            # the `QueueUrl`, which is not given directly. We build a dummy `QueueUrl` that SQS
            # can parse to return the right value.
            queue_name = parsed_arn["resource"].split("/")[-1]
            queue_url = (
                f"http://sqs.{parsed_arn['region']}.domain/{parsed_arn['account']}/{queue_name}"
            )
            try:
                client.get_queue_attributes(QueueUrl=queue_url)
            except ClientError as err:
                error_code = err.response["Error"]["Code"]
                if error_code != "AWS.SimpleQueueService.NonExistentQueue":
                    raise
                raise InvalidParameterValueException(
                    f"Error occurred while ReceiveMessage. SQS Error Code: {error_code}. SQS Error Message: {err.response['Error']['Message']}",
                    Type="User",
                )
        elif service in ("kinesis", "dynamodb"):
            # The two stream APIs differ only in the casing of the ARN parameter.
            arn_parameter = "StreamARN" if service == "kinesis" else "StreamArn"
            try:
                client.describe_stream(**{arn_parameter: resource_arn})
            except ClientError as err:
                if err.response["Error"]["Code"] != "ResourceNotFoundException":
                    raise
                raise InvalidParameterValueException(
                    f"Stream not found: {resource_arn}",
                    Type="User",
                )
2139

2140
    @handler("CreateEventSourceMapping", expand=False)
    def create_event_source_mapping(
        self,
        context: RequestContext,
        request: CreateEventSourceMappingRequest,
    ) -> EventSourceMappingConfiguration:
        """Handle CreateEventSourceMapping by delegating to ``create_event_source_mapping_v2``."""
        return self.create_event_source_mapping_v2(context=context, request=request)
2147

2148
    def create_event_source_mapping_v2(
        self,
        context: RequestContext,
        request: CreateEventSourceMappingRequest,
    ) -> EventSourceMappingConfiguration:
        """
        Validate the request, persist the new event source mapping and start its worker.

        Args:
            context: Request context of the CreateEventSourceMapping call.
            request: The CreateEventSourceMapping request.

        Returns:
            The configuration produced by ``EsmConfigFactory`` for the new mapping.
        """
        # Validations
        function_arn, _, store, _, function_role = self.validate_event_source_mapping(
            context, request
        )

        esm_config = EsmConfigFactory(request, context, function_arn).get_esm_config()

        # Store a copy of esm_config to avoid a race condition with potential async updates
        store.event_source_mappings[esm_config["UUID"]] = esm_config.copy()

        # TODO: check for potential async race condition update -> think about locking
        is_enabled = request.get("Enabled", True)
        worker = EsmWorkerFactory(esm_config, function_role, is_enabled).get_esm_worker()
        self.esm_workers[worker.uuid] = worker

        # TODO: check StateTransitionReason, LastModified, LastProcessingResult (concurrent updates requires locking!)
        tags = request.get("Tags")
        if tags:
            self._store_tags(esm_config.get("EventSourceMappingArn"), tags)

        worker.create()
        return esm_config
2171

2172
    def validate_event_source_mapping(self, context: RequestContext, request: dict[str, Any]):
        """
        Validate a create request or an updated event source mapping and resolve its function.

        Checks, in order: the destination config, that an event source is recognizable, the batch
        size, the starting position (Kinesis/DynamoDB), the SQS batching window, the filter
        patterns, that the function and its qualifier exist, and that the source resource exists.
        For CreateEventSourceMapping requests only, it also rejects a mapping that duplicates an
        existing one for the same function and source.

        Args:
            context: Request context; its operation name decides whether duplicates are checked.
            request: A CreateEventSourceMapping request, or an internal mapping representation
                carrying ``FunctionArn`` instead of ``FunctionName``.

        Returns:
            A tuple ``(function_arn, function_name, state, function_version, function_role)``,
            where ``state`` is the Lambda store of the function's account and region.

        Raises:
            InvalidParameterValueException: For unsupported or missing parameters, an unknown
                function, or an invalid filter pattern.
            ValidationException: If the starting position is not a known enum value.
            ResourceConflictException: If an equivalent mapping already exists (create only).
        """
        # TODO: test whether stream ARNs are valid sources for Pipes or ESM or whether only DynamoDB table ARNs work
        # TODO: Validate MaxRecordAgeInSeconds (i.e cannot subceed 60s but can be -1) and MaxRetryAttempts parameters.
        # See https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-maximumrecordageinseconds
        is_create_esm_request = context.operation.name == self.create_event_source_mapping.operation

        if destination_config := request.get("DestinationConfig"):
            if "OnSuccess" in destination_config:
                raise InvalidParameterValueException(
                    "Unsupported DestinationConfig parameter for given event source mapping type.",
                    Type="User",
                )

        service = None
        if "SelfManagedEventSource" in request:
            service = "kafka"
            if "SourceAccessConfigurations" not in request:
                raise InvalidParameterValueException(
                    "Required 'sourceAccessConfigurations' parameter is missing.", Type="User"
                )
        if service is None and "EventSourceArn" not in request:
            raise InvalidParameterValueException("Unrecognized event source.", Type="User")
        if service is None:
            service = extract_service_from_arn(request["EventSourceArn"])

        batch_size = api_utils.validate_and_set_batch_size(service, request.get("BatchSize"))
        if service in ["dynamodb", "kinesis"]:
            starting_position = request.get("StartingPosition")
            if not starting_position:
                raise InvalidParameterValueException(
                    "1 validation error detected: Value null at 'startingPosition' failed to satisfy constraint: Member must not be null.",
                    Type="User",
                )

            if starting_position not in KinesisStreamStartPosition.__members__:
                raise ValidationException(
                    f"1 validation error detected: Value '{starting_position}' at 'startingPosition' failed to satisfy constraint: Member must satisfy enum value set: [LATEST, AT_TIMESTAMP, TRIM_HORIZON]"
                )
            # AT_TIMESTAMP is not allowed for DynamoDB Streams
            elif (
                service == "dynamodb"
                and starting_position not in DynamoDBStreamStartPosition.__members__
            ):
                raise InvalidParameterValueException(
                    f"Unsupported starting position for arn type: {request['EventSourceArn']}",
                    Type="User",
                )

        if service in ["sqs", "sqs-fifo"]:
            if batch_size > 10 and request.get("MaximumBatchingWindowInSeconds", 0) == 0:
                raise InvalidParameterValueException(
                    "Maximum batch window in seconds must be greater than 0 if maximum batch size is greater than 10",
                    Type="User",
                )

        if (filter_criteria := request.get("FilterCriteria")) is not None:
            for filter_ in filter_criteria.get("Filters", []):
                pattern_str = filter_.get("Pattern")
                if not pattern_str or not isinstance(pattern_str, str):
                    raise InvalidParameterValueException(
                        "Invalid filter pattern definition.", Type="User"
                    )

                if not validate_event_pattern(pattern_str):
                    raise InvalidParameterValueException(
                        "Invalid filter pattern definition.", Type="User"
                    )

        # Can either have a FunctionName (i.e CreateEventSourceMapping request) or
        # an internal EventSourceMappingConfiguration representation
        request_function_name = request.get("FunctionName") or request.get("FunctionArn")
        # can be either a partial arn or a full arn for the version/alias
        function_name, qualifier, account, region = function_locators_from_arn(
            request_function_name
        )
        # TODO: validate `context.region` vs. `region(request["FunctionName"])` vs. `region(request["EventSourceArn"])`
        account = account or context.account_id
        region = region or context.region
        state = lambda_stores[account][region]
        fn = state.functions.get(function_name)
        if not fn:
            raise InvalidParameterValueException("Function does not exist", Type="User")

        if qualifier:
            # make sure the function version/alias exists
            if api_utils.qualifier_is_alias(qualifier):
                fn_alias = fn.aliases.get(qualifier)
                if not fn_alias:
                    raise Exception("unknown alias")  # TODO: cover via test
            elif api_utils.qualifier_is_version(qualifier):
                fn_version = fn.versions.get(qualifier)
                if not fn_version:
                    raise Exception("unknown version")  # TODO: cover via test
            elif qualifier in ("$LATEST", "$LATEST.PUBLISHED"):
                # Both pseudo-qualifiers are accepted without any further existence check.
                pass
            else:
                raise Exception("invalid functionname")  # TODO: cover via test
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account, region)

        else:
            fn_arn = api_utils.unqualified_lambda_arn(function_name, account, region)

        function_version = get_function_version_from_arn(fn_arn)
        function_role = function_version.config.role

        if source_arn := request.get("EventSourceArn"):
            self.check_service_resource_exists(service, source_arn, fn_arn, function_role)
        # Check we are validating a CreateEventSourceMapping request
        if is_create_esm_request:

            def _get_mapping_sources(mapping: dict[str, Any]) -> list[str]:
                """Return the event source ARN or, failing that, the Kafka bootstrap servers."""
                if event_source_arn := mapping.get("EventSourceArn"):
                    return [event_source_arn]
                return (
                    mapping.get("SelfManagedEventSource", {})
                    .get("Endpoints", {})
                    .get("KAFKA_BOOTSTRAP_SERVERS", [])
                )

            # check for event source duplicates
            # TODO: currently validated for sqs, kinesis, and dynamodb
            service_id = load_service(service).service_id
            # The request's sources do not change between iterations, so compute them once.
            request_sources = _get_mapping_sources(request)
            for uuid, mapping in state.event_source_mappings.items():
                mapping_sources = _get_mapping_sources(mapping)
                if mapping["FunctionArn"] != fn_arn or not set(mapping_sources).intersection(
                    request_sources
                ):
                    continue

                if service == "sqs":
                    # *shakes fist at SQS*
                    raise ResourceConflictException(
                        f'An event source mapping with {service_id} arn (" {mapping["EventSourceArn"]} ") '
                        f'and function (" {function_name} ") already exists. Please update or delete the '
                        f"existing mapping with UUID {uuid}",
                        Type="User",
                    )
                elif service == "kafka":
                    # Kafka mappings only conflict when they also share at least one topic.
                    if set(mapping["Topics"]).intersection(request["Topics"]):
                        raise ResourceConflictException(
                            f'An event source mapping with event source ("{",".join(request_sources)}"), '
                            f'function ("{fn_arn}"), '
                            f'topics ("{",".join(request["Topics"])}") already exists. Please update or delete the '
                            f"existing mapping with UUID {uuid}",
                            Type="User",
                        )
                else:
                    raise ResourceConflictException(
                        f'The event source arn (" {mapping["EventSourceArn"]} ") and function '
                        f'(" {function_name} ") provided mapping already exists. Please update or delete the '
                        f"existing mapping with UUID {uuid}",
                        Type="User",
                    )
        return fn_arn, function_name, state, function_version, function_role
2328

2329
    @handler("UpdateEventSourceMapping", expand=False)
    def update_event_source_mapping(
        self,
        context: RequestContext,
        request: UpdateEventSourceMappingRequest,
    ) -> EventSourceMappingConfiguration:
        """Handle UpdateEventSourceMapping by delegating to ``update_event_source_mapping_v2``."""
        return self.update_event_source_mapping_v2(context=context, request=request)
2336

2337
    def update_event_source_mapping_v2(
        self,
        context: RequestContext,
        request: UpdateEventSourceMappingRequest,
    ) -> EventSourceMappingConfiguration:
        """
        Apply an update to an existing event source mapping and replace its worker.

        The request is merged over the stored mapping, the result is validated and stored, the
        current worker is stopped and a worker built from the merged configuration is created.

        Args:
            context: Request context of the UpdateEventSourceMapping call.
            request: The update request; ``UUID`` identifies the mapping.

        Returns:
            The merged mapping, carrying the state that was assigned before the new worker
            was created.

        Raises:
            ResourceNotFoundException: If no UUID is given, or no mapping or worker exists for it.
        """
        # TODO: test and implement this properly (quite complex with many validations and limitations!)
        LOG.warning(
            "Updating Lambda Event Source Mapping is in experimental state and not yet fully tested."
        )
        store = lambda_stores[context.account_id][context.region]
        updates = dict(request)
        uuid = updates.pop("UUID", None)
        if not uuid:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        previous_esm = store.event_source_mappings.get(uuid)
        previous_worker = self.esm_workers.get(uuid)
        if previous_esm is None or previous_worker is None:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )  # TODO: test?

        # Values from the request overwrite the stored ones.
        merged_esm = previous_esm | updates

        # Validate the merged ESM object; the resolved function ARN and role are reused below.
        function_arn, _, _, _, function_role = self.validate_event_source_mapping(
            context, merged_esm
        )

        # Internally only the FunctionArn field is kept, not FunctionName.
        merged_esm.pop("FunctionName", None)
        if function_arn:
            merged_esm["FunctionArn"] = function_arn

        # Only change the state if the desired state differs from the current one.
        enabled = request.get("Enabled")
        if enabled is None:
            merged_esm["State"] = EsmState.UPDATING
        elif enabled and previous_esm["State"] != EsmState.ENABLED:
            merged_esm["State"] = EsmState.ENABLING
        # TODO: What happens when trying to update during an update or failed state?!
        elif not enabled and previous_esm["State"] == EsmState.ENABLED:
            merged_esm["State"] = EsmState.DISABLING

        # To ensure parity, this state must be returned as it is right now, so it is captured
        # before the mapping is handed to the new worker.
        immediate_state = merged_esm["State"]

        store.event_source_mappings[uuid] = merged_esm

        # TODO: Currently, we re-create the entire ESM worker. Look into approach with better performance.
        # Build the new worker without activating it yet; the factory holds all logic for
        # creating a worker from a configuration.
        replacement_worker = EsmWorkerFactory(
            merged_esm, function_role, request.get("Enabled", previous_worker.enabled)
        ).get_esm_worker()
        self.esm_workers[uuid] = replacement_worker

        # We should stop() the worker since the delete() will remove the ESM from the state mapping.
        previous_worker.stop()
        # This will either create an EsmWorker in the CREATING state if enabled. Otherwise, the DISABLING state is set.
        replacement_worker.create()

        return {**merged_esm, "State": immediate_state}
2407

2408
    def delete_event_source_mapping(
        self, context: RequestContext, uuid: String, **kwargs
    ) -> EventSourceMappingConfiguration:
        """
        Start deleting an event source mapping.

        The worker is removed from ``self.esm_workers`` and asked to delete itself.

        Args:
            context: Request context selecting the account/region store.
            uuid: Identifier of the mapping to delete.

        Returns:
            The stored mapping with its ``State`` reported as ``EsmState.DELETING``.

        Raises:
            ResourceNotFoundException: If no mapping or no worker exists for ``uuid``.
        """
        store = lambda_stores[context.account_id][context.region]
        if not (esm := store.event_source_mappings.get(uuid)):
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        # TODO: add proper locking
        worker = self.esm_workers.pop(uuid, None)
        if not worker:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        # Asynchronous delete in v2
        worker.delete()
        return {**esm, "State": EsmState.DELETING}
2427

2428
    def get_event_source_mapping(
        self, context: RequestContext, uuid: String, **kwargs
    ) -> EventSourceMappingConfiguration:
        """
        Return an event source mapping, refreshed with its worker's current state.

        The stored mapping itself is updated with the worker's state and transition reason.

        Args:
            context: Request context selecting the account/region store.
            uuid: Identifier of the mapping.

        Returns:
            The stored mapping.

        Raises:
            ResourceNotFoundException: If no mapping or no worker exists for ``uuid``.
        """
        store = lambda_stores[context.account_id][context.region]
        if not (esm := store.event_source_mappings.get(uuid)):
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )
        if not (worker := self.esm_workers.get(uuid)):
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        esm["State"] = worker.current_state
        esm["StateTransitionReason"] = worker.state_transition_reason
        return esm
2445

2446
    def list_event_source_mappings(
        self,
        context: RequestContext,
        event_source_arn: Arn = None,
        function_name: FunctionName = None,
        marker: String = None,
        max_items: MaxListItems = None,
        **kwargs,
    ) -> ListEventSourceMappingsResponse:
        """
        List event source mappings, optionally filtered by event source and/or function.

        Args:
            context: Request context selecting the account/region store.
            event_source_arn: If given, keep only mappings whose ``EventSourceArn`` equals it.
            function_name: If given, keep only mappings whose ``FunctionArn`` contains it.
            marker: Pagination marker from a previous call.
            max_items: Maximum number of mappings per page.

        Returns:
            One page of mappings plus the marker for the next page.
        """
        store = lambda_stores[context.account_id][context.region]
        # TODO: update and test State and StateTransitionReason for ESM v2

        # A filter that was not supplied matches every mapping.
        # TODO: validate pattern of event_source_arn
        # NOTE(review): the function filter is a substring match on the ARN -- confirm intended.
        selected = [
            esm
            for esm in store.event_source_mappings.values()
            if (not event_source_arn or esm.get("EventSourceArn") == event_source_arn)
            and (not function_name or function_name in esm["FunctionArn"])
        ]

        page, token = PaginatedList(selected).get_page(
            lambda esm: esm["UUID"],
            marker,
            max_items,
        )
        return ListEventSourceMappingsResponse(EventSourceMappings=page, NextMarker=token)
2473

2474
    def get_source_type_from_request(self, request: dict[str, Any]) -> str:
1✔
2475
        if event_source_arn := request.get("EventSourceArn", ""):
×
2476
            service = extract_service_from_arn(event_source_arn)
×
2477
            if service == "sqs" and "fifo" in event_source_arn:
×
2478
                service = "sqs-fifo"
×
2479
            return service
×
2480
        elif request.get("SelfManagedEventSource"):
×
2481
            return "kafka"
×
2482

2483
    # =======================================
2484
    # ============ FUNCTION URLS ============
2485
    # =======================================
2486

2487
    @staticmethod
    def _validate_qualifier(qualifier: str) -> None:
        """
        Reject qualifiers that are ``$LATEST`` or a version qualifier.

        An empty or ``None`` qualifier passes.

        Raises:
            ValidationException: If ``qualifier`` is ``$LATEST`` or a version qualifier.
        """
        if qualifier != "$LATEST" and not (
            qualifier and api_utils.qualifier_is_version(qualifier)
        ):
            return

        raise ValidationException(
            f"1 validation error detected: Value '{qualifier}' at 'qualifier' failed to satisfy constraint: Member must satisfy regular expression pattern: ((?!^\\d+$)^[0-9a-zA-Z-_]+$)"
        )
2493

2494
    @staticmethod
    def _validate_invoke_mode(invoke_mode: str) -> None:
        """Check that an invoke mode, if given, is one of the two known values.

        :param invoke_mode: invoke mode to check; ``None`` or empty is accepted without checks
        :raises ValidationException: if the value is neither ``BUFFERED`` nor ``RESPONSE_STREAM``
        """
        if not invoke_mode:
            return

        supported_modes = [InvokeMode.BUFFERED, InvokeMode.RESPONSE_STREAM]
        if invoke_mode not in supported_modes:
            raise ValidationException(
                f"1 validation error detected: Value '{invoke_mode}' at 'invokeMode' failed to satisfy constraint: Member must satisfy enum value set: [RESPONSE_STREAM, BUFFERED]"
            )

        if invoke_mode == InvokeMode.RESPONSE_STREAM:
            # TODO should we actually fail for setting RESPONSE_STREAM?
            #  It should trigger InvokeWithResponseStream which is not implemented
            LOG.warning(
                "The invokeMode 'RESPONSE_STREAM' is not yet supported on LocalStack. The property is only mocked, the execution will still be 'BUFFERED'"
            )
2506

2507
    # TODO: what happens if function state is not active?
2508
    def create_function_url_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        auth_type: FunctionUrlAuthType,
        qualifier: FunctionUrlQualifier = None,
        cors: Cors = None,
        invoke_mode: InvokeMode = None,
        **kwargs,
    ) -> CreateFunctionUrlConfigResponse:
        """Create and store a function URL config for a function or one of its aliases.

        :param context: request context (region is also used to build the URL)
        :param function_name: function name or ARN, resolved through ``api_utils``
        :param auth_type: auth type stored on the URL config
        :param qualifier: optional alias; without it the config is stored under ``$LATEST``
        :param cors: optional CORS settings stored on the URL config
        :param invoke_mode: optional invoke mode, validated by ``_validate_invoke_mode``
        :return: selected fields of the newly stored URL config
        :raises ResourceNotFoundException: if the function or the given alias does not exist
        :raises ResourceConflictException: if a URL config already exists for the qualifier
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        store = lambda_stores[account_id][region]
        self._validate_qualifier(qualifier)
        self._validate_invoke_mode(invoke_mode)

        function = store.functions.get(function_name)
        if function is None:
            raise ResourceNotFoundException("Function does not exist", Type="User")

        # URL configs are keyed by qualifier, with "$LATEST" standing in for "no qualifier"
        config_key = qualifier or "$LATEST"

        url_config = function.function_url_configs.get(config_key)
        if url_config:
            raise ResourceConflictException(
                f"Failed to create function url config for [functionArn = {url_config.function_arn}]. Error message:  FunctionUrlConfig exists for this Lambda function",
                Type="User",
            )

        if qualifier and qualifier != "$LATEST" and qualifier not in function.aliases:
            raise ResourceNotFoundException("Function does not exist", Type="User")

        if qualifier:
            function_arn = api_utils.qualified_lambda_arn(
                function_name, qualifier, account_id, region
            )
        else:
            function_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)

        # The url_id is the subdomain used for the URL we're creating. It is either random
        # (as in AWS) or taken from a tag on the lambda itself (localstack-only).
        url_id: str | None = None

        tags = self._get_tags(api_utils.unqualified_lambda_arn(function_name, account_id, region))
        if TAG_KEY_CUSTOM_URL in tags:
            # Note: uniqueness of the custom id is deliberately not verified here. It would have
            # to be unique across the entire lambda provider, not just this function, and (as far
            # as the original author could tell) that information isn't available yet because
            # self.router.register_routes() registers all routes in a single shot.
            tag_value = tags[TAG_KEY_CUSTOM_URL]
            candidate_id = f"{tag_value}-{qualifier}" if qualifier else tag_value
            if TAG_KEY_CUSTOM_URL_VALIDATOR.match(candidate_id):
                url_id = candidate_id
            else:
                # Note: we're logging here instead of raising to prioritize
                # strict parity with AWS over the localstack-only custom_id
                LOG.warning(
                    "Invalid custom ID tag value for lambda URL (%s=%s). "
                    "Replaced with default (random id)",
                    TAG_KEY_CUSTOM_URL,
                    candidate_id,
                )

        if url_id is None:
            url_id = api_utils.generate_random_url_id()

        host_definition = localstack_host(custom_port=config.GATEWAY_LISTEN[0].port)
        stored_config = FunctionUrlConfig(
            function_arn=function_arn,
            function_name=function_name,
            cors=cors,
            url_id=url_id,
            url=f"http://{url_id}.lambda-url.{context.region}.{host_definition.host_and_port()}/",  # TODO: https support
            auth_type=auth_type,
            creation_time=api_utils.generate_lambda_date(),
            last_modified_time=api_utils.generate_lambda_date(),
            invoke_mode=invoke_mode,
        )
        function.function_url_configs[config_key] = stored_config

        # persist and start URL
        # TODO: implement URL invoke
        api_url_config = api_utils.map_function_url_config(stored_config)

        response_fields = (
            "FunctionUrl",
            "FunctionArn",
            "AuthType",
            "Cors",
            "CreationTime",
            "InvokeMode",
        )
        return CreateFunctionUrlConfigResponse(
            **{field: api_url_config[field] for field in response_fields}
        )
2612

2613
    def get_function_url_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: FunctionUrlQualifier = None,
        **kwargs,
    ) -> GetFunctionUrlConfigResponse:
        """Look up the stored URL config of a function (``$LATEST`` when no qualifier is given).

        :param context: request context used to resolve account, region and function name
        :param function_name: function name or ARN
        :param qualifier: optional qualifier, validated by ``_validate_qualifier``
        :return: the URL config mapped through ``api_utils.map_function_url_config``
        :raises ResourceNotFoundException: if the function or its URL config is missing
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]

        fn_name, qualifier = api_utils.get_name_and_qualifier(function_name, qualifier, context)
        self._validate_qualifier(qualifier)

        # a missing function and a missing URL config are reported identically
        resolved_fn = store.functions.get(fn_name)
        url_config = None
        if resolved_fn:
            url_config = resolved_fn.function_url_configs.get(qualifier or "$LATEST")
        if not url_config:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        return api_utils.map_function_url_config(url_config)
1✔
2641

2642
    def update_function_url_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: FunctionUrlQualifier = None,
        auth_type: FunctionUrlAuthType = None,
        cors: Cors = None,
        invoke_mode: InvokeMode = None,
        **kwargs,
    ) -> UpdateFunctionUrlConfigResponse:
        """Replace the stored URL config of a function with an updated copy.

        Only the fields that are passed are changed; ``last_modified_time`` is always refreshed.

        :param context: request context used to resolve account, region and function name
        :param function_name: function name or ARN
        :param qualifier: optional qualifier; ``$LATEST`` is used when it is missing
        :param auth_type: new auth type, applied when not ``None``
        :param cors: new CORS settings, applied when not ``None``
        :param invoke_mode: new invoke mode, applied when truthy
        :return: the fields of the updated URL config
        :raises ResourceNotFoundException: if the function, alias or URL config does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]

        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        self._validate_qualifier(qualifier)
        self._validate_invoke_mode(invoke_mode)

        function = store.functions.get(function_name)
        if not function:
            raise ResourceNotFoundException("Function does not exist", Type="User")

        config_key = qualifier or "$LATEST"

        is_unknown_alias = (
            api_utils.qualifier_is_alias(config_key) and config_key not in function.aliases
        )
        if is_unknown_alias:
            raise ResourceNotFoundException("Function does not exist", Type="User")

        current_config = function.function_url_configs.get(config_key)
        if not current_config:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        # collect only the fields the caller actually supplied
        changes = {"last_modified_time": api_utils.generate_lambda_date()}
        if cors is not None:
            changes["cors"] = cors
        if auth_type is not None:
            changes["auth_type"] = auth_type
        if invoke_mode:
            changes["invoke_mode"] = invoke_mode

        updated_config = dataclasses.replace(current_config, **changes)
        function.function_url_configs[config_key] = updated_config

        return UpdateFunctionUrlConfigResponse(
            FunctionUrl=updated_config.url,
            FunctionArn=updated_config.function_arn,
            AuthType=updated_config.auth_type,
            Cors=updated_config.cors,
            CreationTime=updated_config.creation_time,
            LastModifiedTime=updated_config.last_modified_time,
            InvokeMode=updated_config.invoke_mode,
        )
2700

2701
    def delete_function_url_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: FunctionUrlQualifier = None,
        **kwargs,
    ) -> None:
        """Remove the stored URL config of a function (``$LATEST`` when no qualifier is given).

        :param context: request context used to resolve account, region and function name
        :param function_name: function name or ARN
        :param qualifier: optional qualifier, validated by ``_validate_qualifier``
        :raises ResourceNotFoundException: if the function or its URL config is missing
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]

        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        self._validate_qualifier(qualifier)

        config_key = qualifier or "$LATEST"

        # a missing function and a missing URL config are reported identically
        resolved_fn = store.functions.get(function_name)
        existing_config = None
        if resolved_fn:
            existing_config = resolved_fn.function_url_configs.get(config_key)
        if not existing_config:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        del resolved_fn.function_url_configs[config_key]
1✔
2730

2731
    def list_function_url_configs(
        self,
        context: RequestContext,
        function_name: FunctionName,
        marker: String = None,
        max_items: MaxItems = None,
        **kwargs,
    ) -> ListFunctionUrlConfigsResponse:
        """Return one page of the URL configs stored for a function.

        :param context: request context used to resolve account, region and function name
        :param function_name: function name or ARN
        :param marker: pagination marker handed to ``PaginatedList.get_page``
        :param max_items: page size handed to ``PaginatedList.get_page``
        :return: the selected page plus the marker of the following page
        :raises ResourceNotFoundException: if the function does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]

        fn_name = api_utils.get_function_name(function_name, context)
        resolved_fn = store.functions.get(fn_name)
        if not resolved_fn:
            raise ResourceNotFoundException("Function does not exist", Type="User")

        mapped_configs = PaginatedList(
            [
                api_utils.map_function_url_config(stored)
                for stored in resolved_fn.function_url_configs.values()
            ]
        )
        # entries are paginated by their function ARN
        page, next_marker = mapped_configs.get_page(
            lambda url_config: url_config["FunctionArn"],
            marker,
            max_items,
        )
        return ListFunctionUrlConfigsResponse(FunctionUrlConfigs=page, NextMarker=next_marker)
1✔
2759

2760
    # =======================================
2761
    # ============  Permissions  ============
2762
    # =======================================
2763

2764
    @handler("AddPermission", expand=False)
    def add_permission(
        self,
        context: RequestContext,
        request: AddPermissionRequest,
    ) -> AddPermissionResponse:
        """Append a statement to the resource policy of a function version or alias.

        A policy is created on first use. After the statement is added, the targeted alias or
        version is replaced by a copy of itself (see the note at the end of the method).

        :param context: request context used to resolve account, region and partition
        :param request: raw request; reads ``FunctionName``, ``Qualifier``, ``RevisionId``,
            ``StatementId``, ``Action``, ``Principal`` and the optional condition fields
        :return: the new statement serialized as JSON
        :raises InvalidParameterValueException: if the qualifier is ``$LATEST``
        :raises PreconditionFailedException: if ``RevisionId`` is given and does not match
        :raises ValidationException: if ``StatementId`` does not match ``STATEMENT_ID_REGEX``
        :raises ResourceConflictException: if the statement id already exists in the policy
        """
        function_name, qualifier = api_utils.get_name_and_qualifier(
            request.get("FunctionName"), request.get("Qualifier"), context
        )

        # validate qualifier
        if qualifier is not None:
            self._validate_qualifier_expression(qualifier)
            if qualifier == "$LATEST":
                raise InvalidParameterValueException(
                    "We currently do not support adding policies for $LATEST.", Type="User"
                )
        account_id, region = api_utils.get_account_and_region(request.get("FunctionName"), context)

        resolved_fn = self._get_function(function_name, account_id, region)
        resolved_qualifier, fn_arn = self._resolve_fn_qualifier(resolved_fn, qualifier)

        requested_revision = request.get("RevisionId")
        if requested_revision:
            current_revision = self._function_revision_id(resolved_fn, resolved_qualifier)
            if requested_revision != current_revision:
                raise PreconditionFailedException(
                    "The Revision Id provided does not match the latest Revision Id. "
                    "Call the GetPolicy API to retrieve the latest Revision Id",
                    Type="User",
                )

        request_sid = request["StatementId"]
        if STATEMENT_ID_REGEX.match(request_sid) is None:
            raise ValidationException(
                f"1 validation error detected: Value '{request_sid}' at 'statementId' failed to satisfy constraint: Member must satisfy regular expression pattern: ([a-zA-Z0-9-_]+)"
            )

        # check for an already existing policy and any conflicts in existing statements
        existing_policy = resolved_fn.permissions.get(resolved_qualifier)
        if existing_policy:
            known_sids = {stmt["Sid"] for stmt in existing_policy.policy.Statement}
            if request_sid in known_sids:
                # uniqueness scope: statement id needs to be unique per qualified function ($LATEST, version, or alias)
                # Counterexample: the same sid can exist within $LATEST, version, and alias
                raise ResourceConflictException(
                    f"The statement id ({request_sid}) provided already exists. Please provide a new statement id, or remove the existing statement.",
                    Type="User",
                )

        permission_statement = api_utils.build_statement(
            partition=context.partition,
            resource_arn=fn_arn,
            statement_id=request["StatementId"],
            action=request["Action"],
            principal=request["Principal"],
            source_arn=request.get("SourceArn"),
            source_account=request.get("SourceAccount"),
            principal_org_id=request.get("PrincipalOrgID"),
            event_source_token=request.get("EventSourceToken"),
            auth_type=request.get("FunctionUrlAuthType"),
        )

        if existing_policy:
            existing_policy.policy.Statement.append(permission_statement)
        else:
            # first statement for this qualifier: create the policy and register it
            created_policy = FunctionResourcePolicy(
                policy=ResourcePolicy(Version="2012-10-17", Id="default", Statement=[])
            )
            created_policy.policy.Statement.append(permission_statement)
            resolved_fn.permissions[resolved_qualifier] = created_policy

        # Update revision id of alias or version
        # NOTE(review): presumably copying the dataclass yields a fresh revision id - confirm
        # TODO: re-evaluate data model to prevent this dirty hack just for bumping the revision id
        # TODO: does that need a `with function.lock` for atomic updates of the policy + revision_id?
        if api_utils.qualifier_is_alias(resolved_qualifier):
            alias = resolved_fn.aliases[resolved_qualifier]
            resolved_fn.aliases[resolved_qualifier] = dataclasses.replace(alias)
        else:
            # Assumes that a non-alias is a version
            version = resolved_fn.versions[resolved_qualifier]
            fresh_config = dataclasses.replace(version.config)
            resolved_fn.versions[resolved_qualifier] = dataclasses.replace(
                version, config=fresh_config
            )
        return AddPermissionResponse(Statement=json.dumps(permission_statement))
1✔
2846

2847
    def remove_permission(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        statement_id: NamespacedStatementId,
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
        revision_id: String | None = None,
        **kwargs,
    ) -> None:
        """Delete one statement from the resource policy of a function version or alias.

        The policy entry itself is dropped once its last statement is removed.

        :param context: request context used to resolve account, region and function name
        :param function_name: function name or ARN
        :param statement_id: ``Sid`` of the statement to remove
        :param qualifier: optional qualifier, validated by ``_validate_qualifier_expression``
        :param revision_id: optional revision id that must match the current one
        :raises ResourceNotFoundException: if the function, policy or statement is missing
        :raises PreconditionFailedException: if ``revision_id`` is given and does not match
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        if qualifier is not None:
            self._validate_qualifier_expression(qualifier)

        store = lambda_stores[account_id][region]
        resolved_fn = store.functions.get(function_name)
        if resolved_fn is None:
            fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
            raise ResourceNotFoundException(f"No policy found for: {fn_arn}", Type="User")

        resolved_qualifier, _ = self._resolve_fn_qualifier(resolved_fn, qualifier)
        function_permission = resolved_fn.permissions.get(resolved_qualifier)
        if not function_permission:
            raise ResourceNotFoundException(
                "No policy is associated with the given resource.", Type="User"
            )

        # try to find statement in policy and delete it
        statements = function_permission.policy.Statement
        target = next((stmt for stmt in statements if stmt["Sid"] == statement_id), None)
        if not target:
            raise ResourceNotFoundException(
                f"Statement {statement_id} is not found in resource policy.", Type="User"
            )

        current_revision = self._function_revision_id(resolved_fn, resolved_qualifier)
        if revision_id and revision_id != current_revision:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
                Type="User",
            )
        statements.remove(target)

        # Update revision id for alias or version
        # NOTE(review): presumably copying the dataclass yields a fresh revision id - confirm
        # TODO: re-evaluate data model to prevent this dirty hack just for bumping the revision id
        # TODO: does that need a `with function.lock` for atomic updates of the policy + revision_id?
        if api_utils.qualifier_is_alias(resolved_qualifier):
            alias = resolved_fn.aliases[resolved_qualifier]
            resolved_fn.aliases[resolved_qualifier] = dataclasses.replace(alias)
        else:
            # Assumes that a non-alias is a version
            version = resolved_fn.versions[resolved_qualifier]
            fresh_config = dataclasses.replace(version.config)
            resolved_fn.versions[resolved_qualifier] = dataclasses.replace(
                version, config=fresh_config
            )

        # remove the policy as a whole when there's no statement left in it
        if not function_permission.policy.Statement:
            del resolved_fn.permissions[resolved_qualifier]
1✔
2912

2913
    def get_policy(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
        **kwargs,
    ) -> GetPolicyResponse:
        """Return the resource policy stored for a function version or alias.

        :param context: request context used to resolve account, region and function name
        :param function_name: function name or ARN
        :param qualifier: optional qualifier; ``$LATEST`` is used when it is missing
        :return: the policy serialized as JSON together with the revision id of the alias/version
        :raises ResourceNotFoundException: if no policy is stored for the qualifier
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )

        if qualifier is not None:
            self._validate_qualifier_expression(qualifier)

        resolved_fn = self._get_function(function_name, account_id, region)

        policy_key = qualifier or "$LATEST"
        stored_policy = resolved_fn.permissions.get(policy_key)
        if not stored_policy:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        if api_utils.qualifier_is_alias(policy_key):
            current_revision = resolved_fn.aliases[policy_key].revision_id
        else:
            # Assumes that a non-alias is a version
            current_revision = resolved_fn.versions[policy_key].config.revision_id

        policy_document = json.dumps(dataclasses.asdict(stored_policy.policy))
        return GetPolicyResponse(Policy=policy_document, RevisionId=current_revision)
2950

2951
    # =======================================
2952
    # ========  Code signing config  ========
2953
    # =======================================
2954

2955
    def create_code_signing_config(
        self,
        context: RequestContext,
        allowed_publishers: AllowedPublishers,
        description: Description | None = None,
        code_signing_policies: CodeSigningPolicies | None = None,
        tags: Tags | None = None,
        **kwargs,
    ) -> CreateCodeSigningConfigResponse:
        """Create a code signing config with a random id and store it under its ARN.

        :param context: request context providing account id, region and partition
        :param allowed_publishers: publishers stored on the config
        :param description: optional description stored on the config
        :param code_signing_policies: optional policies stored on the config
        :param tags: accepted but not used by this method
        :return: the new config mapped through ``api_utils.map_csc``
        """
        # NOTE(review): `tags` is never read here - confirm whether it should be persisted
        account = context.account_id
        region = context.region

        # TODO: can there be duplicates?
        csc_id = f"csc-{get_random_hex(17)}"  # e.g. 'csc-077c33b4c19e26036'
        csc_arn = f"arn:{context.partition}:lambda:{region}:{account}:code-signing-config:{csc_id}"
        new_config = CodeSigningConfig(
            csc_id=csc_id,
            arn=csc_arn,
            allowed_publishers=allowed_publishers,
            policies=code_signing_policies,
            last_modified=api_utils.generate_lambda_date(),
            description=description,
        )

        lambda_stores[account][region].code_signing_configs[csc_arn] = new_config
        return CreateCodeSigningConfigResponse(CodeSigningConfig=api_utils.map_csc(new_config))
1✔
2981

2982
    def put_function_code_signing_config(
        self,
        context: RequestContext,
        code_signing_config_arn: CodeSigningConfigArn,
        function_name: NamespacedFunctionName,
        **kwargs,
    ) -> PutFunctionCodeSigningConfigResponse:
        """Attach an existing code signing config to a function.

        :param context: request context used to resolve account, region and function name
        :param code_signing_config_arn: ARN of a stored code signing config
        :param function_name: function name or ARN
        :return: the attached config ARN and the resolved function name
        :raises CodeSigningConfigNotFoundException: if the config ARN is not stored
        :raises ResourceNotFoundException: if the function does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]
        function_name = api_utils.get_function_name(function_name, context)

        # the config is checked before the function
        if not store.code_signing_configs.get(code_signing_config_arn):
            raise CodeSigningConfigNotFoundException(
                f"The code signing configuration cannot be found. Check that the provided configuration is not deleted: {code_signing_config_arn}.",
                Type="User",
            )

        target_fn = store.functions.get(function_name)
        fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
        if not target_fn:
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")

        target_fn.code_signing_config_arn = code_signing_config_arn
        return PutFunctionCodeSigningConfigResponse(
            CodeSigningConfigArn=code_signing_config_arn, FunctionName=function_name
        )
3009

3010
    def update_code_signing_config(
        self,
        context: RequestContext,
        code_signing_config_arn: CodeSigningConfigArn,
        description: Description = None,
        allowed_publishers: AllowedPublishers = None,
        code_signing_policies: CodeSigningPolicies = None,
        **kwargs,
    ) -> UpdateCodeSigningConfigResponse:
        """Replace a stored code signing config with an updated copy.

        Only arguments that are not ``None`` are applied; ``last_modified`` is always refreshed.

        :param context: request context providing account id and region
        :param code_signing_config_arn: ARN of the config to update
        :param description: new description, applied when not ``None``
        :param allowed_publishers: new allowed publishers, applied when not ``None``
        :param code_signing_policies: new policies, applied when not ``None``
        :return: the updated config mapped through ``api_utils.map_csc``
        :raises ResourceNotFoundException: if the config ARN is not stored
        """
        store = lambda_stores[context.account_id][context.region]
        current = store.code_signing_configs.get(code_signing_config_arn)
        if not current:
            raise ResourceNotFoundException(
                f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
            )

        # collect only the fields the caller actually supplied
        changes = {}
        if allowed_publishers is not None:
            changes["allowed_publishers"] = allowed_publishers
        if code_signing_policies is not None:
            changes["policies"] = code_signing_policies
        if description is not None:
            changes["description"] = description

        updated = dataclasses.replace(
            current, last_modified=api_utils.generate_lambda_date(), **changes
        )
        store.code_signing_configs[code_signing_config_arn] = updated

        return UpdateCodeSigningConfigResponse(CodeSigningConfig=api_utils.map_csc(updated))
1✔
3039

3040
    def get_code_signing_config(
        self, context: RequestContext, code_signing_config_arn: CodeSigningConfigArn, **kwargs
    ) -> GetCodeSigningConfigResponse:
        """Return a stored code signing config by ARN.

        :param context: request context providing account id and region
        :param code_signing_config_arn: ARN of the config to look up
        :return: the config mapped through ``api_utils.map_csc``
        :raises ResourceNotFoundException: if the config ARN is not stored
        """
        configs = lambda_stores[context.account_id][context.region].code_signing_configs
        found = configs.get(code_signing_config_arn)
        if not found:
            raise ResourceNotFoundException(
                f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
            )

        return GetCodeSigningConfigResponse(CodeSigningConfig=api_utils.map_csc(found))
1✔
3051

3052
    def get_function_code_signing_config(
        self, context: RequestContext, function_name: NamespacedFunctionName, **kwargs
    ) -> GetFunctionCodeSigningConfigResponse:
        """Return the code signing config ARN attached to a function, if any.

        :param context: request context used to resolve account, region and function name
        :param function_name: function name or ARN
        :return: config ARN and function name, or an empty response when nothing is attached
        :raises ResourceNotFoundException: if the function does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]
        function_name = api_utils.get_function_name(function_name, context)

        target_fn = store.functions.get(function_name)
        fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
        if not target_fn:
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")

        attached_arn = target_fn.code_signing_config_arn
        if not attached_arn:
            return GetFunctionCodeSigningConfigResponse()

        return GetFunctionCodeSigningConfigResponse(
            CodeSigningConfigArn=attached_arn, FunctionName=function_name
        )
1✔
3069

3070
    def delete_function_code_signing_config(
        self, context: RequestContext, function_name: NamespacedFunctionName, **kwargs
    ) -> None:
        """Detach the code signing config from a function.

        :param context: request context used to resolve account, region and function name
        :param function_name: function name or ARN
        :raises ResourceNotFoundException: if the function does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]
        function_name = api_utils.get_function_name(function_name, context)

        target_fn = store.functions.get(function_name)
        fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
        if not target_fn:
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")

        # clearing the attribute is the whole detach operation
        target_fn.code_signing_config_arn = None
1✔
3082

3083
    def delete_code_signing_config(
        self, context: RequestContext, code_signing_config_arn: CodeSigningConfigArn, **kwargs
    ) -> DeleteCodeSigningConfigResponse:
        """Remove a stored code signing config by ARN.

        :param context: request context providing account id and region
        :param code_signing_config_arn: ARN of the config to delete
        :return: an empty response
        :raises ResourceNotFoundException: if the config ARN is not stored
        """
        configs = lambda_stores[context.account_id][context.region].code_signing_configs

        if not configs.get(code_signing_config_arn):
            raise ResourceNotFoundException(
                f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
            )

        del configs[code_signing_config_arn]

        return DeleteCodeSigningConfigResponse()
1✔
3097

3098
    def list_code_signing_configs(
        self,
        context: RequestContext,
        marker: String = None,
        max_items: MaxListItems = None,
        **kwargs,
    ) -> ListCodeSigningConfigsResponse:
        """Return one page of the code signing configs stored for the caller's account and region.

        :param context: request context providing account id and region
        :param marker: pagination marker handed to ``PaginatedList.get_page``
        :param max_items: page size handed to ``PaginatedList.get_page``
        :return: the selected page plus the marker of the following page
        """
        store = lambda_stores[context.account_id][context.region]

        mapped_configs = PaginatedList(
            [api_utils.map_csc(stored) for stored in store.code_signing_configs.values()]
        )
        # entries are paginated by their config id
        page, next_marker = mapped_configs.get_page(
            lambda csc: csc["CodeSigningConfigId"],
            marker,
            max_items,
        )
        return ListCodeSigningConfigsResponse(CodeSigningConfigs=page, NextMarker=next_marker)
1✔
3115

3116
    def list_functions_by_code_signing_config(
        self,
        context: RequestContext,
        code_signing_config_arn: CodeSigningConfigArn,
        marker: String = None,
        max_items: MaxListItems = None,
        **kwargs,
    ) -> ListFunctionsByCodeSigningConfigResponse:
        """Return one page of ARNs of the functions that use a given code signing config.

        :param context: request context providing account id and region
        :param code_signing_config_arn: ARN of a stored code signing config
        :param marker: pagination marker handed to ``PaginatedList.get_page``
        :param max_items: page size handed to ``PaginatedList.get_page``
        :return: the selected page of unqualified function ARNs plus the next marker
        :raises ResourceNotFoundException: if the config ARN is not stored
        """
        account = context.account_id
        region = context.region
        store = lambda_stores[account][region]

        if code_signing_config_arn not in store.code_signing_configs:
            raise ResourceNotFoundException(
                f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
            )

        matching_arns = PaginatedList(
            [
                api_utils.unqualified_lambda_arn(candidate.function_name, account, region)
                for candidate in store.functions.values()
                if candidate.code_signing_config_arn == code_signing_config_arn
            ]
        )
        # the ARN string itself serves as the pagination key
        page, next_marker = matching_arns.get_page(
            lambda arn: arn,
            marker,
            max_items,
        )
        return ListFunctionsByCodeSigningConfigResponse(FunctionArns=page, NextMarker=next_marker)
1✔
3147

3148
    # =======================================
3149
    # =========  Account Settings   =========
3150
    # =======================================
3151

3152
    # CAVE: these settings & usages are *per* region!
3153
    # Lambda quotas: https://docs.aws.amazon.com/lambda/latest/dg/gettingstarted-limits.html
3154
    def get_account_settings(self, context: RequestContext, **kwargs) -> GetAccountSettingsResponse:
        """Report the Lambda limits and current usage for the caller's account and region.

        Usage is derived from the store: the number of functions, the summed code
        size of all Zip-packaged function versions and all layer versions, and
        the concurrency already claimed by reserved and provisioned settings.
        """
        store = lambda_stores[context.account_id][context.region]

        total_code_size = 0
        claimed_concurrency = 0
        for function in store.functions.values():
            # Image-based Lambdas do not have a code attribute and count against the ECR quotas instead
            total_code_size += sum(
                version.config.code.code_size
                for version in function.versions.values()
                if version.config.package_type == PackageType.Zip
            )
            if function.reserved_concurrent_executions is not None:
                claimed_concurrency += function.reserved_concurrent_executions
            claimed_concurrency += sum(
                pc.provisioned_concurrent_executions
                for pc in function.provisioned_concurrency_configs.values()
            )

        total_code_size += sum(
            layer_version.code.code_size
            for layer in store.layers.values()
            for layer_version in layer.layer_versions.values()
        )

        concurrency_limit = config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS
        limits = AccountLimit(
            TotalCodeSize=config.LAMBDA_LIMITS_TOTAL_CODE_SIZE,
            CodeSizeZipped=config.LAMBDA_LIMITS_CODE_SIZE_ZIPPED,
            CodeSizeUnzipped=config.LAMBDA_LIMITS_CODE_SIZE_UNZIPPED,
            ConcurrentExecutions=concurrency_limit,
            UnreservedConcurrentExecutions=concurrency_limit - claimed_concurrency,
        )
        usage = AccountUsage(
            TotalCodeSize=total_code_size,
            FunctionCount=len(store.functions),
        )
        return GetAccountSettingsResponse(AccountLimit=limits, AccountUsage=usage)
3187

3188
    # =======================================
3189
    # ==  Provisioned Concurrency Config   ==
3190
    # =======================================
3191

3192
    def _get_provisioned_config(
        self, context: RequestContext, function_name: str, qualifier: str
    ) -> ProvisionedConcurrencyConfiguration | None:
        """Look up the provisioned concurrency config stored for a function qualifier.

        Raises ResourceNotFoundException when the qualifier is an alias or a
        version that the function does not have (or the function itself is
        missing). Returns None when the qualifier exists but no config is stored.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]
        function_name = api_utils.get_function_name(function_name, context)
        fn = store.functions.get(function_name)

        if api_utils.qualifier_is_alias(qualifier):
            known_alias = fn.aliases.get(qualifier) if fn else None
            if known_alias is None:
                raise ResourceNotFoundException(
                    f"Cannot find alias arn: {api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)}",
                    Type="User",
                )
        elif api_utils.qualifier_is_version(qualifier):
            known_version = fn.versions.get(qualifier) if fn else None
            if known_version is None:
                raise ResourceNotFoundException(
                    f"Function not found: {api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)}",
                    Type="User",
                )

        return fn.provisioned_concurrency_configs.get(qualifier)
3219

3220
    def put_provisioned_concurrency_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: Qualifier,
        provisioned_concurrent_executions: PositiveInteger,
        **kwargs,
    ) -> PutProvisionedConcurrencyConfigResponse:
        """Store a provisioned concurrency config for a published version or an alias.

        Validation happens in this order: the requested value must be positive,
        the qualifier must not be ``$LATEST``, the qualifier must exist, the
        request must fit into the function's reserved concurrency (if set), it
        must not exceed the configured concurrency limit, and it must leave the
        configured minimum of unreserved concurrency. A version and an alias
        pointing to it cannot both carry a config. On success the config is
        stored and handed to the version manager; the response reports the
        ``IN_PROGRESS`` status.
        """
        requested = provisioned_concurrent_executions
        if requested <= 0:
            raise ValidationException(
                f"1 validation error detected: Value '{provisioned_concurrent_executions}' at 'provisionedConcurrentExecutions' failed to satisfy constraint: Member must have value greater than or equal to 1"
            )

        if qualifier == "$LATEST":
            raise InvalidParameterValueException(
                "Provisioned Concurrency Configs cannot be applied to unpublished function versions.",
                Type="User",
            )

        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        store = lambda_stores[account_id][region]
        fn = store.functions.get(function_name)

        # Also validates that the function and the qualifier exist.
        existing_config = self._get_provisioned_config(context, function_name, qualifier)
        if existing_config:  # TODO: merge?
            # TODO: add a test for partial updates (if possible)
            LOG.warning(
                "Partial update of provisioned concurrency config is currently not supported."
            )

        # Concurrency provisioned for the function's other qualifiers.
        provisioned_elsewhere = sum(
            other_config.provisioned_concurrent_executions
            for other_qualifier, other_config in fn.provisioned_concurrency_configs.items()
            if other_qualifier != qualifier
        )

        reserved_limit = fn.reserved_concurrent_executions
        if reserved_limit is not None and reserved_limit < provisioned_elsewhere + requested:
            raise InvalidParameterValueException(
                "Requested Provisioned Concurrency should not be greater than the reservedConcurrentExecution for function",
                Type="User",
            )

        if requested > config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS:
            raise InvalidParameterValueException(
                f"Specified ConcurrentExecutions for function is greater than account's unreserved concurrency"
                f" [{config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS}]."
            )

        account_limits = self.get_account_settings(context)["AccountLimit"]
        remaining_unreserved = account_limits["UnreservedConcurrentExecutions"] - requested
        if remaining_unreserved < config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY:
            raise InvalidParameterValueException(
                f"Specified ConcurrentExecutions for function decreases account's UnreservedConcurrentExecution below"
                f" its minimum value of [{config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY}]."
            )

        new_config = ProvisionedConcurrencyConfiguration(
            requested, api_utils.generate_lambda_date()
        )
        fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)

        if api_utils.qualifier_is_alias(qualifier):
            alias = fn.aliases.get(qualifier)
            target_version = fn.versions.get(alias.function_version)

            version_already_provisioned = (
                fn.provisioned_concurrency_configs.get(alias.function_version) is not None
            )
            if target_version and version_already_provisioned:
                raise ResourceConflictException(
                    "Alias can't be used for Provisioned Concurrency configuration on an already Provisioned version",
                    Type="User",
                )
            fn_arn = target_version.id.qualified_arn()
        elif api_utils.qualifier_is_version(qualifier):
            fn_version = fn.versions.get(qualifier)

            # TODO: might be useful other places, utilize
            provisioned_alias_points_here = any(
                candidate.function_version == qualifier
                and fn.provisioned_concurrency_configs.get(candidate.name) is not None
                for candidate in fn.aliases.values()
            )
            if provisioned_alias_points_here:
                raise ResourceConflictException(
                    "Version is pointed by a Provisioned Concurrency alias", Type="User"
                )

            fn_arn = fn_version.id.qualified_arn()

        # Look the manager up before storing, so a failed lookup leaves the store untouched.
        manager = self.lambda_service.get_lambda_version_manager(fn_arn)

        fn.provisioned_concurrency_configs[qualifier] = new_config

        manager.update_provisioned_concurrency_config(new_config.provisioned_concurrent_executions)

        return PutProvisionedConcurrencyConfigResponse(
            RequestedProvisionedConcurrentExecutions=new_config.provisioned_concurrent_executions,
            AvailableProvisionedConcurrentExecutions=0,
            AllocatedProvisionedConcurrentExecutions=0,
            Status=ProvisionedConcurrencyStatusEnum.IN_PROGRESS,
            # StatusReason=manager.provisioned_state.status_reason,
            LastModified=new_config.last_modified,  # TODO: does change with configuration or also with state changes?
        )
3342

3343
    def get_provisioned_concurrency_config(
        self, context: RequestContext, function_name: FunctionName, qualifier: Qualifier, **kwargs
    ) -> GetProvisionedConcurrencyConfigResponse:
        """Return the stored provisioned concurrency config together with its current state.

        Raises InvalidParameterValueException for ``$LATEST`` and
        ProvisionedConcurrencyConfigNotFoundException when no config is stored
        for the qualifier. The state fields are read from the version manager.
        """
        if qualifier == "$LATEST":
            raise InvalidParameterValueException(
                "The function resource provided must be an alias or a published version.",
                Type="User",
            )
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )

        stored_config = self._get_provisioned_config(context, function_name, qualifier)
        if not stored_config:
            raise ProvisionedConcurrencyConfigNotFoundException(
                "No Provisioned Concurrency Config found for this function", Type="User"
            )

        # TODO: make this compatible with alias pointer migration on update
        version_qualifier = qualifier
        if api_utils.qualifier_is_alias(qualifier):
            # The manager is looked up via the version the alias points to.
            fn = lambda_stores[account_id][region].functions.get(function_name)
            version_qualifier = fn.aliases.get(qualifier).function_version
        fn_arn = api_utils.qualified_lambda_arn(
            function_name, version_qualifier, account_id, region
        )

        provisioned_state = self.lambda_service.get_lambda_version_manager(
            fn_arn
        ).provisioned_state

        return GetProvisionedConcurrencyConfigResponse(
            RequestedProvisionedConcurrentExecutions=stored_config.provisioned_concurrent_executions,
            LastModified=stored_config.last_modified,
            AvailableProvisionedConcurrentExecutions=provisioned_state.available,
            AllocatedProvisionedConcurrentExecutions=provisioned_state.allocated,
            Status=provisioned_state.status,
            StatusReason=provisioned_state.status_reason,
        )
3383

3384
    def list_provisioned_concurrency_configs(
        self,
        context: RequestContext,
        function_name: FunctionName,
        marker: String = None,
        max_items: MaxProvisionedConcurrencyConfigListItems = None,
        **kwargs,
    ) -> ListProvisionedConcurrencyConfigsResponse:
        """Return one page of the provisioned concurrency configs of a function.

        Each item combines the stored config (requested executions, last
        modified) with the state reported by the version manager.

        Raises ResourceNotFoundException when the function does not exist.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        state = lambda_stores[account_id][region]

        function_name = api_utils.get_function_name(function_name, context)
        fn = state.functions.get(function_name)
        if fn is None:
            raise ResourceNotFoundException(
                f"Function not found: {api_utils.unqualified_lambda_arn(function_name, account_id, region)}",
                Type="User",
            )

        configs = []
        for qualifier, pc_config in fn.provisioned_concurrency_configs.items():
            # Alias qualifiers are resolved to the version they point to before
            # the version manager is looked up.
            if api_utils.qualifier_is_alias(qualifier):
                version_qualifier = fn.aliases.get(qualifier).function_version
            else:
                version_qualifier = qualifier
            manager = self.lambda_service.get_lambda_version_manager(
                api_utils.qualified_lambda_arn(
                    function_name, version_qualifier, account_id, region
                )
            )

            configs.append(
                ProvisionedConcurrencyConfigListItem(
                    # The reported ARN keeps the qualifier the config was stored under.
                    FunctionArn=api_utils.qualified_lambda_arn(
                        function_name, qualifier, account_id, region
                    ),
                    RequestedProvisionedConcurrentExecutions=pc_config.provisioned_concurrent_executions,
                    AvailableProvisionedConcurrentExecutions=manager.provisioned_state.available,
                    AllocatedProvisionedConcurrentExecutions=manager.provisioned_state.allocated,
                    Status=manager.provisioned_state.status,
                    StatusReason=manager.provisioned_state.status_reason,
                    LastModified=pc_config.last_modified,
                )
            )

        # Paginate on the item's FunctionArn: using the whole item as the token
        # would hand a dict back as NextMarker instead of a string that can be
        # matched against the incoming marker.
        page, token = PaginatedList(configs).get_page(
            lambda item: item["FunctionArn"],
            marker,
            max_items,
        )
        return ListProvisionedConcurrencyConfigsResponse(
            ProvisionedConcurrencyConfigs=page, NextMarker=token
        )
3441

3442
    def delete_provisioned_concurrency_config(
        self, context: RequestContext, function_name: FunctionName, qualifier: Qualifier, **kwargs
    ) -> None:
        """Remove the provisioned concurrency config of a published version or an alias.

        Raises InvalidParameterValueException for ``$LATEST``. A missing
        function, alias or version is reported by ``_get_provisioned_config``.
        Deleting a config that does not exist is a no-op.
        """
        if qualifier == "$LATEST":
            raise InvalidParameterValueException(
                "The function resource provided must be an alias or a published version.",
                Type="User",
            )
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        state = lambda_stores[account_id][region]
        fn = state.functions.get(function_name)

        provisioned_config = self._get_provisioned_config(context, function_name, qualifier)
        # delete is idempotent and doesn't actually care about the provisioned concurrency config not existing
        if not provisioned_config:
            return

        fn.provisioned_concurrency_configs.pop(qualifier)

        # Resolve an alias to the version it points to, as the put/get/list
        # operations do, so the manager lookup uses the version ARN rather than
        # the alias ARN. The alias is known to exist at this point because
        # _get_provisioned_config raises for unknown aliases.
        version_qualifier = qualifier
        if api_utils.qualifier_is_alias(qualifier):
            version_qualifier = fn.aliases.get(qualifier).function_version
        fn_arn = api_utils.qualified_lambda_arn(
            function_name, version_qualifier, account_id, region
        )
        manager = self.lambda_service.get_lambda_version_manager(fn_arn)
        manager.update_provisioned_concurrency_config(0)
3464

3465
    # =======================================
3466
    # =======  Event Invoke Config   ========
3467
    # =======================================
3468

3469
    # "1 validation error detected: Value 'arn:aws:_-/!lambda:<region>:111111111111:function:<function-name:1>' at 'destinationConfig.onFailure.destination' failed to satisfy constraint: Member must satisfy regular expression pattern: ^$|arn:(aws[a-zA-Z0-9-]*):([a-zA-Z0-9\\-])+:([a-z]{2}((-gov)|(-iso(b?)))?-[a-z]+-\\d{1})?:(\\d{12})?:(.*)"
3470
    # "1 validation error detected: Value 'arn:aws:_-/!lambda:<region>:111111111111:function:<function-name:1>' at 'destinationConfig.onFailure.destination' failed to satisfy constraint: Member must satisfy regular expression pattern: ^$|arn:(aws[a-zA-Z0-9-]*):([a-zA-Z0-9\\-])+:([a-z]2((-gov)|(-iso(b?)))?-[a-z]+-\\d1)?:(\\d12)?:(.*)" ... (expected → actual)
3471

3472
    def _validate_destination_config(
1✔
3473
        self, store: LambdaStore, function_name: str, destination_config: DestinationConfig
3474
    ):
3475
        def _validate_destination_arn(destination_arn) -> bool:
1✔
3476
            if not api_utils.DESTINATION_ARN_PATTERN.match(destination_arn):
1✔
3477
                # technically we shouldn't handle this in the provider
3478
                raise ValidationException(
1✔
3479
                    "1 validation error detected: Value '"
3480
                    + destination_arn
3481
                    + "' at 'destinationConfig.onFailure.destination' failed to satisfy constraint: Member must satisfy regular expression pattern: "
3482
                    + "$|kafka://([^.]([a-zA-Z0-9\\-_.]{0,248}))|arn:(aws[a-zA-Z0-9-]*):([a-zA-Z0-9\\-])+:((eusc-)?[a-z]{2}((-gov)|(-iso([a-z]?)))?-[a-z]+-\\d{1})?:(\\d{12})?:(.*)"
3483
                )
3484

3485
            match destination_arn.split(":")[2]:
1✔
3486
                case "lambda":
1✔
3487
                    fn_parts = api_utils.FULL_FN_ARN_PATTERN.search(destination_arn).groupdict()
1✔
3488
                    if fn_parts:
1✔
3489
                        # check if it exists
3490
                        fn = store.functions.get(fn_parts["function_name"])
1✔
3491
                        if not fn:
1✔
3492
                            raise InvalidParameterValueException(
1✔
3493
                                f"The destination ARN {destination_arn} is invalid.", Type="User"
3494
                            )
3495
                        if fn_parts["function_name"] == function_name:
1✔
3496
                            raise InvalidParameterValueException(
1✔
3497
                                "You can't specify the function as a destination for itself.",
3498
                                Type="User",
3499
                            )
3500
                case "sns" | "sqs" | "events":
1✔
3501
                    pass
1✔
3502
                case _:
1✔
3503
                    return False
1✔
3504
            return True
1✔
3505

3506
        validation_err = False
1✔
3507

3508
        failure_destination = destination_config.get("OnFailure", {}).get("Destination")
1✔
3509
        if failure_destination:
1✔
3510
            validation_err = validation_err or not _validate_destination_arn(failure_destination)
1✔
3511

3512
        success_destination = destination_config.get("OnSuccess", {}).get("Destination")
1✔
3513
        if success_destination:
1✔
3514
            validation_err = validation_err or not _validate_destination_arn(success_destination)
1✔
3515

3516
        if validation_err:
1✔
3517
            on_success_part = (
1✔
3518
                f"OnSuccess(destination={success_destination})" if success_destination else "null"
3519
            )
3520
            on_failure_part = (
1✔
3521
                f"OnFailure(destination={failure_destination})" if failure_destination else "null"
3522
            )
3523
            raise InvalidParameterValueException(
1✔
3524
                f"The provided destination config DestinationConfig(onSuccess={on_success_part}, onFailure={on_failure_part}) is invalid.",
3525
                Type="User",
3526
            )
3527

3528
    def put_function_event_invoke_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: NumericLatestPublishedOrAliasQualifier = None,
        maximum_retry_attempts: MaximumRetryAttempts = None,
        maximum_event_age_in_seconds: MaximumEventAgeInSeconds = None,
        destination_config: DestinationConfig = None,
        **kwargs,
    ) -> FunctionEventInvokeConfig:
        """
        Create or overwrite the event invoke config of a function qualifier.

        Destination ARNs can be:
        * SQS arn
        * SNS arn
        * Lambda arn
        * EventBridge arn

        Differences between put_ and update_:
            * put overwrites any existing config
            * update changes only the given values while keeping the rest of the existing ones
            * update fails on non-existing configs

        Differences between destination and DLQ
            * "However, a dead-letter queue is part of a function's version-specific configuration, so it is locked in when you publish a version."
            *  "On-failure destinations also support additional targets and include details about the function's response in the invocation record."

        """
        nothing_specified = (
            maximum_event_age_in_seconds is None
            and maximum_retry_attempts is None
            and destination_config is None
        )
        if nothing_specified:
            raise InvalidParameterValueException(
                "You must specify at least one of error handling or destination setting.",
                Type="User",
            )

        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )

        fn = store.functions.get(function_name)
        if not fn:
            raise ResourceNotFoundException("The function doesn't exist.", Type="User")
        if qualifier and qualifier not in fn.aliases and qualifier not in fn.versions:
            raise ResourceNotFoundException("The function doesn't exist.", Type="User")

        # A missing qualifier addresses the unpublished version.
        qualifier = qualifier or "$LATEST"

        # validate and normalize destination config
        if destination_config:
            self._validate_destination_config(store, function_name, destination_config)

        requested_destinations = destination_config or {}
        normalized_destinations = DestinationConfig(
            OnSuccess=OnSuccess(
                Destination=requested_destinations.get("OnSuccess", {}).get("Destination")
            ),
            OnFailure=OnFailure(
                Destination=requested_destinations.get("OnFailure", {}).get("Destination")
            ),
        )

        invoke_config = EventInvokeConfig(
            function_name=function_name,
            qualifier=qualifier,
            maximum_event_age_in_seconds=maximum_event_age_in_seconds,
            maximum_retry_attempts=maximum_retry_attempts,
            last_modified=api_utils.generate_lambda_date(),
            destination_config=normalized_destinations,
        )
        fn.event_invoke_configs[qualifier] = invoke_config

        last_modified = datetime.datetime.strptime(
            invoke_config.last_modified, api_utils.LAMBDA_DATE_FORMAT
        )
        return FunctionEventInvokeConfig(
            LastModified=last_modified,
            FunctionArn=api_utils.qualified_lambda_arn(
                function_name, qualifier, account_id, region
            ),
            DestinationConfig=normalized_destinations,
            MaximumEventAgeInSeconds=maximum_event_age_in_seconds,
            MaximumRetryAttempts=maximum_retry_attempts,
        )
3609

3610
    def get_function_event_invoke_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
        **kwargs,
    ) -> FunctionEventInvokeConfig:
        """Return the event invoke config stored for a function qualifier.

        A missing qualifier addresses ``$LATEST``. Raises
        ResourceNotFoundException when the function is unknown or has no config
        stored for the qualifier.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        qualifier = qualifier or "$LATEST"

        fn = store.functions.get(function_name)
        # An unknown function is reported exactly like a missing config.
        stored_config = fn.event_invoke_configs.get(qualifier) if fn else None
        if not stored_config:
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
            raise ResourceNotFoundException(
                f"The function {fn_arn} doesn't have an EventInvokeConfig", Type="User"
            )

        last_modified = datetime.datetime.strptime(
            stored_config.last_modified, api_utils.LAMBDA_DATE_FORMAT
        )
        return FunctionEventInvokeConfig(
            LastModified=last_modified,
            FunctionArn=api_utils.qualified_lambda_arn(
                function_name, qualifier, account_id, region
            ),
            DestinationConfig=stored_config.destination_config,
            MaximumEventAgeInSeconds=stored_config.maximum_event_age_in_seconds,
            MaximumRetryAttempts=stored_config.maximum_retry_attempts,
        )
3649

3650
    def list_function_event_invoke_configs(
        self,
        context: RequestContext,
        function_name: FunctionName,
        marker: String = None,
        max_items: MaxFunctionEventInvokeConfigListItems = None,
        **kwargs,
    ) -> ListFunctionEventInvokeConfigsResponse:
        """Return one page of the event invoke configs stored for a function.

        The ``FunctionArn`` of each entry serves as the pagination token.

        Raises ResourceNotFoundException when the function does not exist.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        state = lambda_stores[account_id][region]
        # Normalize the incoming name (as list_provisioned_concurrency_configs
        # does) before using it as the store key and in the reported ARNs.
        function_name = api_utils.get_function_name(function_name, context)
        fn = state.functions.get(function_name)
        if not fn:
            raise ResourceNotFoundException("The function doesn't exist.", Type="User")

        event_invoke_configs = [
            FunctionEventInvokeConfig(
                LastModified=c.last_modified,
                FunctionArn=api_utils.qualified_lambda_arn(
                    function_name, c.qualifier, account_id, region
                ),
                MaximumEventAgeInSeconds=c.maximum_event_age_in_seconds,
                MaximumRetryAttempts=c.maximum_retry_attempts,
                DestinationConfig=c.destination_config,
            )
            for c in fn.event_invoke_configs.values()
        ]

        event_invoke_configs = PaginatedList(event_invoke_configs)
        page, token = event_invoke_configs.get_page(
            lambda x: x["FunctionArn"],
            marker,
            max_items,
        )
        return ListFunctionEventInvokeConfigsResponse(
            FunctionEventInvokeConfigs=page, NextMarker=token
        )
3686

3687
    def delete_function_event_invoke_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
        **kwargs,
    ) -> None:
        """Delete the event invoke config stored for a function qualifier.

        A missing qualifier addresses ``$LATEST``. Raises
        ResourceNotFoundException when the function is unknown or has no config
        stored for the qualifier.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        fn = lambda_stores[account_id][region].functions.get(function_name)
        effective_qualifier = qualifier or "$LATEST"
        fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)

        # An unknown function is reported exactly like a missing config.
        if not fn or not fn.event_invoke_configs.get(effective_qualifier):
            raise ResourceNotFoundException(
                f"The function {fn_arn} doesn't have an EventInvokeConfig", Type="User"
            )

        del fn.event_invoke_configs[effective_qualifier]
3714

3715
    def update_function_event_invoke_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: NumericLatestPublishedOrAliasQualifier = None,
        maximum_retry_attempts: MaximumRetryAttempts = None,
        maximum_event_age_in_seconds: MaximumEventAgeInSeconds = None,
        destination_config: DestinationConfig = None,
        **kwargs,
    ) -> FunctionEventInvokeConfig:
        """Update single fields of an existing event invoke config.

        Works like put, but only the fields given (not None) are replaced; all
        other fields of the stored config are kept. Raises
        ResourceNotFoundException when the function, the qualifier or the
        config to update does not exist.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )

        nothing_specified = (
            maximum_event_age_in_seconds is None
            and maximum_retry_attempts is None
            and destination_config is None
        )
        if nothing_specified:
            raise InvalidParameterValueException(
                "You must specify at least one of error handling or destination setting.",
                Type="User",
            )

        fn = store.functions.get(function_name)
        if not fn:
            raise ResourceNotFoundException("The function doesn't exist.", Type="User")
        if qualifier and qualifier not in fn.aliases and qualifier not in fn.versions:
            raise ResourceNotFoundException("The function doesn't exist.", Type="User")

        # A missing qualifier addresses the unpublished version.
        qualifier = qualifier or "$LATEST"

        current_config = fn.event_invoke_configs.get(qualifier)
        if not current_config:
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
            raise ResourceNotFoundException(
                f"The function {fn_arn} doesn't have an EventInvokeConfig", Type="User"
            )

        if destination_config:
            self._validate_destination_config(store, function_name, destination_config)

        # Collect only the fields the caller actually provided.
        changes = {}
        if destination_config is not None:
            changes["destination_config"] = destination_config
        if maximum_retry_attempts is not None:
            changes["maximum_retry_attempts"] = maximum_retry_attempts
        if maximum_event_age_in_seconds is not None:
            changes["maximum_event_age_in_seconds"] = maximum_event_age_in_seconds

        updated_config = dataclasses.replace(
            current_config, last_modified=api_utils.generate_lambda_date(), **changes
        )
        fn.event_invoke_configs[qualifier] = updated_config

        last_modified = datetime.datetime.strptime(
            updated_config.last_modified, api_utils.LAMBDA_DATE_FORMAT
        )
        return FunctionEventInvokeConfig(
            LastModified=last_modified,
            FunctionArn=api_utils.qualified_lambda_arn(
                function_name, qualifier, account_id, region
            ),
            DestinationConfig=updated_config.destination_config,
            MaximumEventAgeInSeconds=updated_config.maximum_event_age_in_seconds,
            MaximumRetryAttempts=updated_config.maximum_retry_attempts,
        )
3784

3785
    # =======================================
3786
    # ======  Layer & Layer Versions  =======
3787
    # =======================================
3788

3789
    @staticmethod
    def _resolve_layer(
        layer_name_or_arn: str, context: RequestContext
    ) -> tuple[str, str, str, str | None]:
        """
        Determine where a Lambda layer lives.

        A plain layer name is located in the region and account of the request and carries no
        version. Anything recognized as a layer ARN is handed to ``api_utils.parse_layer_arn``.

        :param layer_name_or_arn: Layer name or ARN
        :param context: Request context
        :return: Tuple of region, account ID, layer name, layer version
        """
        if not api_utils.is_layer_arn(layer_name_or_arn):
            return context.region, context.account_id, layer_name_or_arn, None

        return api_utils.parse_layer_arn(layer_name_or_arn)
1✔
3804

3805
    def publish_layer_version(
        self,
        context: RequestContext,
        layer_name: LayerName,
        content: LayerVersionContentInput,
        description: Description | None = None,
        compatible_runtimes: CompatibleRuntimes | None = None,
        license_info: LicenseInfo | None = None,
        compatible_architectures: CompatibleArchitectures | None = None,
        **kwargs,
    ) -> PublishLayerVersionResponse:
        """
        Publish a new version of a layer.

        The first call for a given LayerName creates the layer itself; every further call with
        the same LayerName adds another version to it.
        Note that there are no $LATEST versions with layers!

        :param context: Request context (provides account and region of the layer)
        :param layer_name: Name of the layer
        :param content: Layer archive, either inline (``ZipFile``) or as an S3 reference
        :param description: Optional description; stored as an empty string when omitted
        :param compatible_runtimes: Optional list of compatible runtimes
        :param license_info: Optional license information
        :param compatible_architectures: Optional list of compatible architectures
        :return: The mapped representation of the newly created layer version
        :raises ValidationException: if runtimes or architectures fail validation
        """
        account = context.account_id
        region = context.region

        validation_errors = api_utils.validate_layer_runtimes_and_architectures(
            compatible_runtimes, compatible_architectures
        )
        if validation_errors:
            raise ValidationException(
                f"{len(validation_errors)} validation error{'s' if len(validation_errors) > 1 else ''} detected: {'; '.join(validation_errors)}"
            )

        state = lambda_stores[account][region]
        with self.create_layer_lock:
            # the lock prevents two concurrent requests from both creating the layer object
            # (and thereby two v1 versions) for the same name
            layer = state.layers.get(layer_name)
            if layer is None:
                layer = Layer(
                    arn=api_utils.layer_arn(layer_name=layer_name, account=account, region=region)
                )
                state.layers[layer_name] = layer

        with layer.next_version_lock:
            version_identifier = LambdaLayerVersionIdentifier(
                account_id=account, region=region, layer_name=layer_name
            )
            next_version = version_identifier.generate(next_version=layer.next_version)
            # With user defined layer versions, versions can be created out of order, e.g. a user
            # could replicate layer v2 and then layer v1. The counter must therefore only ever move
            # forward: it is bumped past the created version unless it already points beyond it,
            # which avoids overwriting existing versions later on.
            if next_version >= layer.next_version:
                layer.next_version = next_version + 1

        # store the archive of the new layer version
        if zip_file := content.get("ZipFile"):
            code = store_lambda_archive(
                archive_file=zip_file,
                function_name=layer_name,
                region_name=region,
                account_id=account,
            )
        else:
            code = store_s3_bucket_archive(
                archive_bucket=content["S3Bucket"],
                archive_key=content["S3Key"],
                archive_version=content.get("S3ObjectVersion"),
                function_name=layer_name,
                region_name=region,
                account_id=account,
            )

        version_key = str(next_version)
        new_layer_version = LayerVersion(
            layer_version_arn=api_utils.layer_version_arn(
                layer_name=layer_name,
                account=account,
                region=region,
                version=version_key,
            ),
            layer_arn=layer.arn,
            version=next_version,
            description=description or "",
            license_info=license_info,
            compatible_runtimes=compatible_runtimes,
            compatible_architectures=compatible_architectures,
            created=api_utils.generate_lambda_date(),
            code=code,
        )
        layer.layer_versions[version_key] = new_layer_version

        return api_utils.map_layer_out(new_layer_version)
1✔
3893

3894
    def get_layer_version(
        self,
        context: RequestContext,
        layer_name: LayerName,
        version_number: LayerVersionNumber,
        **kwargs,
    ) -> GetLayerVersionResponse:
        """
        Return a single version of a layer.

        :param context: Request context
        :param layer_name: Layer name or ARN (resolved through ``_resolve_layer``)
        :param version_number: Number of the requested version, must be at least 1
        :return: The mapped representation of the stored layer version
        :raises InvalidParameterValueException: if ``version_number`` is smaller than 1
        :raises ResourceNotFoundException: if the layer or the version is not stored
        """
        region_name, account_id, layer_name, _ = LambdaProvider._resolve_layer(layer_name, context)
        state = lambda_stores[account_id][region_name]
        layer = state.layers.get(layer_name)

        # the version number is validated before reporting a missing layer
        if version_number < 1:
            raise InvalidParameterValueException("Layer Version Cannot be less than 1", Type="User")

        layer_version = None
        if layer is not None:
            layer_version = layer.layer_versions.get(str(version_number))

        # an unknown layer and an unknown version are reported with the same error
        if layer_version is None:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        return api_utils.map_layer_out(layer_version)
1✔
3919

3920
    def get_layer_version_by_arn(
        self, context: RequestContext, arn: LayerVersionArn, **kwargs
    ) -> GetLayerVersionResponse:
        """
        Return the layer version addressed by a layer version ARN.

        :param context: Request context
        :param arn: ARN that must include a layer version
        :return: The mapped representation of the stored layer version
        :raises ValidationException: if no version could be resolved from ``arn``
        :raises ResourceNotFoundException: if the layer or the version is not stored
        """
        region_name, account_id, layer_name, version = LambdaProvider._resolve_layer(arn, context)

        if not version:
            arn_pattern = "(arn:(aws[a-zA-Z-]*)?:lambda:(eusc-)?[a-z]{2}((-gov)|(-iso([a-z]?)))?-[a-z]+-\\d{1}:\\d{12}:layer:[a-zA-Z0-9-_]+:[0-9]+)|(arn:[a-zA-Z0-9-]+:lambda:::awslayer:[a-zA-Z0-9-_]+)"
            raise ValidationException(
                f"1 validation error detected: Value '{arn}' at 'arn' failed to satisfy constraint: Member must satisfy regular expression pattern: "
                + arn_pattern
            )

        store = lambda_stores[account_id][region_name]

        layer = store.layers.get(layer_name)
        if not layer:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        stored_version = layer.layer_versions.get(version)
        if not stored_version:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        return api_utils.map_layer_out(stored_version)
1✔
3947

3948
    def list_layers(
        self,
        context: RequestContext,
        compatible_runtime: Runtime | None = None,
        marker: String | None = None,
        max_items: MaxLayerListItems | None = None,
        compatible_architecture: Architecture | None = None,
        **kwargs,
    ) -> ListLayersResponse:
        """
        List the layers of the requesting account and region together with their latest version.

        :param context: Request context (provides account and region)
        :param compatible_runtime: Runtime filter; currently only validated, not applied
        :param marker: Pagination marker returned by a previous call
        :param max_items: Maximum number of layers per page
        :param compatible_architecture: Architecture filter; currently only validated, not applied
        :return: One page of layers and, if there are more, the marker for the next page
        :raises ValidationException: if the runtime or architecture filter is invalid
        """
        validation_errors = []

        validation_error_arch = api_utils.validate_layer_architecture(compatible_architecture)
        if validation_error_arch:
            validation_errors.append(validation_error_arch)

        validation_error_runtime = api_utils.validate_layer_runtime(compatible_runtime)
        if validation_error_runtime:
            validation_errors.append(validation_error_runtime)

        if validation_errors:
            raise ValidationException(
                f"{len(validation_errors)} validation error{'s' if len(validation_errors) > 1 else ''} detected: {';'.join(validation_errors)}"
            )
        # TODO: handle filter: compatible_runtime
        # TODO: handle filter: compatible_architecture

        state = lambda_stores[context.account_id][context.region]
        layers = state.layers

        # TODO: test how filters behave together with only returning layers here? Does it return the latest "matching" layer, i.e. does it ignore later layer versions that don't match?

        responses: list[LayersListItem] = []
        for layer_name, layer in layers.items():
            if not layer.layer_versions:
                # Deleting a layer version does not remove the layer object itself, so a layer
                # can end up without any version. There is nothing to report for it.
                continue
            # Versions are not necessarily stored in ascending order (they can be published out
            # of order), so pick the highest version number explicitly.
            latest_layer_version = max(
                layer.layer_versions.values(), key=lambda layer_version: layer_version.version
            )
            responses.append(
                LayersListItem(
                    LayerName=layer_name,
                    LayerArn=layer.arn,
                    LatestMatchingVersion=api_utils.map_layer_out(latest_layer_version),
                )
            )

        responses = PaginatedList(responses)
        # The marker must be a string that identifies an item; the layer ARN is unique per layer.
        page, token = responses.get_page(
            lambda layer_item: layer_item["LayerArn"],
            marker,
            max_items,
        )

        return ListLayersResponse(NextMarker=token, Layers=page)
×
4001

4002
    def list_layer_versions(
        self,
        context: RequestContext,
        layer_name: LayerName,
        compatible_runtime: Runtime | None = None,
        marker: String | None = None,
        max_items: MaxLayerListItems | None = None,
        compatible_architecture: Architecture | None = None,
        **kwargs,
    ) -> ListLayerVersionsResponse:
        """
        List the versions of a layer, highest version number first.

        An unknown layer yields an empty list instead of an error.

        :param context: Request context
        :param layer_name: Layer name or ARN (resolved through ``_resolve_layer``)
        :param compatible_runtime: Runtime filter; currently only validated, not applied
        :param marker: Pagination marker returned by a previous call
        :param max_items: Maximum number of versions per page
        :param compatible_architecture: Architecture filter; currently only validated, not applied
        :return: One page of layer versions and, if there are more, the marker for the next page
        :raises ValidationException: if the runtime or architecture filter is invalid
        """
        runtimes = [compatible_runtime] if compatible_runtime else []
        architectures = [compatible_architecture] if compatible_architecture else []
        validation_errors = api_utils.validate_layer_runtimes_and_architectures(
            runtimes, architectures
        )
        if validation_errors:
            raise ValidationException(
                f"{len(validation_errors)} validation error{'s' if len(validation_errors) > 1 else ''} detected: {';'.join(validation_errors)}"
            )

        region_name, account_id, layer_name, _ = LambdaProvider._resolve_layer(layer_name, context)
        state = lambda_stores[account_id][region_name]

        # TODO: Test & handle filter: compatible_runtime
        # TODO: Test & handle filter: compatible_architecture
        layer = state.layers.get(layer_name)
        if layer is None:
            mapped_versions = []
        else:
            mapped_versions = [
                api_utils.map_layer_out(stored_version)
                for stored_version in layer.layer_versions.values()
            ]

        newest_first = sorted(mapped_versions, key=lambda item: item["Version"], reverse=True)
        page, token = PaginatedList(newest_first).get_page(
            lambda version: version["LayerVersionArn"],
            marker,
            max_items,
        )
        return ListLayerVersionsResponse(NextMarker=token, LayerVersions=page)
1✔
4042

4043
    def delete_layer_version(
        self,
        context: RequestContext,
        layer_name: LayerName,
        version_number: LayerVersionNumber,
        **kwargs,
    ) -> None:
        """
        Delete a single version of a layer.

        Deleting an unknown layer or an unknown version is a silent no-op.

        :param context: Request context
        :param layer_name: Layer name or ARN (resolved through ``_resolve_layer``)
        :param version_number: Number of the version to delete, must be at least 1
        :raises InvalidParameterValueException: if ``version_number`` is smaller than 1
        """
        if version_number < 1:
            raise InvalidParameterValueException("Layer Version Cannot be less than 1", Type="User")

        region_name, account_id, layer_name, _ = LambdaProvider._resolve_layer(layer_name, context)

        store = lambda_stores[account_id][region_name]
        layer = store.layers.get(layer_name)
        if not layer:
            return

        layer.layer_versions.pop(str(version_number), None)
1✔
4061

4062
    # =======================================
4063
    # =====  Layer Version Permissions  =====
4064
    # =======================================
4065
    # TODO: lock updates that change revision IDs
4066

4067
    def add_layer_version_permission(
        self,
        context: RequestContext,
        layer_name: LayerName,
        version_number: LayerVersionNumber,
        statement_id: StatementId,
        action: LayerPermissionAllowedAction,
        principal: LayerPermissionAllowedPrincipal,
        organization_id: OrganizationId = None,
        revision_id: String = None,
        **kwargs,
    ) -> AddLayerVersionPermissionResponse:
        """
        Add a statement to the resource policy of a layer version.

        A policy is created on the layer version if it does not have one yet.

        :param context: Request context
        :param layer_name: Layer name or ARN
        :param version_number: Number of the layer version
        :param statement_id: Identifier of the new statement, must not exist in the policy yet
        :param action: Allowed action, only ``lambda:GetLayerVersion`` is accepted
        :param principal: Principal the statement applies to
        :param organization_id: Optional organization ID stored with the statement
        :param revision_id: If given, must match the current revision ID of the policy
        :return: The added statement as JSON string and the revision ID of the policy
        :raises ValidationException: if ``action`` is not ``lambda:GetLayerVersion``
        :raises ResourceNotFoundException: if the layer or the version is not stored
        :raises ResourceConflictException: if ``statement_id`` already exists in the policy
        :raises PreconditionFailedException: if ``revision_id`` does not match
        """
        # `layer_name` is kept as passed by the caller (name or ARN) because it is used to build
        # the error messages, whereas `layer_n` is the plain layer name used for the lookup.
        region_name, account_id, layer_n, _ = LambdaProvider._resolve_layer(layer_name, context)

        if action != "lambda:GetLayerVersion":
            raise ValidationException(
                f"1 validation error detected: Value '{action}' at 'action' failed to satisfy constraint: Member must satisfy regular expression pattern: lambda:GetLayerVersion"
            )

        store = lambda_stores[account_id][region_name]
        layer = store.layers.get(layer_n)

        layer_version = None
        if layer is not None:
            layer_version = layer.layer_versions.get(str(version_number))

        # an unknown layer and an unknown version are reported with the same error
        if layer_version is None:
            layer_version_arn = api_utils.layer_version_arn(
                layer_name, account_id, region_name, str(version_number)
            )
            raise ResourceNotFoundException(
                f"Layer version {layer_version_arn} does not exist.", Type="User"
            )

        # make sure there is a policy the statement can be added to
        if layer_version.policy is None:
            layer_version.policy = LayerPolicy()
        current_policy = layer_version.policy

        if statement_id in current_policy.statements:
            raise ResourceConflictException(
                f"The statement id ({statement_id}) provided already exists. Please provide a new statement id, or remove the existing statement.",
                Type="User",
            )

        if revision_id and current_policy.revision_id != revision_id:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetLayerPolicy API to retrieve the latest Revision Id",
                Type="User",
            )

        statement = LayerPolicyStatement(
            sid=statement_id, action=action, principal=principal, organization_id=organization_id
        )

        # the policy object is replaced rather than mutated in place
        layer_version.policy = dataclasses.replace(
            current_policy, statements=current_policy.statements | {statement_id: statement}
        )

        statement_document = {
            "Sid": statement.sid,
            "Effect": "Allow",
            "Principal": statement.principal,
            "Action": statement.action,
            "Resource": layer_version.layer_version_arn,
        }
        return AddLayerVersionPermissionResponse(
            Statement=json.dumps(statement_document),
            RevisionId=layer_version.policy.revision_id,
        )
4142

4143
    def remove_layer_version_permission(
        self,
        context: RequestContext,
        layer_name: LayerName,
        version_number: LayerVersionNumber,
        statement_id: StatementId,
        revision_id: String = None,
        **kwargs,
    ) -> None:
        """
        Remove a statement from the resource policy of a layer version.

        :param context: Request context
        :param layer_name: Layer name or ARN
        :param version_number: Number of the layer version
        :param statement_id: Identifier of the statement to remove
        :param revision_id: If given, must match the current revision ID of the policy
        :raises ResourceNotFoundException: if the layer, the version, or the statement does not
            exist (a layer version without any policy has no statements either)
        :raises PreconditionFailedException: if ``revision_id`` does not match
        """
        # `layer_name` can either be layer name or ARN. It is used to generate error messages.
        # `layer_n` contains the layer name.
        region_name, account_id, layer_n, _ = LambdaProvider._resolve_layer(layer_name, context)

        layer_version_arn = api_utils.layer_version_arn(
            layer_name, account_id, region_name, str(version_number)
        )

        state = lambda_stores[account_id][region_name]
        layer = state.layers.get(layer_n)
        if layer is None:
            raise ResourceNotFoundException(
                f"Layer version {layer_version_arn} does not exist.", Type="User"
            )
        layer_version = layer.layer_versions.get(str(version_number))
        if layer_version is None:
            raise ResourceNotFoundException(
                f"Layer version {layer_version_arn} does not exist.", Type="User"
            )

        # A layer version only gets a policy once a permission was added to it, so `policy` can
        # still be None here. Treat that like an empty policy instead of failing with an
        # AttributeError.
        # NOTE(review): the exact AWS error message for the "no policy at all" case is not
        # verified; the "statement not found" error is reused for it.
        policy = layer_version.policy

        if policy is not None and revision_id and policy.revision_id != revision_id:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetLayerPolicy API to retrieve the latest Revision Id",
                Type="User",
            )

        if policy is None or statement_id not in policy.statements:
            raise ResourceNotFoundException(
                f"Statement {statement_id} is not found in resource policy.", Type="User"
            )

        # the policy object is replaced rather than mutated in place
        layer_version.policy = dataclasses.replace(
            policy,
            statements={k: v for k, v in policy.statements.items() if k != statement_id},
        )
4191

4192
    def get_layer_version_policy(
        self,
        context: RequestContext,
        layer_name: LayerName,
        version_number: LayerVersionNumber,
        **kwargs,
    ) -> GetLayerVersionPolicyResponse:
        """
        Return the resource policy of a layer version.

        :param context: Request context
        :param layer_name: Layer name or ARN
        :param version_number: Number of the layer version
        :return: The policy document as JSON string and the revision ID of the policy
        :raises ResourceNotFoundException: if the layer or the version is not stored, or if the
            layer version has no policy
        """
        # `layer_name` is kept as passed by the caller (name or ARN) because it is used to build
        # the error messages, whereas `layer_n` is the plain layer name used for the lookup.
        region_name, account_id, layer_n, _ = LambdaProvider._resolve_layer(layer_name, context)

        store = lambda_stores[account_id][region_name]
        layer = store.layers.get(layer_n)

        layer_version = None
        if layer is not None:
            layer_version = layer.layer_versions.get(str(version_number))

        # an unknown layer and an unknown version are reported with the same error
        if layer_version is None:
            layer_version_arn = api_utils.layer_version_arn(
                layer_name, account_id, region_name, str(version_number)
            )
            raise ResourceNotFoundException(
                f"Layer version {layer_version_arn} does not exist.", Type="User"
            )

        policy = layer_version.policy
        if policy is None:
            raise ResourceNotFoundException(
                "No policy is associated with the given resource.", Type="User"
            )

        statement_documents = []
        for policy_statement in policy.statements.values():
            statement_documents.append(
                {
                    "Sid": policy_statement.sid,
                    "Effect": "Allow",
                    "Principal": policy_statement.principal,
                    "Action": policy_statement.action,
                    "Resource": layer_version.layer_version_arn,
                }
            )

        policy_document = {
            "Version": policy.version,
            "Id": policy.id,
            "Statement": statement_documents,
        }
        return GetLayerVersionPolicyResponse(
            Policy=json.dumps(policy_document),
            RevisionId=policy.revision_id,
        )
4245

4246
    # =======================================
4247
    # =======  Function Concurrency  ========
4248
    # =======================================
4249
    # (Reserved) function concurrency is scoped to the whole function
4250

4251
    def get_function_concurrency(
        self, context: RequestContext, function_name: FunctionName, **kwargs
    ) -> GetFunctionConcurrencyResponse:
        """
        Return the reserved concurrency configured for a function.

        :param context: Request context
        :param function_name: Function identifier, resolved via ``api_utils.get_function_name``
        :return: Response carrying the function's ``reserved_concurrent_executions`` value
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        resolved_name = api_utils.get_function_name(function_name, context)
        fn = self._get_function(function_name=resolved_name, region=region, account_id=account_id)
        reserved = fn.reserved_concurrent_executions
        return GetFunctionConcurrencyResponse(ReservedConcurrentExecutions=reserved)
4260

4261
    def put_function_concurrency(
        self,
        context: RequestContext,
        function_name: FunctionName,
        reserved_concurrent_executions: ReservedConcurrentExecutions,
        **kwargs,
    ) -> Concurrency:
        """
        Set the reserved concurrency of a function.

        :param context: Request context
        :param function_name: Function name or unqualified function ARN
        :param reserved_concurrent_executions: Number of executions to reserve for the function
        :return: The reserved concurrency now stored on the function
        :raises InvalidParameterValueException: if a qualifier is given, if the reservation would
            push the account's unreserved concurrency below the configured minimum, or if it is
            lower than the function's total provisioned concurrency
        :raises ResourceNotFoundException: if the function does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)

        function_name, qualifier = api_utils.get_name_and_qualifier(function_name, None, context)
        if qualifier:
            raise InvalidParameterValueException(
                "This operation is permitted on Lambda functions only. Aliases and versions do not support this operation. Please specify either a function name or an unqualified function ARN.",
                Type="User",
            )

        store = lambda_stores[account_id][region]
        fn = store.functions.get(function_name)
        if not fn:
            fn_arn = api_utils.qualified_lambda_arn(
                function_name,
                qualifier="$LATEST",
                account=account_id,
                region=region,
            )
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")

        account_limit = self.get_account_settings(context)["AccountLimit"]
        unreserved_concurrent_executions = account_limit["UnreservedConcurrentExecutions"]

        # The function's current reservation is already subtracted from the account's unreserved
        # concurrency. It has to be added back because the new value replaces it instead of being
        # reserved on top of it.
        # Joel tested this behavior manually against AWS (2023-11-28).
        currently_reserved = fn.reserved_concurrent_executions or 0
        unreserved_after_update = (
            unreserved_concurrent_executions - reserved_concurrent_executions + currently_reserved
        )
        if unreserved_after_update < config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY:
            raise InvalidParameterValueException(
                f"Specified ReservedConcurrentExecutions for function decreases account's UnreservedConcurrentExecution below its minimum value of [{config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY}]."
            )

        total_provisioned_concurrency = sum(
            provisioned_config.provisioned_concurrent_executions
            for provisioned_config in fn.provisioned_concurrency_configs.values()
        )
        if total_provisioned_concurrency > reserved_concurrent_executions:
            raise InvalidParameterValueException(
                f" ReservedConcurrentExecutions  {reserved_concurrent_executions} should not be lower than function's total provisioned concurrency [{total_provisioned_concurrency}]."
            )

        fn.reserved_concurrent_executions = reserved_concurrent_executions

        return Concurrency(ReservedConcurrentExecutions=fn.reserved_concurrent_executions)
1✔
4322

4323
    def delete_function_concurrency(
        self, context: RequestContext, function_name: FunctionName, **kwargs
    ) -> None:
        """
        Remove the reserved concurrency of a function.

        :param context: Request context
        :param function_name: Function identifier, resolved via ``api_utils.get_name_and_qualifier``
        :raises ResourceNotFoundException: if the function does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, _ = api_utils.get_name_and_qualifier(function_name, None, context)
        store = lambda_stores[account_id][region]
        fn = store.functions.get(function_name)
        if not fn:
            # Report a missing function the same way put_function_concurrency does instead of
            # failing with an AttributeError on None.
            fn_arn = api_utils.qualified_lambda_arn(
                function_name,
                qualifier="$LATEST",
                account=account_id,
                region=region,
            )
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")

        fn.reserved_concurrent_executions = None
1✔
4331

4332
    # =======================================
4333
    # ===============  TAGS   ===============
4334
    # =======================================
4335
    # only Function, Event Source Mapping, and Code Signing Config (not currently supported by LocalStack) ARNs are available for tagging in AWS
4336

4337
    def _get_tags(self, resource: TaggableResource) -> dict[str, str]:
        """
        Return the tags of a resource as a plain key/value mapping.

        :param resource: ARN of the tagged resource
        :return: Mapping of tag key to tag value
        """
        state = self.fetch_lambda_store_for_tagging(resource)
        # the tag store reports a list of {"Key": ..., "Value": ...} entries
        tag_entries = state.TAGS.list_tags_for_resource(resource).get("Tags")
        return {entry["Key"]: entry["Value"] for entry in tag_entries}
1✔
4344

4345
    def _store_tags(self, resource: TaggableResource, tags: dict[str, str]):
        """
        Add the given tags to a resource.

        :param resource: ARN of the resource to tag
        :param tags: Mapping of tag key to tag value
        :raises InvalidParameterValueException: if the resource would end up with more than
            ``LAMBDA_TAG_LIMIT_PER_RESOURCE`` tags
        """
        state = self.fetch_lambda_store_for_tagging(resource)

        # the limit applies to the union of the stored tags and the new ones
        stored_tags = state.TAGS.tags.get(resource, {})
        combined_tags = stored_tags | tags
        if len(combined_tags) > LAMBDA_TAG_LIMIT_PER_RESOURCE:
            raise InvalidParameterValueException(
                "Number of tags exceeds resource tag limit.", Type="User"
            )

        tag_entries = []
        for key, value in tags.items():
            tag_entries.append({"Key": key, "Value": value})
        state.TAGS.tag_resource(resource, tag_entries)
1✔
4354

4355
    def fetch_lambda_store_for_tagging(self, resource: TaggableResource) -> LambdaStore:
        """
        Takes a resource ARN for a TaggableResource (Lambda Function, Event Source Mapping, Code Signing Config, or Capacity Provider) and returns a corresponding
        LambdaStore for its region and account.

        In addition, this function validates that the ARN is a valid TaggableResource type, and that the TaggableResource exists.

        Raises:
            ValidationException: If the resource ARN is not a full ARN for a TaggableResource.
            ResourceNotFoundException: If the specified resource does not exist.
            InvalidParameterValueException: If the resource ARN is a qualified Lambda Function.
            NotImplementedError: If the resource ARN refers to a Code Signing Config.
        """

        def _raise_validation_exception():
            raise ValidationException(
                f"1 validation error detected: Value '{resource}' at 'resource' failed to satisfy constraint: Member must satisfy regular expression pattern: {api_utils.TAGGABLE_RESOURCE_ARN_PATTERN}"
            )

        # Check whether the ARN we have been passed is correctly formatted
        parsed_resource_arn: ArnData = None
        try:
            parsed_resource_arn = parse_arn(resource)
        except Exception:
            # any parsing failure is reported as a validation error to the caller
            _raise_validation_exception()

        # TODO: Should we be checking whether this is a full ARN?
        region, account_id, resource_type = map(
            parsed_resource_arn.get, ("region", "account", "resource")
        )

        if not all((region, account_id, resource_type)):
            _raise_validation_exception()

        # The resource part must at least consist of "<type>:<identifier>". `str.split` never
        # returns an empty list, so the number of parts has to be checked explicitly; otherwise
        # an ARN without identifier fails with a ValueError when unpacking below.
        parts = resource_type.split(":")
        if len(parts) < 2:
            _raise_validation_exception()

        resource_type, resource_identifier, *qualifier = parts

        # Qualifier validation raises before checking for NotFound
        if qualifier:
            if resource_type == "function":
                raise InvalidParameterValueException(
                    "Tags on function aliases and versions are not supported. Please specify a function ARN.",
                    Type="User",
                )
            _raise_validation_exception()

        # the lookups below are only performed for their side effect of raising if the resource
        # does not exist
        if resource_type == "event-source-mapping":
            self._get_esm(resource_identifier, account_id, region)
        elif resource_type == "code-signing-config":
            raise NotImplementedError("Resource tagging on CSC not yet implemented.")
        elif resource_type == "function":
            self._get_function(
                function_name=resource_identifier, account_id=account_id, region=region
            )
        elif resource_type == "capacity-provider":
            self._get_capacity_provider(resource_identifier, account_id, region)
        else:
            _raise_validation_exception()

        # If no exceptions are raised, assume ARN and referenced resource is valid for tag operations
        return lambda_stores[account_id][region]
1✔
4417

4418
    def tag_resource(
        self, context: RequestContext, resource: TaggableResource, tags: Tags, **kwargs
    ) -> None:
        """Add tags to the resource identified by the ``resource`` ARN.

        The tags are persisted via ``self._store_tags``. If the resource part of the ARN
        starts with ``function``, the function's ``$LATEST`` version is afterwards swapped
        for a fresh copy while holding the function lock (see inline note).

        Args:
            context: Request context of the API call (not read by this method).
            resource: ARN of the resource to tag.
            tags: Tags to add; must not be empty.

        Raises:
            InvalidParameterValueException: If ``tags`` is empty or ``None``.
        """
        if not tags:
            raise InvalidParameterValueException(
                "An error occurred and the request cannot be processed.", Type="User"
            )
        self._store_tags(resource, tags)

        # Only function ARNs need the follow-up below; everything else is done here.
        resource_id = extract_resource_from_arn(resource)
        if not resource_id or not resource_id.startswith("function"):
            return

        fn_name, _, fn_account, fn_region = function_locators_from_arn(resource)
        fn = self._get_function(fn_name, fn_account, fn_region)
        with fn.lock:
            # Dirty hack for a changed revision id (the model should be re-evaluated to
            # avoid this): rebuild the $LATEST version around a copied config.
            current_latest = fn.versions["$LATEST"]
            copied_config = dataclasses.replace(current_latest.config)
            fn.versions["$LATEST"] = dataclasses.replace(current_latest, config=copied_config)
4438

4439
    def list_tags(
        self, context: RequestContext, resource: TaggableResource, **kwargs
    ) -> ListTagsResponse:
        """Return the tags of the resource identified by the ``resource`` ARN.

        Args:
            context: Request context of the API call (not read by this method).
            resource: ARN of the resource whose tags are listed.

        Returns:
            A ``ListTagsResponse`` whose ``Tags`` entry is the result of ``self._get_tags``.
        """
        return ListTagsResponse(Tags=self._get_tags(resource))
4444

4445
    def untag_resource(
        self, context: RequestContext, resource: TaggableResource, tag_keys: TagKeyList, **kwargs
    ) -> None:
        """Remove the given tag keys from the resource identified by the ``resource`` ARN.

        The store is resolved through ``self.fetch_lambda_store_for_tagging`` and the keys
        are removed from its ``TAGS``. If the resource part of the ARN starts with
        ``function``, the function's ``$LATEST`` version is afterwards swapped for a fresh
        copy while holding the function lock (see inline note).

        Args:
            context: Request context of the API call (not read by this method).
            resource: ARN of the resource to untag.
            tag_keys: Keys of the tags to remove; must not be empty.

        Raises:
            ValidationException: If ``tag_keys`` is empty or ``None``.
        """
        if not tag_keys:
            raise ValidationException(
                "1 validation error detected: Value null at 'tagKeys' failed to satisfy constraint: Member must not be null"
            )  # should probably be generalized a bit

        store = self.fetch_lambda_store_for_tagging(resource)
        store.TAGS.untag_resource(resource, tag_keys)

        # Only function ARNs need the follow-up below; everything else is done here.
        resource_id = extract_resource_from_arn(resource)
        if not resource_id or not resource_id.startswith("function"):
            return

        fn_name, _, fn_account, fn_region = function_locators_from_arn(resource)
        fn = self._get_function(fn_name, fn_account, fn_region)
        # TODO: Potential race condition
        with fn.lock:
            # Dirty hack for a changed revision id (the model should be re-evaluated to
            # avoid this): rebuild the $LATEST version around a copied config.
            current_latest = fn.versions["$LATEST"]
            copied_config = dataclasses.replace(current_latest.config)
            fn.versions["$LATEST"] = dataclasses.replace(current_latest, config=copied_config)
4468

4469
    # =======================================
4470
    # =======  LEGACY / DEPRECATED   ========
4471
    # =======================================
4472

4473
    def invoke_async(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        invoke_args: IO[BlobStream],
        **kwargs,
    ) -> InvokeAsyncResponse:
        """LEGACY API endpoint. Even AWS heavily discourages its usage.

        This operation is deliberately left unimplemented; none of the arguments are read.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc