• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 22334798432

23 Feb 2026 06:42PM UTC coverage: 86.956% (-0.02%) from 86.973%
22334798432

push

github

web-flow
S3: regenerate test snapshots & parity fixes (#13824)

69831 of 80306 relevant lines covered (86.96%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.98
/localstack-core/localstack/services/lambda_/provider.py
1
import base64
1✔
2
import dataclasses
1✔
3
import datetime
1✔
4
import itertools
1✔
5
import json
1✔
6
import logging
1✔
7
import re
1✔
8
import threading
1✔
9
import time
1✔
10
from typing import IO, Any
1✔
11

12
from botocore.exceptions import ClientError
1✔
13

14
from localstack import config
1✔
15
from localstack.aws.api import RequestContext, ServiceException, handler
1✔
16
from localstack.aws.api.lambda_ import (
1✔
17
    AccountLimit,
18
    AccountUsage,
19
    AddLayerVersionPermissionResponse,
20
    AddPermissionRequest,
21
    AddPermissionResponse,
22
    Alias,
23
    AliasConfiguration,
24
    AliasRoutingConfiguration,
25
    AllowedPublishers,
26
    Architecture,
27
    Arn,
28
    Blob,
29
    BlobStream,
30
    CapacityProviderConfig,
31
    CodeSigningConfigArn,
32
    CodeSigningConfigNotFoundException,
33
    CodeSigningPolicies,
34
    CompatibleArchitectures,
35
    CompatibleRuntimes,
36
    Concurrency,
37
    Cors,
38
    CreateCodeSigningConfigResponse,
39
    CreateEventSourceMappingRequest,
40
    CreateFunctionRequest,
41
    CreateFunctionUrlConfigResponse,
42
    DeleteCodeSigningConfigResponse,
43
    DeleteFunctionResponse,
44
    Description,
45
    DestinationConfig,
46
    DurableExecutionName,
47
    EventSourceMappingConfiguration,
48
    FunctionCodeLocation,
49
    FunctionConfiguration,
50
    FunctionEventInvokeConfig,
51
    FunctionName,
52
    FunctionUrlAuthType,
53
    FunctionUrlQualifier,
54
    FunctionVersionLatestPublished,
55
    GetAccountSettingsResponse,
56
    GetCodeSigningConfigResponse,
57
    GetFunctionCodeSigningConfigResponse,
58
    GetFunctionConcurrencyResponse,
59
    GetFunctionRecursionConfigResponse,
60
    GetFunctionResponse,
61
    GetFunctionUrlConfigResponse,
62
    GetLayerVersionPolicyResponse,
63
    GetLayerVersionResponse,
64
    GetPolicyResponse,
65
    GetProvisionedConcurrencyConfigResponse,
66
    InvalidParameterValueException,
67
    InvocationResponse,
68
    InvocationType,
69
    InvokeAsyncResponse,
70
    InvokeMode,
71
    LambdaApi,
72
    LambdaManagedInstancesCapacityProviderConfig,
73
    LastUpdateStatus,
74
    LayerName,
75
    LayerPermissionAllowedAction,
76
    LayerPermissionAllowedPrincipal,
77
    LayersListItem,
78
    LayerVersionArn,
79
    LayerVersionContentInput,
80
    LayerVersionNumber,
81
    LicenseInfo,
82
    ListAliasesResponse,
83
    ListCodeSigningConfigsResponse,
84
    ListEventSourceMappingsResponse,
85
    ListFunctionEventInvokeConfigsResponse,
86
    ListFunctionsByCodeSigningConfigResponse,
87
    ListFunctionsResponse,
88
    ListFunctionUrlConfigsResponse,
89
    ListLayersResponse,
90
    ListLayerVersionsResponse,
91
    ListProvisionedConcurrencyConfigsResponse,
92
    ListTagsResponse,
93
    ListVersionsByFunctionResponse,
94
    LogFormat,
95
    LoggingConfig,
96
    LogType,
97
    MasterRegion,
98
    MaxFunctionEventInvokeConfigListItems,
99
    MaximumEventAgeInSeconds,
100
    MaximumRetryAttempts,
101
    MaxItems,
102
    MaxLayerListItems,
103
    MaxListItems,
104
    MaxProvisionedConcurrencyConfigListItems,
105
    NamespacedFunctionName,
106
    NamespacedStatementId,
107
    NumericLatestPublishedOrAliasQualifier,
108
    OnFailure,
109
    OnSuccess,
110
    OrganizationId,
111
    PackageType,
112
    PositiveInteger,
113
    PreconditionFailedException,
114
    ProvisionedConcurrencyConfigListItem,
115
    ProvisionedConcurrencyConfigNotFoundException,
116
    ProvisionedConcurrencyStatusEnum,
117
    PublishLayerVersionResponse,
118
    PutFunctionCodeSigningConfigResponse,
119
    PutFunctionRecursionConfigResponse,
120
    PutProvisionedConcurrencyConfigResponse,
121
    Qualifier,
122
    RecursiveLoop,
123
    ReservedConcurrentExecutions,
124
    ResourceConflictException,
125
    ResourceNotFoundException,
126
    Runtime,
127
    RuntimeVersionConfig,
128
    SnapStart,
129
    SnapStartApplyOn,
130
    SnapStartOptimizationStatus,
131
    SnapStartResponse,
132
    State,
133
    StatementId,
134
    StateReasonCode,
135
    String,
136
    TaggableResource,
137
    TagKeyList,
138
    Tags,
139
    TenantId,
140
    TracingMode,
141
    UnqualifiedFunctionName,
142
    UpdateCodeSigningConfigResponse,
143
    UpdateEventSourceMappingRequest,
144
    UpdateFunctionCodeRequest,
145
    UpdateFunctionConfigurationRequest,
146
    UpdateFunctionUrlConfigResponse,
147
    VersionWithLatestPublished,
148
)
149
from localstack.aws.api.lambda_ import FunctionVersion as FunctionVersionApi
1✔
150
from localstack.aws.api.lambda_ import ServiceException as LambdaServiceException
1✔
151
from localstack.aws.api.pipes import (
1✔
152
    DynamoDBStreamStartPosition,
153
    KinesisStreamStartPosition,
154
)
155
from localstack.aws.connect import connect_to
1✔
156
from localstack.aws.spec import load_service
1✔
157
from localstack.services.edge import ROUTER
1✔
158
from localstack.services.lambda_ import api_utils
1✔
159
from localstack.services.lambda_ import hooks as lambda_hooks
1✔
160
from localstack.services.lambda_.analytics import (
1✔
161
    FunctionInitializationType,
162
    FunctionOperation,
163
    FunctionStatus,
164
    function_counter,
165
)
166
from localstack.services.lambda_.api_utils import (
1✔
167
    ARCHITECTURES,
168
    STATEMENT_ID_REGEX,
169
    SUBNET_ID_REGEX,
170
    function_locators_from_arn,
171
)
172
from localstack.services.lambda_.event_source_mapping.esm_config_factory import (
1✔
173
    EsmConfigFactory,
174
)
175
from localstack.services.lambda_.event_source_mapping.esm_worker import (
1✔
176
    EsmState,
177
    EsmWorker,
178
)
179
from localstack.services.lambda_.event_source_mapping.esm_worker_factory import (
1✔
180
    EsmWorkerFactory,
181
)
182
from localstack.services.lambda_.event_source_mapping.pipe_utils import get_internal_client
1✔
183
from localstack.services.lambda_.invocation import AccessDeniedException
1✔
184
from localstack.services.lambda_.invocation.execution_environment import (
1✔
185
    EnvironmentStartupTimeoutException,
186
)
187
from localstack.services.lambda_.invocation.lambda_models import (
1✔
188
    AliasRoutingConfig,
189
    CodeSigningConfig,
190
    EventInvokeConfig,
191
    Function,
192
    FunctionResourcePolicy,
193
    FunctionUrlConfig,
194
    FunctionVersion,
195
    ImageConfig,
196
    LambdaEphemeralStorage,
197
    Layer,
198
    LayerPolicy,
199
    LayerPolicyStatement,
200
    LayerVersion,
201
    ProvisionedConcurrencyConfiguration,
202
    RequestEntityTooLargeException,
203
    ResourcePolicy,
204
    UpdateStatus,
205
    ValidationException,
206
    VersionAlias,
207
    VersionFunctionConfiguration,
208
    VersionIdentifier,
209
    VersionState,
210
    VpcConfig,
211
)
212
from localstack.services.lambda_.invocation.lambda_service import (
1✔
213
    LambdaService,
214
    create_image_code,
215
    destroy_code_if_not_used,
216
    lambda_stores,
217
    store_lambda_archive,
218
    store_s3_bucket_archive,
219
)
220
from localstack.services.lambda_.invocation.models import CapacityProvider as CapacityProviderModel
1✔
221
from localstack.services.lambda_.invocation.models import LambdaStore
1✔
222
from localstack.services.lambda_.invocation.runtime_executor import get_runtime_executor
1✔
223
from localstack.services.lambda_.lambda_utils import HINT_LOG
1✔
224
from localstack.services.lambda_.layerfetcher.layer_fetcher import LayerFetcher
1✔
225
from localstack.services.lambda_.provider_utils import (
1✔
226
    LambdaLayerVersionIdentifier,
227
    get_function_version,
228
    get_function_version_from_arn,
229
)
230
from localstack.services.lambda_.runtimes import (
1✔
231
    ALL_RUNTIMES,
232
    DEPRECATED_RUNTIMES,
233
    DEPRECATED_RUNTIMES_UPGRADES,
234
    RUNTIMES_AGGREGATED,
235
    SNAP_START_SUPPORTED_RUNTIMES,
236
    VALID_MANAGED_INSTANCE_RUNTIMES,
237
    VALID_RUNTIMES,
238
)
239
from localstack.services.lambda_.urlrouter import FunctionUrlRouter
1✔
240
from localstack.services.plugins import ServiceLifecycleHook
1✔
241
from localstack.state import StateVisitor
1✔
242
from localstack.utils.aws.arns import (
1✔
243
    ArnData,
244
    capacity_provider_arn,
245
    extract_resource_from_arn,
246
    extract_service_from_arn,
247
    get_partition,
248
    lambda_event_source_mapping_arn,
249
    parse_arn,
250
)
251
from localstack.utils.aws.client_types import ServicePrincipal
1✔
252
from localstack.utils.bootstrap import is_api_enabled
1✔
253
from localstack.utils.collections import PaginatedList, merge_recursive
1✔
254
from localstack.utils.event_matcher import validate_event_pattern
1✔
255
from localstack.utils.strings import get_random_hex, short_uid, to_bytes, to_str
1✔
256
from localstack.utils.sync import poll_condition
1✔
257
from localstack.utils.urls import localstack_host
1✔
258

259
# Module-level logger named after this module.
LOG = logging.getLogger(__name__)

# Regular-expression source (kept as a plain string, not compiled here) for capacity provider ARNs.
CAPACITY_PROVIDER_ARN_NAME = "arn:aws[a-zA-Z-]*:lambda:(eusc-)?[a-z]{2}((-gov)|(-iso([a-z]?)))?-[a-z]+-\\d{1}:\\d{12}:capacity-provider:[a-zA-Z0-9-_]+"
# NOTE(review): units are not stated here -- presumably seconds and MB; confirm against usage.
LAMBDA_DEFAULT_TIMEOUT = 3
LAMBDA_DEFAULT_MEMORY_SIZE = 128

# Numeric limits; going by the names: tags per resource and layers per function.
LAMBDA_TAG_LIMIT_PER_RESOURCE = 50
LAMBDA_LAYERS_LIMIT_PER_FUNCTION = 5

# Tag key string; the pattern below validates the associated custom id value
# (presumably the tag's value -- confirm against the caller).
TAG_KEY_CUSTOM_URL = "_custom_id_"
# Requirements (from RFC3986 & co): not longer than 63, first char must be
# alpha, then alphanumeric or hyphen, except cannot start or end with hyphen
TAG_KEY_CUSTOM_URL_VALIDATOR = re.compile(r"^[A-Za-z]([A-Za-z0-9\-]{0,61}[A-Za-z0-9])?$")
1✔
272

273

274
class LambdaProvider(LambdaApi, ServiceLifecycleHook):
1✔
275
    lambda_service: LambdaService
1✔
276
    create_fn_lock: threading.RLock
1✔
277
    create_layer_lock: threading.RLock
1✔
278
    router: FunctionUrlRouter
1✔
279
    esm_workers: dict[str, EsmWorker]
1✔
280
    layer_fetcher: LayerFetcher | None
1✔
281

282
    def __init__(self) -> None:
        """Create the provider's collaborators, then run the layer-fetcher injection hook."""
        service = LambdaService()
        self.lambda_service = service
        # re-entrant locks (threading.RLock), one per lock attribute
        self.create_fn_lock, self.create_layer_lock = threading.RLock(), threading.RLock()
        # the URL router is handed the very same service instance
        self.router = FunctionUrlRouter(ROUTER, service)
        self.esm_workers = {}
        # starts as None; the hook below receives this provider and may change it
        self.layer_fetcher = None
        lambda_hooks.inject_layer_fetcher.run(self)
1✔
290

291
    def accept_state_visitor(self, visitor: StateVisitor):
        """Hand the module's ``lambda_stores`` to the given state visitor.

        :param visitor: visitor whose ``visit`` method is called with ``lambda_stores``
        """
        visitor.visit(lambda_stores)
×
293

294
    def on_before_state_reset(self):
        """Stop every tracked ESM worker, drop the worker registry, then stop the Lambda service."""
        tracked_workers = self.esm_workers
        for worker in tracked_workers.values():
            worker.stop_for_shutdown()
        # start over with an empty registry; the stopped workers are no longer tracked
        self.esm_workers = {}
        self.lambda_service.stop()
×
299

300
    def on_after_state_reset(self):
        """Create a fresh Lambda service and make both the URL router and this provider use it."""
        fresh_service = LambdaService()
        # same assignment order as a chained assignment: router first, provider second
        self.router.lambda_service = fresh_service
        self.lambda_service = fresh_service
×
302

303
    def on_before_state_load(self):
        """Stop the current Lambda service before state is loaded."""
        self.lambda_service.stop()
×
305

306
    def on_after_state_load(self):
        """Rebuild runtime objects from the loaded ``lambda_stores`` state.

        Steps, per account and region found in ``lambda_stores``:
          1. every function gets a new ``instance_id``;
          2. every function version (except ``$LATEST`` of a function with a capacity provider
             config) is put back into the ``Pending`` state and handed to the Lambda service;
          3. provisioned concurrency configs are re-applied to the matching version managers;
          4. an ESM worker is created and registered for every stored event source mapping.

        Failures in steps 2 and 3 are logged as warnings and do not abort the restore.
        Step 4 is not wrapped in a ``try`` block, so an exception there propagates to the caller.
        """
        # a fresh service replaces the one stopped before the load; the router must point at it too
        self.lambda_service = LambdaService()
        self.router.lambda_service = self.lambda_service

        for account_id, account_bundle in lambda_stores.items():
            for region_name, state in account_bundle.items():
                for fn in state.functions.values():
                    # HACK to model a volatile variable that should be ignored for persistence
                    # Identifier unique to this function and LocalStack instance.
                    # A LocalStack restart or persistence load should create a new instance id.
                    # Used for retaining invoke queues across version updates for $LATEST, but
                    # separate unrelated instances.
                    fn.instance_id = short_uid()

                    for fn_version in fn.versions.values():
                        try:
                            # $LATEST is not invokable for Lambda functions with a capacity provider
                            # and has a different State (i.e., ActiveNonInvokable)
                            is_capacity_provider_latest = (
                                fn_version.config.capacity_provider_config
                                and fn_version.id.qualifier == "$LATEST"
                            )
                            if not is_capacity_provider_latest:
                                # Restore the "Pending" state for the function version and start it
                                new_state = VersionState(
                                    state=State.Pending,
                                    code=StateReasonCode.Creating,
                                    reason="The function is being created.",
                                )
                                new_config = dataclasses.replace(fn_version.config, state=new_state)
                                new_version = dataclasses.replace(fn_version, config=new_config)
                                # replaces the value of an existing key, so the dict being iterated
                                # keeps its size
                                fn.versions[fn_version.id.qualifier] = new_version
                                # NOTE(review): the pre-replacement `fn_version` (not `new_version`)
                                # is passed here -- confirm this is intended.
                                # The returned future is awaited for at most 5 seconds.
                                self.lambda_service.create_function_version(fn_version).result(
                                    timeout=5
                                )
                        except Exception:
                            # best effort: log and continue with the next version;
                            # the traceback is only attached when DEBUG logging is enabled
                            LOG.warning(
                                "Failed to restore function version %s",
                                fn_version.id.qualified_arn(),
                                exc_info=LOG.isEnabledFor(logging.DEBUG),
                            )
                    # restore provisioned concurrency per function considering both versions and aliases
                    for (
                        provisioned_qualifier,
                        provisioned_config,
                    ) in fn.provisioned_concurrency_configs.items():
                        # stays None in the warning below if resolution fails before an ARN is known
                        fn_arn = None
                        try:
                            if api_utils.qualifier_is_alias(provisioned_qualifier):
                                # a missing alias or version yields None from `.get` and surfaces as
                                # an AttributeError, which the handler below logs
                                alias = fn.aliases.get(provisioned_qualifier)
                                resolved_version = fn.versions.get(alias.function_version)
                                fn_arn = resolved_version.id.qualified_arn()
                            elif api_utils.qualifier_is_version(provisioned_qualifier):
                                fn_version = fn.versions.get(provisioned_qualifier)
                                fn_arn = fn_version.id.qualified_arn()
                            else:
                                raise InvalidParameterValueException(
                                    "Invalid qualifier type:"
                                    " Qualifier can only be an alias or a version for provisioned concurrency."
                                )

                            manager = self.lambda_service.get_lambda_version_manager(fn_arn)
                            manager.update_provisioned_concurrency_config(
                                provisioned_config.provisioned_concurrent_executions
                            )
                        except Exception:
                            # best effort: log and continue with the next config
                            LOG.warning(
                                "Failed to restore provisioned concurrency %s for function %s",
                                provisioned_config,
                                fn_arn,
                                exc_info=LOG.isEnabledFor(logging.DEBUG),
                            )

                for esm in state.event_source_mappings.values():
                    # Restores event source workers
                    function_arn = esm.get("FunctionArn")

                    # TODO: How do we know the event source is up?
                    # A basic poll to see if the mapped Lambda function is active/failed
                    if not poll_condition(
                        lambda: (
                            get_function_version_from_arn(function_arn).config.state.state
                            in [State.Active, State.Failed]
                        ),
                        timeout=10,
                    ):
                        # the worker is still created below; this only records the unexpected state
                        LOG.warning(
                            "Creating ESM for Lambda that is not in running state: %s",
                            function_arn,
                        )

                    function_version = get_function_version_from_arn(function_arn)
                    function_role = function_version.config.role

                    # a mapping without a stored "State" counts as disabled
                    is_esm_enabled = esm.get("State", EsmState.DISABLED) not in (
                        EsmState.DISABLED,
                        EsmState.DISABLING,
                    )
                    esm_worker = EsmWorkerFactory(
                        esm, function_role, is_esm_enabled
                    ).get_esm_worker()

                    # Note: a worker is created in the DISABLED state if not enabled
                    esm_worker.create()
                    # TODO: assigning the esm_worker to the dict only works after .create(). Could it cause a race
                    #  condition if we get a shutdown here and have a worker thread spawned but not accounted for?
                    self.esm_workers[esm_worker.uuid] = esm_worker
×
413

414
    def on_after_init(self):
        """Register the URL router's routes and validate the runtime executor's environment."""
        self.router.register_routes()
        get_runtime_executor().validate_environment()
1✔
417

418
    def on_before_stop(self) -> None:
        """Stop every tracked ESM worker and then the Lambda service.

        The worker registry itself is left untouched here.
        """
        tracked_workers = self.esm_workers
        for worker in tracked_workers.values():
            worker.stop_for_shutdown()

        # TODO: should probably unregister routes?
        self.lambda_service.stop()
1✔
424

425
    @staticmethod
    def _get_function(function_name: str, account_id: str, region: str) -> Function:
        """Fetch a function from the store of the given account and region.

        :param function_name: name the function is stored under
        :param account_id: account owning the store
        :param region: region of the store
        :return: the stored function
        :raises ResourceNotFoundException: if the store has no (truthy) entry for the name
        """
        found = lambda_stores[account_id][region].functions.get(function_name)
        if found:
            return found

        # the unqualified ARN is only needed for the error message
        arn = api_utils.unqualified_lambda_arn(
            function_name=function_name,
            account=account_id,
            region=region,
        )
        raise ResourceNotFoundException(
            f"Function not found: {arn}",
            Type="User",
        )
1✔
440

441
    @staticmethod
    def _get_esm(uuid: str, account_id: str, region: str) -> EventSourceMappingConfiguration:
        """Fetch an event source mapping from the store of the given account and region.

        :param uuid: identifier the mapping is stored under
        :param account_id: account owning the store
        :param region: region of the store
        :return: the stored event source mapping
        :raises ResourceNotFoundException: if the store has no (truthy) entry for the uuid
        """
        mapping = lambda_stores[account_id][region].event_source_mappings.get(uuid)
        if mapping:
            return mapping

        # the ARN is only needed for the error message
        arn = lambda_event_source_mapping_arn(uuid, account_id, region)
        raise ResourceNotFoundException(
            f"Event source mapping not found: {arn}",
            Type="User",
        )
1✔
452

453
    @staticmethod
    def _get_capacity_provider(
        capacity_provider_name: str,
        account_id: str,
        region: str,
        error_msg_template: str = "Capacity provider not found: {}",
    ) -> CapacityProviderModel:
        """Fetch a capacity provider from the store of the given account and region.

        :param capacity_provider_name: name the capacity provider is stored under
        :param account_id: account owning the store
        :param region: region of the store
        :param error_msg_template: ``str.format`` template for the not-found message;
            it receives the capacity provider ARN as its single positional argument
        :return: the stored capacity provider
        :raises ResourceNotFoundException: if the store has no (truthy) entry for the name
        """
        provider = lambda_stores[account_id][region].capacity_providers.get(capacity_provider_name)
        if provider:
            return provider

        arn = capacity_provider_arn(capacity_provider_name, account_id, region)
        raise ResourceNotFoundException(
            error_msg_template.format(arn),
            Type="User",
        )
×
469

470
    @staticmethod
    def _validate_qualifier_expression(qualifier: str) -> None:
        """Raise if ``api_utils.validate_qualifier`` reports problems with the qualifier.

        :param qualifier: qualifier to check
        :raises ValidationException: carrying the message built from the reported errors
        """
        error_messages = api_utils.validate_qualifier(qualifier)
        if not error_messages:
            return
        combined_message = api_utils.construct_validation_exception_message(error_messages)
        raise ValidationException(message=combined_message)
476

477
    @staticmethod
    def _validate_publish_to(publish_to: str):
        """Accept only the ``LATEST_PUBLISHED`` value for ``publishTo``.

        :param publish_to: value to check
        :raises ValidationException: for any other value
        """
        if publish_to == FunctionVersionLatestPublished.LATEST_PUBLISHED:
            return
        raise ValidationException(
            message=f"1 validation error detected: Value '{publish_to}' at 'publishTo' failed to satisfy constraint: Member must satisfy enum value set: [LATEST_PUBLISHED]"
        )
483

484
    @staticmethod
    def _resolve_fn_qualifier(resolved_fn: Function, qualifier: str | None) -> tuple[str, str]:
        """Check that a qualifier exists on the function and build the matching ARN.

        Without a qualifier the result is ``"$LATEST"`` together with the unqualified ARN.

        :param resolved_fn: The resolved lambda function
        :param qualifier: The qualifier to be resolved or None
        :return: Tuple of (resolved qualifier, function arn either qualified or unqualified)
        :raises ResourceNotFoundException: if the qualifier is an unknown alias, an unknown
            version, or neither an alias nor a version
        """
        function_name = resolved_fn.function_name
        # assuming function versions need to live in the same account and region
        account_id = resolved_fn.latest().id.account
        region = resolved_fn.latest().id.region
        fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
        if qualifier is None:
            return "$LATEST", fn_arn

        fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
        if api_utils.qualifier_is_alias(qualifier):
            exists = qualifier in resolved_fn.aliases
            not_found_message = f"Cannot find alias arn: {fn_arn}"
        elif api_utils.qualifier_is_version(qualifier) or qualifier == "$LATEST":
            exists = qualifier in resolved_fn.versions
            not_found_message = f"Function not found: {fn_arn}"
        else:
            # matches qualifier pattern but invalid alias or version
            exists = False
            not_found_message = f"Function not found: {fn_arn}"

        if not exists:
            raise ResourceNotFoundException(not_found_message, Type="User")
        return qualifier or "$LATEST", fn_arn
1✔
510

511
    @staticmethod
    def _function_revision_id(resolved_fn: Function, resolved_qualifier: str) -> str:
        """Return the revision id of the alias or version the qualifier names.

        :param resolved_fn: function owning the alias or version
        :param resolved_qualifier: qualifier that must exist on the function
        :return: the alias revision id, or the version config's revision id
        """
        if not api_utils.qualifier_is_alias(resolved_qualifier):
            # Assumes that a non-alias is a version
            return resolved_fn.versions[resolved_qualifier].config.revision_id
        return resolved_fn.aliases[resolved_qualifier].revision_id
1✔
518

519
    def _resolve_vpc_id(self, account_id: str, region_name: str, subnet_id: str) -> str:
        """Look up the VPC id of a subnet through EC2 ``DescribeSubnets``.

        :param account_id: account used as access key id for the EC2 client
        :param region_name: region of the EC2 client
        :param subnet_id: subnet to describe
        :return: ``VpcId`` of the first subnet in the response
        :raises InvalidParameterValueException: if EC2 answers with a client error; the EC2
            error code and message are embedded in the exception message
        """
        ec2_client = connect_to(aws_access_key_id=account_id, region_name=region_name).ec2
        try:
            described = ec2_client.describe_subnets(SubnetIds=[subnet_id])
            return described["Subnets"][0]["VpcId"]
        except ec2_client.exceptions.ClientError as e:
            error_details = e.response["Error"]
            code, message = error_details["Code"], error_details["Message"]
            raise InvalidParameterValueException(
                f"Error occurred while DescribeSubnets. EC2 Error Code: {code}. EC2 Error Message: {message}",
                Type="User",
            )
530

531
    def _build_vpc_config(
        self,
        account_id: str,
        region_name: str,
        vpc_config: dict | None = None,
    ) -> VpcConfig | None:
        """Translate a request-level VPC config dict into a ``VpcConfig`` model.

        :param account_id: account used to resolve the VPC id of the first subnet
        :param region_name: region used to resolve the VPC id of the first subnet
        :param vpc_config: dict that may carry ``SubnetIds`` and ``SecurityGroupIds``
        :return: ``None`` if no config was given or the ``ec2`` API is not enabled; an empty
            ``VpcConfig`` if there are no subnet ids; otherwise a ``VpcConfig`` whose VPC id is
            resolved from the first subnet id
        :raises ValidationException: if the first subnet id does not match ``SUBNET_ID_REGEX``
        :raises InvalidParameterValueException: if the subnet lookup fails (see ``_resolve_vpc_id``)
        """
        if not vpc_config or not is_api_enabled("ec2"):
            return None

        # A missing key and an explicit None both mean "no subnets". Previously an explicit None
        # slipped past the emptiness check and crashed on the index access below.
        subnet_ids = vpc_config.get("SubnetIds") or []
        if not subnet_ids:
            return VpcConfig(vpc_id="", security_group_ids=[], subnet_ids=[])

        # only the first subnet is validated and used to resolve the VPC
        subnet_id = subnet_ids[0]
        if not SUBNET_ID_REGEX.match(subnet_id):
            raise ValidationException(
                f"1 validation error detected: Value '[{subnet_id}]' at 'vpcConfig.subnetIds' failed to satisfy constraint: Member must satisfy constraint: [Member must have length less than or equal to 1024, Member must have length greater than or equal to 0, Member must satisfy regular expression pattern: subnet-[0-9a-z]*]"
            )

        return VpcConfig(
            vpc_id=self._resolve_vpc_id(account_id, region_name, subnet_id),
            # same normalization as for the subnet ids: never store None in the model
            security_group_ids=vpc_config.get("SecurityGroupIds") or [],
            subnet_ids=subnet_ids,
        )
555

556
    def _create_version_model(
        self,
        function_name: str,
        region: str,
        account_id: str,
        description: str | None = None,
        revision_id: str | None = None,
        code_sha256: str | None = None,
        publish_to: FunctionVersionLatestPublished | None = None,
        is_active: bool = False,
    ) -> tuple[FunctionVersion, bool]:
        """
        Release a new version to the model if all restrictions are met.
        Restrictions:
          - CodeSha256, if provided, must equal the current latest version code hash
            (not enforced for hot-reloaded zip packages)
          - RevisionId, if provided, must equal the current latest version revision id
          - Some changes have been made to the latest version since the last publish
        Will return a tuple of the version, and whether the version was published (True) or the latest available version was taken (False).
        This can happen if the latest version has not been changed since the last version publish, in this case the last version will be returned.

        The new version is only stored in the function's ``versions`` mapping here; this method
        does not hand it to the Lambda service.

        :param function_name: Function name to be published
        :param region: Region of the function
        :param account_id: Account of the function
        :param description: new description of the version (will be the description of the function if missing)
        :param revision_id: Revision id, function will raise error if it does not match latest revision id
        :param code_sha256: Code sha256, function will raise error if it does not match latest code hash
        :param publish_to: if ``LATEST_PUBLISHED``, the version is stored under the qualifier
            ``$LATEST.PUBLISHED`` (the numeric version counter is not advanced) and gets an
            ``InProgress`` last-update status
        :param is_active: only evaluated when ``publish_to`` is ``LATEST_PUBLISHED``; the new
            version then starts in state ``Active`` instead of ``Pending``
        :return: Tuple of (published version, whether version was released or last released version returned, since nothing changed)
        :raises PreconditionFailedException: if ``revision_id`` is given and differs from the
            revision id of ``$LATEST``
        :raises InvalidParameterValueException: if ``code_sha256`` is given, differs from the code
            hash of ``$LATEST``, and the package is not a hot-reloaded zip
        """
        current_latest_version = get_function_version(
            function_name=function_name, qualifier="$LATEST", account_id=account_id, region=region
        )
        if revision_id and current_latest_version.config.revision_id != revision_id:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
                Type="User",
            )

        # check if code hashes match if they are specified
        # (zip packages keep the hash on `config.code`, all other package types on `config.image`)
        current_hash = (
            current_latest_version.config.code.code_sha256
            if current_latest_version.config.package_type == PackageType.Zip
            else current_latest_version.config.image.code_sha256
        )
        # if the code is a zip package and hot reloaded (hot reloading is currently only supported for zip packagetypes)
        # we cannot enforce the codesha256 check
        is_hot_reloaded_zip_package = (
            current_latest_version.config.package_type == PackageType.Zip
            and current_latest_version.config.code.is_hot_reloading()
        )
        if code_sha256 and current_hash != code_sha256 and not is_hot_reloaded_zip_package:
            raise InvalidParameterValueException(
                f"CodeSHA256 ({code_sha256}) is different from current CodeSHA256 in $LATEST ({current_hash}). Please try again with the CodeSHA256 in $LATEST.",
                Type="User",
            )

        state = lambda_stores[account_id][region]
        function = state.functions.get(function_name)
        # config fields to override on the new version; only the description can be overridden
        changes = {}
        if description is not None:
            changes["description"] = description
        # TODO copy environment instead of restarting one, get rid of all the "Pending"s

        with function.lock:
            # If the most recently numbered version has the same internal revision as $LATEST,
            # nothing changed since the last publish: return that version instead of a new one.
            if function.next_version > 1 and (
                prev_version := function.versions.get(str(function.next_version - 1))
            ):
                if (
                    prev_version.config.internal_revision
                    == current_latest_version.config.internal_revision
                ):
                    return prev_version, False
            # TODO check if there was a change since last version
            if publish_to == FunctionVersionLatestPublished.LATEST_PUBLISHED:
                qualifier = "$LATEST.PUBLISHED"
            else:
                # consume the next numeric version
                qualifier = str(function.next_version)
                function.next_version += 1
            new_id = VersionIdentifier(
                function_name=function_name,
                qualifier=qualifier,
                region=region,
                account=account_id,
            )

            if current_latest_version.config.capacity_provider_config:
                # for lambda managed functions, snap start is not supported
                snap_start = None
            else:
                # the optimization status is "On" only if snap start applies to published versions
                apply_on = current_latest_version.config.snap_start["ApplyOn"]
                optimization_status = SnapStartOptimizationStatus.Off
                if apply_on == SnapStartApplyOn.PublishedVersions:
                    optimization_status = SnapStartOptimizationStatus.On
                snap_start = SnapStartResponse(
                    ApplyOn=apply_on,
                    OptimizationStatus=optimization_status,
                )

            # defaults for a numbered version: no update status, state "Pending"
            last_update = None
            new_state = VersionState(
                state=State.Pending,
                code=StateReasonCode.Creating,
                reason="The function is being created.",
            )
            if publish_to == FunctionVersionLatestPublished.LATEST_PUBLISHED:
                last_update = UpdateStatus(
                    status=LastUpdateStatus.InProgress,
                    code="Updating",
                    reason="The function is being updated.",
                )
                if is_active:
                    new_state = VersionState(state=State.Active)
            # copy of $LATEST with the new identifier and the adjusted config fields
            new_version = dataclasses.replace(
                current_latest_version,
                config=dataclasses.replace(
                    current_latest_version.config,
                    last_update=last_update,
                    state=new_state,
                    snap_start=snap_start,
                    **changes,
                ),
                id=new_id,
            )
            function.versions[qualifier] = new_version
        return new_version, True
1✔
680

681
    def _publish_version_from_existing_version(
        self,
        function_name: str,
        region: str,
        account_id: str,
        description: str | None = None,
        revision_id: str | None = None,
        code_sha256: str | None = None,
        publish_to: FunctionVersionLatestPublished | None = None,
    ) -> FunctionVersion:
        """
        Publish a new version based on the existing, already initialized $LATEST version.

        :param function_name: Function name
        :param region: region
        :param account_id: account id
        :param description: description
        :param revision_id: revision id (check if current version matches)
        :param code_sha256: code sha (check if current code matches)
        :param publish_to: optional publish target; LATEST_PUBLISHED marks the version model as active
        :return: the version model if nothing changed or a capacity provider is configured,
            otherwise the version read back from the store after the synchronous publish
        """
        candidate, is_new = self._create_version_model(
            function_name=function_name,
            region=region,
            account_id=account_id,
            description=description,
            revision_id=revision_id,
            code_sha256=code_sha256,
            publish_to=publish_to,
            is_active=publish_to == FunctionVersionLatestPublished.LATEST_PUBLISHED,
        )
        if not is_new:
            return candidate

        # Functions with a capacity provider are published asynchronously, all others synchronously.
        publish = (
            self.lambda_service.publish_version_async
            if candidate.config.capacity_provider_config
            else self.lambda_service.publish_version
        )
        publish(candidate)

        stored_function = lambda_stores[account_id][region].functions.get(function_name)

        # Update revision id for $LATEST version
        # TODO: re-evaluate data model to prevent this dirty hack just for bumping the revision id
        current_latest = stored_function.versions["$LATEST"]
        refreshed_config = dataclasses.replace(current_latest.config)
        stored_function.versions["$LATEST"] = dataclasses.replace(
            current_latest, config=refreshed_config
        )

        if candidate.config.capacity_provider_config:
            # publish_version happens async for functions with a capacity provider.
            # Therefore, we return the version with State=Pending or LastUpdateStatus=InProgress ($LATEST.PUBLISHED)
            return candidate

        # Regular functions yield an Active state modified during `publish_version` (sync).
        # Therefore, we need to get the updated version from the store.
        return stored_function.versions.get(candidate.id.qualifier)
738

739
    def _publish_version_with_changes(
        self,
        function_name: str,
        region: str,
        account_id: str,
        description: str | None = None,
        revision_id: str | None = None,
        code_sha256: str | None = None,
        publish_to: FunctionVersionLatestPublished | None = None,
        is_active: bool = False,
    ) -> FunctionVersion:
        """
        Publish a version alongside a freshly written $LATEST (publish on create / update).

        :param function_name: Function name
        :param region: region
        :param account_id: account id
        :param description: description
        :param revision_id: revision id (check if current version matches)
        :param code_sha256: code sha (check if current code matches)
        :param publish_to: optional publish target, forwarded to the version model
        :param is_active: forwarded to the version model
        :return: new version
        """
        candidate, is_new = self._create_version_model(
            function_name=function_name,
            region=region,
            account_id=account_id,
            description=description,
            revision_id=revision_id,
            code_sha256=code_sha256,
            publish_to=publish_to,
            is_active=is_active,
        )
        # Only hand the version to the Lambda service when a new version model was created.
        if is_new:
            self.lambda_service.create_function_version(candidate)
        return candidate
775

776
    @staticmethod
    def _verify_env_variables(env_vars: dict[str, str]):
        """
        Reject environment variables whose compact JSON form exceeds the configured size limit.

        :param env_vars: environment variables of the function
        :raises InvalidParameterValueException: if the UTF-8 encoded compact JSON dump is larger than
            config.LAMBDA_LIMITS_MAX_FUNCTION_ENVVAR_SIZE_BYTES
        """
        # Measure the compact serialization (no whitespace after separators), in bytes.
        serialized = json.dumps(env_vars, separators=(",", ":"))
        size_in_bytes = len(serialized.encode("utf-8"))
        if size_in_bytes <= config.LAMBDA_LIMITS_MAX_FUNCTION_ENVVAR_SIZE_BYTES:
            return

        raise InvalidParameterValueException(
            f"Lambda was unable to configure your environment variables because the environment variables you have provided exceeded the 4KB limit. String measured: {serialized}",
            Type="User",
        )
787

788
    @staticmethod
    def _validate_snapstart(snap_start: SnapStart, runtime: Runtime):
        """
        Validate a SnapStart request setting against the allowed values and the function runtime.

        :param snap_start: SnapStart configuration from the request
        :param runtime: runtime of the function
        :raises ValidationException: if ApplyOn is neither PublishedVersions nor None
        :raises InvalidParameterValueException: if the runtime is not in SNAP_START_SUPPORTED_RUNTIMES
        """
        accepted_apply_on = (SnapStartApplyOn.PublishedVersions, SnapStartApplyOn.None_)
        requested_apply_on = snap_start.get("ApplyOn")
        if requested_apply_on not in accepted_apply_on:
            raise ValidationException(
                f"1 validation error detected: Value '{requested_apply_on}' at 'snapStart.applyOn' failed to satisfy constraint: Member must satisfy enum value set: [PublishedVersions, None]"
            )

        runtime_supported = runtime in SNAP_START_SUPPORTED_RUNTIMES
        if not runtime_supported:
            raise InvalidParameterValueException(
                f"{runtime} is not supported for SnapStart enabled functions.", Type="User"
            )
803

804
    def _validate_layers(self, new_layers: list[str], region: str, account_id: str):
        """
        Validate the layer version ARNs referenced by a function.

        Layers owned by the calling account must already exist in the store. Layers owned by
        another account are fetched through ``self.layer_fetcher`` when they are not stored yet
        and are then added to the store.

        :param new_layers: layer version ARNs referenced by the function
        :param region: region of the function; the region check is skipped if falsy
        :param account_id: account id of the function
        :raises InvalidParameterValueException: too many layers, region mismatch or missing layer
            version for same-account layers, or two versions of the same layer
        :raises ValidationException: if a layer ARN carries no version
        :raises AccessDeniedException: region mismatch for external layers, or the fetcher returned nothing
        :raises NotImplementedError: if an external layer must be fetched but no fetcher is configured
        """
        if len(new_layers) > LAMBDA_LAYERS_LIMIT_PER_FUNCTION:
            raise InvalidParameterValueException(
                "Cannot reference more than 5 layers.", Type="User"
            )

        # Maps the unversioned layer ARN to the first versioned ARN seen for it.
        visited_layers = {}
        for layer_version_arn in new_layers:
            (
                layer_region,
                layer_account_id,
                layer_name,
                layer_version_str,
            ) = api_utils.parse_layer_arn(layer_version_arn)
            if layer_version_str is None:
                raise ValidationException(
                    f"1 validation error detected: Value '[{layer_version_arn}]'"
                    + " at 'layers' failed to satisfy constraint: Member must satisfy constraint: [Member must have length less than or equal to 2048, Member must have length greater than or equal to 1, Member must satisfy regular expression pattern: "
                    + "(arn:(aws[a-zA-Z-]*)?:lambda:(eusc-)?[a-z]{2}((-gov)|(-iso([a-z]?)))?-[a-z]+-\\d{1}:\\d{12}:layer:[a-zA-Z0-9-_]+:[0-9]+)|(arn:[a-zA-Z0-9-]+:lambda:::awslayer:[a-zA-Z0-9-_]+), Member must not be null]",
                )

            state = lambda_stores[layer_account_id][layer_region]
            layer = state.layers.get(layer_name)
            # layer_version stays None when either the layer or the requested version is unknown
            layer_version = None
            if layer is not None:
                layer_version = layer.layer_versions.get(layer_version_str)
            if layer_account_id == account_id:
                if region and layer_region != region:
                    raise InvalidParameterValueException(
                        f"Layers are not in the same region as the function. "
                        f"Layers are expected to be in region {region}.",
                        Type="User",
                    )
                if layer_version is None:
                    raise InvalidParameterValueException(
                        f"Layer version {layer_version_arn} does not exist.", Type="User"
                    )
            else:  # External layer from other account
                # TODO: validate IAM layer policy here, allowing access by default for now and only checking region
                if region and layer_region != region:
                    # TODO: detect user or role from context when IAM users are implemented
                    user = "user/localstack-testing"
                    raise AccessDeniedException(
                        f"User: arn:{get_partition(region)}:iam::{account_id}:{user} is not authorized to perform: lambda:GetLayerVersion on resource: {layer_version_arn} because no resource-based policy allows the lambda:GetLayerVersion action"
                    )
                if layer_version is None:
                    # Limitation: cannot fetch external layers when using the same account id as the target layer
                    # because we do not want to trigger the layer fetcher for every non-existing layer.
                    if self.layer_fetcher is None:
                        raise NotImplementedError(
                            "Fetching shared layers from AWS is a pro feature."
                        )

                    fetched_layer = self.layer_fetcher.fetch_layer(layer_version_arn)
                    if fetched_layer is None:
                        # TODO: detect user or role from context when IAM users are implemented
                        user = "user/localstack-testing"
                        raise AccessDeniedException(
                            f"User: arn:{get_partition(region)}:iam::{account_id}:{user} is not authorized to perform: lambda:GetLayerVersion on resource: {layer_version_arn} because no resource-based policy allows the lambda:GetLayerVersion action"
                        )

                    # Distinguish between new layer and new layer version.
                    # The decision must be based on the stored layer: layer_version is always
                    # None at this point, so testing it would always replace the whole layer
                    # and discard versions that are already stored.
                    if layer is None:
                        # Create whole layer from scratch
                        state.layers[layer_name] = fetched_layer
                    else:
                        # Create layer version if another version of the same layer already exists
                        state.layers[layer_name].layer_versions[layer_version_str] = (
                            fetched_layer.layer_versions.get(layer_version_str)
                        )

            # only the first two matches in the array are considered for the error message
            layer_arn = ":".join(layer_version_arn.split(":")[:-1])
            if layer_arn in visited_layers:
                conflict_layer_version_arn = visited_layers[layer_arn]
                raise InvalidParameterValueException(
                    f"Two different versions of the same layer are not allowed to be referenced in the same function. {conflict_layer_version_arn} and {layer_version_arn} are versions of the same layer.",
                    Type="User",
                )
            visited_layers[layer_arn] = layer_version_arn
884

885
    def _validate_capacity_provider_config(
        self, capacity_provider_config: CapacityProviderConfig, context: RequestContext
    ):
        """
        Validate the capacity provider configuration of a request.

        :param capacity_provider_config: capacity provider configuration from the request
        :param context: request context, passed on to the capacity provider lookup
        :raises ValidationException: if the managed-instances config or its ARN is missing,
            or the ARN does not match CAPACITY_PROVIDER_ARN_NAME
        """
        managed_instances_config = capacity_provider_config.get(
            "LambdaManagedInstancesCapacityProviderConfig"
        )
        if not managed_instances_config:
            raise ValidationException(
                "1 validation error detected: Value null at 'capacityProviderConfig.lambdaManagedInstancesCapacityProviderConfig' failed to satisfy constraint: Member must not be null"
            )

        provider_arn = managed_instances_config.get("CapacityProviderArn")
        if not provider_arn:
            raise ValidationException(
                "1 validation error detected: Value null at 'capacityProviderConfig.lambdaManagedInstancesCapacityProviderConfig.capacityProviderArn' failed to satisfy constraint: Member must not be null"
            )

        if re.match(CAPACITY_PROVIDER_ARN_NAME, provider_arn) is None:
            raise ValidationException(
                f"1 validation error detected: Value '{provider_arn}' at 'capacityProviderConfig.lambdaManagedInstancesCapacityProviderConfig.capacityProviderArn' failed to satisfy constraint: Member must satisfy regular expression pattern: {CAPACITY_PROVIDER_ARN_NAME}"
            )

        # The provider name is the last colon-separated segment of the ARN.
        # The lookup result is discarded; presumably it raises for unknown providers - TODO confirm.
        provider_name = provider_arn.rsplit(":", 1)[-1]
        self.get_capacity_provider(context, provider_name)
908

909
    @staticmethod
    def map_layers(new_layers: list[str]) -> list[LayerVersion]:
        """
        Resolve layer version ARNs to the layer version objects held in the store.

        :param new_layers: layer version ARNs
        :return: one stored layer version per ARN, in input order
        """
        resolved_versions = []
        for arn in new_layers:
            region_name, account_id, layer_name, version_key = api_utils.parse_layer_arn(arn)
            stored_layer = lambda_stores[account_id][region_name].layers.get(layer_name)
            resolved_versions.append(stored_layer.layer_versions.get(version_key))
        return resolved_versions
920

921
    def get_function_recursion_config(
        self,
        context: RequestContext,
        function_name: UnqualifiedFunctionName,
        **kwargs,
    ) -> GetFunctionRecursionConfigResponse:
        """
        Return the recursive loop setting stored on the function.

        :param context: request context
        :param function_name: function name or ARN as given in the request
        :return: response carrying the function's ``recursive_loop`` value
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        resolved_name = api_utils.get_function_name(function_name, context)
        function = self._get_function(
            function_name=resolved_name, account_id=account_id, region=region
        )
        return GetFunctionRecursionConfigResponse(RecursiveLoop=function.recursive_loop)
931

932
    def put_function_recursion_config(
        self,
        context: RequestContext,
        function_name: UnqualifiedFunctionName,
        recursive_loop: RecursiveLoop,
        **kwargs,
    ) -> PutFunctionRecursionConfigResponse:
        """
        Store a new recursive loop setting on the function.

        :param context: request context
        :param function_name: function name or ARN as given in the request
        :param recursive_loop: requested setting; must be a member value of RecursiveLoop
        :return: response carrying the stored ``recursive_loop`` value
        :raises ValidationException: if the requested value is not a RecursiveLoop member value
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        resolved_name = api_utils.get_function_name(function_name, context)

        # The function lookup happens before the value check, so lookup errors take precedence.
        function = self._get_function(
            function_name=resolved_name, account_id=account_id, region=region
        )

        if recursive_loop not in RecursiveLoop.__members__.values():
            raise ValidationException(
                f"1 validation error detected: Value '{recursive_loop}' at 'recursiveLoop' failed to satisfy constraint: "
                "Member must satisfy enum value set: [Terminate, Allow]"
            )

        function.recursive_loop = recursive_loop
        return PutFunctionRecursionConfigResponse(RecursiveLoop=function.recursive_loop)
953

954
    @handler(operation="CreateFunction", expand=False)
    def create_function(
        self,
        context: RequestContext,
        request: CreateFunctionRequest,
    ) -> FunctionConfiguration:
        """
        Create a new function and its $LATEST version.

        The request is validated first (sizes, architectures, environment variables, layers, role,
        runtime, SnapStart, PublishTo). The function is then registered in the store under
        ``self.create_fn_lock``. If ``Publish`` is set, a version is published right away.

        :param context: request context (provides region, account id and the raw request)
        :param request: the CreateFunction request
        :return: the configuration of the created (or published) version
        :raises RequestEntityTooLargeException: if the zip file or the request exceed the configured limits
        :raises ValidationException: for invalid architectures or an invalid role ARN
        :raises InvalidParameterValueException: if the role cannot be assumed, or for an unsupported
            logging configuration with a capacity provider
        :raises ResourceConflictException: if a function with the same name already exists
        :raises LambdaServiceException: if the code for the chosen package type is missing
        """
        context_region = context.region
        context_account_id = context.account_id

        zip_file = request.get("Code", {}).get("ZipFile")
        if zip_file and len(zip_file) > config.LAMBDA_LIMITS_CODE_SIZE_ZIPPED:
            raise RequestEntityTooLargeException(
                f"Zipped size must be smaller than {config.LAMBDA_LIMITS_CODE_SIZE_ZIPPED} bytes"
            )

        if context.request.content_length > config.LAMBDA_LIMITS_CREATE_FUNCTION_REQUEST_SIZE:
            raise RequestEntityTooLargeException(
                f"Request must be smaller than {config.LAMBDA_LIMITS_CREATE_FUNCTION_REQUEST_SIZE} bytes for the CreateFunction operation"
            )

        if architectures := request.get("Architectures"):
            if len(architectures) != 1:
                raise ValidationException(
                    f"1 validation error detected: Value '[{', '.join(architectures)}]' at 'architectures' failed to "
                    f"satisfy constraint: Member must have length less than or equal to 1",
                )
            if architectures[0] not in ARCHITECTURES:
                raise ValidationException(
                    f"1 validation error detected: Value '[{', '.join(architectures)}]' at 'architectures' failed to "
                    f"satisfy constraint: Member must satisfy constraint: [Member must satisfy enum value set: "
                    f"[x86_64, arm64], Member must not be null]",
                )

        if env_vars := request.get("Environment", {}).get("Variables"):
            self._verify_env_variables(env_vars)

        if layers := request.get("Layers", []):
            self._validate_layers(layers, region=context_region, account_id=context_account_id)

        if not api_utils.is_role_arn(request.get("Role")):
            raise ValidationException(
                f"1 validation error detected: Value '{request.get('Role')}'"
                + " at 'role' failed to satisfy constraint: Member must satisfy regular expression pattern: arn:(aws[a-zA-Z-]*)?:iam::\\d{12}:role/?[a-zA-Z_0-9+=,.@\\-_/]+"
            )
        if not self.lambda_service.can_assume_role(request.get("Role"), context.region):
            raise InvalidParameterValueException(
                "The role defined for the function cannot be assumed by Lambda.", Type="User"
            )
        package_type = request.get("PackageType", PackageType.Zip)
        runtime = request.get("Runtime")
        self._validate_runtime(package_type, runtime)

        request_function_name = request.get("FunctionName")

        function_name, *_ = api_utils.get_name_and_qualifier(
            function_arn_or_name=request_function_name,
            qualifier=None,
            context=context,
        )

        if runtime in DEPRECATED_RUNTIMES:
            LOG.warning(
                "The Lambda runtime %s is deprecated. "
                "Please upgrade the runtime for the function %s: "
                "https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtimes.html",
                runtime,
                function_name,
            )
        if snap_start := request.get("SnapStart"):
            self._validate_snapstart(snap_start, runtime)
        if publish_to := request.get("PublishTo"):
            self._validate_publish_to(publish_to)
        state = lambda_stores[context_account_id][context_region]

        # The existence check and the registration of the new function happen under one lock.
        with self.create_fn_lock:
            if function_name in state.functions:
                raise ResourceConflictException(f"Function already exist: {function_name}")
            fn = Function(function_name=function_name)
            arn = VersionIdentifier(
                function_name=function_name,
                qualifier="$LATEST",
                region=context_region,
                account=context_account_id,
            )
            # save function code to s3
            code = None
            image = None
            image_config = None
            runtime_version_config = RuntimeVersionConfig(
                # Limitation: the runtime id (presumably sha256 of image) is currently hardcoded
                # Potential implementation: provide (cached) sha256 hash of used Docker image
                RuntimeVersionArn=f"arn:{context.partition}:lambda:{context_region}::runtime:8eeff65f6809a3ce81507fe733fe09b835899b99481ba22fd75b5a7338290ec1"
            )
            request_code = request.get("Code")
            if package_type == PackageType.Zip:
                # TODO verify if correct combination of code is set
                if zip_file := request_code.get("ZipFile"):
                    code = store_lambda_archive(
                        archive_file=zip_file,
                        function_name=function_name,
                        region_name=context_region,
                        account_id=context_account_id,
                    )
                elif s3_bucket := request_code.get("S3Bucket"):
                    s3_key = request_code["S3Key"]
                    s3_object_version = request_code.get("S3ObjectVersion")
                    code = store_s3_bucket_archive(
                        archive_bucket=s3_bucket,
                        archive_key=s3_key,
                        archive_version=s3_object_version,
                        function_name=function_name,
                        region_name=context_region,
                        account_id=context_account_id,
                    )
                else:
                    raise LambdaServiceException("A ZIP file or S3 bucket is required")
            elif package_type == PackageType.Image:
                image = request_code.get("ImageUri")
                if not image:
                    raise LambdaServiceException(
                        "An image is required when the package type is set to 'image'"
                    )
                image = create_image_code(image_uri=image)

                image_config_req = request.get("ImageConfig", {})
                image_config = ImageConfig(
                    command=image_config_req.get("Command"),
                    entrypoint=image_config_req.get("EntryPoint"),
                    working_directory=image_config_req.get("WorkingDirectory"),
                )
                # Runtime management controls are not available when providing a custom image
                runtime_version_config = None

            capacity_provider_config = None
            memory_size = request.get("MemorySize", LAMBDA_DEFAULT_MEMORY_SIZE)
            if "CapacityProviderConfig" in request:
                capacity_provider_config = request["CapacityProviderConfig"]
                self._validate_capacity_provider_config(capacity_provider_config, context)
                self._validate_managed_instances_runtime(runtime)

                # Values from the request take precedence over these defaults.
                default_config = CapacityProviderConfig(
                    LambdaManagedInstancesCapacityProviderConfig=LambdaManagedInstancesCapacityProviderConfig(
                        ExecutionEnvironmentMemoryGiBPerVCpu=2.0,
                        PerExecutionEnvironmentMaxConcurrency=16,
                    )
                )
                capacity_provider_config = merge_recursive(default_config, capacity_provider_config)
                # With a capacity provider, the memory size is fixed and any MemorySize from the request is ignored.
                memory_size = 2048
                if request.get("LoggingConfig", {}).get("LogFormat") == LogFormat.Text:
                    raise InvalidParameterValueException(
                        'LogLevel is not supported when LogFormat is set to "Text". Remove LogLevel from your request or change the LogFormat to "JSON" and try again.',
                        Type="User",
                    )
            if "LoggingConfig" in request:
                logging_config = request["LoggingConfig"]
                LOG.warning(
                    "Advanced Lambda Logging Configuration is currently mocked "
                    "and will not impact the logging behavior. "
                    "Please create a feature request if needed."
                )

                # when switching to JSON, app and system level log is auto set to INFO
                if logging_config.get("LogFormat", None) == LogFormat.JSON:
                    logging_config = {
                        "ApplicationLogLevel": "INFO",
                        "SystemLogLevel": "INFO",
                        "LogGroup": f"/aws/lambda/{function_name}",
                    } | logging_config
                else:
                    logging_config = (
                        LoggingConfig(
                            LogFormat=LogFormat.Text, LogGroup=f"/aws/lambda/{function_name}"
                        )
                        | logging_config
                    )

            elif capacity_provider_config:
                logging_config = LoggingConfig(
                    LogFormat=LogFormat.JSON,
                    LogGroup=f"/aws/lambda/{function_name}",
                    ApplicationLogLevel="INFO",
                    SystemLogLevel="INFO",
                )
            else:
                logging_config = LoggingConfig(
                    LogFormat=LogFormat.Text, LogGroup=f"/aws/lambda/{function_name}"
                )
            # No SnapStart response is stored for functions with a capacity provider.
            snap_start = (
                None
                if capacity_provider_config
                else SnapStartResponse(
                    ApplyOn=request.get("SnapStart", {}).get("ApplyOn", SnapStartApplyOn.None_),
                    OptimizationStatus=SnapStartOptimizationStatus.Off,
                )
            )
            version = FunctionVersion(
                id=arn,
                config=VersionFunctionConfiguration(
                    last_modified=api_utils.format_lambda_date(datetime.datetime.now()),
                    description=request.get("Description", ""),
                    role=request["Role"],
                    timeout=request.get("Timeout", LAMBDA_DEFAULT_TIMEOUT),
                    runtime=request.get("Runtime"),
                    memory_size=memory_size,
                    handler=request.get("Handler"),
                    package_type=package_type,
                    environment=env_vars,
                    architectures=request.get("Architectures") or [Architecture.x86_64],
                    tracing_config_mode=request.get("TracingConfig", {}).get(
                        "Mode", TracingMode.PassThrough
                    ),
                    image=image,
                    image_config=image_config,
                    code=code,
                    layers=self.map_layers(layers),
                    internal_revision=short_uid(),
                    ephemeral_storage=LambdaEphemeralStorage(
                        size=request.get("EphemeralStorage", {}).get("Size", 512)
                    ),
                    snap_start=snap_start,
                    runtime_version_config=runtime_version_config,
                    dead_letter_arn=request.get("DeadLetterConfig", {}).get("TargetArn"),
                    vpc_config=self._build_vpc_config(
                        context_account_id, context_region, request.get("VpcConfig")
                    ),
                    state=VersionState(
                        state=State.Pending,
                        code=StateReasonCode.Creating,
                        reason="The function is being created.",
                    ),
                    logging_config=logging_config,
                    # TODO: might need something like **optional_kwargs if None
                    #   -> Test with regular GetFunction (i.e., without a capacity provider)
                    capacity_provider_config=capacity_provider_config,
                ),
            )
            # With a capacity provider, the stored $LATEST is a copy that is already marked as
            # ActiveNonInvocable with a successful last update; `version` itself stays Pending.
            version_post_response = None
            if capacity_provider_config:
                version_post_response = dataclasses.replace(
                    version,
                    config=dataclasses.replace(
                        version.config,
                        last_update=UpdateStatus(status=LastUpdateStatus.Successful),
                        state=VersionState(state=State.ActiveNonInvocable),
                    ),
                )
            fn.versions["$LATEST"] = version_post_response or version
            state.functions[function_name] = fn
        initialization_type = (
            FunctionInitializationType.lambda_managed_instances
            if capacity_provider_config
            else FunctionInitializationType.on_demand
        )
        function_counter.labels(
            operation=FunctionOperation.create,
            runtime=runtime or "n/a",
            status=FunctionStatus.success,
            invocation_type="n/a",
            package_type=package_type,
            initialization_type=initialization_type,
        )
        # TODO: consider potential other side effects of not having a function version for $LATEST
        # Provisioning happens upon publishing for functions using a capacity provider
        if not capacity_provider_config:
            self.lambda_service.create_function_version(version)

        if tags := request.get("Tags"):
            # This will check whether the function exists.
            self._store_tags(arn.unqualified_arn(), tags)

        if request.get("Publish"):
            version = self._publish_version_with_changes(
                function_name=function_name,
                region=context_region,
                account_id=context_account_id,
                publish_to=request.get("PublishTo"),
            )

        if config.LAMBDA_SYNCHRONOUS_CREATE:
            # block via retrying until "terminal" condition reached before returning
            if not poll_condition(
                lambda: (
                    get_function_version(
                        function_name, version.id.qualifier, version.id.account, version.id.region
                    ).config.state.state
                    in [State.Active, State.ActiveNonInvocable, State.Failed]
                ),
                timeout=10,
            ):
                LOG.warning(
                    "LAMBDA_SYNCHRONOUS_CREATE is active, but waiting for %s reached timeout.",
                    function_name,
                )

        return api_utils.map_config_out(
            version, return_qualified_arn=False, return_update_status=False
        )
1251

1252
    def _validate_runtime(self, package_type, runtime):
        """
        Validate the runtime of a Zip-packaged function.

        :param package_type: package type of the function; only PackageType.Zip is validated
        :param runtime: runtime identifier from the request
        :raises InvalidParameterValueException: if the runtime is not accepted
        """
        runtimes = ALL_RUNTIMES
        if config.LAMBDA_RUNTIME_VALIDATION:
            # chain(x) with a single argument only iterates x itself, i.e. it would yield the
            # dict values unflattened and the membership test below could never match a runtime.
            # NOTE(review): assumes the values of RUNTIMES_AGGREGATED are iterables of runtimes
            # (as the use of itertools.chain implies) - confirm against its definition.
            runtimes = list(itertools.chain.from_iterable(RUNTIMES_AGGREGATED.values()))

        if package_type == PackageType.Zip and runtime not in runtimes:
            # deprecated runtimes have different error
            if runtime in DEPRECATED_RUNTIMES:
                HINT_LOG.info(
                    "Set env variable LAMBDA_RUNTIME_VALIDATION to 0"
                    " in order to allow usage of deprecated runtimes"
                )
                # Raises a dedicated error if a migration target is known for this runtime;
                # otherwise falls through to the generic error below.
                self._check_for_recomended_migration_target(runtime)

            raise InvalidParameterValueException(
                f"Value {runtime} at 'runtime' failed to satisfy constraint: Member must satisfy enum value set: {VALID_RUNTIMES} or be a valid ARN",
                Type="User",
            )
1270

1271
    def _validate_managed_instances_runtime(self, runtime):
        """Ensure the runtime is listed in ``VALID_MANAGED_INSTANCE_RUNTIMES``.

        :param runtime: runtime identifier to check
        :raises InvalidParameterValueException: if the runtime is not in that list
        """
        if runtime in VALID_MANAGED_INSTANCE_RUNTIMES:
            return

        raise InvalidParameterValueException(
            f"Runtime Enum {runtime} does not support specified feature: Lambda Managed Instances"
        )
1276

1277
    def _check_for_recomended_migration_target(self, deprecated_runtime):
        """Raise a dedicated error if an upgrade target is registered for a deprecated runtime.

        Looks the runtime up in ``DEPRECATED_RUNTIMES_UPGRADES``. Without an entry the method
        returns silently and the caller decides which error to raise.

        :param deprecated_runtime: the deprecated runtime identifier from the request
        :raises InvalidParameterValueException: if an upgrade target exists for the runtime
        """
        # AWS offers a recommended runtime for migration for "newly" deprecated runtimes;
        # the lookup below is needed to preserve parity with its error messages
        upgrade_target = DEPRECATED_RUNTIMES_UPGRADES.get(deprecated_runtime)
        if upgrade_target is None:
            return

        LOG.debug(
            "The Lambda runtime %s is deprecated. Please upgrade to a supported Lambda runtime such as %s.",
            deprecated_runtime,
            upgrade_target,
        )
        raise InvalidParameterValueException(
            f"The runtime parameter of {deprecated_runtime} is no longer supported for creating or updating AWS Lambda functions. We recommend you use a supported runtime while creating or updating functions.",
            Type="User",
        )
1292

1293
    @handler(operation="UpdateFunctionConfiguration", expand=False)
    def update_function_configuration(
        self, context: RequestContext, request: UpdateFunctionConfigurationRequest
    ) -> FunctionConfiguration:
        """Update the configuration of a function's $LATEST version.

        Only settings present in the request are changed. They are collected as overrides,
        applied on top of the current $LATEST configuration, and the resulting version
        replaces $LATEST in the store before being handed to the lambda service.

        :param context: request context, used to resolve account and region
        :param request: the UpdateFunctionConfiguration request
        :return: the mapped configuration of the new $LATEST version
        :raises ResourceNotFoundException: if the function does not exist
        :raises PreconditionFailedException: if a RevisionId is given and differs from $LATEST
        :raises ValidationException: for a malformed role ARN, or a capacity provider config
            without ``LambdaManagedInstancesCapacityProviderConfig``
        :raises InvalidParameterValueException: for an unknown runtime, or a capacity provider
            config sent for a function that has none

        The validation helpers called along the way may raise further errors.
        """
        function_name = request.get("FunctionName")

        # the name may have been passed as an ARN or partial ARN
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, _ = api_utils.get_name_and_qualifier(function_name, None, context)
        state = lambda_stores[account_id][region]

        if function_name not in state.functions:
            raise ResourceNotFoundException(
                f"Function not found: {api_utils.unqualified_lambda_arn(function_name=function_name, region=region, account=account_id)}",
                Type="User",
            )
        function = state.functions[function_name]

        # TODO: lock modification of latest version
        # TODO: notify service for changes relevant to re-provisioning of $LATEST
        latest_version = function.latest()
        latest_version_config = latest_version.config

        revision_id = request.get("RevisionId")
        if revision_id and revision_id != latest_version_config.revision_id:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
                Type="User",
            )

        # overrides that are applied to the current $LATEST config at the end
        replace_kwargs = {}

        if "EphemeralStorage" in request:
            storage_size = request["EphemeralStorage"].get("Size", 512)
            # TODO: do defaults here apply as well?
            replace_kwargs["ephemeral_storage"] = LambdaEphemeralStorage(storage_size)

        if "Role" in request:
            role = request["Role"]
            if not api_utils.is_role_arn(role):
                raise ValidationException(
                    f"1 validation error detected: Value '{request.get('Role')}'"
                    " at 'role' failed to satisfy constraint: Member must satisfy regular expression pattern: arn:(aws[a-zA-Z-]*)?:iam::\\d{12}:role/?[a-zA-Z_0-9+=,.@\\-_/]+"
                )
            replace_kwargs["role"] = role

        # settings that are copied to the config without any validation
        passthrough_fields = (
            ("Description", "description"),
            ("Timeout", "timeout"),
            ("MemorySize", "memory_size"),
            ("Handler", "handler"),
        )
        for request_key, config_field in passthrough_fields:
            if request_key in request:
                replace_kwargs[config_field] = request[request_key]

        if "DeadLetterConfig" in request:
            replace_kwargs["dead_letter_arn"] = request["DeadLetterConfig"].get("TargetArn")

        vpc_config = request.get("VpcConfig")
        if vpc_config:
            replace_kwargs["vpc_config"] = self._build_vpc_config(account_id, region, vpc_config)

        if "Runtime" in request:
            requested_runtime = request["Runtime"]

            if requested_runtime not in ALL_RUNTIMES:
                raise InvalidParameterValueException(
                    f"Value {requested_runtime} at 'runtime' failed to satisfy constraint: Member must satisfy enum value set: {VALID_RUNTIMES} or be a valid ARN",
                    Type="User",
                )
            if requested_runtime in DEPRECATED_RUNTIMES:
                LOG.warning(
                    "The Lambda runtime %s is deprecated. "
                    "Please upgrade the runtime for the function %s: "
                    "https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtimes.html",
                    requested_runtime,
                    function_name,
                )
            replace_kwargs["runtime"] = requested_runtime

        snap_start = request.get("SnapStart")
        if snap_start:
            # validate against the runtime from this request, falling back to the stored one
            effective_runtime = replace_kwargs.get("runtime") or latest_version_config.runtime
            self._validate_snapstart(snap_start, effective_runtime)
            replace_kwargs["snap_start"] = SnapStartResponse(
                ApplyOn=snap_start.get("ApplyOn", SnapStartApplyOn.None_),
                OptimizationStatus=SnapStartOptimizationStatus.Off,
            )

        if "Environment" in request:
            env_vars = request["Environment"].get("Variables", {})
            if env_vars:
                self._verify_env_variables(env_vars)
            replace_kwargs["environment"] = env_vars

        if "Layers" in request:
            new_layers = request["Layers"]
            if new_layers:
                self._validate_layers(new_layers, region=region, account_id=account_id)
            replace_kwargs["layers"] = self.map_layers(new_layers)

        if "ImageConfig" in request:
            requested_image_config = request["ImageConfig"]
            replace_kwargs["image_config"] = ImageConfig(
                command=requested_image_config.get("Command"),
                entrypoint=requested_image_config.get("EntryPoint"),
                working_directory=requested_image_config.get("WorkingDirectory"),
            )

        if "LoggingConfig" in request:
            requested_logging = request["LoggingConfig"]
            LOG.warning(
                "Advanced Lambda Logging Configuration is currently mocked "
                "and will not impact the logging behavior. "
                "Please create a feature request if needed."
            )

            # when switching to JSON, both log levels default to INFO unless the request sets them
            if requested_logging.get("LogFormat", None) == LogFormat.JSON:
                json_level_defaults = {
                    "ApplicationLogLevel": "INFO",
                    "SystemLogLevel": "INFO",
                }
                requested_logging = json_level_defaults | requested_logging

            previous_logging = latest_version_config.logging_config

            # partial update: requested keys take precedence over the stored ones
            merged_logging = previous_logging | requested_logging

            # the log level keys are removed when the format switches from JSON to Text
            switched_from_json_to_text = (
                merged_logging.get("LogFormat") == LogFormat.Text
                and previous_logging.get("LogFormat") == LogFormat.JSON
            )
            if switched_from_json_to_text:
                for level_key in ("ApplicationLogLevel", "SystemLogLevel"):
                    merged_logging.pop(level_key, None)

            replace_kwargs["logging_config"] = merged_logging

        if "TracingConfig" in request:
            tracing_mode = request["TracingConfig"].get("Mode")
            if tracing_mode:
                replace_kwargs["tracing_config_mode"] = tracing_mode

        if "CapacityProviderConfig" in request:
            capacity_provider_config = request["CapacityProviderConfig"]
            self._validate_capacity_provider_config(capacity_provider_config, context)

            # the setting can only be updated on functions that already have one
            if not latest_version_config.capacity_provider_config:
                raise InvalidParameterValueException(
                    "CapacityProviderConfig isn't supported for Lambda Default functions.",
                    Type="User",
                )
            if not capacity_provider_config.get("LambdaManagedInstancesCapacityProviderConfig"):
                raise ValidationException(
                    "1 validation error detected: Value null at 'capacityProviderConfig.lambdaManagedInstancesCapacityProviderConfig' failed to satisfy constraint: Member must not be null"
                )

            default_config = CapacityProviderConfig(
                LambdaManagedInstancesCapacityProviderConfig=LambdaManagedInstancesCapacityProviderConfig(
                    ExecutionEnvironmentMemoryGiBPerVCpu=2.0,
                    PerExecutionEnvironmentMaxConcurrency=16,
                )
            )
            replace_kwargs["capacity_provider_config"] = merge_recursive(
                default_config, capacity_provider_config
            )

        new_config = dataclasses.replace(
            latest_version_config,
            last_modified=api_utils.generate_lambda_date(),
            internal_revision=short_uid(),
            last_update=UpdateStatus(
                status=LastUpdateStatus.InProgress,
                code="Creating",
                reason="The function is being created.",
            ),
            **replace_kwargs,
        )
        new_latest_version = dataclasses.replace(latest_version, config=new_config)
        function.versions["$LATEST"] = new_latest_version  # TODO: notify

        if not function.latest().config.capacity_provider_config:
            self.lambda_service.update_version(new_version=new_latest_version)
        else:

            def _update_version_with_logging():
                # runs on the task executor, so failures are logged instead of propagated
                try:
                    self.lambda_service.update_version(new_latest_version)
                except Exception:
                    LOG.error(
                        "Failed to update Lambda Managed Instances function version %s",
                        new_latest_version.id.qualified_arn(),
                        exc_info=LOG.isEnabledFor(logging.DEBUG),
                    )

            self.lambda_service.task_executor.submit(_update_version_with_logging)

        return api_utils.map_config_out(new_latest_version)
1494

1495
    @handler(operation="UpdateFunctionCode", expand=False)
    def update_function_code(
        self, context: RequestContext, request: UpdateFunctionCodeRequest
    ) -> FunctionConfiguration:
        """Replace the code (or image) of a function's $LATEST version.

        The code source is taken from ``ZipFile``, then ``S3Bucket``/``S3Key``, then
        ``ImageUri``, in that order of precedence. If ``Publish`` is set, a new version is
        published after the update and returned instead of $LATEST.

        :param context: request context, used to resolve account and region
        :param request: the UpdateFunctionCode request
        :return: the mapped configuration of the updated (or newly published) version
        :raises ResourceNotFoundException: if the function does not exist
        :raises PreconditionFailedException: if a RevisionId is given and differs from $LATEST
        :raises InvalidParameterValueException: if the code source does not fit the package type
        :raises ValidationException: if ``Architectures`` is not a single known architecture
        :raises LambdaServiceException: if no code source is given at all
        """
        # only supports normal zip packaging atm
        # if request.get("Publish"):
        #     self.lambda_service.create_function_version()

        function_name = request.get("FunctionName")
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, _ = api_utils.get_name_and_qualifier(function_name, None, context)

        store = lambda_stores[account_id][region]
        if function_name not in store.functions:
            raise ResourceNotFoundException(
                f"Function not found: {api_utils.unqualified_lambda_arn(function_name=function_name, region=region, account=account_id)}",
                Type="User",
            )
        function = store.functions[function_name]

        revision_id = request.get("RevisionId")
        if revision_id and revision_id != function.latest().config.revision_id:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
                Type="User",
            )

        # TODO verify if correct combination of code is set
        archive_requested = request.get("ZipFile") or request.get("S3Bucket")
        if archive_requested and function.latest().config.package_type == PackageType.Image:
            raise InvalidParameterValueException(
                "Please provide ImageUri when updating a function with packageType Image.",
                Type="User",
            )
        if request.get("ImageUri") and function.latest().config.package_type == PackageType.Zip:
            raise InvalidParameterValueException(
                "Please don't provide ImageUri when updating a function with packageType Zip.",
                Type="User",
            )

        publish_to = request.get("PublishTo")
        if publish_to:
            self._validate_publish_to(publish_to)

        # exactly one of `code` / `image` ends up set, depending on the code source
        code = None
        image = None
        if zip_file := request.get("ZipFile"):
            code = store_lambda_archive(
                archive_file=zip_file,
                function_name=function_name,
                region_name=region,
                account_id=account_id,
            )
        elif s3_bucket := request.get("S3Bucket"):
            code = store_s3_bucket_archive(
                archive_bucket=s3_bucket,
                archive_key=request["S3Key"],
                archive_version=request.get("S3ObjectVersion"),
                function_name=function_name,
                region_name=region,
                account_id=account_id,
            )
        elif image_uri := request.get("ImageUri"):
            image = create_image_code(image_uri=image_uri)
        else:
            raise LambdaServiceException("A ZIP file, S3 bucket, or image is required")

        old_function_version = function.versions.get("$LATEST")
        if code:
            replace_kwargs = {"code": code}
        else:
            replace_kwargs = {"image": image}

        architectures = request.get("Architectures")
        if architectures:
            if len(architectures) != 1:
                raise ValidationException(
                    f"1 validation error detected: Value '[{', '.join(architectures)}]' at 'architectures' failed to "
                    "satisfy constraint: Member must have length less than or equal to 1",
                )
            # An empty list of architectures is also forbidden. Further exceptions are tested here for create_function:
            # tests.aws.services.lambda_.test_lambda_api.TestLambdaFunction.test_create_lambda_exceptions
            if architectures[0] not in ARCHITECTURES:
                raise ValidationException(
                    f"1 validation error detected: Value '[{', '.join(architectures)}]' at 'architectures' failed to "
                    "satisfy constraint: Member must satisfy constraint: [Member must satisfy enum value set: "
                    "[x86_64, arm64], Member must not be null]",
                )
            replace_kwargs["architectures"] = architectures

        # named `updated_config` so the module-level `config` import is not shadowed
        updated_config = dataclasses.replace(
            old_function_version.config,
            internal_revision=short_uid(),
            last_modified=api_utils.generate_lambda_date(),
            last_update=UpdateStatus(
                status=LastUpdateStatus.InProgress,
                code="Creating",
                reason="The function is being created.",
            ),
            **replace_kwargs,
        )
        function_version = dataclasses.replace(old_function_version, config=updated_config)
        function.versions["$LATEST"] = function_version

        self.lambda_service.update_version(new_version=function_version)

        publish = request.get("Publish")
        if publish:
            function_version = self._publish_version_with_changes(
                function_name=function_name,
                region=region,
                account_id=account_id,
                publish_to=publish_to,
                is_active=True,
            )
        return api_utils.map_config_out(function_version, return_qualified_arn=bool(publish))
1611

1612
    # TODO: does deleting the latest published version affect the next versions number?
1613
    # TODO: what happens when we call this with a qualifier and a fully qualified ARN? (+ conflicts?)
1614
    # TODO: test different ARN patterns (shorthand ARN?)
1615
    # TODO: test deleting across regions?
1616
    # TODO: test mismatch between context region and region in ARN
1617
    # TODO: test qualifier $LATEST, alias-name and version
1618
    def delete_function(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
        **kwargs,
    ) -> DeleteFunctionResponse:
        """Delete a whole function, or a single published version if a qualifier is given.

        Versions that have a capacity provider config are removed from the store
        asynchronously by the lambda service; in that case the response status is 202
        instead of 204.

        :param context: request context, used to resolve account and region
        :param function_name: function name, partial ARN, or ARN
        :param qualifier: optional version to delete; aliases and ``$LATEST`` are rejected
        :return: response carrying status code 202 (asynchronous deletion) or 204
        :raises InvalidParameterValueException: if the qualifier is an alias or ``$LATEST``
        :raises ResourceNotFoundException: if the function does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )

        if qualifier and api_utils.qualifier_is_alias(qualifier):
            raise InvalidParameterValueException(
                "Deletion of aliases is not currently supported.",
                Type="User",
            )

        store = lambda_stores[account_id][region]
        if qualifier == "$LATEST":
            raise InvalidParameterValueException(
                "$LATEST version cannot be deleted without deleting the function.", Type="User"
            )

        unqualified_function_arn = api_utils.unqualified_lambda_arn(
            function_name=function_name, region=region, account=account_id
        )
        if function_name not in store.functions:
            raise ResourceNotFoundException(
                f"Function not found: {unqualified_function_arn}",
                Type="User",
            )
        function = store.functions.get(function_name)

        function_has_capacity_provider = False
        if qualifier:
            # delete a single version of the function; an unknown version is silently ignored
            version = function.versions.get(qualifier, None)
            if version:
                if version.config.capacity_provider_config:
                    function_has_capacity_provider = True
                    # async delete from store
                    self.lambda_service.delete_function_version_async(function, version, qualifier)
                else:
                    function.versions.pop(qualifier, None)
                self.lambda_service.stop_version(version.id.qualified_arn())
                destroy_code_if_not_used(code=version.config.code, function=function)
        else:
            # delete the whole function
            self._remove_all_tags(unqualified_function_arn)
            # TODO: introduce locking for safe deletion: We could create a new version at the API layer before
            #  the old version gets cleaned up in the internal lambda service.
            function = store.functions.get(function_name)
            if function.latest().config.capacity_provider_config:
                function_has_capacity_provider = True
                # async delete version from store
                self.lambda_service.delete_function_async(store, function_name)

            for version in function.versions.values():
                # Functions with a capacity provider do NOT have a version manager for $LATEST because only
                # published versions are invokable.
                is_latest = version.id.qualifier == "$LATEST"
                if not function_has_capacity_provider or not is_latest:
                    self.lambda_service.stop_version(qualified_arn=version.id.qualified_arn())
                # we can safely destroy the code here
                if version.config.code:
                    version.config.code.destroy()
            if not function_has_capacity_provider:
                store.functions.pop(function_name, None)

        status_code = 202 if function_has_capacity_provider else 204
        return DeleteFunctionResponse(StatusCode=status_code)
1691

1692
    def list_functions(
        self,
        context: RequestContext,
        master_region: MasterRegion = None,  # (only relevant for lambda@edge)
        function_version: FunctionVersionApi = None,
        marker: String = None,
        max_items: MaxListItems = None,
        **kwargs,
    ) -> ListFunctionsResponse:
        """List the functions of the request's account and region, one page at a time.

        Without ``function_version`` only the $LATEST version of every function is listed,
        with unqualified ARNs. With ``FunctionVersionApi.ALL`` every version of every
        function is listed, with qualified ARNs.

        :param context: request context providing account and region
        :param master_region: accepted but not used by this implementation
        :param function_version: ``FunctionVersionApi.ALL`` or unset
        :param marker: pagination marker passed on to the paginated list
        :param max_items: page size passed on to the paginated list
        :return: one page of function configurations and the marker for the next page
        :raises ValidationException: if ``function_version`` is set to anything but ``ALL``
        """
        state = lambda_stores[context.account_id][context.region]

        include_all_versions = function_version == FunctionVersionApi.ALL
        if function_version and not include_all_versions:
            raise ValidationException(
                f"1 validation error detected: Value '{function_version}'"
                " at 'functionVersion' failed to satisfy constraint: Member must satisfy enum value set: [ALL]"
            )

        if include_all_versions:
            # include all versions for all function
            selected_versions = [
                version
                for function in state.functions.values()
                for version in function.versions.values()
            ]
        else:
            selected_versions = [function.latest() for function in state.functions.values()]

        # qualified ARNs are only returned when listing all versions
        entries = []
        for function_version_obj in selected_versions:
            mapped_config = api_utils.map_config_out(
                function_version_obj, return_qualified_arn=include_all_versions
            )
            entries.append(api_utils.map_to_list_response(mapped_config))

        page, token = PaginatedList(entries).get_page(
            lambda entry: entry["FunctionArn"],
            marker,
            max_items,
        )
        return ListFunctionsResponse(Functions=page, NextMarker=token)
1730

1731
    def get_function(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
        **kwargs,
    ) -> GetFunctionResponse:
        """Return configuration, code location, concurrency and tags of a function version.

        An alias qualifier is resolved to the version it points to before the lookup.

        :param context: request context, used to resolve account and region
        :param function_name: function name, partial ARN, or ARN
        :param qualifier: optional version or alias
        :return: the assembled GetFunction response
        :raises ResourceNotFoundException: if the function or the given alias does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )

        fn = lambda_stores[account_id][region].functions.get(function_name)
        if fn is None:
            # the ARN in the message is qualified only if the caller gave a qualifier
            if qualifier is None:
                missing_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
            else:
                missing_arn = api_utils.qualified_lambda_arn(
                    function_name, qualifier, account_id, region
                )
            raise ResourceNotFoundException(f"Function not found: {missing_arn}", Type="User")

        alias_name = None
        if qualifier and api_utils.qualifier_is_alias(qualifier):
            if qualifier not in fn.aliases:
                alias_arn = api_utils.qualified_lambda_arn(
                    function_name, qualifier, account_id, region
                )
                raise ResourceNotFoundException(f"Function not found: {alias_arn}", Type="User")
            # remember the alias and continue with the version it points to
            alias_name = qualifier
            qualifier = fn.aliases[alias_name].function_version

        version = get_function_version(
            function_name=function_name,
            qualifier=qualifier,
            account_id=account_id,
            region=region,
        )

        tags = self._get_tags(api_utils.unqualified_lambda_arn(function_name, account_id, region))
        # the Tags field is only part of the response if there are any tags
        additional_fields = {"Tags": tags} if tags else {}

        code_location = None
        if code := version.config.code:
            presigned_url = code.generate_presigned_url(endpoint_url=config.external_service_url())
            code_location = FunctionCodeLocation(Location=presigned_url, RepositoryType="S3")
        elif image := version.config.image:
            code_location = FunctionCodeLocation(
                ImageUri=image.image_uri,
                RepositoryType=image.repository_type,
                ResolvedImageUri=image.resolved_image_uri,
            )

        concurrency = None
        reserved_concurrency = fn.reserved_concurrent_executions
        if reserved_concurrency:
            concurrency = Concurrency(ReservedConcurrentExecutions=reserved_concurrency)

        configuration = api_utils.map_config_out(
            version, return_qualified_arn=bool(qualifier), alias_name=alias_name
        )
        return GetFunctionResponse(
            Configuration=configuration,
            Code=code_location,  # TODO
            Concurrency=concurrency,
            **additional_fields,
        )
1801

1802
    def get_function_configuration(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
        **kwargs,
    ) -> FunctionConfiguration:
        """Return the mapped configuration of a function version.

        :param context: request context, used to resolve account and region
        :param function_name: function name, partial ARN, or ARN
        :param qualifier: optional qualifier; a qualified ARN is returned if one is resolved
        :return: the configuration of the looked-up version
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        # CAVE: THIS RETURN VALUE IS *NOT* THE SAME AS IN get_function (!) but seems to be only configuration part?
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        function_version = get_function_version(
            function_name=function_name,
            qualifier=qualifier,
            account_id=account_id,
            region=region,
        )
        use_qualified_arn = bool(qualifier)
        return api_utils.map_config_out(function_version, return_qualified_arn=use_qualified_arn)
1821

1822
    def invoke(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        invocation_type: InvocationType | None = None,
        log_type: LogType | None = None,
        client_context: String | None = None,
        durable_execution_name: DurableExecutionName | None = None,
        payload: IO[Blob] | None = None,
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
        tenant_id: TenantId | None = None,
        **kwargs,
    ) -> InvocationResponse:
        """Invoke a function through the lambda service and map the result to an API response.

        ``Event`` invocations return status 202 and ``DryRun`` invocations status 204, both
        without a payload. All other invocations return status 200 with payload and executed
        version, ``FunctionError`` if the result is an error, and the base64-encoded last
        4096 bytes of the logs if ``log_type`` is ``Tail``.

        ``durable_execution_name`` and ``tenant_id`` are accepted but not used in this method.

        :param context: request context; provides request id, trace context and user agent
        :param function_name: function name, partial ARN, or ARN
        :param invocation_type: invocation type, passed on to the lambda service
        :param log_type: ``LogType.Tail`` to include the log tail in the response
        :param client_context: passed on to the lambda service unchanged
        :param payload: stream holding the invocation payload, read completely if given
        :param qualifier: optional version or alias to invoke
        :return: the invocation response
        :raises ServiceException: re-raised unchanged from the lambda service
        :raises LambdaServiceException: for a startup timeout or any other invocation failure
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )

        user_agent = context.request.user_agent.string

        started_at = time.perf_counter()
        try:
            invocation_result = self.lambda_service.invoke(
                function_name=function_name,
                qualifier=qualifier,
                region=region,
                account_id=account_id,
                invocation_type=invocation_type,
                client_context=client_context,
                request_id=context.request_id,
                trace_context=context.trace_context,
                payload=payload.read() if payload else None,
                user_agent=user_agent,
            )
        except ServiceException:
            # already an API-level error, pass it on as is
            raise
        except EnvironmentStartupTimeoutException as e:
            raise LambdaServiceException(
                f"[{context.request_id}] Timeout while starting up lambda environment for function {function_name}:{qualifier}"
            ) from e
        except Exception as e:
            LOG.error(
                "[%s] Error while invoking lambda %s",
                context.request_id,
                function_name,
                exc_info=LOG.isEnabledFor(logging.DEBUG),
            )
            raise LambdaServiceException(
                f"[{context.request_id}] Internal error while executing lambda {function_name}:{qualifier}. Caused by {type(e).__name__}: {e}"
            ) from e

        # invocation types that do not return a payload
        if invocation_type == InvocationType.Event:
            return InvocationResponse(StatusCode=202)
        if invocation_type == InvocationType.DryRun:
            return InvocationResponse(StatusCode=204)

        elapsed_ms = (time.perf_counter() - started_at) * 1000
        LOG.debug("Lambda invocation duration: %0.2fms", elapsed_ms)

        response = InvocationResponse(
            StatusCode=200,
            Payload=invocation_result.payload,
            ExecutedVersion=invocation_result.executed_version,
        )

        if invocation_result.is_error:
            response["FunctionError"] = "Unhandled"

        if log_type == LogType.Tail:
            # only the last 4096 bytes of the logs are returned
            log_tail = to_bytes(invocation_result.logs)[-4096:]
            response["LogResult"] = to_str(base64.b64encode(log_tail))

        return response
1896

1897
    # Version operations
1898
    def publish_version(
        self,
        context: RequestContext,
        function_name: FunctionName,
        code_sha256: String | None = None,
        description: Description | None = None,
        revision_id: String | None = None,
        publish_to: FunctionVersionLatestPublished | None = None,
        **kwargs,
    ) -> FunctionConfiguration:
        """
        Publish a version of the given function and return its configuration.

        The account and region are derived from ``function_name`` and the request context, and
        the function name is normalized before the actual publishing is delegated to
        ``_publish_version_from_existing_version``.

        :param context: the request context
        :param function_name: function name or ARN as passed by the caller
        :param code_sha256: forwarded unchanged to the publishing helper
        :param description: forwarded unchanged to the publishing helper
        :param revision_id: forwarded unchanged to the publishing helper
        :param publish_to: optional publish target, validated via ``_validate_publish_to`` if set
        :return: the configuration of the published version, mapped with a qualified ARN
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        resolved_name = api_utils.get_function_name(function_name, context)

        # the publish target is only validated when one was actually supplied
        if publish_to:
            self._validate_publish_to(publish_to)

        publish_kwargs = {
            "function_name": resolved_name,
            "description": description,
            "account_id": account_id,
            "region": region,
            "revision_id": revision_id,
            "code_sha256": code_sha256,
            "publish_to": publish_to,
        }
        published_version = self._publish_version_from_existing_version(**publish_kwargs)
        return api_utils.map_config_out(published_version, return_qualified_arn=True)
1✔
1922

1923
    def list_versions_by_function(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        marker: String = None,
        max_items: MaxListItems = None,
        **kwargs,
    ) -> ListVersionsByFunctionResponse:
        """
        Return a page of the versions stored for a function.

        :param context: the request context
        :param function_name: function name or ARN as passed by the caller
        :param marker: pagination marker handed to ``PaginatedList.get_page``
        :param max_items: page size handed to ``PaginatedList.get_page``
        :return: the page of version entries together with the marker for the next page
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        resolved_name = api_utils.get_function_name(function_name, context)
        function = self._get_function(
            function_name=resolved_name, region=region, account_id=account_id
        )

        # map every stored version to its list representation (always with a qualified ARN)
        version_entries = []
        for version in function.versions.values():
            version_config = api_utils.map_config_out(version=version, return_qualified_arn=True)
            version_entries.append(api_utils.map_to_list_response(version_config))

        page, next_marker = PaginatedList(version_entries).get_page(
            lambda item: item,
            marker,
            max_items,
        )
        return ListVersionsByFunctionResponse(Versions=page, NextMarker=next_marker)
1✔
1949

1950
    # Alias
1951

1952
    def _create_routing_config_model(
        self, routing_config_dict: dict[str, float], function_version: FunctionVersion
    ):
        """
        Validate the additional version weights of an alias and wrap them in an ``AliasRoutingConfig``.

        :param routing_config_dict: mapping of version qualifier to routing weight
        :param function_version: the function version the alias itself points to
        :return: an ``AliasRoutingConfig`` holding ``routing_config_dict``
        :raises InvalidParameterValueException: if more than one entry is given, if the entry
            names the version the alias already points to, or if the alias points to ``$LATEST``
        :raises ValidationException: if a weight is below 0.0 or not below 1.0, or if a key is not
            a version qualifier
        """
        if len(routing_config_dict) > 1:
            raise InvalidParameterValueException(
                "Number of items in AdditionalVersionWeights cannot be greater than 1",
                Type="User",
            )

        # At most one entry can reach this point; the loop is kept in case more become supported.
        # The order of the checks determines which error is reported and must not be changed.
        for additional_version, weight in routing_config_dict.items():
            if weight >= 1.0 or weight < 0.0:
                raise ValidationException(
                    f"1 validation error detected: Value '{{{additional_version}={weight}}}' at 'routingConfig.additionalVersionWeights' failed to satisfy constraint: Map value must satisfy constraint: [Member must have value less than or equal to 1.0, Member must have value greater than or equal to 0.0, Member must not be null]"
                )

            target_id = function_version.id
            target_qualifier = target_id.qualifier
            if additional_version == target_qualifier:
                raise InvalidParameterValueException(
                    f"Invalid function version {target_qualifier}. Function version {target_qualifier} is already included in routing configuration.",
                    Type="User",
                )

            # an alias targeting $LATEST must not carry any routing config
            if target_qualifier == "$LATEST":
                raise InvalidParameterValueException(
                    "$LATEST is not supported for an alias pointing to more than 1 version"
                )

            if not api_utils.qualifier_is_version(additional_version):
                raise ValidationException(
                    f"1 validation error detected: Value '{{{additional_version}={weight}}}' at 'routingConfig.additionalVersionWeights' failed to satisfy constraint: Map keys must satisfy constraint: [Member must have length less than or equal to 1024, Member must have length greater than or equal to 1, Member must satisfy regular expression pattern: [0-9]+]"
                )

            # look up the additional version; the result is discarded, so this call only serves
            # as an existence check (presumably it raises for unknown versions)
            get_function_version(
                function_name=target_id.function_name,
                qualifier=additional_version,
                region=target_id.region,
                account_id=target_id.account,
            )

        return AliasRoutingConfig(version_weights=routing_config_dict)
1✔
1989

1990
    def create_alias(
        self,
        context: RequestContext,
        function_name: FunctionName,
        name: Alias,
        function_version: VersionWithLatestPublished,
        description: Description = None,
        routing_config: AliasRoutingConfiguration = None,
        **kwargs,
    ) -> AliasConfiguration:
        """
        Create a new alias for a function version.

        :param context: the request context
        :param function_name: function name or ARN as passed by the caller
        :param name: name of the alias to create
        :param function_version: qualifier of the version the alias points to
        :param description: optional description; stored as an empty string if not given
        :param routing_config: optional routing configuration (``AdditionalVersionWeights``)
        :return: the mapped configuration of the created alias
        :raises ValidationException: if ``name`` is not a valid alias name
        :raises ResourceConflictException: if an alias with that name already exists
        """
        if not api_utils.qualifier_is_alias(name):
            raise ValidationException(
                f"1 validation error detected: Value '{name}' at 'name' failed to satisfy constraint: Member must satisfy regular expression pattern: (?!^[0-9]+$)([a-zA-Z0-9-_]+)"
            )

        account_id, region = api_utils.get_account_and_region(function_name, context)
        resolved_name = api_utils.get_function_name(function_name, context)
        # the target version is looked up before the function itself
        target_version = get_function_version(
            function_name=resolved_name,
            qualifier=function_version,
            region=region,
            account_id=account_id,
        )
        function = self._get_function(
            function_name=resolved_name, region=region, account_id=account_id
        )

        with function.lock:
            existing_alias = function.aliases.get(name)
            if existing_alias:
                raise ResourceConflictException(
                    f"Alias already exists: {api_utils.map_alias_out(alias=existing_alias, function=function)['AliasArn']}",
                    Type="User",
                )

            # a routing configuration is only built for non-empty additional version weights
            additional_weights = (
                routing_config.get("AdditionalVersionWeights") if routing_config else None
            )
            if additional_weights:
                routing_configuration = self._create_routing_config_model(
                    additional_weights, target_version
                )
            else:
                routing_configuration = None

            alias = VersionAlias(
                name=name,
                function_version=function_version,
                # description is always present, if not specified it's an empty string
                description=description or "",
                routing_configuration=routing_configuration,
            )
            function.aliases[name] = alias

        return api_utils.map_alias_out(alias=alias, function=function)
1✔
2041

2042
    def list_aliases(
        self,
        context: RequestContext,
        function_name: FunctionName,
        function_version: VersionWithLatestPublished = None,
        marker: String = None,
        max_items: MaxListItems = None,
        **kwargs,
    ) -> ListAliasesResponse:
        """
        Return a page of the aliases of a function, optionally restricted to one version.

        :param context: the request context
        :param function_name: function name or ARN as passed by the caller
        :param function_version: if given, only aliases pointing to this version are listed
        :param marker: pagination marker handed to ``PaginatedList.get_page``
        :param max_items: page size handed to ``PaginatedList.get_page``
        :return: the page of aliases together with the marker for the next page
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        resolved_name = api_utils.get_function_name(function_name, context)
        function = self._get_function(
            function_name=resolved_name, region=region, account_id=account_id
        )

        matching_aliases = []
        for alias in function.aliases.values():
            if function_version is None or alias.function_version == function_version:
                matching_aliases.append(api_utils.map_alias_out(alias, function))

        # the alias ARN is passed to get_page as the per-item token
        page, next_marker = PaginatedList(matching_aliases).get_page(
            lambda alias: alias["AliasArn"],
            marker,
            max_items,
        )

        return ListAliasesResponse(Aliases=page, NextMarker=next_marker)
1✔
2070

2071
    def delete_alias(
        self, context: RequestContext, function_name: FunctionName, name: Alias, **kwargs
    ) -> None:
        """
        Remove an alias from a function together with the resources registered under its name.

        A missing alias is not an error: the alias is popped with a default. The provisioned
        concurrency config stored under ``name`` is removed regardless of whether the alias
        existed; the function URL config is only removed if the alias existed.

        :param context: the request context
        :param function_name: function name or ARN as passed by the caller
        :param name: name of the alias to delete
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        resolved_name = api_utils.get_function_name(function_name, context)
        function = self._get_function(
            function_name=resolved_name, region=region, account_id=account_id
        )
        removed_alias = function.aliases.pop(name, None)

        # cleanup related resources
        if name in function.provisioned_concurrency_configs:
            function.provisioned_concurrency_configs.pop(name)

        # TODO: Allow for deactivating/unregistering specific Lambda URLs
        if not removed_alias or name not in function.function_url_configs:
            return

        url_config = function.function_url_configs.pop(name)
        LOG.debug(
            "Stopping aliased Lambda Function URL %s for %s",
            url_config.url,
            url_config.function_name,
        )
2093

2094
    def get_alias(
        self, context: RequestContext, function_name: FunctionName, name: Alias, **kwargs
    ) -> AliasConfiguration:
        """
        Return the configuration of a single alias of a function.

        :param context: the request context
        :param function_name: function name or ARN as passed by the caller
        :param name: name of the alias to look up
        :return: the mapped alias configuration
        :raises ResourceNotFoundException: if the function has no alias with that name
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        resolved_name = api_utils.get_function_name(function_name, context)
        function = self._get_function(
            function_name=resolved_name, region=region, account_id=account_id
        )

        alias = function.aliases.get(name)
        if alias:
            return api_utils.map_alias_out(alias=alias, function=function)

        # report the ARN the alias would have had
        missing_alias_arn = api_utils.qualified_lambda_arn(
            function_name=resolved_name, qualifier=name, region=region, account=account_id
        )
        raise ResourceNotFoundException(
            f"Cannot find alias arn: {missing_alias_arn}",
            Type="User",
        )
1✔
2108

2109
    def update_alias(
        self,
        context: RequestContext,
        function_name: FunctionName,
        name: Alias,
        function_version: VersionWithLatestPublished = None,
        description: Description = None,
        routing_config: AliasRoutingConfiguration = None,
        revision_id: String = None,
        **kwargs,
    ) -> AliasConfiguration:
        """
        Update the target version, description and/or routing configuration of an alias.

        Only parameters that are not ``None`` are applied. Passing a ``routing_config`` without
        (or with empty) ``AdditionalVersionWeights`` resets the routing configuration to ``None``.

        :param context: the request context
        :param function_name: function name or ARN as passed by the caller
        :param name: name of the alias to update
        :param function_version: new version qualifier the alias should point to
        :param description: new description of the alias
        :param routing_config: new routing configuration (``AdditionalVersionWeights``)
        :param revision_id: if given, must match the current revision id of the alias
        :return: the mapped configuration of the updated alias
        :raises ResourceNotFoundException: if the function has no alias with that name
        :raises PreconditionFailedException: if ``revision_id`` does not match the alias
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name = api_utils.get_function_name(function_name, context)
        function = self._get_function(
            function_name=function_name, region=region, account_id=account_id
        )
        if not (alias := function.aliases.get(name)):
            fn_arn = api_utils.qualified_lambda_arn(function_name, name, account_id, region)
            raise ResourceNotFoundException(
                f"Alias not found: {fn_arn}",
                Type="User",
            )
        if revision_id and alias.revision_id != revision_id:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
                Type="User",
            )

        changes = {}
        if function_version is not None:
            changes["function_version"] = function_version
        if description is not None:
            changes["description"] = description
        if routing_config is not None:
            # if it is an empty dict or AdditionalVersionWeights is empty, set routing config to None
            new_routing_config = None
            if routing_config_dict := routing_config.get("AdditionalVersionWeights"):
                # The routing config has to be validated against the version the alias points to
                # after this update: the new version if one was passed, the current one otherwise.
                # Previously the version argument was omitted, which raised a TypeError.
                target_qualifier = (
                    alias.function_version if function_version is None else function_version
                )
                target_version = get_function_version(
                    function_name=function_name,
                    qualifier=target_qualifier,
                    region=region,
                    account_id=account_id,
                )
                new_routing_config = self._create_routing_config_model(
                    routing_config_dict, target_version
                )
            changes["routing_configuration"] = new_routing_config

        # even if no changes are done, we have to update revision id for some reason
        old_alias = alias
        alias = dataclasses.replace(alias, **changes)
        function.aliases[name] = alias

        # TODO: signal lambda service that pointer potentially changed
        self.lambda_service.update_alias(old_alias=old_alias, new_alias=alias, function=function)

        return api_utils.map_alias_out(alias=alias, function=function)
1✔
2157

2158
    # =======================================
2159
    # ======= EVENT SOURCE MAPPINGS =========
2160
    # =======================================
2161
    def check_service_resource_exists(
        self, service: str, resource_arn: str, function_arn: str, function_role_arn: str
    ):
        """
        Verify that an event source exists, using a client that acts on behalf of the function.

        Only ``sqs``/``sqs-fifo``, ``kinesis`` and ``dynamodb`` sources are checked; for any other
        service this is a no-op after the client has been created.

        :param service: service identifier of the event source
        :param resource_arn: ARN of the event source
        :param function_arn: ARN of the function, passed as source ARN to the internal client
        :param function_role_arn: role ARN of the function, passed to the internal client
        :raises InvalidParameterValueException: if the queue or stream does not exist
        :raises ClientError: any other client error is re-raised unchanged
        """
        arn = parse_arn(resource_arn)
        source_client = get_internal_client(
            arn=resource_arn,
            role_arn=function_role_arn,
            service_principal=ServicePrincipal.lambda_,
            source_arn=function_arn,
        )

        if service in ("sqs", "sqs-fifo"):
            try:
                # AWS uses `GetQueueAttributes` internally to verify the queue existence, but we need the `QueueUrl`
                # which is not given directly. We build out a dummy `QueueUrl` which can be parsed by SQS to return
                # the right value
                queue_name = arn["resource"].rpartition("/")[2]
                queue_url = f"http://sqs.{arn['region']}.domain/{arn['account']}/{queue_name}"
                source_client.get_queue_attributes(QueueUrl=queue_url)
            except ClientError as e:
                error_code = e.response["Error"]["Code"]
                if error_code == "AWS.SimpleQueueService.NonExistentQueue":
                    raise InvalidParameterValueException(
                        f"Error occurred while ReceiveMessage. SQS Error Code: {error_code}. SQS Error Message: {e.response['Error']['Message']}",
                        Type="User",
                    )
                raise e
        elif service in ("kinesis", "dynamodb"):
            # both services are checked via describe_stream, but spell the ARN parameter differently
            stream_arn_param = "StreamARN" if service == "kinesis" else "StreamArn"
            try:
                source_client.describe_stream(**{stream_arn_param: resource_arn})
            except ClientError as e:
                if e.response["Error"]["Code"] == "ResourceNotFoundException":
                    raise InvalidParameterValueException(
                        f"Stream not found: {resource_arn}",
                        Type="User",
                    )
                raise e
×
2213

2214
    @handler("CreateEventSourceMapping", expand=False)
    def create_event_source_mapping(
        self,
        context: RequestContext,
        request: CreateEventSourceMappingRequest,
    ) -> EventSourceMappingConfiguration:
        """
        Handle ``CreateEventSourceMapping`` by delegating to ``create_event_source_mapping_v2``.

        :param context: the request context
        :param request: the unexpanded request
        :return: the configuration returned by ``create_event_source_mapping_v2``
        """
        esm_configuration = self.create_event_source_mapping_v2(context, request)
        return esm_configuration
1✔
2221

2222
    def create_event_source_mapping_v2(
        self,
        context: RequestContext,
        request: CreateEventSourceMappingRequest,
    ) -> EventSourceMappingConfiguration:
        """
        Validate the request, persist the event source mapping and create its worker.

        :param context: the request context
        :param request: the create request
        :return: the event source mapping configuration built by ``EsmConfigFactory``
        """
        # Validations (the function name and function version of the result are not needed here)
        function_arn, _, state, _, function_role = self.validate_event_source_mapping(
            context, request
        )

        config_factory = EsmConfigFactory(request, context, function_arn)
        esm_config = config_factory.get_esm_config()

        # Copy esm_config to avoid a race condition with potential async update in the store
        state.event_source_mappings[esm_config["UUID"]] = esm_config.copy()

        # TODO: check for potential async race condition update -> think about locking
        worker_factory = EsmWorkerFactory(esm_config, function_role, request.get("Enabled", True))
        esm_worker = worker_factory.get_esm_worker()
        self.esm_workers[esm_worker.uuid] = esm_worker

        # TODO: check StateTransitionReason, LastModified, LastProcessingResult (concurrent updates requires locking!)
        tags = request.get("Tags")
        if tags:
            self._store_tags(esm_config.get("EventSourceMappingArn"), tags)

        esm_worker.create()
        return esm_config
1✔
2245

2246
    def validate_event_source_mapping(self, context, request):
        """
        Validate a create/update event source mapping request and resolve the target function.

        :param context: the request context; its operation name decides whether the duplicate
            check for ``CreateEventSourceMapping`` is performed
        :param request: either a ``CreateEventSourceMapping`` request (with ``FunctionName``) or
            an internal event source mapping configuration (with ``FunctionArn``)
        :return: tuple ``(function_arn, function_name, state, function_version, function_role)``
        :raises InvalidParameterValueException: for unsupported or missing parameters, an unknown
            function, or a non-existing event source
        :raises ValidationException: if ``StartingPosition`` is not a known enum value
        :raises ResourceConflictException: if an equivalent mapping already exists (create only)
        :raises Exception: for an unknown alias/version or an unparsable qualifier
        """
        # TODO: test whether stream ARNs are valid sources for Pipes or ESM or whether only DynamoDB table ARNs work
        # TODO: Validate MaxRecordAgeInSeconds (i.e cannot subceed 60s but can be -1) and MaxRetryAttempts parameters.
        # See https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-maximumrecordageinseconds
        is_create_esm_request = context.operation.name == self.create_event_source_mapping.operation

        if destination_config := request.get("DestinationConfig"):
            if "OnSuccess" in destination_config:
                raise InvalidParameterValueException(
                    "Unsupported DestinationConfig parameter for given event source mapping type.",
                    Type="User",
                )

        # determine the event source service: self-managed sources are treated as kafka,
        # everything else is derived from the event source ARN
        service = None
        if "SelfManagedEventSource" in request:
            service = "kafka"
            if "SourceAccessConfigurations" not in request:
                raise InvalidParameterValueException(
                    "Required 'sourceAccessConfigurations' parameter is missing.", Type="User"
                )
        if service is None and "EventSourceArn" not in request:
            raise InvalidParameterValueException("Unrecognized event source.", Type="User")
        if service is None:
            service = extract_service_from_arn(request["EventSourceArn"])

        batch_size = api_utils.validate_and_set_batch_size(service, request.get("BatchSize"))
        if service in ["dynamodb", "kinesis"]:
            starting_position = request.get("StartingPosition")
            if not starting_position:
                raise InvalidParameterValueException(
                    "1 validation error detected: Value null at 'startingPosition' failed to satisfy constraint: Member must not be null.",
                    Type="User",
                )

            if starting_position not in KinesisStreamStartPosition.__members__:
                raise ValidationException(
                    f"1 validation error detected: Value '{starting_position}' at 'startingPosition' failed to satisfy constraint: Member must satisfy enum value set: [LATEST, AT_TIMESTAMP, TRIM_HORIZON]"
                )
            # AT_TIMESTAMP is not allowed for DynamoDB Streams
            elif (
                service == "dynamodb"
                and starting_position not in DynamoDBStreamStartPosition.__members__
            ):
                raise InvalidParameterValueException(
                    f"Unsupported starting position for arn type: {request['EventSourceArn']}",
                    Type="User",
                )

        if service in ["sqs", "sqs-fifo"]:
            if batch_size > 10 and request.get("MaximumBatchingWindowInSeconds", 0) == 0:
                raise InvalidParameterValueException(
                    "Maximum batch window in seconds must be greater than 0 if maximum batch size is greater than 10",
                    Type="User",
                )

        if (filter_criteria := request.get("FilterCriteria")) is not None:
            for filter_ in filter_criteria.get("Filters", []):
                pattern_str = filter_.get("Pattern")
                if not pattern_str or not isinstance(pattern_str, str):
                    raise InvalidParameterValueException(
                        "Invalid filter pattern definition.", Type="User"
                    )

                if not validate_event_pattern(pattern_str):
                    raise InvalidParameterValueException(
                        "Invalid filter pattern definition.", Type="User"
                    )

        # Can either have a FunctionName (i.e CreateEventSourceMapping request) or
        # an internal EventSourceMappingConfiguration representation
        request_function_name = request.get("FunctionName") or request.get("FunctionArn")
        # can be either a partial arn or a full arn for the version/alias
        function_name, qualifier, account, region = function_locators_from_arn(
            request_function_name
        )
        # TODO: validate `context.region` vs. `region(request["FunctionName"])` vs. `region(request["EventSourceArn"])`
        account = account or context.account_id
        region = region or context.region
        state = lambda_stores[account][region]
        fn = state.functions.get(function_name)
        if not fn:
            raise InvalidParameterValueException("Function does not exist", Type="User")

        if qualifier:
            # make sure the function version/alias exists
            if api_utils.qualifier_is_alias(qualifier):
                if not fn.aliases.get(qualifier):
                    raise Exception("unknown alias")  # TODO: cover via test
            elif api_utils.qualifier_is_version(qualifier):
                if not fn.versions.get(qualifier):
                    raise Exception("unknown version")  # TODO: cover via test
            elif qualifier in ("$LATEST", "$LATEST.PUBLISHED"):
                # accepted as-is; no existence check is performed for these qualifiers here
                pass
            else:
                raise Exception("invalid functionname")  # TODO: cover via test
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account, region)

        else:
            fn_arn = api_utils.unqualified_lambda_arn(function_name, account, region)

        function_version = get_function_version_from_arn(fn_arn)
        function_role = function_version.config.role

        if source_arn := request.get("EventSourceArn"):
            self.check_service_resource_exists(service, source_arn, fn_arn, function_role)
        # Check we are validating a CreateEventSourceMapping request
        if is_create_esm_request:

            def _get_mapping_sources(mapping: dict[str, Any]) -> list[str]:
                """Return the event source ARN, or the Kafka bootstrap servers of a self-managed source."""
                if event_source_arn := mapping.get("EventSourceArn"):
                    return [event_source_arn]
                return (
                    mapping.get("SelfManagedEventSource", {})
                    .get("Endpoints", {})
                    .get("KAFKA_BOOTSTRAP_SERVERS", [])
                )

            # check for event source duplicates
            # TODO: currently validated for sqs, kinesis, and dynamodb
            service_id = load_service(service).service_id
            # the sources of the request do not change while iterating, so compute them once
            request_sources = _get_mapping_sources(request)
            for uuid, mapping in state.event_source_mappings.items():
                mapping_sources = _get_mapping_sources(mapping)
                if mapping["FunctionArn"] == fn_arn and (
                    set(mapping_sources).intersection(request_sources)
                ):
                    if service == "sqs":
                        # *shakes fist at SQS*
                        raise ResourceConflictException(
                            f'An event source mapping with {service_id} arn (" {mapping["EventSourceArn"]} ") '
                            f'and function (" {function_name} ") already exists. Please update or delete the '
                            f"existing mapping with UUID {uuid}",
                            Type="User",
                        )
                    elif service == "kafka":
                        # for kafka, a mapping only conflicts if it also shares a topic
                        if set(mapping["Topics"]).intersection(request["Topics"]):
                            raise ResourceConflictException(
                                f'An event source mapping with event source ("{",".join(request_sources)}"), '
                                f'function ("{fn_arn}"), '
                                f'topics ("{",".join(request["Topics"])}") already exists. Please update or delete the '
                                f"existing mapping with UUID {uuid}",
                                Type="User",
                            )
                    else:
                        raise ResourceConflictException(
                            f'The event source arn (" {mapping["EventSourceArn"]} ") and function '
                            f'(" {function_name} ") provided mapping already exists. Please update or delete the '
                            f"existing mapping with UUID {uuid}",
                            Type="User",
                        )
        return fn_arn, function_name, state, function_version, function_role
1✔
2402

2403
    @handler("UpdateEventSourceMapping", expand=False)
    def update_event_source_mapping(
        self,
        context: RequestContext,
        request: UpdateEventSourceMappingRequest,
    ) -> EventSourceMappingConfiguration:
        """
        Handle ``UpdateEventSourceMapping`` by delegating to ``update_event_source_mapping_v2``.

        :param context: the request context
        :param request: the unexpanded request
        :return: the configuration returned by ``update_event_source_mapping_v2``
        """
        esm_configuration = self.update_event_source_mapping_v2(context, request)
        return esm_configuration
1✔
2410

2411
    def update_event_source_mapping_v2(
        self,
        context: RequestContext,
        request: UpdateEventSourceMappingRequest,
    ) -> EventSourceMappingConfiguration:
        """
        Apply an update to a stored event source mapping and replace its worker.

        The request is merged over the stored mapping, the merged result is validated, and a new
        worker is built from it. The old worker is stopped only after the new one is registered.

        :param context: the request context
        :param request: the update request; ``UUID`` identifies the mapping to update
        :return: the updated mapping, including the transient ``State``
        :raises ResourceNotFoundException: if no ``UUID`` is given, or if no mapping or worker is
            registered for it
        """
        # TODO: test and implement this properly (quite complex with many validations and limitations!)
        LOG.warning(
            "Updating Lambda Event Source Mapping is in experimental state and not yet fully tested."
        )
        state = lambda_stores[context.account_id][context.region]
        update_fields = dict(request)
        uuid = update_fields.pop("UUID", None)
        if not uuid:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        previous_mapping = state.event_source_mappings.get(uuid)
        current_worker = self.esm_workers.get(uuid)
        if previous_mapping is None or current_worker is None:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )  # TODO: test?

        # normalize values to overwrite
        merged_mapping = previous_mapping | update_fields

        # Validate the newly updated ESM object. Only the function ARN and role of the result are used.
        function_arn, _, _, _, function_role = self.validate_event_source_mapping(
            context, merged_mapping
        )

        # remove the FunctionName field
        merged_mapping.pop("FunctionName", None)
        if function_arn:
            merged_mapping["FunctionArn"] = function_arn

        # Only apply update if the desired state differs
        requested_enabled = request.get("Enabled")
        if requested_enabled is None:
            merged_mapping["State"] = EsmState.UPDATING
        else:
            was_enabled = previous_mapping["State"] == EsmState.ENABLED
            if requested_enabled and not was_enabled:
                merged_mapping["State"] = EsmState.ENABLING
            # TODO: What happens when trying to update during an update or failed state?!
            elif not requested_enabled and was_enabled:
                merged_mapping["State"] = EsmState.DISABLING

        # values only set for the returned response, not saved internally (e.g. transient state)
        # To ensure parity, certain responses need to be immediately returned
        transient_fields = {"State": merged_mapping["State"]}

        state.event_source_mappings[uuid] = merged_mapping

        # TODO: Currently, we re-create the entire ESM worker. Look into approach with better performance.
        worker_factory = EsmWorkerFactory(
            merged_mapping, function_role, request.get("Enabled", current_worker.enabled)
        )

        # Get a new ESM worker object but do not activate it yet, since the factory holds all logic
        # for creating a new worker from configuration.
        replacement_worker = worker_factory.get_esm_worker()
        self.esm_workers[uuid] = replacement_worker

        # We should stop() the worker since the delete() will remove the ESM from the state mapping.
        current_worker.stop()
        # This will either create an EsmWorker in the CREATING state if enabled. Otherwise, the DISABLING state is set.
        replacement_worker.create()

        return {**merged_mapping, **transient_fields}
1✔
2481

2482
    def delete_event_source_mapping(
        self, context: RequestContext, uuid: String, **kwargs
    ) -> EventSourceMappingConfiguration:
        """Start the deletion of an event source mapping (ESM).

        :param context: request context; its account id and region select the store
        :param uuid: identifier of the event source mapping to delete
        :return: a copy of the stored mapping with ``State`` set to ``DELETING``
        :raises ResourceNotFoundException: if no mapping or no worker is
            registered under ``uuid``
        """
        store = lambda_stores[context.account_id][context.region]
        mapping = store.event_source_mappings.get(uuid)
        if not mapping:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        # TODO: add proper locking
        # The worker is unregistered from the provider before the (asynchronous, v2) delete starts.
        worker = self.esm_workers.pop(uuid, None)
        if not worker:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        # The full deletion of the ESM happens asynchronously, but the tags are removed instantly.
        # This is similar to ``get_event_source_mapping``, which raises right after deletion, but
        # that is not always the case in AWS. Add more testing and align the behavior with
        # ``get_event_source_mapping``.
        self._remove_all_tags(mapping["EventSourceMappingArn"])
        worker.delete()

        # Respond with a snapshot of the mapping; the stored entry itself is not modified here.
        response = {**mapping}
        response["State"] = EsmState.DELETING
        return response
1✔
2505

2506
    def get_event_source_mapping(
        self, context: RequestContext, uuid: String, **kwargs
    ) -> EventSourceMappingConfiguration:
        """Return the event source mapping stored under ``uuid``.

        The stored mapping is refreshed in place with the worker's current state
        and state transition reason before it is returned.

        :param context: request context; its account id and region select the store
        :param uuid: identifier of the event source mapping
        :return: the stored mapping (the same object that lives in the store)
        :raises ResourceNotFoundException: if no mapping or no worker is
            registered under ``uuid``
        """
        store = lambda_stores[context.account_id][context.region]

        mapping = store.event_source_mappings.get(uuid)
        if not mapping:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        worker = self.esm_workers.get(uuid)
        if not worker:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        # The worker is the source of truth for the live state of the mapping.
        mapping["State"] = worker.current_state
        mapping["StateTransitionReason"] = worker.state_transition_reason
        return mapping
1✔
2523

2524
    def list_event_source_mappings(
        self,
        context: RequestContext,
        event_source_arn: Arn = None,
        function_name: FunctionName = None,
        marker: String = None,
        max_items: MaxListItems = None,
        **kwargs,
    ) -> ListEventSourceMappingsResponse:
        """List the event source mappings of the caller's account and region.

        :param context: request context; its account id and region select the store
        :param event_source_arn: if set, keep only mappings whose ``EventSourceArn`` equals it
        :param function_name: if set, keep only mappings whose ``FunctionArn`` contains it
        :param marker: pagination marker forwarded to ``PaginatedList.get_page``
        :param max_items: page size forwarded to ``PaginatedList.get_page``
        :return: one page of mappings plus the marker for the next page
        """
        store = lambda_stores[context.account_id][context.region]
        mappings = store.event_source_mappings.values()
        # TODO: update and test State and StateTransitionReason for ESM v2

        if event_source_arn:  # TODO: validate pattern
            mappings = [
                mapping
                for mapping in mappings
                if mapping.get("EventSourceArn") == event_source_arn
            ]

        if function_name:
            # Substring match against the stored function ARN.
            mappings = [mapping for mapping in mappings if function_name in mapping["FunctionArn"]]

        paginated = PaginatedList(mappings)
        page, next_marker = paginated.get_page(lambda mapping: mapping["UUID"], marker, max_items)
        return ListEventSourceMappingsResponse(EventSourceMappings=page, NextMarker=next_marker)
1✔
2551

2552
    def get_source_type_from_request(self, request: dict[str, Any]) -> str:
1✔
2553
        if event_source_arn := request.get("EventSourceArn", ""):
×
2554
            service = extract_service_from_arn(event_source_arn)
×
2555
            if service == "sqs" and "fifo" in event_source_arn:
×
2556
                service = "sqs-fifo"
×
2557
            return service
×
2558
        elif request.get("SelfManagedEventSource"):
×
2559
            return "kafka"
×
2560

2561
    # =======================================
2562
    # ============ FUNCTION URLS ============
2563
    # =======================================
2564

2565
    @staticmethod
    def _validate_qualifier(qualifier: str) -> None:
        """Reject ``$LATEST``-style and version qualifiers.

        :param qualifier: qualifier to check; falsy values are accepted
        :raises ValidationException: if the qualifier is ``$LATEST``,
            ``$LATEST.PUBLISHED`` or is recognised as a version by
            ``api_utils.qualifier_is_version``
        """
        is_latest = qualifier in ["$LATEST", "$LATEST.PUBLISHED"]
        if not is_latest and not (qualifier and api_utils.qualifier_is_version(qualifier)):
            return

        raise ValidationException(
            f"1 validation error detected: Value '{qualifier}' at 'qualifier' failed to satisfy constraint: Member must satisfy regular expression pattern: ((?!^\\d+$)^[0-9a-zA-Z-_]+$)"
        )
2573

2574
    @staticmethod
    def _validate_invoke_mode(invoke_mode: str) -> None:
        """Validate the invoke mode of a function URL config.

        :param invoke_mode: requested invoke mode; falsy values are accepted
        :raises ValidationException: if a value other than ``BUFFERED`` or
            ``RESPONSE_STREAM`` is given
        """
        supported_modes = [InvokeMode.BUFFERED, InvokeMode.RESPONSE_STREAM]
        if invoke_mode and invoke_mode not in supported_modes:
            raise ValidationException(
                f"1 validation error detected: Value '{invoke_mode}' at 'invokeMode' failed to satisfy constraint: Member must satisfy enum value set: [RESPONSE_STREAM, BUFFERED]"
            )

        if invoke_mode != InvokeMode.RESPONSE_STREAM:
            return

        # TODO should we actually fail for setting RESPONSE_STREAM?
        #  It should trigger InvokeWithResponseStream which is not implemented
        LOG.warning(
            "The invokeMode 'RESPONSE_STREAM' is not yet supported on LocalStack. The property is only mocked, the execution will still be 'BUFFERED'"
        )
2586

2587
    # TODO: what happens if function state is not active?
2588
    def create_function_url_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        auth_type: FunctionUrlAuthType,
        qualifier: FunctionUrlQualifier = None,
        cors: Cors = None,
        invoke_mode: InvokeMode = None,
        **kwargs,
    ) -> CreateFunctionUrlConfigResponse:
        """Create a function URL config for a function or one of its aliases.

        :param context: request context (used for account/region resolution and the URL's region)
        :param function_name: function name or ARN
        :param auth_type: auth type stored on the URL config
        :param qualifier: optional alias name; the config is stored under ``$LATEST`` when omitted
        :param cors: optional CORS settings stored on the URL config
        :param invoke_mode: optional invoke mode, checked by ``_validate_invoke_mode``
        :return: the API representation of the newly created URL config
        :raises ValidationException: if the qualifier or the invoke mode is invalid
        :raises ResourceNotFoundException: if the function or the alias does not exist
        :raises ResourceConflictException: if a URL config already exists for the qualifier
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        state = lambda_stores[account_id][region]
        self._validate_qualifier(qualifier)
        self._validate_invoke_mode(invoke_mode)

        fn = state.functions.get(function_name)
        if fn is None:
            raise ResourceNotFoundException("Function does not exist", Type="User")

        # URL configs are keyed by qualifier; an unqualified request maps to $LATEST.
        normalized_qualifier = qualifier or "$LATEST"

        url_config = fn.function_url_configs.get(normalized_qualifier)
        if url_config:
            raise ResourceConflictException(
                f"Failed to create function url config for [functionArn = {url_config.function_arn}]. Error message:  FunctionUrlConfig exists for this Lambda function",
                Type="User",
            )

        if qualifier and qualifier != "$LATEST" and qualifier not in fn.aliases:
            raise ResourceNotFoundException("Function does not exist", Type="User")

        function_arn = (
            api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
            if qualifier
            else api_utils.unqualified_lambda_arn(function_name, account_id, region)
        )

        custom_id: str | None = None

        tags = self._get_tags(api_utils.unqualified_lambda_arn(function_name, account_id, region))
        if TAG_KEY_CUSTOM_URL in tags:
            # Note: I really wanted to add verification here that the
            # url_id is unique, so we could surface that to the user ASAP.
            # However, it seems like that information isn't available yet,
            # since (as far as I can tell) we call
            # self.router.register_routes() once, in a single shot, for all
            # of the routes -- and we need to verify that it's unique not
            # just for this particular lambda function, but for the entire
            # lambda provider. Therefore... that idea proved non-trivial!
            custom_id_tag_value = (
                f"{tags[TAG_KEY_CUSTOM_URL]}-{qualifier}" if qualifier else tags[TAG_KEY_CUSTOM_URL]
            )
            if TAG_KEY_CUSTOM_URL_VALIDATOR.match(custom_id_tag_value):
                custom_id = custom_id_tag_value
            else:
                # Note: we're logging here instead of raising to prioritize
                # strict parity with AWS over the localstack-only custom_id
                LOG.warning(
                    "Invalid custom ID tag value for lambda URL (%s=%s). "
                    "Replaced with default (random id)",
                    TAG_KEY_CUSTOM_URL,
                    custom_id_tag_value,
                )

        # The url_id is the subdomain used for the URL we're creating. This
        # is either created randomly (as in AWS), or can be passed as a tag
        # to the lambda itself (localstack-only).
        url_id: str = custom_id if custom_id is not None else api_utils.generate_random_url_id()

        host_definition = localstack_host(custom_port=config.GATEWAY_LISTEN[0].port)

        # Read the clock once so that a freshly created config reports the same creation and
        # last-modified time; two separate reads could yield different timestamps.
        created_at = api_utils.generate_lambda_date()
        new_url_config = FunctionUrlConfig(
            function_arn=function_arn,
            function_name=function_name,
            cors=cors,
            url_id=url_id,
            url=f"http://{url_id}.lambda-url.{context.region}.{host_definition.host_and_port()}/",  # TODO: https support
            auth_type=auth_type,
            creation_time=created_at,
            last_modified_time=created_at,
            invoke_mode=invoke_mode,
        )

        # persist and start URL
        # TODO: implement URL invoke
        fn.function_url_configs[normalized_qualifier] = new_url_config
        api_url_config = api_utils.map_function_url_config(new_url_config)

        return CreateFunctionUrlConfigResponse(
            FunctionUrl=api_url_config["FunctionUrl"],
            FunctionArn=api_url_config["FunctionArn"],
            AuthType=api_url_config["AuthType"],
            Cors=api_url_config["Cors"],
            CreationTime=api_url_config["CreationTime"],
            InvokeMode=api_url_config["InvokeMode"],
        )
2692

2693
    def get_function_url_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: FunctionUrlQualifier = None,
        **kwargs,
    ) -> GetFunctionUrlConfigResponse:
        """Return the URL config of a function for the given qualifier.

        :param context: request context used for account/region resolution
        :param function_name: function name or ARN
        :param qualifier: optional alias name; ``$LATEST`` is used when omitted
        :return: the API representation of the stored URL config
        :raises ValidationException: if the qualifier is invalid
        :raises ResourceNotFoundException: if the function or its URL config does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]

        fn_name, qualifier = api_utils.get_name_and_qualifier(function_name, qualifier, context)
        self._validate_qualifier(qualifier)

        # A missing function and a missing URL config produce the same error, so both cases
        # are funnelled into a single lookup result.
        resolved_fn = store.functions.get(fn_name)
        url_config = (
            resolved_fn.function_url_configs.get(qualifier or "$LATEST") if resolved_fn else None
        )
        if not url_config:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        return api_utils.map_function_url_config(url_config)
1✔
2721

2722
    def update_function_url_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: FunctionUrlQualifier = None,
        auth_type: FunctionUrlAuthType = None,
        cors: Cors = None,
        invoke_mode: InvokeMode = None,
        **kwargs,
    ) -> UpdateFunctionUrlConfigResponse:
        """Update an existing function URL config.

        Only the supplied settings are changed; the last-modified time is always refreshed.

        :param context: request context used for account/region resolution
        :param function_name: function name or ARN
        :param qualifier: optional alias name; ``$LATEST`` is used when omitted
        :param auth_type: new auth type, applied when not ``None``
        :param cors: new CORS settings, applied when not ``None``
        :param invoke_mode: new invoke mode, applied when truthy
        :return: the updated URL config
        :raises ValidationException: if the qualifier or the invoke mode is invalid
        :raises ResourceNotFoundException: if the function, alias or URL config does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]

        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        self._validate_qualifier(qualifier)
        self._validate_invoke_mode(invoke_mode)

        fn = store.functions.get(function_name)
        if not fn:
            raise ResourceNotFoundException("Function does not exist", Type="User")

        normalized_qualifier = qualifier or "$LATEST"

        is_alias = api_utils.qualifier_is_alias(normalized_qualifier)
        if is_alias and normalized_qualifier not in fn.aliases:
            raise ResourceNotFoundException("Function does not exist", Type="User")

        current_config = fn.function_url_configs.get(normalized_qualifier)
        if not current_config:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        # Collect only the fields that were actually supplied by the caller.
        changes = {"last_modified_time": api_utils.generate_lambda_date()}
        if cors is not None:
            changes["cors"] = cors
        if auth_type is not None:
            changes["auth_type"] = auth_type
        if invoke_mode:
            changes["invoke_mode"] = invoke_mode

        # The stored entry is swapped for a modified copy instead of being mutated.
        updated_config = dataclasses.replace(current_config, **changes)
        fn.function_url_configs[normalized_qualifier] = updated_config

        return UpdateFunctionUrlConfigResponse(
            FunctionUrl=updated_config.url,
            FunctionArn=updated_config.function_arn,
            AuthType=updated_config.auth_type,
            Cors=updated_config.cors,
            CreationTime=updated_config.creation_time,
            LastModifiedTime=updated_config.last_modified_time,
            InvokeMode=updated_config.invoke_mode,
        )
2780

2781
    def delete_function_url_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: FunctionUrlQualifier = None,
        **kwargs,
    ) -> None:
        """Delete the URL config of a function for the given qualifier.

        :param context: request context used for account/region resolution
        :param function_name: function name or ARN
        :param qualifier: optional alias name; ``$LATEST`` is used when omitted
        :raises ValidationException: if the qualifier is invalid
        :raises ResourceNotFoundException: if the function or its URL config does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]

        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        self._validate_qualifier(qualifier)

        resolved_fn = store.functions.get(function_name)
        if not resolved_fn:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        config_key = qualifier or "$LATEST"
        if not resolved_fn.function_url_configs.get(config_key):
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        del resolved_fn.function_url_configs[config_key]
1✔
2810

2811
    def list_function_url_configs(
        self,
        context: RequestContext,
        function_name: FunctionName,
        marker: String = None,
        max_items: MaxItems = None,
        **kwargs,
    ) -> ListFunctionUrlConfigsResponse:
        """List the URL configs of a function, paginated by function ARN.

        :param context: request context used for account/region resolution
        :param function_name: function name or ARN
        :param marker: pagination marker forwarded to ``PaginatedList.get_page``
        :param max_items: page size forwarded to ``PaginatedList.get_page``
        :return: one page of URL configs plus the marker for the next page
        :raises ResourceNotFoundException: if the function does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]

        fn_name = api_utils.get_function_name(function_name, context)
        resolved_fn = store.functions.get(fn_name)
        if not resolved_fn:
            raise ResourceNotFoundException("Function does not exist", Type="User")

        mapped_configs = PaginatedList(
            [
                api_utils.map_function_url_config(stored_config)
                for stored_config in resolved_fn.function_url_configs.values()
            ]
        )
        page, next_marker = mapped_configs.get_page(
            lambda mapped: mapped["FunctionArn"],
            marker,
            max_items,
        )
        return ListFunctionUrlConfigsResponse(FunctionUrlConfigs=page, NextMarker=next_marker)
1✔
2839

2840
    # =======================================
2841
    # ============  Permissions  ============
2842
    # =======================================
2843

2844
    @handler("AddPermission", expand=False)
    def add_permission(
        self,
        context: RequestContext,
        request: AddPermissionRequest,
    ) -> AddPermissionResponse:
        """Add a statement to the resource policy of a function version or alias.

        :param context: request context (account/region resolution and partition)
        :param request: the unexpanded ``AddPermission`` request
        :return: the newly added statement, serialized as JSON
        :raises InvalidParameterValueException: if the qualifier is ``$LATEST``
        :raises PreconditionFailedException: if ``RevisionId`` is given and does not match
        :raises ValidationException: if ``StatementId`` does not match ``STATEMENT_ID_REGEX``
        :raises ResourceConflictException: if the statement id already exists in the policy
        """
        raw_function_name = request.get("FunctionName")
        function_name, qualifier = api_utils.get_name_and_qualifier(
            raw_function_name, request.get("Qualifier"), context
        )

        # validate qualifier
        if qualifier is not None:
            self._validate_qualifier_expression(qualifier)
            if qualifier == "$LATEST":
                raise InvalidParameterValueException(
                    "We currently do not support adding policies for $LATEST.", Type="User"
                )
        account_id, region = api_utils.get_account_and_region(raw_function_name, context)

        resolved_fn = self._get_function(function_name, account_id, region)
        resolved_qualifier, fn_arn = self._resolve_fn_qualifier(resolved_fn, qualifier)

        # Optimistic concurrency check; only performed when the caller supplied a revision id.
        revision_id = request.get("RevisionId")
        if revision_id and revision_id != self._function_revision_id(
            resolved_fn, resolved_qualifier
        ):
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetPolicy API to retrieve the latest Revision Id",
                Type="User",
            )

        request_sid = request["StatementId"]
        if not STATEMENT_ID_REGEX.match(request_sid):
            raise ValidationException(
                f"1 validation error detected: Value '{request_sid}' at 'statementId' failed to satisfy constraint: Member must satisfy regular expression pattern: ([a-zA-Z0-9-_]+)"
            )

        # check for an already existing policy and any conflicts in existing statements
        existing_policy = resolved_fn.permissions.get(resolved_qualifier)
        if existing_policy:
            existing_sids = [stmt["Sid"] for stmt in existing_policy.policy.Statement]
            if request_sid in existing_sids:
                # uniqueness scope: statement id needs to be unique per qualified function ($LATEST, version, or alias)
                # Counterexample: the same sid can exist within $LATEST, version, and alias
                raise ResourceConflictException(
                    f"The statement id ({request_sid}) provided already exists. Please provide a new statement id, or remove the existing statement.",
                    Type="User",
                )

        permission_statement = api_utils.build_statement(
            partition=context.partition,
            resource_arn=fn_arn,
            statement_id=request_sid,
            action=request["Action"],
            principal=request["Principal"],
            source_arn=request.get("SourceArn"),
            source_account=request.get("SourceAccount"),
            principal_org_id=request.get("PrincipalOrgID"),
            event_source_token=request.get("EventSourceToken"),
            auth_type=request.get("FunctionUrlAuthType"),
        )

        if existing_policy:
            # Extend the stored policy in place.
            existing_policy.policy.Statement.append(permission_statement)
        else:
            # First statement for this qualifier: create the policy and register it.
            created_policy = FunctionResourcePolicy(
                policy=ResourcePolicy(Version="2012-10-17", Id="default", Statement=[])
            )
            created_policy.policy.Statement.append(permission_statement)
            resolved_fn.permissions[resolved_qualifier] = created_policy

        # Update revision id of alias or version
        # TODO: re-evaluate data model to prevent this dirty hack just for bumping the revision id
        # TODO: does that need a `with function.lock` for atomic updates of the policy + revision_id?
        if api_utils.qualifier_is_alias(resolved_qualifier):
            current_alias = resolved_fn.aliases[resolved_qualifier]
            resolved_fn.aliases[resolved_qualifier] = dataclasses.replace(current_alias)
        else:
            # Assumes that a non-alias is a version
            current_version = resolved_fn.versions[resolved_qualifier]
            refreshed_config = dataclasses.replace(current_version.config)
            resolved_fn.versions[resolved_qualifier] = dataclasses.replace(
                current_version, config=refreshed_config
            )

        return AddPermissionResponse(Statement=json.dumps(permission_statement))
1✔
2926

2927
    def remove_permission(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        statement_id: NamespacedStatementId,
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
        revision_id: String | None = None,
        **kwargs,
    ) -> None:
        """Remove a statement from the resource policy of a function version or alias.

        The whole policy is dropped once its last statement has been removed.

        :param context: request context used for account/region resolution
        :param function_name: function name or ARN
        :param statement_id: ``Sid`` of the statement to remove
        :param qualifier: optional version or alias qualifier
        :param revision_id: optional revision id that must match the current one
        :raises ResourceNotFoundException: if the function, the policy or the statement
            does not exist
        :raises PreconditionFailedException: if ``revision_id`` is given and does not match
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        if qualifier is not None:
            self._validate_qualifier_expression(qualifier)

        store = lambda_stores[account_id][region]
        resolved_fn = store.functions.get(function_name)
        if resolved_fn is None:
            fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
            raise ResourceNotFoundException(f"No policy found for: {fn_arn}", Type="User")

        resolved_qualifier, _ = self._resolve_fn_qualifier(resolved_fn, qualifier)
        function_permission = resolved_fn.permissions.get(resolved_qualifier)
        if not function_permission:
            raise ResourceNotFoundException(
                "No policy is associated with the given resource.", Type="User"
            )

        # try to find statement in policy and delete it
        statement = next(
            (
                candidate
                for candidate in function_permission.policy.Statement
                if candidate["Sid"] == statement_id
            ),
            None,
        )
        if not statement:
            raise ResourceNotFoundException(
                f"Statement {statement_id} is not found in resource policy.", Type="User"
            )

        current_revision_id = self._function_revision_id(resolved_fn, resolved_qualifier)
        if revision_id and revision_id != current_revision_id:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
                Type="User",
            )
        function_permission.policy.Statement.remove(statement)

        # Update revision id for alias or version
        # TODO: re-evaluate data model to prevent this dirty hack just for bumping the revision id
        # TODO: does that need a `with function.lock` for atomic updates of the policy + revision_id?
        if api_utils.qualifier_is_alias(resolved_qualifier):
            current_alias = resolved_fn.aliases[resolved_qualifier]
            resolved_fn.aliases[resolved_qualifier] = dataclasses.replace(current_alias)
        else:
            # Assumes that a non-alias is a version
            current_version = resolved_fn.versions[resolved_qualifier]
            refreshed_config = dataclasses.replace(current_version.config)
            resolved_fn.versions[resolved_qualifier] = dataclasses.replace(
                current_version, config=refreshed_config
            )

        # remove the policy as a whole when there's no statement left in it
        if not function_permission.policy.Statement:
            del resolved_fn.permissions[resolved_qualifier]
1✔
2992

2993
    def get_policy(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
        **kwargs,
    ) -> GetPolicyResponse:
        """Return the resource policy of a function version or alias.

        :param context: request context used for account/region resolution
        :param function_name: function name or ARN
        :param qualifier: optional version or alias qualifier; ``$LATEST`` is used when omitted
        :return: the policy serialized as JSON plus the revision id of the alias or version
        :raises ResourceNotFoundException: if no policy is stored for the qualifier
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )

        if qualifier is not None:
            self._validate_qualifier_expression(qualifier)

        resolved_fn = self._get_function(function_name, account_id, region)

        resolved_qualifier = qualifier or "$LATEST"
        function_permission = resolved_fn.permissions.get(resolved_qualifier)
        if not function_permission:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        # The revision id lives on the alias itself, or on the config of a version.
        if api_utils.qualifier_is_alias(resolved_qualifier):
            fn_revision_id = resolved_fn.aliases[resolved_qualifier].revision_id
        else:
            # Assumes that a non-alias is a version
            fn_revision_id = resolved_fn.versions[resolved_qualifier].config.revision_id

        policy_document = dataclasses.asdict(function_permission.policy)
        return GetPolicyResponse(
            Policy=json.dumps(policy_document),
            RevisionId=fn_revision_id,
        )
3030

3031
    # =======================================
3032
    # ========  Code signing config  ========
3033
    # =======================================
3034

3035
    def create_code_signing_config(
        self,
        context: RequestContext,
        allowed_publishers: AllowedPublishers,
        description: Description | None = None,
        code_signing_policies: CodeSigningPolicies | None = None,
        tags: Tags | None = None,
        **kwargs,
    ) -> CreateCodeSigningConfigResponse:
        """Create a code signing config in the caller's account and region.

        :param context: request context providing account id, region and partition
        :param allowed_publishers: allowed publishers stored on the config
        :param description: optional description stored on the config
        :param code_signing_policies: optional policies stored on the config
        :param tags: NOTE(review): accepted but not read anywhere in this method - confirm
            whether tags are meant to be stored
        :return: the API representation of the newly created config
        """
        account_id = context.account_id
        region_name = context.region
        store = lambda_stores[account_id][region_name]

        # TODO: can there be duplicates?
        csc_id = f"csc-{get_random_hex(17)}"  # e.g. 'csc-077c33b4c19e26036'
        csc_arn = (
            f"arn:{context.partition}:lambda:{region_name}:{account_id}:code-signing-config:{csc_id}"
        )

        new_csc = CodeSigningConfig(
            csc_id=csc_id,
            arn=csc_arn,
            allowed_publishers=allowed_publishers,
            policies=code_signing_policies,
            last_modified=api_utils.generate_lambda_date(),
            description=description,
        )
        # Configs are keyed by their ARN.
        store.code_signing_configs[csc_arn] = new_csc

        return CreateCodeSigningConfigResponse(CodeSigningConfig=api_utils.map_csc(new_csc))
1✔
3061

3062
    def put_function_code_signing_config(
        self,
        context: RequestContext,
        code_signing_config_arn: CodeSigningConfigArn,
        function_name: NamespacedFunctionName,
        **kwargs,
    ) -> PutFunctionCodeSigningConfigResponse:
        """Attach a code signing config to a function.

        :param context: request context used for account/region resolution
        :param code_signing_config_arn: ARN of an existing code signing config
        :param function_name: function name or ARN
        :return: the config ARN together with the resolved function name
        :raises CodeSigningConfigNotFoundException: if the config does not exist
        :raises ResourceNotFoundException: if the function does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]
        function_name = api_utils.get_function_name(function_name, context)

        # The config is checked first, so a missing config wins over a missing function.
        if not store.code_signing_configs.get(code_signing_config_arn):
            raise CodeSigningConfigNotFoundException(
                f"The code signing configuration cannot be found. Check that the provided configuration is not deleted: {code_signing_config_arn}.",
                Type="User",
            )

        target_fn = store.functions.get(function_name)
        fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
        if not target_fn:
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")

        target_fn.code_signing_config_arn = code_signing_config_arn
        return PutFunctionCodeSigningConfigResponse(
            CodeSigningConfigArn=code_signing_config_arn, FunctionName=function_name
        )
3089

3090
    def update_code_signing_config(
        self,
        context: RequestContext,
        code_signing_config_arn: CodeSigningConfigArn,
        description: Description = None,
        allowed_publishers: AllowedPublishers = None,
        code_signing_policies: CodeSigningPolicies = None,
        **kwargs,
    ) -> UpdateCodeSigningConfigResponse:
        """Update an existing code signing config.

        Only the supplied settings are changed; the last-modified time is always refreshed.

        :param context: request context; its account id and region select the store
        :param code_signing_config_arn: ARN of the config to update
        :param description: new description, applied when not ``None``
        :param allowed_publishers: new allowed publishers, applied when not ``None``
        :param code_signing_policies: new policies, applied when not ``None``
        :return: the API representation of the updated config
        :raises ResourceNotFoundException: if the config does not exist
        """
        store = lambda_stores[context.account_id][context.region]
        current_csc = store.code_signing_configs.get(code_signing_config_arn)
        if not current_csc:
            raise ResourceNotFoundException(
                f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
            )

        # Collect only the fields that were actually supplied by the caller.
        changes = {}
        if allowed_publishers is not None:
            changes["allowed_publishers"] = allowed_publishers
        if code_signing_policies is not None:
            changes["policies"] = code_signing_policies
        if description is not None:
            changes["description"] = description

        # The stored entry is swapped for a modified copy instead of being mutated.
        updated_csc = dataclasses.replace(
            current_csc, last_modified=api_utils.generate_lambda_date(), **changes
        )
        store.code_signing_configs[code_signing_config_arn] = updated_csc

        return UpdateCodeSigningConfigResponse(CodeSigningConfig=api_utils.map_csc(updated_csc))
1✔
3119

3120
    def get_code_signing_config(
        self, context: RequestContext, code_signing_config_arn: CodeSigningConfigArn, **kwargs
    ) -> GetCodeSigningConfigResponse:
        """
        Return the code signing configuration stored under the given ARN.

        :param context: request context providing the account id and region of the store
        :param code_signing_config_arn: ARN (store key) of the configuration
        :return: response wrapping the mapped configuration
        :raises ResourceNotFoundException: if no configuration is stored under the ARN
        """
        store = lambda_stores[context.account_id][context.region]
        found = store.code_signing_configs.get(code_signing_config_arn)
        if found:
            return GetCodeSigningConfigResponse(CodeSigningConfig=api_utils.map_csc(found))

        raise ResourceNotFoundException(
            f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
        )
1✔
3131

3132
    def get_function_code_signing_config(
        self, context: RequestContext, function_name: NamespacedFunctionName, **kwargs
    ) -> GetFunctionCodeSigningConfigResponse:
        """
        Return the code signing configuration ARN attached to a function.

        :param context: request context, used to resolve account, region and function name
        :param function_name: function identifier as passed by the caller
        :return: response with ``CodeSigningConfigArn`` and ``FunctionName`` when the function
            has a code signing configuration attached, otherwise an empty response
        :raises ResourceNotFoundException: if the function is not in the store
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]
        function_name = api_utils.get_function_name(function_name, context)
        function = store.functions.get(function_name)
        unqualified_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
        if not function:
            raise ResourceNotFoundException(
                f"Function not found: {unqualified_arn}", Type="User"
            )

        attached_arn = function.code_signing_config_arn
        if not attached_arn:
            # nothing attached: the response carries no fields at all
            return GetFunctionCodeSigningConfigResponse()

        return GetFunctionCodeSigningConfigResponse(
            CodeSigningConfigArn=attached_arn, FunctionName=function_name
        )
1✔
3149

3150
    def delete_function_code_signing_config(
        self, context: RequestContext, function_name: NamespacedFunctionName, **kwargs
    ) -> None:
        """
        Detach the code signing configuration from a function.

        The function's ``code_signing_config_arn`` is reset to ``None`` regardless of
        whether one was attached before.

        :param context: request context, used to resolve account, region and function name
        :param function_name: function identifier as passed by the caller
        :raises ResourceNotFoundException: if the function is not in the store
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]
        function_name = api_utils.get_function_name(function_name, context)
        function = store.functions.get(function_name)
        unqualified_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
        if not function:
            raise ResourceNotFoundException(
                f"Function not found: {unqualified_arn}", Type="User"
            )

        function.code_signing_config_arn = None
1✔
3162

3163
    def delete_code_signing_config(
        self, context: RequestContext, code_signing_config_arn: CodeSigningConfigArn, **kwargs
    ) -> DeleteCodeSigningConfigResponse:
        """
        Remove a code signing configuration from the store.

        :param context: request context providing the account id and region of the store
        :param code_signing_config_arn: ARN (store key) of the configuration to remove
        :return: an empty delete response
        :raises ResourceNotFoundException: if no configuration is stored under the ARN
        """
        configs = lambda_stores[context.account_id][context.region].code_signing_configs

        if not configs.get(code_signing_config_arn):
            raise ResourceNotFoundException(
                f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
            )

        del configs[code_signing_config_arn]
        return DeleteCodeSigningConfigResponse()
1✔
3177

3178
    def list_code_signing_configs(
        self,
        context: RequestContext,
        marker: String = None,
        max_items: MaxListItems = None,
        **kwargs,
    ) -> ListCodeSigningConfigsResponse:
        """
        List the code signing configurations of the request's account and region, paginated.

        :param context: request context providing the account id and region of the store
        :param marker: pagination token from a previous call, if any
        :param max_items: page size passed on to the paginator
        :return: one page of mapped configurations plus the marker for the next page
        """
        store = lambda_stores[context.account_id][context.region]

        def _page_token(mapped_config):
            # pages are keyed on the id of the mapped configuration
            return mapped_config["CodeSigningConfigId"]

        mapped_configs = PaginatedList(
            [api_utils.map_csc(stored) for stored in store.code_signing_configs.values()]
        )
        page, next_marker = mapped_configs.get_page(_page_token, marker, max_items)
        return ListCodeSigningConfigsResponse(CodeSigningConfigs=page, NextMarker=next_marker)
1✔
3195

3196
    def list_functions_by_code_signing_config(
        self,
        context: RequestContext,
        code_signing_config_arn: CodeSigningConfigArn,
        marker: String = None,
        max_items: MaxListItems = None,
        **kwargs,
    ) -> ListFunctionsByCodeSigningConfigResponse:
        """
        List the unqualified ARNs of all functions that use a code signing configuration.

        :param context: request context providing the account id and region of the store
        :param code_signing_config_arn: ARN of the code signing configuration to look up
        :param marker: pagination token from a previous call, if any
        :param max_items: page size passed on to the paginator
        :return: one page of function ARNs plus the marker for the next page
        :raises ResourceNotFoundException: if no configuration is stored under the ARN
        """
        account, region = context.account_id, context.region
        store = lambda_stores[account][region]

        if code_signing_config_arn not in store.code_signing_configs:
            raise ResourceNotFoundException(
                f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
            )

        matching_arns = PaginatedList()
        for function in store.functions.values():
            if function.code_signing_config_arn != code_signing_config_arn:
                continue
            matching_arns.append(
                api_utils.unqualified_lambda_arn(function.function_name, account, region)
            )

        # the ARN string itself doubles as the pagination token
        page, next_marker = matching_arns.get_page(lambda arn: arn, marker, max_items)
        return ListFunctionsByCodeSigningConfigResponse(FunctionArns=page, NextMarker=next_marker)
1✔
3227

3228
    # =======================================
3229
    # =========  Account Settings   =========
3230
    # =======================================
3231

3232
    # CAVE: these settings & usages are *per* region!
3233
    # Lambda quotas: https://docs.aws.amazon.com/lambda/latest/dg/gettingstarted-limits.html
3234
    def get_account_settings(self, context: RequestContext, **kwargs) -> GetAccountSettingsResponse:
        """
        Report the configured Lambda limits and the current usage for the request's
        account and region.

        Usage is computed from the store: the function count, the summed code size of
        all Zip-packaged function versions and all layer versions, and the summed
        reserved plus provisioned concurrency, which is subtracted from the configured
        concurrent-executions limit to obtain the unreserved concurrency.

        :param context: request context providing the account id and region of the store
        :return: response with ``AccountLimit`` and ``AccountUsage``
        """
        store = lambda_stores[context.account_id][context.region]
        functions = list(store.functions.values())

        # Image-based Lambdas do not have a code attribute and count against the ECR quotas instead
        function_code_size = sum(
            version.config.code.code_size
            for function in functions
            for version in function.versions.values()
            if version.config.package_type == PackageType.Zip
        )
        layer_code_size = sum(
            layer_version.code.code_size
            for layer in store.layers.values()
            for layer_version in layer.layer_versions.values()
        )

        reserved_total = sum(
            function.reserved_concurrent_executions
            for function in functions
            if function.reserved_concurrent_executions is not None
        )
        provisioned_total = sum(
            provisioned.provisioned_concurrent_executions
            for function in functions
            for provisioned in function.provisioned_concurrency_configs.values()
        )
        unreserved = config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS - (
            reserved_total + provisioned_total
        )

        limits = AccountLimit(
            TotalCodeSize=config.LAMBDA_LIMITS_TOTAL_CODE_SIZE,
            CodeSizeZipped=config.LAMBDA_LIMITS_CODE_SIZE_ZIPPED,
            CodeSizeUnzipped=config.LAMBDA_LIMITS_CODE_SIZE_UNZIPPED,
            ConcurrentExecutions=config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS,
            UnreservedConcurrentExecutions=unreserved,
        )
        usage = AccountUsage(
            TotalCodeSize=function_code_size + layer_code_size,
            FunctionCount=len(functions),
        )
        return GetAccountSettingsResponse(AccountLimit=limits, AccountUsage=usage)
3267

3268
    # =======================================
3269
    # ==  Provisioned Concurrency Config   ==
3270
    # =======================================
3271

3272
    def _get_provisioned_config(
        self, context: RequestContext, function_name: str, qualifier: str
    ) -> ProvisionedConcurrencyConfiguration | None:
        """
        Look up the provisioned concurrency configuration stored for a function qualifier.

        The qualifier is first checked against the function: an alias qualifier must be
        a known alias and a version qualifier must be a known version.

        :param context: request context, used to resolve account, region and function name
        :param function_name: function identifier as passed by the caller
        :param qualifier: alias name or version the configuration is keyed on
        :return: the stored configuration, or ``None`` if none exists for the qualifier
        :raises ResourceNotFoundException: if the function, the alias or the version
            does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        state = lambda_stores[account_id][region]
        function_name = api_utils.get_function_name(function_name, context)
        fn = state.functions.get(function_name)
        if api_utils.qualifier_is_alias(qualifier):
            fn_alias = None
            if fn:
                fn_alias = fn.aliases.get(qualifier)
            if fn_alias is None:
                raise ResourceNotFoundException(
                    f"Cannot find alias arn: {api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)}",
                    Type="User",
                )
        elif api_utils.qualifier_is_version(qualifier):
            fn_version = None
            if fn:
                fn_version = fn.versions.get(qualifier)
            if fn_version is None:
                raise ResourceNotFoundException(
                    f"Function not found: {api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)}",
                    Type="User",
                )

        if not fn:
            # Only reachable when the qualifier matched neither pattern above; without
            # this guard the attribute access below would fail on ``None``.
            raise ResourceNotFoundException(
                f"Function not found: {api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)}",
                Type="User",
            )

        return fn.provisioned_concurrency_configs.get(qualifier)
1✔
3299

3300
    def put_provisioned_concurrency_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: Qualifier,
        provisioned_concurrent_executions: PositiveInteger,
        **kwargs,
    ) -> PutProvisionedConcurrencyConfigResponse:
        """
        Create or overwrite the provisioned concurrency configuration of an alias or
        published version.

        The checks run in this order: the requested value must be positive, the
        qualifier must not be ``$LATEST``, the alias/version must exist, the request
        must fit into the function's reserved concurrency (if set), must not exceed the
        configured concurrent-executions limit, and must not push the account's
        unreserved concurrency below the configured minimum. An alias cannot be
        configured when its target version already has a configuration, and a version
        cannot be configured when an alias pointing at it already has one.

        :param context: request context, used to resolve account, region and names
        :param function_name: function identifier as passed by the caller
        :param qualifier: alias name or published version to configure
        :param provisioned_concurrent_executions: requested provisioned concurrency
        :return: response echoing the requested value with status ``IN_PROGRESS``
        :raises ValidationException: if the requested value is not positive
        :raises InvalidParameterValueException: for ``$LATEST`` or a violated limit
        :raises ResourceNotFoundException: if the alias or version does not exist
        :raises ResourceConflictException: on the alias/version conflicts described above
        """
        requested = provisioned_concurrent_executions
        if requested <= 0:
            raise ValidationException(
                f"1 validation error detected: Value '{requested}' at 'provisionedConcurrentExecutions' failed to satisfy constraint: Member must have value greater than or equal to 1"
            )

        if qualifier == "$LATEST":
            raise InvalidParameterValueException(
                "Provisioned Concurrency Configs cannot be applied to unpublished function versions.",
                Type="User",
            )

        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        function = lambda_stores[account_id][region].functions.get(function_name)

        # also validates that the alias/version exists (raises otherwise)
        existing_config = self._get_provisioned_config(context, function_name, qualifier)
        if existing_config:  # TODO: merge?
            # TODO: add a test for partial updates (if possible)
            LOG.warning(
                "Partial update of provisioned concurrency config is currently not supported."
            )

        # provisioned concurrency already held by the function's other qualifiers
        provisioned_elsewhere = sum(
            other_config.provisioned_concurrent_executions
            for other_qualifier, other_config in function.provisioned_concurrency_configs.items()
            if other_qualifier != qualifier
        )

        reserved = function.reserved_concurrent_executions
        if reserved is not None and reserved < provisioned_elsewhere + requested:
            raise InvalidParameterValueException(
                "Requested Provisioned Concurrency should not be greater than the reservedConcurrentExecution for function",
                Type="User",
            )

        if requested > config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS:
            raise InvalidParameterValueException(
                "Specified ConcurrentExecutions for function is greater than account's unreserved concurrency"
                f" [{config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS}]."
            )

        account_limits = self.get_account_settings(context)["AccountLimit"]
        remaining_unreserved = account_limits["UnreservedConcurrentExecutions"] - requested
        if remaining_unreserved < config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY:
            raise InvalidParameterValueException(
                "Specified ConcurrentExecutions for function decreases account's UnreservedConcurrentExecution below"
                f" its minimum value of [{config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY}]."
            )

        new_config = ProvisionedConcurrencyConfiguration(
            requested, api_utils.generate_lambda_date()
        )
        version_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)

        if api_utils.qualifier_is_alias(qualifier):
            alias = function.aliases.get(qualifier)
            target_version = function.versions.get(alias.function_version)
            target_already_provisioned = (
                function.provisioned_concurrency_configs.get(alias.function_version) is not None
            )

            if target_version and target_already_provisioned:
                raise ResourceConflictException(
                    "Alias can't be used for Provisioned Concurrency configuration on an already Provisioned version",
                    Type="User",
                )
            # the version manager is looked up by the ARN of the version behind the alias
            version_arn = target_version.id.qualified_arn()
        elif api_utils.qualifier_is_version(qualifier):
            published_version = function.versions.get(qualifier)

            # TODO: might be useful other places, utilize
            provisioned_alias_points_here = any(
                alias.function_version == qualifier
                and function.provisioned_concurrency_configs.get(alias.name) is not None
                for alias in function.aliases.values()
            )
            if provisioned_alias_points_here:
                raise ResourceConflictException(
                    "Version is pointed by a Provisioned Concurrency alias", Type="User"
                )

            version_arn = published_version.id.qualified_arn()

        manager = self.lambda_service.get_lambda_version_manager(version_arn)

        function.provisioned_concurrency_configs[qualifier] = new_config

        manager.update_provisioned_concurrency_config(
            new_config.provisioned_concurrent_executions
        )

        return PutProvisionedConcurrencyConfigResponse(
            RequestedProvisionedConcurrentExecutions=new_config.provisioned_concurrent_executions,
            AvailableProvisionedConcurrentExecutions=0,
            AllocatedProvisionedConcurrentExecutions=0,
            Status=ProvisionedConcurrencyStatusEnum.IN_PROGRESS,
            # StatusReason=manager.provisioned_state.status_reason,
            LastModified=new_config.last_modified,  # TODO: does change with configuration or also with state changes?
        )
3422

3423
    def get_provisioned_concurrency_config(
        self, context: RequestContext, function_name: FunctionName, qualifier: Qualifier, **kwargs
    ) -> GetProvisionedConcurrencyConfigResponse:
        """
        Return the provisioned concurrency configuration of an alias or published
        version, together with the provisioned state of its version manager.

        :param context: request context, used to resolve account, region and names
        :param function_name: function identifier as passed by the caller
        :param qualifier: alias name or published version
        :return: the requested value and last-modified timestamp from the stored
            configuration, plus available/allocated counts, status and status reason
            from the version manager
        :raises InvalidParameterValueException: if the qualifier is ``$LATEST``
        :raises ResourceNotFoundException: if the alias or version does not exist
        :raises ProvisionedConcurrencyConfigNotFoundException: if nothing is configured
        """
        if qualifier == "$LATEST":
            raise InvalidParameterValueException(
                "The function resource provided must be an alias or a published version.",
                Type="User",
            )
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )

        stored_config = self._get_provisioned_config(context, function_name, qualifier)
        if not stored_config:
            raise ProvisionedConcurrencyConfigNotFoundException(
                "No Provisioned Concurrency Config found for this function", Type="User"
            )

        # TODO: make this compatible with alias pointer migration on update
        version_qualifier = qualifier
        if api_utils.qualifier_is_alias(qualifier):
            # the version manager is looked up by the version the alias points to
            function = lambda_stores[account_id][region].functions.get(function_name)
            version_qualifier = function.aliases.get(qualifier).function_version
        version_arn = api_utils.qualified_lambda_arn(
            function_name, version_qualifier, account_id, region
        )

        provisioned_state = self.lambda_service.get_lambda_version_manager(
            version_arn
        ).provisioned_state

        return GetProvisionedConcurrencyConfigResponse(
            RequestedProvisionedConcurrentExecutions=stored_config.provisioned_concurrent_executions,
            LastModified=stored_config.last_modified,
            AvailableProvisionedConcurrentExecutions=provisioned_state.available,
            AllocatedProvisionedConcurrentExecutions=provisioned_state.allocated,
            Status=provisioned_state.status,
            StatusReason=provisioned_state.status_reason,
        )
3463

3464
    def list_provisioned_concurrency_configs(
        self,
        context: RequestContext,
        function_name: FunctionName,
        marker: String = None,
        max_items: MaxProvisionedConcurrencyConfigListItems = None,
        **kwargs,
    ) -> ListProvisionedConcurrencyConfigsResponse:
        """
        List all provisioned concurrency configurations of a function, paginated.

        :param context: request context, used to resolve account, region and function name
        :param function_name: function identifier as passed by the caller
        :param marker: pagination token from a previous call, if any
        :param max_items: page size passed on to the paginator
        :return: one page of list items plus the marker for the next page
        :raises ResourceNotFoundException: if the function is not in the store
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        state = lambda_stores[account_id][region]

        function_name = api_utils.get_function_name(function_name, context)
        fn = state.functions.get(function_name)
        if fn is None:
            raise ResourceNotFoundException(
                f"Function not found: {api_utils.unqualified_lambda_arn(function_name, account_id, region)}",
                Type="User",
            )

        configs = []
        for qualifier, pc_config in fn.provisioned_concurrency_configs.items():
            # the version manager is looked up by version ARN, so resolve aliases first
            if api_utils.qualifier_is_alias(qualifier):
                alias = fn.aliases.get(qualifier)
                fn_arn = api_utils.qualified_lambda_arn(
                    function_name, alias.function_version, account_id, region
                )
            else:
                fn_arn = api_utils.qualified_lambda_arn(
                    function_name, qualifier, account_id, region
                )

            manager = self.lambda_service.get_lambda_version_manager(fn_arn)

            configs.append(
                ProvisionedConcurrencyConfigListItem(
                    # the reported ARN keeps the qualifier the config was stored under
                    FunctionArn=api_utils.qualified_lambda_arn(
                        function_name, qualifier, account_id, region
                    ),
                    RequestedProvisionedConcurrentExecutions=pc_config.provisioned_concurrent_executions,
                    AvailableProvisionedConcurrentExecutions=manager.provisioned_state.available,
                    AllocatedProvisionedConcurrentExecutions=manager.provisioned_state.allocated,
                    Status=manager.provisioned_state.status,
                    StatusReason=manager.provisioned_state.status_reason,
                    LastModified=pc_config.last_modified,
                )
            )

        provisioned_concurrency_configs = PaginatedList(configs)
        # Page on the item's ARN string: using the whole item as the token would hand a
        # dict back as NextMarker, which the string marker of a follow-up call can never
        # equal.
        page, token = provisioned_concurrency_configs.get_page(
            lambda item: item["FunctionArn"],
            marker,
            max_items,
        )
        return ListProvisionedConcurrencyConfigsResponse(
            ProvisionedConcurrencyConfigs=page, NextMarker=token
        )
3521

3522
    def delete_provisioned_concurrency_config(
        self, context: RequestContext, function_name: FunctionName, qualifier: Qualifier, **kwargs
    ) -> None:
        """
        Remove the provisioned concurrency configuration of an alias or published
        version and set the provisioned concurrency of its version manager to 0.

        Nothing happens when no configuration is stored for the qualifier.

        :param context: request context, used to resolve account, region and names
        :param function_name: function identifier as passed by the caller
        :param qualifier: alias name or published version
        :raises InvalidParameterValueException: if the qualifier is ``$LATEST``
        :raises ResourceNotFoundException: if the alias or version does not exist
        """
        if qualifier == "$LATEST":
            raise InvalidParameterValueException(
                "The function resource provided must be an alias or a published version.",
                Type="User",
            )
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        state = lambda_stores[account_id][region]
        fn = state.functions.get(function_name)

        provisioned_config = self._get_provisioned_config(context, function_name, qualifier)
        # delete is idempotent and doesn't actually care about the provisioned concurrency config not existing
        if provisioned_config:
            # The version manager is looked up by version ARN, so an alias has to be
            # resolved to the version it points to (as the put/get/list handlers do).
            # The alias is known to exist here: the lookup above raises otherwise.
            if api_utils.qualifier_is_alias(qualifier):
                version_qualifier = fn.aliases.get(qualifier).function_version
            else:
                version_qualifier = qualifier

            fn.provisioned_concurrency_configs.pop(qualifier)
            fn_arn = api_utils.qualified_lambda_arn(
                function_name, version_qualifier, account_id, region
            )
            manager = self.lambda_service.get_lambda_version_manager(fn_arn)
            manager.update_provisioned_concurrency_config(0)
1✔
3544

3545
    # =======================================
3546
    # =======  Event Invoke Config   ========
3547
    # =======================================
3548

3549
    # "1 validation error detected: Value 'arn:aws:_-/!lambda:<region>:111111111111:function:<function-name:1>' at 'destinationConfig.onFailure.destination' failed to satisfy constraint: Member must satisfy regular expression pattern: ^$|arn:(aws[a-zA-Z0-9-]*):([a-zA-Z0-9\\-])+:([a-z]{2}((-gov)|(-iso(b?)))?-[a-z]+-\\d{1})?:(\\d{12})?:(.*)"
3550
    # "1 validation error detected: Value 'arn:aws:_-/!lambda:<region>:111111111111:function:<function-name:1>' at 'destinationConfig.onFailure.destination' failed to satisfy constraint: Member must satisfy regular expression pattern: ^$|arn:(aws[a-zA-Z0-9-]*):([a-zA-Z0-9\\-])+:([a-z]2((-gov)|(-iso(b?)))?-[a-z]+-\\d1)?:(\\d12)?:(.*)" ... (expected → actual)
3551

3552
    def _validate_destination_config(
1✔
3553
        self, store: LambdaStore, function_name: str, destination_config: DestinationConfig
3554
    ):
3555
        def _validate_destination_arn(destination_arn) -> bool:
1✔
3556
            if not api_utils.DESTINATION_ARN_PATTERN.match(destination_arn):
1✔
3557
                # technically we shouldn't handle this in the provider
3558
                raise ValidationException(
1✔
3559
                    "1 validation error detected: Value '"
3560
                    + destination_arn
3561
                    + "' at 'destinationConfig.onFailure.destination' failed to satisfy constraint: Member must satisfy regular expression pattern: "
3562
                    + "$|kafka://([^.]([a-zA-Z0-9\\-_.]{0,248}))|arn:(aws[a-zA-Z0-9-]*):([a-zA-Z0-9\\-])+:((eusc-)?[a-z]{2}((-gov)|(-iso([a-z]?)))?-[a-z]+-\\d{1})?:(\\d{12})?:(.*)"
3563
                )
3564

3565
            match destination_arn.split(":")[2]:
1✔
3566
                case "lambda":
1✔
3567
                    fn_parts = api_utils.FULL_FN_ARN_PATTERN.search(destination_arn).groupdict()
1✔
3568
                    if fn_parts:
1✔
3569
                        # check if it exists
3570
                        fn = store.functions.get(fn_parts["function_name"])
1✔
3571
                        if not fn:
1✔
3572
                            raise InvalidParameterValueException(
1✔
3573
                                f"The destination ARN {destination_arn} is invalid.", Type="User"
3574
                            )
3575
                        if fn_parts["function_name"] == function_name:
1✔
3576
                            raise InvalidParameterValueException(
1✔
3577
                                "You can't specify the function as a destination for itself.",
3578
                                Type="User",
3579
                            )
3580
                case "sns" | "sqs" | "events":
1✔
3581
                    pass
1✔
3582
                case _:
1✔
3583
                    return False
1✔
3584
            return True
1✔
3585

3586
        validation_err = False
1✔
3587

3588
        failure_destination = destination_config.get("OnFailure", {}).get("Destination")
1✔
3589
        if failure_destination:
1✔
3590
            validation_err = validation_err or not _validate_destination_arn(failure_destination)
1✔
3591

3592
        success_destination = destination_config.get("OnSuccess", {}).get("Destination")
1✔
3593
        if success_destination:
1✔
3594
            validation_err = validation_err or not _validate_destination_arn(success_destination)
1✔
3595

3596
        if validation_err:
1✔
3597
            on_success_part = (
1✔
3598
                f"OnSuccess(destination={success_destination})" if success_destination else "null"
3599
            )
3600
            on_failure_part = (
1✔
3601
                f"OnFailure(destination={failure_destination})" if failure_destination else "null"
3602
            )
3603
            raise InvalidParameterValueException(
1✔
3604
                f"The provided destination config DestinationConfig(onSuccess={on_success_part}, onFailure={on_failure_part}) is invalid.",
3605
                Type="User",
3606
            )
3607

3608
    def put_function_event_invoke_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: NumericLatestPublishedOrAliasQualifier = None,
        maximum_retry_attempts: MaximumRetryAttempts = None,
        maximum_event_age_in_seconds: MaximumEventAgeInSeconds = None,
        destination_config: DestinationConfig = None,
        **kwargs,
    ) -> FunctionEventInvokeConfig:
        """
        Store the event invoke config of a function qualifier, replacing any existing one.

        Supported destination ARNs: SQS, SNS, Lambda and EventBridge.

        put_ versus update_:
            * put overwrites any existing config
            * update changes only single values and keeps the remaining existing ones
            * update fails on non-existing configs

        Destination versus DLQ:
            * "However, a dead-letter queue is part of a function's version-specific configuration, so it is locked in when you publish a version."
            * "On-failure destinations also support additional targets and include details about the function's response in the invocation record."

        :param context: request context, used to resolve account, region and names
        :param function_name: function identifier as passed by the caller
        :param qualifier: alias or version; the config is stored under ``$LATEST`` if omitted
        :param maximum_retry_attempts: retry setting to store
        :param maximum_event_age_in_seconds: event age setting to store
        :param destination_config: destinations to validate and store
        :return: the stored config in API shape
        :raises InvalidParameterValueException: if no setting is given or a destination is invalid
        :raises ResourceNotFoundException: if the function or the qualifier does not exist
        """
        requested_settings = (
            maximum_event_age_in_seconds,
            maximum_retry_attempts,
            destination_config,
        )
        if all(setting is None for setting in requested_settings):
            raise InvalidParameterValueException(
                "You must specify at least one of error handling or destination setting.",
                Type="User",
            )

        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        function = store.functions.get(function_name)
        if not function or (
            qualifier and qualifier not in function.aliases and qualifier not in function.versions
        ):
            raise ResourceNotFoundException("The function doesn't exist.", Type="User")

        if not qualifier:
            qualifier = "$LATEST"

        # validate and normalize destination config
        if destination_config:
            self._validate_destination_config(store, function_name, destination_config)

        requested_destinations = destination_config or {}
        destination_config = DestinationConfig(
            OnSuccess=OnSuccess(
                Destination=requested_destinations.get("OnSuccess", {}).get("Destination")
            ),
            OnFailure=OnFailure(
                Destination=requested_destinations.get("OnFailure", {}).get("Destination")
            ),
        )

        # named so that it does not shadow the imported ``config`` module
        invoke_config = EventInvokeConfig(
            function_name=function_name,
            qualifier=qualifier,
            maximum_event_age_in_seconds=maximum_event_age_in_seconds,
            maximum_retry_attempts=maximum_retry_attempts,
            last_modified=api_utils.generate_lambda_date(),
            destination_config=destination_config,
        )
        function.event_invoke_configs[qualifier] = invoke_config

        last_modified = datetime.datetime.strptime(
            invoke_config.last_modified, api_utils.LAMBDA_DATE_FORMAT
        )
        function_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
        return FunctionEventInvokeConfig(
            LastModified=last_modified,
            FunctionArn=function_arn,
            DestinationConfig=destination_config,
            MaximumEventAgeInSeconds=maximum_event_age_in_seconds,
            MaximumRetryAttempts=maximum_retry_attempts,
        )
3689

3690
    def get_function_event_invoke_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
        **kwargs,
    ) -> FunctionEventInvokeConfig:
        """
        Return the event invoke config stored for a function qualifier.

        :param context: request context, used to resolve account, region and names
        :param function_name: function identifier as passed by the caller
        :param qualifier: alias or version; ``$LATEST`` is used if omitted
        :return: the stored config in API shape
        :raises ResourceNotFoundException: if the function does not exist or has no
            event invoke config for the qualifier (same message in both cases)
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        if not qualifier:
            qualifier = "$LATEST"

        function = store.functions.get(function_name)
        # a missing function and a missing config are reported identically
        invoke_config = function.event_invoke_configs.get(qualifier) if function else None
        if not invoke_config:
            missing_arn = api_utils.qualified_lambda_arn(
                function_name, qualifier, account_id, region
            )
            raise ResourceNotFoundException(
                f"The function {missing_arn} doesn't have an EventInvokeConfig", Type="User"
            )

        last_modified = datetime.datetime.strptime(
            invoke_config.last_modified, api_utils.LAMBDA_DATE_FORMAT
        )
        function_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
        return FunctionEventInvokeConfig(
            LastModified=last_modified,
            FunctionArn=function_arn,
            DestinationConfig=invoke_config.destination_config,
            MaximumEventAgeInSeconds=invoke_config.maximum_event_age_in_seconds,
            MaximumRetryAttempts=invoke_config.maximum_retry_attempts,
        )
3729

3730
    def list_function_event_invoke_configs(
        self,
        context: RequestContext,
        function_name: FunctionName,
        marker: String = None,
        max_items: MaxFunctionEventInvokeConfigListItems = None,
        **kwargs,
    ) -> ListFunctionEventInvokeConfigsResponse:
        """
        List all event invoke configs of a function, one page at a time.

        :param context: Request context
        :param function_name: Function name or ARN
        :param marker: Pagination marker returned by a previous call
        :param max_items: Maximum number of configs per page
        :return: One page of configs plus the marker of the next page (if any)
        :raises ResourceNotFoundException: If the function does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        state = lambda_stores[account_id][region]
        # The store is keyed by plain function name, so an ARN input has to be reduced to its
        # name first; otherwise every ARN lookup ends in "function doesn't exist".
        function_name = api_utils.get_function_name(function_name, context)
        fn = state.functions.get(function_name)
        if not fn:
            raise ResourceNotFoundException("The function doesn't exist.", Type="User")

        event_invoke_configs = PaginatedList(
            FunctionEventInvokeConfig(
                LastModified=c.last_modified,
                FunctionArn=api_utils.qualified_lambda_arn(
                    function_name, c.qualifier, account_id, region
                ),
                MaximumEventAgeInSeconds=c.maximum_event_age_in_seconds,
                MaximumRetryAttempts=c.maximum_retry_attempts,
                DestinationConfig=c.destination_config,
            )
            for c in fn.event_invoke_configs.values()
        )
        # The qualified function ARN doubles as the pagination token.
        page, token = event_invoke_configs.get_page(
            lambda x: x["FunctionArn"],
            marker,
            max_items,
        )
        return ListFunctionEventInvokeConfigsResponse(
            FunctionEventInvokeConfigs=page, NextMarker=token
        )
3766

3767
    def delete_function_event_invoke_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
        **kwargs,
    ) -> None:
        """
        Remove the event invoke config stored for a function under the given qualifier.

        :param context: Request context
        :param function_name: Function identifier, resolved via ``api_utils.get_name_and_qualifier``
        :param qualifier: Optional qualifier; the config is looked up under ``$LATEST`` when
            none is resolved
        :raises ResourceNotFoundException: If the function is unknown or has no config
            for the qualifier
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        store = lambda_stores[account_id][region]
        function = store.functions.get(function_name)
        config_key = qualifier or "$LATEST"
        # The ARN in the error message is built from the qualifier as resolved, not the fallback.
        function_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)

        # An unknown function and a missing config are reported with the same error.
        if not function or not function.event_invoke_configs.get(config_key):
            raise ResourceNotFoundException(
                f"The function {function_arn} doesn't have an EventInvokeConfig", Type="User"
            )

        del function.event_invoke_configs[config_key]
1✔
3794

3795
    def update_function_event_invoke_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: NumericLatestPublishedOrAliasQualifier = None,
        maximum_retry_attempts: MaximumRetryAttempts = None,
        maximum_event_age_in_seconds: MaximumEventAgeInSeconds = None,
        destination_config: DestinationConfig = None,
        **kwargs,
    ) -> FunctionEventInvokeConfig:
        """
        Partially update an existing event invoke config.

        Only the settings that are passed (i.e. not ``None``) replace the stored values; the
        remaining fields are carried over from the existing config.

        :param context: Request context
        :param function_name: Function identifier, resolved via ``api_utils.get_name_and_qualifier``
        :param qualifier: Optional version or alias; ``$LATEST`` is used when none is resolved
        :param maximum_retry_attempts: New retry limit, if it should change
        :param maximum_event_age_in_seconds: New maximum event age, if it should change
        :param destination_config: New destination config, if it should change
        :return: The updated config
        :raises InvalidParameterValueException: If none of the three settings is given
        :raises ResourceNotFoundException: If the function or qualifier is unknown, or no config
            exists yet for the qualifier
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )

        requested_changes = {
            "destination_config": destination_config,
            "maximum_retry_attempts": maximum_retry_attempts,
            "maximum_event_age_in_seconds": maximum_event_age_in_seconds,
        }
        if all(value is None for value in requested_changes.values()):
            raise InvalidParameterValueException(
                "You must specify at least one of error handling or destination setting.",
                Type="User",
            )

        function = store.functions.get(function_name)
        # An explicit qualifier has to name an existing alias or version.
        qualifier_unknown = bool(
            function
            and qualifier
            and qualifier not in function.aliases
            and qualifier not in function.versions
        )
        if not function or qualifier_unknown:
            raise ResourceNotFoundException("The function doesn't exist.", Type="User")

        effective_qualifier = qualifier or "$LATEST"
        function_arn = api_utils.qualified_lambda_arn(
            function_name, effective_qualifier, account_id, region
        )

        current_config = function.event_invoke_configs.get(effective_qualifier)
        if not current_config:
            raise ResourceNotFoundException(
                f"The function {function_arn} doesn't have an EventInvokeConfig", Type="User"
            )

        if destination_config:
            self._validate_destination_config(store, function_name, destination_config)

        # Fields left at None keep their stored value.
        overrides = {
            field: value for field, value in requested_changes.items() if value is not None
        }
        updated_config = dataclasses.replace(
            current_config, last_modified=api_utils.generate_lambda_date(), **overrides
        )
        function.event_invoke_configs[effective_qualifier] = updated_config

        return FunctionEventInvokeConfig(
            LastModified=datetime.datetime.strptime(
                updated_config.last_modified, api_utils.LAMBDA_DATE_FORMAT
            ),
            FunctionArn=function_arn,
            DestinationConfig=updated_config.destination_config,
            MaximumEventAgeInSeconds=updated_config.maximum_event_age_in_seconds,
            MaximumRetryAttempts=updated_config.maximum_retry_attempts,
        )
3864

3865
    # =======================================
3866
    # ======  Layer & Layer Versions  =======
3867
    # =======================================
3868

3869
    @staticmethod
    def _resolve_layer(
        layer_name_or_arn: str, context: RequestContext
    ) -> tuple[str, str, str, str | None]:
        """
        Locate a Lambda layer from either its plain name or its ARN.

        ARNs are handed to ``api_utils.parse_layer_arn``. Anything else is treated as a plain
        layer name living in the region and account of the request, without a version.

        :param layer_name_or_arn: Layer name or ARN
        :param context: Request context providing region and account ID for plain names
        :return: Tuple of region, account ID, layer name, layer version
        """
        if not api_utils.is_layer_arn(layer_name_or_arn):
            return context.region, context.account_id, layer_name_or_arn, None

        return api_utils.parse_layer_arn(layer_name_or_arn)
1✔
3884

3885
    def publish_layer_version(
        self,
        context: RequestContext,
        layer_name: LayerName,
        content: LayerVersionContentInput,
        description: Description | None = None,
        compatible_runtimes: CompatibleRuntimes | None = None,
        license_info: LicenseInfo | None = None,
        compatible_architectures: CompatibleArchitectures | None = None,
        **kwargs,
    ) -> PublishLayerVersionResponse:
        """
        Publish a new version of a layer, creating the layer itself on first use of its name.

        Every further call with the same LayerName adds another version.
        Note that there are no $LATEST versions with layers!

        :param context: Request context; the layer is created in its account and region
        :param layer_name: Name of the layer
        :param content: Layer archive, either inline (``ZipFile``) or as an S3 location
        :param description: Optional description; stored as an empty string when omitted
        :param compatible_runtimes: Optional list of compatible runtimes
        :param license_info: Optional license information
        :param compatible_architectures: Optional list of compatible architectures
        :return: The API representation of the newly created layer version
        :raises ValidationException: If runtimes or architectures fail validation
        """
        account = context.account_id
        region = context.region

        validation_errors = api_utils.validate_layer_runtimes_and_architectures(
            compatible_runtimes, compatible_architectures
        )
        if validation_errors:
            error_count = len(validation_errors)
            plural = "s" if error_count > 1 else ""
            raise ValidationException(
                f"{error_count} validation error{plural} detected: {'; '.join(validation_errors)}"
            )

        store = lambda_stores[account][region]
        # The lock prevents two concurrent publishes from each creating the layer object
        # (and therefore two "v1" versions) for the same name.
        with self.create_layer_lock:
            if layer_name not in store.layers:
                store.layers[layer_name] = Layer(
                    arn=api_utils.layer_arn(layer_name=layer_name, account=account, region=region)
                )

        layer = store.layers[layer_name]
        with layer.next_version_lock:
            version = LambdaLayerVersionIdentifier(
                account_id=account, region=region, layer_name=layer_name
            ).generate(next_version=layer.next_version)
            # Versions may be user defined and hence created out of order (e.g. v2 before v1).
            # The counter only ever moves forward so that existing versions are never overwritten;
            # a version below the counter leaves it untouched.
            if version >= layer.next_version:
                layer.next_version = version + 1

        # Persist the archive, either from the inline zip or from the referenced S3 object.
        archive_owner = {
            "function_name": layer_name,
            "region_name": region,
            "account_id": account,
        }
        if zip_file := content.get("ZipFile"):
            code = store_lambda_archive(archive_file=zip_file, **archive_owner)
        else:
            code = store_s3_bucket_archive(
                archive_bucket=content["S3Bucket"],
                archive_key=content["S3Key"],
                archive_version=content.get("S3ObjectVersion"),
                **archive_owner,
            )

        version_key = str(version)
        published = LayerVersion(
            layer_version_arn=api_utils.layer_version_arn(
                layer_name=layer_name,
                account=account,
                region=region,
                version=version_key,
            ),
            layer_arn=layer.arn,
            version=version,
            description=description or "",
            license_info=license_info,
            compatible_runtimes=compatible_runtimes,
            compatible_architectures=compatible_architectures,
            created=api_utils.generate_lambda_date(),
            code=code,
        )
        layer.layer_versions[version_key] = published

        return api_utils.map_layer_out(published)
1✔
3973

3974
    def get_layer_version(
        self,
        context: RequestContext,
        layer_name: LayerName,
        version_number: LayerVersionNumber,
        **kwargs,
    ) -> GetLayerVersionResponse:
        """
        Return a single version of a layer.

        :param context: Request context
        :param layer_name: Layer name or ARN (resolved through ``_resolve_layer``)
        :param version_number: Version to look up; must be at least 1
        :return: The API representation of the layer version
        :raises InvalidParameterValueException: If ``version_number`` is below 1
        :raises ResourceNotFoundException: If the layer or the version does not exist
        """
        # TODO: handle layer_name as an ARN

        region_name, account_id, layer_name, _ = LambdaProvider._resolve_layer(layer_name, context)
        layer = lambda_stores[account_id][region_name].layers.get(layer_name)

        if version_number < 1:
            raise InvalidParameterValueException("Layer Version Cannot be less than 1", Type="User")

        # A missing layer and a missing version yield the same error.
        found_version = None if layer is None else layer.layer_versions.get(str(version_number))
        if found_version is None:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )
        return api_utils.map_layer_out(found_version)
1✔
3999

4000
    def get_layer_version_by_arn(
        self, context: RequestContext, arn: LayerVersionArn, **kwargs
    ) -> GetLayerVersionResponse:
        """
        Return the layer version addressed by a layer version ARN.

        :param context: Request context
        :param arn: ARN that must include a layer version
        :return: The API representation of the layer version
        :raises ValidationException: If no version can be resolved from ``arn``
        :raises ResourceNotFoundException: If the layer or the version does not exist
        """
        region_name, account_id, layer_name, version_key = LambdaProvider._resolve_layer(
            arn, context
        )

        if not version_key:
            expected_pattern = "(arn:(aws[a-zA-Z-]*)?:lambda:(eusc-)?[a-z]{2}((-gov)|(-iso([a-z]?)))?-[a-z]+-\\d{1}:\\d{12}:layer:[a-zA-Z0-9-_]+:[0-9]+)|(arn:[a-zA-Z0-9-]+:lambda:::awslayer:[a-zA-Z0-9-_]+)"
            raise ValidationException(
                f"1 validation error detected: Value '{arn}' at 'arn' failed to satisfy constraint: Member must satisfy regular expression pattern: {expected_pattern}"
            )

        layer = lambda_stores[account_id][region_name].layers.get(layer_name)
        # A missing layer and a missing version yield the same error.
        found_version = layer.layer_versions.get(version_key) if layer else None
        if not found_version:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        return api_utils.map_layer_out(found_version)
1✔
4027

4028
    def list_layers(
        self,
        context: RequestContext,
        compatible_runtime: Runtime | None = None,
        marker: String | None = None,
        max_items: MaxLayerListItems | None = None,
        compatible_architecture: Architecture | None = None,
        **kwargs,
    ) -> ListLayersResponse:
        """
        List the layers of the calling account and region together with their latest version.

        :param context: Request context
        :param compatible_runtime: Runtime filter (validated, but not applied yet)
        :param marker: Pagination marker returned by a previous call
        :param max_items: Maximum number of layers per page
        :param compatible_architecture: Architecture filter (validated, but not applied yet)
        :return: One page of layers plus the marker of the next page (if any)
        :raises ValidationException: If the runtime or architecture filter is invalid
        """
        validation_errors = []

        validation_error_arch = api_utils.validate_layer_architecture(compatible_architecture)
        if validation_error_arch:
            validation_errors.append(validation_error_arch)

        validation_error_runtime = api_utils.validate_layer_runtime(compatible_runtime)
        if validation_error_runtime:
            validation_errors.append(validation_error_runtime)

        if validation_errors:
            raise ValidationException(
                f"{len(validation_errors)} validation error{'s' if len(validation_errors) > 1 else ''} detected: {';'.join(validation_errors)}"
            )
        # TODO: handle filter: compatible_runtime
        # TODO: handle filter: compatible_architecture

        state = lambda_stores[context.account_id][context.region]
        layers = state.layers

        # TODO: test how filters behave together with only returning layers here? Does it return the latest "matching" layer, i.e. does it ignore later layer versions that don't match?

        responses: list[LayersListItem] = []
        for layer_name, layer in layers.items():
            if not layer.layer_versions:
                # All versions were deleted; the layer has no latest version to report.
                continue
            # Versions can be created out of order, so pick the highest version number
            # explicitly instead of relying on insertion order.
            latest_layer_version = max(layer.layer_versions.values(), key=lambda x: x.version)
            responses.append(
                LayersListItem(
                    LayerName=layer_name,
                    LayerArn=layer.arn,
                    LatestMatchingVersion=api_utils.map_layer_out(latest_layer_version),
                )
            )

        responses = PaginatedList(responses)
        # The pagination token has to be a string that identifies an item: use the layer ARN.
        page, token = responses.get_page(
            lambda item: item["LayerArn"],
            marker,
            max_items,
        )

        return ListLayersResponse(NextMarker=token, Layers=page)
×
4081

4082
    def list_layer_versions(
        self,
        context: RequestContext,
        layer_name: LayerName,
        compatible_runtime: Runtime | None = None,
        marker: String | None = None,
        max_items: MaxLayerListItems | None = None,
        compatible_architecture: Architecture | None = None,
        **kwargs,
    ) -> ListLayerVersionsResponse:
        """
        List the versions of a layer, highest version number first.

        An unknown layer yields an empty list rather than an error.

        :param context: Request context
        :param layer_name: Layer name or ARN (resolved through ``_resolve_layer``)
        :param compatible_runtime: Runtime filter (validated, but not applied yet)
        :param marker: Pagination marker returned by a previous call
        :param max_items: Maximum number of versions per page
        :param compatible_architecture: Architecture filter (validated, but not applied yet)
        :return: One page of layer versions plus the marker of the next page (if any)
        :raises ValidationException: If the runtime or architecture filter is invalid
        """
        runtime_filter = [compatible_runtime] if compatible_runtime else []
        architecture_filter = [compatible_architecture] if compatible_architecture else []
        validation_errors = api_utils.validate_layer_runtimes_and_architectures(
            runtime_filter, architecture_filter
        )
        if validation_errors:
            error_count = len(validation_errors)
            plural = "s" if error_count > 1 else ""
            raise ValidationException(
                f"{error_count} validation error{plural} detected: {';'.join(validation_errors)}"
            )

        region_name, account_id, layer_name, _ = LambdaProvider._resolve_layer(
            layer_name, context
        )
        layer = lambda_stores[account_id][region_name].layers.get(layer_name)

        # TODO: Test & handle filter: compatible_runtime
        # TODO: Test & handle filter: compatible_architecture
        mapped_versions = (
            []
            if layer is None
            else [api_utils.map_layer_out(version) for version in layer.layer_versions.values()]
        )
        newest_first = PaginatedList(
            sorted(mapped_versions, key=lambda version: version["Version"], reverse=True)
        )
        # The layer version ARN doubles as the pagination token.
        page, token = newest_first.get_page(
            lambda version: version["LayerVersionArn"],
            marker,
            max_items,
        )
        return ListLayerVersionsResponse(NextMarker=token, LayerVersions=page)
1✔
4122

4123
    def delete_layer_version(
        self,
        context: RequestContext,
        layer_name: LayerName,
        version_number: LayerVersionNumber,
        **kwargs,
    ) -> None:
        """
        Delete a single version of a layer.

        Deleting a version (or layer) that does not exist is a silent no-op.

        :param context: Request context
        :param layer_name: Layer name or ARN (resolved through ``_resolve_layer``)
        :param version_number: Version to delete; must be at least 1
        :raises InvalidParameterValueException: If ``version_number`` is below 1
        """
        if version_number < 1:
            raise InvalidParameterValueException("Layer Version Cannot be less than 1", Type="User")

        region_name, account_id, layer_name, _ = LambdaProvider._resolve_layer(
            layer_name, context
        )

        layer = lambda_stores[account_id][region_name].layers.get(layer_name)
        if not layer:
            return
        layer.layer_versions.pop(str(version_number), None)
1✔
4141

4142
    # =======================================
4143
    # =====  Layer Version Permissions  =====
4144
    # =======================================
4145
    # TODO: lock updates that change revision IDs
4146

4147
    def add_layer_version_permission(
        self,
        context: RequestContext,
        layer_name: LayerName,
        version_number: LayerVersionNumber,
        statement_id: StatementId,
        action: LayerPermissionAllowedAction,
        principal: LayerPermissionAllowedPrincipal,
        organization_id: OrganizationId = None,
        revision_id: String = None,
        **kwargs,
    ) -> AddLayerVersionPermissionResponse:
        """
        Add a statement to the resource policy of a layer version.

        The policy is only stored once all checks have passed, so a rejected request leaves the
        layer version without any policy if it had none before.

        :param context: Request context
        :param layer_name: Layer name or ARN
        :param version_number: Layer version the permission applies to
        :param statement_id: Identifier of the new statement; must be unique within the policy
        :param action: Must be ``lambda:GetLayerVersion``
        :param principal: Principal that is granted the action
        :param organization_id: Optional organization ID stored with the statement
        :param revision_id: If given, must match the current revision ID of the policy
        :return: The added statement (as JSON) and the new revision ID of the policy
        :raises ValidationException: If ``action`` is not ``lambda:GetLayerVersion``
        :raises ResourceNotFoundException: If the layer or the layer version does not exist
        :raises ResourceConflictException: If ``statement_id`` is already in use
        :raises PreconditionFailedException: If ``revision_id`` does not match
        """
        # `layer_name` can either be layer name or ARN. It is used to generate error messages.
        # `layer_n` contains the layer name.
        region_name, account_id, layer_n, _ = LambdaProvider._resolve_layer(layer_name, context)

        if action != "lambda:GetLayerVersion":
            raise ValidationException(
                f"1 validation error detected: Value '{action}' at 'action' failed to satisfy constraint: Member must satisfy regular expression pattern: lambda:GetLayerVersion"
            )

        store = lambda_stores[account_id][region_name]
        layer = store.layers.get(layer_n)

        layer_version_arn = api_utils.layer_version_arn(
            layer_name, account_id, region_name, str(version_number)
        )

        if layer is None:
            raise ResourceNotFoundException(
                f"Layer version {layer_version_arn} does not exist.", Type="User"
            )
        layer_version = layer.layer_versions.get(str(version_number))
        if layer_version is None:
            raise ResourceNotFoundException(
                f"Layer version {layer_version_arn} does not exist.", Type="User"
            )

        # Work on a local policy: a fresh one is only attached to the layer version further
        # down, after every check has passed.
        policy = layer_version.policy
        if policy is None:
            policy = LayerPolicy()

        if statement_id in policy.statements:
            raise ResourceConflictException(
                f"The statement id ({statement_id}) provided already exists. Please provide a new statement id, or remove the existing statement.",
                Type="User",
            )

        if revision_id and policy.revision_id != revision_id:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetLayerPolicy API to retrieve the latest Revision Id",
                Type="User",
            )

        statement = LayerPolicyStatement(
            sid=statement_id, action=action, principal=principal, organization_id=organization_id
        )

        layer_version.policy = dataclasses.replace(
            policy, statements={**policy.statements, statement_id: statement}
        )

        return AddLayerVersionPermissionResponse(
            Statement=json.dumps(
                {
                    "Sid": statement.sid,
                    "Effect": "Allow",
                    "Principal": statement.principal,
                    "Action": statement.action,
                    "Resource": layer_version.layer_version_arn,
                }
            ),
            RevisionId=layer_version.policy.revision_id,
        )
4222

4223
    def remove_layer_version_permission(
        self,
        context: RequestContext,
        layer_name: LayerName,
        version_number: LayerVersionNumber,
        statement_id: StatementId,
        revision_id: String = None,
        **kwargs,
    ) -> None:
        """
        Remove a statement from the resource policy of a layer version.

        :param context: Request context
        :param layer_name: Layer name or ARN
        :param version_number: Layer version whose policy is modified
        :param statement_id: Identifier of the statement to remove
        :param revision_id: If given, must match the current revision ID of the policy
        :raises ResourceNotFoundException: If the layer, the layer version, its policy, or the
            statement does not exist
        :raises PreconditionFailedException: If ``revision_id`` does not match
        """
        # `layer_name` can either be layer name or ARN. It is used to generate error messages.
        # `layer_n` contains the layer name.
        region_name, account_id, layer_n, _ = LambdaProvider._resolve_layer(layer_name, context)

        layer_version_arn = api_utils.layer_version_arn(
            layer_name, account_id, region_name, str(version_number)
        )

        state = lambda_stores[account_id][region_name]
        layer = state.layers.get(layer_n)
        if layer is None:
            raise ResourceNotFoundException(
                f"Layer version {layer_version_arn} does not exist.", Type="User"
            )
        layer_version = layer.layer_versions.get(str(version_number))
        if layer_version is None:
            raise ResourceNotFoundException(
                f"Layer version {layer_version_arn} does not exist.", Type="User"
            )

        policy = layer_version.policy
        if policy is None:
            # A layer version that never received a permission has no policy object; report
            # that instead of failing on the attribute access below.
            # NOTE(review): message mirrors get_layer_version_policy - confirm against AWS.
            raise ResourceNotFoundException(
                "No policy is associated with the given resource.", Type="User"
            )

        if revision_id and policy.revision_id != revision_id:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetLayerPolicy API to retrieve the latest Revision Id",
                Type="User",
            )

        if statement_id not in policy.statements:
            raise ResourceNotFoundException(
                f"Statement {statement_id} is not found in resource policy.", Type="User"
            )

        layer_version.policy = dataclasses.replace(
            policy,
            statements={k: v for k, v in policy.statements.items() if k != statement_id},
        )
4271

4272
    def get_layer_version_policy(
        self,
        context: RequestContext,
        layer_name: LayerName,
        version_number: LayerVersionNumber,
        **kwargs,
    ) -> GetLayerVersionPolicyResponse:
        """
        Return the resource policy attached to a layer version.

        :param context: Request context
        :param layer_name: Layer name or ARN
        :param version_number: Layer version whose policy is requested
        :return: The policy document (as JSON) and its revision ID
        :raises ResourceNotFoundException: If the layer, the layer version, or its policy
            does not exist
        """
        # `layer_name` (name or ARN, as passed in) is only used for the error message;
        # `resolved_name` is the plain layer name used for the lookup.
        region_name, account_id, resolved_name, _ = LambdaProvider._resolve_layer(
            layer_name, context
        )
        version_key = str(version_number)
        layer_version_arn = api_utils.layer_version_arn(
            layer_name, account_id, region_name, version_key
        )

        layer = lambda_stores[account_id][region_name].layers.get(resolved_name)
        # A missing layer and a missing version yield the same error.
        layer_version = None if layer is None else layer.layer_versions.get(version_key)
        if layer_version is None:
            raise ResourceNotFoundException(
                f"Layer version {layer_version_arn} does not exist.", Type="User"
            )

        policy = layer_version.policy
        if policy is None:
            raise ResourceNotFoundException(
                "No policy is associated with the given resource.", Type="User"
            )

        statements = [
            {
                "Sid": entry.sid,
                "Effect": "Allow",
                "Principal": entry.principal,
                "Action": entry.action,
                "Resource": layer_version.layer_version_arn,
            }
            for entry in policy.statements.values()
        ]
        policy_document = {
            "Version": policy.version,
            "Id": policy.id,
            "Statement": statements,
        }
        return GetLayerVersionPolicyResponse(
            Policy=json.dumps(policy_document),
            RevisionId=policy.revision_id,
        )
4325

4326
    # =======================================
4327
    # =======  Function Concurrency  ========
4328
    # =======================================
4329
    # (Reserved) function concurrency is scoped to the whole function
4330

4331
    def get_function_concurrency(
        self, context: RequestContext, function_name: FunctionName, **kwargs
    ) -> GetFunctionConcurrencyResponse:
        """
        Return the reserved concurrency configured for a function.

        :param context: Request context
        :param function_name: Function identifier, resolved via ``api_utils.get_function_name``
        :return: Response carrying the function's reserved concurrent executions
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        resolved_name = api_utils.get_function_name(function_name, context)
        function = self._get_function(
            function_name=resolved_name, region=region, account_id=account_id
        )
        reserved = function.reserved_concurrent_executions
        return GetFunctionConcurrencyResponse(ReservedConcurrentExecutions=reserved)
4340

4341
    def put_function_concurrency(
        self,
        context: RequestContext,
        function_name: FunctionName,
        reserved_concurrent_executions: ReservedConcurrentExecutions,
        **kwargs,
    ) -> Concurrency:
        """
        Set the reserved concurrency of a function.

        :param context: Request context
        :param function_name: Name or unqualified ARN of the function
        :param reserved_concurrent_executions: Number of executions to reserve for the function
        :return: The reserved concurrency now stored for the function
        :raises InvalidParameterValueException: If a qualifier is given, if the reservation would
            push the account's unreserved concurrency below its minimum, or if it is lower than
            the function's total provisioned concurrency
        :raises ResourceNotFoundException: If the function does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)

        function_name, qualifier = api_utils.get_name_and_qualifier(function_name, None, context)
        if qualifier:
            raise InvalidParameterValueException(
                "This operation is permitted on Lambda functions only. Aliases and versions do not support this operation. Please specify either a function name or an unqualified function ARN.",
                Type="User",
            )

        function = lambda_stores[account_id][region].functions.get(function_name)
        if not function:
            missing_arn = api_utils.qualified_lambda_arn(
                function_name,
                qualifier="$LATEST",
                account=account_id,
                region=region,
            )
            raise ResourceNotFoundException(f"Function not found: {missing_arn}", Type="User")

        account_limit = self.get_account_settings(context)["AccountLimit"]
        unreserved = account_limit["UnreservedConcurrentExecutions"]

        # The function's current reservation is already deducted from the unreserved pool, but
        # it is about to be replaced and therefore must not count against the new value.
        # This behavior was verified manually against AWS (2023-11-28).
        currently_reserved = function.reserved_concurrent_executions or 0
        unreserved_after_update = unreserved - reserved_concurrent_executions + currently_reserved
        minimum_unreserved = config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY
        if unreserved_after_update < minimum_unreserved:
            raise InvalidParameterValueException(
                f"Specified ReservedConcurrentExecutions for function decreases account's UnreservedConcurrentExecution below its minimum value of [{minimum_unreserved}]."
            )

        total_provisioned = sum(
            provisioned.provisioned_concurrent_executions
            for provisioned in function.provisioned_concurrency_configs.values()
        )
        if reserved_concurrent_executions < total_provisioned:
            raise InvalidParameterValueException(
                f" ReservedConcurrentExecutions  {reserved_concurrent_executions} should not be lower than function's total provisioned concurrency [{total_provisioned}]."
            )

        function.reserved_concurrent_executions = reserved_concurrent_executions

        return Concurrency(ReservedConcurrentExecutions=function.reserved_concurrent_executions)
1✔
4402

4403
    def delete_function_concurrency(
        self, context: RequestContext, function_name: FunctionName, **kwargs
    ) -> None:
        """
        Remove the reserved concurrency setting of a function.

        :param context: Request context
        :param function_name: Function name or ARN
        :raises ResourceNotFoundException: If the function does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        # NOTE(review): unlike put_function_concurrency, a qualifier is silently ignored here.
        function_name, _ = api_utils.get_name_and_qualifier(function_name, None, context)
        store = lambda_stores[account_id][region]
        fn = store.functions.get(function_name)
        if not fn:
            # Report an unknown function the same way put_function_concurrency does instead of
            # failing with an AttributeError on None.
            fn_arn = api_utils.qualified_lambda_arn(
                function_name,
                qualifier="$LATEST",
                account=account_id,
                region=region,
            )
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")
        fn.reserved_concurrent_executions = None
1✔
4411

4412
    # =======================================
4413
    # ===============  TAGS   ===============
4414
    # =======================================
4415
    # only Function, Event Source Mapping, Capacity Provider, and Code Signing Config ARNs are available for tagging
    # in AWS (tagging Code Signing Configs is not currently supported by LocalStack)
4417

4418
    @staticmethod
    def _update_resource_tags(
        resource_arn: str, account_id: str, region: str, tags: dict[str, str]
    ) -> None:
        """Passes ``tags`` for ``resource_arn`` to ``update_tags`` on the tag store of the given account/region."""
        tag_store = lambda_stores[account_id][region].tags
        tag_store.update_tags(resource_arn, tags)
1✔
4423

4424
    @staticmethod
    def _list_resource_tags(resource_arn: str, account_id: str, region: str) -> dict[str, str]:
        """Returns what ``get_tags`` yields for ``resource_arn`` from the tag store of the given account/region."""
        tag_store = lambda_stores[account_id][region].tags
        return tag_store.get_tags(resource_arn)
1✔
4427

4428
    @staticmethod
    def _remove_resource_tags(
        resource_arn: str, account_id: str, region: str, keys: TagKeyList
    ) -> None:
        """Passes ``keys`` for ``resource_arn`` to ``delete_tags`` on the tag store of the given account/region."""
        tag_store = lambda_stores[account_id][region].tags
        tag_store.delete_tags(resource_arn, keys)
1✔
4433

4434
    @staticmethod
    def _remove_all_resource_tags(resource_arn: str, account_id: str, region: str) -> None:
        """Calls ``delete_all_tags`` for ``resource_arn`` on the tag store of the given account/region."""
        tag_store = lambda_stores[account_id][region].tags
        tag_store.delete_all_tags(resource_arn)
1✔
4437

4438
    def _get_tags(self, resource: TaggableResource) -> dict[str, str]:
        """
        Returns the tags stored for a taggable resource ARN.

        The account and region are resolved from the ARN first; exceptions raised during that resolution
        propagate to the caller.
        """
        account, region_name = self._get_account_id_and_region_for_taggable_resource(resource)
        return self._list_resource_tags(
            resource_arn=resource, account_id=account, region=region_name
        )
1✔
4442

4443
    def _store_tags(self, resource: TaggableResource, tags: dict[str, str]) -> None:
        """
        Stores ``tags`` for a taggable resource ARN after enforcing the per-resource tag limit.

        Raises:
            InvalidParameterValueException: If the already stored tags merged with ``tags`` contain more than
                LAMBDA_TAG_LIMIT_PER_RESOURCE distinct keys.
        """
        account, region_name = self._get_account_id_and_region_for_taggable_resource(resource)
        current_tags = self._list_resource_tags(
            resource_arn=resource, account_id=account, region=region_name
        )

        # note: we cannot use | on `ImmutableDict` and regular `dict`, hence the unpacking merge
        merged_tag_count = len({**current_tags, **tags})
        if merged_tag_count > LAMBDA_TAG_LIMIT_PER_RESOURCE:
            raise InvalidParameterValueException(
                "Number of tags exceeds resource tag limit.", Type="User"
            )

        self._update_resource_tags(
            resource_arn=resource, account_id=account, region=region_name, tags=tags
        )
4459

4460
    def _remove_tags(self, resource: TaggableResource, keys: TagKeyList) -> None:
        """
        Removes the tags identified by ``keys`` from a taggable resource ARN.

        The account and region are resolved from the ARN first; exceptions raised during that resolution
        propagate to the caller.
        """
        account, region_name = self._get_account_id_and_region_for_taggable_resource(resource)
        self._remove_resource_tags(
            resource_arn=resource,
            account_id=account,
            region=region_name,
            keys=keys,
        )
4465

4466
    def _remove_all_tags(self, resource: TaggableResource) -> None:
        """
        Removes every tag stored for a taggable resource ARN.

        The account and region are resolved from the ARN first; exceptions raised during that resolution
        propagate to the caller.
        """
        account, region_name = self._get_account_id_and_region_for_taggable_resource(resource)
        self._remove_all_resource_tags(
            resource_arn=resource, account_id=account, region=region_name
        )
1✔
4469

4470
    def _get_account_id_and_region_for_taggable_resource(
        self, resource: TaggableResource
    ) -> tuple[str, str]:
        """
        Takes a resource ARN for a TaggableResource (Lambda Function, Event Source Mapping, Code Signing Config, or
        Capacity Provider) and returns the account ID and region encoded in it.

        In addition, this function validates that the ARN is a valid TaggableResource type, and that the
        TaggableResource exists.

        Returns:
            A tuple ``(account_id, region)`` taken from the parsed ARN.

        Raises:
            ValidationException: If the resource ARN is not a full ARN for a TaggableResource.
            ResourceNotFoundException: If the specified resource does not exist.
            InvalidParameterValueException: If the resource ARN is a qualified Lambda Function.
            NotImplementedError: If the resource ARN refers to a Code Signing Config (tagging not yet implemented).
        """

        def _raise_validation_exception():
            raise ValidationException(
                f"1 validation error detected: Value '{resource}' at 'resource' failed to satisfy constraint: Member must satisfy regular expression pattern: {api_utils.TAGGABLE_RESOURCE_ARN_PATTERN}"
            )

        # Check whether the ARN we have been passed is correctly formatted
        parsed_resource_arn: ArnData | None = None
        try:
            parsed_resource_arn = parse_arn(resource)
        except Exception:
            # Any parsing failure is reported to the caller as a validation error on the `resource` member.
            _raise_validation_exception()

        # TODO: Should we be checking whether this is a full ARN?
        region, account_id, resource_path = map(
            parsed_resource_arn.get, ("region", "account", "resource")
        )

        if not all((region, account_id, resource_path)):
            _raise_validation_exception()

        # The resource part must be at least "<type>:<identifier>". `str.split` never returns an empty list, so an
        # emptiness check cannot catch a bare "<type>"; without the length check the unpacking below would fail
        # with a ValueError instead of the expected ValidationException.
        parts = resource_path.split(":")
        if len(parts) < 2:
            _raise_validation_exception()

        resource_type, resource_identifier, *qualifier = parts

        # Qualifier validation raises before checking for NotFound
        if qualifier:
            if resource_type == "function":
                raise InvalidParameterValueException(
                    "Tags on function aliases and versions are not supported. Please specify a function ARN.",
                    Type="User",
                )
            _raise_validation_exception()

        # The lookups below are called only for their existence check; their return values are discarded.
        if resource_type == "event-source-mapping":
            self._get_esm(resource_identifier, account_id, region)
        elif resource_type == "code-signing-config":
            raise NotImplementedError("Resource tagging on CSC not yet implemented.")
        elif resource_type == "function":
            self._get_function(
                function_name=resource_identifier, account_id=account_id, region=region
            )
        elif resource_type == "capacity-provider":
            self._get_capacity_provider(resource_identifier, account_id, region)
        else:
            _raise_validation_exception()

        # If no exceptions are raised, assume ARN and referenced resource is valid for tag operations
        return account_id, region
1✔
4534

4535
    def tag_resource(
        self, context: RequestContext, resource: TaggableResource, tags: Tags, **kwargs
    ) -> None:
        """
        Stores ``tags`` on a taggable resource and, for function resources, replaces the ``$LATEST`` version entry.

        Raises:
            InvalidParameterValueException: If ``tags`` is empty or missing.
        """
        if not tags:
            raise InvalidParameterValueException(
                "An error occurred and the request cannot be processed.", Type="User"
            )
        self._store_tags(resource, tags)

        # Only function resources need the follow-up below.
        resource_id = extract_resource_from_arn(resource)
        if not resource_id or not resource_id.startswith("function"):
            return

        name, _, account, region = function_locators_from_arn(resource)
        function = self._get_function(name, account, region)
        with function.lock:
            # dirty hack for changed revision id, should reevaluate model to prevent this:
            current_latest = function.versions["$LATEST"]
            copied_config = dataclasses.replace(current_latest.config)
            function.versions["$LATEST"] = dataclasses.replace(current_latest, config=copied_config)
4554
                )
4555

4556
    def list_tags(
        self, context: RequestContext, resource: TaggableResource, **kwargs
    ) -> ListTagsResponse:
        """Returns the tags of a taggable resource ARN wrapped in a ``ListTagsResponse``."""
        return ListTagsResponse(Tags=self._get_tags(resource))
1✔
4561

4562
    def untag_resource(
        self, context: RequestContext, resource: TaggableResource, tag_keys: TagKeyList, **kwargs
    ) -> None:
        """
        Removes the given tag keys from a taggable resource and, for function resources, replaces the ``$LATEST``
        version entry.

        Raises:
            ValidationException: If ``tag_keys`` is empty or missing.
        """
        if not tag_keys:
            raise ValidationException(
                "1 validation error detected: Value null at 'tagKeys' failed to satisfy constraint: Member must not be null"
            )  # should probably be generalized a bit

        self._remove_tags(resource, tag_keys)

        # Only function resources need the follow-up below.
        resource_id = extract_resource_from_arn(resource)
        if not resource_id or not resource_id.startswith("function"):
            return

        name, _, account, region = function_locators_from_arn(resource)
        function = self._get_function(name, account, region)
        # TODO: Potential race condition
        with function.lock:
            # dirty hack for changed revision id, should reevaluate model to prevent this:
            current_latest = function.versions["$LATEST"]
            copied_config = dataclasses.replace(current_latest.config)
            function.versions["$LATEST"] = dataclasses.replace(current_latest, config=copied_config)
4583
                )
4584

4585
    # =======================================
4586
    # =======  LEGACY / DEPRECATED   ========
4587
    # =======================================
4588

4589
    def invoke_async(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        invoke_args: IO[BlobStream],
        **kwargs,
    ) -> InvokeAsyncResponse:
        """
        LEGACY API endpoint. Even AWS heavily discourages its usage.

        Raises:
            NotImplementedError: Always; this operation is not implemented.
        """
        raise NotImplementedError()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc