• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 21615209021

02 Feb 2026 12:25PM UTC coverage: 86.976% (+0.007%) from 86.969%
21615209021

push

github

web-flow
fix lambda state reset method to shutdown esm workers correctly (#13671)

0 of 3 new or added lines in 1 file covered. (0.0%)

19 existing lines in 5 files now uncovered.

70493 of 81049 relevant lines covered (86.98%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.34
/localstack-core/localstack/services/lambda_/provider.py
1
import base64
1✔
2
import dataclasses
1✔
3
import datetime
1✔
4
import itertools
1✔
5
import json
1✔
6
import logging
1✔
7
import re
1✔
8
import threading
1✔
9
import time
1✔
10
from typing import IO, Any
1✔
11

12
from botocore.exceptions import ClientError
1✔
13

14
from localstack import config
1✔
15
from localstack.aws.api import RequestContext, ServiceException, handler
1✔
16
from localstack.aws.api.lambda_ import (
1✔
17
    AccountLimit,
18
    AccountUsage,
19
    AddLayerVersionPermissionResponse,
20
    AddPermissionRequest,
21
    AddPermissionResponse,
22
    Alias,
23
    AliasConfiguration,
24
    AliasRoutingConfiguration,
25
    AllowedPublishers,
26
    Architecture,
27
    Arn,
28
    Blob,
29
    BlobStream,
30
    CapacityProviderConfig,
31
    CodeSigningConfigArn,
32
    CodeSigningConfigNotFoundException,
33
    CodeSigningPolicies,
34
    CompatibleArchitectures,
35
    CompatibleRuntimes,
36
    Concurrency,
37
    Cors,
38
    CreateCodeSigningConfigResponse,
39
    CreateEventSourceMappingRequest,
40
    CreateFunctionRequest,
41
    CreateFunctionUrlConfigResponse,
42
    DeleteCodeSigningConfigResponse,
43
    DeleteFunctionResponse,
44
    Description,
45
    DestinationConfig,
46
    DurableExecutionName,
47
    EventSourceMappingConfiguration,
48
    FunctionCodeLocation,
49
    FunctionConfiguration,
50
    FunctionEventInvokeConfig,
51
    FunctionName,
52
    FunctionUrlAuthType,
53
    FunctionUrlQualifier,
54
    FunctionVersionLatestPublished,
55
    GetAccountSettingsResponse,
56
    GetCodeSigningConfigResponse,
57
    GetFunctionCodeSigningConfigResponse,
58
    GetFunctionConcurrencyResponse,
59
    GetFunctionRecursionConfigResponse,
60
    GetFunctionResponse,
61
    GetFunctionUrlConfigResponse,
62
    GetLayerVersionPolicyResponse,
63
    GetLayerVersionResponse,
64
    GetPolicyResponse,
65
    GetProvisionedConcurrencyConfigResponse,
66
    InvalidParameterValueException,
67
    InvocationResponse,
68
    InvocationType,
69
    InvokeAsyncResponse,
70
    InvokeMode,
71
    LambdaApi,
72
    LambdaManagedInstancesCapacityProviderConfig,
73
    LastUpdateStatus,
74
    LayerName,
75
    LayerPermissionAllowedAction,
76
    LayerPermissionAllowedPrincipal,
77
    LayersListItem,
78
    LayerVersionArn,
79
    LayerVersionContentInput,
80
    LayerVersionNumber,
81
    LicenseInfo,
82
    ListAliasesResponse,
83
    ListCodeSigningConfigsResponse,
84
    ListEventSourceMappingsResponse,
85
    ListFunctionEventInvokeConfigsResponse,
86
    ListFunctionsByCodeSigningConfigResponse,
87
    ListFunctionsResponse,
88
    ListFunctionUrlConfigsResponse,
89
    ListLayersResponse,
90
    ListLayerVersionsResponse,
91
    ListProvisionedConcurrencyConfigsResponse,
92
    ListTagsResponse,
93
    ListVersionsByFunctionResponse,
94
    LogFormat,
95
    LoggingConfig,
96
    LogType,
97
    MasterRegion,
98
    MaxFunctionEventInvokeConfigListItems,
99
    MaximumEventAgeInSeconds,
100
    MaximumRetryAttempts,
101
    MaxItems,
102
    MaxLayerListItems,
103
    MaxListItems,
104
    MaxProvisionedConcurrencyConfigListItems,
105
    NamespacedFunctionName,
106
    NamespacedStatementId,
107
    NumericLatestPublishedOrAliasQualifier,
108
    OnFailure,
109
    OnSuccess,
110
    OrganizationId,
111
    PackageType,
112
    PositiveInteger,
113
    PreconditionFailedException,
114
    ProvisionedConcurrencyConfigListItem,
115
    ProvisionedConcurrencyConfigNotFoundException,
116
    ProvisionedConcurrencyStatusEnum,
117
    PublishLayerVersionResponse,
118
    PutFunctionCodeSigningConfigResponse,
119
    PutFunctionRecursionConfigResponse,
120
    PutProvisionedConcurrencyConfigResponse,
121
    Qualifier,
122
    RecursiveLoop,
123
    ReservedConcurrentExecutions,
124
    ResourceConflictException,
125
    ResourceNotFoundException,
126
    Runtime,
127
    RuntimeVersionConfig,
128
    SnapStart,
129
    SnapStartApplyOn,
130
    SnapStartOptimizationStatus,
131
    SnapStartResponse,
132
    State,
133
    StatementId,
134
    StateReasonCode,
135
    String,
136
    TaggableResource,
137
    TagKeyList,
138
    Tags,
139
    TenantId,
140
    TracingMode,
141
    UnqualifiedFunctionName,
142
    UpdateCodeSigningConfigResponse,
143
    UpdateEventSourceMappingRequest,
144
    UpdateFunctionCodeRequest,
145
    UpdateFunctionConfigurationRequest,
146
    UpdateFunctionUrlConfigResponse,
147
    VersionWithLatestPublished,
148
)
149
from localstack.aws.api.lambda_ import FunctionVersion as FunctionVersionApi
1✔
150
from localstack.aws.api.lambda_ import ServiceException as LambdaServiceException
1✔
151
from localstack.aws.api.pipes import (
1✔
152
    DynamoDBStreamStartPosition,
153
    KinesisStreamStartPosition,
154
)
155
from localstack.aws.connect import connect_to
1✔
156
from localstack.aws.spec import load_service
1✔
157
from localstack.services.edge import ROUTER
1✔
158
from localstack.services.lambda_ import api_utils
1✔
159
from localstack.services.lambda_ import hooks as lambda_hooks
1✔
160
from localstack.services.lambda_.analytics import (
1✔
161
    FunctionInitializationType,
162
    FunctionOperation,
163
    FunctionStatus,
164
    function_counter,
165
)
166
from localstack.services.lambda_.api_utils import (
1✔
167
    ARCHITECTURES,
168
    STATEMENT_ID_REGEX,
169
    SUBNET_ID_REGEX,
170
    function_locators_from_arn,
171
)
172
from localstack.services.lambda_.event_source_mapping.esm_config_factory import (
1✔
173
    EsmConfigFactory,
174
)
175
from localstack.services.lambda_.event_source_mapping.esm_worker import (
1✔
176
    EsmState,
177
    EsmWorker,
178
)
179
from localstack.services.lambda_.event_source_mapping.esm_worker_factory import (
1✔
180
    EsmWorkerFactory,
181
)
182
from localstack.services.lambda_.event_source_mapping.pipe_utils import get_internal_client
1✔
183
from localstack.services.lambda_.invocation import AccessDeniedException
1✔
184
from localstack.services.lambda_.invocation.execution_environment import (
1✔
185
    EnvironmentStartupTimeoutException,
186
)
187
from localstack.services.lambda_.invocation.lambda_models import (
1✔
188
    AliasRoutingConfig,
189
    CodeSigningConfig,
190
    EventInvokeConfig,
191
    Function,
192
    FunctionResourcePolicy,
193
    FunctionUrlConfig,
194
    FunctionVersion,
195
    ImageConfig,
196
    LambdaEphemeralStorage,
197
    Layer,
198
    LayerPolicy,
199
    LayerPolicyStatement,
200
    LayerVersion,
201
    ProvisionedConcurrencyConfiguration,
202
    RequestEntityTooLargeException,
203
    ResourcePolicy,
204
    UpdateStatus,
205
    ValidationException,
206
    VersionAlias,
207
    VersionFunctionConfiguration,
208
    VersionIdentifier,
209
    VersionState,
210
    VpcConfig,
211
)
212
from localstack.services.lambda_.invocation.lambda_service import (
1✔
213
    LambdaService,
214
    create_image_code,
215
    destroy_code_if_not_used,
216
    lambda_stores,
217
    store_lambda_archive,
218
    store_s3_bucket_archive,
219
)
220
from localstack.services.lambda_.invocation.models import CapacityProvider as CapacityProviderModel
1✔
221
from localstack.services.lambda_.invocation.models import LambdaStore
1✔
222
from localstack.services.lambda_.invocation.runtime_executor import get_runtime_executor
1✔
223
from localstack.services.lambda_.lambda_utils import HINT_LOG
1✔
224
from localstack.services.lambda_.layerfetcher.layer_fetcher import LayerFetcher
1✔
225
from localstack.services.lambda_.provider_utils import (
1✔
226
    LambdaLayerVersionIdentifier,
227
    get_function_version,
228
    get_function_version_from_arn,
229
)
230
from localstack.services.lambda_.runtimes import (
1✔
231
    ALL_RUNTIMES,
232
    DEPRECATED_RUNTIMES,
233
    DEPRECATED_RUNTIMES_UPGRADES,
234
    RUNTIMES_AGGREGATED,
235
    SNAP_START_SUPPORTED_RUNTIMES,
236
    VALID_MANAGED_INSTANCE_RUNTIMES,
237
    VALID_RUNTIMES,
238
)
239
from localstack.services.lambda_.urlrouter import FunctionUrlRouter
1✔
240
from localstack.services.plugins import ServiceLifecycleHook
1✔
241
from localstack.state import StateVisitor
1✔
242
from localstack.utils.aws.arns import (
1✔
243
    ArnData,
244
    capacity_provider_arn,
245
    extract_resource_from_arn,
246
    extract_service_from_arn,
247
    get_partition,
248
    lambda_event_source_mapping_arn,
249
    parse_arn,
250
)
251
from localstack.utils.aws.client_types import ServicePrincipal
1✔
252
from localstack.utils.bootstrap import is_api_enabled
1✔
253
from localstack.utils.collections import PaginatedList, merge_recursive
1✔
254
from localstack.utils.event_matcher import validate_event_pattern
1✔
255
from localstack.utils.strings import get_random_hex, short_uid, to_bytes, to_str
1✔
256
from localstack.utils.sync import poll_condition
1✔
257
from localstack.utils.urls import localstack_host
1✔
258

259
LOG = logging.getLogger(__name__)

# Regular expression (as a string) describing a capacity provider ARN.
# Written as a raw string so the `\d` escapes reach the regex engine unchanged.
CAPACITY_PROVIDER_ARN_NAME = r"arn:aws[a-zA-Z-]*:lambda:(eusc-)?[a-z]{2}((-gov)|(-iso([a-z]?)))?-[a-z]+-\d{1}:\d{12}:capacity-provider:[a-zA-Z0-9-_]+"

# Defaults for function timeout and memory size
# NOTE(review): presumably seconds and MB respectively - confirm against the callers
LAMBDA_DEFAULT_TIMEOUT = 3
LAMBDA_DEFAULT_MEMORY_SIZE = 128

# Limits for tags per resource and layers per function
LAMBDA_TAG_LIMIT_PER_RESOURCE = 50
LAMBDA_LAYERS_LIMIT_PER_FUNCTION = 5

TAG_KEY_CUSTOM_URL = "_custom_id_"
# Requirements (from RFC3986 & co): not longer than 63, first char must be
# alpha, then alphanumeric or hyphen, except cannot start or end with hyphen
TAG_KEY_CUSTOM_URL_VALIDATOR = re.compile(r"^[A-Za-z]([A-Za-z0-9\-]{0,61}[A-Za-z0-9])?$")
1✔
272

273

274
class LambdaProvider(LambdaApi, ServiceLifecycleHook):
1✔
275
    lambda_service: LambdaService
1✔
276
    create_fn_lock: threading.RLock
1✔
277
    create_layer_lock: threading.RLock
1✔
278
    router: FunctionUrlRouter
1✔
279
    esm_workers: dict[str, EsmWorker]
1✔
280
    layer_fetcher: LayerFetcher | None
1✔
281

282
    def __init__(self) -> None:
        """Create the Lambda service, the function URL router, the creation locks and
        the (initially empty) ESM worker registry, then run the layer fetcher hook."""
        service = LambdaService()
        self.lambda_service = service
        # The router is handed the same service instance the provider uses
        self.router = FunctionUrlRouter(ROUTER, service)

        self.create_fn_lock = threading.RLock()
        self.create_layer_lock = threading.RLock()

        self.esm_workers = {}
        # Starts out unset; the hook below receives the provider instance
        self.layer_fetcher = None
        lambda_hooks.inject_layer_fetcher.run(self)
1✔
290

291
    def accept_state_visitor(self, visitor: StateVisitor):
        """Pass the Lambda stores to the given state visitor.

        :param visitor: The visitor whose ``visit`` method is called with ``lambda_stores``
        """
        visitor.visit(lambda_stores)
×
293

294
    def on_before_state_reset(self):
        """Stop every registered ESM worker, empty the worker registry and stop the Lambda service."""
        registered_workers = self.esm_workers
        for worker in registered_workers.values():
            worker.stop_for_shutdown()
        # Replace the registry with a new dict so no stopped worker stays referenced
        self.esm_workers = {}
        self.lambda_service.stop()
×
299

300
    def on_after_state_reset(self):
        """Create a new LambdaService and assign it to both the router and the provider."""
        fresh_service = LambdaService()
        self.router.lambda_service = fresh_service
        self.lambda_service = fresh_service
×
302

303
    def on_before_state_load(self):
        """Stop all ESM workers and the Lambda service before state is loaded.

        ``on_after_state_load`` creates a worker for every event source mapping found in the
        stores and registers it under its UUID. Workers that are still registered at this point
        are therefore stopped and dropped first (same sequence as ``on_before_state_reset``), so
        that they are neither left running unreferenced nor duplicated after the load.
        """
        for esm_worker in self.esm_workers.values():
            esm_worker.stop_for_shutdown()
        self.esm_workers = {}
        self.lambda_service.stop()
×
305

306
    def on_after_state_load(self):
        """Rebuild the runtime side of the provider from the loaded stores.

        A new LambdaService is created and attached to the router. Then, for every store:
          - each function version (except ``$LATEST`` of functions with a capacity provider) is
            put back into the "Pending" state and started,
          - provisioned concurrency is re-applied for every configured alias/version,
          - an ESM worker is created and registered for every event source mapping.

        Every restoration step is best-effort: a failure is logged as a warning (with traceback
        only if DEBUG logging is enabled) and the remaining resources are still restored.
        """
        self.lambda_service = LambdaService()
        self.router.lambda_service = self.lambda_service

        for _account_id, account_bundle in lambda_stores.items():
            for _region_name, state in account_bundle.items():
                for fn in state.functions.values():
                    for fn_version in fn.versions.values():
                        try:
                            # $LATEST is not invokable for Lambda functions with a capacity provider
                            # and has a different State (i.e., ActiveNonInvokable)
                            is_capacity_provider_latest = (
                                fn_version.config.CapacityProviderConfig
                                and fn_version.id.qualifier == "$LATEST"
                            )
                            if not is_capacity_provider_latest:
                                # Restore the "Pending" state for the function version and start it
                                new_state = VersionState(
                                    state=State.Pending,
                                    code=StateReasonCode.Creating,
                                    reason="The function is being created.",
                                )
                                new_config = dataclasses.replace(fn_version.config, state=new_state)
                                new_version = dataclasses.replace(fn_version, config=new_config)
                                fn.versions[fn_version.id.qualifier] = new_version
                                # NOTE(review): the loaded `fn_version` (not the "Pending" copy stored
                                # above) is what gets passed on here - confirm this is intended
                                self.lambda_service.create_function_version(fn_version).result(
                                    timeout=5
                                )
                        except Exception:
                            LOG.warning(
                                "Failed to restore function version %s",
                                fn_version.id.qualified_arn(),
                                exc_info=LOG.isEnabledFor(logging.DEBUG),
                            )
                    # restore provisioned concurrency per function considering both versions and aliases
                    for (
                        provisioned_qualifier,
                        provisioned_config,
                    ) in fn.provisioned_concurrency_configs.items():
                        fn_arn = None
                        try:
                            if api_utils.qualifier_is_alias(provisioned_qualifier):
                                alias = fn.aliases.get(provisioned_qualifier)
                                resolved_version = fn.versions.get(alias.function_version)
                                fn_arn = resolved_version.id.qualified_arn()
                            elif api_utils.qualifier_is_version(provisioned_qualifier):
                                fn_version = fn.versions.get(provisioned_qualifier)
                                fn_arn = fn_version.id.qualified_arn()
                            else:
                                raise InvalidParameterValueException(
                                    "Invalid qualifier type:"
                                    " Qualifier can only be an alias or a version for provisioned concurrency."
                                )

                            manager = self.lambda_service.get_lambda_version_manager(fn_arn)
                            manager.update_provisioned_concurrency_config(
                                provisioned_config.provisioned_concurrent_executions
                            )
                        except Exception:
                            LOG.warning(
                                "Failed to restore provisioned concurrency %s for function %s",
                                provisioned_config,
                                fn_arn,
                                exc_info=LOG.isEnabledFor(logging.DEBUG),
                            )

                for esm in state.event_source_mappings.values():
                    # Restores event source workers
                    function_arn = esm.get("FunctionArn")
                    # Guarded like the restoration steps above: one mapping that cannot be
                    # restored must not prevent the remaining mappings from being restored.
                    try:
                        # TODO: How do we know the event source is up?
                        # A basic poll to see if the mapped Lambda function is active/failed
                        if not poll_condition(
                            lambda: get_function_version_from_arn(function_arn).config.state.state
                            in [State.Active, State.Failed],
                            timeout=10,
                        ):
                            LOG.warning(
                                "Creating ESM for Lambda that is not in running state: %s",
                                function_arn,
                            )

                        function_version = get_function_version_from_arn(function_arn)
                        function_role = function_version.config.role

                        is_esm_enabled = esm.get("State", EsmState.DISABLED) not in (
                            EsmState.DISABLED,
                            EsmState.DISABLING,
                        )
                        esm_worker = EsmWorkerFactory(
                            esm, function_role, is_esm_enabled
                        ).get_esm_worker()

                        # Note: a worker is created in the DISABLED state if not enabled
                        esm_worker.create()
                        # TODO: assigning the esm_worker to the dict only works after .create(). Could it cause a race
                        #  condition if we get a shutdown here and have a worker thread spawned but not accounted for?
                        self.esm_workers[esm_worker.uuid] = esm_worker
                    except Exception:
                        LOG.warning(
                            "Failed to restore event source mapping %s for function %s",
                            esm.get("UUID"),
                            function_arn,
                            exc_info=LOG.isEnabledFor(logging.DEBUG),
                        )
×
404

405
    def on_after_init(self):
        """Register the routes of the function URL router and validate the runtime executor's environment."""
        self.router.register_routes()
        runtime_executor = get_runtime_executor()
        runtime_executor.validate_environment()
1✔
408

409
    def on_before_stop(self) -> None:
        """Stop every registered ESM worker and afterwards the Lambda service."""
        for worker in self.esm_workers.values():
            worker.stop_for_shutdown()

        # TODO: should probably unregister routes?
        self.lambda_service.stop()
1✔
415

416
    @staticmethod
    def _get_function(function_name: str, account_id: str, region: str) -> Function:
        """Fetch a function from the store of the given account and region.

        :param function_name: Key of the function in the store's ``functions`` mapping
        :param account_id: Account whose store is searched
        :param region: Region whose store is searched
        :return: The stored function
        :raises ResourceNotFoundException: if the store holds no function under that name
        """
        function = lambda_stores[account_id][region].functions.get(function_name)
        if function:
            return function

        # The error message carries the unqualified ARN of the missing function
        missing_arn = api_utils.unqualified_lambda_arn(
            function_name=function_name,
            account=account_id,
            region=region,
        )
        raise ResourceNotFoundException(
            f"Function not found: {missing_arn}",
            Type="User",
        )
1✔
431

432
    @staticmethod
    def _get_esm(uuid: str, account_id: str, region: str) -> EventSourceMappingConfiguration:
        """Fetch an event source mapping from the store of the given account and region.

        :param uuid: Key of the mapping in the store's ``event_source_mappings`` mapping
        :param account_id: Account whose store is searched
        :param region: Region whose store is searched
        :return: The stored event source mapping
        :raises ResourceNotFoundException: if the store holds no mapping under that UUID
        """
        mapping = lambda_stores[account_id][region].event_source_mappings.get(uuid)
        if mapping:
            return mapping

        missing_arn = lambda_event_source_mapping_arn(uuid, account_id, region)
        raise ResourceNotFoundException(
            f"Event source mapping not found: {missing_arn}",
            Type="User",
        )
1✔
443

444
    @staticmethod
    def _get_capacity_provider(
        capacity_provider_name: str,
        account_id: str,
        region: str,
        error_msg_template: str = "Capacity provider not found: {}",
    ) -> CapacityProviderModel:
        """Fetch a capacity provider from the store of the given account and region.

        :param capacity_provider_name: Key of the provider in the store's ``capacity_providers`` mapping
        :param account_id: Account whose store is searched
        :param region: Region whose store is searched
        :param error_msg_template: Format string for the not-found message; its single
            placeholder receives the capacity provider ARN
        :return: The stored capacity provider
        :raises ResourceNotFoundException: if the store holds no provider under that name
        """
        provider = lambda_stores[account_id][region].capacity_providers.get(capacity_provider_name)
        if provider:
            return provider

        missing_arn = capacity_provider_arn(capacity_provider_name, account_id, region)
        raise ResourceNotFoundException(
            error_msg_template.format(missing_arn),
            Type="User",
        )
×
460

461
    @staticmethod
    def _validate_qualifier_expression(qualifier: str) -> None:
        """Raise a ValidationException if ``api_utils.validate_qualifier`` reports errors.

        :param qualifier: The qualifier to validate
        :raises ValidationException: with a message built from the reported errors
        """
        error_messages = api_utils.validate_qualifier(qualifier)
        if not error_messages:
            return
        raise ValidationException(
            message=api_utils.construct_validation_exception_message(error_messages)
        )
467

468
    @staticmethod
    def _validate_publish_to(publish_to: str):
        """Accept only ``LATEST_PUBLISHED`` as publish target.

        :param publish_to: The requested publish target
        :raises ValidationException: if the value is anything other than ``LATEST_PUBLISHED``
        """
        if publish_to == FunctionVersionLatestPublished.LATEST_PUBLISHED:
            return
        raise ValidationException(
            message=f"1 validation error detected: Value '{publish_to}' at 'publishTo' failed to satisfy constraint: Member must satisfy enum value set: [LATEST_PUBLISHED]"
        )
474

475
    @staticmethod
    def _resolve_fn_qualifier(resolved_fn: Function, qualifier: str | None) -> tuple[str, str]:
        """Check that the given qualifier exists on the function and return it with the matching ARN.

        Without a qualifier, ``$LATEST`` and the unqualified function ARN are returned.

        :param resolved_fn: The resolved lambda function
        :param qualifier: The qualifier to be resolved or None
        :return: Tuple of (resolved qualifier, function arn either qualified or unqualified)
        :raises ResourceNotFoundException: if the qualifier is an unknown alias, an unknown
            version, or neither an alias nor a version
        """
        function_name = resolved_fn.function_name
        # assuming function versions need to live in the same account and region
        latest_id = resolved_fn.latest().id
        account_id = latest_id.account
        region = latest_id.region

        if qualifier is None:
            unqualified_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
            return "$LATEST", unqualified_arn

        qualified_arn = api_utils.qualified_lambda_arn(
            function_name, qualifier, account_id, region
        )
        if api_utils.qualifier_is_alias(qualifier):
            found = qualifier in resolved_fn.aliases
            not_found_message = f"Cannot find alias arn: {qualified_arn}"
        else:
            # anything else must be a version (or $LATEST); a qualifier that matches the
            # qualifier pattern but is neither alias nor version is reported as not found
            is_version_like = api_utils.qualifier_is_version(qualifier) or qualifier == "$LATEST"
            found = is_version_like and qualifier in resolved_fn.versions
            not_found_message = f"Function not found: {qualified_arn}"

        if not found:
            raise ResourceNotFoundException(not_found_message, Type="User")
        return qualifier or "$LATEST", qualified_arn
1✔
501

502
    @staticmethod
    def _function_revision_id(resolved_fn: Function, resolved_qualifier: str) -> str:
        """Return the revision id of the alias or version the qualifier refers to.

        :param resolved_fn: The function owning the alias/version
        :param resolved_qualifier: Key into the function's ``aliases`` or ``versions``
        :return: The alias' revision id, or the revision id of the version's config
        """
        if not api_utils.qualifier_is_alias(resolved_qualifier):
            # Assumes that a non-alias is a version
            return resolved_fn.versions[resolved_qualifier].config.revision_id
        return resolved_fn.aliases[resolved_qualifier].revision_id
1✔
509

510
    def _resolve_vpc_id(self, account_id: str, region_name: str, subnet_id: str) -> str:
        """Look up the VPC id of a subnet through the EC2 ``DescribeSubnets`` operation.

        :param account_id: Account used for the EC2 client
        :param region_name: Region used for the EC2 client
        :param subnet_id: The subnet to describe
        :return: ``VpcId`` of the first subnet in the response
        :raises InvalidParameterValueException: if EC2 answers with a client error; the
            message carries the EC2 error code and message
        """
        ec2_client = connect_to(aws_access_key_id=account_id, region_name=region_name).ec2
        try:
            response = ec2_client.describe_subnets(SubnetIds=[subnet_id])
            return response["Subnets"][0]["VpcId"]
        except ec2_client.exceptions.ClientError as e:
            error = e.response["Error"]
            code = error["Code"]
            message = error["Message"]
            raise InvalidParameterValueException(
                f"Error occurred while DescribeSubnets. EC2 Error Code: {code}. EC2 Error Message: {message}",
                Type="User",
            )
521

522
    def _build_vpc_config(
        self,
        account_id: str,
        region_name: str,
        vpc_config: dict | None = None,
    ) -> VpcConfig | None:
        """Build the VPC config model from the ``VpcConfig`` request dict.

        :param account_id: Account used to resolve the VPC id of the subnet
        :param region_name: Region used to resolve the VPC id of the subnet
        :param vpc_config: Request dict; the keys ``SubnetIds`` and ``SecurityGroupIds`` are read
        :return: None if no VPC config was given or the EC2 API is not enabled; an empty
            VpcConfig if there are no subnet ids; otherwise a VpcConfig whose VPC id is resolved
            from the first subnet
        :raises ValidationException: if the first subnet id does not match ``SUBNET_ID_REGEX``
        :raises InvalidParameterValueException: if the subnet cannot be described in EC2
        """
        if not vpc_config or not is_api_enabled("ec2"):
            return None

        subnet_ids = vpc_config.get("SubnetIds", [])
        # A missing key, an explicit None and an empty list all mean "no subnets".
        # (Indexing an explicit None below would otherwise fail with a TypeError.)
        if not subnet_ids:
            return VpcConfig(vpc_id="", security_group_ids=[], subnet_ids=[])

        # Only the first subnet is validated and used to resolve the VPC id
        subnet_id = subnet_ids[0]
        if not SUBNET_ID_REGEX.match(subnet_id):
            raise ValidationException(
                f"1 validation error detected: Value '[{subnet_id}]' at 'vpcConfig.subnetIds' failed to satisfy constraint: Member must satisfy constraint: [Member must have length less than or equal to 1024, Member must have length greater than or equal to 0, Member must satisfy regular expression pattern: subnet-[0-9a-z]*]"
            )

        return VpcConfig(
            vpc_id=self._resolve_vpc_id(account_id, region_name, subnet_id),
            security_group_ids=vpc_config.get("SecurityGroupIds", []),
            subnet_ids=subnet_ids,
        )
546

547
    def _create_version_model(
        self,
        function_name: str,
        region: str,
        account_id: str,
        description: str | None = None,
        revision_id: str | None = None,
        code_sha256: str | None = None,
        publish_to: FunctionVersionLatestPublished | None = None,
        is_active: bool = False,
    ) -> tuple[FunctionVersion, bool]:
        """
        Derive a new version from $LATEST and store it in the function model, provided that
          - RevisionId, if given, equals the revision id of $LATEST
          - CodeSha256, if given, equals the code hash of $LATEST (not enforced for hot reloaded zip packages)
          - $LATEST has changed since the previously published version
        If $LATEST has the same internal revision as the previously published version, nothing is
        created and that previous version is returned together with False.

        :param function_name: Function name to be published
        :param region: Region of the function
        :param account_id: Account of the function
        :param description: new description of the version (the one of $LATEST is kept if None)
        :param revision_id: Revision id, raises an error if it does not match the revision id of $LATEST
        :param code_sha256: Code sha256, raises an error if it does not match the code hash of $LATEST
        :param publish_to: if LATEST_PUBLISHED, the version is stored under the qualifier
            "$LATEST.PUBLISHED" (the version counter is left untouched) and is marked as update in progress
        :param is_active: only considered for LATEST_PUBLISHED: the version starts in state Active instead of Pending
        :return: Tuple of (version, True if a new version was stored / False if the previous version was returned)
        """
        latest = get_function_version(
            function_name=function_name, qualifier="$LATEST", account_id=account_id, region=region
        )
        latest_config = latest.config
        if revision_id and latest_config.revision_id != revision_id:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
                Type="User",
            )

        # check if code hashes match if they are specified
        is_zip = latest_config.package_type == PackageType.Zip
        if is_zip:
            current_hash = latest_config.code.code_sha256
        else:
            current_hash = latest_config.image.code_sha256
        # hot reloading is currently only supported for zip package types; for a hot reloaded
        # package the codesha256 check cannot be enforced
        is_hot_reloaded_zip_package = is_zip and latest_config.code.is_hot_reloading()
        if code_sha256 and current_hash != code_sha256 and not is_hot_reloaded_zip_package:
            raise InvalidParameterValueException(
                f"CodeSHA256 ({code_sha256}) is different from current CodeSHA256 in $LATEST ({current_hash}). Please try again with the CodeSHA256 in $LATEST.",
                Type="User",
            )

        function = lambda_stores[account_id][region].functions.get(function_name)
        overrides = {} if description is None else {"description": description}
        publish_latest_published = publish_to == FunctionVersionLatestPublished.LATEST_PUBLISHED
        # TODO copy environment instead of restarting one, get rid of all the "Pending"s

        with function.lock:
            if function.next_version > 1:
                prev_version = function.versions.get(str(function.next_version - 1))
                if (
                    prev_version
                    and prev_version.config.internal_revision == latest_config.internal_revision
                ):
                    # $LATEST is unchanged since the previous publish
                    return prev_version, False
            # TODO check if there was a change since last version
            if publish_latest_published:
                qualifier = "$LATEST.PUBLISHED"
            else:
                qualifier = str(function.next_version)
                function.next_version += 1
            version_id = VersionIdentifier(
                function_name=function_name,
                qualifier=qualifier,
                region=region,
                account=account_id,
            )

            if latest_config.CapacityProviderConfig:
                # for lambda managed functions, snap start is not supported
                snap_start = None
            else:
                apply_on = latest_config.snap_start["ApplyOn"]
                snap_start = SnapStartResponse(
                    ApplyOn=apply_on,
                    OptimizationStatus=(
                        SnapStartOptimizationStatus.On
                        if apply_on == SnapStartApplyOn.PublishedVersions
                        else SnapStartOptimizationStatus.Off
                    ),
                )

            if publish_latest_published:
                last_update = UpdateStatus(
                    status=LastUpdateStatus.InProgress,
                    code="Updating",
                    reason="The function is being updated.",
                )
            else:
                last_update = None

            if publish_latest_published and is_active:
                version_state = VersionState(state=State.Active)
            else:
                version_state = VersionState(
                    state=State.Pending,
                    code=StateReasonCode.Creating,
                    reason="The function is being created.",
                )

            version_config = dataclasses.replace(
                latest_config,
                last_update=last_update,
                state=version_state,
                snap_start=snap_start,
                **overrides,
            )
            new_version = dataclasses.replace(latest, config=version_config, id=version_id)
            function.versions[qualifier] = new_version
        return new_version, True
1✔
671

672
    def _publish_version_from_existing_version(
        self,
        function_name: str,
        region: str,
        account_id: str,
        description: str | None = None,
        revision_id: str | None = None,
        code_sha256: str | None = None,
        publish_to: FunctionVersionLatestPublished | None = None,
    ) -> FunctionVersion:
        """
        Publish a new version derived from the existing, already initialized $LATEST version.

        :param function_name: Function name
        :param region: region
        :param account_id: account id
        :param description: description
        :param revision_id: revision id (check if current version matches)
        :param code_sha256: code sha (check if current code matches)
        :param publish_to: optional publish target; the version model is requested as active
            when it equals ``FunctionVersionLatestPublished.LATEST_PUBLISHED``
        :return: the version returned by ``_create_version_model`` when nothing changed or when
            the function uses a capacity provider, otherwise the version read back from the store
        """
        targets_latest_published = publish_to == FunctionVersionLatestPublished.LATEST_PUBLISHED
        published, has_changed = self._create_version_model(
            function_name=function_name,
            region=region,
            account_id=account_id,
            description=description,
            revision_id=revision_id,
            code_sha256=code_sha256,
            publish_to=publish_to,
            is_active=targets_latest_published,
        )
        if not has_changed:
            return published

        # Functions with a capacity provider are published asynchronously, all others synchronously.
        uses_capacity_provider = bool(published.config.CapacityProviderConfig)
        if uses_capacity_provider:
            self.lambda_service.publish_version_async(published)
        else:
            self.lambda_service.publish_version(published)

        fn = lambda_stores[account_id][region].functions.get(function_name)

        # Update revision id for $LATEST version
        # TODO: re-evaluate data model to prevent this dirty hack just for bumping the revision id
        latest = fn.versions["$LATEST"]
        recreated_config = dataclasses.replace(latest.config)
        fn.versions["$LATEST"] = dataclasses.replace(latest, config=recreated_config)

        if uses_capacity_provider:
            # publish_version happens async for functions with a capacity provider.
            # Therefore, we return the new version with State=Pending or
            # LastUpdateStatus=InProgress ($LATEST.PUBLISHED)
            return published

        # Regular functions yield an Active state modified during `publish_version` (sync).
        # Therefore, we need to get the updated version from the store.
        return fn.versions.get(published.id.qualifier)
1✔
729

730
    def _publish_version_with_changes(
        self,
        function_name: str,
        region: str,
        account_id: str,
        description: str | None = None,
        revision_id: str | None = None,
        code_sha256: str | None = None,
        publish_to: FunctionVersionLatestPublished | None = None,
        is_active: bool = False,
    ) -> FunctionVersion:
        """
        Publish a version together with a new latest version (publish on create / update).

        :param function_name: Function name
        :param region: region
        :param account_id: account id
        :param description: description
        :param revision_id: revision id (check if current version matches)
        :param code_sha256: code sha (check if current code matches)
        :param publish_to: optional publish target, forwarded to ``_create_version_model``
        :param is_active: forwarded to ``_create_version_model``
        :return: new version
        """
        version, has_changed = self._create_version_model(
            function_name=function_name,
            region=region,
            account_id=account_id,
            description=description,
            revision_id=revision_id,
            code_sha256=code_sha256,
            publish_to=publish_to,
            is_active=is_active,
        )
        # Only hand the version to the Lambda service when a new version model was created.
        if has_changed:
            self.lambda_service.create_function_version(version)
        return version
1✔
766

767
    @staticmethod
    def _verify_env_variables(env_vars: dict[str, str]):
        """
        Reject environment variables whose compact JSON serialization is too large.

        :param env_vars: environment variables of the function
        :raises InvalidParameterValueException: if the UTF-8 encoded JSON dump exceeds
            ``config.LAMBDA_LIMITS_MAX_FUNCTION_ENVVAR_SIZE_BYTES``
        """
        # Compact separators: the size is measured without any whitespace padding.
        serialized = json.dumps(env_vars, separators=(",", ":"))
        size_in_bytes = len(serialized.encode("utf-8"))
        if size_in_bytes <= config.LAMBDA_LIMITS_MAX_FUNCTION_ENVVAR_SIZE_BYTES:
            return

        raise InvalidParameterValueException(
            f"Lambda was unable to configure your environment variables because the environment variables you have provided exceeded the 4KB limit. String measured: {serialized}",
            Type="User",
        )
778

779
    @staticmethod
    def _validate_snapstart(snap_start: SnapStart, runtime: Runtime):
        """
        Validate a SnapStart request setting against the allowed values and the runtime.

        :param snap_start: SnapStart setting of the request (its ``ApplyOn`` entry is checked)
        :param runtime: runtime of the function
        :raises ValidationException: if ``ApplyOn`` is neither ``PublishedVersions`` nor ``None``
        :raises InvalidParameterValueException: if the runtime is not listed in
            ``SNAP_START_SUPPORTED_RUNTIMES``
        """
        apply_on = snap_start.get("ApplyOn")
        accepted_apply_on = (SnapStartApplyOn.PublishedVersions, SnapStartApplyOn.None_)
        if apply_on not in accepted_apply_on:
            raise ValidationException(
                f"1 validation error detected: Value '{apply_on}' at 'snapStart.applyOn' failed to satisfy constraint: Member must satisfy enum value set: [PublishedVersions, None]"
            )

        if runtime in SNAP_START_SUPPORTED_RUNTIMES:
            return

        raise InvalidParameterValueException(
            f"{runtime} is not supported for SnapStart enabled functions.", Type="User"
        )
794

795
    def _validate_layers(self, new_layers: list[str], region: str, account_id: str):
        """
        Validate the layer version ARNs referenced by a function.

        Layers of the same account must exist in the store and be in the function's region.
        Layers of another account that are not in the store yet are fetched through
        ``self.layer_fetcher`` and added to the store of the layer's account and region.

        :param new_layers: layer version ARNs referenced by the function
        :param region: region of the function; the region checks are skipped if it is falsy
        :param account_id: account id of the function
        :raises InvalidParameterValueException: too many layers, a same-account layer in another
            region, a same-account layer version that does not exist, or two versions of the
            same layer
        :raises ValidationException: a layer ARN without a version
        :raises AccessDeniedException: an external layer in another region, or an external layer
            the fetcher could not return
        :raises NotImplementedError: an unknown external layer while no layer fetcher is set
        """
        if len(new_layers) > LAMBDA_LAYERS_LIMIT_PER_FUNCTION:
            raise InvalidParameterValueException(
                "Cannot reference more than 5 layers.", Type="User"
            )

        # Maps the unversioned layer ARN to the first version ARN seen for it.
        visited_layers = {}
        for layer_version_arn in new_layers:
            (
                layer_region,
                layer_account_id,
                layer_name,
                layer_version_str,
            ) = api_utils.parse_layer_arn(layer_version_arn)
            if layer_version_str is None:
                raise ValidationException(
                    f"1 validation error detected: Value '[{layer_version_arn}]'"
                    + " at 'layers' failed to satisfy constraint: Member must satisfy constraint: [Member must have length less than or equal to 2048, Member must have length greater than or equal to 1, Member must satisfy regular expression pattern: "
                    + "(arn:(aws[a-zA-Z-]*)?:lambda:(eusc-)?[a-z]{2}((-gov)|(-iso([a-z]?)))?-[a-z]+-\\d{1}:\\d{12}:layer:[a-zA-Z0-9-_]+:[0-9]+)|(arn:[a-zA-Z0-9-]+:lambda:::awslayer:[a-zA-Z0-9-_]+), Member must not be null]",
                )

            state = lambda_stores[layer_account_id][layer_region]
            # `layer` is the layer as currently stored (if any); it is never rebound below so
            # that the "is this layer already known?" decision stays reliable.
            layer = state.layers.get(layer_name)
            layer_version = None
            if layer is not None:
                layer_version = layer.layer_versions.get(layer_version_str)
            if layer_account_id == account_id:
                if region and layer_region != region:
                    raise InvalidParameterValueException(
                        f"Layers are not in the same region as the function. "
                        f"Layers are expected to be in region {region}.",
                        Type="User",
                    )
                # A missing layer implies a missing layer version.
                if layer_version is None:
                    raise InvalidParameterValueException(
                        f"Layer version {layer_version_arn} does not exist.", Type="User"
                    )
            else:  # External layer from other account
                # TODO: validate IAM layer policy here, allowing access by default for now and only checking region
                if region and layer_region != region:
                    # TODO: detect user or role from context when IAM users are implemented
                    user = "user/localstack-testing"
                    raise AccessDeniedException(
                        f"User: arn:{get_partition(region)}:iam::{account_id}:{user} is not authorized to perform: lambda:GetLayerVersion on resource: {layer_version_arn} because no resource-based policy allows the lambda:GetLayerVersion action"
                    )
                if layer_version is None:
                    # Limitation: cannot fetch external layers when using the same account id as the target layer
                    # because we do not want to trigger the layer fetcher for every non-existing layer.
                    if self.layer_fetcher is None:
                        raise NotImplementedError(
                            "Fetching shared layers from AWS is a pro feature."
                        )

                    fetched_layer = self.layer_fetcher.fetch_layer(layer_version_arn)
                    if fetched_layer is None:
                        # TODO: detect user or role from context when IAM users are implemented
                        user = "user/localstack-testing"
                        raise AccessDeniedException(
                            f"User: arn:{get_partition(region)}:iam::{account_id}:{user} is not authorized to perform: lambda:GetLayerVersion on resource: {layer_version_arn} because no resource-based policy allows the lambda:GetLayerVersion action"
                        )

                    # Distinguish between new layer and new layer version
                    if layer is None:
                        # Create whole layer from scratch
                        state.layers[layer_name] = fetched_layer
                    else:
                        # Only add the fetched version if another version of the same layer
                        # already exists, so the versions already stored are kept.
                        layer.layer_versions[layer_version_str] = (
                            fetched_layer.layer_versions.get(layer_version_str)
                        )

            # only the first two matches in the array are considered for the error message
            layer_arn = ":".join(layer_version_arn.split(":")[:-1])
            if layer_arn in visited_layers:
                conflict_layer_version_arn = visited_layers[layer_arn]
                raise InvalidParameterValueException(
                    f"Two different versions of the same layer are not allowed to be referenced in the same function. {conflict_layer_version_arn} and {layer_version_arn} are versions of the same layer.",
                    Type="User",
                )
            visited_layers[layer_arn] = layer_version_arn
1✔
875

876
    def _validate_capacity_provider_config(
        self, capacity_provider_config: CapacityProviderConfig, context: RequestContext
    ):
        """
        Validate the capacity provider configuration of a request.

        :param capacity_provider_config: capacity provider config taken from the request
        :param context: request context, passed on to ``get_capacity_provider``
        :raises ValidationException: if the managed instances config or its capacity provider
            ARN is missing, or if the ARN does not match ``CAPACITY_PROVIDER_ARN_NAME``
        """
        managed_instances_config = capacity_provider_config.get(
            "LambdaManagedInstancesCapacityProviderConfig"
        )
        if not managed_instances_config:
            raise ValidationException(
                "1 validation error detected: Value null at 'capacityProviderConfig.lambdaManagedInstancesCapacityProviderConfig' failed to satisfy constraint: Member must not be null"
            )

        capacity_provider_arn = managed_instances_config.get("CapacityProviderArn")
        if not capacity_provider_arn:
            raise ValidationException(
                "1 validation error detected: Value null at 'capacityProviderConfig.lambdaManagedInstancesCapacityProviderConfig.capacityProviderArn' failed to satisfy constraint: Member must not be null"
            )

        arn_matches = re.match(CAPACITY_PROVIDER_ARN_NAME, capacity_provider_arn)
        if not arn_matches:
            raise ValidationException(
                f"1 validation error detected: Value '{capacity_provider_arn}' at 'capacityProviderConfig.lambdaManagedInstancesCapacityProviderConfig.capacityProviderArn' failed to satisfy constraint: Member must satisfy regular expression pattern: {CAPACITY_PROVIDER_ARN_NAME}"
            )

        # The capacity provider name is the last colon-separated segment of the ARN.
        capacity_provider_name = capacity_provider_arn.rsplit(":", 1)[-1]
        # Return value intentionally unused; presumably this lookup raises for an unknown
        # capacity provider — TODO confirm against get_capacity_provider.
        self.get_capacity_provider(context, capacity_provider_name)
×
899

900
    @staticmethod
1✔
901
    def map_layers(new_layers: list[str]) -> list[LayerVersion]:
1✔
902
        layers = []
1✔
903
        for layer_version_arn in new_layers:
1✔
904
            region_name, account_id, layer_name, layer_version = api_utils.parse_layer_arn(
1✔
905
                layer_version_arn
906
            )
907
            layer = lambda_stores[account_id][region_name].layers.get(layer_name)
1✔
908
            layer_version = layer.layer_versions.get(layer_version)
1✔
909
            layers.append(layer_version)
1✔
910
        return layers
1✔
911

912
    def get_function_recursion_config(
        self,
        context: RequestContext,
        function_name: UnqualifiedFunctionName,
        **kwargs,
    ) -> GetFunctionRecursionConfigResponse:
        """
        Return the recursive loop setting stored on a function.

        :param context: request context
        :param function_name: function name as given in the request
        :return: response carrying the function's ``recursive_loop`` value
        """
        # Resolve account and region from the raw request value before normalizing the name.
        account_id, region = api_utils.get_account_and_region(function_name, context)
        resolved_name = api_utils.get_function_name(function_name, context)
        function = self._get_function(
            function_name=resolved_name, region=region, account_id=account_id
        )
        return GetFunctionRecursionConfigResponse(RecursiveLoop=function.recursive_loop)
1✔
922

923
    def put_function_recursion_config(
        self,
        context: RequestContext,
        function_name: UnqualifiedFunctionName,
        recursive_loop: RecursiveLoop,
        **kwargs,
    ) -> PutFunctionRecursionConfigResponse:
        """
        Store a new recursive loop setting on a function.

        :param context: request context
        :param function_name: function name as given in the request
        :param recursive_loop: new setting; must be a member value of ``RecursiveLoop``
        :return: response carrying the stored ``recursive_loop`` value
        :raises ValidationException: if ``recursive_loop`` is not a ``RecursiveLoop`` member
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        resolved_name = api_utils.get_function_name(function_name, context)

        # The function is looked up before the value is validated.
        function = self._get_function(
            function_name=resolved_name, region=region, account_id=account_id
        )

        accepted_values = list(RecursiveLoop.__members__.values())
        if recursive_loop not in accepted_values:
            raise ValidationException(
                f"1 validation error detected: Value '{recursive_loop}' at 'recursiveLoop' failed to satisfy constraint: "
                "Member must satisfy enum value set: [Terminate, Allow]"
            )

        function.recursive_loop = recursive_loop
        return PutFunctionRecursionConfigResponse(RecursiveLoop=function.recursive_loop)
1✔
944

945
    @handler(operation="CreateFunction", expand=False)
    def create_function(
        self,
        context: RequestContext,
        request: CreateFunctionRequest,
    ) -> FunctionConfiguration:
        """
        Create a function and its $LATEST version from a CreateFunction request.

        The request is validated first (code and request size, architectures, environment
        variables, layers, role, runtime, SnapStart, PublishTo). The function and its $LATEST
        version are then built and stored while holding ``self.create_fn_lock``. Afterwards
        the version is handed to the Lambda service (unless a capacity provider is used),
        tags are stored, and a version is published if ``Publish`` is set.

        :param context: request context (provides account, region, partition and raw request)
        :param request: the CreateFunction request
        :return: the function configuration mapped from the created (or published) version
        :raises RequestEntityTooLargeException: zipped code or the request body is too large
        :raises ValidationException: invalid architectures or role ARN
        :raises InvalidParameterValueException: the role cannot be assumed, or ``LogFormat`` is
            ``Text`` for a function with a capacity provider
        :raises ResourceConflictException: a function with this name already exists
        :raises LambdaServiceException: required code (zip, S3 bucket or image) is missing
        """
        context_region = context.region
        context_account_id = context.account_id

        zip_file = request.get("Code", {}).get("ZipFile")
        if zip_file and len(zip_file) > config.LAMBDA_LIMITS_CODE_SIZE_ZIPPED:
            raise RequestEntityTooLargeException(
                f"Zipped size must be smaller than {config.LAMBDA_LIMITS_CODE_SIZE_ZIPPED} bytes"
            )

        if context.request.content_length > config.LAMBDA_LIMITS_CREATE_FUNCTION_REQUEST_SIZE:
            raise RequestEntityTooLargeException(
                f"Request must be smaller than {config.LAMBDA_LIMITS_CREATE_FUNCTION_REQUEST_SIZE} bytes for the CreateFunction operation"
            )

        if architectures := request.get("Architectures"):
            if len(architectures) != 1:
                raise ValidationException(
                    f"1 validation error detected: Value '[{', '.join(architectures)}]' at 'architectures' failed to "
                    f"satisfy constraint: Member must have length less than or equal to 1",
                )
            if architectures[0] not in ARCHITECTURES:
                raise ValidationException(
                    f"1 validation error detected: Value '[{', '.join(architectures)}]' at 'architectures' failed to "
                    f"satisfy constraint: Member must satisfy constraint: [Member must satisfy enum value set: "
                    f"[x86_64, arm64], Member must not be null]",
                )

        # `env_vars` and `layers` are reused further down when building the version config.
        if env_vars := request.get("Environment", {}).get("Variables"):
            self._verify_env_variables(env_vars)

        if layers := request.get("Layers", []):
            self._validate_layers(layers, region=context_region, account_id=context_account_id)

        if not api_utils.is_role_arn(request.get("Role")):
            raise ValidationException(
                f"1 validation error detected: Value '{request.get('Role')}'"
                + " at 'role' failed to satisfy constraint: Member must satisfy regular expression pattern: arn:(aws[a-zA-Z-]*)?:iam::\\d{12}:role/?[a-zA-Z_0-9+=,.@\\-_/]+"
            )
        if not self.lambda_service.can_assume_role(request.get("Role"), context.region):
            raise InvalidParameterValueException(
                "The role defined for the function cannot be assumed by Lambda.", Type="User"
            )
        package_type = request.get("PackageType", PackageType.Zip)
        runtime = request.get("Runtime")
        self._validate_runtime(package_type, runtime)

        request_function_name = request.get("FunctionName")

        function_name, *_ = api_utils.get_name_and_qualifier(
            function_arn_or_name=request_function_name,
            qualifier=None,
            context=context,
        )

        if runtime in DEPRECATED_RUNTIMES:
            LOG.warning(
                "The Lambda runtime %s is deprecated. "
                "Please upgrade the runtime for the function %s: "
                "https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtimes.html",
                runtime,
                function_name,
            )
        if snap_start := request.get("SnapStart"):
            self._validate_snapstart(snap_start, runtime)
        if publish_to := request.get("PublishTo"):
            self._validate_publish_to(publish_to)
        state = lambda_stores[context_account_id][context_region]

        # The existence check and the store update happen under one lock.
        with self.create_fn_lock:
            if function_name in state.functions:
                raise ResourceConflictException(f"Function already exist: {function_name}")
            fn = Function(function_name=function_name)
            arn = VersionIdentifier(
                function_name=function_name,
                qualifier="$LATEST",
                region=context_region,
                account=context_account_id,
            )
            # save function code to s3
            code = None
            image = None
            image_config = None
            runtime_version_config = RuntimeVersionConfig(
                # Limitation: the runtime id (presumably sha256 of image) is currently hardcoded
                # Potential implementation: provide (cached) sha256 hash of used Docker image
                RuntimeVersionArn=f"arn:{context.partition}:lambda:{context_region}::runtime:8eeff65f6809a3ce81507fe733fe09b835899b99481ba22fd75b5a7338290ec1"
            )
            request_code = request.get("Code")
            if package_type == PackageType.Zip:
                # TODO verify if correct combination of code is set
                if zip_file := request_code.get("ZipFile"):
                    code = store_lambda_archive(
                        archive_file=zip_file,
                        function_name=function_name,
                        region_name=context_region,
                        account_id=context_account_id,
                    )
                elif s3_bucket := request_code.get("S3Bucket"):
                    s3_key = request_code["S3Key"]
                    s3_object_version = request_code.get("S3ObjectVersion")
                    code = store_s3_bucket_archive(
                        archive_bucket=s3_bucket,
                        archive_key=s3_key,
                        archive_version=s3_object_version,
                        function_name=function_name,
                        region_name=context_region,
                        account_id=context_account_id,
                    )
                else:
                    raise LambdaServiceException("A ZIP file or S3 bucket is required")
            elif package_type == PackageType.Image:
                image = request_code.get("ImageUri")
                if not image:
                    raise LambdaServiceException(
                        "An image is required when the package type is set to 'image'"
                    )
                image = create_image_code(image_uri=image)

                image_config_req = request.get("ImageConfig", {})
                image_config = ImageConfig(
                    command=image_config_req.get("Command"),
                    entrypoint=image_config_req.get("EntryPoint"),
                    working_directory=image_config_req.get("WorkingDirectory"),
                )
                # Runtime management controls are not available when providing a custom image
                runtime_version_config = None

            capacity_provider_config = None
            memory_size = request.get("MemorySize", LAMBDA_DEFAULT_MEMORY_SIZE)
            if "CapacityProviderConfig" in request:
                capacity_provider_config = request["CapacityProviderConfig"]
                self._validate_capacity_provider_config(capacity_provider_config, context)
                self._validate_managed_instances_runtime(runtime)

                # Defaults are merged with the values given in the request.
                default_config = CapacityProviderConfig(
                    LambdaManagedInstancesCapacityProviderConfig=LambdaManagedInstancesCapacityProviderConfig(
                        ExecutionEnvironmentMemoryGiBPerVCpu=2.0,
                        PerExecutionEnvironmentMaxConcurrency=16,
                    )
                )
                capacity_provider_config = merge_recursive(default_config, capacity_provider_config)
                # With a capacity provider the memory size is fixed, regardless of the
                # MemorySize given in the request.
                memory_size = 2048
                if request.get("LoggingConfig", {}).get("LogFormat") == LogFormat.Text:
                    raise InvalidParameterValueException(
                        'LogLevel is not supported when LogFormat is set to "Text". Remove LogLevel from your request or change the LogFormat to "JSON" and try again.',
                        Type="User",
                    )
            if "LoggingConfig" in request:
                logging_config = request["LoggingConfig"]
                LOG.warning(
                    "Advanced Lambda Logging Configuration is currently mocked "
                    "and will not impact the logging behavior. "
                    "Please create a feature request if needed."
                )

                # when switching to JSON, app and system level log is auto set to INFO
                if logging_config.get("LogFormat", None) == LogFormat.JSON:
                    # Values from the request take precedence over these defaults.
                    logging_config = {
                        "ApplicationLogLevel": "INFO",
                        "SystemLogLevel": "INFO",
                        "LogGroup": f"/aws/lambda/{function_name}",
                    } | logging_config
                else:
                    logging_config = (
                        LoggingConfig(
                            LogFormat=LogFormat.Text, LogGroup=f"/aws/lambda/{function_name}"
                        )
                        | logging_config
                    )

            elif capacity_provider_config:
                logging_config = LoggingConfig(
                    LogFormat=LogFormat.JSON,
                    LogGroup=f"/aws/lambda/{function_name}",
                    ApplicationLogLevel="INFO",
                    SystemLogLevel="INFO",
                )
            else:
                logging_config = LoggingConfig(
                    LogFormat=LogFormat.Text, LogGroup=f"/aws/lambda/{function_name}"
                )
            # No SnapStart response is set for functions with a capacity provider.
            snap_start = (
                None
                if capacity_provider_config
                else SnapStartResponse(
                    ApplyOn=request.get("SnapStart", {}).get("ApplyOn", SnapStartApplyOn.None_),
                    OptimizationStatus=SnapStartOptimizationStatus.Off,
                )
            )
            version = FunctionVersion(
                id=arn,
                config=VersionFunctionConfiguration(
                    last_modified=api_utils.format_lambda_date(datetime.datetime.now()),
                    description=request.get("Description", ""),
                    role=request["Role"],
                    timeout=request.get("Timeout", LAMBDA_DEFAULT_TIMEOUT),
                    runtime=request.get("Runtime"),
                    memory_size=memory_size,
                    handler=request.get("Handler"),
                    package_type=package_type,
                    environment=env_vars,
                    architectures=request.get("Architectures") or [Architecture.x86_64],
                    tracing_config_mode=request.get("TracingConfig", {}).get(
                        "Mode", TracingMode.PassThrough
                    ),
                    image=image,
                    image_config=image_config,
                    code=code,
                    layers=self.map_layers(layers),
                    internal_revision=short_uid(),
                    ephemeral_storage=LambdaEphemeralStorage(
                        size=request.get("EphemeralStorage", {}).get("Size", 512)
                    ),
                    snap_start=snap_start,
                    runtime_version_config=runtime_version_config,
                    dead_letter_arn=request.get("DeadLetterConfig", {}).get("TargetArn"),
                    vpc_config=self._build_vpc_config(
                        context_account_id, context_region, request.get("VpcConfig")
                    ),
                    state=VersionState(
                        state=State.Pending,
                        code=StateReasonCode.Creating,
                        reason="The function is being created.",
                    ),
                    logging_config=logging_config,
                    # TODO: might need something like **optional_kwargs if None
                    #   -> Test with regular GetFunction (i.e., without a capacity provider)
                    CapacityProviderConfig=capacity_provider_config,
                ),
            )
            # With a capacity provider, the stored $LATEST differs from the returned `version`:
            # it is stored as ActiveNonInvocable with a successful last update.
            version_post_response = None
            if capacity_provider_config:
                version_post_response = dataclasses.replace(
                    version,
                    config=dataclasses.replace(
                        version.config,
                        last_update=UpdateStatus(status=LastUpdateStatus.Successful),
                        state=VersionState(state=State.ActiveNonInvocable),
                    ),
                )
            fn.versions["$LATEST"] = version_post_response or version
            state.functions[function_name] = fn
        initialization_type = (
            FunctionInitializationType.lambda_managed_instances
            if capacity_provider_config
            else FunctionInitializationType.on_demand
        )
        # NOTE(review): the object returned by `labels(...)` is not used further in this
        # block — confirm whether selecting the labels alone records the event.
        function_counter.labels(
            operation=FunctionOperation.create,
            runtime=runtime or "n/a",
            status=FunctionStatus.success,
            invocation_type="n/a",
            package_type=package_type,
            initialization_type=initialization_type,
        )
        # TODO: consider potential other side effects of not having a function version for $LATEST
        # Provisioning happens upon publishing for functions using a capacity provider
        if not capacity_provider_config:
            self.lambda_service.create_function_version(version)

        if tags := request.get("Tags"):
            # This will check whether the function exists.
            self._store_tags(arn.unqualified_arn(), tags)

        if request.get("Publish"):
            version = self._publish_version_with_changes(
                function_name=function_name,
                region=context_region,
                account_id=context_account_id,
                publish_to=request.get("PublishTo"),
            )

        if config.LAMBDA_SYNCHRONOUS_CREATE:
            # block via retrying until "terminal" condition reached before returning
            if not poll_condition(
                lambda: get_function_version(
                    function_name, version.id.qualifier, version.id.account, version.id.region
                ).config.state.state
                in [State.Active, State.ActiveNonInvocable, State.Failed],
                timeout=10,
            ):
                LOG.warning(
                    "LAMBDA_SYNCHRONOUS_CREATE is active, but waiting for %s reached timeout.",
                    function_name,
                )

        return api_utils.map_config_out(
            version, return_qualified_arn=False, return_update_status=False
        )
1240

1241
    def _validate_runtime(self, package_type, runtime):
        """
        Validate the runtime of a Zip-packaged function.

        Functions with any other package type are not checked.

        :param package_type: package type of the function
        :param runtime: runtime identifier given in the request
        :raises InvalidParameterValueException: if the runtime of a Zip function is not in the
            list of accepted runtimes
        """
        runtimes = ALL_RUNTIMES
        if config.LAMBDA_RUNTIME_VALIDATION:
            # Flatten the per-group runtime collections into a single list. Passing the values
            # view to `itertools.chain` directly would yield the groups themselves, so no
            # runtime identifier could ever match.
            # NOTE(review): assumes every value of RUNTIMES_AGGREGATED is a list of runtimes.
            runtimes = list(itertools.chain.from_iterable(RUNTIMES_AGGREGATED.values()))

        if package_type == PackageType.Zip and runtime not in runtimes:
            # deprecated runtimes have different error
            if runtime in DEPRECATED_RUNTIMES:
                HINT_LOG.info(
                    "Set env variable LAMBDA_RUNTIME_VALIDATION to 0"
                    " in order to allow usage of deprecated runtimes"
                )
                # Raises its own error if an upgrade target is known for this runtime.
                self._check_for_recomended_migration_target(runtime)

            raise InvalidParameterValueException(
                f"Value {runtime} at 'runtime' failed to satisfy constraint: Member must satisfy enum value set: {VALID_RUNTIMES} or be a valid ARN",
                Type="User",
            )
1259

1260
    def _validate_managed_instances_runtime(self, runtime):
        """
        Ensure the runtime is listed in ``VALID_MANAGED_INSTANCE_RUNTIMES``.

        :param runtime: runtime identifier given in the request
        :raises InvalidParameterValueException: if the runtime is not listed
        """
        if runtime in VALID_MANAGED_INSTANCE_RUNTIMES:
            return

        raise InvalidParameterValueException(
            f"Runtime Enum {runtime} does not support specified feature: Lambda Managed Instances"
        )
1265

1266
    def _check_for_recomended_migration_target(self, deprecated_runtime):
        """
        Raise the dedicated "no longer supported" error for deprecated runtimes with a known upgrade.

        Runtimes without an entry in ``DEPRECATED_RUNTIMES_UPGRADES`` pass through silently so that
        the caller can raise its generic error instead.

        :param deprecated_runtime: the deprecated runtime identifier from the request
        :raises InvalidParameterValueException: if an upgrade target is registered for the runtime
        """
        # AWS recommends a migration target for "newly" deprecated runtimes; the distinct error
        # message below keeps parity with that behavior.
        upgrade_target = DEPRECATED_RUNTIMES_UPGRADES.get(deprecated_runtime)
        if upgrade_target is None:
            return

        LOG.debug(
            "The Lambda runtime %s is deprecated. Please upgrade to a supported Lambda runtime such as %s.",
            deprecated_runtime,
            upgrade_target,
        )
        raise InvalidParameterValueException(
            f"The runtime parameter of {deprecated_runtime} is no longer supported for creating or updating AWS Lambda functions. We recommend you use a supported runtime while creating or updating functions.",
            Type="User",
        )
1281

1282
    @handler(operation="UpdateFunctionConfiguration", expand=False)
    def update_function_configuration(
        self, context: RequestContext, request: UpdateFunctionConfigurationRequest
    ) -> FunctionConfiguration:
        """
        Apply the settings contained in the request to the ``$LATEST`` version of a function.

        Only keys present in ``request`` are changed. The updated version replaces ``$LATEST`` in
        the store with an ``InProgress`` update status and is handed to the lambda service.

        :param context: request context used to resolve account and region
        :param request: the raw UpdateFunctionConfiguration request
        :return: the mapped configuration of the new ``$LATEST`` version
        :raises ResourceNotFoundException: if the function does not exist
        :raises PreconditionFailedException: if ``RevisionId`` does not match the current revision
        :raises ValidationException: for an invalid role ARN or a missing managed-instances config
        :raises InvalidParameterValueException: for an unknown runtime, or a capacity provider
            config sent to a function that has none
        """
        raw_function_name = request.get("FunctionName")

        # the name may be given as plain name, partial ARN or full ARN
        account_id, region = api_utils.get_account_and_region(raw_function_name, context)
        function_name, _ = api_utils.get_name_and_qualifier(raw_function_name, None, context)
        store = lambda_stores[account_id][region]

        if function_name not in store.functions:
            raise ResourceNotFoundException(
                f"Function not found: {api_utils.unqualified_lambda_arn(function_name=function_name, region=region, account=account_id)}",
                Type="User",
            )
        function = store.functions[function_name]

        # TODO: lock modification of latest version
        # TODO: notify service for changes relevant to re-provisioning of $LATEST
        latest_version = function.latest()
        latest_version_config = latest_version.config

        expected_revision = request.get("RevisionId")
        if expected_revision and expected_revision != latest_version_config.revision_id:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
                Type="User",
            )

        # collects the config fields that differ from the current $LATEST config
        changes = {}

        if "EphemeralStorage" in request:
            storage_size = request["EphemeralStorage"].get("Size", 512)
            # TODO: do defaults here apply as well?
            changes["ephemeral_storage"] = LambdaEphemeralStorage(storage_size)

        if "Role" in request:
            role = request["Role"]
            if not api_utils.is_role_arn(role):
                raise ValidationException(
                    f"1 validation error detected: Value '{role}'"
                    + " at 'role' failed to satisfy constraint: Member must satisfy regular expression pattern: arn:(aws[a-zA-Z-]*)?:iam::\\d{12}:role/?[a-zA-Z_0-9+=,.@\\-_/]+"
                )
            changes["role"] = role

        if "Description" in request:
            changes["description"] = request["Description"]

        if "Timeout" in request:
            changes["timeout"] = request["Timeout"]

        if "MemorySize" in request:
            changes["memory_size"] = request["MemorySize"]

        if "DeadLetterConfig" in request:
            changes["dead_letter_arn"] = request["DeadLetterConfig"].get("TargetArn")

        vpc_config = request.get("VpcConfig")
        if vpc_config:
            changes["vpc_config"] = self._build_vpc_config(account_id, region, vpc_config)

        if "Handler" in request:
            changes["handler"] = request["Handler"]

        if "Runtime" in request:
            requested_runtime = request["Runtime"]

            if requested_runtime not in ALL_RUNTIMES:
                raise InvalidParameterValueException(
                    f"Value {requested_runtime} at 'runtime' failed to satisfy constraint: Member must satisfy enum value set: {VALID_RUNTIMES} or be a valid ARN",
                    Type="User",
                )
            if requested_runtime in DEPRECATED_RUNTIMES:
                # deprecated runtimes are accepted here, but the user is warned
                LOG.warning(
                    "The Lambda runtime %s is deprecated. "
                    "Please upgrade the runtime for the function %s: "
                    "https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtimes.html",
                    requested_runtime,
                    function_name,
                )
            changes["runtime"] = requested_runtime

        snap_start = request.get("SnapStart")
        if snap_start:
            # validate against the runtime of this request if given, else the current one
            effective_runtime = changes.get("runtime") or latest_version_config.runtime
            self._validate_snapstart(snap_start, effective_runtime)
            changes["snap_start"] = SnapStartResponse(
                ApplyOn=snap_start.get("ApplyOn", SnapStartApplyOn.None_),
                OptimizationStatus=SnapStartOptimizationStatus.Off,
            )

        if "Environment" in request:
            env_vars = request["Environment"].get("Variables", {})
            if env_vars:
                self._verify_env_variables(env_vars)
            changes["environment"] = env_vars

        if "Layers" in request:
            requested_layers = request["Layers"]
            if requested_layers:
                self._validate_layers(requested_layers, region=region, account_id=account_id)
            changes["layers"] = self.map_layers(requested_layers)

        if "ImageConfig" in request:
            requested_image_config = request["ImageConfig"]
            changes["image_config"] = ImageConfig(
                command=requested_image_config.get("Command"),
                entrypoint=requested_image_config.get("EntryPoint"),
                working_directory=requested_image_config.get("WorkingDirectory"),
            )

        if "LoggingConfig" in request:
            requested_logging = request["LoggingConfig"]
            LOG.warning(
                "Advanced Lambda Logging Configuration is currently mocked "
                "and will not impact the logging behavior. "
                "Please create a feature request if needed."
            )

            # when switching to JSON, app and system level log is auto set to INFO
            # (explicitly requested levels win because they are merged last)
            if requested_logging.get("LogFormat") == LogFormat.JSON:
                json_defaults = {
                    "ApplicationLogLevel": "INFO",
                    "SystemLogLevel": "INFO",
                }
                requested_logging = json_defaults | requested_logging

            previous_logging = latest_version_config.logging_config

            # partial update: requested keys override the previous configuration
            merged_logging = previous_logging | requested_logging

            # in case we switched from JSON to Text we need to remove the LogLevel keys
            switched_to_text = (
                merged_logging.get("LogFormat") == LogFormat.Text
                and previous_logging.get("LogFormat") == LogFormat.JSON
            )
            if switched_to_text:
                merged_logging.pop("ApplicationLogLevel", None)
                merged_logging.pop("SystemLogLevel", None)

            changes["logging_config"] = merged_logging

        if "TracingConfig" in request:
            tracing_mode = request["TracingConfig"].get("Mode")
            if tracing_mode:
                changes["tracing_config_mode"] = tracing_mode

        if "CapacityProviderConfig" in request:
            capacity_provider_config = request["CapacityProviderConfig"]
            self._validate_capacity_provider_config(capacity_provider_config, context)

            current_capacity_provider = latest_version.config.CapacityProviderConfig
            if current_capacity_provider and not capacity_provider_config.get(
                "LambdaManagedInstancesCapacityProviderConfig"
            ):
                raise ValidationException(
                    "1 validation error detected: Value null at 'capacityProviderConfig.lambdaManagedInstancesCapacityProviderConfig' failed to satisfy constraint: Member must not be null"
                )
            if not current_capacity_provider:
                raise InvalidParameterValueException(
                    "CapacityProviderConfig isn't supported for Lambda Default functions.",
                    Type="User",
                )

        updated_config = dataclasses.replace(
            latest_version_config,
            last_modified=api_utils.generate_lambda_date(),
            internal_revision=short_uid(),
            last_update=UpdateStatus(
                status=LastUpdateStatus.InProgress,
                code="Creating",
                reason="The function is being created.",
            ),
            **changes,
        )
        new_latest_version = dataclasses.replace(latest_version, config=updated_config)
        function.versions["$LATEST"] = new_latest_version  # TODO: notify
        self.lambda_service.update_version(new_version=new_latest_version)

        return api_utils.map_config_out(new_latest_version)
1✔
1460

1461
    @handler(operation="UpdateFunctionCode", expand=False)
    def update_function_code(
        self, context: RequestContext, request: UpdateFunctionCodeRequest
    ) -> FunctionConfiguration:
        """
        Replace the code (or image) of the ``$LATEST`` version of a function.

        The code comes from ``ZipFile``, ``S3Bucket``/``S3Key`` or ``ImageUri``, checked in that
        order. If ``Publish`` is set, a new version is published after the update and returned.

        :param context: request context used to resolve account and region
        :param request: the raw UpdateFunctionCode request
        :return: the mapped configuration of the updated (or newly published) version
        :raises ResourceNotFoundException: if the function does not exist
        :raises PreconditionFailedException: if ``RevisionId`` does not match the current revision
        :raises InvalidParameterValueException: if the code source does not fit the package type
        :raises ValidationException: if ``Architectures`` is invalid
        :raises LambdaServiceException: if no code source was provided
        """
        # only supports normal zip packaging atm
        # if request.get("Publish"):
        #     self.lambda_service.create_function_version()

        raw_function_name = request.get("FunctionName")
        account_id, region = api_utils.get_account_and_region(raw_function_name, context)
        function_name, _ = api_utils.get_name_and_qualifier(raw_function_name, None, context)

        store = lambda_stores[account_id][region]
        if function_name not in store.functions:
            raise ResourceNotFoundException(
                f"Function not found: {api_utils.unqualified_lambda_arn(function_name=function_name, region=region, account=account_id)}",
                Type="User",
            )
        function = store.functions[function_name]

        expected_revision = request.get("RevisionId")
        if expected_revision and expected_revision != function.latest().config.revision_id:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
                Type="User",
            )

        zip_file = request.get("ZipFile")
        s3_bucket = request.get("S3Bucket")
        image_uri = request.get("ImageUri")

        # TODO verify if correct combination of code is set
        if (zip_file or s3_bucket) and function.latest().config.package_type == PackageType.Image:
            raise InvalidParameterValueException(
                "Please provide ImageUri when updating a function with packageType Image.",
                Type="User",
            )
        if image_uri and function.latest().config.package_type == PackageType.Zip:
            raise InvalidParameterValueException(
                "Please don't provide ImageUri when updating a function with packageType Zip.",
                Type="User",
            )

        publish_to = request.get("PublishTo")
        if publish_to:
            self._validate_publish_to(publish_to)

        # exactly one of code/image is populated, depending on the source found first
        code = None
        image = None
        if zip_file:
            code = store_lambda_archive(
                archive_file=zip_file,
                function_name=function_name,
                region_name=region,
                account_id=account_id,
            )
        elif s3_bucket:
            code = store_s3_bucket_archive(
                archive_bucket=s3_bucket,
                archive_key=request["S3Key"],
                archive_version=request.get("S3ObjectVersion"),
                function_name=function_name,
                region_name=region,
                account_id=account_id,
            )
        elif image_uri:
            image = create_image_code(image_uri=image_uri)
        else:
            raise LambdaServiceException("A ZIP file, S3 bucket, or image is required")

        previous_version = function.versions.get("$LATEST")
        changes = {"code": code} if code else {"image": image}

        architectures = request.get("Architectures")
        if architectures:
            if len(architectures) != 1:
                raise ValidationException(
                    f"1 validation error detected: Value '[{', '.join(architectures)}]' at 'architectures' failed to "
                    f"satisfy constraint: Member must have length less than or equal to 1",
                )
            # An empty list of architectures is also forbidden. Further exceptions are tested here for create_function:
            # tests.aws.services.lambda_.test_lambda_api.TestLambdaFunction.test_create_lambda_exceptions
            if architectures[0] not in ARCHITECTURES:
                raise ValidationException(
                    f"1 validation error detected: Value '[{', '.join(architectures)}]' at 'architectures' failed to "
                    f"satisfy constraint: Member must satisfy constraint: [Member must satisfy enum value set: "
                    f"[x86_64, arm64], Member must not be null]",
                )
            changes["architectures"] = architectures

        updated_config = dataclasses.replace(
            previous_version.config,
            internal_revision=short_uid(),
            last_modified=api_utils.generate_lambda_date(),
            last_update=UpdateStatus(
                status=LastUpdateStatus.InProgress,
                code="Creating",
                reason="The function is being created.",
            ),
            **changes,
        )
        function_version = dataclasses.replace(previous_version, config=updated_config)
        function.versions["$LATEST"] = function_version

        self.lambda_service.update_version(new_version=function_version)

        should_publish = request.get("Publish")
        if should_publish:
            function_version = self._publish_version_with_changes(
                function_name=function_name,
                region=region,
                account_id=account_id,
                publish_to=publish_to,
                is_active=True,
            )
        # a published version is reported with its qualified ARN
        return api_utils.map_config_out(
            function_version, return_qualified_arn=bool(should_publish)
        )
1577

1578
    # TODO: does deleting the latest published version affect the next versions number?
1579
    # TODO: what happens when we call this with a qualifier and a fully qualified ARN? (+ conflicts?)
1580
    # TODO: test different ARN patterns (shorthand ARN?)
1581
    # TODO: test deleting across regions?
1582
    # TODO: test mismatch between context region and region in ARN
1583
    # TODO: test qualifier $LATEST, alias-name and version
1584
    def delete_function(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
        **kwargs,
    ) -> DeleteFunctionResponse:
        """
        Delete a single published version of a function, or the whole function.

        With a qualifier only that version is removed and stopped; without one the function is
        removed from the store, its versions are stopped and their code is destroyed.

        :param context: request context used to resolve account and region
        :param function_name: function name, partial ARN or full ARN
        :param qualifier: optional version qualifier; aliases and ``$LATEST`` are rejected
        :return: response with status 202 if a deleted version had a capacity provider config,
            otherwise 204
        :raises InvalidParameterValueException: if the qualifier is an alias or ``$LATEST``
        :raises ResourceNotFoundException: if the function does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )

        if qualifier and api_utils.qualifier_is_alias(qualifier):
            raise InvalidParameterValueException(
                "Deletion of aliases is not currently supported.",
                Type="User",
            )

        store = lambda_stores[account_id][region]
        if qualifier == "$LATEST":
            raise InvalidParameterValueException(
                "$LATEST version cannot be deleted without deleting the function.", Type="User"
            )

        if function_name not in store.functions:
            raise ResourceNotFoundException(
                f"Function not found: {api_utils.unqualified_lambda_arn(function_name=function_name, region=region, account=account_id)}",
                Type="User",
            )
        function = store.functions.get(function_name)

        function_has_capacity_provider = False
        if qualifier:
            # delete a version of the function; an unknown version is silently ignored
            version = function.versions.pop(qualifier, None)
            if version:
                if version.config.CapacityProviderConfig:
                    function_has_capacity_provider = True
                self.lambda_service.stop_version(version.id.qualified_arn())
                destroy_code_if_not_used(code=version.config.code, function=function)
        else:
            # delete the whole function
            # TODO: introduce locking for safe deletion: We could create a new version at the API layer before
            #  the old version gets cleaned up in the internal lambda service.
            function = store.functions.pop(function_name)
            for version in function.versions.values():
                has_version_manager = True
                if version.config.CapacityProviderConfig:
                    function_has_capacity_provider = True
                    # Functions with a capacity provider do NOT have a version manager for $LATEST
                    # because only published versions are invokable. Published versions must still
                    # be stopped, as in the single-version branch above.
                    has_version_manager = version.id.qualifier != "$LATEST"
                if has_version_manager:
                    self.lambda_service.stop_version(qualified_arn=version.id.qualified_arn())
                # we can safely destroy the code here
                if version.config.code:
                    version.config.code.destroy()

        return DeleteFunctionResponse(StatusCode=202 if function_has_capacity_provider else 204)
1✔
1644

1645
    def list_functions(
        self,
        context: RequestContext,
        master_region: MasterRegion = None,  # (only relevant for lambda@edge)
        function_version: FunctionVersionApi = None,
        marker: String = None,
        max_items: MaxListItems = None,
        **kwargs,
    ) -> ListFunctionsResponse:
        """
        List the functions of the account and region taken from the request context.

        :param context: request context providing account and region
        :param master_region: accepted but not used by this implementation
        :param function_version: ``ALL`` to list every version of every function; if omitted only
            the ``$LATEST`` version of each function is listed
        :param marker: pagination marker returned by a previous call
        :param max_items: maximum number of entries per page
        :return: one page of function configurations and the marker of the next page
        :raises ValidationException: if ``function_version`` is set to anything other than ``ALL``
        """
        store = lambda_stores[context.account_id][context.region]

        if function_version and function_version != FunctionVersionApi.ALL:
            raise ValidationException(
                f"1 validation error detected: Value '{function_version}'"
                + " at 'functionVersion' failed to satisfy constraint: Member must satisfy enum value set: [ALL]"
            )

        if function_version == FunctionVersionApi.ALL:
            # include all versions of all functions, each reported with its qualified ARN
            return_qualified_arn = True
            selected_versions = [
                version
                for function in store.functions.values()
                for version in function.versions.values()
            ]
        else:
            return_qualified_arn = False
            selected_versions = [function.latest() for function in store.functions.values()]

        summaries = PaginatedList(
            [
                api_utils.map_to_list_response(
                    api_utils.map_config_out(
                        function_config, return_qualified_arn=return_qualified_arn
                    )
                )
                for function_config in selected_versions
            ]
        )
        # the ARN of an entry serves as its pagination token
        page, next_marker = summaries.get_page(
            lambda summary: summary["FunctionArn"],
            marker,
            max_items,
        )
        return ListFunctionsResponse(Functions=page, NextMarker=next_marker)
1✔
1683

1684
    def get_function(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
        **kwargs,
    ) -> GetFunctionResponse:
        """
        Return configuration, code location, concurrency and tags of a function version.

        An alias qualifier is resolved to the version the alias points to.

        :param context: request context used to resolve account and region
        :param function_name: function name, partial ARN or full ARN
        :param qualifier: optional version or alias qualifier
        :return: the assembled GetFunction response
        :raises ResourceNotFoundException: if the function or the given alias does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )

        fn = lambda_stores[account_id][region].functions.get(function_name)
        if fn is None:
            # the error reports the ARN in the same form (qualified or not) as it was requested
            if qualifier is None:
                missing_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
            else:
                missing_arn = api_utils.qualified_lambda_arn(
                    function_name, qualifier, account_id, region
                )
            raise ResourceNotFoundException(f"Function not found: {missing_arn}", Type="User")

        alias_name = None
        if qualifier and api_utils.qualifier_is_alias(qualifier):
            if qualifier not in fn.aliases:
                alias_arn = api_utils.qualified_lambda_arn(
                    function_name, qualifier, account_id, region
                )
                raise ResourceNotFoundException(f"Function not found: {alias_arn}", Type="User")
            # continue with the version behind the alias, but remember the alias for the output
            alias_name = qualifier
            qualifier = fn.aliases[alias_name].function_version

        version = get_function_version(
            function_name=function_name,
            qualifier=qualifier,
            account_id=account_id,
            region=region,
        )

        tags = self._get_tags(api_utils.unqualified_lambda_arn(function_name, account_id, region))
        # the Tags key is only part of the response if there are tags
        additional_fields = {"Tags": tags} if tags else {}

        code = version.config.code
        image = None if code else version.config.image
        if code:
            code_location = FunctionCodeLocation(
                Location=code.generate_presigned_url(endpoint_url=config.external_service_url()),
                RepositoryType="S3",
            )
        elif image:
            code_location = FunctionCodeLocation(
                ImageUri=image.image_uri,
                RepositoryType=image.repository_type,
                ResolvedImageUri=image.resolved_image_uri,
            )
        else:
            code_location = None

        concurrency = None
        if fn.reserved_concurrent_executions:
            concurrency = Concurrency(
                ReservedConcurrentExecutions=fn.reserved_concurrent_executions
            )

        configuration = api_utils.map_config_out(
            version, return_qualified_arn=bool(qualifier), alias_name=alias_name
        )
        return GetFunctionResponse(
            Configuration=configuration,
            Code=code_location,  # TODO
            Concurrency=concurrency,
            **additional_fields,
        )
1754

1755
    def get_function_configuration(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
        **kwargs,
    ) -> FunctionConfiguration:
        """
        Return only the configuration of a function version.

        :param context: request context used to resolve account and region
        :param function_name: function name, partial ARN or full ARN
        :param qualifier: optional qualifier; if given, the qualified ARN is returned
        :return: the mapped configuration of the resolved version
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        # CAVE: THIS RETURN VALUE IS *NOT* THE SAME AS IN get_function (!) but seems to be only configuration part?
        resolved_name, resolved_qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        version = get_function_version(
            function_name=resolved_name,
            qualifier=resolved_qualifier,
            account_id=account_id,
            region=region,
        )
        return api_utils.map_config_out(version, return_qualified_arn=bool(resolved_qualifier))
1✔
1774

1775
    def invoke(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        invocation_type: InvocationType | None = None,
        log_type: LogType | None = None,
        client_context: String | None = None,
        durable_execution_name: DurableExecutionName | None = None,
        payload: IO[Blob] | None = None,
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
        tenant_id: TenantId | None = None,
        **kwargs,
    ) -> InvocationResponse:
        """
        Invoke a function through the lambda service and map the result to an API response.

        :param context: request context (account, region, request id, trace context, user agent)
        :param function_name: function name, partial ARN or full ARN
        :param invocation_type: ``Event`` yields a 202 and ``DryRun`` a 204 response without
            payload; anything else yields a 200 response with the invocation result
        :param log_type: ``Tail`` adds the base64-encoded last 4096 bytes of the logs
        :param client_context: passed through to the lambda service
        :param durable_execution_name: accepted but not used by this implementation
        :param payload: stream with the invocation payload, read completely before invoking
        :param qualifier: optional version or alias qualifier
        :param tenant_id: accepted but not used by this implementation
        :return: the invocation response
        :raises ServiceException: re-raised unchanged if raised during the invocation
        :raises LambdaServiceException: on environment startup timeout or any other error
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )

        user_agent = context.request.user_agent.string

        started_at = time.perf_counter()
        try:
            # reading the payload happens inside the try block so that read errors get wrapped too
            invocation_result = self.lambda_service.invoke(
                function_name=function_name,
                qualifier=qualifier,
                region=region,
                account_id=account_id,
                invocation_type=invocation_type,
                client_context=client_context,
                request_id=context.request_id,
                trace_context=context.trace_context,
                payload=payload.read() if payload else None,
                user_agent=user_agent,
            )
        except ServiceException:
            # already an API-level error, pass it on as is
            raise
        except EnvironmentStartupTimeoutException as e:
            raise LambdaServiceException(
                f"[{context.request_id}] Timeout while starting up lambda environment for function {function_name}:{qualifier}"
            ) from e
        except Exception as e:
            # the stack trace is only logged if debug logging is enabled
            LOG.error(
                "[%s] Error while invoking lambda %s",
                context.request_id,
                function_name,
                exc_info=LOG.isEnabledFor(logging.DEBUG),
            )
            raise LambdaServiceException(
                f"[{context.request_id}] Internal error while executing lambda {function_name}:{qualifier}. Caused by {type(e).__name__}: {e}"
            ) from e

        # event and dry-run invocations are answered without a payload
        if invocation_type == InvocationType.Event:
            return InvocationResponse(StatusCode=202)
        if invocation_type == InvocationType.DryRun:
            return InvocationResponse(StatusCode=204)

        elapsed_ms = (time.perf_counter() - started_at) * 1000
        LOG.debug("Lambda invocation duration: %0.2fms", elapsed_ms)

        response = InvocationResponse(
            StatusCode=200,
            Payload=invocation_result.payload,
            ExecutedVersion=invocation_result.executed_version,
        )

        if invocation_result.is_error:
            response["FunctionError"] = "Unhandled"

        if log_type == LogType.Tail:
            # only the last 4096 bytes of the logs are returned
            log_tail = to_bytes(invocation_result.logs)[-4096:]
            response["LogResult"] = to_str(base64.b64encode(log_tail))

        return response
1✔
1849

1850
    # Version operations
1851
    def publish_version(
        self,
        context: RequestContext,
        function_name: FunctionName,
        code_sha256: String | None = None,
        description: Description | None = None,
        revision_id: String | None = None,
        publish_to: FunctionVersionLatestPublished | None = None,
        **kwargs,
    ) -> FunctionConfiguration:
        """
        Publish a new version of a function based on an existing version.

        :param context: request context used to resolve account and region
        :param function_name: function name, partial ARN or full ARN
        :param code_sha256: forwarded to the internal publish helper
        :param description: forwarded to the internal publish helper
        :param revision_id: forwarded to the internal publish helper
        :param publish_to: optional publish target, validated if given
        :return: the configuration of the new version with its qualified ARN
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        resolved_name = api_utils.get_function_name(function_name, context)

        if publish_to:
            self._validate_publish_to(publish_to)

        published_version = self._publish_version_from_existing_version(
            function_name=resolved_name,
            description=description,
            account_id=account_id,
            region=region,
            revision_id=revision_id,
            code_sha256=code_sha256,
            publish_to=publish_to,
        )
        return api_utils.map_config_out(published_version, return_qualified_arn=True)
1✔
1875

1876
    def list_versions_by_function(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        marker: String = None,
        max_items: MaxListItems = None,
        **kwargs,
    ) -> ListVersionsByFunctionResponse:
        """
        List all versions of a function, one page at a time.

        :param context: request context used to resolve account and region
        :param function_name: function name, partial ARN or full ARN
        :param marker: pagination marker returned by a previous call
        :param max_items: maximum number of entries per page
        :return: one page of version configurations and the marker of the next page
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name = api_utils.get_function_name(function_name, context)
        function = self._get_function(
            function_name=function_name, region=region, account_id=account_id
        )
        versions = [
            api_utils.map_to_list_response(
                api_utils.map_config_out(version=version, return_qualified_arn=True)
            )
            for version in function.versions.values()
        ]
        items = PaginatedList(versions)
        # Use the qualified ARN as pagination token, as list_functions does. Using the whole entry
        # would put a dict into NextMarker, which a string marker from the client could never match.
        page, token = items.get_page(
            lambda item: item["FunctionArn"],
            marker,
            max_items,
        )
        return ListVersionsByFunctionResponse(Versions=page, NextMarker=token)
1✔
1902

1903
    # Alias
1904

1905
    def _create_routing_config_model(
        self, routing_config_dict: dict[str, float], function_version: FunctionVersion
    ):
        """
        Validate the additional version weights of an alias and wrap them in an ``AliasRoutingConfig``.

        :param routing_config_dict: mapping of version qualifier to routing weight
        :param function_version: the version the alias itself points to

        Raises:
            InvalidParameterValueException: If more than one weight is given, if a weight targets the
                alias' own version, or if the alias points to ``$LATEST``.
            ValidationException: If a weight is outside the accepted range or a key is not a version number.
        """
        if len(routing_config_dict) > 1:
            raise InvalidParameterValueException(
                "Number of items in AdditionalVersionWeights cannot be greater than 1",
                Type="User",
            )

        # At most one entry can be present at this point; the loop is kept in case more are supported later.
        for version_qualifier, weight in routing_config_dict.items():
            if weight < 0.0 or weight >= 1.0:
                raise ValidationException(
                    f"1 validation error detected: Value '{{{version_qualifier}={weight}}}' at 'routingConfig.additionalVersionWeights' failed to satisfy constraint: Map value must satisfy constraint: [Member must have value less than or equal to 1.0, Member must have value greater than or equal to 0.0, Member must not be null]"
                )

            target_id = function_version.id
            target_qualifier = target_id.qualifier
            if version_qualifier == target_qualifier:
                raise InvalidParameterValueException(
                    f"Invalid function version {target_qualifier}. Function version {target_qualifier} is already included in routing configuration.",
                    Type="User",
                )

            # an alias that targets $LATEST must not carry any routing configuration
            if target_qualifier == "$LATEST":
                raise InvalidParameterValueException(
                    "$LATEST is not supported for an alias pointing to more than 1 version"
                )

            key_is_version = api_utils.qualifier_is_version(version_qualifier)
            if not key_is_version:
                raise ValidationException(
                    f"1 validation error detected: Value '{{{version_qualifier}={weight}}}' at 'routingConfig.additionalVersionWeights' failed to satisfy constraint: Map keys must satisfy constraint: [Member must have length less than or equal to 1024, Member must have length greater than or equal to 1, Member must satisfy regular expression pattern: [0-9]+]"
                )

            # existence check for the weighted version; the returned version itself is not needed
            get_function_version(
                function_name=target_id.function_name,
                qualifier=version_qualifier,
                region=target_id.region,
                account_id=target_id.account,
            )

        return AliasRoutingConfig(version_weights=routing_config_dict)
1✔
1942

1943
    def create_alias(
        self,
        context: RequestContext,
        function_name: FunctionName,
        name: Alias,
        function_version: VersionWithLatestPublished,
        description: Description = None,
        routing_config: AliasRoutingConfiguration = None,
        **kwargs,
    ) -> AliasConfiguration:
        """
        Create an alias ``name`` pointing to ``function_version`` of the given function.

        Raises:
            ValidationException: If ``name`` is not a valid alias qualifier.
            ResourceConflictException: If an alias with that name is already registered on the function.
        """
        alias_name_is_valid = api_utils.qualifier_is_alias(name)
        if not alias_name_is_valid:
            raise ValidationException(
                f"1 validation error detected: Value '{name}' at 'name' failed to satisfy constraint: Member must satisfy regular expression pattern: (?!^[0-9]+$)([a-zA-Z0-9-_]+)"
            )

        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name = api_utils.get_function_name(function_name, context)
        # the target version is resolved before the function itself is fetched
        target_version = get_function_version(
            function_name=function_name,
            qualifier=function_version,
            region=region,
            account_id=account_id,
        )
        fn = self._get_function(function_name=function_name, region=region, account_id=account_id)

        with fn.lock:
            existing_alias = fn.aliases.get(name)
            if existing_alias:
                existing_arn = api_utils.map_alias_out(alias=existing_alias, function=fn)["AliasArn"]
                raise ResourceConflictException(
                    f"Alias already exists: {existing_arn}",
                    Type="User",
                )

            # only build a routing model when additional weights were actually supplied
            additional_weights = None
            if routing_config:
                additional_weights = routing_config.get("AdditionalVersionWeights")
            if additional_weights:
                routing_model = self._create_routing_config_model(
                    additional_weights, target_version
                )
            else:
                routing_model = None

            new_alias = VersionAlias(
                name=name,
                function_version=function_version,
                # the description is always stored; an unspecified one becomes an empty string
                description=description or "",
                routing_configuration=routing_model,
            )
            fn.aliases[name] = new_alias

        return api_utils.map_alias_out(alias=new_alias, function=fn)
1✔
1994

1995
    def list_aliases(
        self,
        context: RequestContext,
        function_name: FunctionName,
        function_version: VersionWithLatestPublished = None,
        marker: String = None,
        max_items: MaxListItems = None,
        **kwargs,
    ) -> ListAliasesResponse:
        """
        Return one page of the aliases of a function.

        When ``function_version`` is given, only aliases whose ``function_version``
        equals it are included. Pagination tokens are derived from each alias ARN.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name = api_utils.get_function_name(function_name, context)
        fn = self._get_function(function_name=function_name, region=region, account_id=account_id)

        alias_configs = []
        for fn_alias in fn.aliases.values():
            if function_version is not None and fn_alias.function_version != function_version:
                continue
            alias_configs.append(api_utils.map_alias_out(fn_alias, fn))

        page, next_marker = PaginatedList(alias_configs).get_page(
            lambda alias_config: alias_config["AliasArn"], marker, max_items
        )
        return ListAliasesResponse(Aliases=page, NextMarker=next_marker)
1✔
2023

2024
    def delete_alias(
        self, context: RequestContext, function_name: FunctionName, name: Alias, **kwargs
    ) -> None:
        """
        Remove the alias ``name`` from a function together with the resources keyed by it.

        A missing alias is not treated as an error. The provisioned concurrency config
        stored under the alias name is always dropped; the function URL config is only
        dropped when an alias was actually removed.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name = api_utils.get_function_name(function_name, context)
        fn = self._get_function(function_name=function_name, region=region, account_id=account_id)

        removed_alias = fn.aliases.pop(name, None)

        # cleanup related resources
        fn.provisioned_concurrency_configs.pop(name, None)

        if not removed_alias or name not in fn.function_url_configs:
            return

        # TODO: Allow for deactivating/unregistering specific Lambda URLs
        removed_url_config = fn.function_url_configs.pop(name)
        LOG.debug(
            "Stopping aliased Lambda Function URL %s for %s",
            removed_url_config.url,
            removed_url_config.function_name,
        )
2046

2047
    def get_alias(
        self, context: RequestContext, function_name: FunctionName, name: Alias, **kwargs
    ) -> AliasConfiguration:
        """
        Return the configuration of the alias ``name`` of a function.

        Raises:
            ResourceNotFoundException: If the function has no alias with that name.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name = api_utils.get_function_name(function_name, context)
        fn = self._get_function(function_name=function_name, region=region, account_id=account_id)

        found_alias = fn.aliases.get(name)
        if found_alias:
            return api_utils.map_alias_out(alias=found_alias, function=fn)

        missing_arn = api_utils.qualified_lambda_arn(
            function_name=function_name, qualifier=name, region=region, account=account_id
        )
        raise ResourceNotFoundException(
            f"Cannot find alias arn: {missing_arn}",
            Type="User",
        )
1✔
2061

2062
    def update_alias(
        self,
        context: RequestContext,
        function_name: FunctionName,
        name: Alias,
        function_version: VersionWithLatestPublished = None,
        description: Description = None,
        routing_config: AliasRoutingConfiguration = None,
        revision_id: String = None,
        **kwargs,
    ) -> AliasConfiguration:
        """
        Update the target version, description and/or routing configuration of an alias.

        Only the arguments that are not ``None`` are applied. A ``routing_config`` without
        (or with empty) ``AdditionalVersionWeights`` clears the routing configuration.

        Raises:
            ResourceNotFoundException: If the function has no alias called ``name``.
            PreconditionFailedException: If ``revision_id`` is given and differs from the alias' revision id.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name = api_utils.get_function_name(function_name, context)
        function = self._get_function(
            function_name=function_name, region=region, account_id=account_id
        )
        if not (alias := function.aliases.get(name)):
            fn_arn = api_utils.qualified_lambda_arn(function_name, name, account_id, region)
            raise ResourceNotFoundException(
                f"Alias not found: {fn_arn}",
                Type="User",
            )
        if revision_id and alias.revision_id != revision_id:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
                Type="User",
            )
        changes = {}
        if function_version is not None:
            changes["function_version"] = function_version
        if description is not None:
            changes["description"] = description
        if routing_config is not None:
            # if it is an empty dict or AdditionalVersionWeights is empty, set routing config to None
            new_routing_config = None
            if routing_config_dict := routing_config.get("AdditionalVersionWeights"):
                # The weights have to be validated against the version the alias points to after
                # this update: the newly requested version if one was given, otherwise the current one.
                target_qualifier = (
                    function_version if function_version is not None else alias.function_version
                )
                target_version = get_function_version(
                    function_name=function_name,
                    qualifier=target_qualifier,
                    region=region,
                    account_id=account_id,
                )
                new_routing_config = self._create_routing_config_model(
                    routing_config_dict, target_version
                )
            changes["routing_configuration"] = new_routing_config
        # even if no changes are done, we have to update revision id for some reason
        old_alias = alias
        alias = dataclasses.replace(alias, **changes)
        function.aliases[name] = alias

        # TODO: signal lambda service that pointer potentially changed
        self.lambda_service.update_alias(old_alias=old_alias, new_alias=alias, function=function)

        return api_utils.map_alias_out(alias=alias, function=function)
1✔
2110

2111
    # =======================================
2112
    # ======= EVENT SOURCE MAPPINGS =========
2113
    # =======================================
2114
    def check_service_resource_exists(
        self, service: str, resource_arn: str, function_arn: str, function_role_arn: str
    ):
        """
        Verify that an event source exists and can be reached with the function's role.

        The lookup is done through an internal client created for ``resource_arn`` with
        ``function_role_arn`` as role and ``function_arn`` as source ARN. Only ``sqs``,
        ``sqs-fifo``, ``kinesis`` and ``dynamodb`` are checked; any other service is a no-op.

        Raises:
            InvalidParameterValueException: If the queue or stream does not exist.
            ClientError: Any other client error is re-raised unchanged.
        """
        parsed_arn = parse_arn(resource_arn)
        source_client = get_internal_client(
            arn=resource_arn,
            role_arn=function_role_arn,
            service_principal=ServicePrincipal.lambda_,
            source_arn=function_arn,
        )

        if service in ("sqs", "sqs-fifo"):
            # AWS uses `GetQueueAttributes` internally to verify the queue existence, but that call needs
            # a `QueueUrl`, which we do not have. A dummy URL is assembled from the ARN parts instead;
            # SQS is able to parse it and resolve the right queue.
            queue_name = parsed_arn["resource"].split("/")[-1]
            queue_url = (
                f"http://sqs.{parsed_arn['region']}.domain/{parsed_arn['account']}/{queue_name}"
            )
            try:
                source_client.get_queue_attributes(QueueUrl=queue_url)
            except ClientError as e:
                error = e.response["Error"]
                error_code = error["Code"]
                if error_code != "AWS.SimpleQueueService.NonExistentQueue":
                    raise
                raise InvalidParameterValueException(
                    f"Error occurred while ReceiveMessage. SQS Error Code: {error_code}. SQS Error Message: {error['Message']}",
                    Type="User",
                )
        elif service in ("kinesis", "dynamodb"):
            # Kinesis expects `StreamARN`, DynamoDB Streams expects `StreamArn`
            stream_param = "StreamARN" if service == "kinesis" else "StreamArn"
            try:
                source_client.describe_stream(**{stream_param: resource_arn})
            except ClientError as e:
                if e.response["Error"]["Code"] != "ResourceNotFoundException":
                    raise
                raise InvalidParameterValueException(
                    f"Stream not found: {resource_arn}",
                    Type="User",
                )
×
2166

2167
    @handler("CreateEventSourceMapping", expand=False)
    def create_event_source_mapping(
        self,
        context: RequestContext,
        request: CreateEventSourceMappingRequest,
    ) -> EventSourceMappingConfiguration:
        """Handle ``CreateEventSourceMapping`` by delegating to ``create_event_source_mapping_v2``."""
        return self.create_event_source_mapping_v2(context, request)
1✔
2174

2175
    def create_event_source_mapping_v2(
        self,
        context: RequestContext,
        request: CreateEventSourceMappingRequest,
    ) -> EventSourceMappingConfiguration:
        """
        Validate the request, store the new event source mapping (ESM) and create its worker.

        A copy of the ESM configuration is written to the store, a worker is built from the
        configuration and registered in ``self.esm_workers``, request tags (if any) are stored,
        and finally ``create()`` is called on the worker.

        :return: the ESM configuration built by ``EsmConfigFactory`` (the original, not the stored copy)
        """
        # Validations
        function_arn, function_name, state, function_version, function_role = (
            self.validate_event_source_mapping(context, request)
        )

        esm_config = EsmConfigFactory(request, context, function_arn).get_esm_config()

        # Copy esm_config to avoid a race condition with potential async update in the store
        state.event_source_mappings[esm_config["UUID"]] = esm_config.copy()
        # a mapping is enabled unless the request explicitly says otherwise
        enabled = request.get("Enabled", True)
        # TODO: check for potential async race condition update -> think about locking
        esm_worker = EsmWorkerFactory(esm_config, function_role, enabled).get_esm_worker()
        # the worker is registered before create() is called on it
        self.esm_workers[esm_worker.uuid] = esm_worker
        # TODO: check StateTransitionReason, LastModified, LastProcessingResult (concurrent updates requires locking!)
        if tags := request.get("Tags"):
            self._store_tags(esm_config.get("EventSourceMappingArn"), tags)
        esm_worker.create()
        return esm_config
1✔
2198

2199
    def validate_event_source_mapping(self, context, request):
        """
        Validate an event source mapping (ESM) request or an internal ESM configuration.

        The checks run in this order: destination config, event source detection, batch size,
        starting position (DynamoDB/Kinesis), batching window (SQS), filter patterns, existence of
        the target function/qualifier, existence of the event source and, only when the current
        operation is ``CreateEventSourceMapping``, duplicate detection against the stored mappings.

        :param context: request context; provides the operation name and the fallback account/region
        :param request: mapping-like object with either ``FunctionName`` or ``FunctionArn`` set
        :return: tuple of ``(fn_arn, function_name, state, function_version, function_role)``

        Raises:
            InvalidParameterValueException: For unsupported or missing parameters, an unknown function
                or a non-existing event source.
            ValidationException: If ``StartingPosition`` is not a known enum value.
            ResourceConflictException: If an equivalent mapping already exists (create requests only).
            Exception: For an unknown alias/version or an unrecognized qualifier (see TODOs below).
        """
        # TODO: test whether stream ARNs are valid sources for Pipes or ESM or whether only DynamoDB table ARNs work
        # TODO: Validate MaxRecordAgeInSeconds (i.e cannot subceed 60s but can be -1) and MaxRetryAttempts parameters.
        # See https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-maximumrecordageinseconds
        # this method is also called for updates; the duplicate check at the end is create-only
        is_create_esm_request = context.operation.name == self.create_event_source_mapping.operation

        if destination_config := request.get("DestinationConfig"):
            if "OnSuccess" in destination_config:
                raise InvalidParameterValueException(
                    "Unsupported DestinationConfig parameter for given event source mapping type.",
                    Type="User",
                )

        # a self-managed event source is treated as kafka; everything else is derived from the source ARN
        service = None
        if "SelfManagedEventSource" in request:
            service = "kafka"
            if "SourceAccessConfigurations" not in request:
                raise InvalidParameterValueException(
                    "Required 'sourceAccessConfigurations' parameter is missing.", Type="User"
                )
        if service is None and "EventSourceArn" not in request:
            raise InvalidParameterValueException("Unrecognized event source.", Type="User")
        if service is None:
            service = extract_service_from_arn(request["EventSourceArn"])

        batch_size = api_utils.validate_and_set_batch_size(service, request.get("BatchSize"))
        if service in ["dynamodb", "kinesis"]:
            starting_position = request.get("StartingPosition")
            if not starting_position:
                raise InvalidParameterValueException(
                    "1 validation error detected: Value null at 'startingPosition' failed to satisfy constraint: Member must not be null.",
                    Type="User",
                )

            if starting_position not in KinesisStreamStartPosition.__members__:
                raise ValidationException(
                    f"1 validation error detected: Value '{starting_position}' at 'startingPosition' failed to satisfy constraint: Member must satisfy enum value set: [LATEST, AT_TIMESTAMP, TRIM_HORIZON]"
                )
            # AT_TIMESTAMP is not allowed for DynamoDB Streams
            elif (
                service == "dynamodb"
                and starting_position not in DynamoDBStreamStartPosition.__members__
            ):
                raise InvalidParameterValueException(
                    f"Unsupported starting position for arn type: {request['EventSourceArn']}",
                    Type="User",
                )

        if service in ["sqs", "sqs-fifo"]:
            # batches larger than 10 require a non-zero batching window
            if batch_size > 10 and request.get("MaximumBatchingWindowInSeconds", 0) == 0:
                raise InvalidParameterValueException(
                    "Maximum batch window in seconds must be greater than 0 if maximum batch size is greater than 10",
                    Type="User",
                )

        if (filter_criteria := request.get("FilterCriteria")) is not None:
            for filter_ in filter_criteria.get("Filters", []):
                pattern_str = filter_.get("Pattern")
                # the pattern has to be a non-empty string before it is passed to the pattern validator
                if not pattern_str or not isinstance(pattern_str, str):
                    raise InvalidParameterValueException(
                        "Invalid filter pattern definition.", Type="User"
                    )

                if not validate_event_pattern(pattern_str):
                    raise InvalidParameterValueException(
                        "Invalid filter pattern definition.", Type="User"
                    )

        # Can either have a FunctionName (i.e CreateEventSourceMapping request) or
        # an internal EventSourceMappingConfiguration representation
        request_function_name = request.get("FunctionName") or request.get("FunctionArn")
        # can be either a partial arn or a full arn for the version/alias
        function_name, qualifier, account, region = function_locators_from_arn(
            request_function_name
        )
        # TODO: validate `context.region` vs. `region(request["FunctionName"])` vs. `region(request["EventSourceArn"])`
        # fall back to the request context when the function reference carries no account/region
        account = account or context.account_id
        region = region or context.region
        state = lambda_stores[account][region]
        fn = state.functions.get(function_name)
        if not fn:
            raise InvalidParameterValueException("Function does not exist", Type="User")

        if qualifier:
            # make sure the function version/alias exists
            if api_utils.qualifier_is_alias(qualifier):
                fn_alias = fn.aliases.get(qualifier)
                if not fn_alias:
                    raise Exception("unknown alias")  # TODO: cover via test
            elif api_utils.qualifier_is_version(qualifier):
                fn_version = fn.versions.get(qualifier)
                if not fn_version:
                    raise Exception("unknown version")  # TODO: cover via test
            elif qualifier == "$LATEST":
                pass
            elif qualifier == "$LATEST.PUBLISHED":
                # NOTE(review): this lookup has no effect either way; a missing entry is not rejected here
                if fn.versions.get(qualifier):
                    pass
            else:
                raise Exception("invalid functionname")  # TODO: cover via test
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account, region)

        else:
            fn_arn = api_utils.unqualified_lambda_arn(function_name, account, region)

        function_version = get_function_version_from_arn(fn_arn)
        function_role = function_version.config.role

        if source_arn := request.get("EventSourceArn"):
            self.check_service_resource_exists(service, source_arn, fn_arn, function_role)
        # Check we are validating a CreateEventSourceMapping request
        if is_create_esm_request:

            def _get_mapping_sources(mapping: dict[str, Any]) -> list[str]:
                # the sources of a mapping are either its single event source ARN
                # or the Kafka bootstrap servers of a self-managed event source
                if event_source_arn := mapping.get("EventSourceArn"):
                    return [event_source_arn]
                return (
                    mapping.get("SelfManagedEventSource", {})
                    .get("Endpoints", {})
                    .get("KAFKA_BOOTSTRAP_SERVERS", [])
                )

            # check for event source duplicates
            # TODO: currently validated for sqs, kinesis, and dynamodb
            service_id = load_service(service).service_id
            for uuid, mapping in state.event_source_mappings.items():
                mapping_sources = _get_mapping_sources(mapping)
                request_sources = _get_mapping_sources(request)
                # a duplicate is a stored mapping for the same function ARN sharing at least one source
                if mapping["FunctionArn"] == fn_arn and (
                    set(mapping_sources).intersection(request_sources)
                ):
                    if service == "sqs":
                        # *shakes fist at SQS*
                        raise ResourceConflictException(
                            f'An event source mapping with {service_id} arn (" {mapping["EventSourceArn"]} ") '
                            f'and function (" {function_name} ") already exists. Please update or delete the '
                            f"existing mapping with UUID {uuid}",
                            Type="User",
                        )
                    elif service == "kafka":
                        # kafka mappings only conflict when they also share a topic
                        if set(mapping["Topics"]).intersection(request["Topics"]):
                            raise ResourceConflictException(
                                f'An event source mapping with event source ("{",".join(request_sources)}"), '
                                f'function ("{fn_arn}"), '
                                f'topics ("{",".join(request["Topics"])}") already exists. Please update or delete the '
                                f"existing mapping with UUID {uuid}",
                                Type="User",
                            )
                    else:
                        raise ResourceConflictException(
                            f'The event source arn (" {mapping["EventSourceArn"]} ") and function '
                            f'(" {function_name} ") provided mapping already exists. Please update or delete the '
                            f"existing mapping with UUID {uuid}",
                            Type="User",
                        )
        return fn_arn, function_name, state, function_version, function_role
1✔
2355

2356
    @handler("UpdateEventSourceMapping", expand=False)
    def update_event_source_mapping(
        self,
        context: RequestContext,
        request: UpdateEventSourceMappingRequest,
    ) -> EventSourceMappingConfiguration:
        """Handle ``UpdateEventSourceMapping`` by delegating to ``update_event_source_mapping_v2``."""
        return self.update_event_source_mapping_v2(context, request)
1✔
2363

2364
    def update_event_source_mapping_v2(
        self,
        context: RequestContext,
        request: UpdateEventSourceMappingRequest,
    ) -> EventSourceMappingConfiguration:
        """
        Update a stored event source mapping (ESM) and replace its worker.

        The request values are merged over the stored mapping, the merged result is validated
        and written back to the store, and a new worker built from it replaces the registered
        one. The previous worker is stopped before ``create()`` is called on the new one.

        :return: the updated mapping, with ``State`` overridden by the transient state of this update

        Raises:
            ResourceNotFoundException: If the request has no ``UUID`` or no mapping/worker exists for it.
        """
        # TODO: test and implement this properly (quite complex with many validations and limitations!)
        LOG.warning(
            "Updating Lambda Event Source Mapping is in experimental state and not yet fully tested."
        )
        state = lambda_stores[context.account_id][context.region]
        # shallow copy, so that popping the UUID does not modify the incoming request
        request_data = {**request}
        uuid = request_data.pop("UUID", None)
        if not uuid:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )
        old_event_source_mapping = state.event_source_mappings.get(uuid)
        esm_worker = self.esm_workers.get(uuid)
        if old_event_source_mapping is None or esm_worker is None:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )  # TODO: test?

        # normalize values to overwrite
        event_source_mapping = old_event_source_mapping | request_data

        temp_params = {}  # values only set for the returned response, not saved internally (e.g. transient state)

        # Validate the newly updated ESM object. We ignore the output here since we only care whether an Exception is raised.
        function_arn, _, _, function_version, function_role = self.validate_event_source_mapping(
            context, event_source_mapping
        )

        # remove the FunctionName field
        event_source_mapping.pop("FunctionName", None)

        if function_arn:
            event_source_mapping["FunctionArn"] = function_arn

        # Only apply update if the desired state differs
        enabled = request.get("Enabled")
        if enabled is not None:
            if enabled and old_event_source_mapping["State"] != EsmState.ENABLED:
                event_source_mapping["State"] = EsmState.ENABLING
            # TODO: What happens when trying to update during an update or failed state?!
            elif not enabled and old_event_source_mapping["State"] == EsmState.ENABLED:
                event_source_mapping["State"] = EsmState.DISABLING
        else:
            # no explicit enable/disable was requested
            event_source_mapping["State"] = EsmState.UPDATING

        # To ensure parity, certain responses need to be immediately returned
        temp_params["State"] = event_source_mapping["State"]

        state.event_source_mappings[uuid] = event_source_mapping

        # TODO: Currently, we re-create the entire ESM worker. Look into approach with better performance.
        # without an explicit `Enabled` value, the new worker inherits the flag of the current worker
        worker_factory = EsmWorkerFactory(
            event_source_mapping, function_role, request.get("Enabled", esm_worker.enabled)
        )

        # Get a new ESM worker object but do not activate it yet, since the factory holds all logic for creating a new worker from configuration.
        updated_esm_worker = worker_factory.get_esm_worker()
        self.esm_workers[uuid] = updated_esm_worker

        # We should stop() the worker since the delete() will remove the ESM from the state mapping.
        esm_worker.stop()
        # This will either create an EsmWorker in the CREATING state if enabled. Otherwise, the DISABLING state is set.
        updated_esm_worker.create()

        return {**event_source_mapping, **temp_params}
1✔
2434

2435
    def delete_event_source_mapping(
        self, context: RequestContext, uuid: String, **kwargs
    ) -> EventSourceMappingConfiguration:
        """
        Delete the event source mapping identified by ``uuid``.

        The worker is unregistered from ``self.esm_workers`` and asked to delete itself.

        :return: the stored mapping with ``State`` reported as ``EsmState.DELETING``

        Raises:
            ResourceNotFoundException: If no mapping or no worker exists for ``uuid``.
        """
        store = lambda_stores[context.account_id][context.region]
        esm_config = store.event_source_mappings.get(uuid)
        if not esm_config:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        # TODO: add proper locking
        worker = self.esm_workers.pop(uuid, None)
        if not worker:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        # Asynchronous delete in v2
        worker.delete()

        response = dict(esm_config)
        response["State"] = EsmState.DELETING
        return response
1✔
2454

2455
    def get_event_source_mapping(
        self, context: RequestContext, uuid: String, **kwargs
    ) -> EventSourceMappingConfiguration:
        """
        Return the event source mapping identified by ``uuid``.

        ``State`` and ``StateTransitionReason`` of the stored mapping are refreshed from
        the registered worker before the mapping is returned.

        Raises:
            ResourceNotFoundException: If no mapping or no worker exists for ``uuid``.
        """
        store = lambda_stores[context.account_id][context.region]
        esm_config = store.event_source_mappings.get(uuid)

        # the worker is only looked up once the mapping itself has been found
        if not esm_config or not (worker := self.esm_workers.get(uuid)):
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        esm_config["State"] = worker.current_state
        esm_config["StateTransitionReason"] = worker.state_transition_reason
        return esm_config
1✔
2472

2473
    def list_event_source_mappings(
        self,
        context: RequestContext,
        event_source_arn: Arn = None,
        function_name: FunctionName = None,
        marker: String = None,
        max_items: MaxListItems = None,
        **kwargs,
    ) -> ListEventSourceMappingsResponse:
        """
        Return one page of the event source mappings stored for the request's account and region.

        ``event_source_arn`` filters on an exact ``EventSourceArn`` match; ``function_name``
        keeps mappings whose ``FunctionArn`` contains the given value. Pagination tokens are
        derived from each mapping's ``UUID``.
        """
        store = lambda_stores[context.account_id][context.region]
        # TODO: update and test State and StateTransitionReason for ESM v2

        # TODO: validate pattern of event_source_arn
        matching = [
            mapping
            for mapping in store.event_source_mappings.values()
            if (not event_source_arn or mapping.get("EventSourceArn") == event_source_arn)
            and (not function_name or function_name in mapping["FunctionArn"])
        ]

        page, next_marker = PaginatedList(matching).get_page(
            lambda mapping: mapping["UUID"], marker, max_items
        )
        return ListEventSourceMappingsResponse(EventSourceMappings=page, NextMarker=next_marker)
1✔
2500

2501
    def get_source_type_from_request(self, request: dict[str, Any]) -> str:
1✔
2502
        if event_source_arn := request.get("EventSourceArn", ""):
×
2503
            service = extract_service_from_arn(event_source_arn)
×
2504
            if service == "sqs" and "fifo" in event_source_arn:
×
2505
                service = "sqs-fifo"
×
2506
            return service
×
2507
        elif request.get("SelfManagedEventSource"):
×
2508
            return "kafka"
×
2509

2510
    # =======================================
2511
    # ============ FUNCTION URLS ============
2512
    # =======================================
2513

2514
    @staticmethod
    def _validate_qualifier(qualifier: str) -> None:
        """Refuse qualifiers that the function URL operations do not accept.

        ``$LATEST``, ``$LATEST.PUBLISHED`` and anything ``api_utils.qualifier_is_version``
        recognizes are refused; an empty or missing qualifier passes.

        :raises ValidationException: if the qualifier is refused
        """
        names_latest = qualifier in ("$LATEST", "$LATEST.PUBLISHED")
        if not names_latest and not (qualifier and api_utils.qualifier_is_version(qualifier)):
            return

        raise ValidationException(
            f"1 validation error detected: Value '{qualifier}' at 'qualifier' failed to satisfy constraint: Member must satisfy regular expression pattern: ((?!^\\d+$)^[0-9a-zA-Z-_]+$)"
        )
2522

2523
    @staticmethod
    def _validate_invoke_mode(invoke_mode: str) -> None:
        """Check that ``invoke_mode``, when given, is one of the known invoke modes.

        ``RESPONSE_STREAM`` is accepted but only logged as unsupported.

        :raises ValidationException: if a non-empty ``invoke_mode`` is neither ``BUFFERED``
            nor ``RESPONSE_STREAM``
        """
        if not invoke_mode:
            # Nothing was requested, so there is nothing to validate.
            return

        if invoke_mode not in (InvokeMode.BUFFERED, InvokeMode.RESPONSE_STREAM):
            raise ValidationException(
                f"1 validation error detected: Value '{invoke_mode}' at 'invokeMode' failed to satisfy constraint: Member must satisfy enum value set: [RESPONSE_STREAM, BUFFERED]"
            )

        if invoke_mode == InvokeMode.RESPONSE_STREAM:
            # TODO should we actually fail for setting RESPONSE_STREAM?
            #  It should trigger InvokeWithResponseStream which is not implemented
            LOG.warning(
                "The invokeMode 'RESPONSE_STREAM' is not yet supported on LocalStack. The property is only mocked, the execution will still be 'BUFFERED'"
            )
2535

2536
    # TODO: what happens if function state is not active?
2537
    def create_function_url_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        auth_type: FunctionUrlAuthType,
        qualifier: FunctionUrlQualifier = None,
        cors: Cors = None,
        invoke_mode: InvokeMode = None,
        **kwargs,
    ) -> CreateFunctionUrlConfigResponse:
        """Create and store a function URL config for a function or one of its aliases.

        The config is stored on the function under the qualifier (``$LATEST`` when none is
        given). The URL subdomain is random unless the function carries a valid
        ``TAG_KEY_CUSTOM_URL`` tag, in which case the tag value (suffixed with the qualifier,
        if any) is used.

        :raises ValidationException: from the qualifier / invoke mode validators
        :raises ResourceNotFoundException: if the function or the referenced alias is unknown
        :raises ResourceConflictException: if a URL config already exists for the qualifier
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        store = lambda_stores[account_id][region]
        self._validate_qualifier(qualifier)
        self._validate_invoke_mode(invoke_mode)

        fn = store.functions.get(function_name)
        if fn is None:
            raise ResourceNotFoundException("Function does not exist", Type="User")

        normalized_qualifier = qualifier or "$LATEST"

        existing_config = fn.function_url_configs.get(normalized_qualifier)
        if existing_config:
            raise ResourceConflictException(
                f"Failed to create function url config for [functionArn = {existing_config.function_arn}]. Error message:  FunctionUrlConfig exists for this Lambda function",
                Type="User",
            )

        refers_to_alias = bool(qualifier) and qualifier != "$LATEST"
        if refers_to_alias and qualifier not in fn.aliases:
            raise ResourceNotFoundException("Function does not exist", Type="User")

        if qualifier:
            function_arn = api_utils.qualified_lambda_arn(
                function_name, qualifier, account_id, region
            )
        else:
            function_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)

        custom_id: str | None = None

        tags = self._get_tags(api_utils.unqualified_lambda_arn(function_name, account_id, region))
        if TAG_KEY_CUSTOM_URL in tags:
            # Note: verifying here that the url_id is unique would let us surface a clash
            # to the user ASAP. However, that information does not seem to be available yet:
            # (as far as the original author could tell) self.router.register_routes() is
            # called once, in a single shot, for all routes, and uniqueness would have to
            # hold for the entire lambda provider, not just this function. So the idea
            # proved non-trivial.
            tag_value = tags[TAG_KEY_CUSTOM_URL]
            custom_id_tag_value = f"{tag_value}-{qualifier}" if qualifier else tag_value
            if TAG_KEY_CUSTOM_URL_VALIDATOR.match(custom_id_tag_value):
                custom_id = custom_id_tag_value
            else:
                # Note: we're logging here instead of raising to prioritize
                # strict parity with AWS over the localstack-only custom_id
                LOG.warning(
                    "Invalid custom ID tag value for lambda URL (%s=%s). "
                    "Replaced with default (random id)",
                    TAG_KEY_CUSTOM_URL,
                    custom_id_tag_value,
                )

        # The url_id is the subdomain of the URL being created: random (as in AWS) unless a
        # valid custom id was passed as a tag on the lambda itself (localstack-only).
        url_id: str = custom_id if custom_id is not None else api_utils.generate_random_url_id()

        host_definition = localstack_host(custom_port=config.GATEWAY_LISTEN[0].port)
        new_config = FunctionUrlConfig(
            function_arn=function_arn,
            function_name=function_name,
            cors=cors,
            url_id=url_id,
            url=f"http://{url_id}.lambda-url.{context.region}.{host_definition.host_and_port()}/",  # TODO: https support
            auth_type=auth_type,
            creation_time=api_utils.generate_lambda_date(),
            last_modified_time=api_utils.generate_lambda_date(),
            invoke_mode=invoke_mode,
        )

        # persist and start URL
        # TODO: implement URL invoke
        fn.function_url_configs[normalized_qualifier] = new_config
        api_url_config = api_utils.map_function_url_config(new_config)

        response_keys = (
            "FunctionUrl",
            "FunctionArn",
            "AuthType",
            "Cors",
            "CreationTime",
            "InvokeMode",
        )
        return CreateFunctionUrlConfigResponse(
            **{key: api_url_config[key] for key in response_keys}
        )
2641

2642
    def get_function_url_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: FunctionUrlQualifier = None,
        **kwargs,
    ) -> GetFunctionUrlConfigResponse:
        """Return the function URL config stored for a function and qualifier.

        A missing qualifier is treated as ``$LATEST``.

        :raises ValidationException: from the qualifier validator
        :raises ResourceNotFoundException: if the function or its URL config does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]

        fn_name, qualifier = api_utils.get_name_and_qualifier(function_name, qualifier, context)
        self._validate_qualifier(qualifier)

        fn = store.functions.get(fn_name)
        # Both "no such function" and "no URL config" surface as the same not-found error.
        url_config = fn.function_url_configs.get(qualifier or "$LATEST") if fn else None
        if not url_config:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        return api_utils.map_function_url_config(url_config)
2670

2671
    def update_function_url_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: FunctionUrlQualifier = None,
        auth_type: FunctionUrlAuthType = None,
        cors: Cors = None,
        invoke_mode: InvokeMode = None,
        **kwargs,
    ) -> UpdateFunctionUrlConfigResponse:
        """Replace the stored function URL config with an updated copy.

        ``last_modified_time`` is always refreshed. ``cors`` and ``auth_type`` are applied
        when not ``None``, ``invoke_mode`` when truthy. A missing qualifier is treated as
        ``$LATEST``.

        :raises ValidationException: from the qualifier / invoke mode validators
        :raises ResourceNotFoundException: if the function, the alias or the URL config
            does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]

        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        self._validate_qualifier(qualifier)
        self._validate_invoke_mode(invoke_mode)

        fn = store.functions.get(function_name)
        if not fn:
            raise ResourceNotFoundException("Function does not exist", Type="User")

        normalized_qualifier = qualifier or "$LATEST"

        is_alias = api_utils.qualifier_is_alias(normalized_qualifier)
        if is_alias and normalized_qualifier not in fn.aliases:
            raise ResourceNotFoundException("Function does not exist", Type="User")

        current_config = fn.function_url_configs.get(normalized_qualifier)
        if not current_config:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        # Collect only the fields the caller actually wants to change.
        changes = {"last_modified_time": api_utils.generate_lambda_date()}
        if cors is not None:
            changes["cors"] = cors
        if auth_type is not None:
            changes["auth_type"] = auth_type
        if invoke_mode:
            changes["invoke_mode"] = invoke_mode

        updated_config = dataclasses.replace(current_config, **changes)
        fn.function_url_configs[normalized_qualifier] = updated_config

        return UpdateFunctionUrlConfigResponse(
            FunctionUrl=updated_config.url,
            FunctionArn=updated_config.function_arn,
            AuthType=updated_config.auth_type,
            Cors=updated_config.cors,
            CreationTime=updated_config.creation_time,
            LastModifiedTime=updated_config.last_modified_time,
            InvokeMode=updated_config.invoke_mode,
        )
2729

2730
    def delete_function_url_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: FunctionUrlQualifier = None,
        **kwargs,
    ) -> None:
        """Remove the function URL config stored for a function and qualifier.

        A missing qualifier is treated as ``$LATEST``.

        :raises ValidationException: from the qualifier validator
        :raises ResourceNotFoundException: if the function or its URL config does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]

        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        self._validate_qualifier(qualifier)

        fn = store.functions.get(function_name)
        config_key = qualifier or "$LATEST"
        # Both "no such function" and "no URL config" surface as the same not-found error.
        if not fn or not fn.function_url_configs.get(config_key):
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        del fn.function_url_configs[config_key]
2759

2760
    def list_function_url_configs(
        self,
        context: RequestContext,
        function_name: FunctionName,
        marker: String = None,
        max_items: MaxItems = None,
        **kwargs,
    ) -> ListFunctionUrlConfigsResponse:
        """List the function URL configs stored on a function, one page at a time.

        :param marker: pagination marker forwarded to ``PaginatedList.get_page``
        :param max_items: page size forwarded to ``PaginatedList.get_page``
        :raises ResourceNotFoundException: if the function does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]

        fn = store.functions.get(api_utils.get_function_name(function_name, context))
        if not fn:
            raise ResourceNotFoundException("Function does not exist", Type="User")

        mapped_configs = PaginatedList(
            api_utils.map_function_url_config(stored) for stored in fn.function_url_configs.values()
        )
        # Entries are keyed by their function ARN for pagination.
        page, next_marker = mapped_configs.get_page(
            lambda entry: entry["FunctionArn"],
            marker,
            max_items,
        )
        return ListFunctionUrlConfigsResponse(FunctionUrlConfigs=page, NextMarker=next_marker)
2788

2789
    # =======================================
2790
    # ============  Permissions  ============
2791
    # =======================================
2792

2793
    @handler("AddPermission", expand=False)
    def add_permission(
        self,
        context: RequestContext,
        request: AddPermissionRequest,
    ) -> AddPermissionResponse:
        """Append a statement to the resource policy of a function version or alias.

        A policy is created for the resolved qualifier if none exists yet. Afterwards the
        alias / version object is replaced by a copy, which is the existing hack for
        bumping its revision id.

        :raises InvalidParameterValueException: if the qualifier is ``$LATEST``
        :raises PreconditionFailedException: if ``RevisionId`` is given and does not match
        :raises ValidationException: if ``StatementId`` does not match ``STATEMENT_ID_REGEX``
        :raises ResourceConflictException: if the statement id already exists in the policy
        :return: the new statement, JSON-encoded
        """
        raw_function_name = request.get("FunctionName")
        function_name, qualifier = api_utils.get_name_and_qualifier(
            raw_function_name, request.get("Qualifier"), context
        )

        # validate qualifier
        if qualifier is not None:
            self._validate_qualifier_expression(qualifier)
            if qualifier == "$LATEST":
                raise InvalidParameterValueException(
                    "We currently do not support adding policies for $LATEST.", Type="User"
                )
        account_id, region = api_utils.get_account_and_region(request.get("FunctionName"), context)

        resolved_fn = self._get_function(function_name, account_id, region)
        resolved_qualifier, fn_arn = self._resolve_fn_qualifier(resolved_fn, qualifier)

        expected_revision = request.get("RevisionId")
        if expected_revision and expected_revision != self._function_revision_id(
            resolved_fn, resolved_qualifier
        ):
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetPolicy API to retrieve the latest Revision Id",
                Type="User",
            )

        request_sid = request["StatementId"]
        if STATEMENT_ID_REGEX.match(request_sid) is None:
            raise ValidationException(
                f"1 validation error detected: Value '{request_sid}' at 'statementId' failed to satisfy constraint: Member must satisfy regular expression pattern: ([a-zA-Z0-9-_]+)"
            )

        # check for an already existing policy and any conflicts in existing statements
        existing_policy = resolved_fn.permissions.get(resolved_qualifier)
        if existing_policy and any(
            stmt["Sid"] == request_sid for stmt in existing_policy.policy.Statement
        ):
            # uniqueness scope: statement id needs to be unique per qualified function ($LATEST, version, or alias)
            # Counterexample: the same sid can exist within $LATEST, version, and alias
            raise ResourceConflictException(
                f"The statement id ({request_sid}) provided already exists. Please provide a new statement id, or remove the existing statement.",
                Type="User",
            )

        permission_statement = api_utils.build_statement(
            partition=context.partition,
            resource_arn=fn_arn,
            statement_id=request_sid,
            action=request["Action"],
            principal=request["Principal"],
            source_arn=request.get("SourceArn"),
            source_account=request.get("SourceAccount"),
            principal_org_id=request.get("PrincipalOrgID"),
            event_source_token=request.get("EventSourceToken"),
            auth_type=request.get("FunctionUrlAuthType"),
        )

        if existing_policy:
            existing_policy.policy.Statement.append(permission_statement)
        else:
            # First statement for this qualifier: start a fresh policy and register it.
            fresh_policy = FunctionResourcePolicy(
                policy=ResourcePolicy(Version="2012-10-17", Id="default", Statement=[])
            )
            fresh_policy.policy.Statement.append(permission_statement)
            resolved_fn.permissions[resolved_qualifier] = fresh_policy

        # Update revision id of alias or version
        # TODO: re-evaluate data model to prevent this dirty hack just for bumping the revision id
        # TODO: does that need a `with function.lock` for atomic updates of the policy + revision_id?
        if api_utils.qualifier_is_alias(resolved_qualifier):
            alias = resolved_fn.aliases[resolved_qualifier]
            resolved_fn.aliases[resolved_qualifier] = dataclasses.replace(alias)
        else:
            # Assumes that a non-alias is a version
            version = resolved_fn.versions[resolved_qualifier]
            resolved_fn.versions[resolved_qualifier] = dataclasses.replace(
                version, config=dataclasses.replace(version.config)
            )
        return AddPermissionResponse(Statement=json.dumps(permission_statement))
2875

2876
    def remove_permission(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        statement_id: NamespacedStatementId,
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
        revision_id: String | None = None,
        **kwargs,
    ) -> None:
        """Remove one statement from the resource policy of a function version or alias.

        After removal the alias / version object is replaced by a copy (the existing hack
        for bumping its revision id), and the policy is dropped once it has no statements.

        :raises ResourceNotFoundException: if the function, its policy or the statement
            cannot be found
        :raises PreconditionFailedException: if ``revision_id`` is given and does not match
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        if qualifier is not None:
            self._validate_qualifier_expression(qualifier)

        store = lambda_stores[account_id][region]
        resolved_fn = store.functions.get(function_name)
        if resolved_fn is None:
            fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
            raise ResourceNotFoundException(f"No policy found for: {fn_arn}", Type="User")

        resolved_qualifier, _ = self._resolve_fn_qualifier(resolved_fn, qualifier)
        function_permission = resolved_fn.permissions.get(resolved_qualifier)
        if not function_permission:
            raise ResourceNotFoundException(
                "No policy is associated with the given resource.", Type="User"
            )

        # try to find statement in policy and delete it
        statements = function_permission.policy.Statement
        target = next((stmt for stmt in statements if stmt["Sid"] == statement_id), None)
        if not target:
            raise ResourceNotFoundException(
                f"Statement {statement_id} is not found in resource policy.", Type="User"
            )

        current_revision = self._function_revision_id(resolved_fn, resolved_qualifier)
        if revision_id and revision_id != current_revision:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
                Type="User",
            )
        statements.remove(target)

        # Update revision id for alias or version
        # TODO: re-evaluate data model to prevent this dirty hack just for bumping the revision id
        # TODO: does that need a `with function.lock` for atomic updates of the policy + revision_id?
        if api_utils.qualifier_is_alias(resolved_qualifier):
            alias = resolved_fn.aliases[resolved_qualifier]
            resolved_fn.aliases[resolved_qualifier] = dataclasses.replace(alias)
        else:
            # Assumes that a non-alias is a version
            version = resolved_fn.versions[resolved_qualifier]
            resolved_fn.versions[resolved_qualifier] = dataclasses.replace(
                version, config=dataclasses.replace(version.config)
            )

        # remove the policy as a whole when there's no statement left in it
        if not function_permission.policy.Statement:
            del resolved_fn.permissions[resolved_qualifier]
2941

2942
    def get_policy(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
        **kwargs,
    ) -> GetPolicyResponse:
        """Return the resource policy stored for a function qualifier, JSON-encoded.

        A missing qualifier is treated as ``$LATEST``. The response also carries the
        revision id of the alias, or of the version's config.

        :raises ResourceNotFoundException: if no policy is stored for the qualifier
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )

        if qualifier is not None:
            self._validate_qualifier_expression(qualifier)

        resolved_fn = self._get_function(function_name, account_id, region)

        resolved_qualifier = qualifier or "$LATEST"
        function_permission = resolved_fn.permissions.get(resolved_qualifier)
        if not function_permission:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        # Assumes that a non-alias is a version
        fn_revision_id = (
            resolved_fn.aliases[resolved_qualifier].revision_id
            if api_utils.qualifier_is_alias(resolved_qualifier)
            else resolved_fn.versions[resolved_qualifier].config.revision_id
        )

        policy_document = json.dumps(dataclasses.asdict(function_permission.policy))
        return GetPolicyResponse(Policy=policy_document, RevisionId=fn_revision_id)
2979

2980
    # =======================================
2981
    # ========  Code signing config  ========
2982
    # =======================================
2983

2984
    def create_code_signing_config(
        self,
        context: RequestContext,
        allowed_publishers: AllowedPublishers,
        description: Description | None = None,
        code_signing_policies: CodeSigningPolicies | None = None,
        tags: Tags | None = None,
        **kwargs,
    ) -> CreateCodeSigningConfigResponse:
        """Create a code signing config with a random id in the caller's account and region.

        The config is stored under its ARN. ``tags`` is accepted but not used here.
        """
        account, region = context.account_id, context.region
        store = lambda_stores[account][region]

        # TODO: can there be duplicates?
        csc_id = f"csc-{get_random_hex(17)}"  # e.g. 'csc-077c33b4c19e26036'
        csc_arn = f"arn:{context.partition}:lambda:{region}:{account}:code-signing-config:{csc_id}"

        store.code_signing_configs[csc_arn] = new_config = CodeSigningConfig(
            csc_id=csc_id,
            arn=csc_arn,
            allowed_publishers=allowed_publishers,
            policies=code_signing_policies,
            last_modified=api_utils.generate_lambda_date(),
            description=description,
        )
        return CreateCodeSigningConfigResponse(CodeSigningConfig=api_utils.map_csc(new_config))
3010

3011
    def put_function_code_signing_config(
        self,
        context: RequestContext,
        code_signing_config_arn: CodeSigningConfigArn,
        function_name: NamespacedFunctionName,
        **kwargs,
    ) -> PutFunctionCodeSigningConfigResponse:
        """Attach an existing code signing config to a function.

        :raises CodeSigningConfigNotFoundException: if the code signing config is unknown
        :raises ResourceNotFoundException: if the function is unknown
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]
        function_name = api_utils.get_function_name(function_name, context)

        # The config is checked before the function, so its error takes precedence.
        if not store.code_signing_configs.get(code_signing_config_arn):
            raise CodeSigningConfigNotFoundException(
                f"The code signing configuration cannot be found. Check that the provided configuration is not deleted: {code_signing_config_arn}.",
                Type="User",
            )

        target_fn = store.functions.get(function_name)
        fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
        if not target_fn:
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")

        target_fn.code_signing_config_arn = code_signing_config_arn
        return PutFunctionCodeSigningConfigResponse(
            CodeSigningConfigArn=code_signing_config_arn, FunctionName=function_name
        )
3038

3039
    def update_code_signing_config(
        self,
        context: RequestContext,
        code_signing_config_arn: CodeSigningConfigArn,
        description: Description = None,
        allowed_publishers: AllowedPublishers = None,
        code_signing_policies: CodeSigningPolicies = None,
        **kwargs,
    ) -> UpdateCodeSigningConfigResponse:
        """Replace a stored code signing config with an updated copy.

        Only arguments that are not ``None`` overwrite the stored values; ``last_modified``
        is always refreshed.

        :raises ResourceNotFoundException: if the code signing config is unknown
        """
        store = lambda_stores[context.account_id][context.region]
        current = store.code_signing_configs.get(code_signing_config_arn)
        if not current:
            raise ResourceNotFoundException(
                f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
            )

        requested = {
            "allowed_publishers": allowed_publishers,
            "policies": code_signing_policies,
            "description": description,
        }
        # Fields left at None are not part of this update.
        changes = {field: value for field, value in requested.items() if value is not None}

        updated = dataclasses.replace(
            current, last_modified=api_utils.generate_lambda_date(), **changes
        )
        store.code_signing_configs[code_signing_config_arn] = updated

        return UpdateCodeSigningConfigResponse(CodeSigningConfig=api_utils.map_csc(updated))
3068

3069
    def get_code_signing_config(
        self, context: RequestContext, code_signing_config_arn: CodeSigningConfigArn, **kwargs
    ) -> GetCodeSigningConfigResponse:
        """Return the code signing config stored under the given ARN.

        :raises ResourceNotFoundException: if the code signing config is unknown
        """
        store = lambda_stores[context.account_id][context.region]
        found = store.code_signing_configs.get(code_signing_config_arn)
        if found:
            return GetCodeSigningConfigResponse(CodeSigningConfig=api_utils.map_csc(found))

        raise ResourceNotFoundException(
            f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
        )
3080

3081
    def get_function_code_signing_config(
        self, context: RequestContext, function_name: NamespacedFunctionName, **kwargs
    ) -> GetFunctionCodeSigningConfigResponse:
        """Return the code signing config ARN attached to a function, if any.

        :return: a response with the ARN and function name, or an empty response when the
            function has no code signing config
        :raises ResourceNotFoundException: if the function is unknown
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]
        function_name = api_utils.get_function_name(function_name, context)

        target_fn = store.functions.get(function_name)
        fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
        if not target_fn:
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")

        attached_arn = target_fn.code_signing_config_arn
        if not attached_arn:
            return GetFunctionCodeSigningConfigResponse()

        return GetFunctionCodeSigningConfigResponse(
            CodeSigningConfigArn=attached_arn, FunctionName=function_name
        )
3098

3099
    def delete_function_code_signing_config(
        self, context: RequestContext, function_name: NamespacedFunctionName, **kwargs
    ) -> None:
        """Detach the code signing config from a function by resetting its ARN to ``None``.

        :raises ResourceNotFoundException: if the function is unknown
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]
        function_name = api_utils.get_function_name(function_name, context)

        target_fn = store.functions.get(function_name)
        fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
        if not target_fn:
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")

        target_fn.code_signing_config_arn = None
3111

3112
    def delete_code_signing_config(
        self, context: RequestContext, code_signing_config_arn: CodeSigningConfigArn, **kwargs
    ) -> DeleteCodeSigningConfigResponse:
        """Delete the code signing config stored under the given ARN.

        :raises ResourceNotFoundException: if the code signing config is unknown
        """
        configs = lambda_stores[context.account_id][context.region].code_signing_configs

        if not configs.get(code_signing_config_arn):
            raise ResourceNotFoundException(
                f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
            )

        del configs[code_signing_config_arn]
        return DeleteCodeSigningConfigResponse()
3126

3127
    def list_code_signing_configs(
        self,
        context: RequestContext,
        marker: String = None,
        max_items: MaxListItems = None,
        **kwargs,
    ) -> ListCodeSigningConfigsResponse:
        """Return one page of the code signing configs stored for the caller's account and region.

        Each stored config is mapped to its API shape via ``api_utils.map_csc``; the
        ``CodeSigningConfigId`` of an item serves as its pagination token.
        """
        store = lambda_stores[context.account_id][context.region]

        mapped_configs = PaginatedList(
            [api_utils.map_csc(stored) for stored in store.code_signing_configs.values()]
        )
        page, next_marker = mapped_configs.get_page(
            lambda item: item["CodeSigningConfigId"],
            marker,
            max_items,
        )
        return ListCodeSigningConfigsResponse(CodeSigningConfigs=page, NextMarker=next_marker)
1✔
3144

3145
    def list_functions_by_code_signing_config(
        self,
        context: RequestContext,
        code_signing_config_arn: CodeSigningConfigArn,
        marker: String = None,
        max_items: MaxListItems = None,
        **kwargs,
    ) -> ListFunctionsByCodeSigningConfigResponse:
        """Return one page of unqualified ARNs of the functions that use a code signing config.

        Only functions in the caller's account and region are considered. The ARN
        string itself is used as the pagination token.

        :raises ResourceNotFoundException: if the code signing config is not in the store
        """
        account_id, region_name = context.account_id, context.region
        store = lambda_stores[account_id][region_name]

        if code_signing_config_arn not in store.code_signing_configs:
            raise ResourceNotFoundException(
                f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
            )

        matching_arns = PaginatedList(
            [
                api_utils.unqualified_lambda_arn(function.function_name, account_id, region_name)
                for function in store.functions.values()
                if function.code_signing_config_arn == code_signing_config_arn
            ]
        )
        page, next_marker = matching_arns.get_page(lambda arn: arn, marker, max_items)
        return ListFunctionsByCodeSigningConfigResponse(FunctionArns=page, NextMarker=next_marker)
1✔
3176

3177
    # =======================================
3178
    # =========  Account Settings   =========
3179
    # =======================================
3180

3181
    # CAVE: these settings & usages are *per* region!
3182
    # Lambda quotas: https://docs.aws.amazon.com/lambda/latest/dg/gettingstarted-limits.html
3183
    def get_account_settings(self, context: RequestContext, **kwargs) -> GetAccountSettingsResponse:
        """Report the configured Lambda limits and the current usage for one account/region store.

        Usage is derived from the store of the caller's account and region:

        * ``FunctionCount`` is the number of stored functions.
        * ``TotalCodeSize`` adds up the code size of every Zip-packaged function version
          and of every layer version.
        * ``UnreservedConcurrentExecutions`` is the configured concurrency limit minus all
          reserved and all provisioned concurrency of the stored functions.
        """
        store = lambda_stores[context.account_id][context.region]
        functions = list(store.functions.values())

        # Image-based Lambdas do not have a code attribute and count against the ECR quotas instead
        function_code_size = sum(
            version.config.code.code_size
            for function in functions
            for version in function.versions.values()
            if version.config.package_type == PackageType.Zip
        )
        layer_code_size = sum(
            layer_version.code.code_size
            for layer in store.layers.values()
            for layer_version in layer.layer_versions.values()
        )

        reserved_total = sum(
            function.reserved_concurrent_executions
            for function in functions
            if function.reserved_concurrent_executions is not None
        )
        provisioned_total = sum(
            provisioned.provisioned_concurrent_executions
            for function in functions
            for provisioned in function.provisioned_concurrency_configs.values()
        )

        concurrency_limit = config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS
        limits = AccountLimit(
            TotalCodeSize=config.LAMBDA_LIMITS_TOTAL_CODE_SIZE,
            CodeSizeZipped=config.LAMBDA_LIMITS_CODE_SIZE_ZIPPED,
            CodeSizeUnzipped=config.LAMBDA_LIMITS_CODE_SIZE_UNZIPPED,
            ConcurrentExecutions=concurrency_limit,
            UnreservedConcurrentExecutions=concurrency_limit - (reserved_total + provisioned_total),
        )
        usage = AccountUsage(
            TotalCodeSize=function_code_size + layer_code_size,
            FunctionCount=len(functions),
        )
        return GetAccountSettingsResponse(AccountLimit=limits, AccountUsage=usage)
3216

3217
    # =======================================
3218
    # ==  Provisioned Concurrency Config   ==
3219
    # =======================================
3220

3221
    def _get_provisioned_config(
        self, context: RequestContext, function_name: str, qualifier: str
    ) -> ProvisionedConcurrencyConfiguration | None:
        """Look up the provisioned concurrency config stored under a function qualifier.

        The qualifier is first checked for existence: an alias qualifier must name an
        existing alias, a version qualifier an existing version of the function.

        :return: the stored config, or ``None`` if the qualifier has none
        :raises ResourceNotFoundException: if the alias or version (or the function itself)
            does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]
        function_name = api_utils.get_function_name(function_name, context)
        function = store.functions.get(function_name)

        if api_utils.qualifier_is_alias(qualifier):
            known_alias = function.aliases.get(qualifier) if function else None
            if known_alias is None:
                raise ResourceNotFoundException(
                    f"Cannot find alias arn: {api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)}",
                    Type="User",
                )
        elif api_utils.qualifier_is_version(qualifier):
            known_version = function.versions.get(qualifier) if function else None
            if known_version is None:
                raise ResourceNotFoundException(
                    f"Function not found: {api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)}",
                    Type="User",
                )

        # NOTE(review): a qualifier that is neither alias nor version skips both existence
        # checks above, so `function` is not guaranteed to be set here - confirm callers
        # never pass such a qualifier.
        return function.provisioned_concurrency_configs.get(qualifier)
1✔
3248

3249
    def put_provisioned_concurrency_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: Qualifier,
        provisioned_concurrent_executions: PositiveInteger,
        **kwargs,
    ) -> PutProvisionedConcurrencyConfigResponse:
        """Store a provisioned concurrency config for a published version or an alias.

        Validation happens in this order; the first failing check raises:

        1. the requested value must be at least 1
        2. ``$LATEST`` is rejected
        3. the function and the alias/version must exist (``_get_provisioned_config``)
        4. the function's reserved concurrency (if set) must cover the request plus the
           provisioned concurrency of its other qualifiers
        5. the request must not exceed the configured concurrency limit
        6. the account's unreserved concurrency must stay at or above its configured minimum
        7. an alias and the version it points to must not both be provisioned

        On success the config is stored under the qualifier and the version manager of the
        resolved version is asked to apply the new value. The response always reports
        ``IN_PROGRESS`` with zero available/allocated executions.
        """
        if provisioned_concurrent_executions <= 0:
            raise ValidationException(
                f"1 validation error detected: Value '{provisioned_concurrent_executions}' at 'provisionedConcurrentExecutions' failed to satisfy constraint: Member must have value greater than or equal to 1"
            )

        if qualifier == "$LATEST":
            raise InvalidParameterValueException(
                "Provisioned Concurrency Configs cannot be applied to unpublished function versions.",
                Type="User",
            )

        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        function = lambda_stores[account_id][region].functions.get(function_name)

        # The lookup also validates that the function and the alias/version exist.
        if self._get_provisioned_config(context, function_name, qualifier):  # TODO: merge?
            # TODO: add a test for partial updates (if possible)
            LOG.warning(
                "Partial update of provisioned concurrency config is currently not supported."
            )

        # Provisioned concurrency already claimed by the function's other qualifiers.
        provisioned_elsewhere = sum(
            entry.provisioned_concurrent_executions
            for entry_qualifier, entry in function.provisioned_concurrency_configs.items()
            if entry_qualifier != qualifier
        )
        reserved = function.reserved_concurrent_executions
        if (
            reserved is not None
            and reserved < provisioned_elsewhere + provisioned_concurrent_executions
        ):
            raise InvalidParameterValueException(
                "Requested Provisioned Concurrency should not be greater than the reservedConcurrentExecution for function",
                Type="User",
            )

        if provisioned_concurrent_executions > config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS:
            raise InvalidParameterValueException(
                f"Specified ConcurrentExecutions for function is greater than account's unreserved concurrency"
                f" [{config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS}]."
            )

        account_limits = self.get_account_settings(context)["AccountLimit"]
        remaining_unreserved = (
            account_limits["UnreservedConcurrentExecutions"] - provisioned_concurrent_executions
        )
        if remaining_unreserved < config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY:
            raise InvalidParameterValueException(
                f"Specified ConcurrentExecutions for function decreases account's UnreservedConcurrentExecution below"
                f" its minimum value of [{config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY}]."
            )

        new_config = ProvisionedConcurrencyConfiguration(
            provisioned_concurrent_executions, api_utils.generate_lambda_date()
        )
        version_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)

        if api_utils.qualifier_is_alias(qualifier):
            target_version = function.aliases.get(qualifier).function_version
            resolved_version = function.versions.get(target_version)

            if (
                resolved_version
                and function.provisioned_concurrency_configs.get(target_version) is not None
            ):
                raise ResourceConflictException(
                    "Alias can't be used for Provisioned Concurrency configuration on an already Provisioned version",
                    Type="User",
                )
            version_arn = resolved_version.id.qualified_arn()
        elif api_utils.qualifier_is_version(qualifier):
            published_version = function.versions.get(qualifier)

            # TODO: might be useful other places, utilize
            provisioned_alias_points_here = any(
                alias.function_version == qualifier
                and function.provisioned_concurrency_configs.get(alias.name) is not None
                for alias in function.aliases.values()
            )
            if provisioned_alias_points_here:
                raise ResourceConflictException(
                    "Version is pointed by a Provisioned Concurrency alias", Type="User"
                )

            version_arn = published_version.id.qualified_arn()

        # Resolve the manager before touching the store so a failed lookup leaves it unchanged.
        manager = self.lambda_service.get_lambda_version_manager(version_arn)

        function.provisioned_concurrency_configs[qualifier] = new_config

        manager.update_provisioned_concurrency_config(new_config.provisioned_concurrent_executions)

        return PutProvisionedConcurrencyConfigResponse(
            RequestedProvisionedConcurrentExecutions=new_config.provisioned_concurrent_executions,
            AvailableProvisionedConcurrentExecutions=0,
            AllocatedProvisionedConcurrentExecutions=0,
            Status=ProvisionedConcurrencyStatusEnum.IN_PROGRESS,
            # StatusReason=manager.provisioned_state.status_reason,
            LastModified=new_config.last_modified,  # TODO: does change with configuration or also with state changes?
        )
3371

3372
    def get_provisioned_concurrency_config(
        self, context: RequestContext, function_name: FunctionName, qualifier: Qualifier, **kwargs
    ) -> GetProvisionedConcurrencyConfigResponse:
        """Return the provisioned concurrency config of an alias or published version.

        The requested value and modification date come from the stored config; the
        available/allocated counts and the status come from the provisioned state of the
        version manager. For an alias, the manager of the version it points to is used.

        :raises InvalidParameterValueException: if the qualifier is ``$LATEST``
        :raises ProvisionedConcurrencyConfigNotFoundException: if no config is stored
        """
        if qualifier == "$LATEST":
            raise InvalidParameterValueException(
                "The function resource provided must be an alias or a published version.",
                Type="User",
            )
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )

        stored_config = self._get_provisioned_config(context, function_name, qualifier)
        if not stored_config:
            raise ProvisionedConcurrencyConfigNotFoundException(
                "No Provisioned Concurrency Config found for this function", Type="User"
            )

        # TODO: make this compatible with alias pointer migration on update
        version_qualifier = qualifier
        if api_utils.qualifier_is_alias(qualifier):
            function = lambda_stores[account_id][region].functions.get(function_name)
            version_qualifier = function.aliases.get(qualifier).function_version
        version_arn = api_utils.qualified_lambda_arn(
            function_name, version_qualifier, account_id, region
        )

        provisioned_state = self.lambda_service.get_lambda_version_manager(
            version_arn
        ).provisioned_state

        return GetProvisionedConcurrencyConfigResponse(
            RequestedProvisionedConcurrentExecutions=stored_config.provisioned_concurrent_executions,
            LastModified=stored_config.last_modified,
            AvailableProvisionedConcurrentExecutions=provisioned_state.available,
            AllocatedProvisionedConcurrentExecutions=provisioned_state.allocated,
            Status=provisioned_state.status,
            StatusReason=provisioned_state.status_reason,
        )
3412

3413
    def list_provisioned_concurrency_configs(
        self,
        context: RequestContext,
        function_name: FunctionName,
        marker: String = None,
        max_items: MaxProvisionedConcurrencyConfigListItems = None,
        **kwargs,
    ) -> ListProvisionedConcurrencyConfigsResponse:
        """Return one page of the provisioned concurrency configs of a function.

        Each item combines the stored config (requested value, modification date) with the
        provisioned state of the version manager. For an alias qualifier, the manager of
        the version the alias points to is used, while the reported ``FunctionArn`` keeps
        the alias qualifier.

        :param marker: pagination token returned as ``NextMarker`` by a previous call
        :param max_items: page size passed to the paginator
        :raises ResourceNotFoundException: if the function is not in the store
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        state = lambda_stores[account_id][region]

        function_name = api_utils.get_function_name(function_name, context)
        fn = state.functions.get(function_name)
        if fn is None:
            raise ResourceNotFoundException(
                f"Function not found: {api_utils.unqualified_lambda_arn(function_name, account_id, region)}",
                Type="User",
            )

        configs = []
        for qualifier, pc_config in fn.provisioned_concurrency_configs.items():
            # The version manager is looked up by version ARN, so aliases are resolved first.
            if api_utils.qualifier_is_alias(qualifier):
                version_qualifier = fn.aliases.get(qualifier).function_version
            else:
                version_qualifier = qualifier
            version_arn = api_utils.qualified_lambda_arn(
                function_name, version_qualifier, account_id, region
            )
            provisioned_state = self.lambda_service.get_lambda_version_manager(
                version_arn
            ).provisioned_state

            configs.append(
                ProvisionedConcurrencyConfigListItem(
                    FunctionArn=api_utils.qualified_lambda_arn(
                        function_name, qualifier, account_id, region
                    ),
                    RequestedProvisionedConcurrentExecutions=pc_config.provisioned_concurrent_executions,
                    AvailableProvisionedConcurrentExecutions=provisioned_state.available,
                    AllocatedProvisionedConcurrentExecutions=provisioned_state.allocated,
                    Status=provisioned_state.status,
                    StatusReason=provisioned_state.status_reason,
                    LastModified=pc_config.last_modified,
                )
            )

        # The token must be a string that identifies an item. Using the item itself (a dict)
        # would yield a non-string NextMarker that a marker sent back by the client could
        # never match, so the qualified ARN is used instead.
        page, token = PaginatedList(configs).get_page(
            lambda item: item["FunctionArn"],
            marker,
            max_items,
        )
        return ListProvisionedConcurrencyConfigsResponse(
            ProvisionedConcurrencyConfigs=page, NextMarker=token
        )
3470

3471
    def delete_provisioned_concurrency_config(
        self, context: RequestContext, function_name: FunctionName, qualifier: Qualifier, **kwargs
    ) -> None:
        """Remove the provisioned concurrency config of an alias or published version.

        Deleting is idempotent: if no config is stored for the qualifier, nothing happens.
        Otherwise the config is removed from the store and the version manager is asked to
        scale provisioned concurrency down to zero.

        :raises InvalidParameterValueException: if the qualifier is ``$LATEST``
        :raises ResourceNotFoundException: if the function, alias or version does not exist
            (raised by ``_get_provisioned_config``)
        """
        if qualifier == "$LATEST":
            raise InvalidParameterValueException(
                "The function resource provided must be an alias or a published version.",
                Type="User",
            )
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        state = lambda_stores[account_id][region]
        fn = state.functions.get(function_name)

        provisioned_config = self._get_provisioned_config(context, function_name, qualifier)
        # delete is idempotent and doesn't actually care about the provisioned concurrency config not existing
        if not provisioned_config:
            return

        # The version manager is looked up by version ARN (as in the put/get/list handlers),
        # so an alias has to be resolved to the version it points to. The alias is known to
        # exist here because _get_provisioned_config raises otherwise.
        version_qualifier = qualifier
        if api_utils.qualifier_is_alias(qualifier):
            version_qualifier = fn.aliases.get(qualifier).function_version
        fn_arn = api_utils.qualified_lambda_arn(
            function_name, version_qualifier, account_id, region
        )

        # Resolve the manager before mutating the store so a failed lookup leaves it unchanged.
        manager = self.lambda_service.get_lambda_version_manager(fn_arn)
        fn.provisioned_concurrency_configs.pop(qualifier)
        manager.update_provisioned_concurrency_config(0)
1✔
3493

3494
    # =======================================
3495
    # =======  Event Invoke Config   ========
3496
    # =======================================
3497

3498
    # "1 validation error detected: Value 'arn:aws:_-/!lambda:<region>:111111111111:function:<function-name:1>' at 'destinationConfig.onFailure.destination' failed to satisfy constraint: Member must satisfy regular expression pattern: ^$|arn:(aws[a-zA-Z0-9-]*):([a-zA-Z0-9\\-])+:([a-z]{2}((-gov)|(-iso(b?)))?-[a-z]+-\\d{1})?:(\\d{12})?:(.*)"
3499
    # "1 validation error detected: Value 'arn:aws:_-/!lambda:<region>:111111111111:function:<function-name:1>' at 'destinationConfig.onFailure.destination' failed to satisfy constraint: Member must satisfy regular expression pattern: ^$|arn:(aws[a-zA-Z0-9-]*):([a-zA-Z0-9\\-])+:([a-z]2((-gov)|(-iso(b?)))?-[a-z]+-\\d1)?:(\\d12)?:(.*)" ... (expected → actual)
3500

3501
    def _validate_destination_config(
        self, store: LambdaStore, function_name: str, destination_config: DestinationConfig
    ):
        """Validate the OnFailure/OnSuccess destinations of a destination config.

        The OnFailure destination is checked first. The OnSuccess destination is only
        checked if OnFailure did not already turn out to be unsupported.

        :raises ValidationException: if a destination does not match the destination ARN pattern
        :raises InvalidParameterValueException: if a Lambda destination is not in ``store``,
            is the function itself, or if a destination targets a service other than
            lambda, sns, sqs or events
        """

        def _is_supported_destination(destination_arn) -> bool:
            if not api_utils.DESTINATION_ARN_PATTERN.match(destination_arn):
                # technically we shouldn't handle this in the provider
                raise ValidationException(
                    "1 validation error detected: Value '"
                    + destination_arn
                    + "' at 'destinationConfig.onFailure.destination' failed to satisfy constraint: Member must satisfy regular expression pattern: "
                    + "$|kafka://([^.]([a-zA-Z0-9\\-_.]{0,248}))|arn:(aws[a-zA-Z0-9-]*):([a-zA-Z0-9\\-])+:((eusc-)?[a-z]{2}((-gov)|(-iso([a-z]?)))?-[a-z]+-\\d{1})?:(\\d{12})?:(.*)"
                )

            # The third colon-separated field of the destination is its service name.
            service = destination_arn.split(":")[2]
            if service in ("sns", "sqs", "events"):
                return True
            if service != "lambda":
                return False

            arn_parts = api_utils.FULL_FN_ARN_PATTERN.search(destination_arn).groupdict()
            if not arn_parts:
                return True

            target_name = arn_parts["function_name"]
            # check if it exists
            if not store.functions.get(target_name):
                raise InvalidParameterValueException(
                    f"The destination ARN {destination_arn} is invalid.", Type="User"
                )
            if target_name == function_name:
                raise InvalidParameterValueException(
                    "You can't specify the function as a destination for itself.",
                    Type="User",
                )
            return True

        failure_destination = destination_config.get("OnFailure", {}).get("Destination")
        unsupported = bool(failure_destination) and not _is_supported_destination(
            failure_destination
        )

        success_destination = destination_config.get("OnSuccess", {}).get("Destination")
        if success_destination and not unsupported:
            unsupported = not _is_supported_destination(success_destination)

        if not unsupported:
            return

        on_success_part = "null"
        if success_destination:
            on_success_part = f"OnSuccess(destination={success_destination})"
        on_failure_part = "null"
        if failure_destination:
            on_failure_part = f"OnFailure(destination={failure_destination})"
        raise InvalidParameterValueException(
            f"The provided destination config DestinationConfig(onSuccess={on_success_part}, onFailure={on_failure_part}) is invalid.",
            Type="User",
        )
3556

3557
    def put_function_event_invoke_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: NumericLatestPublishedOrAliasQualifier = None,
        maximum_retry_attempts: MaximumRetryAttempts = None,
        maximum_event_age_in_seconds: MaximumEventAgeInSeconds = None,
        destination_config: DestinationConfig = None,
        **kwargs,
    ) -> FunctionEventInvokeConfig:
        """
        Create or overwrite the event invoke config stored under a function qualifier.

        Destination ARNs can be:
        * SQS arn
        * SNS arn
        * Lambda arn
        * EventBridge arn

        Differences between put_ and update_:
            * put overwrites any existing config
            * update allows changing single values while keeping the rest of the existing ones
            * update fails on non-existing configs

        Differences between destination and DLQ
            * "However, a dead-letter queue is part of a function's version-specific configuration, so it is locked in when you publish a version."
            *  "On-failure destinations also support additional targets and include details about the function's response in the invocation record."

        A missing qualifier is stored as ``$LATEST``. The destination config is normalized
        so that both ``OnSuccess`` and ``OnFailure`` are always present.
        """
        provided_settings = (
            maximum_event_age_in_seconds,
            maximum_retry_attempts,
            destination_config,
        )
        if all(setting is None for setting in provided_settings):
            raise InvalidParameterValueException(
                "You must specify at least one of error handling or destination setting.",
                Type="User",
            )

        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )

        function = store.functions.get(function_name)
        if not function or (
            qualifier and qualifier not in function.aliases and qualifier not in function.versions
        ):
            raise ResourceNotFoundException("The function doesn't exist.", Type="User")

        qualifier = qualifier or "$LATEST"

        # validate and normalize destination config
        if destination_config:
            self._validate_destination_config(store, function_name, destination_config)

        requested_destinations = destination_config or {}
        destination_config = DestinationConfig(
            OnSuccess=OnSuccess(
                Destination=requested_destinations.get("OnSuccess", {}).get("Destination")
            ),
            OnFailure=OnFailure(
                Destination=requested_destinations.get("OnFailure", {}).get("Destination")
            ),
        )

        # Named `invoke_config` so the module-level `config` import is not shadowed.
        invoke_config = EventInvokeConfig(
            function_name=function_name,
            qualifier=qualifier,
            maximum_event_age_in_seconds=maximum_event_age_in_seconds,
            maximum_retry_attempts=maximum_retry_attempts,
            last_modified=api_utils.generate_lambda_date(),
            destination_config=destination_config,
        )
        function.event_invoke_configs[qualifier] = invoke_config

        return FunctionEventInvokeConfig(
            LastModified=datetime.datetime.strptime(
                invoke_config.last_modified, api_utils.LAMBDA_DATE_FORMAT
            ),
            FunctionArn=api_utils.qualified_lambda_arn(
                function_name, qualifier, account_id, region
            ),
            DestinationConfig=destination_config,
            MaximumEventAgeInSeconds=maximum_event_age_in_seconds,
            MaximumRetryAttempts=maximum_retry_attempts,
        )
3638

3639
    def get_function_event_invoke_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
        **kwargs,
    ) -> FunctionEventInvokeConfig:
        """Return the event invoke config stored under a function qualifier.

        A missing qualifier is treated as ``$LATEST``.

        :raises ResourceNotFoundException: if the function does not exist or has no event
            invoke config for the qualifier (both cases produce the same message)
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        qualifier = qualifier or "$LATEST"

        function = store.functions.get(function_name)
        invoke_config = function.event_invoke_configs.get(qualifier) if function else None
        if not invoke_config:
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
            raise ResourceNotFoundException(
                f"The function {fn_arn} doesn't have an EventInvokeConfig", Type="User"
            )

        # The modification date is stored as a string and parsed for the response.
        last_modified = datetime.datetime.strptime(
            invoke_config.last_modified, api_utils.LAMBDA_DATE_FORMAT
        )
        return FunctionEventInvokeConfig(
            LastModified=last_modified,
            FunctionArn=api_utils.qualified_lambda_arn(
                function_name, qualifier, account_id, region
            ),
            DestinationConfig=invoke_config.destination_config,
            MaximumEventAgeInSeconds=invoke_config.maximum_event_age_in_seconds,
            MaximumRetryAttempts=invoke_config.maximum_retry_attempts,
        )
3678

3679
    def list_function_event_invoke_configs(
        self,
        context: RequestContext,
        function_name: FunctionName,
        marker: String = None,
        max_items: MaxFunctionEventInvokeConfigListItems = None,
        **kwargs,
    ) -> ListFunctionEventInvokeConfigsResponse:
        """Return one page of the event invoke configs stored for a function.

        The qualified function ARN of each item serves as its pagination token.

        :param marker: pagination token returned as ``NextMarker`` by a previous call
        :param max_items: page size passed to the paginator
        :raises ResourceNotFoundException: if the function is not in the store
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        state = lambda_stores[account_id][region]
        # Normalize the identifier to the plain function name before using it as the store
        # key, as the other handlers of this provider do; without this, a request that
        # passes anything but the plain name cannot find the function.
        function_name = api_utils.get_function_name(function_name, context)
        fn = state.functions.get(function_name)
        if not fn:
            raise ResourceNotFoundException("The function doesn't exist.", Type="User")

        event_invoke_configs = [
            FunctionEventInvokeConfig(
                # Parsed to a datetime so the value has the same type as in the
                # put/get responses, which parse the stored string the same way.
                LastModified=datetime.datetime.strptime(
                    c.last_modified, api_utils.LAMBDA_DATE_FORMAT
                ),
                FunctionArn=api_utils.qualified_lambda_arn(
                    function_name, c.qualifier, account_id, region
                ),
                MaximumEventAgeInSeconds=c.maximum_event_age_in_seconds,
                MaximumRetryAttempts=c.maximum_retry_attempts,
                DestinationConfig=c.destination_config,
            )
            for c in fn.event_invoke_configs.values()
        ]

        event_invoke_configs = PaginatedList(event_invoke_configs)
        page, token = event_invoke_configs.get_page(
            lambda x: x["FunctionArn"],
            marker,
            max_items,
        )
        return ListFunctionEventInvokeConfigsResponse(
            FunctionEventInvokeConfigs=page, NextMarker=token
        )
3715

3716
    def delete_function_event_invoke_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
        **kwargs,
    ) -> None:
        """Delete the event invoke config stored under a function qualifier.

        A missing qualifier addresses the config stored under ``$LATEST``.

        :raises ResourceNotFoundException: if the function does not exist or has no event
            invoke config for the qualifier (both cases produce the same message)
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        store = lambda_stores[account_id][region]
        function = store.functions.get(function_name)
        storage_key = qualifier or "$LATEST"

        # NOTE(review): the ARN in the error message is built from the raw qualifier, which
        # may be None here; presumably qualified_lambda_arn handles that - confirm.
        fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
        if not function or not function.event_invoke_configs.get(storage_key):
            raise ResourceNotFoundException(
                f"The function {fn_arn} doesn't have an EventInvokeConfig", Type="User"
            )

        function.event_invoke_configs.pop(storage_key)
1✔
3743

3744
    def update_function_event_invoke_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: NumericLatestPublishedOrAliasQualifier = None,
        maximum_retry_attempts: MaximumRetryAttempts = None,
        maximum_event_age_in_seconds: MaximumEventAgeInSeconds = None,
        destination_config: DestinationConfig = None,
        **kwargs,
    ) -> FunctionEventInvokeConfig:
        """
        Patch an existing event invoke config: only the settings that were passed are replaced.

        :param context: Request context
        :param function_name: Function identifier, resolved through ``api_utils``
        :param qualifier: Optional alias/version qualifier; "$LATEST" when none is resolved
        :param maximum_retry_attempts: New retry limit, or None to keep the stored value
        :param maximum_event_age_in_seconds: New maximum event age, or None to keep the stored value
        :param destination_config: New destination config, or None to keep the stored value
        :return: The updated config
        :raises InvalidParameterValueException: if none of the three settings is given
        :raises ResourceNotFoundException: if the function or qualifier is unknown, or no config
            is stored for the qualifier
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        state = lambda_stores[account_id][region]
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )

        requested_settings = {
            "destination_config": destination_config,
            "maximum_retry_attempts": maximum_retry_attempts,
            "maximum_event_age_in_seconds": maximum_event_age_in_seconds,
        }
        if all(value is None for value in requested_settings.values()):
            raise InvalidParameterValueException(
                "You must specify at least one of error handling or destination setting.",
                Type="User",
            )

        fn = state.functions.get(function_name)
        if not fn or (
            qualifier and qualifier not in fn.aliases and qualifier not in fn.versions
        ):
            raise ResourceNotFoundException("The function doesn't exist.", Type="User")

        qualifier = qualifier or "$LATEST"

        current = fn.event_invoke_configs.get(qualifier)
        if not current:
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
            raise ResourceNotFoundException(
                f"The function {fn_arn} doesn't have an EventInvokeConfig", Type="User"
            )

        if destination_config:
            self._validate_destination_config(state, function_name, destination_config)

        # Only explicitly provided settings overwrite the stored ones.
        overrides = {}
        for field_name, value in requested_settings.items():
            if value is not None:
                overrides[field_name] = value

        updated = dataclasses.replace(
            current, last_modified=api_utils.generate_lambda_date(), **overrides
        )
        fn.event_invoke_configs[qualifier] = updated

        last_modified = datetime.datetime.strptime(
            updated.last_modified, api_utils.LAMBDA_DATE_FORMAT
        )
        # qualifier is guaranteed to be non-empty at this point
        function_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
        return FunctionEventInvokeConfig(
            LastModified=last_modified,
            FunctionArn=function_arn,
            DestinationConfig=updated.destination_config,
            MaximumEventAgeInSeconds=updated.maximum_event_age_in_seconds,
            MaximumRetryAttempts=updated.maximum_retry_attempts,
        )
3813

3814
    # =======================================
3815
    # ======  Layer & Layer Versions  =======
3816
    # =======================================
3817

3818
    @staticmethod
    def _resolve_layer(
        layer_name_or_arn: str, context: RequestContext
    ) -> tuple[str, str, str, str | None]:
        """
        Work out the locator attributes of a Lambda layer from its name or ARN.

        :param layer_name_or_arn: Layer name or ARN
        :param context: Request context, supplies region and account when only a name is given
        :return: Tuple of region, account ID, layer name, layer version
        """
        if not api_utils.is_layer_arn(layer_name_or_arn):
            # A plain name carries no locator data: use the request's region/account, no version.
            return context.region, context.account_id, layer_name_or_arn, None

        return api_utils.parse_layer_arn(layer_name_or_arn)
1✔
3833

3834
    def publish_layer_version(
        self,
        context: RequestContext,
        layer_name: LayerName,
        content: LayerVersionContentInput,
        description: Description | None = None,
        compatible_runtimes: CompatibleRuntimes | None = None,
        license_info: LicenseInfo | None = None,
        compatible_architectures: CompatibleArchitectures | None = None,
        **kwargs,
    ) -> PublishLayerVersionResponse:
        """
        Publish a new version of a layer, creating the layer itself the first time its name is used.
        Layers have no $LATEST version; every call yields a concrete version.

        :param context: Request context (supplies account and region)
        :param layer_name: Name of the layer
        :param content: Layer archive, either inline ("ZipFile") or as an S3 reference
        :param description: Optional description; stored as "" when omitted
        :param compatible_runtimes: Optional list of compatible runtimes
        :param license_info: Optional license information
        :param compatible_architectures: Optional list of compatible architectures
        :return: The mapped representation of the newly stored layer version
        :raises ValidationException: if runtimes or architectures fail validation
        """
        account, region = context.account_id, context.region

        validation_errors = api_utils.validate_layer_runtimes_and_architectures(
            compatible_runtimes, compatible_architectures
        )
        if validation_errors:
            plural_suffix = "s" if len(validation_errors) > 1 else ""
            raise ValidationException(
                f"{len(validation_errors)} validation error{plural_suffix} detected: {'; '.join(validation_errors)}"
            )

        state = lambda_stores[account][region]
        with self.create_layer_lock:
            # The lock prevents two concurrent first publishes from each creating the layer object.
            if layer_name not in state.layers:
                state.layers[layer_name] = Layer(
                    arn=api_utils.layer_arn(layer_name=layer_name, account=account, region=region)
                )

        layer = state.layers[layer_name]
        with layer.next_version_lock:
            version_identifier = LambdaLayerVersionIdentifier(
                account_id=account, region=region, layer_name=layer_name
            )
            next_version = version_identifier.generate(next_version=layer.next_version)
            # Versions may be created out of order (e.g. v2 replicated before v1). The counter only
            # ever moves forward so that an existing version is never overwritten.
            if next_version >= layer.next_version:
                layer.next_version = next_version + 1

        # Persist the archive, from the inline zip if present, otherwise from S3.
        archive_owner = {
            "function_name": layer_name,
            "region_name": region,
            "account_id": account,
        }
        if content.get("ZipFile"):
            code = store_lambda_archive(archive_file=content["ZipFile"], **archive_owner)
        else:
            code = store_s3_bucket_archive(
                archive_bucket=content["S3Bucket"],
                archive_key=content["S3Key"],
                archive_version=content.get("S3ObjectVersion"),
                **archive_owner,
            )

        version_key = str(next_version)
        version_arn = api_utils.layer_version_arn(
            layer_name=layer_name,
            account=account,
            region=region,
            version=version_key,
        )
        published = LayerVersion(
            layer_version_arn=version_arn,
            layer_arn=layer.arn,
            version=next_version,
            description=description or "",
            license_info=license_info,
            compatible_runtimes=compatible_runtimes,
            compatible_architectures=compatible_architectures,
            created=api_utils.generate_lambda_date(),
            code=code,
        )
        layer.layer_versions[version_key] = published

        return api_utils.map_layer_out(published)
1✔
3922

3923
    def get_layer_version(
        self,
        context: RequestContext,
        layer_name: LayerName,
        version_number: LayerVersionNumber,
        **kwargs,
    ) -> GetLayerVersionResponse:
        """
        Look up one version of a layer.

        :param context: Request context
        :param layer_name: Layer name or ARN, resolved via ``_resolve_layer``
        :param version_number: Version to fetch; must be at least 1
        :return: The mapped representation of the stored layer version
        :raises InvalidParameterValueException: if ``version_number`` is below 1
        :raises ResourceNotFoundException: if the layer or the version is not stored
        """
        region_name, account_id, layer_name, _ = LambdaProvider._resolve_layer(layer_name, context)
        layer = lambda_stores[account_id][region_name].layers.get(layer_name)

        # The version range check takes precedence over the existence checks.
        if version_number < 1:
            raise InvalidParameterValueException("Layer Version Cannot be less than 1", Type="User")

        found_version = None if layer is None else layer.layer_versions.get(str(version_number))
        if found_version is None:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )
        return api_utils.map_layer_out(found_version)
1✔
3948

3949
    def get_layer_version_by_arn(
        self, context: RequestContext, arn: LayerVersionArn, **kwargs
    ) -> GetLayerVersionResponse:
        """
        Look up a layer version by its ARN.

        :param context: Request context
        :param arn: Layer version ARN; it must resolve to a layer version
        :return: The mapped representation of the stored layer version
        :raises ValidationException: if no version can be resolved from ``arn``
        :raises ResourceNotFoundException: if the layer or the version is not stored
        """
        region_name, account_id, layer_name, version_key = LambdaProvider._resolve_layer(
            arn, context
        )

        if not version_key:
            arn_pattern = "(arn:(aws[a-zA-Z-]*)?:lambda:(eusc-)?[a-z]{2}((-gov)|(-iso([a-z]?)))?-[a-z]+-\\d{1}:\\d{12}:layer:[a-zA-Z0-9-_]+:[0-9]+)|(arn:[a-zA-Z0-9-]+:lambda:::awslayer:[a-zA-Z0-9-_]+)"
            raise ValidationException(
                f"1 validation error detected: Value '{arn}' at 'arn' failed to satisfy constraint: Member must satisfy regular expression pattern: {arn_pattern}"
            )

        layer = lambda_stores[account_id][region_name].layers.get(layer_name)
        # An unknown layer and an unknown version are reported identically.
        resolved_version = layer.layer_versions.get(version_key) if layer else None
        if not resolved_version:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        return api_utils.map_layer_out(resolved_version)
1✔
3976

3977
    def list_layers(
        self,
        context: RequestContext,
        compatible_runtime: Runtime | None = None,
        marker: String | None = None,
        max_items: MaxLayerListItems | None = None,
        compatible_architecture: Architecture | None = None,
        **kwargs,
    ) -> ListLayersResponse:
        """
        List the layers of the caller's account and region, each with its highest stored version.

        :param context: Request context (supplies account and region)
        :param compatible_runtime: Runtime filter; currently only validated, not applied
        :param marker: Pagination marker returned by a previous call
        :param max_items: Maximum number of layers per page
        :param compatible_architecture: Architecture filter; currently only validated, not applied
        :return: One page of layers plus the marker for the next page, if any
        :raises ValidationException: if the runtime or architecture filter is invalid
        """
        validation_errors = []

        validation_error_arch = api_utils.validate_layer_architecture(compatible_architecture)
        if validation_error_arch:
            validation_errors.append(validation_error_arch)

        validation_error_runtime = api_utils.validate_layer_runtime(compatible_runtime)
        if validation_error_runtime:
            validation_errors.append(validation_error_runtime)

        if validation_errors:
            raise ValidationException(
                f"{len(validation_errors)} validation error{'s' if len(validation_errors) > 1 else ''} detected: {';'.join(validation_errors)}"
            )
        # TODO: handle filter: compatible_runtime
        # TODO: handle filter: compatible_architecture

        state = lambda_stores[context.account_id][context.region]

        # TODO: test how filters behave together with only returning layers here? Does it return the latest "matching" layer, i.e. does it ignore later layer versions that don't match?

        responses: list[LayersListItem] = []
        for layer_name, layer in state.layers.items():
            layer_versions = list(layer.layer_versions.values())
            if not layer_versions:
                # All versions were deleted but the layer object is still stored: nothing to
                # report, and indexing into the empty list would raise.
                continue
            # Versions can be stored out of order, so pick the highest version number explicitly
            # instead of relying on insertion order.
            latest_layer_version = max(layer_versions, key=lambda version: version.version)
            responses.append(
                LayersListItem(
                    LayerName=layer_name,
                    LayerArn=layer.arn,
                    LatestMatchingVersion=api_utils.map_layer_out(latest_layer_version),
                )
            )

        responses = PaginatedList(responses)
        # The pagination token must be a string that identifies an item, not the item itself.
        page, token = responses.get_page(
            lambda item: item["LayerArn"],
            marker,
            max_items,
        )

        return ListLayersResponse(NextMarker=token, Layers=page)
×
4030

4031
    def list_layer_versions(
        self,
        context: RequestContext,
        layer_name: LayerName,
        compatible_runtime: Runtime | None = None,
        marker: String | None = None,
        max_items: MaxLayerListItems | None = None,
        compatible_architecture: Architecture | None = None,
        **kwargs,
    ) -> ListLayerVersionsResponse:
        """
        List the stored versions of a layer, highest version first.

        :param context: Request context
        :param layer_name: Layer name or ARN, resolved via ``_resolve_layer``
        :param compatible_runtime: Runtime filter; currently only validated, not applied
        :param marker: Pagination marker returned by a previous call
        :param max_items: Maximum number of versions per page
        :param compatible_architecture: Architecture filter; currently only validated, not applied
        :return: One page of layer versions (empty for an unknown layer) plus the next marker
        :raises ValidationException: if the runtime or architecture filter is invalid
        """
        runtimes = [compatible_runtime] if compatible_runtime else []
        architectures = [compatible_architecture] if compatible_architecture else []
        validation_errors = api_utils.validate_layer_runtimes_and_architectures(
            runtimes, architectures
        )
        if validation_errors:
            raise ValidationException(
                f"{len(validation_errors)} validation error{'s' if len(validation_errors) > 1 else ''} detected: {';'.join(validation_errors)}"
            )

        region_name, account_id, layer_name, _ = LambdaProvider._resolve_layer(
            layer_name, context
        )
        state = lambda_stores[account_id][region_name]

        # TODO: Test & handle filter: compatible_runtime
        # TODO: Test & handle filter: compatible_architecture
        layer = state.layers.get(layer_name)
        if layer is None:
            mapped_versions = []
        else:
            mapped_versions = [
                api_utils.map_layer_out(stored) for stored in layer.layer_versions.values()
            ]

        newest_first = sorted(mapped_versions, key=lambda entry: entry["Version"], reverse=True)
        page, token = PaginatedList(newest_first).get_page(
            lambda entry: entry["LayerVersionArn"],
            marker,
            max_items,
        )
        return ListLayerVersionsResponse(NextMarker=token, LayerVersions=page)
1✔
4071

4072
    def delete_layer_version(
        self,
        context: RequestContext,
        layer_name: LayerName,
        version_number: LayerVersionNumber,
        **kwargs,
    ) -> None:
        """
        Delete one version of a layer; an unknown layer or version is silently ignored.

        :param context: Request context
        :param layer_name: Layer name or ARN, resolved via ``_resolve_layer``
        :param version_number: Version to delete; must be at least 1
        :raises InvalidParameterValueException: if ``version_number`` is below 1
        """
        if version_number < 1:
            raise InvalidParameterValueException("Layer Version Cannot be less than 1", Type="User")

        region_name, account_id, layer_name, _ = LambdaProvider._resolve_layer(
            layer_name, context
        )

        layer = lambda_stores[account_id][region_name].layers.get(layer_name)
        if not layer:
            return
        layer.layer_versions.pop(str(version_number), None)
1✔
4090

4091
    # =======================================
4092
    # =====  Layer Version Permissions  =====
4093
    # =======================================
4094
    # TODO: lock updates that change revision IDs
4095

4096
    def add_layer_version_permission(
        self,
        context: RequestContext,
        layer_name: LayerName,
        version_number: LayerVersionNumber,
        statement_id: StatementId,
        action: LayerPermissionAllowedAction,
        principal: LayerPermissionAllowedPrincipal,
        organization_id: OrganizationId = None,
        revision_id: String = None,
        **kwargs,
    ) -> AddLayerVersionPermissionResponse:
        """
        Add a statement to the resource policy of a layer version.

        The policy is only attached to the layer version once every check has passed, so a
        rejected request never leaves an empty policy behind.

        :param context: Request context
        :param layer_name: Layer name or ARN
        :param version_number: Version of the layer the statement applies to
        :param statement_id: Identifier of the new statement; must be unused in the policy
        :param action: Must be "lambda:GetLayerVersion"
        :param principal: Principal the statement applies to
        :param organization_id: Optional organization ID stored with the statement
        :param revision_id: If given, must match the policy's current revision ID
        :return: The new statement as a JSON string plus the policy's new revision ID
        :raises ValidationException: if ``action`` is not "lambda:GetLayerVersion"
        :raises ResourceNotFoundException: if the layer or the version is not stored
        :raises ResourceConflictException: if ``statement_id`` is already in the policy
        :raises PreconditionFailedException: if ``revision_id`` does not match
        """
        # `layer_name` can either be layer name or ARN. It is used to generate error messages.
        # `layer_n` contains the layer name.
        region_name, account_id, layer_n, _ = LambdaProvider._resolve_layer(layer_name, context)

        if action != "lambda:GetLayerVersion":
            raise ValidationException(
                f"1 validation error detected: Value '{action}' at 'action' failed to satisfy constraint: Member must satisfy regular expression pattern: lambda:GetLayerVersion"
            )

        store = lambda_stores[account_id][region_name]
        layer = store.layers.get(layer_n)

        layer_version_arn = api_utils.layer_version_arn(
            layer_name, account_id, region_name, str(version_number)
        )

        if layer is None:
            raise ResourceNotFoundException(
                f"Layer version {layer_version_arn} does not exist.", Type="User"
            )
        layer_version = layer.layer_versions.get(str(version_number))
        if layer_version is None:
            raise ResourceNotFoundException(
                f"Layer version {layer_version_arn} does not exist.", Type="User"
            )

        # Work on a local policy: a fresh one is only persisted if the request succeeds.
        policy = layer_version.policy
        if policy is None:
            policy = LayerPolicy()

        if statement_id in policy.statements:
            raise ResourceConflictException(
                f"The statement id ({statement_id}) provided already exists. Please provide a new statement id, or remove the existing statement.",
                Type="User",
            )

        if revision_id and policy.revision_id != revision_id:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetLayerPolicy API to retrieve the latest Revision Id",
                Type="User",
            )

        statement = LayerPolicyStatement(
            sid=statement_id, action=action, principal=principal, organization_id=organization_id
        )

        # Replace the policy object instead of mutating the statements dict in place.
        layer_version.policy = dataclasses.replace(
            policy, statements={**policy.statements, statement_id: statement}
        )

        return AddLayerVersionPermissionResponse(
            Statement=json.dumps(
                {
                    "Sid": statement.sid,
                    "Effect": "Allow",
                    "Principal": statement.principal,
                    "Action": statement.action,
                    "Resource": layer_version.layer_version_arn,
                }
            ),
            RevisionId=layer_version.policy.revision_id,
        )
4170
        )
4171

4172
    def remove_layer_version_permission(
        self,
        context: RequestContext,
        layer_name: LayerName,
        version_number: LayerVersionNumber,
        statement_id: StatementId,
        revision_id: String = None,
        **kwargs,
    ) -> None:
        """
        Remove a statement from the resource policy of a layer version.

        A layer version that never had a policy is treated like one whose policy lacks the
        statement, instead of failing with an AttributeError on the missing policy.

        :param context: Request context
        :param layer_name: Layer name or ARN
        :param version_number: Version of the layer the statement belongs to
        :param statement_id: Identifier of the statement to remove
        :param revision_id: If given, must match the policy's current revision ID
        :raises ResourceNotFoundException: if the layer, the version, or the statement is not found
        :raises PreconditionFailedException: if ``revision_id`` does not match
        """
        # `layer_name` can either be layer name or ARN. It is used to generate error messages.
        # `layer_n` contains the layer name.
        region_name, account_id, layer_n, _ = LambdaProvider._resolve_layer(
            layer_name, context
        )

        layer_version_arn = api_utils.layer_version_arn(
            layer_name, account_id, region_name, str(version_number)
        )

        state = lambda_stores[account_id][region_name]
        layer = state.layers.get(layer_n)
        if layer is None:
            raise ResourceNotFoundException(
                f"Layer version {layer_version_arn} does not exist.", Type="User"
            )
        layer_version = layer.layer_versions.get(str(version_number))
        if layer_version is None:
            raise ResourceNotFoundException(
                f"Layer version {layer_version_arn} does not exist.", Type="User"
            )

        policy = layer_version.policy

        # Without a policy there is no revision to compare against.
        if policy is not None and revision_id and policy.revision_id != revision_id:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetLayerPolicy API to retrieve the latest Revision Id",
                Type="User",
            )

        if policy is None or statement_id not in policy.statements:
            raise ResourceNotFoundException(
                f"Statement {statement_id} is not found in resource policy.", Type="User"
            )

        # Replace the policy object instead of mutating the statements dict in place.
        layer_version.policy = dataclasses.replace(
            policy,
            statements={k: v for k, v in policy.statements.items() if k != statement_id},
        )
4219
        )
4220

4221
    def get_layer_version_policy(
        self,
        context: RequestContext,
        layer_name: LayerName,
        version_number: LayerVersionNumber,
        **kwargs,
    ) -> GetLayerVersionPolicyResponse:
        """
        Return the resource policy of a layer version as a JSON document.

        :param context: Request context
        :param layer_name: Layer name or ARN
        :param version_number: Version of the layer whose policy is requested
        :return: The serialized policy and its revision ID
        :raises ResourceNotFoundException: if the layer or the version is not stored, or the
            version has no policy
        """
        # `layer_name` (name or ARN) only feeds the error message; `layer_n` is the bare name.
        region_name, account_id, layer_n, _ = LambdaProvider._resolve_layer(layer_name, context)

        layer_version_arn = api_utils.layer_version_arn(
            layer_name, account_id, region_name, str(version_number)
        )

        layer = lambda_stores[account_id][region_name].layers.get(layer_n)
        layer_version = None if layer is None else layer.layer_versions.get(str(version_number))
        if layer_version is None:
            raise ResourceNotFoundException(
                f"Layer version {layer_version_arn} does not exist.", Type="User"
            )

        policy = layer_version.policy
        if policy is None:
            raise ResourceNotFoundException(
                "No policy is associated with the given resource.", Type="User"
            )

        statements = []
        for stored_statement in policy.statements.values():
            statements.append(
                {
                    "Sid": stored_statement.sid,
                    "Effect": "Allow",
                    "Principal": stored_statement.principal,
                    "Action": stored_statement.action,
                    "Resource": layer_version.layer_version_arn,
                }
            )

        policy_document = {
            "Version": policy.version,
            "Id": policy.id,
            "Statement": statements,
        }
        return GetLayerVersionPolicyResponse(
            Policy=json.dumps(policy_document),
            RevisionId=policy.revision_id,
        )
4273
        )
4274

4275
    # =======================================
4276
    # =======  Function Concurrency  ========
4277
    # =======================================
4278
    # (Reserved) function concurrency is scoped to the whole function
4279

4280
    def get_function_concurrency(
        self, context: RequestContext, function_name: FunctionName, **kwargs
    ) -> GetFunctionConcurrencyResponse:
        """
        Report the reserved concurrency configured for a function.

        :param context: Request context
        :param function_name: Function identifier, resolved through ``api_utils``
        :return: Response carrying the function's ``reserved_concurrent_executions`` value
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        resolved_name = api_utils.get_function_name(function_name, context)
        fn = self._get_function(function_name=resolved_name, region=region, account_id=account_id)
        reserved = fn.reserved_concurrent_executions
        return GetFunctionConcurrencyResponse(ReservedConcurrentExecutions=reserved)
4288
        )
4289

4290
    def put_function_concurrency(
        self,
        context: RequestContext,
        function_name: FunctionName,
        reserved_concurrent_executions: ReservedConcurrentExecutions,
        **kwargs,
    ) -> Concurrency:
        """
        Set the reserved concurrency of a function.

        :param context: Request context
        :param function_name: Unqualified function identifier
        :param reserved_concurrent_executions: Number of executions to reserve for the function
        :return: The reserved concurrency now stored on the function
        :raises InvalidParameterValueException: if a qualifier is given, if the account's unreserved
            concurrency would drop below the configured minimum, or if the value is lower than the
            function's total provisioned concurrency
        :raises ResourceNotFoundException: if the function is not in the store
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)

        function_name, qualifier = api_utils.get_name_and_qualifier(function_name, None, context)
        if qualifier:
            raise InvalidParameterValueException(
                "This operation is permitted on Lambda functions only. Aliases and versions do not support this operation. Please specify either a function name or an unqualified function ARN.",
                Type="User",
            )

        fn = lambda_stores[account_id][region].functions.get(function_name)
        if not fn:
            fn_arn = api_utils.qualified_lambda_arn(
                function_name,
                qualifier="$LATEST",
                account=account_id,
                region=region,
            )
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")

        account_limit = self.get_account_settings(context)["AccountLimit"]
        currently_unreserved = account_limit["UnreservedConcurrentExecutions"]

        # The function's current reservation is already subtracted from the unreserved pool, but
        # the new value replaces it, so add it back before comparing against the minimum.
        # Joel tested this behavior manually against AWS (2023-11-28).
        previously_reserved = fn.reserved_concurrent_executions or 0
        unreserved_after_update = (
            currently_unreserved - reserved_concurrent_executions + previously_reserved
        )
        if unreserved_after_update < config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY:
            raise InvalidParameterValueException(
                f"Specified ReservedConcurrentExecutions for function decreases account's UnreservedConcurrentExecution below its minimum value of [{config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY}]."
            )

        provisioned_total = sum(
            provisioned.provisioned_concurrent_executions
            for provisioned in fn.provisioned_concurrency_configs.values()
        )
        if provisioned_total > reserved_concurrent_executions:
            raise InvalidParameterValueException(
                f" ReservedConcurrentExecutions  {reserved_concurrent_executions} should not be lower than function's total provisioned concurrency [{provisioned_total}]."
            )

        fn.reserved_concurrent_executions = reserved_concurrent_executions

        return Concurrency(ReservedConcurrentExecutions=fn.reserved_concurrent_executions)
1✔
4351

4352
    def delete_function_concurrency(
        self, context: RequestContext, function_name: FunctionName, **kwargs
    ) -> None:
        """
        Clear the reserved concurrency of a function.

        :param context: Request context
        :param function_name: Function identifier, resolved through ``api_utils``
        :raises ResourceNotFoundException: if the function is not in the store
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, _ = api_utils.get_name_and_qualifier(function_name, None, context)
        store = lambda_stores[account_id][region]
        fn = store.functions.get(function_name)
        if not fn:
            # Report an unknown function the same way put_function_concurrency does, rather than
            # failing with an AttributeError on None.
            fn_arn = api_utils.qualified_lambda_arn(
                function_name,
                qualifier="$LATEST",
                account=account_id,
                region=region,
            )
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")
        fn.reserved_concurrent_executions = None
1✔
4360

4361
    # =======================================
4362
    # ===============  TAGS   ===============
4363
    # =======================================
4364
    # only Function, Event Source Mapping, and Code Signing Config (not currently supported by LocalStack) ARNs are available for tagging in AWS
4365

4366
    def _get_tags(self, resource: TaggableResource) -> dict[str, str]:
        """
        Fetch the tags of a resource as a plain key/value mapping.

        :param resource: ARN of the tagged resource
        :return: Mapping of tag key to tag value
        """
        store = self.fetch_lambda_store_for_tagging(resource)
        # The tag store returns a list of {"Key": ..., "Value": ...} entries; flatten it to a dict.
        tag_entries = store.TAGS.list_tags_for_resource(resource).get("Tags")
        return {entry["Key"]: entry["Value"] for entry in tag_entries}
1✔
4373

4374
    def _store_tags(self, resource: TaggableResource, tags: dict[str, str]):
        """
        Attach tags to a resource, enforcing the per-resource tag limit.

        :param resource: ARN of the resource to tag
        :param tags: Tags to add, as a key/value mapping
        :raises InvalidParameterValueException: if existing and new tags together exceed
            ``LAMBDA_TAG_LIMIT_PER_RESOURCE``
        """
        store = self.fetch_lambda_store_for_tagging(resource)

        # The limit applies to the union of stored and new tags.
        tags_after_update = store.TAGS.tags.get(resource, {}) | tags
        if len(tags_after_update) > LAMBDA_TAG_LIMIT_PER_RESOURCE:
            raise InvalidParameterValueException(
                "Number of tags exceeds resource tag limit.", Type="User"
            )

        # The tag store expects a list of {"Key": ..., "Value": ...} entries.
        tag_entries = [{"Key": name, "Value": content} for name, content in tags.items()]
        store.TAGS.tag_resource(resource, tag_entries)
1✔
4383

4384
    def fetch_lambda_store_for_tagging(self, resource: TaggableResource) -> LambdaStore:
        """
        Takes a resource ARN for a TaggableResource (Lambda Function, Event Source Mapping, Code Signing Config, or Capacity Provider) and returns a corresponding
        LambdaStore for its region and account.

        In addition, this function validates that the ARN is a valid TaggableResource type, and that the TaggableResource exists.

        Raises:
            ValidationException: If the resource ARN is not a full ARN for a TaggableResource.
            ResourceNotFoundException: If the specified resource does not exist.
            InvalidParameterValueException: If the resource ARN is a qualified Lambda Function.
            NotImplementedError: If the resource ARN refers to a Code Signing Config.
        """

        def _raise_validation_exception():
            raise ValidationException(
                f"1 validation error detected: Value '{resource}' at 'resource' failed to satisfy constraint: Member must satisfy regular expression pattern: {api_utils.TAGGABLE_RESOURCE_ARN_PATTERN}"
            )

        # Check whether the ARN we have been passed is correctly formatted
        try:
            parsed_resource_arn: ArnData = parse_arn(resource)
        except Exception:
            # Any parsing failure is reported to the caller as a validation error
            _raise_validation_exception()

        # TODO: Should we be checking whether this is a full ARN?
        region, account_id, resource_path = map(
            parsed_resource_arn.get, ("region", "account", "resource")
        )

        if not all((region, account_id, resource_path)):
            _raise_validation_exception()

        # str.split(":") never returns an empty list, so check the segment count explicitly:
        # a resource without an identifier (e.g. "...:function") must fail validation instead of
        # crashing with a ValueError in the unpacking below.
        parts = resource_path.split(":")
        if len(parts) < 2:
            _raise_validation_exception()

        resource_type, resource_identifier, *qualifier = parts

        # Qualifier validation raises before checking for NotFound
        if qualifier:
            if resource_type == "function":
                raise InvalidParameterValueException(
                    "Tags on function aliases and versions are not supported. Please specify a function ARN.",
                    Type="User",
                )
            _raise_validation_exception()

        # The lookups below are only performed for their existence check; their results are unused
        if resource_type == "event-source-mapping":
            self._get_esm(resource_identifier, account_id, region)
        elif resource_type == "code-signing-config":
            raise NotImplementedError("Resource tagging on CSC not yet implemented.")
        elif resource_type == "function":
            self._get_function(
                function_name=resource_identifier, account_id=account_id, region=region
            )
        elif resource_type == "capacity-provider":
            self._get_capacity_provider(resource_identifier, account_id, region)
        else:
            _raise_validation_exception()

        # If no exceptions are raised, assume ARN and referenced resource is valid for tag operations
        return lambda_stores[account_id][region]
1✔
4446

4447
    def tag_resource(
        self, context: RequestContext, resource: TaggableResource, tags: Tags, **kwargs
    ) -> None:
        """
        Attach ``tags`` to the resource identified by the ARN ``resource``.

        For function ARNs, the ``$LATEST`` version entry is additionally replaced with a copy of
        itself after the tags have been stored (see the comment in the body).

        Raises:
            InvalidParameterValueException: If ``tags`` is empty or missing, or if storing the tags
                is rejected by ``_store_tags``.
        """
        if not tags:
            raise InvalidParameterValueException(
                "An error occurred and the request cannot be processed.", Type="User"
            )
        self._store_tags(resource, tags)

        arn_resource = extract_resource_from_arn(resource)
        if not arn_resource or not arn_resource.startswith("function"):
            # Nothing else to do for resources other than functions
            return

        function_name, _, account_id, region = function_locators_from_arn(resource)
        fn = self._get_function(function_name, account_id, region)
        with fn.lock:
            # dirty hack for changed revision id, should reevaluate model to prevent this:
            current_latest = fn.versions["$LATEST"]
            copied_config = dataclasses.replace(current_latest.config)
            fn.versions["$LATEST"] = dataclasses.replace(current_latest, config=copied_config)
4466
                )
4467

4468
    def list_tags(
        self, context: RequestContext, resource: TaggableResource, **kwargs
    ) -> ListTagsResponse:
        """Return the tags of the resource identified by the ARN ``resource``, as resolved by ``_get_tags``."""
        return ListTagsResponse(Tags=self._get_tags(resource))
1✔
4473

4474
    def untag_resource(
        self, context: RequestContext, resource: TaggableResource, tag_keys: TagKeyList, **kwargs
    ) -> None:
        """
        Remove the tags named in ``tag_keys`` from the resource identified by the ARN ``resource``.

        For function ARNs, the ``$LATEST`` version entry is additionally replaced with a copy of
        itself after the tags have been removed (see the comment in the body).

        Raises:
            ValidationException: If ``tag_keys`` is empty or missing.
        """
        if not tag_keys:
            raise ValidationException(
                "1 validation error detected: Value null at 'tagKeys' failed to satisfy constraint: Member must not be null"
            )  # should probably be generalized a bit

        # Resolving the store also validates the ARN and the existence of the resource
        store = self.fetch_lambda_store_for_tagging(resource)
        store.TAGS.untag_resource(resource, tag_keys)

        arn_resource = extract_resource_from_arn(resource)
        if not arn_resource or not arn_resource.startswith("function"):
            # Nothing else to do for resources other than functions
            return

        function_name, _, account_id, region = function_locators_from_arn(resource)
        fn = self._get_function(function_name, account_id, region)
        # TODO: Potential race condition
        with fn.lock:
            # dirty hack for changed revision id, should reevaluate model to prevent this:
            current_latest = fn.versions["$LATEST"]
            copied_config = dataclasses.replace(current_latest.config)
            fn.versions["$LATEST"] = dataclasses.replace(current_latest, config=copied_config)
4497

4498
    # =======================================
4499
    # =======  LEGACY / DEPRECATED   ========
4500
    # =======================================
4501

4502
    def invoke_async(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        invoke_args: IO[BlobStream],
        **kwargs,
    ) -> InvokeAsyncResponse:
        """
        LEGACY API endpoint. Even AWS heavily discourages its usage.

        Raises:
            NotImplementedError: Always; this operation is intentionally not implemented.
        """
        raise NotImplementedError
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc