• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 22209548116

19 Feb 2026 02:08PM UTC coverage: 86.964% (-0.04%) from 87.003%
22209548116

push

github

web-flow
Logs: fix snapshot region from tests (#13792)

69755 of 80211 relevant lines covered (86.96%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.29
/localstack-core/localstack/services/lambda_/provider.py
1
import base64
1✔
2
import dataclasses
1✔
3
import datetime
1✔
4
import itertools
1✔
5
import json
1✔
6
import logging
1✔
7
import re
1✔
8
import threading
1✔
9
import time
1✔
10
from typing import IO, Any
1✔
11

12
from botocore.exceptions import ClientError
1✔
13

14
from localstack import config
1✔
15
from localstack.aws.api import RequestContext, ServiceException, handler
1✔
16
from localstack.aws.api.lambda_ import (
1✔
17
    AccountLimit,
18
    AccountUsage,
19
    AddLayerVersionPermissionResponse,
20
    AddPermissionRequest,
21
    AddPermissionResponse,
22
    Alias,
23
    AliasConfiguration,
24
    AliasRoutingConfiguration,
25
    AllowedPublishers,
26
    Architecture,
27
    Arn,
28
    Blob,
29
    BlobStream,
30
    CapacityProviderConfig,
31
    CodeSigningConfigArn,
32
    CodeSigningConfigNotFoundException,
33
    CodeSigningPolicies,
34
    CompatibleArchitectures,
35
    CompatibleRuntimes,
36
    Concurrency,
37
    Cors,
38
    CreateCodeSigningConfigResponse,
39
    CreateEventSourceMappingRequest,
40
    CreateFunctionRequest,
41
    CreateFunctionUrlConfigResponse,
42
    DeleteCodeSigningConfigResponse,
43
    DeleteFunctionResponse,
44
    Description,
45
    DestinationConfig,
46
    DurableExecutionName,
47
    EventSourceMappingConfiguration,
48
    FunctionCodeLocation,
49
    FunctionConfiguration,
50
    FunctionEventInvokeConfig,
51
    FunctionName,
52
    FunctionUrlAuthType,
53
    FunctionUrlQualifier,
54
    FunctionVersionLatestPublished,
55
    GetAccountSettingsResponse,
56
    GetCodeSigningConfigResponse,
57
    GetFunctionCodeSigningConfigResponse,
58
    GetFunctionConcurrencyResponse,
59
    GetFunctionRecursionConfigResponse,
60
    GetFunctionResponse,
61
    GetFunctionUrlConfigResponse,
62
    GetLayerVersionPolicyResponse,
63
    GetLayerVersionResponse,
64
    GetPolicyResponse,
65
    GetProvisionedConcurrencyConfigResponse,
66
    InvalidParameterValueException,
67
    InvocationResponse,
68
    InvocationType,
69
    InvokeAsyncResponse,
70
    InvokeMode,
71
    LambdaApi,
72
    LambdaManagedInstancesCapacityProviderConfig,
73
    LastUpdateStatus,
74
    LayerName,
75
    LayerPermissionAllowedAction,
76
    LayerPermissionAllowedPrincipal,
77
    LayersListItem,
78
    LayerVersionArn,
79
    LayerVersionContentInput,
80
    LayerVersionNumber,
81
    LicenseInfo,
82
    ListAliasesResponse,
83
    ListCodeSigningConfigsResponse,
84
    ListEventSourceMappingsResponse,
85
    ListFunctionEventInvokeConfigsResponse,
86
    ListFunctionsByCodeSigningConfigResponse,
87
    ListFunctionsResponse,
88
    ListFunctionUrlConfigsResponse,
89
    ListLayersResponse,
90
    ListLayerVersionsResponse,
91
    ListProvisionedConcurrencyConfigsResponse,
92
    ListTagsResponse,
93
    ListVersionsByFunctionResponse,
94
    LogFormat,
95
    LoggingConfig,
96
    LogType,
97
    MasterRegion,
98
    MaxFunctionEventInvokeConfigListItems,
99
    MaximumEventAgeInSeconds,
100
    MaximumRetryAttempts,
101
    MaxItems,
102
    MaxLayerListItems,
103
    MaxListItems,
104
    MaxProvisionedConcurrencyConfigListItems,
105
    NamespacedFunctionName,
106
    NamespacedStatementId,
107
    NumericLatestPublishedOrAliasQualifier,
108
    OnFailure,
109
    OnSuccess,
110
    OrganizationId,
111
    PackageType,
112
    PositiveInteger,
113
    PreconditionFailedException,
114
    ProvisionedConcurrencyConfigListItem,
115
    ProvisionedConcurrencyConfigNotFoundException,
116
    ProvisionedConcurrencyStatusEnum,
117
    PublishLayerVersionResponse,
118
    PutFunctionCodeSigningConfigResponse,
119
    PutFunctionRecursionConfigResponse,
120
    PutProvisionedConcurrencyConfigResponse,
121
    Qualifier,
122
    RecursiveLoop,
123
    ReservedConcurrentExecutions,
124
    ResourceConflictException,
125
    ResourceNotFoundException,
126
    Runtime,
127
    RuntimeVersionConfig,
128
    SnapStart,
129
    SnapStartApplyOn,
130
    SnapStartOptimizationStatus,
131
    SnapStartResponse,
132
    State,
133
    StatementId,
134
    StateReasonCode,
135
    String,
136
    TaggableResource,
137
    TagKeyList,
138
    Tags,
139
    TenantId,
140
    TracingMode,
141
    UnqualifiedFunctionName,
142
    UpdateCodeSigningConfigResponse,
143
    UpdateEventSourceMappingRequest,
144
    UpdateFunctionCodeRequest,
145
    UpdateFunctionConfigurationRequest,
146
    UpdateFunctionUrlConfigResponse,
147
    VersionWithLatestPublished,
148
)
149
from localstack.aws.api.lambda_ import FunctionVersion as FunctionVersionApi
1✔
150
from localstack.aws.api.lambda_ import ServiceException as LambdaServiceException
1✔
151
from localstack.aws.api.pipes import (
1✔
152
    DynamoDBStreamStartPosition,
153
    KinesisStreamStartPosition,
154
)
155
from localstack.aws.connect import connect_to
1✔
156
from localstack.aws.spec import load_service
1✔
157
from localstack.services.edge import ROUTER
1✔
158
from localstack.services.lambda_ import api_utils
1✔
159
from localstack.services.lambda_ import hooks as lambda_hooks
1✔
160
from localstack.services.lambda_.analytics import (
1✔
161
    FunctionInitializationType,
162
    FunctionOperation,
163
    FunctionStatus,
164
    function_counter,
165
)
166
from localstack.services.lambda_.api_utils import (
1✔
167
    ARCHITECTURES,
168
    STATEMENT_ID_REGEX,
169
    SUBNET_ID_REGEX,
170
    function_locators_from_arn,
171
)
172
from localstack.services.lambda_.event_source_mapping.esm_config_factory import (
1✔
173
    EsmConfigFactory,
174
)
175
from localstack.services.lambda_.event_source_mapping.esm_worker import (
1✔
176
    EsmState,
177
    EsmWorker,
178
)
179
from localstack.services.lambda_.event_source_mapping.esm_worker_factory import (
1✔
180
    EsmWorkerFactory,
181
)
182
from localstack.services.lambda_.event_source_mapping.pipe_utils import get_internal_client
1✔
183
from localstack.services.lambda_.invocation import AccessDeniedException
1✔
184
from localstack.services.lambda_.invocation.execution_environment import (
1✔
185
    EnvironmentStartupTimeoutException,
186
)
187
from localstack.services.lambda_.invocation.lambda_models import (
1✔
188
    AliasRoutingConfig,
189
    CodeSigningConfig,
190
    EventInvokeConfig,
191
    Function,
192
    FunctionResourcePolicy,
193
    FunctionUrlConfig,
194
    FunctionVersion,
195
    ImageConfig,
196
    LambdaEphemeralStorage,
197
    Layer,
198
    LayerPolicy,
199
    LayerPolicyStatement,
200
    LayerVersion,
201
    ProvisionedConcurrencyConfiguration,
202
    RequestEntityTooLargeException,
203
    ResourcePolicy,
204
    UpdateStatus,
205
    ValidationException,
206
    VersionAlias,
207
    VersionFunctionConfiguration,
208
    VersionIdentifier,
209
    VersionState,
210
    VpcConfig,
211
)
212
from localstack.services.lambda_.invocation.lambda_service import (
1✔
213
    LambdaService,
214
    create_image_code,
215
    destroy_code_if_not_used,
216
    lambda_stores,
217
    store_lambda_archive,
218
    store_s3_bucket_archive,
219
)
220
from localstack.services.lambda_.invocation.models import CapacityProvider as CapacityProviderModel
1✔
221
from localstack.services.lambda_.invocation.models import LambdaStore
1✔
222
from localstack.services.lambda_.invocation.runtime_executor import get_runtime_executor
1✔
223
from localstack.services.lambda_.lambda_utils import HINT_LOG
1✔
224
from localstack.services.lambda_.layerfetcher.layer_fetcher import LayerFetcher
1✔
225
from localstack.services.lambda_.provider_utils import (
1✔
226
    LambdaLayerVersionIdentifier,
227
    get_function_version,
228
    get_function_version_from_arn,
229
)
230
from localstack.services.lambda_.runtimes import (
1✔
231
    ALL_RUNTIMES,
232
    DEPRECATED_RUNTIMES,
233
    DEPRECATED_RUNTIMES_UPGRADES,
234
    RUNTIMES_AGGREGATED,
235
    SNAP_START_SUPPORTED_RUNTIMES,
236
    VALID_MANAGED_INSTANCE_RUNTIMES,
237
    VALID_RUNTIMES,
238
)
239
from localstack.services.lambda_.urlrouter import FunctionUrlRouter
1✔
240
from localstack.services.plugins import ServiceLifecycleHook
1✔
241
from localstack.state import StateVisitor
1✔
242
from localstack.utils.aws.arns import (
1✔
243
    ArnData,
244
    capacity_provider_arn,
245
    extract_resource_from_arn,
246
    extract_service_from_arn,
247
    get_partition,
248
    lambda_event_source_mapping_arn,
249
    parse_arn,
250
)
251
from localstack.utils.aws.client_types import ServicePrincipal
1✔
252
from localstack.utils.bootstrap import is_api_enabled
1✔
253
from localstack.utils.collections import PaginatedList, merge_recursive
1✔
254
from localstack.utils.event_matcher import validate_event_pattern
1✔
255
from localstack.utils.strings import get_random_hex, short_uid, to_bytes, to_str
1✔
256
from localstack.utils.sync import poll_condition
1✔
257
from localstack.utils.urls import localstack_host
1✔
258

259
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)

# Regex pattern source (a plain string, not compiled here) describing a Lambda capacity-provider ARN:
# partition, optional "eusc-" prefix, region, 12-digit account id and the capacity provider name.
CAPACITY_PROVIDER_ARN_NAME = "arn:aws[a-zA-Z-]*:lambda:(eusc-)?[a-z]{2}((-gov)|(-iso([a-z]?)))?-[a-z]+-\\d{1}:\\d{12}:capacity-provider:[a-zA-Z0-9-_]+"
# Default function timeout — presumably in seconds; TODO confirm against the usage site.
LAMBDA_DEFAULT_TIMEOUT = 3
# Default function memory size — presumably in MB; TODO confirm against the usage site.
LAMBDA_DEFAULT_MEMORY_SIZE = 128

# Numeric limits; judging by the names: maximum tags per resource and maximum layers per function.
LAMBDA_TAG_LIMIT_PER_RESOURCE = 50
LAMBDA_LAYERS_LIMIT_PER_FUNCTION = 5

# Tag key whose value is checked by the validator below
# (presumably selects a custom function URL id — verify at the usage site).
TAG_KEY_CUSTOM_URL = "_custom_id_"
# Requirements (from RFC3986 & co): not longer than 63, first char must be
# alpha, then alphanumeric or hyphen, except cannot start or end with hyphen
TAG_KEY_CUSTOM_URL_VALIDATOR = re.compile(r"^[A-Za-z]([A-Za-z0-9\-]{0,61}[A-Za-z0-9])?$")
1✔
272

273

274
class LambdaProvider(LambdaApi, ServiceLifecycleHook):
1✔
275
    lambda_service: LambdaService
1✔
276
    create_fn_lock: threading.RLock
1✔
277
    create_layer_lock: threading.RLock
1✔
278
    router: FunctionUrlRouter
1✔
279
    esm_workers: dict[str, EsmWorker]
1✔
280
    layer_fetcher: LayerFetcher | None
1✔
281

282
    def __init__(self) -> None:
        """Create the Lambda service, the function URL router, the locks and the worker registry."""
        service = LambdaService()
        self.lambda_service = service
        # The URL router is handed the same service instance the provider uses.
        self.router = FunctionUrlRouter(ROUTER, service)
        self.create_fn_lock = threading.RLock()
        self.create_layer_lock = threading.RLock()
        self.esm_workers = {}
        # Start without a layer fetcher; the hook below receives the provider
        # (presumably so that a plugin can assign one — verify against the hook implementations).
        self.layer_fetcher = None
        lambda_hooks.inject_layer_fetcher.run(self)
1✔
290

291
    def accept_state_visitor(self, visitor: StateVisitor):
        """Let the given state visitor visit the Lambda stores.

        :param visitor: visitor whose ``visit`` method is called with ``lambda_stores``
        """
        visitor.visit(lambda_stores)
×
293

294
    def on_before_state_reset(self):
        """Stop all event source mapping workers and the Lambda service before a state reset."""
        running_workers = self.esm_workers
        for worker in running_workers.values():
            worker.stop_for_shutdown()
        # Drop the references to the stopped workers.
        self.esm_workers = {}
        self.lambda_service.stop()
×
299

300
    def on_after_state_reset(self):
        """Replace the Lambda service with a fresh instance and point the URL router at it."""
        fresh_service = LambdaService()
        self.router.lambda_service = fresh_service
        self.lambda_service = fresh_service
×
302

303
    def on_before_state_load(self):
        """Stop the current Lambda service before persisted state is loaded."""
        current_service = self.lambda_service
        current_service.stop()
×
305

306
    def on_after_state_load(self):
        """Rebuild runtime objects after state has been loaded into ``lambda_stores``.

        Creates a new ``LambdaService`` (shared with the URL router) and then, for every
        account and region in the stores:

        - assigns a new ``instance_id`` to each function,
        - resets each function version to ``Pending`` and asks the service to create it
          (skipped for ``$LATEST`` of functions with a capacity provider config),
        - re-applies the stored provisioned concurrency configs,
        - creates an event source mapping worker per stored mapping and registers it
          in ``self.esm_workers``.

        Failures while restoring a function version or a provisioned concurrency config
        are logged as warnings and do not abort the restore.
        """
        self.lambda_service = LambdaService()
        self.router.lambda_service = self.lambda_service

        for account_id, account_bundle in lambda_stores.items():
            for region_name, state in account_bundle.items():
                for fn in state.functions.values():
                    # HACK to model a volatile variable that should be ignored for persistence
                    # Identifier unique to this function and LocalStack instance.
                    # A LocalStack restart or persistence load should create a new instance id.
                    # Used for retaining invoke queues across version updates for $LATEST, but
                    # separate unrelated instances.
                    fn.instance_id = short_uid()

                    for fn_version in fn.versions.values():
                        try:
                            # $LATEST is not invokable for Lambda functions with a capacity provider
                            # and has a different State (i.e., ActiveNonInvokable)
                            is_capacity_provider_latest = (
                                fn_version.config.capacity_provider_config
                                and fn_version.id.qualifier == "$LATEST"
                            )
                            if not is_capacity_provider_latest:
                                # Restore the "Pending" state for the function version and start it
                                new_state = VersionState(
                                    state=State.Pending,
                                    code=StateReasonCode.Creating,
                                    reason="The function is being created.",
                                )
                                new_config = dataclasses.replace(fn_version.config, state=new_state)
                                new_version = dataclasses.replace(fn_version, config=new_config)
                                # Replaces the value of an existing key, so the dict size does not
                                # change while it is being iterated.
                                fn.versions[fn_version.id.qualifier] = new_version
                                # NOTE(review): the loaded `fn_version` (not `new_version`) is passed
                                # to the service here — confirm this is intended.
                                # Waits at most 5 seconds for the returned future.
                                self.lambda_service.create_function_version(fn_version).result(
                                    timeout=5
                                )
                        except Exception:
                            # Best effort: log and continue with the next version.
                            # The traceback is only attached when DEBUG logging is enabled.
                            LOG.warning(
                                "Failed to restore function version %s",
                                fn_version.id.qualified_arn(),
                                exc_info=LOG.isEnabledFor(logging.DEBUG),
                            )
                    # restore provisioned concurrency per function considering both versions and aliases
                    for (
                        provisioned_qualifier,
                        provisioned_config,
                    ) in fn.provisioned_concurrency_configs.items():
                        # Stays None in the warning below if resolving the qualifier fails.
                        fn_arn = None
                        try:
                            if api_utils.qualifier_is_alias(provisioned_qualifier):
                                # A missing alias or version yields None from .get() and raises
                                # AttributeError on the next access, handled by the except below.
                                alias = fn.aliases.get(provisioned_qualifier)
                                resolved_version = fn.versions.get(alias.function_version)
                                fn_arn = resolved_version.id.qualified_arn()
                            elif api_utils.qualifier_is_version(provisioned_qualifier):
                                fn_version = fn.versions.get(provisioned_qualifier)
                                fn_arn = fn_version.id.qualified_arn()
                            else:
                                raise InvalidParameterValueException(
                                    "Invalid qualifier type:"
                                    " Qualifier can only be an alias or a version for provisioned concurrency."
                                )

                            manager = self.lambda_service.get_lambda_version_manager(fn_arn)
                            manager.update_provisioned_concurrency_config(
                                provisioned_config.provisioned_concurrent_executions
                            )
                        except Exception:
                            # Best effort: log and continue with the next config.
                            LOG.warning(
                                "Failed to restore provisioned concurrency %s for function %s",
                                provisioned_config,
                                fn_arn,
                                exc_info=LOG.isEnabledFor(logging.DEBUG),
                            )

                for esm in state.event_source_mappings.values():
                    # Restores event source workers
                    function_arn = esm.get("FunctionArn")

                    # TODO: How do we know the event source is up?
                    # A basic poll to see if the mapped Lambda function is active/failed
                    if not poll_condition(
                        lambda: (
                            get_function_version_from_arn(function_arn).config.state.state
                            in [State.Active, State.Failed]
                        ),
                        timeout=10,
                    ):
                        # Only a warning: the worker is created below regardless.
                        LOG.warning(
                            "Creating ESM for Lambda that is not in running state: %s",
                            function_arn,
                        )

                    function_version = get_function_version_from_arn(function_arn)
                    function_role = function_version.config.role

                    # A mapping without a stored "State" is treated as disabled.
                    is_esm_enabled = esm.get("State", EsmState.DISABLED) not in (
                        EsmState.DISABLED,
                        EsmState.DISABLING,
                    )
                    esm_worker = EsmWorkerFactory(
                        esm, function_role, is_esm_enabled
                    ).get_esm_worker()

                    # Note: a worker is created in the DISABLED state if not enabled
                    esm_worker.create()
                    # TODO: assigning the esm_worker to the dict only works after .create(). Could it cause a race
                    #  condition if we get a shutdown here and have a worker thread spawned but not accounted for?
                    self.esm_workers[esm_worker.uuid] = esm_worker
×
413

414
    def on_after_init(self):
        """Register the function URL routes and validate the runtime executor's environment."""
        self.router.register_routes()
        runtime_executor = get_runtime_executor()
        runtime_executor.validate_environment()
1✔
417

418
    def on_before_stop(self) -> None:
        """Stop all event source mapping workers, then the Lambda service."""
        registered_workers = self.esm_workers.values()
        for worker in registered_workers:
            worker.stop_for_shutdown()

        # TODO: should probably unregister routes?
        self.lambda_service.stop()
1✔
424

425
    @staticmethod
    def _get_function(function_name: str, account_id: str, region: str) -> Function:
        """Return the function stored under ``function_name`` for the given account and region.

        :param function_name: name the function is stored under
        :param account_id: account whose store is searched
        :param region: region whose store is searched
        :raises ResourceNotFoundException: if the store holds no (truthy) entry for the name
        """
        found = lambda_stores[account_id][region].functions.get(function_name)
        if found:
            return found

        missing_arn = api_utils.unqualified_lambda_arn(
            function_name=function_name,
            account=account_id,
            region=region,
        )
        raise ResourceNotFoundException(f"Function not found: {missing_arn}", Type="User")
1✔
440

441
    @staticmethod
    def _get_esm(uuid: str, account_id: str, region: str) -> EventSourceMappingConfiguration:
        """Return the event source mapping stored under ``uuid`` for the given account and region.

        :param uuid: identifier the mapping is stored under
        :param account_id: account whose store is searched
        :param region: region whose store is searched
        :raises ResourceNotFoundException: if the store holds no (truthy) entry for the uuid
        """
        mapping = lambda_stores[account_id][region].event_source_mappings.get(uuid)
        if mapping:
            return mapping

        missing_arn = lambda_event_source_mapping_arn(uuid, account_id, region)
        raise ResourceNotFoundException(
            f"Event source mapping not found: {missing_arn}", Type="User"
        )
1✔
452

453
    @staticmethod
    def _get_capacity_provider(
        capacity_provider_name: str,
        account_id: str,
        region: str,
        error_msg_template: str = "Capacity provider not found: {}",
    ) -> CapacityProviderModel:
        """Return the capacity provider stored under the given name.

        :param capacity_provider_name: name the capacity provider is stored under
        :param account_id: account whose store is searched
        :param region: region whose store is searched
        :param error_msg_template: ``str.format`` template for the not-found message;
            it receives the capacity provider ARN as its only positional argument
        :raises ResourceNotFoundException: if the store holds no (truthy) entry for the name
        """
        provider = lambda_stores[account_id][region].capacity_providers.get(
            capacity_provider_name
        )
        if provider:
            return provider

        missing_arn = capacity_provider_arn(capacity_provider_name, account_id, region)
        raise ResourceNotFoundException(error_msg_template.format(missing_arn), Type="User")
×
469

470
    @staticmethod
    def _validate_qualifier_expression(qualifier: str) -> None:
        """Raise if ``api_utils.validate_qualifier`` reports problems for the qualifier.

        :param qualifier: qualifier expression to validate
        :raises ValidationException: with a message built from the reported problems
        """
        problems = api_utils.validate_qualifier(qualifier)
        if not problems:
            return
        raise ValidationException(
            message=api_utils.construct_validation_exception_message(problems)
        )
476

477
    @staticmethod
    def _validate_publish_to(publish_to: str):
        """Accept only ``LATEST_PUBLISHED`` as a ``publishTo`` value.

        :param publish_to: value to check
        :raises ValidationException: for any other value
        """
        if publish_to == FunctionVersionLatestPublished.LATEST_PUBLISHED:
            return
        raise ValidationException(
            message=f"1 validation error detected: Value '{publish_to}' at 'publishTo' failed to satisfy constraint: Member must satisfy enum value set: [LATEST_PUBLISHED]"
        )
483

484
    @staticmethod
    def _resolve_fn_qualifier(resolved_fn: Function, qualifier: str | None) -> tuple[str, str]:
        """Check that a qualifier exists on the function and build the matching ARN.

        Without a qualifier, ``$LATEST`` and the unqualified ARN are returned. With a
        qualifier, it must be a known alias or a known version (``$LATEST`` included).

        :param resolved_fn: The resolved lambda function
        :param qualifier: The qualifier to be resolved or None
        :return: Tuple of (resolved qualifier, function arn either qualified or unqualified)
        :raises ResourceNotFoundException: if the alias or version is unknown, or the
            qualifier is neither an alias nor a version
        """
        fn_name = resolved_fn.function_name
        # assuming function versions need to live in the same account and region
        latest_id = resolved_fn.latest().id
        account_id, region = latest_id.account, latest_id.region
        fn_arn = api_utils.unqualified_lambda_arn(fn_name, account_id, region)
        if qualifier is None:
            return "$LATEST", fn_arn

        fn_arn = api_utils.qualified_lambda_arn(fn_name, qualifier, account_id, region)
        if api_utils.qualifier_is_alias(qualifier):
            if qualifier not in resolved_fn.aliases:
                raise ResourceNotFoundException(f"Cannot find alias arn: {fn_arn}", Type="User")
            return qualifier or "$LATEST", fn_arn

        version_like = api_utils.qualifier_is_version(qualifier) or qualifier == "$LATEST"
        # Either the qualifier matches the pattern but is no valid alias or version,
        # or it names a version this function does not have.
        if not version_like or qualifier not in resolved_fn.versions:
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")
        return qualifier or "$LATEST", fn_arn
1✔
510

511
    @staticmethod
    def _function_revision_id(resolved_fn: Function, resolved_qualifier: str) -> str:
        """Return the revision id of the alias or version the qualifier names.

        :param resolved_fn: function holding the aliases and versions
        :param resolved_qualifier: qualifier that is expected to exist on the function
        :return: the alias revision id, or the version config's revision id
        """
        if not api_utils.qualifier_is_alias(resolved_qualifier):
            # Assumes that a non-alias is a version
            return resolved_fn.versions[resolved_qualifier].config.revision_id
        return resolved_fn.aliases[resolved_qualifier].revision_id
1✔
518

519
    def _resolve_vpc_id(self, account_id: str, region_name: str, subnet_id: str) -> str:
        """Look up the VPC id of a subnet through EC2 ``DescribeSubnets``.

        :param account_id: account used as access key id for the EC2 client
        :param region_name: region the EC2 client is created for
        :param subnet_id: subnet to describe
        :return: ``VpcId`` of the first subnet in the response
        :raises InvalidParameterValueException: if EC2 answers with a client error
        """
        ec2 = connect_to(aws_access_key_id=account_id, region_name=region_name).ec2
        try:
            described = ec2.describe_subnets(SubnetIds=[subnet_id])
            return described["Subnets"][0]["VpcId"]
        except ec2.exceptions.ClientError as e:
            error = e.response["Error"]
            code, message = error["Code"], error["Message"]
            raise InvalidParameterValueException(
                f"Error occurred while DescribeSubnets. EC2 Error Code: {code}. EC2 Error Message: {message}",
                Type="User",
            )
530

531
    def _build_vpc_config(
        self,
        account_id: str,
        region_name: str,
        vpc_config: dict | None = None,
    ) -> VpcConfig | None:
        """Translate a request ``VpcConfig`` dict into the internal ``VpcConfig`` model.

        :param account_id: account used to resolve the VPC id of the subnet
        :param region_name: region used to resolve the VPC id of the subnet
        :param vpc_config: request dict; ``SubnetIds`` and ``SecurityGroupIds`` are read from it
        :return: ``None`` if no config was given or the ``ec2`` API is not enabled; an empty
            ``VpcConfig`` if no subnet ids were given; otherwise a ``VpcConfig`` whose VPC id
            is resolved from the first subnet id
        :raises ValidationException: if the first subnet id does not match ``SUBNET_ID_REGEX``
        :raises InvalidParameterValueException: if the subnet cannot be described via EC2
        """
        if not vpc_config or not is_api_enabled("ec2"):
            return None

        subnet_ids = vpc_config.get("SubnetIds", [])
        # A missing key, an empty list and an explicit None all mean "no subnets".
        # Previously an explicit None slipped past this check and crashed on `subnet_ids[0]`.
        if not subnet_ids:
            return VpcConfig(vpc_id="", security_group_ids=[], subnet_ids=[])

        # Only the first subnet id is validated and used to resolve the VPC.
        subnet_id = subnet_ids[0]
        if not SUBNET_ID_REGEX.match(subnet_id):
            raise ValidationException(
                f"1 validation error detected: Value '[{subnet_id}]' at 'vpcConfig.subnetIds' failed to satisfy constraint: Member must satisfy constraint: [Member must have length less than or equal to 1024, Member must have length greater than or equal to 0, Member must satisfy regular expression pattern: subnet-[0-9a-z]*]"
            )

        return VpcConfig(
            vpc_id=self._resolve_vpc_id(account_id, region_name, subnet_id),
            security_group_ids=vpc_config.get("SecurityGroupIds", []),
            subnet_ids=subnet_ids,
        )
555

556
    def _create_version_model(
        self,
        function_name: str,
        region: str,
        account_id: str,
        description: str | None = None,
        revision_id: str | None = None,
        code_sha256: str | None = None,
        publish_to: FunctionVersionLatestPublished | None = None,
        is_active: bool = False,
    ) -> tuple[FunctionVersion, bool]:
        """
        Release a new version to the model if all restrictions are met.
        Restrictions:
          - CodeSha256, if provided, must equal the current latest version code hash
          - RevisionId, if provided, must equal the current latest version revision id
          - Some changes have been done to the latest version since last publish
        Will return a tuple of the version, and whether the version was published (True) or the latest available version was taken (False).
        This can happen if the latest version has not been changed since the last version publish, in this case the last version will be returned.

        The new version is a copy of $LATEST with a new identifier, state, last-update status
        and snap start response. It is only stored in the function's ``versions`` dict here.

        :param function_name: Function name to be published
        :param region: Region of the function
        :param account_id: Account of the function
        :param description: new description of the version (will be the description of the function if missing)
        :param revision_id: Revision id, function will raise error if it does not match latest revision id
        :param code_sha256: Code sha256, function will raise error if it does not match latest code hash
        :param publish_to: if LATEST_PUBLISHED, the version is stored under the qualifier "$LATEST.PUBLISHED"
            (the numeric version counter is not advanced) and gets an "InProgress" last-update status;
            otherwise the next numeric version is used
        :param is_active: only evaluated when publish_to is LATEST_PUBLISHED; the new version then starts
            in the Active state instead of Pending
        :return: Tuple of (published version, whether version was released or last released version returned, since nothing changed)
        :raises PreconditionFailedException: if revision_id is given and differs from the $LATEST revision id
        :raises InvalidParameterValueException: if code_sha256 is given, differs from the $LATEST code hash,
            and the code is not a hot-reloaded zip package
        """
        current_latest_version = get_function_version(
            function_name=function_name, qualifier="$LATEST", account_id=account_id, region=region
        )
        if revision_id and current_latest_version.config.revision_id != revision_id:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
                Type="User",
            )

        # check if code hashes match if they are specified
        # (the hash is read from `code` for Zip packages and from `image` otherwise)
        current_hash = (
            current_latest_version.config.code.code_sha256
            if current_latest_version.config.package_type == PackageType.Zip
            else current_latest_version.config.image.code_sha256
        )
        # if the code is a zip package and hot reloaded (hot reloading is currently only supported for zip packagetypes)
        # we cannot enforce the codesha256 check
        is_hot_reloaded_zip_package = (
            current_latest_version.config.package_type == PackageType.Zip
            and current_latest_version.config.code.is_hot_reloading()
        )
        if code_sha256 and current_hash != code_sha256 and not is_hot_reloaded_zip_package:
            raise InvalidParameterValueException(
                f"CodeSHA256 ({code_sha256}) is different from current CodeSHA256 in $LATEST ({current_hash}). Please try again with the CodeSHA256 in $LATEST.",
                Type="User",
            )

        state = lambda_stores[account_id][region]
        function = state.functions.get(function_name)
        # Config fields that differ from $LATEST in the new version (currently only the description).
        changes = {}
        if description is not None:
            changes["description"] = description
        # TODO copy environment instead of restarting one, get rid of all the "Pending"s

        # The version counter and the versions dict are read and modified under the function's lock.
        with function.lock:
            # If the most recent numeric version has the same internal revision as $LATEST,
            # nothing changed since it was published: return it instead of creating a new one.
            # This check also runs when publish_to is LATEST_PUBLISHED.
            if function.next_version > 1 and (
                prev_version := function.versions.get(str(function.next_version - 1))
            ):
                if (
                    prev_version.config.internal_revision
                    == current_latest_version.config.internal_revision
                ):
                    return prev_version, False
            # TODO check if there was a change since last version
            if publish_to == FunctionVersionLatestPublished.LATEST_PUBLISHED:
                qualifier = "$LATEST.PUBLISHED"
            else:
                qualifier = str(function.next_version)
                function.next_version += 1
            new_id = VersionIdentifier(
                function_name=function_name,
                qualifier=qualifier,
                region=region,
                account=account_id,
            )

            if current_latest_version.config.capacity_provider_config:
                # for lambda managed functions, snap start is not supported
                snap_start = None
            else:
                # Optimization is reported as On only when snap start applies to published versions.
                apply_on = current_latest_version.config.snap_start["ApplyOn"]
                optimization_status = SnapStartOptimizationStatus.Off
                if apply_on == SnapStartApplyOn.PublishedVersions:
                    optimization_status = SnapStartOptimizationStatus.On
                snap_start = SnapStartResponse(
                    ApplyOn=apply_on,
                    OptimizationStatus=optimization_status,
                )

            # Default: no last-update status and a Pending/Creating state.
            last_update = None
            new_state = VersionState(
                state=State.Pending,
                code=StateReasonCode.Creating,
                reason="The function is being created.",
            )
            if publish_to == FunctionVersionLatestPublished.LATEST_PUBLISHED:
                last_update = UpdateStatus(
                    status=LastUpdateStatus.InProgress,
                    code="Updating",
                    reason="The function is being updated.",
                )
                if is_active:
                    new_state = VersionState(state=State.Active)
            # Copy $LATEST, overriding the identifier and the config fields computed above.
            new_version = dataclasses.replace(
                current_latest_version,
                config=dataclasses.replace(
                    current_latest_version.config,
                    last_update=last_update,
                    state=new_state,
                    snap_start=snap_start,
                    **changes,
                ),
                id=new_id,
            )
            function.versions[qualifier] = new_version
        return new_version, True
1✔
680

681
    def _publish_version_from_existing_version(
        self,
        function_name: str,
        region: str,
        account_id: str,
        description: str | None = None,
        revision_id: str | None = None,
        code_sha256: str | None = None,
        publish_to: FunctionVersionLatestPublished | None = None,
    ) -> FunctionVersion:
        """
        Publish a new version derived from the existing, already initialized $LATEST version.

        :param function_name: Function name
        :param region: region
        :param account_id: account id
        :param description: description
        :param revision_id: revision id (check if current version matches)
        :param code_sha256: code sha (check if current code matches)
        :param publish_to: optional publish target; the version model is requested as active
            only when this equals ``FunctionVersionLatestPublished.LATEST_PUBLISHED``
        :return: the version model if nothing changed or a capacity provider is configured,
            otherwise the version re-read from the store after the synchronous publish
        """
        new_version, changed = self._create_version_model(
            function_name=function_name,
            region=region,
            account_id=account_id,
            description=description,
            revision_id=revision_id,
            code_sha256=code_sha256,
            publish_to=publish_to,
            is_active=publish_to == FunctionVersionLatestPublished.LATEST_PUBLISHED,
        )
        if not changed:
            return new_version

        # Functions with a capacity provider are published asynchronously, all others synchronously.
        uses_capacity_provider = bool(new_version.config.capacity_provider_config)
        if uses_capacity_provider:
            self.lambda_service.publish_version_async(new_version)
        else:
            self.lambda_service.publish_version(new_version)

        function = lambda_stores[account_id][region].functions.get(function_name)

        # Update revision id for $LATEST version
        # TODO: re-evaluate data model to prevent this dirty hack just for bumping the revision id
        latest_version = function.versions["$LATEST"]
        function.versions["$LATEST"] = dataclasses.replace(
            latest_version, config=dataclasses.replace(latest_version.config)
        )

        if uses_capacity_provider:
            # The async publish has not necessarily finished yet. Therefore, we return the
            # new_version with State=Pending or LastUpdateStatus=InProgress ($LATEST.PUBLISHED)
            return new_version

        # Regular functions yield an Active state modified during `publish_version` (sync).
        # Therefore, we need to get the updated version from the store.
        return function.versions.get(new_version.id.qualifier)
1✔
738

739
    def _publish_version_with_changes(
        self,
        function_name: str,
        region: str,
        account_id: str,
        description: str | None = None,
        revision_id: str | None = None,
        code_sha256: str | None = None,
        publish_to: FunctionVersionLatestPublished | None = None,
        is_active: bool = False,
    ) -> FunctionVersion:
        """
        Publish a version together with a new latest version (publish on create / update).

        :param function_name: Function name
        :param region: region
        :param account_id: account id
        :param description: description
        :param revision_id: revision id (check if current version matches)
        :param code_sha256: code sha (check if current code matches)
        :param publish_to: optional publish target, forwarded to the version model creation
        :param is_active: forwarded to the version model creation
        :return: new version
        """
        new_version, changed = self._create_version_model(
            function_name=function_name,
            region=region,
            account_id=account_id,
            description=description,
            revision_id=revision_id,
            code_sha256=code_sha256,
            publish_to=publish_to,
            is_active=is_active,
        )
        # Only a changed version model needs to be handed to the Lambda service.
        if changed:
            self.lambda_service.create_function_version(new_version)
        return new_version
1✔
775

776
    @staticmethod
    def _verify_env_variables(env_vars: dict[str, str]):
        """
        Ensure the environment variables fit into the configured size limit.

        The variables are serialized to compact JSON (no whitespace after separators) and the
        UTF-8 encoded size of that string is compared against
        ``config.LAMBDA_LIMITS_MAX_FUNCTION_ENVVAR_SIZE_BYTES``.

        :param env_vars: environment variables of the function
        :raises InvalidParameterValueException: if the serialized variables exceed the limit
        """
        serialized = json.dumps(env_vars, separators=(",", ":"))
        size_in_bytes = len(serialized.encode("utf-8"))
        if size_in_bytes <= config.LAMBDA_LIMITS_MAX_FUNCTION_ENVVAR_SIZE_BYTES:
            return

        raise InvalidParameterValueException(
            f"Lambda was unable to configure your environment variables because the environment variables you have provided exceeded the 4KB limit. String measured: {serialized}",
            Type="User",
        )
787

788
    @staticmethod
    def _validate_snapstart(snap_start: SnapStart, runtime: Runtime):
        """
        Validate a SnapStart configuration against the given runtime.

        :param snap_start: SnapStart configuration; only its ``ApplyOn`` entry is inspected
        :param runtime: runtime of the function
        :raises ValidationException: if ``ApplyOn`` is neither ``PublishedVersions`` nor ``None``
        :raises InvalidParameterValueException: if the runtime is not listed in
            ``SNAP_START_SUPPORTED_RUNTIMES``
        """
        apply_on = snap_start.get("ApplyOn")
        accepted_apply_on = (SnapStartApplyOn.PublishedVersions, SnapStartApplyOn.None_)
        if apply_on not in accepted_apply_on:
            raise ValidationException(
                f"1 validation error detected: Value '{apply_on}' at 'snapStart.applyOn' failed to satisfy constraint: Member must satisfy enum value set: [PublishedVersions, None]"
            )

        if runtime in SNAP_START_SUPPORTED_RUNTIMES:
            return

        raise InvalidParameterValueException(
            f"{runtime} is not supported for SnapStart enabled functions.", Type="User"
        )
803

804
    def _validate_layers(self, new_layers: list[str], region: str, account_id: str):
        """
        Validate the layer version ARNs referenced by a function.

        Layers from another account that are not yet in the store are fetched through
        ``self.layer_fetcher`` and stored as a side effect.

        :param new_layers: layer version ARNs referenced by the function
        :param region: region of the function; region checks are skipped if this is falsy
        :param account_id: account id of the function
        :raises InvalidParameterValueException: if more layers than
            ``LAMBDA_LAYERS_LIMIT_PER_FUNCTION`` are referenced, a same-account layer is in a
            different region or does not exist, or two versions of the same layer are referenced
        :raises ValidationException: if an ARN has no layer version
        :raises AccessDeniedException: if an external layer is in a different region or
            cannot be fetched
        :raises NotImplementedError: if an external layer must be fetched but no layer fetcher
            is available
        """
        if len(new_layers) > LAMBDA_LAYERS_LIMIT_PER_FUNCTION:
            raise InvalidParameterValueException(
                "Cannot reference more than 5 layers.", Type="User"
            )

        # maps unversioned layer ARN -> first layer version ARN seen for that layer
        visited_layers = {}
        for layer_version_arn in new_layers:
            (
                layer_region,
                layer_account_id,
                layer_name,
                layer_version_str,
            ) = api_utils.parse_layer_arn(layer_version_arn)
            if layer_version_str is None:
                raise ValidationException(
                    f"1 validation error detected: Value '[{layer_version_arn}]'"
                    + " at 'layers' failed to satisfy constraint: Member must satisfy constraint: [Member must have length less than or equal to 2048, Member must have length greater than or equal to 1, Member must satisfy regular expression pattern: "
                    + "(arn:(aws[a-zA-Z-]*)?:lambda:(eusc-)?[a-z]{2}((-gov)|(-iso([a-z]?)))?-[a-z]+-\\d{1}:\\d{12}:layer:[a-zA-Z0-9-_]+:[0-9]+)|(arn:[a-zA-Z0-9-]+:lambda:::awslayer:[a-zA-Z0-9-_]+), Member must not be null]",
                )

            state = lambda_stores[layer_account_id][layer_region]
            # `layer` is the layer currently in the store (if any); `layer_version` is None
            # whenever the layer itself or the requested version is missing.
            layer = state.layers.get(layer_name)
            layer_version = None
            if layer is not None:
                layer_version = layer.layer_versions.get(layer_version_str)

            if layer_account_id == account_id:
                if region and layer_region != region:
                    raise InvalidParameterValueException(
                        f"Layers are not in the same region as the function. "
                        f"Layers are expected to be in region {region}.",
                        Type="User",
                    )
                if layer_version is None:
                    raise InvalidParameterValueException(
                        f"Layer version {layer_version_arn} does not exist.", Type="User"
                    )
            else:  # External layer from other account
                # TODO: validate IAM layer policy here, allowing access by default for now and only checking region
                if region and layer_region != region:
                    # TODO: detect user or role from context when IAM users are implemented
                    user = "user/localstack-testing"
                    raise AccessDeniedException(
                        f"User: arn:{get_partition(region)}:iam::{account_id}:{user} is not authorized to perform: lambda:GetLayerVersion on resource: {layer_version_arn} because no resource-based policy allows the lambda:GetLayerVersion action"
                    )
                if layer_version is None:
                    # Limitation: cannot fetch external layers when using the same account id as the target layer
                    # because we do not want to trigger the layer fetcher for every non-existing layer.
                    if self.layer_fetcher is None:
                        raise NotImplementedError(
                            "Fetching shared layers from AWS is a pro feature."
                        )

                    # Keep the fetched layer separate from the stored `layer`, so that we can
                    # still tell below whether the layer already existed in the store.
                    fetched_layer = self.layer_fetcher.fetch_layer(layer_version_arn)
                    if fetched_layer is None:
                        # TODO: detect user or role from context when IAM users are implemented
                        user = "user/localstack-testing"
                        raise AccessDeniedException(
                            f"User: arn:{get_partition(region)}:iam::{account_id}:{user} is not authorized to perform: lambda:GetLayerVersion on resource: {layer_version_arn} because no resource-based policy allows the lambda:GetLayerVersion action"
                        )

                    # Distinguish between new layer and new layer version
                    if layer is None:
                        # Create whole layer from scratch
                        state.layers[layer_name] = fetched_layer
                    else:
                        # Create layer version if another version of the same layer already exists,
                        # leaving the versions that are already stored untouched
                        layer.layer_versions[layer_version_str] = (
                            fetched_layer.layer_versions.get(layer_version_str)
                        )

            # only the first two matches in the array are considered for the error message
            layer_arn = ":".join(layer_version_arn.split(":")[:-1])
            if layer_arn in visited_layers:
                conflict_layer_version_arn = visited_layers[layer_arn]
                raise InvalidParameterValueException(
                    f"Two different versions of the same layer are not allowed to be referenced in the same function. {conflict_layer_version_arn} and {layer_version_arn} are versions of the same layer.",
                    Type="User",
                )
            visited_layers[layer_arn] = layer_version_arn
1✔
884

885
    def _validate_capacity_provider_config(
        self, capacity_provider_config: CapacityProviderConfig, context: RequestContext
    ):
        """
        Validate the capacity provider configuration of a function.

        :param capacity_provider_config: capacity provider configuration from the request
        :param context: request context, used to look up the referenced capacity provider
        :raises ValidationException: if the managed instances config or its capacity provider
            ARN is missing, or the ARN does not match ``CAPACITY_PROVIDER_ARN_NAME``
        """
        managed_instances_config = capacity_provider_config.get(
            "LambdaManagedInstancesCapacityProviderConfig"
        )
        if not managed_instances_config:
            raise ValidationException(
                "1 validation error detected: Value null at 'capacityProviderConfig.lambdaManagedInstancesCapacityProviderConfig' failed to satisfy constraint: Member must not be null"
            )

        capacity_provider_arn = managed_instances_config.get("CapacityProviderArn")
        if not capacity_provider_arn:
            raise ValidationException(
                "1 validation error detected: Value null at 'capacityProviderConfig.lambdaManagedInstancesCapacityProviderConfig.capacityProviderArn' failed to satisfy constraint: Member must not be null"
            )

        arn_matches = re.match(CAPACITY_PROVIDER_ARN_NAME, capacity_provider_arn)
        if not arn_matches:
            raise ValidationException(
                f"1 validation error detected: Value '{capacity_provider_arn}' at 'capacityProviderConfig.lambdaManagedInstancesCapacityProviderConfig.capacityProviderArn' failed to satisfy constraint: Member must satisfy regular expression pattern: {CAPACITY_PROVIDER_ARN_NAME}"
            )

        # The provider name is the last ":"-separated segment of the ARN; the lookup result is
        # discarded, the call is made for whatever checks get_capacity_provider performs.
        capacity_provider_name = capacity_provider_arn.rpartition(":")[2]
        self.get_capacity_provider(context, capacity_provider_name)
×
908

909
    @staticmethod
    def map_layers(new_layers: list[str]) -> list[LayerVersion]:
        """
        Resolve layer version ARNs to the layer versions held in the store.

        :param new_layers: layer version ARNs
        :return: one store entry per ARN, in the same order as the input
        """
        resolved_versions = []
        for arn in new_layers:
            region_name, account_id, layer_name, version_key = api_utils.parse_layer_arn(arn)
            stored_layer = lambda_stores[account_id][region_name].layers.get(layer_name)
            resolved_versions.append(stored_layer.layer_versions.get(version_key))
        return resolved_versions
1✔
920

921
    def get_function_recursion_config(
        self,
        context: RequestContext,
        function_name: UnqualifiedFunctionName,
        **kwargs,
    ) -> GetFunctionRecursionConfigResponse:
        """
        Return the recursive loop setting stored on the function.

        :param context: request context
        :param function_name: function name as passed by the caller, resolved via ``api_utils``
        :return: response carrying the function's ``recursive_loop`` value
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        resolved_name = api_utils.get_function_name(function_name, context)
        function = self._get_function(
            function_name=resolved_name, region=region, account_id=account_id
        )
        return GetFunctionRecursionConfigResponse(RecursiveLoop=function.recursive_loop)
1✔
931

932
    def put_function_recursion_config(
        self,
        context: RequestContext,
        function_name: UnqualifiedFunctionName,
        recursive_loop: RecursiveLoop,
        **kwargs,
    ) -> PutFunctionRecursionConfigResponse:
        """
        Store a new recursive loop setting on the function.

        The function is looked up before the value is validated.

        :param context: request context
        :param function_name: function name as passed by the caller, resolved via ``api_utils``
        :param recursive_loop: new setting; must be a member value of ``RecursiveLoop``
        :return: response carrying the stored ``recursive_loop`` value
        :raises ValidationException: if ``recursive_loop`` is not a ``RecursiveLoop`` member value
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        resolved_name = api_utils.get_function_name(function_name, context)
        function = self._get_function(
            function_name=resolved_name, region=region, account_id=account_id
        )

        if recursive_loop not in RecursiveLoop.__members__.values():
            raise ValidationException(
                f"1 validation error detected: Value '{recursive_loop}' at 'recursiveLoop' failed to satisfy constraint: "
                f"Member must satisfy enum value set: [Terminate, Allow]"
            )

        function.recursive_loop = recursive_loop
        return PutFunctionRecursionConfigResponse(RecursiveLoop=function.recursive_loop)
1✔
953

954
    @handler(operation="CreateFunction", expand=False)
    def create_function(
        self,
        context: RequestContext,
        request: CreateFunctionRequest,
    ) -> FunctionConfiguration:
        """
        Create a new function with its $LATEST version and optionally publish a version.

        The request is validated first (code and request size, architectures, environment
        variables, layers, role, runtime, SnapStart, PublishTo). The function and its $LATEST
        version are then registered in the store while holding ``self.create_fn_lock``.

        :param context: request context providing region, account id and the raw request
        :param request: CreateFunction request
        :return: configuration of the created (or, with ``Publish``, the published) version
        :raises RequestEntityTooLargeException: if the zipped code or the request is too large
        :raises ValidationException: if the architectures or the role are malformed
        :raises InvalidParameterValueException: if the role cannot be assumed, or if a capacity
            provider is combined with the "Text" log format
        :raises ResourceConflictException: if a function with that name already exists
        :raises LambdaServiceException: if the code location required by the package type is missing
        """
        context_region = context.region
        context_account_id = context.account_id

        zip_file = request.get("Code", {}).get("ZipFile")
        if zip_file and len(zip_file) > config.LAMBDA_LIMITS_CODE_SIZE_ZIPPED:
            raise RequestEntityTooLargeException(
                f"Zipped size must be smaller than {config.LAMBDA_LIMITS_CODE_SIZE_ZIPPED} bytes"
            )

        if context.request.content_length > config.LAMBDA_LIMITS_CREATE_FUNCTION_REQUEST_SIZE:
            raise RequestEntityTooLargeException(
                f"Request must be smaller than {config.LAMBDA_LIMITS_CREATE_FUNCTION_REQUEST_SIZE} bytes for the CreateFunction operation"
            )

        if architectures := request.get("Architectures"):
            if len(architectures) != 1:
                raise ValidationException(
                    f"1 validation error detected: Value '[{', '.join(architectures)}]' at 'architectures' failed to "
                    f"satisfy constraint: Member must have length less than or equal to 1",
                )
            if architectures[0] not in ARCHITECTURES:
                raise ValidationException(
                    f"1 validation error detected: Value '[{', '.join(architectures)}]' at 'architectures' failed to "
                    f"satisfy constraint: Member must satisfy constraint: [Member must satisfy enum value set: "
                    f"[x86_64, arm64], Member must not be null]",
                )

        if env_vars := request.get("Environment", {}).get("Variables"):
            self._verify_env_variables(env_vars)

        if layers := request.get("Layers", []):
            self._validate_layers(layers, region=context_region, account_id=context_account_id)

        if not api_utils.is_role_arn(request.get("Role")):
            raise ValidationException(
                f"1 validation error detected: Value '{request.get('Role')}'"
                + " at 'role' failed to satisfy constraint: Member must satisfy regular expression pattern: arn:(aws[a-zA-Z-]*)?:iam::\\d{12}:role/?[a-zA-Z_0-9+=,.@\\-_/]+"
            )
        if not self.lambda_service.can_assume_role(request.get("Role"), context.region):
            raise InvalidParameterValueException(
                "The role defined for the function cannot be assumed by Lambda.", Type="User"
            )
        package_type = request.get("PackageType", PackageType.Zip)
        runtime = request.get("Runtime")
        self._validate_runtime(package_type, runtime)

        request_function_name = request.get("FunctionName")

        function_name, *_ = api_utils.get_name_and_qualifier(
            function_arn_or_name=request_function_name,
            qualifier=None,
            context=context,
        )

        if runtime in DEPRECATED_RUNTIMES:
            LOG.warning(
                "The Lambda runtime %s is deprecated. "
                "Please upgrade the runtime for the function %s: "
                "https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtimes.html",
                runtime,
                function_name,
            )
        if snap_start := request.get("SnapStart"):
            self._validate_snapstart(snap_start, runtime)
        if publish_to := request.get("PublishTo"):
            self._validate_publish_to(publish_to)
        state = lambda_stores[context_account_id][context_region]

        # The existence check and the registration of the function happen under one lock.
        with self.create_fn_lock:
            if function_name in state.functions:
                raise ResourceConflictException(f"Function already exist: {function_name}")
            fn = Function(function_name=function_name)
            arn = VersionIdentifier(
                function_name=function_name,
                qualifier="$LATEST",
                region=context_region,
                account=context_account_id,
            )
            # save function code to s3
            code = None
            image = None
            image_config = None
            runtime_version_config = RuntimeVersionConfig(
                # Limitation: the runtime id (presumably sha256 of image) is currently hardcoded
                # Potential implementation: provide (cached) sha256 hash of used Docker image
                RuntimeVersionArn=f"arn:{context.partition}:lambda:{context_region}::runtime:8eeff65f6809a3ce81507fe733fe09b835899b99481ba22fd75b5a7338290ec1"
            )
            request_code = request.get("Code")
            if package_type == PackageType.Zip:
                # TODO verify if correct combination of code is set
                if zip_file := request_code.get("ZipFile"):
                    code = store_lambda_archive(
                        archive_file=zip_file,
                        function_name=function_name,
                        region_name=context_region,
                        account_id=context_account_id,
                    )
                elif s3_bucket := request_code.get("S3Bucket"):
                    s3_key = request_code["S3Key"]
                    s3_object_version = request_code.get("S3ObjectVersion")
                    code = store_s3_bucket_archive(
                        archive_bucket=s3_bucket,
                        archive_key=s3_key,
                        archive_version=s3_object_version,
                        function_name=function_name,
                        region_name=context_region,
                        account_id=context_account_id,
                    )
                else:
                    raise LambdaServiceException("A ZIP file or S3 bucket is required")
            elif package_type == PackageType.Image:
                image = request_code.get("ImageUri")
                if not image:
                    raise LambdaServiceException(
                        "An image is required when the package type is set to 'image'"
                    )
                image = create_image_code(image_uri=image)

                image_config_req = request.get("ImageConfig", {})
                image_config = ImageConfig(
                    command=image_config_req.get("Command"),
                    entrypoint=image_config_req.get("EntryPoint"),
                    working_directory=image_config_req.get("WorkingDirectory"),
                )
                # Runtime management controls are not available when providing a custom image
                runtime_version_config = None

            capacity_provider_config = None
            memory_size = request.get("MemorySize", LAMBDA_DEFAULT_MEMORY_SIZE)
            if "CapacityProviderConfig" in request:
                capacity_provider_config = request["CapacityProviderConfig"]
                self._validate_capacity_provider_config(capacity_provider_config, context)
                self._validate_managed_instances_runtime(runtime)

                # Defaults that are merged with the values provided in the request
                default_config = CapacityProviderConfig(
                    LambdaManagedInstancesCapacityProviderConfig=LambdaManagedInstancesCapacityProviderConfig(
                        ExecutionEnvironmentMemoryGiBPerVCpu=2.0,
                        PerExecutionEnvironmentMaxConcurrency=16,
                    )
                )
                capacity_provider_config = merge_recursive(default_config, capacity_provider_config)
                # With a capacity provider, the requested MemorySize is overridden
                memory_size = 2048
                if request.get("LoggingConfig", {}).get("LogFormat") == LogFormat.Text:
                    raise InvalidParameterValueException(
                        'LogLevel is not supported when LogFormat is set to "Text". Remove LogLevel from your request or change the LogFormat to "JSON" and try again.',
                        Type="User",
                    )
            if "LoggingConfig" in request:
                logging_config = request["LoggingConfig"]
                LOG.warning(
                    "Advanced Lambda Logging Configuration is currently mocked "
                    "and will not impact the logging behavior. "
                    "Please create a feature request if needed."
                )

                # when switching to JSON, app and system level log is auto set to INFO
                if logging_config.get("LogFormat", None) == LogFormat.JSON:
                    logging_config = {
                        "ApplicationLogLevel": "INFO",
                        "SystemLogLevel": "INFO",
                        "LogGroup": f"/aws/lambda/{function_name}",
                    } | logging_config
                else:
                    # values from the request take precedence over the defaults
                    logging_config = (
                        LoggingConfig(
                            LogFormat=LogFormat.Text, LogGroup=f"/aws/lambda/{function_name}"
                        )
                        | logging_config
                    )

            elif capacity_provider_config:
                logging_config = LoggingConfig(
                    LogFormat=LogFormat.JSON,
                    LogGroup=f"/aws/lambda/{function_name}",
                    ApplicationLogLevel="INFO",
                    SystemLogLevel="INFO",
                )
            else:
                logging_config = LoggingConfig(
                    LogFormat=LogFormat.Text, LogGroup=f"/aws/lambda/{function_name}"
                )
            # No SnapStart response is set for functions with a capacity provider
            snap_start = (
                None
                if capacity_provider_config
                else SnapStartResponse(
                    ApplyOn=request.get("SnapStart", {}).get("ApplyOn", SnapStartApplyOn.None_),
                    OptimizationStatus=SnapStartOptimizationStatus.Off,
                )
            )
            version = FunctionVersion(
                id=arn,
                config=VersionFunctionConfiguration(
                    last_modified=api_utils.format_lambda_date(datetime.datetime.now()),
                    description=request.get("Description", ""),
                    role=request["Role"],
                    timeout=request.get("Timeout", LAMBDA_DEFAULT_TIMEOUT),
                    runtime=request.get("Runtime"),
                    memory_size=memory_size,
                    handler=request.get("Handler"),
                    package_type=package_type,
                    environment=env_vars,
                    architectures=request.get("Architectures") or [Architecture.x86_64],
                    tracing_config_mode=request.get("TracingConfig", {}).get(
                        "Mode", TracingMode.PassThrough
                    ),
                    image=image,
                    image_config=image_config,
                    code=code,
                    layers=self.map_layers(layers),
                    internal_revision=short_uid(),
                    ephemeral_storage=LambdaEphemeralStorage(
                        size=request.get("EphemeralStorage", {}).get("Size", 512)
                    ),
                    snap_start=snap_start,
                    runtime_version_config=runtime_version_config,
                    dead_letter_arn=request.get("DeadLetterConfig", {}).get("TargetArn"),
                    vpc_config=self._build_vpc_config(
                        context_account_id, context_region, request.get("VpcConfig")
                    ),
                    state=VersionState(
                        state=State.Pending,
                        code=StateReasonCode.Creating,
                        reason="The function is being created.",
                    ),
                    logging_config=logging_config,
                    # TODO: might need something like **optional_kwargs if None
                    #   -> Test with regular GetFunction (i.e., without a capacity provider)
                    capacity_provider_config=capacity_provider_config,
                ),
            )
            # With a capacity provider, $LATEST is stored as ActiveNonInvocable right away,
            # while `version` itself keeps the Pending state.
            version_post_response = None
            if capacity_provider_config:
                version_post_response = dataclasses.replace(
                    version,
                    config=dataclasses.replace(
                        version.config,
                        last_update=UpdateStatus(status=LastUpdateStatus.Successful),
                        state=VersionState(state=State.ActiveNonInvocable),
                    ),
                )
            fn.versions["$LATEST"] = version_post_response or version
            state.functions[function_name] = fn
        initialization_type = (
            FunctionInitializationType.lambda_managed_instances
            if capacity_provider_config
            else FunctionInitializationType.on_demand
        )
        # NOTE(review): only the labelled counter is obtained here; no increment call is
        # visible in this block -- confirm whether the counter is meant to be incremented.
        function_counter.labels(
            operation=FunctionOperation.create,
            runtime=runtime or "n/a",
            status=FunctionStatus.success,
            invocation_type="n/a",
            package_type=package_type,
            initialization_type=initialization_type,
        )
        # TODO: consider potential other side effects of not having a function version for $LATEST
        # Provisioning happens upon publishing for functions using a capacity provider
        if not capacity_provider_config:
            self.lambda_service.create_function_version(version)

        if tags := request.get("Tags"):
            # This will check whether the function exists.
            self._store_tags(arn.unqualified_arn(), tags)

        if request.get("Publish"):
            version = self._publish_version_with_changes(
                function_name=function_name,
                region=context_region,
                account_id=context_account_id,
                publish_to=request.get("PublishTo"),
            )

        if config.LAMBDA_SYNCHRONOUS_CREATE:
            # block via retrying until "terminal" condition reached before returning
            if not poll_condition(
                lambda: (
                    get_function_version(
                        function_name, version.id.qualifier, version.id.account, version.id.region
                    ).config.state.state
                    in [State.Active, State.ActiveNonInvocable, State.Failed]
                ),
                timeout=10,
            ):
                LOG.warning(
                    "LAMBDA_SYNCHRONOUS_CREATE is active, but waiting for %s reached timeout.",
                    function_name,
                )

        return api_utils.map_config_out(
            version, return_qualified_arn=False, return_update_status=False
        )
1251

1252
    def _validate_runtime(self, package_type, runtime):
        """
        Validate the runtime of a Zip-packaged function.

        By default, every runtime in ``ALL_RUNTIMES`` is accepted. If
        ``config.LAMBDA_RUNTIME_VALIDATION`` is set, only the runtimes listed in
        ``RUNTIMES_AGGREGATED`` are accepted. Nothing is validated for other package types.

        :param package_type: package type of the function
        :param runtime: runtime of the function
        :raises InvalidParameterValueException: if the runtime is not accepted
        """
        runtimes = ALL_RUNTIMES
        if config.LAMBDA_RUNTIME_VALIDATION:
            # Flatten the grouped runtimes into one list. A plain `itertools.chain(values)`
            # would yield the groups themselves, so no runtime string would ever match.
            # NOTE(review): assumes each value of RUNTIMES_AGGREGATED is a list of runtimes
            # -- confirm against its definition.
            runtimes = list(itertools.chain.from_iterable(RUNTIMES_AGGREGATED.values()))

        if package_type == PackageType.Zip and runtime not in runtimes:
            # deprecated runtimes have different error
            if runtime in DEPRECATED_RUNTIMES:
                HINT_LOG.info(
                    "Set env variable LAMBDA_RUNTIME_VALIDATION to 0"
                    " in order to allow usage of deprecated runtimes"
                )
                self._check_for_recomended_migration_target(runtime)

            raise InvalidParameterValueException(
                f"Value {runtime} at 'runtime' failed to satisfy constraint: Member must satisfy enum value set: {VALID_RUNTIMES} or be a valid ARN",
                Type="User",
            )
1270

1271
    def _validate_managed_instances_runtime(self, runtime):
        """Ensure ``runtime`` is listed in ``VALID_MANAGED_INSTANCE_RUNTIMES``.

        :param runtime: runtime identifier to check
        :raises InvalidParameterValueException: if the runtime is not in the allowed set
        """
        if runtime in VALID_MANAGED_INSTANCE_RUNTIMES:
            return

        unsupported_message = (
            f"Runtime Enum {runtime} does not support specified feature: Lambda Managed Instances"
        )
        raise InvalidParameterValueException(unsupported_message)
1276

1277
    def _check_for_recomended_migration_target(self, deprecated_runtime):
        """Raise the dedicated error for deprecated runtimes that have a known upgrade target.

        Returns silently when ``DEPRECATED_RUNTIMES_UPGRADES`` has no entry for the runtime.

        :param deprecated_runtime: runtime identifier that is already known to be deprecated
        :raises InvalidParameterValueException: if an upgrade target is registered for the runtime
        """
        # AWS offers a recommended runtime for migration for "newly" deprecated runtimes;
        # the lookup below is needed to preserve parity with its error messages.
        upgrade_target = DEPRECATED_RUNTIMES_UPGRADES.get(deprecated_runtime)
        if upgrade_target is None:
            return

        LOG.debug(
            "The Lambda runtime %s is deprecated. Please upgrade to a supported Lambda runtime such as %s.",
            deprecated_runtime,
            upgrade_target,
        )
        raise InvalidParameterValueException(
            f"The runtime parameter of {deprecated_runtime} is no longer supported for creating or updating AWS Lambda functions. We recommend you use a supported runtime while creating or updating functions.",
            Type="User",
        )
1292

1293
    @handler(operation="UpdateFunctionConfiguration", expand=False)
    def update_function_configuration(
        self, context: RequestContext, request: UpdateFunctionConfigurationRequest
    ) -> FunctionConfiguration:
        """Update the configuration of the $LATEST version of a function.

        Only the settings present in ``request`` are changed; all other fields are carried over
        from the current $LATEST configuration. The resulting version is stored as ``$LATEST``
        with its update status reset to ``InProgress`` and handed to the lambda service.

        :param context: request context, used to resolve account and region
        :param request: the UpdateFunctionConfiguration request payload
        :return: the API representation of the new $LATEST version
        :raises ResourceNotFoundException: if the function does not exist
        :raises PreconditionFailedException: if ``RevisionId`` is given and does not match
        :raises ValidationException: for a malformed role ARN or a missing managed-instances
            capacity provider config
        :raises InvalidParameterValueException: for an unknown runtime, or a capacity provider
            config on a function that has none
        """
        function_name = request.get("FunctionName")

        # in case we got ARN or partial ARN
        account_id, region = api_utils.get_account_and_region(function_name, context)
        # the qualifier is not used below; updates always target $LATEST
        function_name, qualifier = api_utils.get_name_and_qualifier(function_name, None, context)
        state = lambda_stores[account_id][region]

        if function_name not in state.functions:
            raise ResourceNotFoundException(
                f"Function not found: {api_utils.unqualified_lambda_arn(function_name=function_name, region=region, account=account_id)}",
                Type="User",
            )
        function = state.functions[function_name]

        # TODO: lock modification of latest version
        # TODO: notify service for changes relevant to re-provisioning of $LATEST
        latest_version = function.latest()
        latest_version_config = latest_version.config

        # optimistic concurrency check: only enforced when the caller supplies a revision id
        revision_id = request.get("RevisionId")
        if revision_id and revision_id != latest_version.config.revision_id:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
                Type="User",
            )

        # collects the config fields to override; passed to dataclasses.replace at the end
        replace_kwargs = {}
        if "EphemeralStorage" in request:
            replace_kwargs["ephemeral_storage"] = LambdaEphemeralStorage(
                request.get("EphemeralStorage", {}).get("Size", 512)
            )  # TODO: do defaults here apply as well?

        if "Role" in request:
            if not api_utils.is_role_arn(request["Role"]):
                raise ValidationException(
                    f"1 validation error detected: Value '{request.get('Role')}'"
                    + " at 'role' failed to satisfy constraint: Member must satisfy regular expression pattern: arn:(aws[a-zA-Z-]*)?:iam::\\d{12}:role/?[a-zA-Z_0-9+=,.@\\-_/]+"
                )
            replace_kwargs["role"] = request["Role"]

        if "Description" in request:
            replace_kwargs["description"] = request["Description"]

        if "Timeout" in request:
            replace_kwargs["timeout"] = request["Timeout"]

        if "MemorySize" in request:
            replace_kwargs["memory_size"] = request["MemorySize"]

        if "DeadLetterConfig" in request:
            # a DeadLetterConfig without TargetArn stores None as the dead letter ARN
            replace_kwargs["dead_letter_arn"] = request.get("DeadLetterConfig", {}).get("TargetArn")

        # unlike most keys, VpcConfig is only applied when its value is truthy
        if vpc_config := request.get("VpcConfig"):
            replace_kwargs["vpc_config"] = self._build_vpc_config(account_id, region, vpc_config)

        if "Handler" in request:
            replace_kwargs["handler"] = request["Handler"]

        if "Runtime" in request:
            runtime = request["Runtime"]

            # unknown runtimes are rejected; deprecated ones are accepted with a warning
            if runtime not in ALL_RUNTIMES:
                raise InvalidParameterValueException(
                    f"Value {runtime} at 'runtime' failed to satisfy constraint: Member must satisfy enum value set: {VALID_RUNTIMES} or be a valid ARN",
                    Type="User",
                )
            if runtime in DEPRECATED_RUNTIMES:
                LOG.warning(
                    "The Lambda runtime %s is deprecated. "
                    "Please upgrade the runtime for the function %s: "
                    "https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtimes.html",
                    runtime,
                    function_name,
                )
            replace_kwargs["runtime"] = request["Runtime"]

        if snap_start := request.get("SnapStart"):
            # validate against the runtime being set in this request, else the current one
            runtime = replace_kwargs.get("runtime") or latest_version_config.runtime
            self._validate_snapstart(snap_start, runtime)
            replace_kwargs["snap_start"] = SnapStartResponse(
                ApplyOn=snap_start.get("ApplyOn", SnapStartApplyOn.None_),
                OptimizationStatus=SnapStartOptimizationStatus.Off,
            )

        if "Environment" in request:
            # env_vars is bound by the walrus even when empty, so an empty mapping is stored
            # without running the verification
            if env_vars := request.get("Environment", {}).get("Variables", {}):
                self._verify_env_variables(env_vars)
            replace_kwargs["environment"] = env_vars

        if "Layers" in request:
            new_layers = request["Layers"]
            # only a non-empty layer list is validated; the mapped value is stored either way
            if new_layers:
                self._validate_layers(new_layers, region=region, account_id=account_id)
            replace_kwargs["layers"] = self.map_layers(new_layers)

        if "ImageConfig" in request:
            new_image_config = request["ImageConfig"]
            replace_kwargs["image_config"] = ImageConfig(
                command=new_image_config.get("Command"),
                entrypoint=new_image_config.get("EntryPoint"),
                working_directory=new_image_config.get("WorkingDirectory"),
            )

        if "LoggingConfig" in request:
            logging_config = request["LoggingConfig"]
            LOG.warning(
                "Advanced Lambda Logging Configuration is currently mocked "
                "and will not impact the logging behavior. "
                "Please create a feature request if needed."
            )

            # when switching to JSON, app and system level log is auto set to INFO
            # (values given explicitly in the request win because they are merged last)
            if logging_config.get("LogFormat", None) == LogFormat.JSON:
                logging_config = {
                    "ApplicationLogLevel": "INFO",
                    "SystemLogLevel": "INFO",
                } | logging_config

            last_config = latest_version_config.logging_config

            # add partial update: request keys override the stored logging config
            new_logging_config = last_config | logging_config

            # in case we switched from JSON to Text we need to remove LogLevel keys
            if (
                new_logging_config.get("LogFormat") == LogFormat.Text
                and last_config.get("LogFormat") == LogFormat.JSON
            ):
                new_logging_config.pop("ApplicationLogLevel", None)
                new_logging_config.pop("SystemLogLevel", None)

            replace_kwargs["logging_config"] = new_logging_config

        if "TracingConfig" in request:
            # a TracingConfig without Mode leaves the current tracing mode untouched
            new_mode = request.get("TracingConfig", {}).get("Mode")
            if new_mode:
                replace_kwargs["tracing_config_mode"] = new_mode

        if "CapacityProviderConfig" in request:
            capacity_provider_config = request["CapacityProviderConfig"]
            self._validate_capacity_provider_config(capacity_provider_config, context)

            # a function that already has a capacity provider config must keep the
            # managed-instances section in the request
            if latest_version.config.capacity_provider_config and not request[
                "CapacityProviderConfig"
            ].get("LambdaManagedInstancesCapacityProviderConfig"):
                raise ValidationException(
                    "1 validation error detected: Value null at 'capacityProviderConfig.lambdaManagedInstancesCapacityProviderConfig' failed to satisfy constraint: Member must not be null"
                )
            # a function without a capacity provider config cannot gain one through an update
            if not latest_version.config.capacity_provider_config:
                raise InvalidParameterValueException(
                    "CapacityProviderConfig isn't supported for Lambda Default functions.",
                    Type="User",
                )

            # defaults that the request values are merged with
            default_config = CapacityProviderConfig(
                LambdaManagedInstancesCapacityProviderConfig=LambdaManagedInstancesCapacityProviderConfig(
                    ExecutionEnvironmentMemoryGiBPerVCpu=2.0,
                    PerExecutionEnvironmentMaxConcurrency=16,
                )
            )
            capacity_provider_config = merge_recursive(default_config, capacity_provider_config)
            replace_kwargs["capacity_provider_config"] = capacity_provider_config
        # build the new $LATEST: fresh modification date and internal revision, update status
        # reset to InProgress, plus the overrides collected above
        new_latest_version = dataclasses.replace(
            latest_version,
            config=dataclasses.replace(
                latest_version_config,
                last_modified=api_utils.generate_lambda_date(),
                internal_revision=short_uid(),
                last_update=UpdateStatus(
                    status=LastUpdateStatus.InProgress,
                    code="Creating",
                    reason="The function is being created.",
                ),
                **replace_kwargs,
            ),
        )
        function.versions["$LATEST"] = new_latest_version  # TODO: notify
        self.lambda_service.update_version(new_version=new_latest_version)

        return api_utils.map_config_out(new_latest_version)
1✔
1479

1480
    @handler(operation="UpdateFunctionCode", expand=False)
    def update_function_code(
        self, context: RequestContext, request: UpdateFunctionCodeRequest
    ) -> FunctionConfiguration:
        """Replace the code (or image) of the $LATEST version of a function.

        The new code comes from ``ZipFile``, from ``S3Bucket``/``S3Key`` or from ``ImageUri``, in
        that order of precedence. When ``Publish`` is set, the updated $LATEST is additionally
        published and the published version is returned with a qualified ARN.

        :param context: request context, used to resolve account and region
        :param request: the UpdateFunctionCode request payload
        :return: the API representation of the updated (or newly published) version
        :raises ResourceNotFoundException: if the function does not exist
        :raises PreconditionFailedException: if ``RevisionId`` is given and does not match
        :raises InvalidParameterValueException: if the code source does not fit the package type
        :raises ValidationException: for an invalid ``Architectures`` value
        :raises LambdaServiceException: if no code source is supplied
        """
        requested_name = request.get("FunctionName")
        account_id, region = api_utils.get_account_and_region(requested_name, context)
        function_name, _ = api_utils.get_name_and_qualifier(requested_name, None, context)

        store = lambda_stores[account_id][region]
        if function_name not in store.functions:
            raise ResourceNotFoundException(
                f"Function not found: {api_utils.unqualified_lambda_arn(function_name=function_name, region=region, account=account_id)}",
                Type="User",
            )
        function = store.functions[function_name]

        # optimistic concurrency check, only enforced when a revision id is supplied
        expected_revision = request.get("RevisionId")
        if expected_revision and expected_revision != function.latest().config.revision_id:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
                Type="User",
            )

        # TODO verify if correct combination of code is set
        archive_requested = request.get("ZipFile") or request.get("S3Bucket")
        if archive_requested and function.latest().config.package_type == PackageType.Image:
            raise InvalidParameterValueException(
                "Please provide ImageUri when updating a function with packageType Image.",
                Type="User",
            )
        if request.get("ImageUri") and function.latest().config.package_type == PackageType.Zip:
            raise InvalidParameterValueException(
                "Please don't provide ImageUri when updating a function with packageType Zip.",
                Type="User",
            )

        publish_to = request.get("PublishTo")
        if publish_to:
            self._validate_publish_to(publish_to)

        # exactly one of code / image is populated, depending on the source given
        code = None
        image = None
        if zip_file := request.get("ZipFile"):
            code = store_lambda_archive(
                archive_file=zip_file,
                function_name=function_name,
                region_name=region,
                account_id=account_id,
            )
        elif s3_bucket := request.get("S3Bucket"):
            code = store_s3_bucket_archive(
                archive_bucket=s3_bucket,
                archive_key=request["S3Key"],
                archive_version=request.get("S3ObjectVersion"),
                function_name=function_name,
                region_name=region,
                account_id=account_id,
            )
        elif image_uri := request.get("ImageUri"):
            image = create_image_code(image_uri=image_uri)
        else:
            raise LambdaServiceException("A ZIP file, S3 bucket, or image is required")

        previous_version = function.versions.get("$LATEST")
        replace_kwargs = {"code": code} if code else {"image": image}

        architectures = request.get("Architectures")
        if architectures:
            if len(architectures) != 1:
                raise ValidationException(
                    f"1 validation error detected: Value '[{', '.join(architectures)}]' at 'architectures' failed to "
                    f"satisfy constraint: Member must have length less than or equal to 1",
                )
            # An empty list of architectures is also forbidden. Further exceptions are tested here for create_function:
            # tests.aws.services.lambda_.test_lambda_api.TestLambdaFunction.test_create_lambda_exceptions
            if architectures[0] not in ARCHITECTURES:
                raise ValidationException(
                    f"1 validation error detected: Value '[{', '.join(architectures)}]' at 'architectures' failed to "
                    f"satisfy constraint: Member must satisfy constraint: [Member must satisfy enum value set: "
                    f"[x86_64, arm64], Member must not be null]",
                )
            replace_kwargs["architectures"] = architectures

        # new $LATEST config: fresh revision and date, update status reset to InProgress
        updated_config = dataclasses.replace(
            previous_version.config,
            internal_revision=short_uid(),
            last_modified=api_utils.generate_lambda_date(),
            last_update=UpdateStatus(
                status=LastUpdateStatus.InProgress,
                code="Creating",
                reason="The function is being created.",
            ),
            **replace_kwargs,
        )
        function_version = dataclasses.replace(previous_version, config=updated_config)
        function.versions["$LATEST"] = function_version
        self.lambda_service.update_version(new_version=function_version)

        publish_requested = request.get("Publish")
        if publish_requested:
            function_version = self._publish_version_with_changes(
                function_name=function_name,
                region=region,
                account_id=account_id,
                publish_to=publish_to,
                is_active=True,
            )
        return api_utils.map_config_out(
            function_version, return_qualified_arn=bool(publish_requested)
        )
1596

1597
    # TODO: does deleting the latest published version affect the next versions number?
1598
    # TODO: what happens when we call this with a qualifier and a fully qualified ARN? (+ conflicts?)
1599
    # TODO: test different ARN patterns (shorthand ARN?)
1600
    # TODO: test deleting across regions?
1601
    # TODO: test mismatch between context region and region in ARN
1602
    # TODO: test qualifier $LATEST, alias-name and version
1603
    def delete_function(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
        **kwargs,
    ) -> DeleteFunctionResponse:
        """Delete a single published version of a function, or the whole function.

        With a qualifier only that version is removed; without one, the tags, all versions and
        the function itself are removed. Versions/functions that use a capacity provider are
        removed from the store asynchronously by the lambda service, which is reported with
        status code 202 instead of 204.

        :param context: request context, used to resolve account and region
        :param function_name: name, partial ARN or ARN of the function
        :param qualifier: optional version to delete; aliases and ``$LATEST`` are rejected
        :return: response carrying status code 202 (async deletion) or 204
        :raises InvalidParameterValueException: if the qualifier is an alias or ``$LATEST``
        :raises ResourceNotFoundException: if the function does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )

        if qualifier and api_utils.qualifier_is_alias(qualifier):
            raise InvalidParameterValueException(
                "Deletion of aliases is not currently supported.",
                Type="User",
            )

        store = lambda_stores[account_id][region]
        if qualifier == "$LATEST":
            raise InvalidParameterValueException(
                "$LATEST version cannot be deleted without deleting the function.", Type="User"
            )

        function_arn = api_utils.unqualified_lambda_arn(
            function_name=function_name, region=region, account=account_id
        )
        if function_name not in store.functions:
            raise ResourceNotFoundException(
                f"Function not found: {function_arn}",
                Type="User",
            )
        function = store.functions.get(function_name)

        if qualifier:
            # delete a version of the function; an unknown version is a silent no-op
            target_version = function.versions.get(qualifier, None)
            if not target_version:
                return DeleteFunctionResponse(StatusCode=204)

            uses_capacity_provider = bool(target_version.config.capacity_provider_config)
            if uses_capacity_provider:
                # async delete from store
                self.lambda_service.delete_function_version_async(
                    function, target_version, qualifier
                )
            else:
                function.versions.pop(qualifier, None)
            self.lambda_service.stop_version(target_version.id.qualified_arn())
            destroy_code_if_not_used(code=target_version.config.code, function=function)
            return DeleteFunctionResponse(StatusCode=202 if uses_capacity_provider else 204)

        # delete the whole function
        self._remove_all_tags(function_arn)
        # TODO: introduce locking for safe deletion: We could create a new version at the API layer before
        #  the old version gets cleaned up in the internal lambda service.
        function = store.functions.get(function_name)
        uses_capacity_provider = bool(function.latest().config.capacity_provider_config)
        if uses_capacity_provider:
            # async delete version from store
            self.lambda_service.delete_function_async(store, function_name)

        for version in function.versions.values():
            # Functions with a capacity provider do NOT have a version manager for $LATEST because only
            # published versions are invokable.
            if not uses_capacity_provider or version.id.qualifier != "$LATEST":
                self.lambda_service.stop_version(qualified_arn=version.id.qualified_arn())
            # we can safely destroy the code here
            if version.config.code:
                version.config.code.destroy()

        if not uses_capacity_provider:
            store.functions.pop(function_name, None)

        return DeleteFunctionResponse(StatusCode=202 if uses_capacity_provider else 204)
1✔
1676

1677
    def list_functions(
        self,
        context: RequestContext,
        master_region: MasterRegion = None,  # (only relevant for lambda@edge)
        function_version: FunctionVersionApi = None,
        marker: String = None,
        max_items: MaxListItems = None,
        **kwargs,
    ) -> ListFunctionsResponse:
        """List the functions of the caller's account and region, one page at a time.

        :param context: request context providing account and region
        :param master_region: accepted but not used here
        :param function_version: ``ALL`` to list every version with qualified ARNs; when omitted,
            only the $LATEST version of each function is listed with an unqualified ARN
        :param marker: pagination marker returned by a previous call
        :param max_items: maximum number of entries per page
        :return: one page of function configurations plus the marker for the next page
        :raises ValidationException: if ``function_version`` is set to anything other than ``ALL``
        """
        state = lambda_stores[context.account_id][context.region]

        if function_version and function_version != FunctionVersionApi.ALL:
            raise ValidationException(
                f"1 validation error detected: Value '{function_version}'"
                + " at 'functionVersion' failed to satisfy constraint: Member must satisfy enum value set: [ALL]"
            )

        include_all_versions = function_version == FunctionVersionApi.ALL
        if include_all_versions:
            # include all versions for all function
            selected = [
                version
                for function in state.functions.values()
                for version in function.versions.values()
            ]
        else:
            selected = [function.latest() for function in state.functions.values()]

        entries = []
        for version in selected:
            mapped = api_utils.map_config_out(version, return_qualified_arn=include_all_versions)
            entries.append(api_utils.map_to_list_response(mapped))

        page, token = PaginatedList(entries).get_page(
            lambda entry: entry["FunctionArn"],
            marker,
            max_items,
        )
        return ListFunctionsResponse(Functions=page, NextMarker=token)
1✔
1715

1716
    def get_function(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
        **kwargs,
    ) -> GetFunctionResponse:
        """Return configuration, code location, concurrency and tags of a function version.

        An alias qualifier is resolved to the version the alias points to. The ``Concurrency``
        element is included whenever a reserved concurrency is set, including a value of 0.

        :param context: request context, used to resolve account and region
        :param function_name: name, partial ARN or ARN of the function
        :param qualifier: optional version or alias
        :return: the GetFunction response
        :raises ResourceNotFoundException: if the function or the given alias does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )

        fn = lambda_stores[account_id][region].functions.get(function_name)
        if fn is None:
            # the ARN in the message is qualified only if the caller supplied a qualifier
            if qualifier is None:
                raise ResourceNotFoundException(
                    f"Function not found: {api_utils.unqualified_lambda_arn(function_name, account_id, region)}",
                    Type="User",
                )
            else:
                raise ResourceNotFoundException(
                    f"Function not found: {api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)}",
                    Type="User",
                )
        alias_name = None
        if qualifier and api_utils.qualifier_is_alias(qualifier):
            if qualifier not in fn.aliases:
                alias_arn = api_utils.qualified_lambda_arn(
                    function_name, qualifier, account_id, region
                )
                raise ResourceNotFoundException(f"Function not found: {alias_arn}", Type="User")
            # from here on, the qualifier is the version the alias points to
            alias_name = qualifier
            qualifier = fn.aliases[alias_name].function_version

        version = get_function_version(
            function_name=function_name,
            qualifier=qualifier,
            account_id=account_id,
            region=region,
        )
        tags = self._get_tags(api_utils.unqualified_lambda_arn(function_name, account_id, region))
        additional_fields = {}
        if tags:
            additional_fields["Tags"] = tags
        code_location = None
        if code := version.config.code:
            code_location = FunctionCodeLocation(
                Location=code.generate_presigned_url(endpoint_url=config.external_service_url()),
                RepositoryType="S3",
            )
        elif image := version.config.image:
            code_location = FunctionCodeLocation(
                ImageUri=image.image_uri,
                RepositoryType=image.repository_type,
                ResolvedImageUri=image.resolved_image_uri,
            )
        concurrency = None
        # Compare against None rather than testing truthiness: a reserved concurrency of 0 is a
        # valid setting and must still be reported.
        # NOTE(review): assumes "no reserved concurrency" is represented as None on the function
        # model -- confirm against its definition.
        if fn.reserved_concurrent_executions is not None:
            concurrency = Concurrency(
                ReservedConcurrentExecutions=fn.reserved_concurrent_executions
            )

        return GetFunctionResponse(
            Configuration=api_utils.map_config_out(
                version, return_qualified_arn=bool(qualifier), alias_name=alias_name
            ),
            Code=code_location,  # TODO
            Concurrency=concurrency,
            **additional_fields,
        )
1786

1787
    def get_function_configuration(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
        **kwargs,
    ) -> FunctionConfiguration:
        """Return the configuration of a function version.

        :param context: request context, used to resolve account and region
        :param function_name: name, partial ARN or ARN of the function
        :param qualifier: optional qualifier; when one is resolved, the returned ARN is qualified
        :return: the API representation of the version's configuration
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        # CAVE: THIS RETURN VALUE IS *NOT* THE SAME AS IN get_function (!) but seems to be only configuration part?
        resolved_name, resolved_qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        return api_utils.map_config_out(
            get_function_version(
                function_name=resolved_name,
                qualifier=resolved_qualifier,
                account_id=account_id,
                region=region,
            ),
            return_qualified_arn=bool(resolved_qualifier),
        )
1✔
1806

1807
    def invoke(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        invocation_type: InvocationType | None = None,
        log_type: LogType | None = None,
        client_context: String | None = None,
        durable_execution_name: DurableExecutionName | None = None,
        payload: IO[Blob] | None = None,
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
        tenant_id: TenantId | None = None,
        **kwargs,
    ) -> InvocationResponse:
        """Invoke a function through the lambda service and map the result to the API response.

        ``durable_execution_name`` and ``tenant_id`` are accepted but not used by this method.

        :param context: request context (account/region resolution, request id, trace context)
        :param function_name: name, partial ARN or ARN of the function
        :param invocation_type: ``Event`` yields status 202, ``DryRun`` 204, anything else 200
        :param log_type: ``Tail`` adds the base64-encoded last 4096 bytes of the logs
        :param client_context: passed through to the lambda service
        :param payload: stream holding the invocation payload, read in full if present
        :param qualifier: optional version or alias
        :return: the invocation response
        :raises ServiceException: re-raised unchanged when raised by the lambda service
        :raises LambdaServiceException: wraps startup timeouts and any other failure
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )

        caller_user_agent = context.request.user_agent.string

        started_at = time.perf_counter()
        try:
            result = self.lambda_service.invoke(
                function_name=function_name,
                qualifier=qualifier,
                region=region,
                account_id=account_id,
                invocation_type=invocation_type,
                client_context=client_context,
                request_id=context.request_id,
                trace_context=context.trace_context,
                payload=payload.read() if payload else None,
                user_agent=caller_user_agent,
            )
        except ServiceException:
            # API-level errors pass through untouched
            raise
        except EnvironmentStartupTimeoutException as timeout_error:
            raise LambdaServiceException(
                f"[{context.request_id}] Timeout while starting up lambda environment for function {function_name}:{qualifier}"
            ) from timeout_error
        except Exception as e:
            # the stack trace is only attached when debug logging is enabled
            LOG.error(
                "[%s] Error while invoking lambda %s",
                context.request_id,
                function_name,
                exc_info=LOG.isEnabledFor(logging.DEBUG),
            )
            raise LambdaServiceException(
                f"[{context.request_id}] Internal error while executing lambda {function_name}:{qualifier}. Caused by {type(e).__name__}: {e}"
            ) from e

        # asynchronous and dry-run invocations carry no payload
        if invocation_type == InvocationType.Event:
            return InvocationResponse(StatusCode=202)
        if invocation_type == InvocationType.DryRun:
            return InvocationResponse(StatusCode=204)

        elapsed_ms = (time.perf_counter() - started_at) * 1000
        LOG.debug("Lambda invocation duration: %0.2fms", elapsed_ms)

        response = InvocationResponse(
            StatusCode=200,
            Payload=result.payload,
            ExecutedVersion=result.executed_version,
        )
        if result.is_error:
            response["FunctionError"] = "Unhandled"
        if log_type == LogType.Tail:
            # only the last 4096 bytes of the logs are returned
            log_tail = to_bytes(result.logs)[-4096:]
            response["LogResult"] = to_str(base64.b64encode(log_tail))

        return response
1✔
1881

1882
    # Version operations
1883
    def publish_version(
        self,
        context: RequestContext,
        function_name: FunctionName,
        code_sha256: String | None = None,
        description: Description | None = None,
        revision_id: String | None = None,
        publish_to: FunctionVersionLatestPublished | None = None,
        **kwargs,
    ) -> FunctionConfiguration:
        """Publish a new version from the existing state of a function.

        :param context: request context, used to resolve account and region
        :param function_name: name, partial ARN or ARN of the function
        :param code_sha256: passed through to the publishing helper
        :param description: passed through to the publishing helper
        :param revision_id: passed through to the publishing helper
        :param publish_to: optional publish target; validated when given
        :return: the API representation of the published version, with a qualified ARN
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        resolved_name = api_utils.get_function_name(function_name, context)

        if publish_to:
            self._validate_publish_to(publish_to)

        published = self._publish_version_from_existing_version(
            function_name=resolved_name,
            description=description,
            account_id=account_id,
            region=region,
            revision_id=revision_id,
            code_sha256=code_sha256,
            publish_to=publish_to,
        )
        return api_utils.map_config_out(published, return_qualified_arn=True)
1✔
1907

1908
    def list_versions_by_function(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        marker: String = None,
        max_items: MaxListItems = None,
        **kwargs,
    ) -> ListVersionsByFunctionResponse:
        """
        Return one page of the versions of a function.

        Each version is mapped to its list-response representation with a qualified ARN;
        ``marker`` and ``max_items`` are handed to ``PaginatedList.get_page``.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        resolved_name = api_utils.get_function_name(function_name, context)
        function = self._get_function(
            function_name=resolved_name, region=region, account_id=account_id
        )

        version_entries = []
        for version in function.versions.values():
            version_config = api_utils.map_config_out(version=version, return_qualified_arn=True)
            version_entries.append(api_utils.map_to_list_response(version_config))

        # the page token is derived from the item itself
        page, token = PaginatedList(version_entries).get_page(
            lambda item: item,
            marker,
            max_items,
        )
        return ListVersionsByFunctionResponse(Versions=page, NextMarker=token)
1✔
1934

1935
    # Alias
1936

1937
    def _create_routing_config_model(
        self, routing_config_dict: dict[str, float], function_version: FunctionVersion
    ):
        """
        Validate the additional version weights of an alias and wrap them in an ``AliasRoutingConfig``.

        :param routing_config_dict: mapping of version qualifier to routing weight
        :param function_version: the version the alias points to
        :raises InvalidParameterValueException: if more than one weight is given, if the weighted
            version equals the alias target, or if the alias target is ``$LATEST``
        :raises ValidationException: if a weight is outside the accepted range or a key is not a
            version qualifier
        :return: the routing config model holding the given weights
        """
        if len(routing_config_dict) > 1:
            raise InvalidParameterValueException(
                "Number of items in AdditionalVersionWeights cannot be greater than 1",
                Type="User",
            )

        target_id = function_version.id
        # At most one entry is present at this point; the loop is kept in case multiple
        # entries become supported in the future. The order of the checks is significant,
        # since it determines which error is reported first.
        for key, value in routing_config_dict.items():
            if value < 0.0 or value >= 1.0:
                raise ValidationException(
                    f"1 validation error detected: Value '{{{key}={value}}}' at 'routingConfig.additionalVersionWeights' failed to satisfy constraint: Map value must satisfy constraint: [Member must have value less than or equal to 1.0, Member must have value greater than or equal to 0.0, Member must not be null]"
                )

            if key == target_id.qualifier:
                raise InvalidParameterValueException(
                    f"Invalid function version {function_version.id.qualifier}. Function version {function_version.id.qualifier} is already included in routing configuration.",
                    Type="User",
                )

            # an alias targeting $LATEST must not carry any routing config
            if target_id.qualifier == "$LATEST":
                raise InvalidParameterValueException(
                    "$LATEST is not supported for an alias pointing to more than 1 version"
                )

            if not api_utils.qualifier_is_version(key):
                raise ValidationException(
                    f"1 validation error detected: Value '{{{key}={value}}}' at 'routingConfig.additionalVersionWeights' failed to satisfy constraint: Map keys must satisfy constraint: [Member must have length less than or equal to 1024, Member must have length greater than or equal to 1, Member must satisfy regular expression pattern: [0-9]+]"
                )

            # look up the weighted version; the result itself is not needed
            get_function_version(
                function_name=target_id.function_name,
                qualifier=key,
                region=target_id.region,
                account_id=target_id.account,
            )

        return AliasRoutingConfig(version_weights=routing_config_dict)
1✔
1974

1975
    def create_alias(
        self,
        context: RequestContext,
        function_name: FunctionName,
        name: Alias,
        function_version: VersionWithLatestPublished,
        description: Description = None,
        routing_config: AliasRoutingConfiguration = None,
        **kwargs,
    ) -> AliasConfiguration:
        """
        Create a new alias for a function version.

        :raises ValidationException: if ``name`` is not a valid alias name
        :raises ResourceConflictException: if an alias with this name already exists
        :return: the configuration of the newly created alias
        """
        if not api_utils.qualifier_is_alias(name):
            raise ValidationException(
                f"1 validation error detected: Value '{name}' at 'name' failed to satisfy constraint: Member must satisfy regular expression pattern: (?!^[0-9]+$)([a-zA-Z0-9-_]+)"
            )

        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name = api_utils.get_function_name(function_name, context)
        # resolve the version the alias should point to
        target_version = get_function_version(
            function_name=function_name,
            qualifier=function_version,
            region=region,
            account_id=account_id,
        )
        function = self._get_function(
            function_name=function_name, region=region, account_id=account_id
        )
        # the description is always present; it defaults to an empty string
        if not description:
            description = ""

        with function.lock:
            existing_alias = function.aliases.get(name)
            if existing_alias:
                existing_arn = api_utils.map_alias_out(alias=existing_alias, function=function)[
                    "AliasArn"
                ]
                raise ResourceConflictException(
                    f"Alias already exists: {existing_arn}",
                    Type="User",
                )

            # a routing config is only built if additional version weights are actually given
            additional_weights = (routing_config or {}).get("AdditionalVersionWeights")
            if additional_weights:
                routing_configuration = self._create_routing_config_model(
                    additional_weights, target_version
                )
            else:
                routing_configuration = None

            alias = VersionAlias(
                name=name,
                function_version=function_version,
                description=description,
                routing_configuration=routing_configuration,
            )
            function.aliases[name] = alias

        return api_utils.map_alias_out(alias=alias, function=function)
1✔
2026

2027
    def list_aliases(
        self,
        context: RequestContext,
        function_name: FunctionName,
        function_version: VersionWithLatestPublished = None,
        marker: String = None,
        max_items: MaxListItems = None,
        **kwargs,
    ) -> ListAliasesResponse:
        """
        Return one page of the aliases of a function.

        If ``function_version`` is given, only aliases pointing to that version are returned.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        resolved_name = api_utils.get_function_name(function_name, context)
        function = self._get_function(
            function_name=resolved_name, region=region, account_id=account_id
        )

        matching_aliases = []
        for alias in function.aliases.values():
            # skip aliases that point to a different version than the requested one
            if function_version is not None and alias.function_version != function_version:
                continue
            matching_aliases.append(api_utils.map_alias_out(alias, function))

        # the alias ARN serves as the page token
        page, token = PaginatedList(matching_aliases).get_page(
            lambda alias: alias["AliasArn"],
            marker,
            max_items,
        )

        return ListAliasesResponse(Aliases=page, NextMarker=token)
1✔
2055

2056
    def delete_alias(
        self, context: RequestContext, function_name: FunctionName, name: Alias, **kwargs
    ) -> None:
        """
        Delete an alias together with the resources registered under its name.

        A missing alias is not treated as an error. The provisioned concurrency config stored
        under the alias name is always removed; the function URL config is removed only if the
        alias actually existed.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        resolved_name = api_utils.get_function_name(function_name, context)
        function = self._get_function(
            function_name=resolved_name, region=region, account_id=account_id
        )
        removed_alias = function.aliases.pop(name, None)

        # cleanup related resources
        function.provisioned_concurrency_configs.pop(name, None)

        if not removed_alias or name not in function.function_url_configs:
            return

        # TODO: Allow for deactivating/unregistering specific Lambda URLs
        url_config = function.function_url_configs.pop(name)
        LOG.debug(
            "Stopping aliased Lambda Function URL %s for %s",
            url_config.url,
            url_config.function_name,
        )
2078

2079
    def get_alias(
        self, context: RequestContext, function_name: FunctionName, name: Alias, **kwargs
    ) -> AliasConfiguration:
        """
        Return the configuration of a single alias.

        :raises ResourceNotFoundException: if the function has no alias with the given name
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name = api_utils.get_function_name(function_name, context)
        function = self._get_function(
            function_name=function_name, region=region, account_id=account_id
        )

        alias = function.aliases.get(name)
        if alias:
            return api_utils.map_alias_out(alias=alias, function=function)

        raise ResourceNotFoundException(
            f"Cannot find alias arn: {api_utils.qualified_lambda_arn(function_name=function_name, qualifier=name, region=region, account=account_id)}",
            Type="User",
        )
1✔
2093

2094
    def update_alias(
        self,
        context: RequestContext,
        function_name: FunctionName,
        name: Alias,
        function_version: VersionWithLatestPublished = None,
        description: Description = None,
        routing_config: AliasRoutingConfiguration = None,
        revision_id: String = None,
        **kwargs,
    ) -> AliasConfiguration:
        """
        Update the target version, description and/or routing config of an existing alias.

        Only parameters that are not ``None`` are applied. A ``routing_config`` that is empty or
        has empty ``AdditionalVersionWeights`` resets the routing configuration to ``None``.

        :raises ResourceNotFoundException: if the alias does not exist
        :raises PreconditionFailedException: if ``revision_id`` is given and does not match the
            current revision id of the alias
        :return: the configuration of the updated alias
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name = api_utils.get_function_name(function_name, context)
        function = self._get_function(
            function_name=function_name, region=region, account_id=account_id
        )
        if not (alias := function.aliases.get(name)):
            fn_arn = api_utils.qualified_lambda_arn(function_name, name, account_id, region)
            raise ResourceNotFoundException(
                f"Alias not found: {fn_arn}",
                Type="User",
            )
        if revision_id and alias.revision_id != revision_id:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
                Type="User",
            )
        changes = {}
        if function_version is not None:
            changes |= {"function_version": function_version}
        if description is not None:
            changes |= {"description": description}
        if routing_config is not None:
            # if it is an empty dict or AdditionalVersionWeights is empty, set routing config to None
            new_routing_config = None
            if routing_config_dict := routing_config.get("AdditionalVersionWeights"):
                # The routing config is validated against the version the alias will point to
                # after this update: the newly requested one, or the current target otherwise.
                target_qualifier = (
                    function_version if function_version is not None else alias.function_version
                )
                target_version = get_function_version(
                    function_name=function_name,
                    qualifier=target_qualifier,
                    region=region,
                    account_id=account_id,
                )
                new_routing_config = self._create_routing_config_model(
                    routing_config_dict, target_version
                )
            changes |= {"routing_configuration": new_routing_config}
        # even if no changes are done, we have to update revision id for some reason
        old_alias = alias
        alias = dataclasses.replace(alias, **changes)
        function.aliases[name] = alias

        # TODO: signal lambda service that pointer potentially changed
        self.lambda_service.update_alias(old_alias=old_alias, new_alias=alias, function=function)

        return api_utils.map_alias_out(alias=alias, function=function)
1✔
2142

2143
    # =======================================
2144
    # ======= EVENT SOURCE MAPPINGS =========
2145
    # =======================================
2146
    def check_service_resource_exists(
        self, service: str, resource_arn: str, function_arn: str, function_role_arn: str
    ):
        """
        Verify that the event source resource exists by querying it with the function's role.

        Only ``sqs``/``sqs-fifo``, ``kinesis`` and ``dynamodb`` are checked; any other service
        passes without a lookup.

        Raises:
            InvalidParameterValueException: If the queue or stream does not exist.
            ClientError: Any other client error is re-raised unchanged.
        """
        arn = parse_arn(resource_arn)
        source_client = get_internal_client(
            arn=resource_arn,
            role_arn=function_role_arn,
            service_principal=ServicePrincipal.lambda_,
            source_arn=function_arn,
        )

        if service in ("sqs", "sqs-fifo"):
            # AWS uses `GetQueueAttributes` internally to verify the queue existence, but we need
            # the `QueueUrl` which is not given directly. A dummy `QueueUrl` is built which can be
            # parsed by SQS to return the right value.
            queue_name = arn["resource"].split("/")[-1]
            queue_url = f"http://sqs.{arn['region']}.domain/{arn['account']}/{queue_name}"
            try:
                source_client.get_queue_attributes(QueueUrl=queue_url)
            except ClientError as e:
                error_code = e.response["Error"]["Code"]
                if error_code != "AWS.SimpleQueueService.NonExistentQueue":
                    raise
                raise InvalidParameterValueException(
                    f"Error occurred while ReceiveMessage. SQS Error Code: {error_code}. SQS Error Message: {e.response['Error']['Message']}",
                    Type="User",
                )
        elif service == "kinesis":
            try:
                source_client.describe_stream(StreamARN=resource_arn)
            except ClientError as e:
                if e.response["Error"]["Code"] != "ResourceNotFoundException":
                    raise
                raise InvalidParameterValueException(
                    f"Stream not found: {resource_arn}",
                    Type="User",
                )
        elif service == "dynamodb":
            try:
                # note the different parameter casing compared to the Kinesis call
                source_client.describe_stream(StreamArn=resource_arn)
            except ClientError as e:
                if e.response["Error"]["Code"] != "ResourceNotFoundException":
                    raise
                raise InvalidParameterValueException(
                    f"Stream not found: {resource_arn}",
                    Type="User",
                )
×
2198

2199
    @handler("CreateEventSourceMapping", expand=False)
    def create_event_source_mapping(
        self,
        context: RequestContext,
        request: CreateEventSourceMappingRequest,
    ) -> EventSourceMappingConfiguration:
        """Handle ``CreateEventSourceMapping`` by delegating to ``create_event_source_mapping_v2``."""
        esm_config = self.create_event_source_mapping_v2(context, request)
        return esm_config
1✔
2206

2207
    def create_event_source_mapping_v2(
        self,
        context: RequestContext,
        request: CreateEventSourceMappingRequest,
    ) -> EventSourceMappingConfiguration:
        """
        Validate the request, persist the new event source mapping and create its worker.

        :return: the event source mapping configuration built from the request
        """
        # Validations; only the function ARN, the store and the function role are needed here
        function_arn, _, state, _, function_role = self.validate_event_source_mapping(
            context, request
        )

        config_factory = EsmConfigFactory(request, context, function_arn)
        esm_config = config_factory.get_esm_config()

        # Copy esm_config to avoid a race condition with potential async update in the store
        state.event_source_mappings[esm_config["UUID"]] = esm_config.copy()

        # TODO: check for potential async race condition update -> think about locking
        worker_factory = EsmWorkerFactory(esm_config, function_role, request.get("Enabled", True))
        esm_worker = worker_factory.get_esm_worker()
        self.esm_workers[esm_worker.uuid] = esm_worker

        # TODO: check StateTransitionReason, LastModified, LastProcessingResult (concurrent updates requires locking!)
        tags = request.get("Tags")
        if tags:
            self._store_tags(esm_config.get("EventSourceMappingArn"), tags)

        esm_worker.create()
        return esm_config
1✔
2230

2231
    def validate_event_source_mapping(self, context, request):
        """
        Validate an event source mapping (ESM) request or an internal ESM configuration.

        The checks run in this order: destination config, event source detection, batch size,
        starting position (DynamoDB/Kinesis), batching window (SQS), filter patterns, target
        function/qualifier existence, event source existence and, for ``CreateEventSourceMapping``
        requests only, duplicate detection against the mappings already in the store.

        :param context: the request context, used for the operation name and as fallback for
            account and region
        :param request: either a create request (with ``FunctionName``) or an internal event
            source mapping configuration (with ``FunctionArn``)
        :raises InvalidParameterValueException: for invalid or missing parameters, a missing
            function or a missing event source
        :raises ValidationException: if the starting position is not a known enum value
        :raises ResourceConflictException: if an equivalent mapping already exists
        :raises Exception: for unknown aliases/versions or malformed qualifiers (see TODOs below)
        :return: tuple of ``(function_arn, function_name, state, function_version, function_role)``
        """
        # TODO: test whether stream ARNs are valid sources for Pipes or ESM or whether only DynamoDB table ARNs work
        # TODO: Validate MaxRecordAgeInSeconds (i.e cannot subceed 60s but can be -1) and MaxRetryAttempts parameters.
        # See https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-maximumrecordageinseconds
        is_create_esm_request = context.operation.name == self.create_event_source_mapping.operation

        if destination_config := request.get("DestinationConfig"):
            if "OnSuccess" in destination_config:
                raise InvalidParameterValueException(
                    "Unsupported DestinationConfig parameter for given event source mapping type.",
                    Type="User",
                )

        # Determine the event source service: self-managed sources are treated as Kafka,
        # everything else is derived from the event source ARN.
        service = None
        if "SelfManagedEventSource" in request:
            service = "kafka"
            if "SourceAccessConfigurations" not in request:
                raise InvalidParameterValueException(
                    "Required 'sourceAccessConfigurations' parameter is missing.", Type="User"
                )
        if service is None and "EventSourceArn" not in request:
            raise InvalidParameterValueException("Unrecognized event source.", Type="User")
        if service is None:
            service = extract_service_from_arn(request["EventSourceArn"])

        batch_size = api_utils.validate_and_set_batch_size(service, request.get("BatchSize"))
        if service in ["dynamodb", "kinesis"]:
            starting_position = request.get("StartingPosition")
            if not starting_position:
                raise InvalidParameterValueException(
                    "1 validation error detected: Value null at 'startingPosition' failed to satisfy constraint: Member must not be null.",
                    Type="User",
                )

            if starting_position not in KinesisStreamStartPosition.__members__:
                raise ValidationException(
                    f"1 validation error detected: Value '{starting_position}' at 'startingPosition' failed to satisfy constraint: Member must satisfy enum value set: [LATEST, AT_TIMESTAMP, TRIM_HORIZON]"
                )
            # AT_TIMESTAMP is not allowed for DynamoDB Streams
            elif (
                service == "dynamodb"
                and starting_position not in DynamoDBStreamStartPosition.__members__
            ):
                raise InvalidParameterValueException(
                    f"Unsupported starting position for arn type: {request['EventSourceArn']}",
                    Type="User",
                )

        if service in ["sqs", "sqs-fifo"]:
            if batch_size > 10 and request.get("MaximumBatchingWindowInSeconds", 0) == 0:
                raise InvalidParameterValueException(
                    "Maximum batch window in seconds must be greater than 0 if maximum batch size is greater than 10",
                    Type="User",
                )

        if (filter_criteria := request.get("FilterCriteria")) is not None:
            for filter_ in filter_criteria.get("Filters", []):
                pattern_str = filter_.get("Pattern")
                if not pattern_str or not isinstance(pattern_str, str):
                    raise InvalidParameterValueException(
                        "Invalid filter pattern definition.", Type="User"
                    )

                if not validate_event_pattern(pattern_str):
                    raise InvalidParameterValueException(
                        "Invalid filter pattern definition.", Type="User"
                    )

        # Can either have a FunctionName (i.e CreateEventSourceMapping request) or
        # an internal EventSourceMappingConfiguration representation
        request_function_name = request.get("FunctionName") or request.get("FunctionArn")
        # can be either a partial arn or a full arn for the version/alias
        function_name, qualifier, account, region = function_locators_from_arn(
            request_function_name
        )
        # TODO: validate `context.region` vs. `region(request["FunctionName"])` vs. `region(request["EventSourceArn"])`
        account = account or context.account_id
        region = region or context.region
        state = lambda_stores[account][region]
        fn = state.functions.get(function_name)
        if not fn:
            raise InvalidParameterValueException("Function does not exist", Type="User")

        if qualifier:
            # make sure the function version/alias exists
            if api_utils.qualifier_is_alias(qualifier):
                fn_alias = fn.aliases.get(qualifier)
                if not fn_alias:
                    raise Exception("unknown alias")  # TODO: cover via test
            elif api_utils.qualifier_is_version(qualifier):
                fn_version = fn.versions.get(qualifier)
                if not fn_version:
                    raise Exception("unknown version")  # TODO: cover via test
            elif qualifier == "$LATEST":
                pass
            elif qualifier == "$LATEST.PUBLISHED":
                # NOTE: no existence check is enforced for this qualifier
                pass
            else:
                raise Exception("invalid functionname")  # TODO: cover via test
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account, region)

        else:
            fn_arn = api_utils.unqualified_lambda_arn(function_name, account, region)

        function_version = get_function_version_from_arn(fn_arn)
        function_role = function_version.config.role

        if source_arn := request.get("EventSourceArn"):
            self.check_service_resource_exists(service, source_arn, fn_arn, function_role)
        # Check we are validating a CreateEventSourceMapping request
        if is_create_esm_request:

            def _get_mapping_sources(mapping: dict[str, Any]) -> list[str]:
                """Return the event source ARN, or the Kafka bootstrap servers for self-managed sources."""
                if event_source_arn := mapping.get("EventSourceArn"):
                    return [event_source_arn]
                return (
                    mapping.get("SelfManagedEventSource", {})
                    .get("Endpoints", {})
                    .get("KAFKA_BOOTSTRAP_SERVERS", [])
                )

            # check for event source duplicates
            # TODO: currently validated for sqs, kinesis, and dynamodb
            service_id = load_service(service).service_id
            # the sources of the request do not change while iterating over the stored mappings
            request_sources = _get_mapping_sources(request)
            for uuid, mapping in state.event_source_mappings.items():
                mapping_sources = _get_mapping_sources(mapping)
                if mapping["FunctionArn"] == fn_arn and (
                    set(mapping_sources).intersection(request_sources)
                ):
                    if service == "sqs":
                        # *shakes fist at SQS*
                        raise ResourceConflictException(
                            f'An event source mapping with {service_id} arn (" {mapping["EventSourceArn"]} ") '
                            f'and function (" {function_name} ") already exists. Please update or delete the '
                            f"existing mapping with UUID {uuid}",
                            Type="User",
                        )
                    elif service == "kafka":
                        # Kafka mappings only conflict if they also share at least one topic
                        if set(mapping["Topics"]).intersection(request["Topics"]):
                            raise ResourceConflictException(
                                f'An event source mapping with event source ("{",".join(request_sources)}"), '
                                f'function ("{fn_arn}"), '
                                f'topics ("{",".join(request["Topics"])}") already exists. Please update or delete the '
                                f"existing mapping with UUID {uuid}",
                                Type="User",
                            )
                    else:
                        raise ResourceConflictException(
                            f'The event source arn (" {mapping["EventSourceArn"]} ") and function '
                            f'(" {function_name} ") provided mapping already exists. Please update or delete the '
                            f"existing mapping with UUID {uuid}",
                            Type="User",
                        )
        return fn_arn, function_name, state, function_version, function_role
1✔
2387

2388
    @handler("UpdateEventSourceMapping", expand=False)
    def update_event_source_mapping(
        self,
        context: RequestContext,
        request: UpdateEventSourceMappingRequest,
    ) -> EventSourceMappingConfiguration:
        """Handle ``UpdateEventSourceMapping`` by delegating to ``update_event_source_mapping_v2``."""
        updated_mapping = self.update_event_source_mapping_v2(context, request)
        return updated_mapping
1✔
2395

2396
    def update_event_source_mapping_v2(
        self,
        context: RequestContext,
        request: UpdateEventSourceMappingRequest,
    ) -> EventSourceMappingConfiguration:
        """
        Apply an update to an existing event source mapping and replace its worker.

        The request is merged over the stored mapping, the merged mapping is validated and
        stored, and a new worker is created from it while the previous worker is stopped.

        :raises ResourceNotFoundException: if the request has no ``UUID`` or no mapping/worker
            is registered for it
        :return: the updated mapping, with the ``State`` determined before the worker swap
        """
        # TODO: test and implement this properly (quite complex with many validations and limitations!)
        LOG.warning(
            "Updating Lambda Event Source Mapping is in experimental state and not yet fully tested."
        )
        state = lambda_stores[context.account_id][context.region]
        request_data = dict(request)
        uuid = request_data.pop("UUID", None)
        if not uuid:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        old_event_source_mapping = state.event_source_mappings.get(uuid)
        esm_worker = self.esm_workers.get(uuid)
        if old_event_source_mapping is None or esm_worker is None:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )  # TODO: test?

        # normalize values to overwrite
        event_source_mapping = old_event_source_mapping | request_data

        # Validate the newly updated ESM object; only the function ARN and role are used below.
        function_arn, _, _, _, function_role = self.validate_event_source_mapping(
            context, event_source_mapping
        )

        # remove the FunctionName field
        event_source_mapping.pop("FunctionName", None)

        if function_arn:
            event_source_mapping["FunctionArn"] = function_arn

        # Only apply update if the desired state differs
        enabled = request.get("Enabled")
        if enabled is None:
            event_source_mapping["State"] = EsmState.UPDATING
        elif enabled:
            if old_event_source_mapping["State"] != EsmState.ENABLED:
                event_source_mapping["State"] = EsmState.ENABLING
        # TODO: What happens when trying to update during an update or failed state?!
        elif old_event_source_mapping["State"] == EsmState.ENABLED:
            event_source_mapping["State"] = EsmState.DISABLING

        # To ensure parity, certain responses need to be immediately returned. These values are
        # only set for the returned response, not saved internally (e.g. transient state).
        temp_params = {"State": event_source_mapping["State"]}

        state.event_source_mappings[uuid] = event_source_mapping

        # TODO: Currently, we re-create the entire ESM worker. Look into approach with better performance.
        worker_enabled = request.get("Enabled", esm_worker.enabled)
        worker_factory = EsmWorkerFactory(event_source_mapping, function_role, worker_enabled)

        # Get a new ESM worker object but do not activate it yet, since the factory holds all
        # logic for creating a new worker from configuration.
        updated_esm_worker = worker_factory.get_esm_worker()
        self.esm_workers[uuid] = updated_esm_worker

        # We should stop() the worker since the delete() will remove the ESM from the state mapping.
        esm_worker.stop()
        # This will either create an EsmWorker in the CREATING state if enabled. Otherwise, the DISABLING state is set.
        updated_esm_worker.create()

        return {**event_source_mapping, **temp_params}
1✔
2466

2467
    def delete_event_source_mapping(
        self, context: RequestContext, uuid: String, **kwargs
    ) -> EventSourceMappingConfiguration:
        """
        Start the deletion of an event source mapping.

        The worker is unregistered and asked to delete itself, and the tags of the mapping are
        removed right away.

        :raises ResourceNotFoundException: if no mapping or no worker exists for ``uuid``
        :return: a copy of the mapping with its ``State`` set to ``DELETING``
        """
        store = lambda_stores[context.account_id][context.region]
        mapping = store.event_source_mappings.get(uuid)
        if not mapping:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        # TODO: add proper locking
        worker = self.esm_workers.pop(uuid, None)
        # Asynchronous delete in v2
        if not worker:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        # the full deletion of the ESM is happening asynchronously, but we delete the Tags instantly
        # this behavior is similar to ``get_event_source_mapping`` which will raise right after deletion, but it is not
        # always the case in AWS. Add more testing and align behavior with ``get_event_source_mapping``.
        self._remove_all_tags(mapping["EventSourceMappingArn"])
        worker.delete()

        # the response is built only after the worker has been asked to delete
        response = {**mapping}
        response["State"] = EsmState.DELETING
        return response
1✔
2490

2491
    def get_event_source_mapping(
        self, context: RequestContext, uuid: String, **kwargs
    ) -> EventSourceMappingConfiguration:
        """Return the stored event source mapping for ``uuid`` with its live worker state.

        ``State`` and ``StateTransitionReason`` of the stored mapping are overwritten in place
        with the values currently reported by the mapping's worker.

        :param context: request context providing the account id and region of the store
        :param uuid: identifier of the event source mapping
        :raises ResourceNotFoundException: if no mapping or no worker exists for ``uuid``
        """
        store = lambda_stores[context.account_id][context.region]
        esm_config = store.event_source_mappings.get(uuid)
        # A mapping without a registered worker is reported exactly like a missing mapping.
        if not esm_config or not (worker := self.esm_workers.get(uuid)):
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        esm_config["State"] = worker.current_state
        esm_config["StateTransitionReason"] = worker.state_transition_reason
        return esm_config
2508

2509
    def list_event_source_mappings(
        self,
        context: RequestContext,
        event_source_arn: Arn = None,
        function_name: FunctionName = None,
        marker: String = None,
        max_items: MaxListItems = None,
        **kwargs,
    ) -> ListEventSourceMappingsResponse:
        """List the event source mappings of the caller's account and region, one page at a time.

        :param event_source_arn: if set, keep only mappings whose ``EventSourceArn`` equals it
        :param function_name: if set, keep only mappings whose ``FunctionArn`` contains it
        :param marker: pagination marker forwarded to ``PaginatedList.get_page``
        :param max_items: page size forwarded to ``PaginatedList.get_page``
        :return: the requested page and the marker of the next page
        """
        store = lambda_stores[context.account_id][context.region]

        # TODO: update and test State and StateTransitionReason for ESM v2

        def _is_selected(esm) -> bool:
            # TODO: validate pattern of event_source_arn
            if event_source_arn and esm.get("EventSourceArn") != event_source_arn:
                return False
            if function_name and function_name not in esm["FunctionArn"]:
                return False
            return True

        selected = [esm for esm in store.event_source_mappings.values() if _is_selected(esm)]
        page, next_marker = PaginatedList(selected).get_page(
            lambda esm: esm["UUID"],
            marker,
            max_items,
        )
        return ListEventSourceMappingsResponse(EventSourceMappings=page, NextMarker=next_marker)
2536

2537
    def get_source_type_from_request(self, request: dict[str, Any]) -> str:
1✔
2538
        if event_source_arn := request.get("EventSourceArn", ""):
×
2539
            service = extract_service_from_arn(event_source_arn)
×
2540
            if service == "sqs" and "fifo" in event_source_arn:
×
2541
                service = "sqs-fifo"
×
2542
            return service
×
2543
        elif request.get("SelfManagedEventSource"):
×
2544
            return "kafka"
×
2545

2546
    # =======================================
2547
    # ============ FUNCTION URLS ============
2548
    # =======================================
2549

2550
    @staticmethod
    def _validate_qualifier(qualifier: str) -> None:
        """Reject qualifiers that the function URL operations do not accept.

        ``$LATEST``, ``$LATEST.PUBLISHED`` and any qualifier for which
        ``api_utils.qualifier_is_version`` holds are rejected; an empty or missing qualifier passes.

        :param qualifier: qualifier to check, may be empty or ``None``
        :raises ValidationException: if the qualifier is rejected
        """
        names_latest = qualifier in ("$LATEST", "$LATEST.PUBLISHED")
        if not names_latest and not (qualifier and api_utils.qualifier_is_version(qualifier)):
            return

        raise ValidationException(
            f"1 validation error detected: Value '{qualifier}' at 'qualifier' failed to satisfy constraint: Member must satisfy regular expression pattern: ((?!^\\d+$)^[0-9a-zA-Z-_]+$)"
        )
2558

2559
    @staticmethod
    def _validate_invoke_mode(invoke_mode: str) -> None:
        """Check that ``invoke_mode`` is one of the known invoke modes, if one is given.

        ``RESPONSE_STREAM`` is accepted, but a warning is logged because it is only mocked.

        :param invoke_mode: invoke mode to check; an empty or missing value is accepted
        :raises ValidationException: if a value outside ``BUFFERED``/``RESPONSE_STREAM`` is given
        """
        if not invoke_mode:
            return

        if invoke_mode not in (InvokeMode.BUFFERED, InvokeMode.RESPONSE_STREAM):
            raise ValidationException(
                f"1 validation error detected: Value '{invoke_mode}' at 'invokeMode' failed to satisfy constraint: Member must satisfy enum value set: [RESPONSE_STREAM, BUFFERED]"
            )

        if invoke_mode == InvokeMode.RESPONSE_STREAM:
            # TODO should we actually fail for setting RESPONSE_STREAM?
            #  It should trigger InvokeWithResponseStream which is not implemented
            LOG.warning(
                "The invokeMode 'RESPONSE_STREAM' is not yet supported on LocalStack. The property is only mocked, the execution will still be 'BUFFERED'"
            )
2571

2572
    # TODO: what happens if function state is not active?
2573
    def create_function_url_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        auth_type: FunctionUrlAuthType,
        qualifier: FunctionUrlQualifier = None,
        cors: Cors = None,
        invoke_mode: InvokeMode = None,
        **kwargs,
    ) -> CreateFunctionUrlConfigResponse:
        """Create and store a function URL config for ``$LATEST`` or an alias of a function.

        The URL subdomain is random unless the function carries the ``TAG_KEY_CUSTOM_URL`` tag
        with a value accepted by ``TAG_KEY_CUSTOM_URL_VALIDATOR``; for a qualified request the
        qualifier is appended to the tag value before validation.

        :param function_name: function name or ARN
        :param auth_type: auth type stored on the URL config
        :param qualifier: optional alias the URL config is created for
        :param cors: optional CORS settings stored on the URL config
        :param invoke_mode: optional invoke mode stored on the URL config
        :raises ValidationException: if the qualifier or the invoke mode is rejected
        :raises ResourceNotFoundException: if the function or the alias does not exist
        :raises ResourceConflictException: if a URL config already exists for the qualifier
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        store = lambda_stores[account_id][region]
        self._validate_qualifier(qualifier)
        self._validate_invoke_mode(invoke_mode)

        function = store.functions.get(function_name)
        if function is None:
            raise ResourceNotFoundException("Function does not exist", Type="User")

        # URL configs are keyed by qualifier; unqualified requests map to $LATEST.
        config_key = qualifier or "$LATEST"

        existing_config = function.function_url_configs.get(config_key)
        if existing_config:
            raise ResourceConflictException(
                f"Failed to create function url config for [functionArn = {existing_config.function_arn}]. Error message:  FunctionUrlConfig exists for this Lambda function",
                Type="User",
            )

        if config_key != "$LATEST" and config_key not in function.aliases:
            raise ResourceNotFoundException("Function does not exist", Type="User")

        unqualified_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
        if qualifier:
            function_arn = api_utils.qualified_lambda_arn(
                function_name, qualifier, account_id, region
            )
        else:
            function_arn = unqualified_arn

        # The url_id is the subdomain used for the URL we're creating. This
        # is either created randomly (as in AWS), or can be passed as a tag
        # to the lambda itself (localstack-only).
        url_id: str | None = None

        tags = self._get_tags(unqualified_arn)
        if TAG_KEY_CUSTOM_URL in tags:
            # Note: verifying here that the url_id is unique would surface
            # conflicts to the user ASAP. However, that information does not
            # seem to be available yet, since (as far as the original author
            # could tell) self.router.register_routes() is called once, in a
            # single shot, for all of the routes -- and uniqueness would have
            # to hold for the entire lambda provider, not just this function.
            tag_value = tags[TAG_KEY_CUSTOM_URL]
            candidate_id = f"{tag_value}-{qualifier}" if qualifier else tag_value
            if TAG_KEY_CUSTOM_URL_VALIDATOR.match(candidate_id):
                url_id = candidate_id
            else:
                # Note: we're logging here instead of raising to prioritize
                # strict parity with AWS over the localstack-only custom_id
                LOG.warning(
                    "Invalid custom ID tag value for lambda URL (%s=%s). "
                    "Replaced with default (random id)",
                    TAG_KEY_CUSTOM_URL,
                    candidate_id,
                )

        if url_id is None:
            url_id = api_utils.generate_random_url_id()

        host_definition = localstack_host(custom_port=config.GATEWAY_LISTEN[0].port)
        new_config = FunctionUrlConfig(
            function_arn=function_arn,
            function_name=function_name,
            cors=cors,
            url_id=url_id,
            url=f"http://{url_id}.lambda-url.{context.region}.{host_definition.host_and_port()}/",  # TODO: https support
            auth_type=auth_type,
            creation_time=api_utils.generate_lambda_date(),
            last_modified_time=api_utils.generate_lambda_date(),
            invoke_mode=invoke_mode,
        )

        # persist and start URL
        # TODO: implement URL invoke
        function.function_url_configs[config_key] = new_config
        api_url_config = api_utils.map_function_url_config(new_config)

        response_keys = (
            "FunctionUrl",
            "FunctionArn",
            "AuthType",
            "Cors",
            "CreationTime",
            "InvokeMode",
        )
        return CreateFunctionUrlConfigResponse(**{key: api_url_config[key] for key in response_keys})
2677

2678
    def get_function_url_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: FunctionUrlQualifier = None,
        **kwargs,
    ) -> GetFunctionUrlConfigResponse:
        """Return the function URL config stored for a function and qualifier.

        :param function_name: function name or ARN
        :param qualifier: optional qualifier; ``$LATEST`` is used when it is absent
        :raises ValidationException: if the qualifier is rejected
        :raises ResourceNotFoundException: if the function or its URL config does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]

        fn_name, qualifier = api_utils.get_name_and_qualifier(function_name, qualifier, context)
        self._validate_qualifier(qualifier)

        function = store.functions.get(fn_name)
        # A missing function and a missing URL config are reported with the same error.
        url_config = (
            function.function_url_configs.get(qualifier or "$LATEST") if function else None
        )
        if not url_config:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        return api_utils.map_function_url_config(url_config)
2706

2707
    def update_function_url_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: FunctionUrlQualifier = None,
        auth_type: FunctionUrlAuthType = None,
        cors: Cors = None,
        invoke_mode: InvokeMode = None,
        **kwargs,
    ) -> UpdateFunctionUrlConfigResponse:
        """Replace the stored function URL config with an updated copy.

        Only the settings that are provided are changed; ``last_modified_time`` is always refreshed.

        :param function_name: function name or ARN
        :param qualifier: optional qualifier; ``$LATEST`` is used when it is absent
        :param auth_type: new auth type, applied when not ``None``
        :param cors: new CORS settings, applied when not ``None``
        :param invoke_mode: new invoke mode, applied when truthy
        :raises ValidationException: if the qualifier or the invoke mode is rejected
        :raises ResourceNotFoundException: if the function, alias or URL config does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]

        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        self._validate_qualifier(qualifier)
        self._validate_invoke_mode(invoke_mode)

        function = store.functions.get(function_name)
        if not function:
            raise ResourceNotFoundException("Function does not exist", Type="User")

        config_key = qualifier or "$LATEST"
        is_unknown_alias = (
            api_utils.qualifier_is_alias(config_key) and config_key not in function.aliases
        )
        if is_unknown_alias:
            raise ResourceNotFoundException("Function does not exist", Type="User")

        current_config = function.function_url_configs.get(config_key)
        if not current_config:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        changes = {"last_modified_time": api_utils.generate_lambda_date()}
        if cors is not None:
            changes["cors"] = cors
        if auth_type is not None:
            changes["auth_type"] = auth_type
        if invoke_mode:
            changes["invoke_mode"] = invoke_mode

        updated_config = dataclasses.replace(current_config, **changes)
        function.function_url_configs[config_key] = updated_config

        return UpdateFunctionUrlConfigResponse(
            FunctionUrl=updated_config.url,
            FunctionArn=updated_config.function_arn,
            AuthType=updated_config.auth_type,
            Cors=updated_config.cors,
            CreationTime=updated_config.creation_time,
            LastModifiedTime=updated_config.last_modified_time,
            InvokeMode=updated_config.invoke_mode,
        )
2765

2766
    def delete_function_url_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: FunctionUrlQualifier = None,
        **kwargs,
    ) -> None:
        """Remove the function URL config stored for a function and qualifier.

        :param function_name: function name or ARN
        :param qualifier: optional qualifier; ``$LATEST`` is used when it is absent
        :raises ValidationException: if the qualifier is rejected
        :raises ResourceNotFoundException: if the function or its URL config does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]

        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        self._validate_qualifier(qualifier)

        function = store.functions.get(function_name)
        config_key = qualifier or "$LATEST"
        # A missing function and a missing URL config are reported with the same error.
        if not function or not function.function_url_configs.get(config_key):
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        del function.function_url_configs[config_key]
2795

2796
    def list_function_url_configs(
        self,
        context: RequestContext,
        function_name: FunctionName,
        marker: String = None,
        max_items: MaxItems = None,
        **kwargs,
    ) -> ListFunctionUrlConfigsResponse:
        """List the function URL configs of a function, one page at a time.

        :param function_name: function name or ARN
        :param marker: pagination marker forwarded to ``PaginatedList.get_page``
        :param max_items: page size forwarded to ``PaginatedList.get_page``
        :raises ResourceNotFoundException: if the function does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]

        fn_name = api_utils.get_function_name(function_name, context)
        function = store.functions.get(fn_name)
        if not function:
            raise ResourceNotFoundException("Function does not exist", Type="User")

        mapped_configs = PaginatedList(
            [
                api_utils.map_function_url_config(stored_config)
                for stored_config in function.function_url_configs.values()
            ]
        )
        # Pages are keyed by the function ARN of each mapped config.
        page, next_marker = mapped_configs.get_page(
            lambda mapped: mapped["FunctionArn"],
            marker,
            max_items,
        )
        return ListFunctionUrlConfigsResponse(FunctionUrlConfigs=page, NextMarker=next_marker)
2824

2825
    # =======================================
2826
    # ============  Permissions  ============
2827
    # =======================================
2828

2829
    @handler("AddPermission", expand=False)
    def add_permission(
        self,
        context: RequestContext,
        request: AddPermissionRequest,
    ) -> AddPermissionResponse:
        """Add a statement to the resource policy of a function version or alias.

        A policy is created for the resolved qualifier when none exists yet. After the statement
        is appended, the alias or version entry is replaced by a copy of itself (see the TODOs
        below: this is how the revision id gets bumped).

        :param request: raw ``AddPermission`` request
        :return: the added statement, serialized as JSON
        :raises InvalidParameterValueException: if the qualifier is ``$LATEST``
        :raises PreconditionFailedException: if ``RevisionId`` is given and does not match
        :raises ValidationException: if ``StatementId`` does not match ``STATEMENT_ID_REGEX``
        :raises ResourceConflictException: if the statement id already exists in the policy
        """
        requested_name = request.get("FunctionName")
        function_name, qualifier = api_utils.get_name_and_qualifier(
            requested_name, request.get("Qualifier"), context
        )

        # validate qualifier
        if qualifier is not None:
            self._validate_qualifier_expression(qualifier)
            if qualifier == "$LATEST":
                raise InvalidParameterValueException(
                    "We currently do not support adding policies for $LATEST.", Type="User"
                )
        account_id, region = api_utils.get_account_and_region(requested_name, context)

        resolved_fn = self._get_function(function_name, account_id, region)
        resolved_qualifier, fn_arn = self._resolve_fn_qualifier(resolved_fn, qualifier)

        expected_revision = request.get("RevisionId")
        if expected_revision and expected_revision != self._function_revision_id(
            resolved_fn, resolved_qualifier
        ):
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetPolicy API to retrieve the latest Revision Id",
                Type="User",
            )

        statement_id = request["StatementId"]
        if not STATEMENT_ID_REGEX.match(statement_id):
            raise ValidationException(
                f"1 validation error detected: Value '{statement_id}' at 'statementId' failed to satisfy constraint: Member must satisfy regular expression pattern: ([a-zA-Z0-9-_]+)"
            )

        # check for an already existing policy and any conflicts in existing statements
        current_policy = resolved_fn.permissions.get(resolved_qualifier)
        if current_policy and any(
            existing["Sid"] == statement_id for existing in current_policy.policy.Statement
        ):
            # uniqueness scope: statement id needs to be unique per qualified function ($LATEST, version, or alias)
            # Counterexample: the same sid can exist within $LATEST, version, and alias
            raise ResourceConflictException(
                f"The statement id ({statement_id}) provided already exists. Please provide a new statement id, or remove the existing statement.",
                Type="User",
            )

        permission_statement = api_utils.build_statement(
            partition=context.partition,
            resource_arn=fn_arn,
            statement_id=statement_id,
            action=request["Action"],
            principal=request["Principal"],
            source_arn=request.get("SourceArn"),
            source_account=request.get("SourceAccount"),
            principal_org_id=request.get("PrincipalOrgID"),
            event_source_token=request.get("EventSourceToken"),
            auth_type=request.get("FunctionUrlAuthType"),
        )

        if current_policy:
            current_policy.policy.Statement.append(permission_statement)
        else:
            # First statement for this qualifier: build the policy, then register it.
            created_policy = FunctionResourcePolicy(
                policy=ResourcePolicy(Version="2012-10-17", Id="default", Statement=[])
            )
            created_policy.policy.Statement.append(permission_statement)
            resolved_fn.permissions[resolved_qualifier] = created_policy

        # Update revision id of alias or version
        # TODO: re-evaluate data model to prevent this dirty hack just for bumping the revision id
        # TODO: does that need a `with function.lock` for atomic updates of the policy + revision_id?
        if api_utils.qualifier_is_alias(resolved_qualifier):
            alias = resolved_fn.aliases[resolved_qualifier]
            resolved_fn.aliases[resolved_qualifier] = dataclasses.replace(alias)
        else:
            # Assumes that a non-alias is a version
            version = resolved_fn.versions[resolved_qualifier]
            resolved_fn.versions[resolved_qualifier] = dataclasses.replace(
                version, config=dataclasses.replace(version.config)
            )
        return AddPermissionResponse(Statement=json.dumps(permission_statement))
2911

2912
    def remove_permission(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        statement_id: NamespacedStatementId,
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
        revision_id: String | None = None,
        **kwargs,
    ) -> None:
        """Remove a statement from the resource policy of a function version or alias.

        The policy entry itself is deleted once its last statement has been removed.

        :param function_name: function name or ARN
        :param statement_id: ``Sid`` of the statement to remove
        :param qualifier: optional version or alias qualifier
        :param revision_id: if given, must match the current revision id of the resolved qualifier
        :raises ResourceNotFoundException: if the function, its policy or the statement is missing
        :raises PreconditionFailedException: if ``revision_id`` is given and does not match
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        if qualifier is not None:
            self._validate_qualifier_expression(qualifier)

        store = lambda_stores[account_id][region]
        resolved_fn = store.functions.get(function_name)
        if resolved_fn is None:
            fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
            raise ResourceNotFoundException(f"No policy found for: {fn_arn}", Type="User")

        resolved_qualifier, _ = self._resolve_fn_qualifier(resolved_fn, qualifier)
        policy_entry = resolved_fn.permissions.get(resolved_qualifier)
        if not policy_entry:
            raise ResourceNotFoundException(
                "No policy is associated with the given resource.", Type="User"
            )

        # try to find statement in policy and delete it
        statements = policy_entry.policy.Statement
        target = next((stmt for stmt in statements if stmt["Sid"] == statement_id), None)
        if not target:
            raise ResourceNotFoundException(
                f"Statement {statement_id} is not found in resource policy.", Type="User"
            )

        current_revision = self._function_revision_id(resolved_fn, resolved_qualifier)
        if revision_id and revision_id != current_revision:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
                Type="User",
            )
        statements.remove(target)

        # Update revision id for alias or version
        # TODO: re-evaluate data model to prevent this dirty hack just for bumping the revision id
        # TODO: does that need a `with function.lock` for atomic updates of the policy + revision_id?
        if api_utils.qualifier_is_alias(resolved_qualifier):
            alias = resolved_fn.aliases[resolved_qualifier]
            resolved_fn.aliases[resolved_qualifier] = dataclasses.replace(alias)
        else:
            # Assumes that a non-alias is a version
            version = resolved_fn.versions[resolved_qualifier]
            resolved_fn.versions[resolved_qualifier] = dataclasses.replace(
                version, config=dataclasses.replace(version.config)
            )

        # remove the policy as a whole when there's no statement left in it
        if len(policy_entry.policy.Statement) == 0:
            del resolved_fn.permissions[resolved_qualifier]
2977

2978
    def get_policy(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
        **kwargs,
    ) -> GetPolicyResponse:
        """Return the resource policy stored for a function qualifier and its revision id.

        :param function_name: function name or ARN
        :param qualifier: optional version or alias qualifier; ``$LATEST`` is used when absent
        :return: the policy serialized as JSON plus the revision id of the alias or version
        :raises ResourceNotFoundException: if no policy is stored for the qualifier
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )

        if qualifier is not None:
            self._validate_qualifier_expression(qualifier)

        resolved_fn = self._get_function(function_name, account_id, region)

        policy_key = qualifier or "$LATEST"
        policy_entry = resolved_fn.permissions.get(policy_key)
        if not policy_entry:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        if api_utils.qualifier_is_alias(policy_key):
            revision = resolved_fn.aliases[policy_key].revision_id
        else:
            # Assumes that a non-alias is a version
            revision = resolved_fn.versions[policy_key].config.revision_id

        serialized_policy = json.dumps(dataclasses.asdict(policy_entry.policy))
        return GetPolicyResponse(Policy=serialized_policy, RevisionId=revision)
3015

3016
    # =======================================
3017
    # ========  Code signing config  ========
3018
    # =======================================
3019

3020
    def create_code_signing_config(
        self,
        context: RequestContext,
        allowed_publishers: AllowedPublishers,
        description: Description | None = None,
        code_signing_policies: CodeSigningPolicies | None = None,
        tags: Tags | None = None,
        **kwargs,
    ) -> CreateCodeSigningConfigResponse:
        """Create a code signing config with a random id and store it under its ARN.

        :param allowed_publishers: stored on the config as ``allowed_publishers``
        :param description: optional description stored on the config
        :param code_signing_policies: stored on the config as ``policies``
        :param tags: accepted but not used by this method
        :return: the API representation of the new config
        """
        account_id, region = context.account_id, context.region
        store = lambda_stores[account_id][region]

        # TODO: can there be duplicates?
        config_id = f"csc-{get_random_hex(17)}"  # e.g. 'csc-077c33b4c19e26036'
        config_arn = (
            f"arn:{context.partition}:lambda:{region}:{account_id}:code-signing-config:{config_id}"
        )
        new_config = CodeSigningConfig(
            csc_id=config_id,
            arn=config_arn,
            allowed_publishers=allowed_publishers,
            policies=code_signing_policies,
            last_modified=api_utils.generate_lambda_date(),
            description=description,
        )
        store.code_signing_configs[config_arn] = new_config
        return CreateCodeSigningConfigResponse(CodeSigningConfig=api_utils.map_csc(new_config))
3046

3047
    def put_function_code_signing_config(
        self,
        context: RequestContext,
        code_signing_config_arn: CodeSigningConfigArn,
        function_name: NamespacedFunctionName,
        **kwargs,
    ) -> PutFunctionCodeSigningConfigResponse:
        """Attach an existing code signing config to a function.

        :param code_signing_config_arn: ARN of a stored code signing config
        :param function_name: function name or ARN
        :raises CodeSigningConfigNotFoundException: if the config is not stored
        :raises ResourceNotFoundException: if the function does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]
        function_name = api_utils.get_function_name(function_name, context)

        # The config is checked before the function, so a missing config wins if both are missing.
        if not store.code_signing_configs.get(code_signing_config_arn):
            raise CodeSigningConfigNotFoundException(
                f"The code signing configuration cannot be found. Check that the provided configuration is not deleted: {code_signing_config_arn}.",
                Type="User",
            )

        function = store.functions.get(function_name)
        if not function:
            fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")

        function.code_signing_config_arn = code_signing_config_arn
        return PutFunctionCodeSigningConfigResponse(
            CodeSigningConfigArn=code_signing_config_arn, FunctionName=function_name
        )
3074

3075
    def update_code_signing_config(
        self,
        context: RequestContext,
        code_signing_config_arn: CodeSigningConfigArn,
        description: Description = None,
        allowed_publishers: AllowedPublishers = None,
        code_signing_policies: CodeSigningPolicies = None,
        **kwargs,
    ) -> UpdateCodeSigningConfigResponse:
        """Replace a stored code signing config with an updated copy.

        Only arguments that are not ``None`` are applied; ``last_modified`` is always refreshed.

        :param code_signing_config_arn: ARN of the config to update
        :raises ResourceNotFoundException: if the config is not stored
        """
        store = lambda_stores[context.account_id][context.region]
        current = store.code_signing_configs.get(code_signing_config_arn)
        if not current:
            raise ResourceNotFoundException(
                f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
            )

        # Maps dataclass field names to the requested values; None means "leave unchanged".
        requested = {
            "allowed_publishers": allowed_publishers,
            "policies": code_signing_policies,
            "description": description,
        }
        changes = {field: value for field, value in requested.items() if value is not None}

        updated = dataclasses.replace(
            current, last_modified=api_utils.generate_lambda_date(), **changes
        )
        store.code_signing_configs[code_signing_config_arn] = updated

        return UpdateCodeSigningConfigResponse(CodeSigningConfig=api_utils.map_csc(updated))
3104

3105
    def get_code_signing_config(
        self, context: RequestContext, code_signing_config_arn: CodeSigningConfigArn, **kwargs
    ) -> GetCodeSigningConfigResponse:
        """Return the API representation of a stored code signing config.

        :param code_signing_config_arn: ARN of the config to look up
        :raises ResourceNotFoundException: if the config is not stored
        """
        store = lambda_stores[context.account_id][context.region]
        stored_config = store.code_signing_configs.get(code_signing_config_arn)
        if stored_config:
            return GetCodeSigningConfigResponse(CodeSigningConfig=api_utils.map_csc(stored_config))

        raise ResourceNotFoundException(
            f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
        )
3116

3117
    def get_function_code_signing_config(
        self, context: RequestContext, function_name: NamespacedFunctionName, **kwargs
    ) -> GetFunctionCodeSigningConfigResponse:
        """
        Return the code signing configuration ARN attached to a function, if any.

        :param context: request context used to resolve account, region and function name
        :param function_name: function name (or ARN) as passed by the caller
        :return: response carrying the ARN and function name, or an empty response when the
            function has no code signing configuration attached
        :raises ResourceNotFoundException: if the function does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]
        function_name = api_utils.get_function_name(function_name, context)
        function = store.functions.get(function_name)
        # the ARN is only needed for the error message
        unqualified_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
        if not function:
            raise ResourceNotFoundException(f"Function not found: {unqualified_arn}", Type="User")

        if not function.code_signing_config_arn:
            return GetFunctionCodeSigningConfigResponse()

        return GetFunctionCodeSigningConfigResponse(
            CodeSigningConfigArn=function.code_signing_config_arn, FunctionName=function_name
        )
1✔
3134

3135
    def delete_function_code_signing_config(
        self, context: RequestContext, function_name: NamespacedFunctionName, **kwargs
    ) -> None:
        """
        Detach the code signing configuration from a function.

        The function's ``code_signing_config_arn`` is reset to ``None``; the code signing
        configuration itself is left in the store.

        :param context: request context used to resolve account, region and function name
        :param function_name: function name (or ARN) as passed by the caller
        :raises ResourceNotFoundException: if the function does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]
        function_name = api_utils.get_function_name(function_name, context)
        function = store.functions.get(function_name)
        # the ARN is only needed for the error message
        unqualified_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
        if not function:
            raise ResourceNotFoundException(f"Function not found: {unqualified_arn}", Type="User")

        function.code_signing_config_arn = None
1✔
3147

3148
    def delete_code_signing_config(
        self, context: RequestContext, code_signing_config_arn: CodeSigningConfigArn, **kwargs
    ) -> DeleteCodeSigningConfigResponse:
        """
        Remove a code signing configuration from the caller's account/region store.

        :param context: request context providing the account id and region
        :param code_signing_config_arn: ARN of the code signing configuration to delete
        :return: an empty delete response
        :raises ResourceNotFoundException: if nothing is stored under the ARN
        """
        signing_configs = lambda_stores[context.account_id][context.region].code_signing_configs

        if not signing_configs.get(code_signing_config_arn):
            raise ResourceNotFoundException(
                f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
            )

        signing_configs.pop(code_signing_config_arn)

        return DeleteCodeSigningConfigResponse()
1✔
3162

3163
    def list_code_signing_configs(
        self,
        context: RequestContext,
        marker: String = None,
        max_items: MaxListItems = None,
        **kwargs,
    ) -> ListCodeSigningConfigsResponse:
        """
        List the code signing configurations of the caller's account and region, paginated.

        :param context: request context providing the account id and region
        :param marker: pagination marker returned by a previous call
        :param max_items: page size passed through to the paginator
        :return: one page of mapped configurations plus the marker of the next page
        """
        store = lambda_stores[context.account_id][context.region]

        mapped_configs = PaginatedList(
            [api_utils.map_csc(signing_config) for signing_config in store.code_signing_configs.values()]
        )
        # pages are keyed by the configuration id
        page, next_marker = mapped_configs.get_page(
            lambda mapped: mapped["CodeSigningConfigId"],
            marker,
            max_items,
        )
        return ListCodeSigningConfigsResponse(CodeSigningConfigs=page, NextMarker=next_marker)
1✔
3180

3181
    def list_functions_by_code_signing_config(
        self,
        context: RequestContext,
        code_signing_config_arn: CodeSigningConfigArn,
        marker: String = None,
        max_items: MaxListItems = None,
        **kwargs,
    ) -> ListFunctionsByCodeSigningConfigResponse:
        """
        List the unqualified ARNs of all functions that use a code signing configuration.

        :param context: request context providing the account id and region
        :param code_signing_config_arn: ARN of the code signing configuration to filter by
        :param marker: pagination marker returned by a previous call
        :param max_items: page size passed through to the paginator
        :return: one page of function ARNs plus the marker of the next page
        :raises ResourceNotFoundException: if the code signing configuration does not exist
        """
        account_id, region = context.account_id, context.region
        store = lambda_stores[account_id][region]

        if code_signing_config_arn not in store.code_signing_configs:
            raise ResourceNotFoundException(
                f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
            )

        matching_function_arns = PaginatedList(
            [
                api_utils.unqualified_lambda_arn(function.function_name, account_id, region)
                for function in store.functions.values()
                if function.code_signing_config_arn == code_signing_config_arn
            ]
        )

        # the items are plain ARN strings, so each item is its own pagination token
        page, next_marker = matching_function_arns.get_page(
            lambda function_arn: function_arn,
            marker,
            max_items,
        )
        return ListFunctionsByCodeSigningConfigResponse(FunctionArns=page, NextMarker=next_marker)
1✔
3212

3213
    # =======================================
3214
    # =========  Account Settings   =========
3215
    # =======================================
3216

3217
    # CAVE: these settings & usages are *per* region!
3218
    # Lambda quotas: https://docs.aws.amazon.com/lambda/latest/dg/gettingstarted-limits.html
3219
    def get_account_settings(self, context: RequestContext, **kwargs) -> GetAccountSettingsResponse:
        """
        Report the Lambda limits and current usage for the caller's account and region.

        Usage is derived from the store: the number of functions, the code size of all
        Zip-packaged function versions plus all layer versions, and the sum of reserved and
        provisioned concurrency over all functions.

        :param context: request context providing the account id and region
        :return: account limits (taken from the LocalStack config) and account usage
        """
        store = lambda_stores[context.account_id][context.region]
        functions = store.functions.values()

        # Image-based Lambdas do not have a code attribute and count against the ECR quotas instead
        function_code_size = sum(
            version.config.code.code_size
            for function in functions
            for version in function.versions.values()
            if version.config.package_type == PackageType.Zip
        )
        layer_code_size = sum(
            layer_version.code.code_size
            for layer in store.layers.values()
            for layer_version in layer.layer_versions.values()
        )

        reserved_concurrency = sum(
            function.reserved_concurrent_executions
            for function in functions
            if function.reserved_concurrent_executions is not None
        )
        provisioned_concurrency = sum(
            provisioned.provisioned_concurrent_executions
            for function in functions
            for provisioned in function.provisioned_concurrency_configs.values()
        )
        # reserved and provisioned concurrency are both deducted from the unreserved pool
        claimed_concurrency = reserved_concurrency + provisioned_concurrency

        limits = AccountLimit(
            TotalCodeSize=config.LAMBDA_LIMITS_TOTAL_CODE_SIZE,
            CodeSizeZipped=config.LAMBDA_LIMITS_CODE_SIZE_ZIPPED,
            CodeSizeUnzipped=config.LAMBDA_LIMITS_CODE_SIZE_UNZIPPED,
            ConcurrentExecutions=config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS,
            UnreservedConcurrentExecutions=config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS
            - claimed_concurrency,
        )
        usage = AccountUsage(
            TotalCodeSize=function_code_size + layer_code_size,
            FunctionCount=len(functions),
        )
        return GetAccountSettingsResponse(AccountLimit=limits, AccountUsage=usage)
3252

3253
    # =======================================
3254
    # ==  Provisioned Concurrency Config   ==
3255
    # =======================================
3256

3257
    def _get_provisioned_config(
        self, context: RequestContext, function_name: str, qualifier: str
    ) -> ProvisionedConcurrencyConfiguration | None:
        """
        Look up the provisioned concurrency configuration stored for a function qualifier.

        :param context: request context used to resolve account, region and function name
        :param function_name: function name (or ARN)
        :param qualifier: alias name or version the configuration is stored under
        :return: the stored configuration, or None if the qualifier has none
        :raises ResourceNotFoundException: if the qualifier is an alias or a version that does
            not exist (which includes the case of the function itself not existing)
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]
        function_name = api_utils.get_function_name(function_name, context)
        function = store.functions.get(function_name)

        if api_utils.qualifier_is_alias(qualifier):
            if not function or function.aliases.get(qualifier) is None:
                missing_arn = api_utils.qualified_lambda_arn(
                    function_name, qualifier, account_id, region
                )
                raise ResourceNotFoundException(
                    f"Cannot find alias arn: {missing_arn}",
                    Type="User",
                )
        elif api_utils.qualifier_is_version(qualifier):
            if not function or function.versions.get(qualifier) is None:
                missing_arn = api_utils.qualified_lambda_arn(
                    function_name, qualifier, account_id, region
                )
                raise ResourceNotFoundException(
                    f"Function not found: {missing_arn}",
                    Type="User",
                )

        return function.provisioned_concurrency_configs.get(qualifier)
1✔
3284

3285
    def put_provisioned_concurrency_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: Qualifier,
        provisioned_concurrent_executions: PositiveInteger,
        **kwargs,
    ) -> PutProvisionedConcurrencyConfigResponse:
        """
        Store a provisioned concurrency configuration for an alias or a published version.

        The request is validated against the function's reserved concurrency and the account
        concurrency limits, and rejected if an alias and the version it points to would both
        carry a configuration. On success the configuration is stored under the qualifier and
        the version manager of the resolved version is asked to apply it.

        :param context: request context used to resolve account, region, name and qualifier
        :param function_name: function name (or ARN)
        :param qualifier: alias name or version number; ``$LATEST`` is rejected
        :param provisioned_concurrent_executions: requested amount, must be at least 1
        :return: response echoing the requested amount with status ``IN_PROGRESS``
        :raises ValidationException: if the requested amount is not positive
        :raises InvalidParameterValueException: for ``$LATEST`` or when a concurrency limit
            would be violated
        :raises ResourceNotFoundException: if the function, alias or version does not exist
        :raises ResourceConflictException: if alias and version would both be provisioned
        """
        if provisioned_concurrent_executions <= 0:
            raise ValidationException(
                f"1 validation error detected: Value '{provisioned_concurrent_executions}' at 'provisionedConcurrentExecutions' failed to satisfy constraint: Member must have value greater than or equal to 1"
            )

        if qualifier == "$LATEST":
            raise InvalidParameterValueException(
                "Provisioned Concurrency Configs cannot be applied to unpublished function versions.",
                Type="User",
            )
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        store = lambda_stores[account_id][region]
        function = store.functions.get(function_name)

        # also verifies that the function and the alias/version exist
        existing_config = self._get_provisioned_config(context, function_name, qualifier)

        if existing_config:  # TODO: merge?
            # TODO: add a test for partial updates (if possible)
            LOG.warning(
                "Partial update of provisioned concurrency config is currently not supported."
            )

        # provisioned concurrency already claimed by the function's other qualifiers
        provisioned_elsewhere = sum(
            other_config.provisioned_concurrent_executions
            for other_qualifier, other_config in function.provisioned_concurrency_configs.items()
            if other_qualifier != qualifier
        )

        reserved = function.reserved_concurrent_executions
        if (
            reserved is not None
            and reserved < provisioned_elsewhere + provisioned_concurrent_executions
        ):
            raise InvalidParameterValueException(
                "Requested Provisioned Concurrency should not be greater than the reservedConcurrentExecution for function",
                Type="User",
            )

        if provisioned_concurrent_executions > config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS:
            raise InvalidParameterValueException(
                f"Specified ConcurrentExecutions for function is greater than account's unreserved concurrency"
                f" [{config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS}]."
            )

        account_limit = self.get_account_settings(context)["AccountLimit"]
        remaining_unreserved = (
            account_limit["UnreservedConcurrentExecutions"] - provisioned_concurrent_executions
        )
        if remaining_unreserved < config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY:
            raise InvalidParameterValueException(
                f"Specified ConcurrentExecutions for function decreases account's UnreservedConcurrentExecution below"
                f" its minimum value of [{config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY}]."
            )

        new_config = ProvisionedConcurrencyConfiguration(
            provisioned_concurrent_executions, api_utils.generate_lambda_date()
        )
        version_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)

        if api_utils.qualifier_is_alias(qualifier):
            alias = function.aliases.get(qualifier)
            target_version = function.versions.get(alias.function_version)

            if (
                target_version
                and function.provisioned_concurrency_configs.get(alias.function_version)
                is not None
            ):
                raise ResourceConflictException(
                    "Alias can't be used for Provisioned Concurrency configuration on an already Provisioned version",
                    Type="User",
                )
            version_arn = target_version.id.qualified_arn()
        elif api_utils.qualifier_is_version(qualifier):
            target_version = function.versions.get(qualifier)

            # TODO: might be useful other places, utilize
            provisioned_alias_points_here = any(
                alias.function_version == qualifier
                and function.provisioned_concurrency_configs.get(alias.name) is not None
                for alias in function.aliases.values()
            )
            if provisioned_alias_points_here:
                raise ResourceConflictException(
                    "Version is pointed by a Provisioned Concurrency alias", Type="User"
                )

            version_arn = target_version.id.qualified_arn()

        # look up the manager before touching the store
        version_manager = self.lambda_service.get_lambda_version_manager(version_arn)

        function.provisioned_concurrency_configs[qualifier] = new_config

        version_manager.update_provisioned_concurrency_config(
            new_config.provisioned_concurrent_executions
        )

        return PutProvisionedConcurrencyConfigResponse(
            RequestedProvisionedConcurrentExecutions=new_config.provisioned_concurrent_executions,
            AvailableProvisionedConcurrentExecutions=0,
            AllocatedProvisionedConcurrentExecutions=0,
            Status=ProvisionedConcurrencyStatusEnum.IN_PROGRESS,
            # StatusReason=manager.provisioned_state.status_reason,
            LastModified=new_config.last_modified,  # TODO: does change with configuration or also with state changes?
        )
3407

3408
    def get_provisioned_concurrency_config(
        self, context: RequestContext, function_name: FunctionName, qualifier: Qualifier, **kwargs
    ) -> GetProvisionedConcurrencyConfigResponse:
        """
        Return the provisioned concurrency configuration and its current provisioning state.

        The requested amount and modification date come from the stored configuration; the
        available/allocated counts and status come from the version manager of the version
        the qualifier resolves to.

        :param context: request context used to resolve account, region, name and qualifier
        :param function_name: function name (or ARN)
        :param qualifier: alias name or version number; ``$LATEST`` is rejected
        :raises InvalidParameterValueException: if the qualifier is ``$LATEST``
        :raises ResourceNotFoundException: if the function, alias or version does not exist
        :raises ProvisionedConcurrencyConfigNotFoundException: if the qualifier has no
            provisioned concurrency configuration
        """
        if qualifier == "$LATEST":
            raise InvalidParameterValueException(
                "The function resource provided must be an alias or a published version.",
                Type="User",
            )
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )

        stored_config = self._get_provisioned_config(context, function_name, qualifier)
        if not stored_config:
            raise ProvisionedConcurrencyConfigNotFoundException(
                "No Provisioned Concurrency Config found for this function", Type="User"
            )

        # TODO: make this compatible with alias pointer migration on update
        version_qualifier = qualifier
        if api_utils.qualifier_is_alias(qualifier):
            # the version manager is looked up via the version the alias points to
            function = lambda_stores[account_id][region].functions.get(function_name)
            version_qualifier = function.aliases.get(qualifier).function_version
        version_arn = api_utils.qualified_lambda_arn(
            function_name, version_qualifier, account_id, region
        )

        version_manager = self.lambda_service.get_lambda_version_manager(version_arn)

        return GetProvisionedConcurrencyConfigResponse(
            RequestedProvisionedConcurrentExecutions=stored_config.provisioned_concurrent_executions,
            LastModified=stored_config.last_modified,
            AvailableProvisionedConcurrentExecutions=version_manager.provisioned_state.available,
            AllocatedProvisionedConcurrentExecutions=version_manager.provisioned_state.allocated,
            Status=version_manager.provisioned_state.status,
            StatusReason=version_manager.provisioned_state.status_reason,
        )
3448

3449
    def list_provisioned_concurrency_configs(
        self,
        context: RequestContext,
        function_name: FunctionName,
        marker: String = None,
        max_items: MaxProvisionedConcurrencyConfigListItems = None,
        **kwargs,
    ) -> ListProvisionedConcurrencyConfigsResponse:
        """
        List all provisioned concurrency configurations of a function, paginated.

        Each item combines the stored configuration with the current provisioning state of
        the version manager for the version the qualifier resolves to.

        :param context: request context used to resolve account, region and function name
        :param function_name: function name (or ARN)
        :param marker: pagination marker returned by a previous call
        :param max_items: page size passed through to the paginator
        :return: one page of configurations plus the marker of the next page
        :raises ResourceNotFoundException: if the function does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        state = lambda_stores[account_id][region]

        function_name = api_utils.get_function_name(function_name, context)
        fn = state.functions.get(function_name)
        if fn is None:
            raise ResourceNotFoundException(
                f"Function not found: {api_utils.unqualified_lambda_arn(function_name, account_id, region)}",
                Type="User",
            )

        configs = []
        for qualifier, pc_config in fn.provisioned_concurrency_configs.items():
            # the version manager is looked up via the version an alias points to,
            # while the reported FunctionArn keeps the qualifier the config is stored under
            version_qualifier = qualifier
            if api_utils.qualifier_is_alias(qualifier):
                version_qualifier = fn.aliases.get(qualifier).function_version
            version_arn = api_utils.qualified_lambda_arn(
                function_name, version_qualifier, account_id, region
            )

            manager = self.lambda_service.get_lambda_version_manager(version_arn)

            configs.append(
                ProvisionedConcurrencyConfigListItem(
                    FunctionArn=api_utils.qualified_lambda_arn(
                        function_name, qualifier, account_id, region
                    ),
                    RequestedProvisionedConcurrentExecutions=pc_config.provisioned_concurrent_executions,
                    AvailableProvisionedConcurrentExecutions=manager.provisioned_state.available,
                    AllocatedProvisionedConcurrentExecutions=manager.provisioned_state.allocated,
                    Status=manager.provisioned_state.status,
                    StatusReason=manager.provisioned_state.status_reason,
                    LastModified=pc_config.last_modified,
                )
            )

        # The pagination token must be a string that can be compared against ``marker``.
        # Using the item itself (a dict) would put a whole dict into NextMarker and a string
        # marker could never match it, so pages are keyed by the item's qualified ARN.
        page, token = PaginatedList(configs).get_page(
            lambda item: item["FunctionArn"],
            marker,
            max_items,
        )
        return ListProvisionedConcurrencyConfigsResponse(
            ProvisionedConcurrencyConfigs=page, NextMarker=token
        )
3506

3507
    def delete_provisioned_concurrency_config(
        self, context: RequestContext, function_name: FunctionName, qualifier: Qualifier, **kwargs
    ) -> None:
        """
        Remove the provisioned concurrency configuration of an alias or a published version.

        Deleting is idempotent: if the qualifier has no configuration, nothing happens.
        Otherwise the configuration is removed from the store and the version manager is
        asked to scale provisioned concurrency down to 0.

        :param context: request context used to resolve account, region, name and qualifier
        :param function_name: function name (or ARN)
        :param qualifier: alias name or version number; ``$LATEST`` is rejected
        :raises InvalidParameterValueException: if the qualifier is ``$LATEST``
        :raises ResourceNotFoundException: if the function, alias or version does not exist
        """
        if qualifier == "$LATEST":
            raise InvalidParameterValueException(
                "The function resource provided must be an alias or a published version.",
                Type="User",
            )
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        state = lambda_stores[account_id][region]
        fn = state.functions.get(function_name)

        # also verifies that the function and the alias/version exist
        provisioned_config = self._get_provisioned_config(context, function_name, qualifier)
        # delete is idempotent and doesn't actually care about the provisioned concurrency config not existing
        if not provisioned_config:
            return

        # put_/get_provisioned_concurrency_config address the version manager through the
        # version an alias points to; do the same here instead of using the alias ARN.
        version_qualifier = qualifier
        if api_utils.qualifier_is_alias(qualifier):
            version_qualifier = fn.aliases.get(qualifier).function_version
        fn_arn = api_utils.qualified_lambda_arn(
            function_name, version_qualifier, account_id, region
        )

        # look up the manager before mutating the store, so that a failed lookup does not
        # leave the configuration removed without the manager having been updated
        manager = self.lambda_service.get_lambda_version_manager(fn_arn)
        fn.provisioned_concurrency_configs.pop(qualifier)
        manager.update_provisioned_concurrency_config(0)
1✔
3529

3530
    # =======================================
3531
    # =======  Event Invoke Config   ========
3532
    # =======================================
3533

3534
    # "1 validation error detected: Value 'arn:aws:_-/!lambda:<region>:111111111111:function:<function-name:1>' at 'destinationConfig.onFailure.destination' failed to satisfy constraint: Member must satisfy regular expression pattern: ^$|arn:(aws[a-zA-Z0-9-]*):([a-zA-Z0-9\\-])+:([a-z]{2}((-gov)|(-iso(b?)))?-[a-z]+-\\d{1})?:(\\d{12})?:(.*)"
3535
    # "1 validation error detected: Value 'arn:aws:_-/!lambda:<region>:111111111111:function:<function-name:1>' at 'destinationConfig.onFailure.destination' failed to satisfy constraint: Member must satisfy regular expression pattern: ^$|arn:(aws[a-zA-Z0-9-]*):([a-zA-Z0-9\\-])+:([a-z]2((-gov)|(-iso(b?)))?-[a-z]+-\\d1)?:(\\d12)?:(.*)" ... (expected → actual)
3536

3537
    def _validate_destination_config(
1✔
3538
        self, store: LambdaStore, function_name: str, destination_config: DestinationConfig
3539
    ):
3540
        def _validate_destination_arn(destination_arn) -> bool:
1✔
3541
            if not api_utils.DESTINATION_ARN_PATTERN.match(destination_arn):
1✔
3542
                # technically we shouldn't handle this in the provider
3543
                raise ValidationException(
1✔
3544
                    "1 validation error detected: Value '"
3545
                    + destination_arn
3546
                    + "' at 'destinationConfig.onFailure.destination' failed to satisfy constraint: Member must satisfy regular expression pattern: "
3547
                    + "$|kafka://([^.]([a-zA-Z0-9\\-_.]{0,248}))|arn:(aws[a-zA-Z0-9-]*):([a-zA-Z0-9\\-])+:((eusc-)?[a-z]{2}((-gov)|(-iso([a-z]?)))?-[a-z]+-\\d{1})?:(\\d{12})?:(.*)"
3548
                )
3549

3550
            match destination_arn.split(":")[2]:
1✔
3551
                case "lambda":
1✔
3552
                    fn_parts = api_utils.FULL_FN_ARN_PATTERN.search(destination_arn).groupdict()
1✔
3553
                    if fn_parts:
1✔
3554
                        # check if it exists
3555
                        fn = store.functions.get(fn_parts["function_name"])
1✔
3556
                        if not fn:
1✔
3557
                            raise InvalidParameterValueException(
1✔
3558
                                f"The destination ARN {destination_arn} is invalid.", Type="User"
3559
                            )
3560
                        if fn_parts["function_name"] == function_name:
1✔
3561
                            raise InvalidParameterValueException(
1✔
3562
                                "You can't specify the function as a destination for itself.",
3563
                                Type="User",
3564
                            )
3565
                case "sns" | "sqs" | "events":
1✔
3566
                    pass
1✔
3567
                case _:
1✔
3568
                    return False
1✔
3569
            return True
1✔
3570

3571
        validation_err = False
1✔
3572

3573
        failure_destination = destination_config.get("OnFailure", {}).get("Destination")
1✔
3574
        if failure_destination:
1✔
3575
            validation_err = validation_err or not _validate_destination_arn(failure_destination)
1✔
3576

3577
        success_destination = destination_config.get("OnSuccess", {}).get("Destination")
1✔
3578
        if success_destination:
1✔
3579
            validation_err = validation_err or not _validate_destination_arn(success_destination)
1✔
3580

3581
        if validation_err:
1✔
3582
            on_success_part = (
1✔
3583
                f"OnSuccess(destination={success_destination})" if success_destination else "null"
3584
            )
3585
            on_failure_part = (
1✔
3586
                f"OnFailure(destination={failure_destination})" if failure_destination else "null"
3587
            )
3588
            raise InvalidParameterValueException(
1✔
3589
                f"The provided destination config DestinationConfig(onSuccess={on_success_part}, onFailure={on_failure_part}) is invalid.",
3590
                Type="User",
3591
            )
3592

3593
    def put_function_event_invoke_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: NumericLatestPublishedOrAliasQualifier = None,
        maximum_retry_attempts: MaximumRetryAttempts = None,
        maximum_event_age_in_seconds: MaximumEventAgeInSeconds = None,
        destination_config: DestinationConfig = None,
        **kwargs,
    ) -> FunctionEventInvokeConfig:
        """
        Create or overwrite the event invoke config of a function qualifier.

        Supported destination ARNs:
        * SQS arn
        * SNS arn
        * Lambda arn
        * EventBridge arn

        How put_ differs from update_:
            * put replaces an existing config as a whole
            * update changes individual values and keeps the remaining existing ones
            * update fails if no config exists yet

        How destinations differ from a DLQ:
            * "However, a dead-letter queue is part of a function's version-specific configuration, so it is locked in when you publish a version."
            * "On-failure destinations also support additional targets and include details about the function's response in the invocation record."

        :raises InvalidParameterValueException: if none of the settings is given, or if the
            destination config is invalid
        :raises ResourceNotFoundException: if the function or the qualifier does not exist
        """
        nothing_to_configure = (
            maximum_event_age_in_seconds is None
            and maximum_retry_attempts is None
            and destination_config is None
        )
        if nothing_to_configure:
            raise InvalidParameterValueException(
                "You must specify at least one of error handling or destination setting.",
                Type="User",
            )
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        function = store.functions.get(function_name)
        # a given qualifier has to be a known alias or version of the function
        if not function or (
            qualifier and qualifier not in function.aliases and qualifier not in function.versions
        ):
            raise ResourceNotFoundException("The function doesn't exist.", Type="User")

        qualifier = qualifier or "$LATEST"

        # validate and normalize destination config
        if destination_config:
            self._validate_destination_config(store, function_name, destination_config)

        requested_destinations = destination_config or {}
        destination_config = DestinationConfig(
            OnSuccess=OnSuccess(
                Destination=requested_destinations.get("OnSuccess", {}).get("Destination")
            ),
            OnFailure=OnFailure(
                Destination=requested_destinations.get("OnFailure", {}).get("Destination")
            ),
        )

        # named so that it does not shadow the imported ``config`` module
        invoke_config = EventInvokeConfig(
            function_name=function_name,
            qualifier=qualifier,
            maximum_event_age_in_seconds=maximum_event_age_in_seconds,
            maximum_retry_attempts=maximum_retry_attempts,
            last_modified=api_utils.generate_lambda_date(),
            destination_config=destination_config,
        )
        function.event_invoke_configs[qualifier] = invoke_config

        last_modified = datetime.datetime.strptime(
            invoke_config.last_modified, api_utils.LAMBDA_DATE_FORMAT
        )
        # qualifier has already been defaulted to "$LATEST" above
        function_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
        return FunctionEventInvokeConfig(
            LastModified=last_modified,
            FunctionArn=function_arn,
            DestinationConfig=destination_config,
            MaximumEventAgeInSeconds=maximum_event_age_in_seconds,
            MaximumRetryAttempts=maximum_retry_attempts,
        )
3674

3675
    def get_function_event_invoke_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
        **kwargs,
    ) -> FunctionEventInvokeConfig:
        """
        Return the event invoke config stored for a function qualifier.

        A missing qualifier is treated as ``$LATEST``.

        :param context: request context used to resolve account, region, name and qualifier
        :param function_name: function name (or ARN)
        :param qualifier: optional alias name or version
        :raises ResourceNotFoundException: if the function does not exist or has no event
            invoke config for the qualifier (same message in both cases)
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )

        qualifier = qualifier or "$LATEST"
        function = store.functions.get(function_name)
        # named so that it does not shadow the imported ``config`` module
        invoke_config = function.event_invoke_configs.get(qualifier) if function else None
        if not invoke_config:
            missing_arn = api_utils.qualified_lambda_arn(
                function_name, qualifier, account_id, region
            )
            raise ResourceNotFoundException(
                f"The function {missing_arn} doesn't have an EventInvokeConfig", Type="User"
            )

        last_modified = datetime.datetime.strptime(
            invoke_config.last_modified, api_utils.LAMBDA_DATE_FORMAT
        )
        function_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
        return FunctionEventInvokeConfig(
            LastModified=last_modified,
            FunctionArn=function_arn,
            DestinationConfig=invoke_config.destination_config,
            MaximumEventAgeInSeconds=invoke_config.maximum_event_age_in_seconds,
            MaximumRetryAttempts=invoke_config.maximum_retry_attempts,
        )
3714

3715
    def list_function_event_invoke_configs(
        self,
        context: RequestContext,
        function_name: FunctionName,
        marker: String = None,
        max_items: MaxFunctionEventInvokeConfigListItems = None,
        **kwargs,
    ) -> ListFunctionEventInvokeConfigsResponse:
        """
        List all event invoke configs of a function, paginated.

        :param context: request context used to resolve account, region and function name
        :param function_name: function name (or ARN)
        :param marker: pagination marker returned by a previous call
        :param max_items: page size passed through to the paginator
        :return: one page of event invoke configs plus the marker of the next page
        :raises ResourceNotFoundException: if the function does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        state = lambda_stores[account_id][region]
        # Normalize the caller-provided value to the plain function name, as the other
        # handlers do: it is used both as the store key and to build the returned ARNs.
        function_name = api_utils.get_function_name(function_name, context)
        fn = state.functions.get(function_name)
        if not fn:
            raise ResourceNotFoundException("The function doesn't exist.", Type="User")

        event_invoke_configs = [
            FunctionEventInvokeConfig(
                LastModified=c.last_modified,
                FunctionArn=api_utils.qualified_lambda_arn(
                    function_name, c.qualifier, account_id, region
                ),
                MaximumEventAgeInSeconds=c.maximum_event_age_in_seconds,
                MaximumRetryAttempts=c.maximum_retry_attempts,
                DestinationConfig=c.destination_config,
            )
            for c in fn.event_invoke_configs.values()
        ]

        # pages are keyed by the qualified function ARN of each config
        page, token = PaginatedList(event_invoke_configs).get_page(
            lambda x: x["FunctionArn"],
            marker,
            max_items,
        )
        return ListFunctionEventInvokeConfigsResponse(
            FunctionEventInvokeConfigs=page, NextMarker=token
        )
3751

3752
    def delete_function_event_invoke_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
        **kwargs,
    ) -> None:
        """
        Remove the event invoke config stored for a function qualifier.

        :param context: Request context
        :param function_name: Function name or ARN
        :param qualifier: Version or alias; ``$LATEST`` is used for the lookup when omitted
        :raises ResourceNotFoundException: if the function or its config for the
            qualifier does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        function = lambda_stores[account_id][region].functions.get(function_name)
        config_key = qualifier or "$LATEST"
        # the ARN is only needed for the error message; it is built from the parsed qualifier
        fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)

        # a missing function and a missing config are reported with the same error
        if not function or not function.event_invoke_configs.get(config_key):
            raise ResourceNotFoundException(
                f"The function {fn_arn} doesn't have an EventInvokeConfig", Type="User"
            )

        del function.event_invoke_configs[config_key]
1✔
3779

3780
    def update_function_event_invoke_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: NumericLatestPublishedOrAliasQualifier = None,
        maximum_retry_attempts: MaximumRetryAttempts = None,
        maximum_event_age_in_seconds: MaximumEventAgeInSeconds = None,
        destination_config: DestinationConfig = None,
        **kwargs,
    ) -> FunctionEventInvokeConfig:
        """
        Update individual fields of an existing event invoke config.

        Only the settings that are passed (i.e. are not ``None``) replace the stored
        values; everything else is carried over from the existing config.

        :param context: Request context
        :param function_name: Function name or ARN
        :param qualifier: Version or alias; ``$LATEST`` is used when omitted
        :param maximum_retry_attempts: New retry limit, if it should change
        :param maximum_event_age_in_seconds: New maximum event age, if it should change
        :param destination_config: New destination config, if it should change
        :return: The updated event invoke config
        :raises InvalidParameterValueException: if none of the three settings is given
        :raises ResourceNotFoundException: if the function, the qualifier or the
            config to update does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        state = lambda_stores[account_id][region]
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )

        requested_changes = {
            "destination_config": destination_config,
            "maximum_retry_attempts": maximum_retry_attempts,
            "maximum_event_age_in_seconds": maximum_event_age_in_seconds,
        }
        if all(value is None for value in requested_changes.values()):
            raise InvalidParameterValueException(
                "You must specify at least one of error handling or destination setting.",
                Type="User",
            )

        function = state.functions.get(function_name)
        if not function:
            raise ResourceNotFoundException("The function doesn't exist.", Type="User")
        # an explicit qualifier has to name an existing alias or version
        if qualifier and qualifier not in function.aliases and qualifier not in function.versions:
            raise ResourceNotFoundException("The function doesn't exist.", Type="User")

        qualifier = qualifier or "$LATEST"

        current_config = function.event_invoke_configs.get(qualifier)
        if not current_config:
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
            raise ResourceNotFoundException(
                f"The function {fn_arn} doesn't have an EventInvokeConfig", Type="User"
            )

        if destination_config:
            self._validate_destination_config(state, function_name, destination_config)

        # drop the settings that were not passed so they keep their stored values
        changes = {}
        for field_name, value in requested_changes.items():
            if value is not None:
                changes[field_name] = value

        updated_config = dataclasses.replace(
            current_config, last_modified=api_utils.generate_lambda_date(), **changes
        )
        function.event_invoke_configs[qualifier] = updated_config

        last_modified = datetime.datetime.strptime(
            updated_config.last_modified, api_utils.LAMBDA_DATE_FORMAT
        )
        function_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
        return FunctionEventInvokeConfig(
            LastModified=last_modified,
            FunctionArn=function_arn,
            DestinationConfig=updated_config.destination_config,
            MaximumEventAgeInSeconds=updated_config.maximum_event_age_in_seconds,
            MaximumRetryAttempts=updated_config.maximum_retry_attempts,
        )
3849

3850
    # =======================================
3851
    # ======  Layer & Layer Versions  =======
3852
    # =======================================
3853

3854
    @staticmethod
    def _resolve_layer(
        layer_name_or_arn: str, context: RequestContext
    ) -> tuple[str, str, str, str | None]:
        """
        Work out where a Lambda layer lives from its name or ARN.

        A layer ARN is handed to ``api_utils.parse_layer_arn``; a plain layer name is
        combined with the region and account of the request and has no version.

        :param layer_name_or_arn: Layer name or ARN
        :param context: Request context
        :return: Tuple of region, account ID, layer name, layer version
        """
        if not api_utils.is_layer_arn(layer_name_or_arn):
            # plain name: fall back to the request's own region and account
            return context.region, context.account_id, layer_name_or_arn, None

        return api_utils.parse_layer_arn(layer_name_or_arn)
1✔
3869

3870
    def publish_layer_version(
        self,
        context: RequestContext,
        layer_name: LayerName,
        content: LayerVersionContentInput,
        description: Description | None = None,
        compatible_runtimes: CompatibleRuntimes | None = None,
        license_info: LicenseInfo | None = None,
        compatible_architectures: CompatibleArchitectures | None = None,
        **kwargs,
    ) -> PublishLayerVersionResponse:
        """
        Publish a new version of a layer, creating the layer on first use of its name.

        Every later call with the same LayerName adds another version.
        Note that there are no $LATEST versions with layers!

        :param context: Request context
        :param layer_name: Name of the layer
        :param content: Layer archive, either inline (``ZipFile``) or as an S3 reference
        :param description: Optional description; stored as an empty string when omitted
        :param compatible_runtimes: Runtimes the layer declares compatibility with
        :param license_info: Optional license information
        :param compatible_architectures: Architectures the layer declares compatibility with
        :return: The newly published layer version
        :raises ValidationException: if the runtimes or architectures fail validation
        """
        account = context.account_id
        region = context.region

        problems = api_utils.validate_layer_runtimes_and_architectures(
            compatible_runtimes, compatible_architectures
        )
        if problems:
            plural = "s" if len(problems) > 1 else ""
            details = "; ".join(problems)
            raise ValidationException(
                f"{len(problems)} validation error{plural} detected: {details}"
            )

        state = lambda_stores[account][region]
        with self.create_layer_lock:
            # lock is required to avoid creating two v1 objects for the same name
            if layer_name not in state.layers:
                # no version has been published yet, so create the layer object
                state.layers[layer_name] = Layer(
                    arn=api_utils.layer_arn(layer_name=layer_name, account=account, region=region)
                )

        layer = state.layers[layer_name]
        with layer.next_version_lock:
            identifier = LambdaLayerVersionIdentifier(
                account_id=account, region=region, layer_name=layer_name
            )
            next_version = identifier.generate(next_version=layer.next_version)
            # With user defined layer versions, versions can be created out of order,
            # e.g. a user could replicate layer v2 and then layer v1. Always keep the
            # highest possible value for the next version so existing ones are not overwritten.
            if layer.next_version <= next_version:
                # nothing to update when the created version is below the "next in line"
                layer.next_version = next_version + 1

        # store the archive of the new layer version
        if zip_file := content.get("ZipFile"):
            code = store_lambda_archive(
                archive_file=zip_file,
                function_name=layer_name,
                region_name=region,
                account_id=account,
            )
        else:
            code = store_s3_bucket_archive(
                archive_bucket=content["S3Bucket"],
                archive_key=content["S3Key"],
                archive_version=content.get("S3ObjectVersion"),
                function_name=layer_name,
                region_name=region,
                account_id=account,
            )

        version_key = str(next_version)
        published = LayerVersion(
            layer_version_arn=api_utils.layer_version_arn(
                layer_name=layer_name,
                account=account,
                region=region,
                version=version_key,
            ),
            layer_arn=layer.arn,
            version=next_version,
            description=description or "",
            license_info=license_info,
            compatible_runtimes=compatible_runtimes,
            compatible_architectures=compatible_architectures,
            created=api_utils.generate_lambda_date(),
            code=code,
        )
        layer.layer_versions[version_key] = published

        return api_utils.map_layer_out(published)
1✔
3958

3959
    def get_layer_version(
        self,
        context: RequestContext,
        layer_name: LayerName,
        version_number: LayerVersionNumber,
        **kwargs,
    ) -> GetLayerVersionResponse:
        """
        Return one version of a layer.

        :param context: Request context
        :param layer_name: Layer name or ARN
        :param version_number: Version to look up; must be at least 1
        :return: The requested layer version
        :raises InvalidParameterValueException: if ``version_number`` is below 1
        :raises ResourceNotFoundException: if the layer or the version does not exist
        """
        # TODO: handle layer_name as an ARN

        region_name, account_id, layer_name, _ = LambdaProvider._resolve_layer(layer_name, context)
        layer = lambda_stores[account_id][region_name].layers.get(layer_name)

        if version_number < 1:
            raise InvalidParameterValueException("Layer Version Cannot be less than 1", Type="User")

        # an unknown layer and an unknown version are reported identically
        found = None if layer is None else layer.layer_versions.get(str(version_number))
        if found is None:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )
        return api_utils.map_layer_out(found)
1✔
3984

3985
    def get_layer_version_by_arn(
        self, context: RequestContext, arn: LayerVersionArn, **kwargs
    ) -> GetLayerVersionResponse:
        """
        Return the layer version identified by a layer version ARN.

        :param context: Request context
        :param arn: ARN that has to include the layer version
        :return: The requested layer version
        :raises ValidationException: if no version could be resolved from ``arn``
        :raises ResourceNotFoundException: if the layer or the version does not exist
        """
        region_name, account_id, layer_name, version_key = LambdaProvider._resolve_layer(
            arn, context
        )

        if not version_key:
            arn_pattern = "(arn:(aws[a-zA-Z-]*)?:lambda:(eusc-)?[a-z]{2}((-gov)|(-iso([a-z]?)))?-[a-z]+-\\d{1}:\\d{12}:layer:[a-zA-Z0-9-_]+:[0-9]+)|(arn:[a-zA-Z0-9-]+:lambda:::awslayer:[a-zA-Z0-9-_]+)"
            raise ValidationException(
                f"1 validation error detected: Value '{arn}' at 'arn' failed to satisfy constraint: Member must satisfy regular expression pattern: "
                + arn_pattern
            )

        layer = lambda_stores[account_id][region_name].layers.get(layer_name)
        # an unknown layer and an unknown version are reported identically
        found = layer.layer_versions.get(version_key) if layer else None
        if not found:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        return api_utils.map_layer_out(found)
1✔
4012

4013
    def list_layers(
        self,
        context: RequestContext,
        compatible_runtime: Runtime | None = None,
        marker: String | None = None,
        max_items: MaxLayerListItems | None = None,
        compatible_architecture: Architecture | None = None,
        **kwargs,
    ) -> ListLayersResponse:
        """
        List the layers of the request's account and region with their latest version.

        :param context: Request context
        :param compatible_runtime: Runtime filter; validated but not yet applied
        :param marker: Pagination token returned by a previous call
        :param max_items: Maximum number of layers to return in one page
        :param compatible_architecture: Architecture filter; validated but not yet applied
        :return: The requested page of layers and the marker for the next page
        :raises ValidationException: if the runtime or architecture filter is invalid
        """
        validation_errors = []

        validation_error_arch = api_utils.validate_layer_architecture(compatible_architecture)
        if validation_error_arch:
            validation_errors.append(validation_error_arch)

        validation_error_runtime = api_utils.validate_layer_runtime(compatible_runtime)
        if validation_error_runtime:
            validation_errors.append(validation_error_runtime)

        if validation_errors:
            raise ValidationException(
                f"{len(validation_errors)} validation error{'s' if len(validation_errors) > 1 else ''} detected: {';'.join(validation_errors)}"
            )
        # TODO: handle filter: compatible_runtime
        # TODO: handle filter: compatible_architecture

        state = lambda_stores[context.account_id][context.region]
        layers = state.layers

        # TODO: test how filters behave together with only returning layers here? Does it return the latest "matching" layer, i.e. does it ignore later layer versions that don't match?

        responses: list[LayersListItem] = []
        for layer_name, layer in layers.items():
            if not layer.layer_versions:
                # every version was deleted: there is no latest version to report
                continue
            # Pick the highest version number explicitly. Versions can be stored out
            # of order, so the last inserted entry is not necessarily the latest one.
            latest_layer_version = max(layer.layer_versions.values(), key=lambda x: x.version)
            responses.append(
                LayersListItem(
                    LayerName=layer_name,
                    LayerArn=layer.arn,
                    LatestMatchingVersion=api_utils.map_layer_out(latest_layer_version),
                )
            )

        responses = PaginatedList(responses)
        page, token = responses.get_page(
            # the layer ARN is a string that is unique per layer, so it can serve as marker
            lambda layer_item: layer_item["LayerArn"],
            marker,
            max_items,
        )

        return ListLayersResponse(NextMarker=token, Layers=page)
×
4066

4067
    def list_layer_versions(
        self,
        context: RequestContext,
        layer_name: LayerName,
        compatible_runtime: Runtime | None = None,
        marker: String | None = None,
        max_items: MaxLayerListItems | None = None,
        compatible_architecture: Architecture | None = None,
        **kwargs,
    ) -> ListLayerVersionsResponse:
        """
        List the versions of a layer, highest version first.

        An unknown layer yields an empty list rather than an error.

        :param context: Request context
        :param layer_name: Layer name or ARN
        :param compatible_runtime: Runtime filter; validated but not yet applied
        :param marker: Pagination token returned by a previous call
        :param max_items: Maximum number of versions to return in one page
        :param compatible_architecture: Architecture filter; validated but not yet applied
        :return: The requested page of layer versions and the marker for the next page
        :raises ValidationException: if the runtime or architecture filter is invalid
        """
        runtimes = [compatible_runtime] if compatible_runtime else []
        architectures = [compatible_architecture] if compatible_architecture else []
        validation_errors = api_utils.validate_layer_runtimes_and_architectures(
            runtimes, architectures
        )
        if validation_errors:
            raise ValidationException(
                f"{len(validation_errors)} validation error{'s' if len(validation_errors) > 1 else ''} detected: {';'.join(validation_errors)}"
            )

        region_name, account_id, layer_name, layer_version = LambdaProvider._resolve_layer(
            layer_name, context
        )
        layer = lambda_stores[account_id][region_name].layers.get(layer_name)

        # TODO: Test & handle filter: compatible_runtime
        # TODO: Test & handle filter: compatible_architecture
        stored_versions = layer.layer_versions.values() if layer is not None else []
        mapped_versions = [api_utils.map_layer_out(stored) for stored in stored_versions]
        newest_first = sorted(mapped_versions, key=lambda entry: entry["Version"], reverse=True)

        page, token = PaginatedList(newest_first).get_page(
            lambda version: version["LayerVersionArn"],
            marker,
            max_items,
        )
        return ListLayerVersionsResponse(NextMarker=token, LayerVersions=page)
1✔
4107

4108
    def delete_layer_version(
        self,
        context: RequestContext,
        layer_name: LayerName,
        version_number: LayerVersionNumber,
        **kwargs,
    ) -> None:
        """
        Delete one version of a layer; an unknown layer or version is ignored.

        :param context: Request context
        :param layer_name: Layer name or ARN
        :param version_number: Version to delete; must be at least 1
        :raises InvalidParameterValueException: if ``version_number`` is below 1
        """
        if version_number < 1:
            raise InvalidParameterValueException("Layer Version Cannot be less than 1", Type="User")

        region_name, account_id, layer_name, layer_version = LambdaProvider._resolve_layer(
            layer_name, context
        )

        layer = lambda_stores[account_id][region_name].layers.get(layer_name)
        if not layer:
            return
        # pop with a default so that a missing version does not raise
        layer.layer_versions.pop(str(version_number), None)
1✔
4126

4127
    # =======================================
4128
    # =====  Layer Version Permissions  =====
4129
    # =======================================
4130
    # TODO: lock updates that change revision IDs
4131

4132
    def add_layer_version_permission(
        self,
        context: RequestContext,
        layer_name: LayerName,
        version_number: LayerVersionNumber,
        statement_id: StatementId,
        action: LayerPermissionAllowedAction,
        principal: LayerPermissionAllowedPrincipal,
        organization_id: OrganizationId = None,
        revision_id: String = None,
        **kwargs,
    ) -> AddLayerVersionPermissionResponse:
        """
        Add a statement to the policy of a layer version, creating the policy if needed.

        :param context: Request context
        :param layer_name: Layer name or ARN
        :param version_number: Version the permission applies to
        :param statement_id: Identifier of the new statement; must not exist yet
        :param action: Has to be ``lambda:GetLayerVersion``
        :param principal: Principal that is granted the action
        :param organization_id: Optional organization stored with the statement
        :param revision_id: If given, has to match the policy's current revision ID
        :return: The added statement as JSON and the policy's revision ID
        :raises ValidationException: if ``action`` is not ``lambda:GetLayerVersion``
        :raises ResourceNotFoundException: if the layer or the version does not exist
        :raises ResourceConflictException: if ``statement_id`` is already in use
        :raises PreconditionFailedException: if ``revision_id`` does not match
        """
        # `layer_name` can either be layer name or ARN. It is used to generate error messages.
        # `layer_n` contains the layer name.
        region_name, account_id, layer_n, _ = LambdaProvider._resolve_layer(layer_name, context)

        if action != "lambda:GetLayerVersion":
            raise ValidationException(
                f"1 validation error detected: Value '{action}' at 'action' failed to satisfy constraint: Member must satisfy regular expression pattern: lambda:GetLayerVersion"
            )

        version_key = str(version_number)
        layer = lambda_stores[account_id][region_name].layers.get(layer_n)
        layer_version_arn = api_utils.layer_version_arn(
            layer_name, account_id, region_name, version_key
        )

        # an unknown layer and an unknown version are reported identically
        layer_version = None if layer is None else layer.layer_versions.get(version_key)
        if layer_version is None:
            raise ResourceNotFoundException(
                f"Layer version {layer_version_arn} does not exist.", Type="User"
            )

        # start with an empty policy when the version does not have one yet
        if layer_version.policy is None:
            layer_version.policy = LayerPolicy()
        policy = layer_version.policy

        if statement_id in policy.statements:
            raise ResourceConflictException(
                f"The statement id ({statement_id}) provided already exists. Please provide a new statement id, or remove the existing statement.",
                Type="User",
            )

        if revision_id and policy.revision_id != revision_id:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetLayerPolicy API to retrieve the latest Revision Id",
                Type="User",
            )

        statement = LayerPolicyStatement(
            sid=statement_id, action=action, principal=principal, organization_id=organization_id
        )

        # the policy object is replaced rather than having its statements mutated in place
        merged_statements = {**policy.statements, statement_id: statement}
        layer_version.policy = dataclasses.replace(policy, statements=merged_statements)

        statement_document = {
            "Sid": statement.sid,
            "Effect": "Allow",
            "Principal": statement.principal,
            "Action": statement.action,
            "Resource": layer_version.layer_version_arn,
        }
        return AddLayerVersionPermissionResponse(
            Statement=json.dumps(statement_document),
            RevisionId=layer_version.policy.revision_id,
        )
4207

4208
    def remove_layer_version_permission(
        self,
        context: RequestContext,
        layer_name: LayerName,
        version_number: LayerVersionNumber,
        statement_id: StatementId,
        revision_id: String = None,
        **kwargs,
    ) -> None:
        """
        Remove a statement from the policy of a layer version.

        A layer version that never received a policy is handled like a policy
        without the requested statement, instead of failing with an AttributeError.

        :param context: Request context
        :param layer_name: Layer name or ARN
        :param version_number: Version whose policy is changed
        :param statement_id: Identifier of the statement to remove
        :param revision_id: If given, has to match the policy's current revision ID
        :raises ResourceNotFoundException: if the layer, the version or the
            statement does not exist
        :raises PreconditionFailedException: if ``revision_id`` does not match
        """
        # `layer_name` can either be layer name or ARN. It is used to generate error messages.
        # `layer_n` contains the layer name.
        region_name, account_id, layer_n, _ = LambdaProvider._resolve_layer(layer_name, context)

        layer_version_arn = api_utils.layer_version_arn(
            layer_name, account_id, region_name, str(version_number)
        )

        state = lambda_stores[account_id][region_name]
        layer = state.layers.get(layer_n)
        if layer is None:
            raise ResourceNotFoundException(
                f"Layer version {layer_version_arn} does not exist.", Type="User"
            )
        layer_version = layer.layer_versions.get(str(version_number))
        if layer_version is None:
            raise ResourceNotFoundException(
                f"Layer version {layer_version_arn} does not exist.", Type="User"
            )

        # the policy is None until a permission is added to the version
        policy = layer_version.policy

        if policy is not None and revision_id and policy.revision_id != revision_id:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetLayerPolicy API to retrieve the latest Revision Id",
                Type="User",
            )

        if policy is None or statement_id not in policy.statements:
            raise ResourceNotFoundException(
                f"Statement {statement_id} is not found in resource policy.", Type="User"
            )

        # the policy object is replaced rather than having its statements mutated in place
        layer_version.policy = dataclasses.replace(
            policy,
            statements={k: v for k, v in policy.statements.items() if k != statement_id},
        )
4256

4257
    def get_layer_version_policy(
        self,
        context: RequestContext,
        layer_name: LayerName,
        version_number: LayerVersionNumber,
        **kwargs,
    ) -> GetLayerVersionPolicyResponse:
        """
        Return the policy of a layer version as a JSON document.

        :param context: Request context
        :param layer_name: Layer name or ARN
        :param version_number: Version whose policy is requested
        :return: The serialized policy and its revision ID
        :raises ResourceNotFoundException: if the layer or the version does not
            exist, or if the version has no policy
        """
        # `layer_name` can either be layer name or ARN. It is used to generate error messages.
        # `layer_n` contains the layer name.
        region_name, account_id, layer_n, _ = LambdaProvider._resolve_layer(layer_name, context)

        version_key = str(version_number)
        layer_version_arn = api_utils.layer_version_arn(
            layer_name, account_id, region_name, version_key
        )

        layer = lambda_stores[account_id][region_name].layers.get(layer_n)
        # an unknown layer and an unknown version are reported identically
        layer_version = None if layer is None else layer.layer_versions.get(version_key)
        if layer_version is None:
            raise ResourceNotFoundException(
                f"Layer version {layer_version_arn} does not exist.", Type="User"
            )

        policy = layer_version.policy
        if policy is None:
            raise ResourceNotFoundException(
                "No policy is associated with the given resource.", Type="User"
            )

        # every statement is rendered as an "Allow" on this layer version
        statement_documents = []
        for ps in policy.statements.values():
            statement_documents.append(
                {
                    "Sid": ps.sid,
                    "Effect": "Allow",
                    "Principal": ps.principal,
                    "Action": ps.action,
                    "Resource": layer_version.layer_version_arn,
                }
            )

        policy_document = {
            "Version": policy.version,
            "Id": policy.id,
            "Statement": statement_documents,
        }
        return GetLayerVersionPolicyResponse(
            Policy=json.dumps(policy_document),
            RevisionId=policy.revision_id,
        )
4310

4311
    # =======================================
4312
    # =======  Function Concurrency  ========
4313
    # =======================================
4314
    # (Reserved) function concurrency is scoped to the whole function
4315

4316
    def get_function_concurrency(
        self, context: RequestContext, function_name: FunctionName, **kwargs
    ) -> GetFunctionConcurrencyResponse:
        """
        Return the reserved concurrency stored for a function.

        :param context: Request context
        :param function_name: Function name or ARN
        :return: Response carrying the function's reserved concurrent executions
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        resolved_name = api_utils.get_function_name(function_name, context)
        function = self._get_function(
            function_name=resolved_name, region=region, account_id=account_id
        )
        reserved = function.reserved_concurrent_executions
        return GetFunctionConcurrencyResponse(ReservedConcurrentExecutions=reserved)
4325

4326
    def put_function_concurrency(
        self,
        context: RequestContext,
        function_name: FunctionName,
        reserved_concurrent_executions: ReservedConcurrentExecutions,
        **kwargs,
    ) -> Concurrency:
        """
        Set the reserved concurrency of a function.

        :param context: Request context
        :param function_name: Function name or unqualified function ARN
        :param reserved_concurrent_executions: Concurrency to reserve for the function
        :return: The reserved concurrency now stored for the function
        :raises InvalidParameterValueException: if a qualifier is given, if the
            account's unreserved concurrency would fall below its configured minimum,
            or if the value is below the function's total provisioned concurrency
        :raises ResourceNotFoundException: if the function does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)

        function_name, qualifier = api_utils.get_name_and_qualifier(function_name, None, context)
        if qualifier:
            raise InvalidParameterValueException(
                "This operation is permitted on Lambda functions only. Aliases and versions do not support this operation. Please specify either a function name or an unqualified function ARN.",
                Type="User",
            )

        function = lambda_stores[account_id][region].functions.get(function_name)
        if not function:
            fn_arn = api_utils.qualified_lambda_arn(
                function_name,
                qualifier="$LATEST",
                account=account_id,
                region=region,
            )
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")

        account_limit = self.get_account_settings(context)["AccountLimit"]
        unreserved = account_limit["UnreservedConcurrentExecutions"]

        # The function's current reservation is already deducted from the unreserved
        # pool, but the new value replaces it, so it is added back before checking.
        # Joel tested this behavior manually against AWS (2023-11-28).
        currently_reserved = function.reserved_concurrent_executions or 0
        unreserved_after_update = unreserved - reserved_concurrent_executions + currently_reserved
        if unreserved_after_update < config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY:
            raise InvalidParameterValueException(
                f"Specified ReservedConcurrentExecutions for function decreases account's UnreservedConcurrentExecution below its minimum value of [{config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY}]."
            )

        total_provisioned_concurrency = sum(
            provisioned.provisioned_concurrent_executions
            for provisioned in function.provisioned_concurrency_configs.values()
        )
        if total_provisioned_concurrency > reserved_concurrent_executions:
            raise InvalidParameterValueException(
                f" ReservedConcurrentExecutions  {reserved_concurrent_executions} should not be lower than function's total provisioned concurrency [{total_provisioned_concurrency}]."
            )

        function.reserved_concurrent_executions = reserved_concurrent_executions

        return Concurrency(ReservedConcurrentExecutions=function.reserved_concurrent_executions)
1✔
4387

4388
    def delete_function_concurrency(
        self, context: RequestContext, function_name: FunctionName, **kwargs
    ) -> None:
        """
        Clear the reserved concurrency of a function.

        :param context: Request context
        :param function_name: Function name or ARN
        :raises ResourceNotFoundException: if the function does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(function_name, None, context)
        store = lambda_stores[account_id][region]
        fn = store.functions.get(function_name)
        if not fn:
            # report an unknown function the same way put_function_concurrency does
            # instead of failing with an AttributeError on None
            fn_arn = api_utils.qualified_lambda_arn(
                function_name,
                qualifier="$LATEST",
                account=account_id,
                region=region,
            )
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")
        fn.reserved_concurrent_executions = None
1✔
4396

4397
    # =======================================
4398
    # ===============  TAGS   ===============
4399
    # =======================================
4400
    # only Function, Event Source Mapping, Code Signing Config (not currently supported by LocalStack), and Capacity Provider ARNs
4401
    # are available for tagging in AWS
4402

4403
    def _update_resource_tags(
        self, resource_arn: str, account_id: str, region: str, tags: dict[str, str]
    ) -> None:
        """Stores ``tags`` for ``resource_arn`` in the tag store of the given account and region."""
        tag_store = lambda_stores[account_id][region].TAGS
        # The tag store's `tag_resource` is fed a list of {"Key": ..., "Value": ...} entries.
        tag_entries = []
        for tag_key, tag_value in tags.items():
            tag_entries.append({"Key": tag_key, "Value": tag_value})
        tag_store.tag_resource(resource_arn, tag_entries)
1✔
4409

4410
    def _list_resource_tags(
        self, resource_arn: str, account_id: str, region: str
    ) -> dict[str, str]:
        """Returns the tags stored for ``resource_arn``, or an empty dict if there are none."""
        stored_tags = lambda_stores[account_id][region].TAGS.tags
        return stored_tags.get(resource_arn, {})
1✔
4415

4416
    def _remove_resource_tags(
        self, resource_arn: str, account_id: str, region: str, keys: TagKeyList
    ) -> None:
        """Removes the tags with the given ``keys`` from ``resource_arn``."""
        lambda_stores[account_id][region].TAGS.untag_resource(resource_arn, keys)
1✔
4421

4422
    def _remove_all_resource_tags(self, resource_arn: str, account_id: str, region: str) -> None:
        """Drops every tag stored for ``resource_arn``; a resource without tags is a no-op."""
        stored_tags = lambda_stores[account_id][region].TAGS.tags
        # The popped entry (or None) is passed through unchanged.
        return stored_tags.pop(resource_arn, None)
1✔
4425

4426
    def _get_tags(self, resource: TaggableResource) -> dict[str, str]:
        """Validates the taggable resource ARN and returns the tags stored for it."""
        account_id, region = self._get_account_id_and_region_for_taggable_resource(resource)
        return self._list_resource_tags(
            resource_arn=resource, account_id=account_id, region=region
        )
1✔
4430

4431
    def _store_tags(self, resource: TaggableResource, tags: dict[str, str]) -> None:
        """
        Validates the taggable resource ARN and adds ``tags`` to the tags already stored for it.

        Raises:
            InvalidParameterValueException: If the merged set of existing and new tags exceeds
                LAMBDA_TAG_LIMIT_PER_RESOURCE.
        """
        account_id, region = self._get_account_id_and_region_for_taggable_resource(resource)
        location = {"resource_arn": resource, "account_id": account_id, "region": region}

        current_tags = self._list_resource_tags(**location)
        # note: we cannot use | on `ImmutableDict` and regular `dict`
        merged_tags = {**current_tags, **tags}
        if len(merged_tags) > LAMBDA_TAG_LIMIT_PER_RESOURCE:
            raise InvalidParameterValueException(
                "Number of tags exceeds resource tag limit.", Type="User"
            )

        self._update_resource_tags(**location, tags=tags)
4447

4448
    def _remove_tags(self, resource: TaggableResource, keys: TagKeyList) -> None:
        """Validates the taggable resource ARN and removes the tags named in ``keys`` from it."""
        owner_account, owner_region = self._get_account_id_and_region_for_taggable_resource(
            resource
        )
        self._remove_resource_tags(
            resource_arn=resource,
            account_id=owner_account,
            region=owner_region,
            keys=keys,
        )
4453

4454
    def _remove_all_tags(self, resource: TaggableResource) -> None:
        """Validates the taggable resource ARN and removes every tag stored for it."""
        owner_account, owner_region = self._get_account_id_and_region_for_taggable_resource(
            resource
        )
        self._remove_all_resource_tags(
            resource_arn=resource, account_id=owner_account, region=owner_region
        )
1✔
4457

4458
    def _get_account_id_and_region_for_taggable_resource(
        self, resource: TaggableResource
    ) -> tuple[str, str]:
        """
        Takes a resource ARN for a TaggableResource (Lambda Function, Event Source Mapping, Code Signing Config, or Capacity Provider)
        and returns the account ID and region encoded in that ARN.

        In addition, this function validates that the ARN is a valid TaggableResource type, and that the TaggableResource exists.

        Returns:
            A tuple of ``(account_id, region)``.

        Raises:
            ValidationException: If the resource ARN is not a full ARN for a TaggableResource.
            ResourceNotFoundException: If the specified resource does not exist.
            InvalidParameterValueException: If the resource ARN is a qualified Lambda Function.
            NotImplementedError: If the resource ARN refers to a Code Signing Config.
        """

        def _raise_validation_exception():
            raise ValidationException(
                f"1 validation error detected: Value '{resource}' at 'resource' failed to satisfy constraint: Member must satisfy regular expression pattern: {api_utils.TAGGABLE_RESOURCE_ARN_PATTERN}"
            )

        # Check whether the ARN we have been passed is correctly formatted
        parsed_resource_arn: ArnData = None
        try:
            parsed_resource_arn = parse_arn(resource)
        except Exception:
            _raise_validation_exception()

        # TODO: Should we be checking whether this is a full ARN?
        region, account_id, resource_path = map(
            parsed_resource_arn.get, ("region", "account", "resource")
        )

        if not all((region, account_id, resource_path)):
            _raise_validation_exception()

        # The resource part has the shape "<type>:<identifier>[:<qualifier>...]".
        # `str.split` never returns an empty list, so check the length explicitly: a bare
        # "<type>" without an identifier must be rejected as a validation error rather than
        # failing the unpacking below with a ValueError.
        parts = resource_path.split(":")
        if len(parts) < 2:
            _raise_validation_exception()

        resource_type, resource_identifier, *qualifier = parts

        # Qualifier validation raises before checking for NotFound
        if qualifier:
            if resource_type == "function":
                raise InvalidParameterValueException(
                    "Tags on function aliases and versions are not supported. Please specify a function ARN.",
                    Type="User",
                )
            _raise_validation_exception()

        # The lookups below are called only for their side effect of raising if the
        # referenced resource does not exist; their return values are not needed.
        if resource_type == "event-source-mapping":
            self._get_esm(resource_identifier, account_id, region)
        elif resource_type == "code-signing-config":
            raise NotImplementedError("Resource tagging on CSC not yet implemented.")
        elif resource_type == "function":
            self._get_function(
                function_name=resource_identifier, account_id=account_id, region=region
            )
        elif resource_type == "capacity-provider":
            self._get_capacity_provider(resource_identifier, account_id, region)
        else:
            _raise_validation_exception()

        # If no exceptions are raised, assume ARN and referenced resource is valid for tag operations
        return account_id, region
1✔
4522

4523
    def tag_resource(
        self, context: RequestContext, resource: TaggableResource, tags: Tags, **kwargs
    ) -> None:
        """
        Adds ``tags`` to a taggable resource; for functions, the ``$LATEST`` version is re-created afterwards.

        Raises:
            InvalidParameterValueException: If ``tags`` is empty.
        """
        if not tags:
            raise InvalidParameterValueException(
                "An error occurred and the request cannot be processed.", Type="User"
            )
        self._store_tags(resource, tags)

        resource_id = extract_resource_from_arn(resource)
        if not resource_id or not resource_id.startswith("function"):
            # Only function resources get their $LATEST version re-created.
            return

        name, _, account, region = function_locators_from_arn(resource)
        function = self._get_function(name, account, region)
        with function.lock:
            # dirty hack for changed revision id, should reevaluate model to prevent this:
            current_latest = function.versions["$LATEST"]
            config_copy = dataclasses.replace(current_latest.config)
            function.versions["$LATEST"] = dataclasses.replace(current_latest, config=config_copy)
4543

4544
    def list_tags(
        self, context: RequestContext, resource: TaggableResource, **kwargs
    ) -> ListTagsResponse:
        """Returns the tags stored for the given taggable resource ARN."""
        return ListTagsResponse(Tags=self._get_tags(resource))
1✔
4549

4550
    def untag_resource(
        self, context: RequestContext, resource: TaggableResource, tag_keys: TagKeyList, **kwargs
    ) -> None:
        """
        Removes the tags named in ``tag_keys`` from a taggable resource; for functions, the ``$LATEST`` version is
        re-created afterwards.

        Raises:
            ValidationException: If ``tag_keys`` is empty.
        """
        if not tag_keys:
            raise ValidationException(
                "1 validation error detected: Value null at 'tagKeys' failed to satisfy constraint: Member must not be null"
            )  # should probably be generalized a bit

        self._remove_tags(resource, tag_keys)

        resource_id = extract_resource_from_arn(resource)
        if not resource_id or not resource_id.startswith("function"):
            # Only function resources get their $LATEST version re-created.
            return

        name, _, account, region = function_locators_from_arn(resource)
        function = self._get_function(name, account, region)
        # TODO: Potential race condition
        with function.lock:
            # dirty hack for changed revision id, should reevaluate model to prevent this:
            current_latest = function.versions["$LATEST"]
            config_copy = dataclasses.replace(current_latest.config)
            function.versions["$LATEST"] = dataclasses.replace(current_latest, config=config_copy)
4572

4573
    # =======================================
4574
    # =======  LEGACY / DEPRECATED   ========
4575
    # =======================================
4576

4577
    def invoke_async(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        invoke_args: IO[BlobStream],
        **kwargs,
    ) -> InvokeAsyncResponse:
        """
        LEGACY API endpoint whose usage is heavily discouraged, even by AWS.

        Raises:
            NotImplementedError: Always; this operation is not implemented.
        """
        raise NotImplementedError
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc