• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 22700205643

04 Mar 2026 04:24PM UTC coverage: 86.938% (-0.01%) from 86.951%
22700205643

push

github

web-flow
Lambda: fix attribute exceptions (#13863)

3 of 4 new or added lines in 1 file covered. (75.0%)

78 existing lines in 5 files now uncovered.

69850 of 80345 relevant lines covered (86.94%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.58
/localstack-core/localstack/services/lambda_/provider.py
1
import base64
1✔
2
import dataclasses
1✔
3
import datetime
1✔
4
import itertools
1✔
5
import json
1✔
6
import logging
1✔
7
import re
1✔
8
import threading
1✔
9
import time
1✔
10
from typing import IO, Any
1✔
11

12
from botocore.exceptions import ClientError
1✔
13

14
from localstack import config
1✔
15
from localstack.aws.api import RequestContext, ServiceException, handler
1✔
16
from localstack.aws.api.lambda_ import (
1✔
17
    AccountLimit,
18
    AccountUsage,
19
    AddLayerVersionPermissionResponse,
20
    AddPermissionRequest,
21
    AddPermissionResponse,
22
    Alias,
23
    AliasConfiguration,
24
    AliasRoutingConfiguration,
25
    AllowedPublishers,
26
    Architecture,
27
    Arn,
28
    Blob,
29
    BlobStream,
30
    CapacityProviderConfig,
31
    CodeSigningConfigArn,
32
    CodeSigningConfigNotFoundException,
33
    CodeSigningPolicies,
34
    CompatibleArchitectures,
35
    CompatibleRuntimes,
36
    Concurrency,
37
    Cors,
38
    CreateCodeSigningConfigResponse,
39
    CreateEventSourceMappingRequest,
40
    CreateFunctionRequest,
41
    CreateFunctionUrlConfigResponse,
42
    DeleteCodeSigningConfigResponse,
43
    DeleteFunctionResponse,
44
    Description,
45
    DestinationConfig,
46
    DurableExecutionName,
47
    EventSourceMappingConfiguration,
48
    FunctionCodeLocation,
49
    FunctionConfiguration,
50
    FunctionEventInvokeConfig,
51
    FunctionName,
52
    FunctionUrlAuthType,
53
    FunctionUrlQualifier,
54
    FunctionVersionLatestPublished,
55
    GetAccountSettingsResponse,
56
    GetCodeSigningConfigResponse,
57
    GetFunctionCodeSigningConfigResponse,
58
    GetFunctionConcurrencyResponse,
59
    GetFunctionRecursionConfigResponse,
60
    GetFunctionResponse,
61
    GetFunctionUrlConfigResponse,
62
    GetLayerVersionPolicyResponse,
63
    GetLayerVersionResponse,
64
    GetPolicyResponse,
65
    GetProvisionedConcurrencyConfigResponse,
66
    InvalidParameterValueException,
67
    InvocationResponse,
68
    InvocationType,
69
    InvokeAsyncResponse,
70
    InvokeMode,
71
    LambdaApi,
72
    LambdaManagedInstancesCapacityProviderConfig,
73
    LastUpdateStatus,
74
    LayerName,
75
    LayerPermissionAllowedAction,
76
    LayerPermissionAllowedPrincipal,
77
    LayersListItem,
78
    LayerVersionArn,
79
    LayerVersionContentInput,
80
    LayerVersionNumber,
81
    LicenseInfo,
82
    ListAliasesResponse,
83
    ListCodeSigningConfigsResponse,
84
    ListEventSourceMappingsResponse,
85
    ListFunctionEventInvokeConfigsResponse,
86
    ListFunctionsByCodeSigningConfigResponse,
87
    ListFunctionsResponse,
88
    ListFunctionUrlConfigsResponse,
89
    ListLayersResponse,
90
    ListLayerVersionsResponse,
91
    ListProvisionedConcurrencyConfigsResponse,
92
    ListTagsResponse,
93
    ListVersionsByFunctionResponse,
94
    LogFormat,
95
    LoggingConfig,
96
    LogType,
97
    MasterRegion,
98
    MaxFunctionEventInvokeConfigListItems,
99
    MaximumEventAgeInSeconds,
100
    MaximumRetryAttempts,
101
    MaxItems,
102
    MaxLayerListItems,
103
    MaxListItems,
104
    MaxProvisionedConcurrencyConfigListItems,
105
    NamespacedFunctionName,
106
    NamespacedStatementId,
107
    NumericLatestPublishedOrAliasQualifier,
108
    OnFailure,
109
    OnSuccess,
110
    OrganizationId,
111
    PackageType,
112
    PositiveInteger,
113
    PreconditionFailedException,
114
    ProvisionedConcurrencyConfigListItem,
115
    ProvisionedConcurrencyConfigNotFoundException,
116
    ProvisionedConcurrencyStatusEnum,
117
    PublishLayerVersionResponse,
118
    PutFunctionCodeSigningConfigResponse,
119
    PutFunctionRecursionConfigResponse,
120
    PutProvisionedConcurrencyConfigResponse,
121
    Qualifier,
122
    RecursiveLoop,
123
    ReservedConcurrentExecutions,
124
    ResourceConflictException,
125
    ResourceNotFoundException,
126
    Runtime,
127
    RuntimeVersionConfig,
128
    SnapStart,
129
    SnapStartApplyOn,
130
    SnapStartOptimizationStatus,
131
    SnapStartResponse,
132
    State,
133
    StatementId,
134
    StateReasonCode,
135
    String,
136
    TaggableResource,
137
    TagKeyList,
138
    Tags,
139
    TenantId,
140
    TracingMode,
141
    UnqualifiedFunctionName,
142
    UpdateCodeSigningConfigResponse,
143
    UpdateEventSourceMappingRequest,
144
    UpdateFunctionCodeRequest,
145
    UpdateFunctionConfigurationRequest,
146
    UpdateFunctionUrlConfigResponse,
147
    VersionWithLatestPublished,
148
)
149
from localstack.aws.api.lambda_ import FunctionVersion as FunctionVersionApi
1✔
150
from localstack.aws.api.lambda_ import ServiceException as LambdaServiceException
1✔
151
from localstack.aws.api.pipes import (
1✔
152
    DynamoDBStreamStartPosition,
153
    KinesisStreamStartPosition,
154
)
155
from localstack.aws.connect import connect_to
1✔
156
from localstack.aws.spec import load_service
1✔
157
from localstack.services.edge import ROUTER
1✔
158
from localstack.services.lambda_ import api_utils
1✔
159
from localstack.services.lambda_ import hooks as lambda_hooks
1✔
160
from localstack.services.lambda_.analytics import (
1✔
161
    FunctionInitializationType,
162
    FunctionOperation,
163
    FunctionStatus,
164
    function_counter,
165
)
166
from localstack.services.lambda_.api_utils import (
1✔
167
    ARCHITECTURES,
168
    STATEMENT_ID_REGEX,
169
    SUBNET_ID_REGEX,
170
    function_locators_from_arn,
171
)
172
from localstack.services.lambda_.event_source_mapping.esm_config_factory import (
1✔
173
    EsmConfigFactory,
174
)
175
from localstack.services.lambda_.event_source_mapping.esm_worker import (
1✔
176
    EsmState,
177
    EsmWorker,
178
)
179
from localstack.services.lambda_.event_source_mapping.esm_worker_factory import (
1✔
180
    EsmWorkerFactory,
181
)
182
from localstack.services.lambda_.event_source_mapping.pipe_utils import get_internal_client
1✔
183
from localstack.services.lambda_.invocation import AccessDeniedException
1✔
184
from localstack.services.lambda_.invocation.execution_environment import (
1✔
185
    EnvironmentStartupTimeoutException,
186
)
187
from localstack.services.lambda_.invocation.lambda_models import (
1✔
188
    AliasRoutingConfig,
189
    CodeSigningConfig,
190
    DesiredCapacityProviderState,
191
    EventInvokeConfig,
192
    Function,
193
    FunctionResourcePolicy,
194
    FunctionUrlConfig,
195
    FunctionVersion,
196
    ImageConfig,
197
    LambdaEphemeralStorage,
198
    Layer,
199
    LayerPolicy,
200
    LayerPolicyStatement,
201
    LayerVersion,
202
    ProvisionedConcurrencyConfiguration,
203
    RequestEntityTooLargeException,
204
    ResourcePolicy,
205
    UpdateStatus,
206
    ValidationException,
207
    VersionAlias,
208
    VersionFunctionConfiguration,
209
    VersionIdentifier,
210
    VersionState,
211
    VpcConfig,
212
)
213
from localstack.services.lambda_.invocation.lambda_service import (
1✔
214
    LambdaService,
215
    create_image_code,
216
    destroy_code_if_not_used,
217
    lambda_stores,
218
    store_lambda_archive,
219
    store_s3_bucket_archive,
220
)
221
from localstack.services.lambda_.invocation.models import CapacityProvider as CapacityProviderModel
1✔
222
from localstack.services.lambda_.invocation.models import LambdaStore
1✔
223
from localstack.services.lambda_.invocation.runtime_executor import get_runtime_executor
1✔
224
from localstack.services.lambda_.lambda_utils import HINT_LOG
1✔
225
from localstack.services.lambda_.layerfetcher.layer_fetcher import LayerFetcher
1✔
226
from localstack.services.lambda_.provider_utils import (
1✔
227
    LambdaLayerVersionIdentifier,
228
    get_function_version,
229
    get_function_version_from_arn,
230
)
231
from localstack.services.lambda_.runtimes import (
1✔
232
    ALL_RUNTIMES,
233
    DEPRECATED_RUNTIMES,
234
    DEPRECATED_RUNTIMES_UPGRADES,
235
    RUNTIMES_AGGREGATED,
236
    SNAP_START_SUPPORTED_RUNTIMES,
237
    VALID_MANAGED_INSTANCE_RUNTIMES,
238
    VALID_RUNTIMES,
239
)
240
from localstack.services.lambda_.urlrouter import FunctionUrlRouter
1✔
241
from localstack.services.plugins import ServiceLifecycleHook
1✔
242
from localstack.state import StateVisitor
1✔
243
from localstack.utils.aws.arns import (
1✔
244
    ArnData,
245
    capacity_provider_arn,
246
    extract_resource_from_arn,
247
    extract_service_from_arn,
248
    get_partition,
249
    lambda_event_source_mapping_arn,
250
    parse_arn,
251
)
252
from localstack.utils.aws.client_types import ServicePrincipal
1✔
253
from localstack.utils.bootstrap import is_api_enabled
1✔
254
from localstack.utils.collections import PaginatedList, merge_recursive
1✔
255
from localstack.utils.event_matcher import validate_event_pattern
1✔
256
from localstack.utils.strings import get_random_hex, short_uid, to_bytes, to_str
1✔
257
from localstack.utils.sync import poll_condition
1✔
258
from localstack.utils.urls import localstack_host
1✔
259

260
LOG = logging.getLogger(__name__)
1✔
261

262
CAPACITY_PROVIDER_ARN_NAME = "arn:aws[a-zA-Z-]*:lambda:(eusc-)?[a-z]{2}((-gov)|(-iso([a-z]?)))?-[a-z]+-\\d{1}:\\d{12}:capacity-provider:[a-zA-Z0-9-_]+"
1✔
263
LAMBDA_DEFAULT_TIMEOUT = 3
1✔
264
LAMBDA_DEFAULT_MEMORY_SIZE = 128
1✔
265

266
LAMBDA_TAG_LIMIT_PER_RESOURCE = 50
1✔
267
LAMBDA_LAYERS_LIMIT_PER_FUNCTION = 5
1✔
268

269
TAG_KEY_CUSTOM_URL = "_custom_id_"
1✔
270
# Requirements (from RFC3986 & co): not longer than 63, first char must be
271
# alpha, then alphanumeric or hyphen, except cannot start or end with hyphen
272
TAG_KEY_CUSTOM_URL_VALIDATOR = re.compile(r"^[A-Za-z]([A-Za-z0-9\-]{0,61}[A-Za-z0-9])?$")
1✔
273

274

275
class LambdaProvider(LambdaApi, ServiceLifecycleHook):
1✔
276
    lambda_service: LambdaService
1✔
277
    create_fn_lock: threading.RLock
1✔
278
    create_layer_lock: threading.RLock
1✔
279
    router: FunctionUrlRouter
1✔
280
    esm_workers: dict[str, EsmWorker]
1✔
281
    layer_fetcher: LayerFetcher | None
1✔
282

283
    def __init__(self) -> None:
1✔
284
        self.lambda_service = LambdaService()
1✔
285
        self.create_fn_lock = threading.RLock()
1✔
286
        self.create_layer_lock = threading.RLock()
1✔
287
        self.router = FunctionUrlRouter(ROUTER, self.lambda_service)
1✔
288
        self.esm_workers = {}
1✔
289
        self.layer_fetcher = None
1✔
290
        lambda_hooks.inject_layer_fetcher.run(self)
1✔
291

292
    def accept_state_visitor(self, visitor: StateVisitor):
1✔
293
        visitor.visit(lambda_stores)
×
294

295
    def on_before_state_reset(self):
1✔
296
        for esm_worker in self.esm_workers.values():
×
297
            esm_worker.stop_for_shutdown()
×
298
        self.esm_workers = {}
×
299
        self.lambda_service.stop()
×
300

301
    def on_after_state_reset(self):
1✔
302
        self.router.lambda_service = self.lambda_service = LambdaService()
×
303

304
    def on_before_state_load(self):
1✔
305
        self.lambda_service.stop()
×
306

307
    def on_after_state_load(self):
1✔
308
        self.lambda_service = LambdaService()
×
309
        self.router.lambda_service = self.lambda_service
×
310

311
        for account_id, account_bundle in lambda_stores.items():
×
312
            for region_name, state in account_bundle.items():
×
313
                for fn in state.functions.values():
×
314
                    # HACK to model a volatile variable that should be ignored for persistence
315
                    # Identifier unique to this function and LocalStack instance.
316
                    # A LocalStack restart or persistence load should create a new instance id.
317
                    # Used for retaining invoke queues across version updates for $LATEST, but
318
                    # separate unrelated instances.
319
                    fn.instance_id = short_uid()
×
320

321
                    for fn_version in fn.versions.values():
×
322
                        try:
×
323
                            # Skip function versions that were being deleted
324
                            if fn_version.config.state.state == State.Deleting:
×
325
                                continue
×
326

327
                            # Skip function versions whose capacity provider has been stopped
328
                            if fn_version.config.capacity_provider_config:
×
329
                                cp_arn = fn_version.config.capacity_provider_config[
×
330
                                    "LambdaManagedInstancesCapacityProviderConfig"
331
                                ]["CapacityProviderArn"]
332
                                cp_name = cp_arn.split(":")[-1]
×
333
                                cp = state.capacity_providers.get(cp_name)
×
334
                                if cp and cp.DesiredState == DesiredCapacityProviderState.Stopped:
×
335
                                    continue
×
336

337
                            # $LATEST is not invokable for Lambda functions with a capacity provider
338
                            # and has a different State (i.e., ActiveNonInvokable)
339
                            is_capacity_provider_latest = (
×
340
                                fn_version.config.capacity_provider_config
341
                                and fn_version.id.qualifier == "$LATEST"
342
                            )
343
                            if not is_capacity_provider_latest:
×
344
                                # Restore the "Pending" state for the function version and start it
345
                                new_state = VersionState(
×
346
                                    state=State.Pending,
347
                                    code=StateReasonCode.Creating,
348
                                    reason="The function is being created.",
349
                                )
350
                                new_config = dataclasses.replace(fn_version.config, state=new_state)
×
351
                                new_version = dataclasses.replace(fn_version, config=new_config)
×
352
                                fn.versions[fn_version.id.qualifier] = new_version
×
353
                                self.lambda_service.create_function_version(fn_version).result(
×
354
                                    timeout=5
355
                                )
356
                        except Exception:
×
357
                            LOG.warning(
×
358
                                "Failed to restore function version %s",
359
                                fn_version.id.qualified_arn(),
360
                                exc_info=LOG.isEnabledFor(logging.DEBUG),
361
                            )
362
                    # restore provisioned concurrency per function considering both versions and aliases
363
                    for (
×
364
                        provisioned_qualifier,
365
                        provisioned_config,
366
                    ) in fn.provisioned_concurrency_configs.items():
367
                        fn_arn = None
×
368
                        try:
×
369
                            if api_utils.qualifier_is_alias(provisioned_qualifier):
×
370
                                alias = fn.aliases.get(provisioned_qualifier)
×
371
                                resolved_version = fn.versions.get(alias.function_version)
×
372
                                fn_arn = resolved_version.id.qualified_arn()
×
373
                            elif api_utils.qualifier_is_version(provisioned_qualifier):
×
374
                                fn_version = fn.versions.get(provisioned_qualifier)
×
375
                                fn_arn = fn_version.id.qualified_arn()
×
376
                            else:
377
                                raise InvalidParameterValueException(
×
378
                                    "Invalid qualifier type:"
379
                                    " Qualifier can only be an alias or a version for provisioned concurrency."
380
                                )
381

382
                            manager = self.lambda_service.get_lambda_version_manager(fn_arn)
×
383
                            manager.update_provisioned_concurrency_config(
×
384
                                provisioned_config.provisioned_concurrent_executions
385
                            )
386
                        except Exception:
×
387
                            LOG.warning(
×
388
                                "Failed to restore provisioned concurrency %s for function %s",
389
                                provisioned_config,
390
                                fn_arn,
391
                                exc_info=LOG.isEnabledFor(logging.DEBUG),
392
                            )
393

394
                for esm in state.event_source_mappings.values():
×
395
                    # Restores event source workers
396
                    function_arn = esm.get("FunctionArn")
×
397

398
                    # TODO: How do we know the event source is up?
399
                    # A basic poll to see if the mapped Lambda function is active/failed
400
                    if not poll_condition(
×
401
                        lambda: (
402
                            get_function_version_from_arn(function_arn).config.state.state
403
                            in [State.Active, State.Failed]
404
                        ),
405
                        timeout=10,
406
                    ):
407
                        LOG.warning(
×
408
                            "Creating ESM for Lambda that is not in running state: %s",
409
                            function_arn,
410
                        )
411

412
                    function_version = get_function_version_from_arn(function_arn)
×
413
                    function_role = function_version.config.role
×
414

415
                    is_esm_enabled = esm.get("State", EsmState.DISABLED) not in (
×
416
                        EsmState.DISABLED,
417
                        EsmState.DISABLING,
418
                    )
419
                    esm_worker = EsmWorkerFactory(
×
420
                        esm, function_role, is_esm_enabled
421
                    ).get_esm_worker()
422

423
                    # Note: a worker is created in the DISABLED state if not enabled
424
                    esm_worker.create()
×
425
                    # TODO: assigning the esm_worker to the dict only works after .create(). Could it cause a race
426
                    #  condition if we get a shutdown here and have a worker thread spawned but not accounted for?
427
                    self.esm_workers[esm_worker.uuid] = esm_worker
×
428

429
    def on_after_init(self):
1✔
430
        self.router.register_routes()
1✔
431
        get_runtime_executor().validate_environment()
1✔
432

433
    def on_before_stop(self) -> None:
1✔
434
        for esm_worker in self.esm_workers.values():
1✔
435
            esm_worker.stop_for_shutdown()
1✔
436

437
        # TODO: should probably unregister routes?
438
        self.lambda_service.stop()
1✔
439

440
    @staticmethod
1✔
441
    def _get_function(function_name: str, account_id: str, region: str) -> Function:
1✔
442
        state = lambda_stores[account_id][region]
1✔
443
        function = state.functions.get(function_name)
1✔
444
        if not function:
1✔
445
            arn = api_utils.unqualified_lambda_arn(
1✔
446
                function_name=function_name,
447
                account=account_id,
448
                region=region,
449
            )
450
            raise ResourceNotFoundException(
1✔
451
                f"Function not found: {arn}",
452
                Type="User",
453
            )
454
        return function
1✔
455

456
    @staticmethod
1✔
457
    def _get_esm(uuid: str, account_id: str, region: str) -> EventSourceMappingConfiguration:
1✔
458
        state = lambda_stores[account_id][region]
1✔
459
        esm = state.event_source_mappings.get(uuid)
1✔
460
        if not esm:
1✔
461
            arn = lambda_event_source_mapping_arn(uuid, account_id, region)
1✔
462
            raise ResourceNotFoundException(
1✔
463
                f"Event source mapping not found: {arn}",
464
                Type="User",
465
            )
466
        return esm
1✔
467

468
    @staticmethod
1✔
469
    def _get_capacity_provider(
1✔
470
        capacity_provider_name: str,
471
        account_id: str,
472
        region: str,
473
        error_msg_template: str = "Capacity provider not found: {}",
474
    ) -> CapacityProviderModel:
475
        state = lambda_stores[account_id][region]
1✔
476
        cp = state.capacity_providers.get(capacity_provider_name)
1✔
477
        if not cp:
1✔
478
            arn = capacity_provider_arn(capacity_provider_name, account_id, region)
1✔
479
            raise ResourceNotFoundException(
1✔
480
                error_msg_template.format(arn),
481
                Type="User",
482
            )
483
        return cp
×
484

485
    @staticmethod
1✔
486
    def _validate_qualifier_expression(qualifier: str) -> None:
1✔
487
        if error_messages := api_utils.validate_qualifier(qualifier):
1✔
488
            raise ValidationException(
×
489
                message=api_utils.construct_validation_exception_message(error_messages)
490
            )
491

492
    @staticmethod
1✔
493
    def _validate_publish_to(publish_to: str):
1✔
494
        if publish_to != FunctionVersionLatestPublished.LATEST_PUBLISHED:
×
495
            raise ValidationException(
×
496
                message=f"1 validation error detected: Value '{publish_to}' at 'publishTo' failed to satisfy constraint: Member must satisfy enum value set: [LATEST_PUBLISHED]"
497
            )
498

499
    @staticmethod
1✔
500
    def _resolve_fn_qualifier(resolved_fn: Function, qualifier: str | None) -> tuple[str, str]:
1✔
501
        """Attempts to resolve a given qualifier and returns a qualifier that exists or
502
        raises an appropriate ResourceNotFoundException.
503

504
        :param resolved_fn: The resolved lambda function
505
        :param qualifier: The qualifier to be resolved or None
506
        :return: Tuple of (resolved qualifier, function arn either qualified or unqualified)"""
507
        function_name = resolved_fn.function_name
1✔
508
        # assuming function versions need to live in the same account and region
509
        account_id = resolved_fn.latest().id.account
1✔
510
        region = resolved_fn.latest().id.region
1✔
511
        fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
1✔
512
        if qualifier is not None:
1✔
513
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
1✔
514
            if api_utils.qualifier_is_alias(qualifier):
1✔
515
                if qualifier not in resolved_fn.aliases:
1✔
516
                    raise ResourceNotFoundException(f"Cannot find alias arn: {fn_arn}", Type="User")
1✔
517
            elif api_utils.qualifier_is_version(qualifier) or qualifier == "$LATEST":
1✔
518
                if qualifier not in resolved_fn.versions:
1✔
519
                    raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")
1✔
520
            else:
521
                # matches qualifier pattern but invalid alias or version
522
                raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")
1✔
523
        resolved_qualifier = qualifier or "$LATEST"
1✔
524
        return resolved_qualifier, fn_arn
1✔
525

526
    @staticmethod
1✔
527
    def _function_revision_id(resolved_fn: Function, resolved_qualifier: str) -> str:
1✔
528
        if api_utils.qualifier_is_alias(resolved_qualifier):
1✔
529
            return resolved_fn.aliases[resolved_qualifier].revision_id
1✔
530
        # Assumes that a non-alias is a version
531
        else:
532
            return resolved_fn.versions[resolved_qualifier].config.revision_id
1✔
533

534
    def _resolve_vpc_id(self, account_id: str, region_name: str, subnet_id: str) -> str:
1✔
535
        ec2_client = connect_to(aws_access_key_id=account_id, region_name=region_name).ec2
1✔
536
        try:
1✔
537
            return ec2_client.describe_subnets(SubnetIds=[subnet_id])["Subnets"][0]["VpcId"]
1✔
538
        except ec2_client.exceptions.ClientError as e:
1✔
539
            code = e.response["Error"]["Code"]
1✔
540
            message = e.response["Error"]["Message"]
1✔
541
            raise InvalidParameterValueException(
1✔
542
                f"Error occurred while DescribeSubnets. EC2 Error Code: {code}. EC2 Error Message: {message}",
543
                Type="User",
544
            )
545

546
    def _build_vpc_config(
1✔
547
        self,
548
        account_id: str,
549
        region_name: str,
550
        vpc_config: dict | None = None,
551
    ) -> VpcConfig | None:
552
        if not vpc_config or not is_api_enabled("ec2"):
1✔
553
            return None
1✔
554

555
        subnet_ids = vpc_config.get("SubnetIds", [])
1✔
556
        if subnet_ids is not None and len(subnet_ids) == 0:
1✔
557
            return VpcConfig(vpc_id="", security_group_ids=[], subnet_ids=[])
1✔
558

559
        subnet_id = subnet_ids[0]
1✔
560
        if not bool(SUBNET_ID_REGEX.match(subnet_id)):
1✔
561
            raise ValidationException(
1✔
562
                f"1 validation error detected: Value '[{subnet_id}]' at 'vpcConfig.subnetIds' failed to satisfy constraint: Member must satisfy constraint: [Member must have length less than or equal to 1024, Member must have length greater than or equal to 0, Member must satisfy regular expression pattern: subnet-[0-9a-z]*]"
563
            )
564

565
        return VpcConfig(
1✔
566
            vpc_id=self._resolve_vpc_id(account_id, region_name, subnet_id),
567
            security_group_ids=vpc_config.get("SecurityGroupIds", []),
568
            subnet_ids=subnet_ids,
569
        )
570

571
    def _create_version_model(
1✔
572
        self,
573
        function_name: str,
574
        region: str,
575
        account_id: str,
576
        description: str | None = None,
577
        revision_id: str | None = None,
578
        code_sha256: str | None = None,
579
        publish_to: FunctionVersionLatestPublished | None = None,
580
        is_active: bool = False,
581
    ) -> tuple[FunctionVersion, bool]:
582
        """
583
        Release a new version to the model if all restrictions are met.
584
        Restrictions:
585
          - CodeSha256, if provided, must equal the current latest version code hash
586
          - RevisionId, if provided, must equal the current latest version revision id
587
          - Some changes have been done to the latest version since last publish
588
        Will return a tuple of the version, and whether the version was published (True) or the latest available version was taken (False).
589
        This can happen if the latest version has not been changed since the last version publish, in this case the last version will be returned.
590

591
        :param function_name: Function name to be published
592
        :param region: Region of the function
593
        :param account_id: Account of the function
594
        :param description: new description of the version (will be the description of the function if missing)
595
        :param revision_id: Revision id, function will raise error if it does not match latest revision id
596
        :param code_sha256: Code sha256, function will raise error if it does not match latest code hash
597
        :return: Tuple of (published version, whether version was released or last released version returned, since nothing changed)
598
        """
599
        current_latest_version = get_function_version(
1✔
600
            function_name=function_name, qualifier="$LATEST", account_id=account_id, region=region
601
        )
602
        if revision_id and current_latest_version.config.revision_id != revision_id:
1✔
603
            raise PreconditionFailedException(
1✔
604
                "The Revision Id provided does not match the latest Revision Id. Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
605
                Type="User",
606
            )
607

608
        # check if code hashes match if they are specified
609
        current_hash = (
1✔
610
            current_latest_version.config.code.code_sha256
611
            if current_latest_version.config.package_type == PackageType.Zip
612
            else current_latest_version.config.image.code_sha256
613
        )
614
        # if the code is a zip package and hot reloaded (hot reloading is currently only supported for zip packagetypes)
615
        # we cannot enforce the codesha256 check
616
        is_hot_reloaded_zip_package = (
1✔
617
            current_latest_version.config.package_type == PackageType.Zip
618
            and current_latest_version.config.code.is_hot_reloading()
619
        )
620
        if code_sha256 and current_hash != code_sha256 and not is_hot_reloaded_zip_package:
1✔
621
            raise InvalidParameterValueException(
1✔
622
                f"CodeSHA256 ({code_sha256}) is different from current CodeSHA256 in $LATEST ({current_hash}). Please try again with the CodeSHA256 in $LATEST.",
623
                Type="User",
624
            )
625

626
        state = lambda_stores[account_id][region]
1✔
627
        function = state.functions.get(function_name)
1✔
628
        changes = {}
1✔
629
        if description is not None:
1✔
630
            changes["description"] = description
1✔
631
        # TODO copy environment instead of restarting one, get rid of all the "Pending"s
632

633
        with function.lock:
1✔
634
            if function.next_version > 1 and (
1✔
635
                prev_version := function.versions.get(str(function.next_version - 1))
636
            ):
637
                if (
1✔
638
                    prev_version.config.internal_revision
639
                    == current_latest_version.config.internal_revision
640
                ):
641
                    return prev_version, False
1✔
642
            # TODO check if there was a change since last version
643
            if publish_to == FunctionVersionLatestPublished.LATEST_PUBLISHED:
1✔
644
                qualifier = "$LATEST.PUBLISHED"
×
645
            else:
646
                qualifier = str(function.next_version)
1✔
647
                function.next_version += 1
1✔
648
            new_id = VersionIdentifier(
1✔
649
                function_name=function_name,
650
                qualifier=qualifier,
651
                region=region,
652
                account=account_id,
653
            )
654

655
            if current_latest_version.config.capacity_provider_config:
1✔
656
                # for lambda managed functions, snap start is not supported
657
                snap_start = None
×
658
            else:
659
                apply_on = current_latest_version.config.snap_start["ApplyOn"]
1✔
660
                optimization_status = SnapStartOptimizationStatus.Off
1✔
661
                if apply_on == SnapStartApplyOn.PublishedVersions:
1✔
662
                    optimization_status = SnapStartOptimizationStatus.On
×
663
                snap_start = SnapStartResponse(
1✔
664
                    ApplyOn=apply_on,
665
                    OptimizationStatus=optimization_status,
666
                )
667

668
            last_update = None
1✔
669
            new_state = VersionState(
1✔
670
                state=State.Pending,
671
                code=StateReasonCode.Creating,
672
                reason="The function is being created.",
673
            )
674
            if publish_to == FunctionVersionLatestPublished.LATEST_PUBLISHED:
1✔
675
                last_update = UpdateStatus(
×
676
                    status=LastUpdateStatus.InProgress,
677
                    code="Updating",
678
                    reason="The function is being updated.",
679
                )
680
                if is_active:
×
681
                    new_state = VersionState(state=State.Active)
×
682
            new_version = dataclasses.replace(
1✔
683
                current_latest_version,
684
                config=dataclasses.replace(
685
                    current_latest_version.config,
686
                    last_update=last_update,
687
                    state=new_state,
688
                    snap_start=snap_start,
689
                    **changes,
690
                ),
691
                id=new_id,
692
            )
693
            function.versions[qualifier] = new_version
1✔
694
        return new_version, True
1✔
695

696
    def _publish_version_from_existing_version(
1✔
697
        self,
698
        function_name: str,
699
        region: str,
700
        account_id: str,
701
        description: str | None = None,
702
        revision_id: str | None = None,
703
        code_sha256: str | None = None,
704
        publish_to: FunctionVersionLatestPublished | None = None,
705
    ) -> FunctionVersion:
706
        """
707
        Publish version from an existing, already initialized LATEST
708

709
        :param function_name: Function name
710
        :param region: region
711
        :param account_id: account id
712
        :param description: description
713
        :param revision_id: revision id (check if current version matches)
714
        :param code_sha256: code sha (check if current code matches)
715
        :return: new version
716
        """
717
        is_active = True if publish_to == FunctionVersionLatestPublished.LATEST_PUBLISHED else False
1✔
718
        new_version, changed = self._create_version_model(
1✔
719
            function_name=function_name,
720
            region=region,
721
            account_id=account_id,
722
            description=description,
723
            revision_id=revision_id,
724
            code_sha256=code_sha256,
725
            publish_to=publish_to,
726
            is_active=is_active,
727
        )
728
        if not changed:
1✔
729
            return new_version
1✔
730

731
        if new_version.config.capacity_provider_config:
1✔
732
            self.lambda_service.publish_version_async(new_version)
×
733
        else:
734
            self.lambda_service.publish_version(new_version)
1✔
735
        state = lambda_stores[account_id][region]
1✔
736
        function = state.functions.get(function_name)
1✔
737

738
        # Update revision id for $LATEST version
739
        # TODO: re-evaluate data model to prevent this dirty hack just for bumping the revision id
740
        latest_version = function.versions["$LATEST"]
1✔
741
        function.versions["$LATEST"] = dataclasses.replace(
1✔
742
            latest_version, config=dataclasses.replace(latest_version.config)
743
        )
744
        if new_version.config.capacity_provider_config:
1✔
745
            # publish_version happens async for functions with a capacity provider.
746
            # Therefore, we return the new_version with State=Pending or LastUpdateStatus=InProgress ($LATEST.PUBLISHED)
747
            return new_version
×
748
        else:
749
            # Regular functions yield an Active state modified during `publish_version` (sync).
750
            # Therefore, we need to get the updated version from the store.
751
            updated_version = function.versions.get(new_version.id.qualifier)
1✔
752
            return updated_version
1✔
753

754
    def _publish_version_with_changes(
1✔
755
        self,
756
        function_name: str,
757
        region: str,
758
        account_id: str,
759
        description: str | None = None,
760
        revision_id: str | None = None,
761
        code_sha256: str | None = None,
762
        publish_to: FunctionVersionLatestPublished | None = None,
763
        is_active: bool = False,
764
    ) -> FunctionVersion:
765
        """
766
        Publish version together with a new latest version (publish on create / update)
767

768
        :param function_name: Function name
769
        :param region: region
770
        :param account_id: account id
771
        :param description: description
772
        :param revision_id: revision id (check if current version matches)
773
        :param code_sha256: code sha (check if current code matches)
774
        :return: new version
775
        """
776
        new_version, changed = self._create_version_model(
1✔
777
            function_name=function_name,
778
            region=region,
779
            account_id=account_id,
780
            description=description,
781
            revision_id=revision_id,
782
            code_sha256=code_sha256,
783
            publish_to=publish_to,
784
            is_active=is_active,
785
        )
786
        if not changed:
1✔
787
            return new_version
×
788
        self.lambda_service.create_function_version(new_version)
1✔
789
        return new_version
1✔
790

791
    @staticmethod
1✔
792
    def _verify_env_variables(env_vars: dict[str, str]):
1✔
793
        dumped_env_vars = json.dumps(env_vars, separators=(",", ":"))
1✔
794
        if (
1✔
795
            len(dumped_env_vars.encode("utf-8"))
796
            > config.LAMBDA_LIMITS_MAX_FUNCTION_ENVVAR_SIZE_BYTES
797
        ):
798
            raise InvalidParameterValueException(
1✔
799
                f"Lambda was unable to configure your environment variables because the environment variables you have provided exceeded the 4KB limit. String measured: {dumped_env_vars}",
800
                Type="User",
801
            )
802

803
    @staticmethod
1✔
804
    def _validate_snapstart(snap_start: SnapStart, runtime: Runtime):
1✔
805
        apply_on = snap_start.get("ApplyOn")
1✔
806
        if apply_on not in [
1✔
807
            SnapStartApplyOn.PublishedVersions,
808
            SnapStartApplyOn.None_,
809
        ]:
810
            raise ValidationException(
1✔
811
                f"1 validation error detected: Value '{apply_on}' at 'snapStart.applyOn' failed to satisfy constraint: Member must satisfy enum value set: [PublishedVersions, None]"
812
            )
813

814
        if runtime not in SNAP_START_SUPPORTED_RUNTIMES:
1✔
815
            raise InvalidParameterValueException(
×
816
                f"{runtime} is not supported for SnapStart enabled functions.", Type="User"
817
            )
818

819
    def _validate_layers(self, new_layers: list[str], region: str, account_id: str):
1✔
820
        if len(new_layers) > LAMBDA_LAYERS_LIMIT_PER_FUNCTION:
1✔
821
            raise InvalidParameterValueException(
1✔
822
                "Cannot reference more than 5 layers.", Type="User"
823
            )
824

825
        visited_layers = {}
1✔
826
        for layer_version_arn in new_layers:
1✔
827
            (
1✔
828
                layer_region,
829
                layer_account_id,
830
                layer_name,
831
                layer_version_str,
832
            ) = api_utils.parse_layer_arn(layer_version_arn)
833
            if layer_version_str is None:
1✔
834
                raise ValidationException(
1✔
835
                    f"1 validation error detected: Value '[{layer_version_arn}]'"
836
                    + " at 'layers' failed to satisfy constraint: Member must satisfy constraint: [Member must have length less than or equal to 2048, Member must have length greater than or equal to 1, Member must satisfy regular expression pattern: "
837
                    + "(arn:(aws[a-zA-Z-]*)?:lambda:(eusc-)?[a-z]{2}((-gov)|(-iso([a-z]?)))?-[a-z]+-\\d{1}:\\d{12}:layer:[a-zA-Z0-9-_]+:[0-9]+)|(arn:[a-zA-Z0-9-]+:lambda:::awslayer:[a-zA-Z0-9-_]+), Member must not be null]",
838
                )
839

840
            state = lambda_stores[layer_account_id][layer_region]
1✔
841
            layer = state.layers.get(layer_name)
1✔
842
            layer_version = None
1✔
843
            if layer is not None:
1✔
844
                layer_version = layer.layer_versions.get(layer_version_str)
1✔
845
            if layer_account_id == account_id:
1✔
846
                if region and layer_region != region:
1✔
847
                    raise InvalidParameterValueException(
1✔
848
                        f"Layers are not in the same region as the function. "
849
                        f"Layers are expected to be in region {region}.",
850
                        Type="User",
851
                    )
852
                if layer is None or layer.layer_versions.get(layer_version_str) is None:
1✔
853
                    raise InvalidParameterValueException(
1✔
854
                        f"Layer version {layer_version_arn} does not exist.", Type="User"
855
                    )
856
            else:  # External layer from other account
857
                # TODO: validate IAM layer policy here, allowing access by default for now and only checking region
858
                if region and layer_region != region:
×
859
                    # TODO: detect user or role from context when IAM users are implemented
860
                    user = "user/localstack-testing"
×
861
                    raise AccessDeniedException(
×
862
                        f"User: arn:{get_partition(region)}:iam::{account_id}:{user} is not authorized to perform: lambda:GetLayerVersion on resource: {layer_version_arn} because no resource-based policy allows the lambda:GetLayerVersion action"
863
                    )
864
                if layer is None or layer_version is None:
×
865
                    # Limitation: cannot fetch external layers when using the same account id as the target layer
866
                    # because we do not want to trigger the layer fetcher for every non-existing layer.
867
                    if self.layer_fetcher is None:
×
868
                        raise NotImplementedError(
869
                            "Fetching shared layers from AWS is a pro feature."
870
                        )
871

872
                    layer = self.layer_fetcher.fetch_layer(layer_version_arn)
×
873
                    if layer is None:
×
874
                        # TODO: detect user or role from context when IAM users are implemented
875
                        user = "user/localstack-testing"
×
876
                        raise AccessDeniedException(
×
877
                            f"User: arn:{get_partition(region)}:iam::{account_id}:{user} is not authorized to perform: lambda:GetLayerVersion on resource: {layer_version_arn} because no resource-based policy allows the lambda:GetLayerVersion action"
878
                        )
879

880
                    # Distinguish between new layer and new layer version
881
                    if layer_version is None:
×
882
                        # Create whole layer from scratch
883
                        state.layers[layer_name] = layer
×
884
                    else:
885
                        # Create layer version if another version of the same layer already exists
886
                        state.layers[layer_name].layer_versions[layer_version_str] = (
×
887
                            layer.layer_versions.get(layer_version_str)
888
                        )
889

890
            # only the first two matches in the array are considered for the error message
891
            layer_arn = ":".join(layer_version_arn.split(":")[:-1])
1✔
892
            if layer_arn in visited_layers:
1✔
893
                conflict_layer_version_arn = visited_layers[layer_arn]
1✔
894
                raise InvalidParameterValueException(
1✔
895
                    f"Two different versions of the same layer are not allowed to be referenced in the same function. {conflict_layer_version_arn} and {layer_version_arn} are versions of the same layer.",
896
                    Type="User",
897
                )
898
            visited_layers[layer_arn] = layer_version_arn
1✔
899

900
    def _validate_capacity_provider_config(
1✔
901
        self, capacity_provider_config: CapacityProviderConfig, context: RequestContext
902
    ):
903
        if not capacity_provider_config.get("LambdaManagedInstancesCapacityProviderConfig"):
×
904
            raise ValidationException(
×
905
                "1 validation error detected: Value null at 'capacityProviderConfig.lambdaManagedInstancesCapacityProviderConfig' failed to satisfy constraint: Member must not be null"
906
            )
907

908
        capacity_provider_arn = capacity_provider_config.get(
×
909
            "LambdaManagedInstancesCapacityProviderConfig", {}
910
        ).get("CapacityProviderArn")
911
        if not capacity_provider_arn:
×
912
            raise ValidationException(
×
913
                "1 validation error detected: Value null at 'capacityProviderConfig.lambdaManagedInstancesCapacityProviderConfig.capacityProviderArn' failed to satisfy constraint: Member must not be null"
914
            )
915

916
        if not re.match(CAPACITY_PROVIDER_ARN_NAME, capacity_provider_arn):
×
917
            raise ValidationException(
×
918
                f"1 validation error detected: Value '{capacity_provider_arn}' at 'capacityProviderConfig.lambdaManagedInstancesCapacityProviderConfig.capacityProviderArn' failed to satisfy constraint: Member must satisfy regular expression pattern: {CAPACITY_PROVIDER_ARN_NAME}"
919
            )
920

921
        capacity_provider_name = capacity_provider_arn.split(":")[-1]
×
922
        self.get_capacity_provider(context, capacity_provider_name)
×
923

924
    @staticmethod
1✔
925
    def map_layers(new_layers: list[str]) -> list[LayerVersion]:
1✔
926
        layers = []
1✔
927
        for layer_version_arn in new_layers:
1✔
928
            region_name, account_id, layer_name, layer_version = api_utils.parse_layer_arn(
1✔
929
                layer_version_arn
930
            )
931
            layer = lambda_stores[account_id][region_name].layers.get(layer_name)
1✔
932
            layer_version = layer.layer_versions.get(layer_version)
1✔
933
            layers.append(layer_version)
1✔
934
        return layers
1✔
935

936
    def get_function_recursion_config(
1✔
937
        self,
938
        context: RequestContext,
939
        function_name: UnqualifiedFunctionName,
940
        **kwargs,
941
    ) -> GetFunctionRecursionConfigResponse:
942
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
943
        function_name = api_utils.get_function_name(function_name, context)
1✔
944
        fn = self._get_function(function_name=function_name, region=region, account_id=account_id)
1✔
945
        return GetFunctionRecursionConfigResponse(RecursiveLoop=fn.recursive_loop)
1✔
946

947
    def put_function_recursion_config(
1✔
948
        self,
949
        context: RequestContext,
950
        function_name: UnqualifiedFunctionName,
951
        recursive_loop: RecursiveLoop,
952
        **kwargs,
953
    ) -> PutFunctionRecursionConfigResponse:
954
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
955
        function_name = api_utils.get_function_name(function_name, context)
1✔
956

957
        fn = self._get_function(function_name=function_name, region=region, account_id=account_id)
1✔
958

959
        allowed_values = list(RecursiveLoop.__members__.values())
1✔
960
        if recursive_loop not in allowed_values:
1✔
961
            raise ValidationException(
1✔
962
                f"1 validation error detected: Value '{recursive_loop}' at 'recursiveLoop' failed to satisfy constraint: "
963
                f"Member must satisfy enum value set: [Terminate, Allow]"
964
            )
965

966
        fn.recursive_loop = recursive_loop
1✔
967
        return PutFunctionRecursionConfigResponse(RecursiveLoop=fn.recursive_loop)
1✔
968

969
    @handler(operation="CreateFunction", expand=False)
1✔
970
    def create_function(
1✔
971
        self,
972
        context: RequestContext,
973
        request: CreateFunctionRequest,
974
    ) -> FunctionConfiguration:
975
        context_region = context.region
1✔
976
        context_account_id = context.account_id
1✔
977

978
        zip_file = (request.get("Code") or {}).get("ZipFile")
1✔
979
        if zip_file and len(zip_file) > config.LAMBDA_LIMITS_CODE_SIZE_ZIPPED:
1✔
980
            raise RequestEntityTooLargeException(
1✔
981
                f"Zipped size must be smaller than {config.LAMBDA_LIMITS_CODE_SIZE_ZIPPED} bytes"
982
            )
983

984
        if context.request.content_length > config.LAMBDA_LIMITS_CREATE_FUNCTION_REQUEST_SIZE:
1✔
985
            raise RequestEntityTooLargeException(
1✔
986
                f"Request must be smaller than {config.LAMBDA_LIMITS_CREATE_FUNCTION_REQUEST_SIZE} bytes for the CreateFunction operation"
987
            )
988

989
        if architectures := request.get("Architectures"):
1✔
990
            if len(architectures) != 1:
1✔
991
                raise ValidationException(
1✔
992
                    f"1 validation error detected: Value '[{', '.join(architectures)}]' at 'architectures' failed to "
993
                    f"satisfy constraint: Member must have length less than or equal to 1",
994
                )
995
            if architectures[0] not in ARCHITECTURES:
1✔
996
                raise ValidationException(
1✔
997
                    f"1 validation error detected: Value '[{', '.join(architectures)}]' at 'architectures' failed to "
998
                    f"satisfy constraint: Member must satisfy constraint: [Member must satisfy enum value set: "
999
                    f"[x86_64, arm64], Member must not be null]",
1000
                )
1001

1002
        if env_vars := request.get("Environment", {}).get("Variables"):
1✔
1003
            self._verify_env_variables(env_vars)
1✔
1004

1005
        if layers := request.get("Layers", []):
1✔
1006
            self._validate_layers(layers, region=context_region, account_id=context_account_id)
1✔
1007

1008
        if not api_utils.is_role_arn(request.get("Role")):
1✔
1009
            raise ValidationException(
1✔
1010
                f"1 validation error detected: Value '{request.get('Role')}'"
1011
                + " at 'role' failed to satisfy constraint: Member must satisfy regular expression pattern: arn:(aws[a-zA-Z-]*)?:iam::\\d{12}:role/?[a-zA-Z_0-9+=,.@\\-_/]+"
1012
            )
1013
        if not self.lambda_service.can_assume_role(request.get("Role"), context.region):
1✔
1014
            raise InvalidParameterValueException(
×
1015
                "The role defined for the function cannot be assumed by Lambda.", Type="User"
1016
            )
1017
        package_type = request.get("PackageType", PackageType.Zip)
1✔
1018
        runtime = request.get("Runtime")
1✔
1019
        self._validate_runtime(package_type, runtime)
1✔
1020

1021
        request_function_name = request.get("FunctionName")
1✔
1022

1023
        function_name, *_ = api_utils.get_name_and_qualifier(
1✔
1024
            function_arn_or_name=request_function_name,
1025
            qualifier=None,
1026
            context=context,
1027
        )
1028

1029
        if runtime in DEPRECATED_RUNTIMES:
1✔
1030
            LOG.warning(
1✔
1031
                "The Lambda runtime %s} is deprecated. "
1032
                "Please upgrade the runtime for the function %s: "
1033
                "https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtimes.html",
1034
                runtime,
1035
                function_name,
1036
            )
1037
        if snap_start := request.get("SnapStart"):
1✔
1038
            self._validate_snapstart(snap_start, runtime)
1✔
1039
        if publish_to := request.get("PublishTo"):
1✔
1040
            self._validate_publish_to(publish_to)
×
1041
        state = lambda_stores[context_account_id][context_region]
1✔
1042

1043
        with self.create_fn_lock:
1✔
1044
            if function_name in state.functions:
1✔
1045
                raise ResourceConflictException(f"Function already exist: {function_name}")
×
1046
            fn = Function(function_name=function_name)
1✔
1047
            arn = VersionIdentifier(
1✔
1048
                function_name=function_name,
1049
                qualifier="$LATEST",
1050
                region=context_region,
1051
                account=context_account_id,
1052
            )
1053
            # save function code to s3
1054
            code = None
1✔
1055
            image = None
1✔
1056
            image_config = None
1✔
1057
            runtime_version_config = RuntimeVersionConfig(
1✔
1058
                # Limitation: the runtime id (presumably sha256 of image) is currently hardcoded
1059
                # Potential implementation: provide (cached) sha256 hash of used Docker image
1060
                RuntimeVersionArn=f"arn:{context.partition}:lambda:{context_region}::runtime:8eeff65f6809a3ce81507fe733fe09b835899b99481ba22fd75b5a7338290ec1"
1061
            )
1062
            request_code = request.get("Code") or {}
1✔
1063
            if package_type == PackageType.Zip:
1✔
1064
                # TODO verify if correct combination of code is set
1065
                if zip_file := request_code.get("ZipFile"):
1✔
1066
                    code = store_lambda_archive(
1✔
1067
                        archive_file=zip_file,
1068
                        function_name=function_name,
1069
                        region_name=context_region,
1070
                        account_id=context_account_id,
1071
                    )
1072
                elif s3_bucket := request_code.get("S3Bucket"):
1✔
1073
                    s3_key = request_code["S3Key"]
1✔
1074
                    s3_object_version = request_code.get("S3ObjectVersion")
1✔
1075
                    code = store_s3_bucket_archive(
1✔
1076
                        archive_bucket=s3_bucket,
1077
                        archive_key=s3_key,
1078
                        archive_version=s3_object_version,
1079
                        function_name=function_name,
1080
                        region_name=context_region,
1081
                        account_id=context_account_id,
1082
                    )
1083
                else:
1084
                    raise LambdaServiceException("A ZIP file or S3 bucket is required")
×
1085
            elif package_type == PackageType.Image:
1✔
1086
                image = request_code.get("ImageUri")
1✔
1087
                if not image:
1✔
1088
                    raise LambdaServiceException(
×
1089
                        "An image is required when the package type is set to 'image'"
1090
                    )
1091
                image = create_image_code(image_uri=image)
1✔
1092

1093
                image_config_req = request.get("ImageConfig") or {}
1✔
1094
                image_config = ImageConfig(
1✔
1095
                    command=image_config_req.get("Command"),
1096
                    entrypoint=image_config_req.get("EntryPoint"),
1097
                    working_directory=image_config_req.get("WorkingDirectory"),
1098
                )
1099
                # Runtime management controls are not available when providing a custom image
1100
                runtime_version_config = None
1✔
1101

1102
            capacity_provider_config = None
1✔
1103
            memory_size = request.get("MemorySize", LAMBDA_DEFAULT_MEMORY_SIZE)
1✔
1104
            if "CapacityProviderConfig" in request:
1✔
1105
                capacity_provider_config = request["CapacityProviderConfig"]
×
1106
                self._validate_capacity_provider_config(capacity_provider_config, context)
×
1107
                self._validate_managed_instances_runtime(runtime)
×
1108

1109
                default_config = CapacityProviderConfig(
×
1110
                    LambdaManagedInstancesCapacityProviderConfig=LambdaManagedInstancesCapacityProviderConfig(
1111
                        ExecutionEnvironmentMemoryGiBPerVCpu=2.0,
1112
                        PerExecutionEnvironmentMaxConcurrency=16,
1113
                    )
1114
                )
1115
                capacity_provider_config = merge_recursive(default_config, capacity_provider_config)
×
1116
                memory_size = 2048
×
NEW
1117
                if (request.get("LoggingConfig") or {}).get("LogFormat") == LogFormat.Text:
×
1118
                    raise InvalidParameterValueException(
×
1119
                        'LogLevel is not supported when LogFormat is set to "Text". Remove LogLevel from your request or change the LogFormat to "JSON" and try again.',
1120
                        Type="User",
1121
                    )
1122
            if "LoggingConfig" in request:
1✔
1123
                logging_config = request["LoggingConfig"]
1✔
1124
                LOG.warning(
1✔
1125
                    "Advanced Lambda Logging Configuration is currently mocked "
1126
                    "and will not impact the logging behavior. "
1127
                    "Please create a feature request if needed."
1128
                )
1129

1130
                # when switching to JSON, app and system level log is auto set to INFO
1131
                if logging_config.get("LogFormat", None) == LogFormat.JSON:
1✔
1132
                    logging_config = {
1✔
1133
                        "ApplicationLogLevel": "INFO",
1134
                        "SystemLogLevel": "INFO",
1135
                        "LogGroup": f"/aws/lambda/{function_name}",
1136
                    } | logging_config
1137
                else:
1138
                    logging_config = (
×
1139
                        LoggingConfig(
1140
                            LogFormat=LogFormat.Text, LogGroup=f"/aws/lambda/{function_name}"
1141
                        )
1142
                        | logging_config
1143
                    )
1144

1145
            elif capacity_provider_config:
1✔
1146
                logging_config = LoggingConfig(
×
1147
                    LogFormat=LogFormat.JSON,
1148
                    LogGroup=f"/aws/lambda/{function_name}",
1149
                    ApplicationLogLevel="INFO",
1150
                    SystemLogLevel="INFO",
1151
                )
1152
            else:
1153
                logging_config = LoggingConfig(
1✔
1154
                    LogFormat=LogFormat.Text, LogGroup=f"/aws/lambda/{function_name}"
1155
                )
1156
            snap_start = (
1✔
1157
                None
1158
                if capacity_provider_config
1159
                else SnapStartResponse(
1160
                    ApplyOn=request.get("SnapStart", {}).get("ApplyOn", SnapStartApplyOn.None_),
1161
                    OptimizationStatus=SnapStartOptimizationStatus.Off,
1162
                )
1163
            )
1164
            version = FunctionVersion(
1✔
1165
                id=arn,
1166
                config=VersionFunctionConfiguration(
1167
                    last_modified=api_utils.format_lambda_date(datetime.datetime.now()),
1168
                    description=request.get("Description", ""),
1169
                    role=request["Role"],
1170
                    timeout=request.get("Timeout", LAMBDA_DEFAULT_TIMEOUT),
1171
                    runtime=request.get("Runtime"),
1172
                    memory_size=memory_size,
1173
                    handler=request.get("Handler"),
1174
                    package_type=package_type,
1175
                    environment=env_vars,
1176
                    architectures=request.get("Architectures") or [Architecture.x86_64],
1177
                    tracing_config_mode=request.get("TracingConfig", {}).get(
1178
                        "Mode", TracingMode.PassThrough
1179
                    ),
1180
                    image=image,
1181
                    image_config=image_config,
1182
                    code=code,
1183
                    layers=self.map_layers(layers),
1184
                    internal_revision=short_uid(),
1185
                    ephemeral_storage=LambdaEphemeralStorage(
1186
                        size=request.get("EphemeralStorage", {}).get("Size", 512)
1187
                    ),
1188
                    snap_start=snap_start,
1189
                    runtime_version_config=runtime_version_config,
1190
                    dead_letter_arn=request.get("DeadLetterConfig", {}).get("TargetArn"),
1191
                    vpc_config=self._build_vpc_config(
1192
                        context_account_id, context_region, request.get("VpcConfig")
1193
                    ),
1194
                    state=VersionState(
1195
                        state=State.Pending,
1196
                        code=StateReasonCode.Creating,
1197
                        reason="The function is being created.",
1198
                    ),
1199
                    logging_config=logging_config,
1200
                    # TODO: might need something like **optional_kwargs if None
1201
                    #   -> Test with regular GetFunction (i.e., without a capacity provider)
1202
                    capacity_provider_config=capacity_provider_config,
1203
                ),
1204
            )
1205
            version_post_response = None
1✔
1206
            if capacity_provider_config:
1✔
1207
                version_post_response = dataclasses.replace(
×
1208
                    version,
1209
                    config=dataclasses.replace(
1210
                        version.config,
1211
                        last_update=UpdateStatus(status=LastUpdateStatus.Successful),
1212
                        state=VersionState(state=State.ActiveNonInvocable),
1213
                    ),
1214
                )
1215
            fn.versions["$LATEST"] = version_post_response or version
1✔
1216
            state.functions[function_name] = fn
1✔
1217
        initialization_type = (
1✔
1218
            FunctionInitializationType.lambda_managed_instances
1219
            if capacity_provider_config
1220
            else FunctionInitializationType.on_demand
1221
        )
1222
        function_counter.labels(
1✔
1223
            operation=FunctionOperation.create,
1224
            runtime=runtime or "n/a",
1225
            status=FunctionStatus.success,
1226
            invocation_type="n/a",
1227
            package_type=package_type,
1228
            initialization_type=initialization_type,
1229
        )
1230
        # TODO: consider potential other side effects of not having a function version for $LATEST
1231
        # Provisioning happens upon publishing for functions using a capacity provider
1232
        if not capacity_provider_config:
1✔
1233
            self.lambda_service.create_function_version(version)
1✔
1234

1235
        if tags := request.get("Tags"):
1✔
1236
            # This will check whether the function exists.
1237
            self._store_tags(arn.unqualified_arn(), tags)
1✔
1238

1239
        if request.get("Publish"):
1✔
1240
            version = self._publish_version_with_changes(
1✔
1241
                function_name=function_name,
1242
                region=context_region,
1243
                account_id=context_account_id,
1244
                publish_to=request.get("PublishTo"),
1245
            )
1246

1247
        if config.LAMBDA_SYNCHRONOUS_CREATE:
1✔
1248
            # block via retrying until "terminal" condition reached before returning
1249
            if not poll_condition(
×
1250
                lambda: (
1251
                    get_function_version(
1252
                        function_name, version.id.qualifier, version.id.account, version.id.region
1253
                    ).config.state.state
1254
                    in [State.Active, State.ActiveNonInvocable, State.Failed]
1255
                ),
1256
                timeout=10,
1257
            ):
1258
                LOG.warning(
×
1259
                    "LAMBDA_SYNCHRONOUS_CREATE is active, but waiting for %s reached timeout.",
1260
                    function_name,
1261
                )
1262

1263
        return api_utils.map_config_out(
1✔
1264
            version, return_qualified_arn=False, return_update_status=False
1265
        )
1266

1267
    def _validate_runtime(self, package_type, runtime):
1✔
1268
        runtimes = ALL_RUNTIMES
1✔
1269
        if config.LAMBDA_RUNTIME_VALIDATION:
1✔
1270
            runtimes = list(itertools.chain(RUNTIMES_AGGREGATED.values()))
1✔
1271

1272
        if package_type == PackageType.Zip and runtime not in runtimes:
1✔
1273
            # deprecated runtimes have different error
1274
            if runtime in DEPRECATED_RUNTIMES:
1✔
1275
                HINT_LOG.info(
1✔
1276
                    "Set env variable LAMBDA_RUNTIME_VALIDATION to 0"
1277
                    " in order to allow usage of deprecated runtimes"
1278
                )
1279
                self._check_for_recomended_migration_target(runtime)
1✔
1280

1281
            raise InvalidParameterValueException(
1✔
1282
                f"Value {runtime} at 'runtime' failed to satisfy constraint: Member must satisfy enum value set: {VALID_RUNTIMES} or be a valid ARN",
1283
                Type="User",
1284
            )
1285

1286
    def _validate_managed_instances_runtime(self, runtime):
1✔
1287
        if runtime not in VALID_MANAGED_INSTANCE_RUNTIMES:
×
1288
            raise InvalidParameterValueException(
×
1289
                f"Runtime Enum {runtime} does not support specified feature: Lambda Managed Instances"
1290
            )
1291

1292
    def _check_for_recomended_migration_target(self, deprecated_runtime):
1✔
1293
        # AWS offers recommended runtime for migration for "newly" deprecated runtimes
1294
        # in order to preserve parity with error messages we need the code bellow
1295
        latest_runtime = DEPRECATED_RUNTIMES_UPGRADES.get(deprecated_runtime)
1✔
1296

1297
        if latest_runtime is not None:
1✔
1298
            LOG.debug(
1✔
1299
                "The Lambda runtime %s is deprecated. Please upgrade to a supported Lambda runtime such as %s.",
1300
                deprecated_runtime,
1301
                latest_runtime,
1302
            )
1303
            raise InvalidParameterValueException(
1✔
1304
                f"The runtime parameter of {deprecated_runtime} is no longer supported for creating or updating AWS Lambda functions. We recommend you use a supported runtime while creating or updating functions.",
1305
                Type="User",
1306
            )
1307

1308
    @handler(operation="UpdateFunctionConfiguration", expand=False)
1✔
1309
    def update_function_configuration(
1✔
1310
        self, context: RequestContext, request: UpdateFunctionConfigurationRequest
1311
    ) -> FunctionConfiguration:
1312
        """updates the $LATEST version of the function"""
1313
        function_name = request.get("FunctionName")
1✔
1314

1315
        # in case we got ARN or partial ARN
1316
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1317
        function_name, qualifier = api_utils.get_name_and_qualifier(function_name, None, context)
1✔
1318
        state = lambda_stores[account_id][region]
1✔
1319

1320
        if function_name not in state.functions:
1✔
1321
            raise ResourceNotFoundException(
×
1322
                f"Function not found: {api_utils.unqualified_lambda_arn(function_name=function_name, region=region, account=account_id)}",
1323
                Type="User",
1324
            )
1325
        function = state.functions[function_name]
1✔
1326

1327
        # TODO: lock modification of latest version
1328
        # TODO: notify service for changes relevant to re-provisioning of $LATEST
1329
        latest_version = function.latest()
1✔
1330
        latest_version_config = latest_version.config
1✔
1331

1332
        revision_id = request.get("RevisionId")
1✔
1333
        if revision_id and revision_id != latest_version.config.revision_id:
1✔
1334
            raise PreconditionFailedException(
1✔
1335
                "The Revision Id provided does not match the latest Revision Id. "
1336
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
1337
                Type="User",
1338
            )
1339

1340
        replace_kwargs = {}
1✔
1341
        if "EphemeralStorage" in request:
1✔
1342
            replace_kwargs["ephemeral_storage"] = LambdaEphemeralStorage(
×
1343
                request.get("EphemeralStorage", {}).get("Size", 512)
1344
            )  # TODO: do defaults here apply as well?
1345

1346
        if "Role" in request:
1✔
1347
            if not api_utils.is_role_arn(request["Role"]):
1✔
1348
                raise ValidationException(
1✔
1349
                    f"1 validation error detected: Value '{request.get('Role')}'"
1350
                    + " at 'role' failed to satisfy constraint: Member must satisfy regular expression pattern: arn:(aws[a-zA-Z-]*)?:iam::\\d{12}:role/?[a-zA-Z_0-9+=,.@\\-_/]+"
1351
                )
1352
            replace_kwargs["role"] = request["Role"]
1✔
1353

1354
        if "Description" in request:
1✔
1355
            replace_kwargs["description"] = request["Description"]
1✔
1356

1357
        if "Timeout" in request:
1✔
1358
            replace_kwargs["timeout"] = request["Timeout"]
1✔
1359

1360
        if "MemorySize" in request:
1✔
1361
            replace_kwargs["memory_size"] = request["MemorySize"]
1✔
1362

1363
        if "DeadLetterConfig" in request:
1✔
1364
            replace_kwargs["dead_letter_arn"] = request.get("DeadLetterConfig", {}).get("TargetArn")
1✔
1365

1366
        if vpc_config := request.get("VpcConfig"):
1✔
1367
            replace_kwargs["vpc_config"] = self._build_vpc_config(account_id, region, vpc_config)
1✔
1368

1369
        if "Handler" in request:
1✔
1370
            replace_kwargs["handler"] = request["Handler"]
1✔
1371

1372
        if "Runtime" in request:
1✔
1373
            runtime = request["Runtime"]
1✔
1374

1375
            if runtime not in ALL_RUNTIMES:
1✔
1376
                raise InvalidParameterValueException(
1✔
1377
                    f"Value {runtime} at 'runtime' failed to satisfy constraint: Member must satisfy enum value set: {VALID_RUNTIMES} or be a valid ARN",
1378
                    Type="User",
1379
                )
1380
            if runtime in DEPRECATED_RUNTIMES:
1✔
1381
                LOG.warning(
×
1382
                    "The Lambda runtime %s is deprecated. "
1383
                    "Please upgrade the runtime for the function %s: "
1384
                    "https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtimes.html",
1385
                    runtime,
1386
                    function_name,
1387
                )
1388
            replace_kwargs["runtime"] = request["Runtime"]
1✔
1389

1390
        if snap_start := request.get("SnapStart"):
1✔
1391
            runtime = replace_kwargs.get("runtime") or latest_version_config.runtime
1✔
1392
            self._validate_snapstart(snap_start, runtime)
1✔
1393
            replace_kwargs["snap_start"] = SnapStartResponse(
1✔
1394
                ApplyOn=snap_start.get("ApplyOn", SnapStartApplyOn.None_),
1395
                OptimizationStatus=SnapStartOptimizationStatus.Off,
1396
            )
1397

1398
        if "Environment" in request:
1✔
1399
            if env_vars := request.get("Environment", {}).get("Variables", {}):
1✔
1400
                self._verify_env_variables(env_vars)
1✔
1401
            replace_kwargs["environment"] = env_vars
1✔
1402

1403
        if "Layers" in request:
1✔
1404
            new_layers = request["Layers"]
1✔
1405
            if new_layers:
1✔
1406
                self._validate_layers(new_layers, region=region, account_id=account_id)
1✔
1407
            replace_kwargs["layers"] = self.map_layers(new_layers)
1✔
1408

1409
        if "ImageConfig" in request:
1✔
1410
            new_image_config = request["ImageConfig"]
1✔
1411
            replace_kwargs["image_config"] = ImageConfig(
1✔
1412
                command=new_image_config.get("Command"),
1413
                entrypoint=new_image_config.get("EntryPoint"),
1414
                working_directory=new_image_config.get("WorkingDirectory"),
1415
            )
1416

1417
        if "LoggingConfig" in request:
1✔
1418
            logging_config = request["LoggingConfig"]
1✔
1419
            LOG.warning(
1✔
1420
                "Advanced Lambda Logging Configuration is currently mocked "
1421
                "and will not impact the logging behavior. "
1422
                "Please create a feature request if needed."
1423
            )
1424

1425
            # when switching to JSON, app and system level log is auto set to INFO
1426
            if logging_config.get("LogFormat", None) == LogFormat.JSON:
1✔
1427
                logging_config = {
1✔
1428
                    "ApplicationLogLevel": "INFO",
1429
                    "SystemLogLevel": "INFO",
1430
                } | logging_config
1431

1432
            last_config = latest_version_config.logging_config
1✔
1433

1434
            # add partial update
1435
            new_logging_config = last_config | logging_config
1✔
1436

1437
            # in case we switched from JSON to Text we need to remove LogLevel keys
1438
            if (
1✔
1439
                new_logging_config.get("LogFormat") == LogFormat.Text
1440
                and last_config.get("LogFormat") == LogFormat.JSON
1441
            ):
1442
                new_logging_config.pop("ApplicationLogLevel", None)
1✔
1443
                new_logging_config.pop("SystemLogLevel", None)
1✔
1444

1445
            replace_kwargs["logging_config"] = new_logging_config
1✔
1446

1447
        if "TracingConfig" in request:
1✔
1448
            new_mode = request.get("TracingConfig", {}).get("Mode")
×
1449
            if new_mode:
×
1450
                replace_kwargs["tracing_config_mode"] = new_mode
×
1451

1452
        if "CapacityProviderConfig" in request:
1✔
1453
            capacity_provider_config = request["CapacityProviderConfig"]
×
1454
            self._validate_capacity_provider_config(capacity_provider_config, context)
×
1455

1456
            if latest_version.config.capacity_provider_config and not request[
×
1457
                "CapacityProviderConfig"
1458
            ].get("LambdaManagedInstancesCapacityProviderConfig"):
1459
                raise ValidationException(
×
1460
                    "1 validation error detected: Value null at 'capacityProviderConfig.lambdaManagedInstancesCapacityProviderConfig' failed to satisfy constraint: Member must not be null"
1461
                )
1462
            if not latest_version.config.capacity_provider_config:
×
1463
                raise InvalidParameterValueException(
×
1464
                    "CapacityProviderConfig isn't supported for Lambda Default functions.",
1465
                    Type="User",
1466
                )
1467

1468
            default_config = CapacityProviderConfig(
×
1469
                LambdaManagedInstancesCapacityProviderConfig=LambdaManagedInstancesCapacityProviderConfig(
1470
                    ExecutionEnvironmentMemoryGiBPerVCpu=2.0,
1471
                    PerExecutionEnvironmentMaxConcurrency=16,
1472
                )
1473
            )
1474
            capacity_provider_config = merge_recursive(default_config, capacity_provider_config)
×
1475
            replace_kwargs["capacity_provider_config"] = capacity_provider_config
×
1476
        new_latest_version = dataclasses.replace(
1✔
1477
            latest_version,
1478
            config=dataclasses.replace(
1479
                latest_version_config,
1480
                last_modified=api_utils.generate_lambda_date(),
1481
                internal_revision=short_uid(),
1482
                last_update=UpdateStatus(
1483
                    status=LastUpdateStatus.InProgress,
1484
                    code="Creating",
1485
                    reason="The function is being created.",
1486
                ),
1487
                **replace_kwargs,
1488
            ),
1489
        )
1490
        function.versions["$LATEST"] = new_latest_version  # TODO: notify
1✔
1491

1492
        if function.latest().config.capacity_provider_config:
1✔
1493

1494
            def _update_version_with_logging():
×
1495
                try:
×
1496
                    self.lambda_service.update_version(new_latest_version)
×
1497
                except Exception:
×
1498
                    LOG.error(
×
1499
                        "Failed to update Lambda Managed Instances function version %s",
1500
                        new_latest_version.id.qualified_arn(),
1501
                        exc_info=LOG.isEnabledFor(logging.DEBUG),
1502
                    )
1503

1504
            self.lambda_service.task_executor.submit(_update_version_with_logging)
×
1505
        else:
1506
            self.lambda_service.update_version(new_version=new_latest_version)
1✔
1507

1508
        return api_utils.map_config_out(new_latest_version)
1✔
1509

1510
    @handler(operation="UpdateFunctionCode", expand=False)
1✔
1511
    def update_function_code(
1✔
1512
        self, context: RequestContext, request: UpdateFunctionCodeRequest
1513
    ) -> FunctionConfiguration:
1514
        """updates the $LATEST version of the function"""
1515
        # only supports normal zip packaging atm
1516
        # if request.get("Publish"):
1517
        #     self.lambda_service.create_function_version()
1518

1519
        function_name = request.get("FunctionName")
1✔
1520
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1521
        function_name, qualifier = api_utils.get_name_and_qualifier(function_name, None, context)
1✔
1522

1523
        store = lambda_stores[account_id][region]
1✔
1524
        if function_name not in store.functions:
1✔
1525
            raise ResourceNotFoundException(
×
1526
                f"Function not found: {api_utils.unqualified_lambda_arn(function_name=function_name, region=region, account=account_id)}",
1527
                Type="User",
1528
            )
1529
        function = store.functions[function_name]
1✔
1530

1531
        revision_id = request.get("RevisionId")
1✔
1532
        if revision_id and revision_id != function.latest().config.revision_id:
1✔
1533
            raise PreconditionFailedException(
1✔
1534
                "The Revision Id provided does not match the latest Revision Id. "
1535
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
1536
                Type="User",
1537
            )
1538

1539
        # TODO verify if correct combination of code is set
1540
        image = None
1✔
1541
        if (
1✔
1542
            request.get("ZipFile") or request.get("S3Bucket")
1543
        ) and function.latest().config.package_type == PackageType.Image:
1544
            raise InvalidParameterValueException(
1✔
1545
                "Please provide ImageUri when updating a function with packageType Image.",
1546
                Type="User",
1547
            )
1548
        elif request.get("ImageUri") and function.latest().config.package_type == PackageType.Zip:
1✔
1549
            raise InvalidParameterValueException(
1✔
1550
                "Please don't provide ImageUri when updating a function with packageType Zip.",
1551
                Type="User",
1552
            )
1553

1554
        if publish_to := request.get("PublishTo"):
1✔
1555
            self._validate_publish_to(publish_to)
×
1556

1557
        if zip_file := request.get("ZipFile"):
1✔
1558
            code = store_lambda_archive(
1✔
1559
                archive_file=zip_file,
1560
                function_name=function_name,
1561
                region_name=region,
1562
                account_id=account_id,
1563
            )
1564
        elif s3_bucket := request.get("S3Bucket"):
1✔
1565
            s3_key = request["S3Key"]
1✔
1566
            s3_object_version = request.get("S3ObjectVersion")
1✔
1567
            code = store_s3_bucket_archive(
1✔
1568
                archive_bucket=s3_bucket,
1569
                archive_key=s3_key,
1570
                archive_version=s3_object_version,
1571
                function_name=function_name,
1572
                region_name=region,
1573
                account_id=account_id,
1574
            )
1575
        elif image := request.get("ImageUri"):
1✔
1576
            code = None
1✔
1577
            image = create_image_code(image_uri=image)
1✔
1578
        else:
1579
            raise LambdaServiceException("A ZIP file, S3 bucket, or image is required")
×
1580

1581
        old_function_version = function.versions.get("$LATEST")
1✔
1582
        replace_kwargs = {"code": code} if code else {"image": image}
1✔
1583

1584
        if architectures := request.get("Architectures"):
1✔
1585
            if len(architectures) != 1:
×
1586
                raise ValidationException(
×
1587
                    f"1 validation error detected: Value '[{', '.join(architectures)}]' at 'architectures' failed to "
1588
                    f"satisfy constraint: Member must have length less than or equal to 1",
1589
                )
1590
            # An empty list of architectures is also forbidden. Further exceptions are tested here for create_function:
1591
            # tests.aws.services.lambda_.test_lambda_api.TestLambdaFunction.test_create_lambda_exceptions
1592
            if architectures[0] not in ARCHITECTURES:
×
1593
                raise ValidationException(
×
1594
                    f"1 validation error detected: Value '[{', '.join(architectures)}]' at 'architectures' failed to "
1595
                    f"satisfy constraint: Member must satisfy constraint: [Member must satisfy enum value set: "
1596
                    f"[x86_64, arm64], Member must not be null]",
1597
                )
1598
            replace_kwargs["architectures"] = architectures
×
1599

1600
        config = dataclasses.replace(
1✔
1601
            old_function_version.config,
1602
            internal_revision=short_uid(),
1603
            last_modified=api_utils.generate_lambda_date(),
1604
            last_update=UpdateStatus(
1605
                status=LastUpdateStatus.InProgress,
1606
                code="Creating",
1607
                reason="The function is being created.",
1608
            ),
1609
            **replace_kwargs,
1610
        )
1611
        function_version = dataclasses.replace(old_function_version, config=config)
1✔
1612
        function.versions["$LATEST"] = function_version
1✔
1613

1614
        self.lambda_service.update_version(new_version=function_version)
1✔
1615
        if request.get("Publish"):
1✔
1616
            function_version = self._publish_version_with_changes(
1✔
1617
                function_name=function_name,
1618
                region=region,
1619
                account_id=account_id,
1620
                publish_to=publish_to,
1621
                is_active=True,
1622
            )
1623
        return api_utils.map_config_out(
1✔
1624
            function_version, return_qualified_arn=bool(request.get("Publish"))
1625
        )
1626

1627
    # TODO: does deleting the latest published version affect the next versions number?
1628
    # TODO: what happens when we call this with a qualifier and a fully qualified ARN? (+ conflicts?)
1629
    # TODO: test different ARN patterns (shorthand ARN?)
1630
    # TODO: test deleting across regions?
1631
    # TODO: test mismatch between context region and region in ARN
1632
    # TODO: test qualifier $LATEST, alias-name and version
1633
    def delete_function(
1✔
1634
        self,
1635
        context: RequestContext,
1636
        function_name: NamespacedFunctionName,
1637
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
1638
        **kwargs,
1639
    ) -> DeleteFunctionResponse:
1640
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1641
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
1642
            function_name, qualifier, context
1643
        )
1644

1645
        if qualifier and api_utils.qualifier_is_alias(qualifier):
1✔
1646
            raise InvalidParameterValueException(
×
1647
                "Deletion of aliases is not currently supported.",
1648
                Type="User",
1649
            )
1650

1651
        store = lambda_stores[account_id][region]
1✔
1652
        if qualifier == "$LATEST":
1✔
1653
            raise InvalidParameterValueException(
1✔
1654
                "$LATEST version cannot be deleted without deleting the function.", Type="User"
1655
            )
1656

1657
        unqualified_function_arn = api_utils.unqualified_lambda_arn(
1✔
1658
            function_name=function_name, region=region, account=account_id
1659
        )
1660
        if function_name not in store.functions:
1✔
1661
            e = ResourceNotFoundException(
1✔
1662
                f"Function not found: {unqualified_function_arn}",
1663
                Type="User",
1664
            )
1665
            raise e
1✔
1666
        function = store.functions.get(function_name)
1✔
1667

1668
        function_has_capacity_provider = False
1✔
1669
        if qualifier:
1✔
1670
            # delete a version of the function
1671
            version = function.versions.get(qualifier, None)
1✔
1672
            if version:
1✔
1673
                if version.config.capacity_provider_config:
1✔
1674
                    function_has_capacity_provider = True
×
1675
                    # async delete from store
1676
                    self.lambda_service.delete_function_version_async(function, version, qualifier)
×
1677
                else:
1678
                    function.versions.pop(qualifier, None)
1✔
1679
                self.lambda_service.stop_version(version.id.qualified_arn())
1✔
1680
                destroy_code_if_not_used(code=version.config.code, function=function)
1✔
1681
        else:
1682
            # delete the whole function
1683
            self._remove_all_tags(unqualified_function_arn)
1✔
1684
            # TODO: introduce locking for safe deletion: We could create a new version at the API layer before
1685
            #  the old version gets cleaned up in the internal lambda service.
1686
            function = store.functions.get(function_name)
1✔
1687
            if function.latest().config.capacity_provider_config:
1✔
1688
                function_has_capacity_provider = True
×
1689
                # async delete version from store
1690
                self.lambda_service.delete_function_async(store, function_name)
×
1691

1692
            for version in function.versions.values():
1✔
1693
                # Functions with a capacity provider do NOT have a version manager for $LATEST because only
1694
                # published versions are invokable.
1695
                if not function_has_capacity_provider or (
1✔
1696
                    function_has_capacity_provider and version.id.qualifier != "$LATEST"
1697
                ):
1698
                    self.lambda_service.stop_version(qualified_arn=version.id.qualified_arn())
1✔
1699
                # we can safely destroy the code here
1700
                if version.config.code:
1✔
1701
                    version.config.code.destroy()
1✔
1702
            if not function_has_capacity_provider:
1✔
1703
                store.functions.pop(function_name, None)
1✔
1704

1705
        return DeleteFunctionResponse(StatusCode=202 if function_has_capacity_provider else 204)
1✔
1706

1707
    def list_functions(
1✔
1708
        self,
1709
        context: RequestContext,
1710
        master_region: MasterRegion = None,  # (only relevant for lambda@edge)
1711
        function_version: FunctionVersionApi = None,
1712
        marker: String = None,
1713
        max_items: MaxListItems = None,
1714
        **kwargs,
1715
    ) -> ListFunctionsResponse:
1716
        state = lambda_stores[context.account_id][context.region]
1✔
1717

1718
        if function_version and function_version != FunctionVersionApi.ALL:
1✔
1719
            raise ValidationException(
1✔
1720
                f"1 validation error detected: Value '{function_version}'"
1721
                + " at 'functionVersion' failed to satisfy constraint: Member must satisfy enum value set: [ALL]"
1722
            )
1723

1724
        if function_version == FunctionVersionApi.ALL:
1✔
1725
            # include all versions for all function
1726
            versions = [v for f in state.functions.values() for v in f.versions.values()]
1✔
1727
            return_qualified_arn = True
1✔
1728
        else:
1729
            versions = [f.latest() for f in state.functions.values()]
1✔
1730
            return_qualified_arn = False
1✔
1731

1732
        versions = [
1✔
1733
            api_utils.map_to_list_response(
1734
                api_utils.map_config_out(fc, return_qualified_arn=return_qualified_arn)
1735
            )
1736
            for fc in versions
1737
        ]
1738
        versions = PaginatedList(versions)
1✔
1739
        page, token = versions.get_page(
1✔
1740
            lambda version: version["FunctionArn"],
1741
            marker,
1742
            max_items,
1743
        )
1744
        return ListFunctionsResponse(Functions=page, NextMarker=token)
1✔
1745

1746
    def get_function(
1✔
1747
        self,
1748
        context: RequestContext,
1749
        function_name: NamespacedFunctionName,
1750
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
1751
        **kwargs,
1752
    ) -> GetFunctionResponse:
1753
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1754
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
1755
            function_name, qualifier, context
1756
        )
1757

1758
        fn = lambda_stores[account_id][region].functions.get(function_name)
1✔
1759
        if fn is None:
1✔
1760
            if qualifier is None:
1✔
1761
                raise ResourceNotFoundException(
1✔
1762
                    f"Function not found: {api_utils.unqualified_lambda_arn(function_name, account_id, region)}",
1763
                    Type="User",
1764
                )
1765
            else:
1766
                raise ResourceNotFoundException(
1✔
1767
                    f"Function not found: {api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)}",
1768
                    Type="User",
1769
                )
1770
        alias_name = None
1✔
1771
        if qualifier and api_utils.qualifier_is_alias(qualifier):
1✔
1772
            if qualifier not in fn.aliases:
1✔
1773
                alias_arn = api_utils.qualified_lambda_arn(
1✔
1774
                    function_name, qualifier, account_id, region
1775
                )
1776
                raise ResourceNotFoundException(f"Function not found: {alias_arn}", Type="User")
1✔
1777
            alias_name = qualifier
1✔
1778
            qualifier = fn.aliases[alias_name].function_version
1✔
1779

1780
        version = get_function_version(
1✔
1781
            function_name=function_name,
1782
            qualifier=qualifier,
1783
            account_id=account_id,
1784
            region=region,
1785
        )
1786
        tags = self._get_tags(api_utils.unqualified_lambda_arn(function_name, account_id, region))
1✔
1787
        additional_fields = {}
1✔
1788
        if tags:
1✔
1789
            additional_fields["Tags"] = tags
1✔
1790
        code_location = None
1✔
1791
        if code := version.config.code:
1✔
1792
            code_location = FunctionCodeLocation(
1✔
1793
                Location=code.generate_presigned_url(endpoint_url=config.external_service_url()),
1794
                RepositoryType="S3",
1795
            )
1796
        elif image := version.config.image:
1✔
1797
            code_location = FunctionCodeLocation(
1✔
1798
                ImageUri=image.image_uri,
1799
                RepositoryType=image.repository_type,
1800
                ResolvedImageUri=image.resolved_image_uri,
1801
            )
1802
        concurrency = None
1✔
1803
        if fn.reserved_concurrent_executions:
1✔
1804
            concurrency = Concurrency(
1✔
1805
                ReservedConcurrentExecutions=fn.reserved_concurrent_executions
1806
            )
1807

1808
        return GetFunctionResponse(
1✔
1809
            Configuration=api_utils.map_config_out(
1810
                version, return_qualified_arn=bool(qualifier), alias_name=alias_name
1811
            ),
1812
            Code=code_location,  # TODO
1813
            Concurrency=concurrency,
1814
            **additional_fields,
1815
        )
1816

1817
    def get_function_configuration(
1✔
1818
        self,
1819
        context: RequestContext,
1820
        function_name: NamespacedFunctionName,
1821
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
1822
        **kwargs,
1823
    ) -> FunctionConfiguration:
1824
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1825
        # CAVE: THIS RETURN VALUE IS *NOT* THE SAME AS IN get_function (!) but seems to be only configuration part?
1826
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
1827
            function_name, qualifier, context
1828
        )
1829
        version = get_function_version(
1✔
1830
            function_name=function_name,
1831
            qualifier=qualifier,
1832
            account_id=account_id,
1833
            region=region,
1834
        )
1835
        return api_utils.map_config_out(version, return_qualified_arn=bool(qualifier))
1✔
1836

1837
    def invoke(
1✔
1838
        self,
1839
        context: RequestContext,
1840
        function_name: NamespacedFunctionName,
1841
        invocation_type: InvocationType | None = None,
1842
        log_type: LogType | None = None,
1843
        client_context: String | None = None,
1844
        durable_execution_name: DurableExecutionName | None = None,
1845
        payload: IO[Blob] | None = None,
1846
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
1847
        tenant_id: TenantId | None = None,
1848
        **kwargs,
1849
    ) -> InvocationResponse:
1850
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1851
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
1852
            function_name, qualifier, context
1853
        )
1854

1855
        user_agent = context.request.user_agent.string
1✔
1856

1857
        time_before = time.perf_counter()
1✔
1858
        try:
1✔
1859
            invocation_result = self.lambda_service.invoke(
1✔
1860
                function_name=function_name,
1861
                qualifier=qualifier,
1862
                region=region,
1863
                account_id=account_id,
1864
                invocation_type=invocation_type,
1865
                client_context=client_context,
1866
                request_id=context.request_id,
1867
                trace_context=context.trace_context,
1868
                payload=payload.read() if payload else None,
1869
                user_agent=user_agent,
1870
            )
1871
        except ServiceException:
1✔
1872
            raise
1✔
1873
        except EnvironmentStartupTimeoutException as e:
1✔
1874
            raise LambdaServiceException(
1✔
1875
                f"[{context.request_id}] Timeout while starting up lambda environment for function {function_name}:{qualifier}"
1876
            ) from e
1877
        except Exception as e:
1✔
1878
            LOG.error(
1✔
1879
                "[%s] Error while invoking lambda %s",
1880
                context.request_id,
1881
                function_name,
1882
                exc_info=LOG.isEnabledFor(logging.DEBUG),
1883
            )
1884
            raise LambdaServiceException(
1✔
1885
                f"[{context.request_id}] Internal error while executing lambda {function_name}:{qualifier}. Caused by {type(e).__name__}: {e}"
1886
            ) from e
1887

1888
        if invocation_type == InvocationType.Event:
1✔
1889
            # This happens when invocation type is event
1890
            return InvocationResponse(StatusCode=202)
1✔
1891
        if invocation_type == InvocationType.DryRun:
1✔
1892
            # This happens when invocation type is dryrun
1893
            return InvocationResponse(StatusCode=204)
1✔
1894
        LOG.debug("Lambda invocation duration: %0.2fms", (time.perf_counter() - time_before) * 1000)
1✔
1895

1896
        response = InvocationResponse(
1✔
1897
            StatusCode=200,
1898
            Payload=invocation_result.payload,
1899
            ExecutedVersion=invocation_result.executed_version,
1900
        )
1901

1902
        if invocation_result.is_error:
1✔
1903
            response["FunctionError"] = "Unhandled"
1✔
1904

1905
        if log_type == LogType.Tail:
1✔
1906
            response["LogResult"] = to_str(
1✔
1907
                base64.b64encode(to_bytes(invocation_result.logs)[-4096:])
1908
            )
1909

1910
        return response
1✔
1911

1912
    # Version operations
1913
    def publish_version(
1✔
1914
        self,
1915
        context: RequestContext,
1916
        function_name: FunctionName,
1917
        code_sha256: String | None = None,
1918
        description: Description | None = None,
1919
        revision_id: String | None = None,
1920
        publish_to: FunctionVersionLatestPublished | None = None,
1921
        **kwargs,
1922
    ) -> FunctionConfiguration:
1923
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1924
        function_name = api_utils.get_function_name(function_name, context)
1✔
1925
        if publish_to:
1✔
1926
            self._validate_publish_to(publish_to)
×
1927
        new_version = self._publish_version_from_existing_version(
1✔
1928
            function_name=function_name,
1929
            description=description,
1930
            account_id=account_id,
1931
            region=region,
1932
            revision_id=revision_id,
1933
            code_sha256=code_sha256,
1934
            publish_to=publish_to,
1935
        )
1936
        return api_utils.map_config_out(new_version, return_qualified_arn=True)
1✔
1937

1938
    def list_versions_by_function(
1✔
1939
        self,
1940
        context: RequestContext,
1941
        function_name: NamespacedFunctionName,
1942
        marker: String = None,
1943
        max_items: MaxListItems = None,
1944
        **kwargs,
1945
    ) -> ListVersionsByFunctionResponse:
1946
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1947
        function_name = api_utils.get_function_name(function_name, context)
1✔
1948
        function = self._get_function(
1✔
1949
            function_name=function_name, region=region, account_id=account_id
1950
        )
1951
        versions = [
1✔
1952
            api_utils.map_to_list_response(
1953
                api_utils.map_config_out(version=version, return_qualified_arn=True)
1954
            )
1955
            for version in function.versions.values()
1956
        ]
1957
        items = PaginatedList(versions)
1✔
1958
        page, token = items.get_page(
1✔
1959
            lambda item: item,
1960
            marker,
1961
            max_items,
1962
        )
1963
        return ListVersionsByFunctionResponse(Versions=page, NextMarker=token)
1✔
1964

1965
    # Alias
1966

1967
    def _create_routing_config_model(
1✔
1968
        self, routing_config_dict: dict[str, float], function_version: FunctionVersion
1969
    ):
1970
        if len(routing_config_dict) > 1:
1✔
1971
            raise InvalidParameterValueException(
1✔
1972
                "Number of items in AdditionalVersionWeights cannot be greater than 1",
1973
                Type="User",
1974
            )
1975
        # should be exactly one item here, still iterating, might be supported in the future
1976
        for key, value in routing_config_dict.items():
1✔
1977
            if value < 0.0 or value >= 1.0:
1✔
1978
                raise ValidationException(
1✔
1979
                    f"1 validation error detected: Value '{{{key}={value}}}' at 'routingConfig.additionalVersionWeights' failed to satisfy constraint: Map value must satisfy constraint: [Member must have value less than or equal to 1.0, Member must have value greater than or equal to 0.0, Member must not be null]"
1980
                )
1981
            if key == function_version.id.qualifier:
1✔
1982
                raise InvalidParameterValueException(
1✔
1983
                    f"Invalid function version {function_version.id.qualifier}. Function version {function_version.id.qualifier} is already included in routing configuration.",
1984
                    Type="User",
1985
                )
1986
            # check if version target is latest, then no routing config is allowed
1987
            if function_version.id.qualifier == "$LATEST":
1✔
1988
                raise InvalidParameterValueException(
1✔
1989
                    "$LATEST is not supported for an alias pointing to more than 1 version"
1990
                )
1991
            if not api_utils.qualifier_is_version(key):
1✔
1992
                raise ValidationException(
1✔
1993
                    f"1 validation error detected: Value '{{{key}={value}}}' at 'routingConfig.additionalVersionWeights' failed to satisfy constraint: Map keys must satisfy constraint: [Member must have length less than or equal to 1024, Member must have length greater than or equal to 1, Member must satisfy regular expression pattern: [0-9]+]"
1994
                )
1995

1996
            # checking if the version in the config exists
1997
            get_function_version(
1✔
1998
                function_name=function_version.id.function_name,
1999
                qualifier=key,
2000
                region=function_version.id.region,
2001
                account_id=function_version.id.account,
2002
            )
2003
        return AliasRoutingConfig(version_weights=routing_config_dict)
1✔
2004

2005
    def create_alias(
1✔
2006
        self,
2007
        context: RequestContext,
2008
        function_name: FunctionName,
2009
        name: Alias,
2010
        function_version: VersionWithLatestPublished,
2011
        description: Description = None,
2012
        routing_config: AliasRoutingConfiguration = None,
2013
        **kwargs,
2014
    ) -> AliasConfiguration:
2015
        if not api_utils.qualifier_is_alias(name):
1✔
2016
            raise ValidationException(
1✔
2017
                f"1 validation error detected: Value '{name}' at 'name' failed to satisfy constraint: Member must satisfy regular expression pattern: (?!^[0-9]+$)([a-zA-Z0-9-_]+)"
2018
            )
2019

2020
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2021
        function_name = api_utils.get_function_name(function_name, context)
1✔
2022
        target_version = get_function_version(
1✔
2023
            function_name=function_name,
2024
            qualifier=function_version,
2025
            region=region,
2026
            account_id=account_id,
2027
        )
2028
        function = self._get_function(
1✔
2029
            function_name=function_name, region=region, account_id=account_id
2030
        )
2031
        # description is always present, if not specified it's an empty string
2032
        description = description or ""
1✔
2033
        with function.lock:
1✔
2034
            if existing_alias := function.aliases.get(name):
1✔
2035
                raise ResourceConflictException(
1✔
2036
                    f"Alias already exists: {api_utils.map_alias_out(alias=existing_alias, function=function)['AliasArn']}",
2037
                    Type="User",
2038
                )
2039
            # checking if the version exists
2040
            routing_configuration = None
1✔
2041
            if routing_config and (
1✔
2042
                routing_config_dict := routing_config.get("AdditionalVersionWeights")
2043
            ):
2044
                routing_configuration = self._create_routing_config_model(
1✔
2045
                    routing_config_dict, target_version
2046
                )
2047

2048
            alias = VersionAlias(
1✔
2049
                name=name,
2050
                function_version=function_version,
2051
                description=description,
2052
                routing_configuration=routing_configuration,
2053
            )
2054
            function.aliases[name] = alias
1✔
2055
        return api_utils.map_alias_out(alias=alias, function=function)
1✔
2056

2057
    def list_aliases(
1✔
2058
        self,
2059
        context: RequestContext,
2060
        function_name: FunctionName,
2061
        function_version: VersionWithLatestPublished = None,
2062
        marker: String = None,
2063
        max_items: MaxListItems = None,
2064
        **kwargs,
2065
    ) -> ListAliasesResponse:
2066
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2067
        function_name = api_utils.get_function_name(function_name, context)
1✔
2068
        function = self._get_function(
1✔
2069
            function_name=function_name, region=region, account_id=account_id
2070
        )
2071
        aliases = [
1✔
2072
            api_utils.map_alias_out(alias, function)
2073
            for alias in function.aliases.values()
2074
            if function_version is None or alias.function_version == function_version
2075
        ]
2076

2077
        aliases = PaginatedList(aliases)
1✔
2078
        page, token = aliases.get_page(
1✔
2079
            lambda alias: alias["AliasArn"],
2080
            marker,
2081
            max_items,
2082
        )
2083

2084
        return ListAliasesResponse(Aliases=page, NextMarker=token)
1✔
2085

2086
    def delete_alias(
1✔
2087
        self, context: RequestContext, function_name: FunctionName, name: Alias, **kwargs
2088
    ) -> None:
2089
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2090
        function_name = api_utils.get_function_name(function_name, context)
1✔
2091
        function = self._get_function(
1✔
2092
            function_name=function_name, region=region, account_id=account_id
2093
        )
2094
        version_alias = function.aliases.pop(name, None)
1✔
2095

2096
        # cleanup related resources
2097
        if name in function.provisioned_concurrency_configs:
1✔
2098
            function.provisioned_concurrency_configs.pop(name)
1✔
2099

2100
        # TODO: Allow for deactivating/unregistering specific Lambda URLs
2101
        if version_alias and name in function.function_url_configs:
1✔
2102
            url_config = function.function_url_configs.pop(name)
1✔
2103
            LOG.debug(
1✔
2104
                "Stopping aliased Lambda Function URL %s for %s",
2105
                url_config.url,
2106
                url_config.function_name,
2107
            )
2108

2109
    def get_alias(
1✔
2110
        self, context: RequestContext, function_name: FunctionName, name: Alias, **kwargs
2111
    ) -> AliasConfiguration:
2112
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2113
        function_name = api_utils.get_function_name(function_name, context)
1✔
2114
        function = self._get_function(
1✔
2115
            function_name=function_name, region=region, account_id=account_id
2116
        )
2117
        if not (alias := function.aliases.get(name)):
1✔
2118
            raise ResourceNotFoundException(
1✔
2119
                f"Cannot find alias arn: {api_utils.qualified_lambda_arn(function_name=function_name, qualifier=name, region=region, account=account_id)}",
2120
                Type="User",
2121
            )
2122
        return api_utils.map_alias_out(alias=alias, function=function)
1✔
2123

2124
    def update_alias(
1✔
2125
        self,
2126
        context: RequestContext,
2127
        function_name: FunctionName,
2128
        name: Alias,
2129
        function_version: VersionWithLatestPublished = None,
2130
        description: Description = None,
2131
        routing_config: AliasRoutingConfiguration = None,
2132
        revision_id: String = None,
2133
        **kwargs,
2134
    ) -> AliasConfiguration:
2135
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2136
        function_name = api_utils.get_function_name(function_name, context)
1✔
2137
        function = self._get_function(
1✔
2138
            function_name=function_name, region=region, account_id=account_id
2139
        )
2140
        if not (alias := function.aliases.get(name)):
1✔
2141
            fn_arn = api_utils.qualified_lambda_arn(function_name, name, account_id, region)
1✔
2142
            raise ResourceNotFoundException(
1✔
2143
                f"Alias not found: {fn_arn}",
2144
                Type="User",
2145
            )
2146
        if revision_id and alias.revision_id != revision_id:
1✔
2147
            raise PreconditionFailedException(
1✔
2148
                "The Revision Id provided does not match the latest Revision Id. "
2149
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
2150
                Type="User",
2151
            )
2152
        changes = {}
1✔
2153
        if function_version is not None:
1✔
2154
            changes |= {"function_version": function_version}
1✔
2155
        if description is not None:
1✔
2156
            changes |= {"description": description}
1✔
2157
        if routing_config is not None:
1✔
2158
            # if it is an empty dict or AdditionalVersionWeights is empty, set routing config to None
2159
            new_routing_config = None
1✔
2160
            if routing_config_dict := routing_config.get("AdditionalVersionWeights"):
1✔
2161
                new_routing_config = self._create_routing_config_model(routing_config_dict)
×
2162
            changes |= {"routing_configuration": new_routing_config}
1✔
2163
        # even if no changes are done, we have to update revision id for some reason
2164
        old_alias = alias
1✔
2165
        alias = dataclasses.replace(alias, **changes)
1✔
2166
        function.aliases[name] = alias
1✔
2167

2168
        # TODO: signal lambda service that pointer potentially changed
2169
        self.lambda_service.update_alias(old_alias=old_alias, new_alias=alias, function=function)
1✔
2170

2171
        return api_utils.map_alias_out(alias=alias, function=function)
1✔
2172

2173
    # =======================================
2174
    # ======= EVENT SOURCE MAPPINGS =========
2175
    # =======================================
2176
    def check_service_resource_exists(
1✔
2177
        self, service: str, resource_arn: str, function_arn: str, function_role_arn: str
2178
    ):
2179
        """
2180
        Check if the service resource exists and if the function has access to it.
2181

2182
        Raises:
2183
            InvalidParameterValueException: If the service resource does not exist or the function does not have access to it.
2184
        """
2185
        arn = parse_arn(resource_arn)
1✔
2186
        source_client = get_internal_client(
1✔
2187
            arn=resource_arn,
2188
            role_arn=function_role_arn,
2189
            service_principal=ServicePrincipal.lambda_,
2190
            source_arn=function_arn,
2191
        )
2192
        if service in ["sqs", "sqs-fifo"]:
1✔
2193
            try:
1✔
2194
                # AWS uses `GetQueueAttributes` internally to verify the queue existence, but we need the `QueueUrl`
2195
                # which is not given directly. We build out a dummy `QueueUrl` which can be parsed by SQS to return
2196
                # the right value
2197
                queue_name = arn["resource"].split("/")[-1]
1✔
2198
                queue_url = f"http://sqs.{arn['region']}.domain/{arn['account']}/{queue_name}"
1✔
2199
                source_client.get_queue_attributes(QueueUrl=queue_url)
1✔
2200
            except ClientError as e:
1✔
2201
                error_code = e.response["Error"]["Code"]
1✔
2202
                if error_code == "AWS.SimpleQueueService.NonExistentQueue":
1✔
2203
                    raise InvalidParameterValueException(
1✔
2204
                        f"Error occurred while ReceiveMessage. SQS Error Code: {error_code}. SQS Error Message: {e.response['Error']['Message']}",
2205
                        Type="User",
2206
                    )
2207
                raise e
×
2208
        elif service in ["kinesis"]:
1✔
2209
            try:
1✔
2210
                source_client.describe_stream(StreamARN=resource_arn)
1✔
2211
            except ClientError as e:
1✔
2212
                if e.response["Error"]["Code"] == "ResourceNotFoundException":
1✔
2213
                    raise InvalidParameterValueException(
1✔
2214
                        f"Stream not found: {resource_arn}",
2215
                        Type="User",
2216
                    )
2217
                raise e
×
2218
        elif service in ["dynamodb"]:
1✔
2219
            try:
1✔
2220
                source_client.describe_stream(StreamArn=resource_arn)
1✔
2221
            except ClientError as e:
1✔
2222
                if e.response["Error"]["Code"] == "ResourceNotFoundException":
1✔
2223
                    raise InvalidParameterValueException(
1✔
2224
                        f"Stream not found: {resource_arn}",
2225
                        Type="User",
2226
                    )
2227
                raise e
×
2228

2229
    @handler("CreateEventSourceMapping", expand=False)
1✔
2230
    def create_event_source_mapping(
1✔
2231
        self,
2232
        context: RequestContext,
2233
        request: CreateEventSourceMappingRequest,
2234
    ) -> EventSourceMappingConfiguration:
2235
        return self.create_event_source_mapping_v2(context, request)
1✔
2236

2237
    def create_event_source_mapping_v2(
1✔
2238
        self,
2239
        context: RequestContext,
2240
        request: CreateEventSourceMappingRequest,
2241
    ) -> EventSourceMappingConfiguration:
2242
        # Validations
2243
        function_arn, function_name, state, function_version, function_role = (
1✔
2244
            self.validate_event_source_mapping(context, request)
2245
        )
2246

2247
        esm_config = EsmConfigFactory(request, context, function_arn).get_esm_config()
1✔
2248

2249
        # Copy esm_config to avoid a race condition with potential async update in the store
2250
        state.event_source_mappings[esm_config["UUID"]] = esm_config.copy()
1✔
2251
        enabled = request.get("Enabled", True)
1✔
2252
        # TODO: check for potential async race condition update -> think about locking
2253
        esm_worker = EsmWorkerFactory(esm_config, function_role, enabled).get_esm_worker()
1✔
2254
        self.esm_workers[esm_worker.uuid] = esm_worker
1✔
2255
        # TODO: check StateTransitionReason, LastModified, LastProcessingResult (concurrent updates requires locking!)
2256
        if tags := request.get("Tags"):
1✔
2257
            self._store_tags(esm_config.get("EventSourceMappingArn"), tags)
1✔
2258
        esm_worker.create()
1✔
2259
        return esm_config
1✔
2260

2261
    def validate_event_source_mapping(self, context, request):
1✔
2262
        # TODO: test whether stream ARNs are valid sources for Pipes or ESM or whether only DynamoDB table ARNs work
2263
        # TODO: Validate MaxRecordAgeInSeconds (i.e cannot subceed 60s but can be -1) and MaxRetryAttempts parameters.
2264
        # See https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-maximumrecordageinseconds
2265
        is_create_esm_request = context.operation.name == self.create_event_source_mapping.operation
1✔
2266

2267
        if destination_config := request.get("DestinationConfig"):
1✔
2268
            if "OnSuccess" in destination_config:
1✔
2269
                raise InvalidParameterValueException(
1✔
2270
                    "Unsupported DestinationConfig parameter for given event source mapping type.",
2271
                    Type="User",
2272
                )
2273

2274
        service = None
1✔
2275
        if "SelfManagedEventSource" in request:
1✔
2276
            service = "kafka"
×
2277
            if "SourceAccessConfigurations" not in request:
×
2278
                raise InvalidParameterValueException(
×
2279
                    "Required 'sourceAccessConfigurations' parameter is missing.", Type="User"
2280
                )
2281
        if service is None and "EventSourceArn" not in request:
1✔
2282
            raise InvalidParameterValueException("Unrecognized event source.", Type="User")
1✔
2283
        if service is None:
1✔
2284
            service = extract_service_from_arn(request["EventSourceArn"])
1✔
2285

2286
        batch_size = api_utils.validate_and_set_batch_size(service, request.get("BatchSize"))
1✔
2287
        if service in ["dynamodb", "kinesis"]:
1✔
2288
            starting_position = request.get("StartingPosition")
1✔
2289
            if not starting_position:
1✔
2290
                raise InvalidParameterValueException(
1✔
2291
                    "1 validation error detected: Value null at 'startingPosition' failed to satisfy constraint: Member must not be null.",
2292
                    Type="User",
2293
                )
2294

2295
            if starting_position not in KinesisStreamStartPosition.__members__:
1✔
2296
                raise ValidationException(
1✔
2297
                    f"1 validation error detected: Value '{starting_position}' at 'startingPosition' failed to satisfy constraint: Member must satisfy enum value set: [LATEST, AT_TIMESTAMP, TRIM_HORIZON]"
2298
                )
2299
            # AT_TIMESTAMP is not allowed for DynamoDB Streams
2300
            elif (
1✔
2301
                service == "dynamodb"
2302
                and starting_position not in DynamoDBStreamStartPosition.__members__
2303
            ):
2304
                raise InvalidParameterValueException(
1✔
2305
                    f"Unsupported starting position for arn type: {request['EventSourceArn']}",
2306
                    Type="User",
2307
                )
2308

2309
        if service in ["sqs", "sqs-fifo"]:
1✔
2310
            if batch_size > 10 and request.get("MaximumBatchingWindowInSeconds", 0) == 0:
1✔
2311
                raise InvalidParameterValueException(
1✔
2312
                    "Maximum batch window in seconds must be greater than 0 if maximum batch size is greater than 10",
2313
                    Type="User",
2314
                )
2315

2316
        if (filter_criteria := request.get("FilterCriteria")) is not None:
1✔
2317
            for filter_ in filter_criteria.get("Filters", []):
1✔
2318
                pattern_str = filter_.get("Pattern")
1✔
2319
                if not pattern_str or not isinstance(pattern_str, str):
1✔
2320
                    raise InvalidParameterValueException(
×
2321
                        "Invalid filter pattern definition.", Type="User"
2322
                    )
2323

2324
                if not validate_event_pattern(pattern_str):
1✔
2325
                    raise InvalidParameterValueException(
1✔
2326
                        "Invalid filter pattern definition.", Type="User"
2327
                    )
2328

2329
        # Can either have a FunctionName (i.e CreateEventSourceMapping request) or
2330
        # an internal EventSourceMappingConfiguration representation
2331
        request_function_name = request.get("FunctionName") or request.get("FunctionArn")
1✔
2332
        # can be either a partial arn or a full arn for the version/alias
2333
        function_name, qualifier, account, region = function_locators_from_arn(
1✔
2334
            request_function_name
2335
        )
2336
        # TODO: validate `context.region` vs. `region(request["FunctionName"])` vs. `region(request["EventSourceArn"])`
2337
        account = account or context.account_id
1✔
2338
        region = region or context.region
1✔
2339
        state = lambda_stores[account][region]
1✔
2340
        fn = state.functions.get(function_name)
1✔
2341
        if not fn:
1✔
2342
            raise InvalidParameterValueException("Function does not exist", Type="User")
1✔
2343

2344
        if qualifier:
1✔
2345
            # make sure the function version/alias exists
2346
            if api_utils.qualifier_is_alias(qualifier):
1✔
2347
                fn_alias = fn.aliases.get(qualifier)
1✔
2348
                if not fn_alias:
1✔
2349
                    raise Exception("unknown alias")  # TODO: cover via test
×
2350
            elif api_utils.qualifier_is_version(qualifier):
1✔
2351
                fn_version = fn.versions.get(qualifier)
1✔
2352
                if not fn_version:
1✔
2353
                    raise Exception("unknown version")  # TODO: cover via test
×
2354
            elif qualifier == "$LATEST":
1✔
2355
                pass
1✔
2356
            elif qualifier == "$LATEST.PUBLISHED":
×
2357
                if fn.versions.get(qualifier):
×
2358
                    pass
×
2359
            else:
2360
                raise Exception("invalid functionname")  # TODO: cover via test
×
2361
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account, region)
1✔
2362

2363
        else:
2364
            fn_arn = api_utils.unqualified_lambda_arn(function_name, account, region)
1✔
2365

2366
        function_version = get_function_version_from_arn(fn_arn)
1✔
2367
        function_role = function_version.config.role
1✔
2368

2369
        if source_arn := request.get("EventSourceArn"):
1✔
2370
            self.check_service_resource_exists(service, source_arn, fn_arn, function_role)
1✔
2371
        # Check we are validating a CreateEventSourceMapping request
2372
        if is_create_esm_request:
1✔
2373

2374
            def _get_mapping_sources(mapping: dict[str, Any]) -> list[str]:
1✔
2375
                if event_source_arn := mapping.get("EventSourceArn"):
1✔
2376
                    return [event_source_arn]
1✔
2377
                return (
×
2378
                    mapping.get("SelfManagedEventSource", {})
2379
                    .get("Endpoints", {})
2380
                    .get("KAFKA_BOOTSTRAP_SERVERS", [])
2381
                )
2382

2383
            # check for event source duplicates
2384
            # TODO: currently validated for sqs, kinesis, and dynamodb
2385
            service_id = load_service(service).service_id
1✔
2386
            for uuid, mapping in state.event_source_mappings.items():
1✔
2387
                mapping_sources = _get_mapping_sources(mapping)
1✔
2388
                request_sources = _get_mapping_sources(request)
1✔
2389
                if mapping["FunctionArn"] == fn_arn and (
1✔
2390
                    set(mapping_sources).intersection(request_sources)
2391
                ):
2392
                    if service == "sqs":
1✔
2393
                        # *shakes fist at SQS*
2394
                        raise ResourceConflictException(
1✔
2395
                            f'An event source mapping with {service_id} arn (" {mapping["EventSourceArn"]} ") '
2396
                            f'and function (" {function_name} ") already exists. Please update or delete the '
2397
                            f"existing mapping with UUID {uuid}",
2398
                            Type="User",
2399
                        )
2400
                    elif service == "kafka":
1✔
2401
                        if set(mapping["Topics"]).intersection(request["Topics"]):
×
2402
                            raise ResourceConflictException(
×
2403
                                f'An event source mapping with event source ("{",".join(request_sources)}"), '
2404
                                f'function ("{fn_arn}"), '
2405
                                f'topics ("{",".join(request["Topics"])}") already exists. Please update or delete the '
2406
                                f"existing mapping with UUID {uuid}",
2407
                                Type="User",
2408
                            )
2409
                    else:
2410
                        raise ResourceConflictException(
1✔
2411
                            f'The event source arn (" {mapping["EventSourceArn"]} ") and function '
2412
                            f'(" {function_name} ") provided mapping already exists. Please update or delete the '
2413
                            f"existing mapping with UUID {uuid}",
2414
                            Type="User",
2415
                        )
2416
        return fn_arn, function_name, state, function_version, function_role
1✔
2417

2418
    @handler("UpdateEventSourceMapping", expand=False)
1✔
2419
    def update_event_source_mapping(
1✔
2420
        self,
2421
        context: RequestContext,
2422
        request: UpdateEventSourceMappingRequest,
2423
    ) -> EventSourceMappingConfiguration:
2424
        return self.update_event_source_mapping_v2(context, request)
1✔
2425

2426
    def update_event_source_mapping_v2(
1✔
2427
        self,
2428
        context: RequestContext,
2429
        request: UpdateEventSourceMappingRequest,
2430
    ) -> EventSourceMappingConfiguration:
2431
        # TODO: test and implement this properly (quite complex with many validations and limitations!)
2432
        LOG.warning(
1✔
2433
            "Updating Lambda Event Source Mapping is in experimental state and not yet fully tested."
2434
        )
2435
        state = lambda_stores[context.account_id][context.region]
1✔
2436
        request_data = {**request}
1✔
2437
        uuid = request_data.pop("UUID", None)
1✔
2438
        if not uuid:
1✔
2439
            raise ResourceNotFoundException(
×
2440
                "The resource you requested does not exist.", Type="User"
2441
            )
2442
        old_event_source_mapping = state.event_source_mappings.get(uuid)
1✔
2443
        esm_worker = self.esm_workers.get(uuid)
1✔
2444
        if old_event_source_mapping is None or esm_worker is None:
1✔
2445
            raise ResourceNotFoundException(
1✔
2446
                "The resource you requested does not exist.", Type="User"
2447
            )  # TODO: test?
2448

2449
        # normalize values to overwrite
2450
        event_source_mapping = old_event_source_mapping | request_data
1✔
2451

2452
        temp_params = {}  # values only set for the returned response, not saved internally (e.g. transient state)
1✔
2453

2454
        # Validate the newly updated ESM object. We ignore the output here since we only care whether an Exception is raised.
2455
        function_arn, _, _, function_version, function_role = self.validate_event_source_mapping(
1✔
2456
            context, event_source_mapping
2457
        )
2458

2459
        # remove the FunctionName field
2460
        event_source_mapping.pop("FunctionName", None)
1✔
2461

2462
        if function_arn:
1✔
2463
            event_source_mapping["FunctionArn"] = function_arn
1✔
2464

2465
        # Only apply update if the desired state differs
2466
        enabled = request.get("Enabled")
1✔
2467
        if enabled is not None:
1✔
2468
            if enabled and old_event_source_mapping["State"] != EsmState.ENABLED:
1✔
2469
                event_source_mapping["State"] = EsmState.ENABLING
1✔
2470
            # TODO: What happens when trying to update during an update or failed state?!
2471
            elif not enabled and old_event_source_mapping["State"] == EsmState.ENABLED:
1✔
2472
                event_source_mapping["State"] = EsmState.DISABLING
1✔
2473
        else:
2474
            event_source_mapping["State"] = EsmState.UPDATING
1✔
2475

2476
        # To ensure parity, certain responses need to be immediately returned
2477
        temp_params["State"] = event_source_mapping["State"]
1✔
2478

2479
        state.event_source_mappings[uuid] = event_source_mapping
1✔
2480

2481
        # TODO: Currently, we re-create the entire ESM worker. Look into approach with better performance.
2482
        worker_factory = EsmWorkerFactory(
1✔
2483
            event_source_mapping, function_role, request.get("Enabled", esm_worker.enabled)
2484
        )
2485

2486
        # Get a new ESM worker object but do not active it, since the factory holds all logic for creating new worker from configuration.
2487
        updated_esm_worker = worker_factory.get_esm_worker()
1✔
2488
        self.esm_workers[uuid] = updated_esm_worker
1✔
2489

2490
        # We should stop() the worker since the delete() will remove the ESM from the state mapping.
2491
        esm_worker.stop()
1✔
2492
        # This will either create an EsmWorker in the CREATING state if enabled. Otherwise, the DISABLING state is set.
2493
        updated_esm_worker.create()
1✔
2494

2495
        return {**event_source_mapping, **temp_params}
1✔
2496

2497
    def delete_event_source_mapping(
1✔
2498
        self, context: RequestContext, uuid: String, **kwargs
2499
    ) -> EventSourceMappingConfiguration:
2500
        state = lambda_stores[context.account_id][context.region]
1✔
2501
        event_source_mapping = state.event_source_mappings.get(uuid)
1✔
2502
        if not event_source_mapping:
1✔
2503
            raise ResourceNotFoundException(
1✔
2504
                "The resource you requested does not exist.", Type="User"
2505
            )
2506
        esm = state.event_source_mappings[uuid]
1✔
2507
        # TODO: add proper locking
2508
        esm_worker = self.esm_workers.pop(uuid, None)
1✔
2509
        # Asynchronous delete in v2
2510
        if not esm_worker:
1✔
2511
            raise ResourceNotFoundException(
×
2512
                "The resource you requested does not exist.", Type="User"
2513
            )
2514
        # the full deletion of the ESM is happening asynchronously, but we delete the Tags instantly
2515
        # this behavior is similar to ``get_event_source_mapping`` which will raise right after deletion, but it is not
2516
        # always the case in AWS. Add more testing and align behavior with ``get_event_source_mapping``.
2517
        self._remove_all_tags(event_source_mapping["EventSourceMappingArn"])
1✔
2518
        esm_worker.delete()
1✔
2519
        return {**esm, "State": EsmState.DELETING}
1✔
2520

2521
    def get_event_source_mapping(
1✔
2522
        self, context: RequestContext, uuid: String, **kwargs
2523
    ) -> EventSourceMappingConfiguration:
2524
        state = lambda_stores[context.account_id][context.region]
1✔
2525
        event_source_mapping = state.event_source_mappings.get(uuid)
1✔
2526
        if not event_source_mapping:
1✔
2527
            raise ResourceNotFoundException(
1✔
2528
                "The resource you requested does not exist.", Type="User"
2529
            )
2530
        esm_worker = self.esm_workers.get(uuid)
1✔
2531
        if not esm_worker:
1✔
2532
            raise ResourceNotFoundException(
×
2533
                "The resource you requested does not exist.", Type="User"
2534
            )
2535
        event_source_mapping["State"] = esm_worker.current_state
1✔
2536
        event_source_mapping["StateTransitionReason"] = esm_worker.state_transition_reason
1✔
2537
        return event_source_mapping
1✔
2538

2539
    def list_event_source_mappings(
1✔
2540
        self,
2541
        context: RequestContext,
2542
        event_source_arn: Arn = None,
2543
        function_name: FunctionName = None,
2544
        marker: String = None,
2545
        max_items: MaxListItems = None,
2546
        **kwargs,
2547
    ) -> ListEventSourceMappingsResponse:
2548
        state = lambda_stores[context.account_id][context.region]
1✔
2549

2550
        esms = state.event_source_mappings.values()
1✔
2551
        # TODO: update and test State and StateTransitionReason for ESM v2
2552

2553
        if event_source_arn:  # TODO: validate pattern
1✔
2554
            esms = [e for e in esms if e.get("EventSourceArn") == event_source_arn]
1✔
2555

2556
        if function_name:
1✔
2557
            esms = [e for e in esms if function_name in e["FunctionArn"]]
1✔
2558

2559
        esms = PaginatedList(esms)
1✔
2560
        page, token = esms.get_page(
1✔
2561
            lambda x: x["UUID"],
2562
            marker,
2563
            max_items,
2564
        )
2565
        return ListEventSourceMappingsResponse(EventSourceMappings=page, NextMarker=token)
1✔
2566

2567
    def get_source_type_from_request(self, request: dict[str, Any]) -> str:
1✔
2568
        if event_source_arn := request.get("EventSourceArn", ""):
×
2569
            service = extract_service_from_arn(event_source_arn)
×
2570
            if service == "sqs" and "fifo" in event_source_arn:
×
2571
                service = "sqs-fifo"
×
2572
            return service
×
2573
        elif request.get("SelfManagedEventSource"):
×
2574
            return "kafka"
×
2575

2576
    # =======================================
2577
    # ============ FUNCTION URLS ============
2578
    # =======================================
2579

2580
    @staticmethod
1✔
2581
    def _validate_qualifier(qualifier: str) -> None:
1✔
2582
        if qualifier in ["$LATEST", "$LATEST.PUBLISHED"] or (
1✔
2583
            qualifier and api_utils.qualifier_is_version(qualifier)
2584
        ):
2585
            raise ValidationException(
1✔
2586
                f"1 validation error detected: Value '{qualifier}' at 'qualifier' failed to satisfy constraint: Member must satisfy regular expression pattern: ((?!^\\d+$)^[0-9a-zA-Z-_]+$)"
2587
            )
2588

2589
    @staticmethod
1✔
2590
    def _validate_invoke_mode(invoke_mode: str) -> None:
1✔
2591
        if invoke_mode and invoke_mode not in [InvokeMode.BUFFERED, InvokeMode.RESPONSE_STREAM]:
1✔
2592
            raise ValidationException(
1✔
2593
                f"1 validation error detected: Value '{invoke_mode}' at 'invokeMode' failed to satisfy constraint: Member must satisfy enum value set: [RESPONSE_STREAM, BUFFERED]"
2594
            )
2595
        if invoke_mode == InvokeMode.RESPONSE_STREAM:
1✔
2596
            # TODO should we actually fail for setting RESPONSE_STREAM?
2597
            #  It should trigger InvokeWithResponseStream which is not implemented
2598
            LOG.warning(
1✔
2599
                "The invokeMode 'RESPONSE_STREAM' is not yet supported on LocalStack. The property is only mocked, the execution will still be 'BUFFERED'"
2600
            )
2601

2602
    # TODO: what happens if function state is not active?
2603
    def create_function_url_config(
1✔
2604
        self,
2605
        context: RequestContext,
2606
        function_name: FunctionName,
2607
        auth_type: FunctionUrlAuthType,
2608
        qualifier: FunctionUrlQualifier = None,
2609
        cors: Cors = None,
2610
        invoke_mode: InvokeMode = None,
2611
        **kwargs,
2612
    ) -> CreateFunctionUrlConfigResponse:
2613
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2614
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
2615
            function_name, qualifier, context
2616
        )
2617
        state = lambda_stores[account_id][region]
1✔
2618
        self._validate_qualifier(qualifier)
1✔
2619
        self._validate_invoke_mode(invoke_mode)
1✔
2620

2621
        fn = state.functions.get(function_name)
1✔
2622
        if fn is None:
1✔
2623
            raise ResourceNotFoundException("Function does not exist", Type="User")
1✔
2624

2625
        url_config = fn.function_url_configs.get(qualifier or "$LATEST")
1✔
2626
        if url_config:
1✔
2627
            raise ResourceConflictException(
1✔
2628
                f"Failed to create function url config for [functionArn = {url_config.function_arn}]. Error message:  FunctionUrlConfig exists for this Lambda function",
2629
                Type="User",
2630
            )
2631

2632
        if qualifier and qualifier != "$LATEST" and qualifier not in fn.aliases:
1✔
2633
            raise ResourceNotFoundException("Function does not exist", Type="User")
1✔
2634

2635
        normalized_qualifier = qualifier or "$LATEST"
1✔
2636

2637
        function_arn = (
1✔
2638
            api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
2639
            if qualifier
2640
            else api_utils.unqualified_lambda_arn(function_name, account_id, region)
2641
        )
2642

2643
        custom_id: str | None = None
1✔
2644

2645
        tags = self._get_tags(api_utils.unqualified_lambda_arn(function_name, account_id, region))
1✔
2646
        if TAG_KEY_CUSTOM_URL in tags:
1✔
2647
            # Note: I really wanted to add verification here that the
2648
            # url_id is unique, so we could surface that to the user ASAP.
2649
            # However, it seems like that information isn't available yet,
2650
            # since (as far as I can tell) we call
2651
            # self.router.register_routes() once, in a single shot, for all
2652
            # of the routes -- and we need to verify that it's unique not
2653
            # just for this particular lambda function, but for the entire
2654
            # lambda provider. Therefore... that idea proved non-trivial!
2655
            custom_id_tag_value = (
1✔
2656
                f"{tags[TAG_KEY_CUSTOM_URL]}-{qualifier}" if qualifier else tags[TAG_KEY_CUSTOM_URL]
2657
            )
2658
            if TAG_KEY_CUSTOM_URL_VALIDATOR.match(custom_id_tag_value):
1✔
2659
                custom_id = custom_id_tag_value
1✔
2660

2661
            else:
2662
                # Note: we're logging here instead of raising to prioritize
2663
                # strict parity with AWS over the localstack-only custom_id
2664
                LOG.warning(
1✔
2665
                    "Invalid custom ID tag value for lambda URL (%s=%s). "
2666
                    "Replaced with default (random id)",
2667
                    TAG_KEY_CUSTOM_URL,
2668
                    custom_id_tag_value,
2669
                )
2670

2671
        # The url_id is the subdomain used for the URL we're creating. This
2672
        # is either created randomly (as in AWS), or can be passed as a tag
2673
        # to the lambda itself (localstack-only).
2674
        url_id: str
2675
        if custom_id is None:
1✔
2676
            url_id = api_utils.generate_random_url_id()
1✔
2677
        else:
2678
            url_id = custom_id
1✔
2679

2680
        host_definition = localstack_host(custom_port=config.GATEWAY_LISTEN[0].port)
1✔
2681
        fn.function_url_configs[normalized_qualifier] = FunctionUrlConfig(
1✔
2682
            function_arn=function_arn,
2683
            function_name=function_name,
2684
            cors=cors,
2685
            url_id=url_id,
2686
            url=f"http://{url_id}.lambda-url.{context.region}.{host_definition.host_and_port()}/",  # TODO: https support
2687
            auth_type=auth_type,
2688
            creation_time=api_utils.generate_lambda_date(),
2689
            last_modified_time=api_utils.generate_lambda_date(),
2690
            invoke_mode=invoke_mode,
2691
        )
2692

2693
        # persist and start URL
2694
        # TODO: implement URL invoke
2695
        api_url_config = api_utils.map_function_url_config(
1✔
2696
            fn.function_url_configs[normalized_qualifier]
2697
        )
2698

2699
        return CreateFunctionUrlConfigResponse(
1✔
2700
            FunctionUrl=api_url_config["FunctionUrl"],
2701
            FunctionArn=api_url_config["FunctionArn"],
2702
            AuthType=api_url_config["AuthType"],
2703
            Cors=api_url_config["Cors"],
2704
            CreationTime=api_url_config["CreationTime"],
2705
            InvokeMode=api_url_config["InvokeMode"],
2706
        )
2707

2708
    def get_function_url_config(
1✔
2709
        self,
2710
        context: RequestContext,
2711
        function_name: FunctionName,
2712
        qualifier: FunctionUrlQualifier = None,
2713
        **kwargs,
2714
    ) -> GetFunctionUrlConfigResponse:
2715
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2716
        state = lambda_stores[account_id][region]
1✔
2717

2718
        fn_name, qualifier = api_utils.get_name_and_qualifier(function_name, qualifier, context)
1✔
2719

2720
        self._validate_qualifier(qualifier)
1✔
2721

2722
        resolved_fn = state.functions.get(fn_name)
1✔
2723
        if not resolved_fn:
1✔
2724
            raise ResourceNotFoundException(
1✔
2725
                "The resource you requested does not exist.", Type="User"
2726
            )
2727

2728
        qualifier = qualifier or "$LATEST"
1✔
2729
        url_config = resolved_fn.function_url_configs.get(qualifier)
1✔
2730
        if not url_config:
1✔
2731
            raise ResourceNotFoundException(
1✔
2732
                "The resource you requested does not exist.", Type="User"
2733
            )
2734

2735
        return api_utils.map_function_url_config(url_config)
1✔
2736

2737
    def update_function_url_config(
1✔
2738
        self,
2739
        context: RequestContext,
2740
        function_name: FunctionName,
2741
        qualifier: FunctionUrlQualifier = None,
2742
        auth_type: FunctionUrlAuthType = None,
2743
        cors: Cors = None,
2744
        invoke_mode: InvokeMode = None,
2745
        **kwargs,
2746
    ) -> UpdateFunctionUrlConfigResponse:
2747
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2748
        state = lambda_stores[account_id][region]
1✔
2749

2750
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
2751
            function_name, qualifier, context
2752
        )
2753
        self._validate_qualifier(qualifier)
1✔
2754
        self._validate_invoke_mode(invoke_mode)
1✔
2755

2756
        fn = state.functions.get(function_name)
1✔
2757
        if not fn:
1✔
2758
            raise ResourceNotFoundException("Function does not exist", Type="User")
1✔
2759

2760
        normalized_qualifier = qualifier or "$LATEST"
1✔
2761

2762
        if (
1✔
2763
            api_utils.qualifier_is_alias(normalized_qualifier)
2764
            and normalized_qualifier not in fn.aliases
2765
        ):
2766
            raise ResourceNotFoundException("Function does not exist", Type="User")
1✔
2767

2768
        url_config = fn.function_url_configs.get(normalized_qualifier)
1✔
2769
        if not url_config:
1✔
2770
            raise ResourceNotFoundException(
1✔
2771
                "The resource you requested does not exist.", Type="User"
2772
            )
2773

2774
        changes = {
1✔
2775
            "last_modified_time": api_utils.generate_lambda_date(),
2776
            **({"cors": cors} if cors is not None else {}),
2777
            **({"auth_type": auth_type} if auth_type is not None else {}),
2778
        }
2779

2780
        if invoke_mode:
1✔
2781
            changes["invoke_mode"] = invoke_mode
1✔
2782

2783
        new_url_config = dataclasses.replace(url_config, **changes)
1✔
2784
        fn.function_url_configs[normalized_qualifier] = new_url_config
1✔
2785

2786
        return UpdateFunctionUrlConfigResponse(
1✔
2787
            FunctionUrl=new_url_config.url,
2788
            FunctionArn=new_url_config.function_arn,
2789
            AuthType=new_url_config.auth_type,
2790
            Cors=new_url_config.cors,
2791
            CreationTime=new_url_config.creation_time,
2792
            LastModifiedTime=new_url_config.last_modified_time,
2793
            InvokeMode=new_url_config.invoke_mode,
2794
        )
2795

2796
    def delete_function_url_config(
1✔
2797
        self,
2798
        context: RequestContext,
2799
        function_name: FunctionName,
2800
        qualifier: FunctionUrlQualifier = None,
2801
        **kwargs,
2802
    ) -> None:
2803
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2804
        state = lambda_stores[account_id][region]
1✔
2805

2806
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
2807
            function_name, qualifier, context
2808
        )
2809
        self._validate_qualifier(qualifier)
1✔
2810

2811
        resolved_fn = state.functions.get(function_name)
1✔
2812
        if not resolved_fn:
1✔
2813
            raise ResourceNotFoundException(
1✔
2814
                "The resource you requested does not exist.", Type="User"
2815
            )
2816

2817
        qualifier = qualifier or "$LATEST"
1✔
2818
        url_config = resolved_fn.function_url_configs.get(qualifier)
1✔
2819
        if not url_config:
1✔
2820
            raise ResourceNotFoundException(
1✔
2821
                "The resource you requested does not exist.", Type="User"
2822
            )
2823

2824
        del resolved_fn.function_url_configs[qualifier]
1✔
2825

2826
    def list_function_url_configs(
1✔
2827
        self,
2828
        context: RequestContext,
2829
        function_name: FunctionName,
2830
        marker: String = None,
2831
        max_items: MaxItems = None,
2832
        **kwargs,
2833
    ) -> ListFunctionUrlConfigsResponse:
2834
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2835
        state = lambda_stores[account_id][region]
1✔
2836

2837
        fn_name = api_utils.get_function_name(function_name, context)
1✔
2838
        resolved_fn = state.functions.get(fn_name)
1✔
2839
        if not resolved_fn:
1✔
2840
            raise ResourceNotFoundException("Function does not exist", Type="User")
1✔
2841

2842
        url_configs = [
1✔
2843
            api_utils.map_function_url_config(fn_conf)
2844
            for fn_conf in resolved_fn.function_url_configs.values()
2845
        ]
2846
        url_configs = PaginatedList(url_configs)
1✔
2847
        page, token = url_configs.get_page(
1✔
2848
            lambda url_config: url_config["FunctionArn"],
2849
            marker,
2850
            max_items,
2851
        )
2852
        url_configs = page
1✔
2853
        return ListFunctionUrlConfigsResponse(FunctionUrlConfigs=url_configs, NextMarker=token)
1✔
2854

2855
    # =======================================
2856
    # ============  Permissions  ============
2857
    # =======================================
2858

2859
    @handler("AddPermission", expand=False)
1✔
2860
    def add_permission(
1✔
2861
        self,
2862
        context: RequestContext,
2863
        request: AddPermissionRequest,
2864
    ) -> AddPermissionResponse:
2865
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
2866
            request.get("FunctionName"), request.get("Qualifier"), context
2867
        )
2868

2869
        # validate qualifier
2870
        if qualifier is not None:
1✔
2871
            self._validate_qualifier_expression(qualifier)
1✔
2872
            if qualifier == "$LATEST":
1✔
2873
                raise InvalidParameterValueException(
1✔
2874
                    "We currently do not support adding policies for $LATEST.", Type="User"
2875
                )
2876
        account_id, region = api_utils.get_account_and_region(request.get("FunctionName"), context)
1✔
2877

2878
        resolved_fn = self._get_function(function_name, account_id, region)
1✔
2879
        resolved_qualifier, fn_arn = self._resolve_fn_qualifier(resolved_fn, qualifier)
1✔
2880

2881
        revision_id = request.get("RevisionId")
1✔
2882
        if revision_id:
1✔
2883
            fn_revision_id = self._function_revision_id(resolved_fn, resolved_qualifier)
1✔
2884
            if revision_id != fn_revision_id:
1✔
2885
                raise PreconditionFailedException(
1✔
2886
                    "The Revision Id provided does not match the latest Revision Id. "
2887
                    "Call the GetPolicy API to retrieve the latest Revision Id",
2888
                    Type="User",
2889
                )
2890

2891
        request_sid = request["StatementId"]
1✔
2892
        if not bool(STATEMENT_ID_REGEX.match(request_sid)):
1✔
2893
            raise ValidationException(
1✔
2894
                f"1 validation error detected: Value '{request_sid}' at 'statementId' failed to satisfy constraint: Member must satisfy regular expression pattern: ([a-zA-Z0-9-_]+)"
2895
            )
2896
        # check for an already existing policy and any conflicts in existing statements
2897
        existing_policy = resolved_fn.permissions.get(resolved_qualifier)
1✔
2898
        if existing_policy:
1✔
2899
            if request_sid in [s["Sid"] for s in existing_policy.policy.Statement]:
1✔
2900
                # uniqueness scope: statement id needs to be unique per qualified function ($LATEST, version, or alias)
2901
                # Counterexample: the same sid can exist within $LATEST, version, and alias
2902
                raise ResourceConflictException(
1✔
2903
                    f"The statement id ({request_sid}) provided already exists. Please provide a new statement id, or remove the existing statement.",
2904
                    Type="User",
2905
                )
2906

2907
        permission_statement = api_utils.build_statement(
1✔
2908
            partition=context.partition,
2909
            resource_arn=fn_arn,
2910
            statement_id=request["StatementId"],
2911
            action=request["Action"],
2912
            principal=request["Principal"],
2913
            source_arn=request.get("SourceArn"),
2914
            source_account=request.get("SourceAccount"),
2915
            principal_org_id=request.get("PrincipalOrgID"),
2916
            event_source_token=request.get("EventSourceToken"),
2917
            auth_type=request.get("FunctionUrlAuthType"),
2918
        )
2919
        new_policy = existing_policy
1✔
2920
        if not existing_policy:
1✔
2921
            new_policy = FunctionResourcePolicy(
1✔
2922
                policy=ResourcePolicy(Version="2012-10-17", Id="default", Statement=[])
2923
            )
2924
        new_policy.policy.Statement.append(permission_statement)
1✔
2925
        if not existing_policy:
1✔
2926
            resolved_fn.permissions[resolved_qualifier] = new_policy
1✔
2927

2928
        # Update revision id of alias or version
2929
        # TODO: re-evaluate data model to prevent this dirty hack just for bumping the revision id
2930
        # TODO: does that need a `with function.lock` for atomic updates of the policy + revision_id?
2931
        if api_utils.qualifier_is_alias(resolved_qualifier):
1✔
2932
            resolved_alias = resolved_fn.aliases[resolved_qualifier]
1✔
2933
            resolved_fn.aliases[resolved_qualifier] = dataclasses.replace(resolved_alias)
1✔
2934
        # Assumes that a non-alias is a version
2935
        else:
2936
            resolved_version = resolved_fn.versions[resolved_qualifier]
1✔
2937
            resolved_fn.versions[resolved_qualifier] = dataclasses.replace(
1✔
2938
                resolved_version, config=dataclasses.replace(resolved_version.config)
2939
            )
2940
        return AddPermissionResponse(Statement=json.dumps(permission_statement))
1✔
2941

2942
    def remove_permission(
1✔
2943
        self,
2944
        context: RequestContext,
2945
        function_name: NamespacedFunctionName,
2946
        statement_id: NamespacedStatementId,
2947
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
2948
        revision_id: String | None = None,
2949
        **kwargs,
2950
    ) -> None:
2951
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2952
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
2953
            function_name, qualifier, context
2954
        )
2955
        if qualifier is not None:
1✔
2956
            self._validate_qualifier_expression(qualifier)
1✔
2957

2958
        state = lambda_stores[account_id][region]
1✔
2959
        resolved_fn = state.functions.get(function_name)
1✔
2960
        if resolved_fn is None:
1✔
2961
            fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
1✔
2962
            raise ResourceNotFoundException(f"No policy found for: {fn_arn}", Type="User")
1✔
2963

2964
        resolved_qualifier, _ = self._resolve_fn_qualifier(resolved_fn, qualifier)
1✔
2965
        function_permission = resolved_fn.permissions.get(resolved_qualifier)
1✔
2966
        if not function_permission:
1✔
2967
            raise ResourceNotFoundException(
1✔
2968
                "No policy is associated with the given resource.", Type="User"
2969
            )
2970

2971
        # try to find statement in policy and delete it
2972
        statement = None
1✔
2973
        for s in function_permission.policy.Statement:
1✔
2974
            if s["Sid"] == statement_id:
1✔
2975
                statement = s
1✔
2976
                break
1✔
2977

2978
        if not statement:
1✔
2979
            raise ResourceNotFoundException(
1✔
2980
                f"Statement {statement_id} is not found in resource policy.", Type="User"
2981
            )
2982
        fn_revision_id = self._function_revision_id(resolved_fn, resolved_qualifier)
1✔
2983
        if revision_id and revision_id != fn_revision_id:
1✔
2984
            raise PreconditionFailedException(
×
2985
                "The Revision Id provided does not match the latest Revision Id. "
2986
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
2987
                Type="User",
2988
            )
2989
        function_permission.policy.Statement.remove(statement)
1✔
2990

2991
        # Update revision id for alias or version
2992
        # TODO: re-evaluate data model to prevent this dirty hack just for bumping the revision id
2993
        # TODO: does that need a `with function.lock` for atomic updates of the policy + revision_id?
2994
        if api_utils.qualifier_is_alias(resolved_qualifier):
1✔
2995
            resolved_alias = resolved_fn.aliases[resolved_qualifier]
×
2996
            resolved_fn.aliases[resolved_qualifier] = dataclasses.replace(resolved_alias)
×
2997
        # Assumes that a non-alias is a version
2998
        else:
2999
            resolved_version = resolved_fn.versions[resolved_qualifier]
1✔
3000
            resolved_fn.versions[resolved_qualifier] = dataclasses.replace(
1✔
3001
                resolved_version, config=dataclasses.replace(resolved_version.config)
3002
            )
3003

3004
        # remove the policy as a whole when there's no statement left in it
3005
        if len(function_permission.policy.Statement) == 0:
1✔
3006
            del resolved_fn.permissions[resolved_qualifier]
1✔
3007

3008
    def get_policy(
1✔
3009
        self,
3010
        context: RequestContext,
3011
        function_name: NamespacedFunctionName,
3012
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
3013
        **kwargs,
3014
    ) -> GetPolicyResponse:
3015
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3016
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
3017
            function_name, qualifier, context
3018
        )
3019

3020
        if qualifier is not None:
1✔
3021
            self._validate_qualifier_expression(qualifier)
1✔
3022

3023
        resolved_fn = self._get_function(function_name, account_id, region)
1✔
3024

3025
        resolved_qualifier = qualifier or "$LATEST"
1✔
3026
        function_permission = resolved_fn.permissions.get(resolved_qualifier)
1✔
3027
        if not function_permission:
1✔
3028
            raise ResourceNotFoundException(
1✔
3029
                "The resource you requested does not exist.", Type="User"
3030
            )
3031

3032
        fn_revision_id = None
1✔
3033
        if api_utils.qualifier_is_alias(resolved_qualifier):
1✔
3034
            resolved_alias = resolved_fn.aliases[resolved_qualifier]
1✔
3035
            fn_revision_id = resolved_alias.revision_id
1✔
3036
        # Assumes that a non-alias is a version
3037
        else:
3038
            resolved_version = resolved_fn.versions[resolved_qualifier]
1✔
3039
            fn_revision_id = resolved_version.config.revision_id
1✔
3040

3041
        return GetPolicyResponse(
1✔
3042
            Policy=json.dumps(dataclasses.asdict(function_permission.policy)),
3043
            RevisionId=fn_revision_id,
3044
        )
3045

3046
    # =======================================
3047
    # ========  Code signing config  ========
3048
    # =======================================
3049

3050
    def create_code_signing_config(
1✔
3051
        self,
3052
        context: RequestContext,
3053
        allowed_publishers: AllowedPublishers,
3054
        description: Description | None = None,
3055
        code_signing_policies: CodeSigningPolicies | None = None,
3056
        tags: Tags | None = None,
3057
        **kwargs,
3058
    ) -> CreateCodeSigningConfigResponse:
3059
        account = context.account_id
1✔
3060
        region = context.region
1✔
3061

3062
        state = lambda_stores[account][region]
1✔
3063
        # TODO: can there be duplicates?
3064
        csc_id = f"csc-{get_random_hex(17)}"  # e.g. 'csc-077c33b4c19e26036'
1✔
3065
        csc_arn = f"arn:{context.partition}:lambda:{region}:{account}:code-signing-config:{csc_id}"
1✔
3066
        csc = CodeSigningConfig(
1✔
3067
            csc_id=csc_id,
3068
            arn=csc_arn,
3069
            allowed_publishers=allowed_publishers,
3070
            policies=code_signing_policies,
3071
            last_modified=api_utils.generate_lambda_date(),
3072
            description=description,
3073
        )
3074
        state.code_signing_configs[csc_arn] = csc
1✔
3075
        return CreateCodeSigningConfigResponse(CodeSigningConfig=api_utils.map_csc(csc))
1✔
3076

3077
    def put_function_code_signing_config(
1✔
3078
        self,
3079
        context: RequestContext,
3080
        code_signing_config_arn: CodeSigningConfigArn,
3081
        function_name: NamespacedFunctionName,
3082
        **kwargs,
3083
    ) -> PutFunctionCodeSigningConfigResponse:
3084
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3085
        state = lambda_stores[account_id][region]
1✔
3086
        function_name = api_utils.get_function_name(function_name, context)
1✔
3087

3088
        csc = state.code_signing_configs.get(code_signing_config_arn)
1✔
3089
        if not csc:
1✔
3090
            raise CodeSigningConfigNotFoundException(
1✔
3091
                f"The code signing configuration cannot be found. Check that the provided configuration is not deleted: {code_signing_config_arn}.",
3092
                Type="User",
3093
            )
3094

3095
        fn = state.functions.get(function_name)
1✔
3096
        fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
1✔
3097
        if not fn:
1✔
3098
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")
1✔
3099

3100
        fn.code_signing_config_arn = code_signing_config_arn
1✔
3101
        return PutFunctionCodeSigningConfigResponse(
1✔
3102
            CodeSigningConfigArn=code_signing_config_arn, FunctionName=function_name
3103
        )
3104

3105
    def update_code_signing_config(
1✔
3106
        self,
3107
        context: RequestContext,
3108
        code_signing_config_arn: CodeSigningConfigArn,
3109
        description: Description = None,
3110
        allowed_publishers: AllowedPublishers = None,
3111
        code_signing_policies: CodeSigningPolicies = None,
3112
        **kwargs,
3113
    ) -> UpdateCodeSigningConfigResponse:
3114
        state = lambda_stores[context.account_id][context.region]
1✔
3115
        csc = state.code_signing_configs.get(code_signing_config_arn)
1✔
3116
        if not csc:
1✔
3117
            raise ResourceNotFoundException(
1✔
3118
                f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
3119
            )
3120

3121
        changes = {
1✔
3122
            **(
3123
                {"allowed_publishers": allowed_publishers} if allowed_publishers is not None else {}
3124
            ),
3125
            **({"policies": code_signing_policies} if code_signing_policies is not None else {}),
3126
            **({"description": description} if description is not None else {}),
3127
        }
3128
        new_csc = dataclasses.replace(
1✔
3129
            csc, last_modified=api_utils.generate_lambda_date(), **changes
3130
        )
3131
        state.code_signing_configs[code_signing_config_arn] = new_csc
1✔
3132

3133
        return UpdateCodeSigningConfigResponse(CodeSigningConfig=api_utils.map_csc(new_csc))
1✔
3134

3135
    def get_code_signing_config(
1✔
3136
        self, context: RequestContext, code_signing_config_arn: CodeSigningConfigArn, **kwargs
3137
    ) -> GetCodeSigningConfigResponse:
3138
        state = lambda_stores[context.account_id][context.region]
1✔
3139
        csc = state.code_signing_configs.get(code_signing_config_arn)
1✔
3140
        if not csc:
1✔
3141
            raise ResourceNotFoundException(
1✔
3142
                f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
3143
            )
3144

3145
        return GetCodeSigningConfigResponse(CodeSigningConfig=api_utils.map_csc(csc))
1✔
3146

3147
    def get_function_code_signing_config(
1✔
3148
        self, context: RequestContext, function_name: NamespacedFunctionName, **kwargs
3149
    ) -> GetFunctionCodeSigningConfigResponse:
3150
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3151
        state = lambda_stores[account_id][region]
1✔
3152
        function_name = api_utils.get_function_name(function_name, context)
1✔
3153
        fn = state.functions.get(function_name)
1✔
3154
        fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
1✔
3155
        if not fn:
1✔
3156
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")
1✔
3157

3158
        if fn.code_signing_config_arn:
1✔
3159
            return GetFunctionCodeSigningConfigResponse(
1✔
3160
                CodeSigningConfigArn=fn.code_signing_config_arn, FunctionName=function_name
3161
            )
3162

3163
        return GetFunctionCodeSigningConfigResponse()
1✔
3164

3165
    def delete_function_code_signing_config(
1✔
3166
        self, context: RequestContext, function_name: NamespacedFunctionName, **kwargs
3167
    ) -> None:
3168
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3169
        state = lambda_stores[account_id][region]
1✔
3170
        function_name = api_utils.get_function_name(function_name, context)
1✔
3171
        fn = state.functions.get(function_name)
1✔
3172
        fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
1✔
3173
        if not fn:
1✔
3174
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")
1✔
3175

3176
        fn.code_signing_config_arn = None
1✔
3177

3178
    def delete_code_signing_config(
1✔
3179
        self, context: RequestContext, code_signing_config_arn: CodeSigningConfigArn, **kwargs
3180
    ) -> DeleteCodeSigningConfigResponse:
3181
        state = lambda_stores[context.account_id][context.region]
1✔
3182

3183
        csc = state.code_signing_configs.get(code_signing_config_arn)
1✔
3184
        if not csc:
1✔
3185
            raise ResourceNotFoundException(
1✔
3186
                f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
3187
            )
3188

3189
        del state.code_signing_configs[code_signing_config_arn]
1✔
3190

3191
        return DeleteCodeSigningConfigResponse()
1✔
3192

3193
    def list_code_signing_configs(
1✔
3194
        self,
3195
        context: RequestContext,
3196
        marker: String = None,
3197
        max_items: MaxListItems = None,
3198
        **kwargs,
3199
    ) -> ListCodeSigningConfigsResponse:
3200
        state = lambda_stores[context.account_id][context.region]
1✔
3201

3202
        cscs = [api_utils.map_csc(csc) for csc in state.code_signing_configs.values()]
1✔
3203
        cscs = PaginatedList(cscs)
1✔
3204
        page, token = cscs.get_page(
1✔
3205
            lambda csc: csc["CodeSigningConfigId"],
3206
            marker,
3207
            max_items,
3208
        )
3209
        return ListCodeSigningConfigsResponse(CodeSigningConfigs=page, NextMarker=token)
1✔
3210

3211
    def list_functions_by_code_signing_config(
1✔
3212
        self,
3213
        context: RequestContext,
3214
        code_signing_config_arn: CodeSigningConfigArn,
3215
        marker: String = None,
3216
        max_items: MaxListItems = None,
3217
        **kwargs,
3218
    ) -> ListFunctionsByCodeSigningConfigResponse:
3219
        account = context.account_id
1✔
3220
        region = context.region
1✔
3221

3222
        state = lambda_stores[account][region]
1✔
3223

3224
        if code_signing_config_arn not in state.code_signing_configs:
1✔
3225
            raise ResourceNotFoundException(
1✔
3226
                f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
3227
            )
3228

3229
        fn_arns = [
1✔
3230
            api_utils.unqualified_lambda_arn(fn.function_name, account, region)
3231
            for fn in state.functions.values()
3232
            if fn.code_signing_config_arn == code_signing_config_arn
3233
        ]
3234

3235
        cscs = PaginatedList(fn_arns)
1✔
3236
        page, token = cscs.get_page(
1✔
3237
            lambda x: x,
3238
            marker,
3239
            max_items,
3240
        )
3241
        return ListFunctionsByCodeSigningConfigResponse(FunctionArns=page, NextMarker=token)
1✔
3242

3243
    # =======================================
3244
    # =========  Account Settings   =========
3245
    # =======================================
3246

3247
    # CAVE: these settings & usages are *per* region!
3248
    # Lambda quotas: https://docs.aws.amazon.com/lambda/latest/dg/gettingstarted-limits.html
3249
    def get_account_settings(self, context: RequestContext, **kwargs) -> GetAccountSettingsResponse:
1✔
3250
        state = lambda_stores[context.account_id][context.region]
1✔
3251

3252
        fn_count = 0
1✔
3253
        code_size_sum = 0
1✔
3254
        reserved_concurrency_sum = 0
1✔
3255
        for fn in state.functions.values():
1✔
3256
            fn_count += 1
1✔
3257
            for fn_version in fn.versions.values():
1✔
3258
                # Image-based Lambdas do not have a code attribute and count against the ECR quotas instead
3259
                if fn_version.config.package_type == PackageType.Zip:
1✔
3260
                    code_size_sum += fn_version.config.code.code_size
1✔
3261
            if fn.reserved_concurrent_executions is not None:
1✔
3262
                reserved_concurrency_sum += fn.reserved_concurrent_executions
1✔
3263
            for c in fn.provisioned_concurrency_configs.values():
1✔
3264
                reserved_concurrency_sum += c.provisioned_concurrent_executions
1✔
3265
        for layer in state.layers.values():
1✔
3266
            for layer_version in layer.layer_versions.values():
1✔
3267
                code_size_sum += layer_version.code.code_size
1✔
3268
        return GetAccountSettingsResponse(
1✔
3269
            AccountLimit=AccountLimit(
3270
                TotalCodeSize=config.LAMBDA_LIMITS_TOTAL_CODE_SIZE,
3271
                CodeSizeZipped=config.LAMBDA_LIMITS_CODE_SIZE_ZIPPED,
3272
                CodeSizeUnzipped=config.LAMBDA_LIMITS_CODE_SIZE_UNZIPPED,
3273
                ConcurrentExecutions=config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS,
3274
                UnreservedConcurrentExecutions=config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS
3275
                - reserved_concurrency_sum,
3276
            ),
3277
            AccountUsage=AccountUsage(
3278
                TotalCodeSize=code_size_sum,
3279
                FunctionCount=fn_count,
3280
            ),
3281
        )
3282

3283
    # =======================================
3284
    # ==  Provisioned Concurrency Config   ==
3285
    # =======================================
3286

3287
    def _get_provisioned_config(
1✔
3288
        self, context: RequestContext, function_name: str, qualifier: str
3289
    ) -> ProvisionedConcurrencyConfiguration | None:
3290
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3291
        state = lambda_stores[account_id][region]
1✔
3292
        function_name = api_utils.get_function_name(function_name, context)
1✔
3293
        fn = state.functions.get(function_name)
1✔
3294
        if api_utils.qualifier_is_alias(qualifier):
1✔
3295
            fn_alias = None
1✔
3296
            if fn:
1✔
3297
                fn_alias = fn.aliases.get(qualifier)
1✔
3298
            if fn_alias is None:
1✔
3299
                raise ResourceNotFoundException(
1✔
3300
                    f"Cannot find alias arn: {api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)}",
3301
                    Type="User",
3302
                )
3303
        elif api_utils.qualifier_is_version(qualifier):
1✔
3304
            fn_version = None
1✔
3305
            if fn:
1✔
3306
                fn_version = fn.versions.get(qualifier)
1✔
3307
            if fn_version is None:
1✔
3308
                raise ResourceNotFoundException(
1✔
3309
                    f"Function not found: {api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)}",
3310
                    Type="User",
3311
                )
3312

3313
        return fn.provisioned_concurrency_configs.get(qualifier)
1✔
3314

3315
    def put_provisioned_concurrency_config(
1✔
3316
        self,
3317
        context: RequestContext,
3318
        function_name: FunctionName,
3319
        qualifier: Qualifier,
3320
        provisioned_concurrent_executions: PositiveInteger,
3321
        **kwargs,
3322
    ) -> PutProvisionedConcurrencyConfigResponse:
3323
        if provisioned_concurrent_executions <= 0:
1✔
3324
            raise ValidationException(
1✔
3325
                f"1 validation error detected: Value '{provisioned_concurrent_executions}' at 'provisionedConcurrentExecutions' failed to satisfy constraint: Member must have value greater than or equal to 1"
3326
            )
3327

3328
        if qualifier == "$LATEST":
1✔
3329
            raise InvalidParameterValueException(
1✔
3330
                "Provisioned Concurrency Configs cannot be applied to unpublished function versions.",
3331
                Type="User",
3332
            )
3333
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3334
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
3335
            function_name, qualifier, context
3336
        )
3337
        state = lambda_stores[account_id][region]
1✔
3338
        fn = state.functions.get(function_name)
1✔
3339

3340
        provisioned_config = self._get_provisioned_config(context, function_name, qualifier)
1✔
3341

3342
        if provisioned_config:  # TODO: merge?
1✔
3343
            # TODO: add a test for partial updates (if possible)
3344
            LOG.warning(
1✔
3345
                "Partial update of provisioned concurrency config is currently not supported."
3346
            )
3347

3348
        other_provisioned_sum = sum(
1✔
3349
            [
3350
                provisioned_configs.provisioned_concurrent_executions
3351
                for provisioned_qualifier, provisioned_configs in fn.provisioned_concurrency_configs.items()
3352
                if provisioned_qualifier != qualifier
3353
            ]
3354
        )
3355

3356
        if (
1✔
3357
            fn.reserved_concurrent_executions is not None
3358
            and fn.reserved_concurrent_executions
3359
            < other_provisioned_sum + provisioned_concurrent_executions
3360
        ):
3361
            raise InvalidParameterValueException(
1✔
3362
                "Requested Provisioned Concurrency should not be greater than the reservedConcurrentExecution for function",
3363
                Type="User",
3364
            )
3365

3366
        if provisioned_concurrent_executions > config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS:
1✔
3367
            raise InvalidParameterValueException(
1✔
3368
                f"Specified ConcurrentExecutions for function is greater than account's unreserved concurrency"
3369
                f" [{config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS}]."
3370
            )
3371

3372
        settings = self.get_account_settings(context)
1✔
3373
        unreserved_concurrent_executions = settings["AccountLimit"][
1✔
3374
            "UnreservedConcurrentExecutions"
3375
        ]
3376
        if (
1✔
3377
            unreserved_concurrent_executions - provisioned_concurrent_executions
3378
            < config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY
3379
        ):
3380
            raise InvalidParameterValueException(
1✔
3381
                f"Specified ConcurrentExecutions for function decreases account's UnreservedConcurrentExecution below"
3382
                f" its minimum value of [{config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY}]."
3383
            )
3384

3385
        provisioned_config = ProvisionedConcurrencyConfiguration(
1✔
3386
            provisioned_concurrent_executions, api_utils.generate_lambda_date()
3387
        )
3388
        fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
1✔
3389

3390
        if api_utils.qualifier_is_alias(qualifier):
1✔
3391
            alias = fn.aliases.get(qualifier)
1✔
3392
            resolved_version = fn.versions.get(alias.function_version)
1✔
3393

3394
            if (
1✔
3395
                resolved_version
3396
                and fn.provisioned_concurrency_configs.get(alias.function_version) is not None
3397
            ):
3398
                raise ResourceConflictException(
1✔
3399
                    "Alias can't be used for Provisioned Concurrency configuration on an already Provisioned version",
3400
                    Type="User",
3401
                )
3402
            fn_arn = resolved_version.id.qualified_arn()
1✔
3403
        elif api_utils.qualifier_is_version(qualifier):
1✔
3404
            fn_version = fn.versions.get(qualifier)
1✔
3405

3406
            # TODO: might be useful other places, utilize
3407
            pointing_aliases = []
1✔
3408
            for alias in fn.aliases.values():
1✔
3409
                if (
1✔
3410
                    alias.function_version == qualifier
3411
                    and fn.provisioned_concurrency_configs.get(alias.name) is not None
3412
                ):
3413
                    pointing_aliases.append(alias.name)
1✔
3414
            if pointing_aliases:
1✔
3415
                raise ResourceConflictException(
1✔
3416
                    "Version is pointed by a Provisioned Concurrency alias", Type="User"
3417
                )
3418

3419
            fn_arn = fn_version.id.qualified_arn()
1✔
3420

3421
        manager = self.lambda_service.get_lambda_version_manager(fn_arn)
1✔
3422

3423
        fn.provisioned_concurrency_configs[qualifier] = provisioned_config
1✔
3424

3425
        manager.update_provisioned_concurrency_config(
1✔
3426
            provisioned_config.provisioned_concurrent_executions
3427
        )
3428

3429
        return PutProvisionedConcurrencyConfigResponse(
1✔
3430
            RequestedProvisionedConcurrentExecutions=provisioned_config.provisioned_concurrent_executions,
3431
            AvailableProvisionedConcurrentExecutions=0,
3432
            AllocatedProvisionedConcurrentExecutions=0,
3433
            Status=ProvisionedConcurrencyStatusEnum.IN_PROGRESS,
3434
            # StatusReason=manager.provisioned_state.status_reason,
3435
            LastModified=provisioned_config.last_modified,  # TODO: does change with configuration or also with state changes?
3436
        )
3437

3438
    def get_provisioned_concurrency_config(
1✔
3439
        self, context: RequestContext, function_name: FunctionName, qualifier: Qualifier, **kwargs
3440
    ) -> GetProvisionedConcurrencyConfigResponse:
3441
        if qualifier == "$LATEST":
1✔
3442
            raise InvalidParameterValueException(
1✔
3443
                "The function resource provided must be an alias or a published version.",
3444
                Type="User",
3445
            )
3446
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3447
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
3448
            function_name, qualifier, context
3449
        )
3450

3451
        provisioned_config = self._get_provisioned_config(context, function_name, qualifier)
1✔
3452
        if not provisioned_config:
1✔
3453
            raise ProvisionedConcurrencyConfigNotFoundException(
1✔
3454
                "No Provisioned Concurrency Config found for this function", Type="User"
3455
            )
3456

3457
        # TODO: make this compatible with alias pointer migration on update
3458
        if api_utils.qualifier_is_alias(qualifier):
1✔
3459
            state = lambda_stores[account_id][region]
1✔
3460
            fn = state.functions.get(function_name)
1✔
3461
            alias = fn.aliases.get(qualifier)
1✔
3462
            fn_arn = api_utils.qualified_lambda_arn(
1✔
3463
                function_name, alias.function_version, account_id, region
3464
            )
3465
        else:
3466
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
1✔
3467

3468
        ver_manager = self.lambda_service.get_lambda_version_manager(fn_arn)
1✔
3469

3470
        return GetProvisionedConcurrencyConfigResponse(
1✔
3471
            RequestedProvisionedConcurrentExecutions=provisioned_config.provisioned_concurrent_executions,
3472
            LastModified=provisioned_config.last_modified,
3473
            AvailableProvisionedConcurrentExecutions=ver_manager.provisioned_state.available,
3474
            AllocatedProvisionedConcurrentExecutions=ver_manager.provisioned_state.allocated,
3475
            Status=ver_manager.provisioned_state.status,
3476
            StatusReason=ver_manager.provisioned_state.status_reason,
3477
        )
3478

3479
    def list_provisioned_concurrency_configs(
1✔
3480
        self,
3481
        context: RequestContext,
3482
        function_name: FunctionName,
3483
        marker: String = None,
3484
        max_items: MaxProvisionedConcurrencyConfigListItems = None,
3485
        **kwargs,
3486
    ) -> ListProvisionedConcurrencyConfigsResponse:
3487
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3488
        state = lambda_stores[account_id][region]
1✔
3489

3490
        function_name = api_utils.get_function_name(function_name, context)
1✔
3491
        fn = state.functions.get(function_name)
1✔
3492
        if fn is None:
1✔
3493
            raise ResourceNotFoundException(
1✔
3494
                f"Function not found: {api_utils.unqualified_lambda_arn(function_name, account_id, region)}",
3495
                Type="User",
3496
            )
3497

3498
        configs = []
1✔
3499
        for qualifier, pc_config in fn.provisioned_concurrency_configs.items():
1✔
3500
            if api_utils.qualifier_is_alias(qualifier):
×
3501
                alias = fn.aliases.get(qualifier)
×
3502
                fn_arn = api_utils.qualified_lambda_arn(
×
3503
                    function_name, alias.function_version, account_id, region
3504
                )
3505
            else:
3506
                fn_arn = api_utils.qualified_lambda_arn(
×
3507
                    function_name, qualifier, account_id, region
3508
                )
3509

3510
            manager = self.lambda_service.get_lambda_version_manager(fn_arn)
×
3511

3512
            configs.append(
×
3513
                ProvisionedConcurrencyConfigListItem(
3514
                    FunctionArn=api_utils.qualified_lambda_arn(
3515
                        function_name, qualifier, account_id, region
3516
                    ),
3517
                    RequestedProvisionedConcurrentExecutions=pc_config.provisioned_concurrent_executions,
3518
                    AvailableProvisionedConcurrentExecutions=manager.provisioned_state.available,
3519
                    AllocatedProvisionedConcurrentExecutions=manager.provisioned_state.allocated,
3520
                    Status=manager.provisioned_state.status,
3521
                    StatusReason=manager.provisioned_state.status_reason,
3522
                    LastModified=pc_config.last_modified,
3523
                )
3524
            )
3525

3526
        provisioned_concurrency_configs = configs
1✔
3527
        provisioned_concurrency_configs = PaginatedList(provisioned_concurrency_configs)
1✔
3528
        page, token = provisioned_concurrency_configs.get_page(
1✔
3529
            lambda x: x,
3530
            marker,
3531
            max_items,
3532
        )
3533
        return ListProvisionedConcurrencyConfigsResponse(
1✔
3534
            ProvisionedConcurrencyConfigs=page, NextMarker=token
3535
        )
3536

3537
    def delete_provisioned_concurrency_config(
1✔
3538
        self, context: RequestContext, function_name: FunctionName, qualifier: Qualifier, **kwargs
3539
    ) -> None:
3540
        if qualifier == "$LATEST":
1✔
3541
            raise InvalidParameterValueException(
1✔
3542
                "The function resource provided must be an alias or a published version.",
3543
                Type="User",
3544
            )
3545
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3546
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
3547
            function_name, qualifier, context
3548
        )
3549
        state = lambda_stores[account_id][region]
1✔
3550
        fn = state.functions.get(function_name)
1✔
3551

3552
        provisioned_config = self._get_provisioned_config(context, function_name, qualifier)
1✔
3553
        # delete is idempotent and doesn't actually care about the provisioned concurrency config not existing
3554
        if provisioned_config:
1✔
3555
            fn.provisioned_concurrency_configs.pop(qualifier)
1✔
3556
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
1✔
3557
            manager = self.lambda_service.get_lambda_version_manager(fn_arn)
1✔
3558
            manager.update_provisioned_concurrency_config(0)
1✔
3559

3560
    # =======================================
3561
    # =======  Event Invoke Config   ========
3562
    # =======================================
3563

3564
    # "1 validation error detected: Value 'arn:aws:_-/!lambda:<region>:111111111111:function:<function-name:1>' at 'destinationConfig.onFailure.destination' failed to satisfy constraint: Member must satisfy regular expression pattern: ^$|arn:(aws[a-zA-Z0-9-]*):([a-zA-Z0-9\\-])+:([a-z]{2}((-gov)|(-iso(b?)))?-[a-z]+-\\d{1})?:(\\d{12})?:(.*)"
3565
    # "1 validation error detected: Value 'arn:aws:_-/!lambda:<region>:111111111111:function:<function-name:1>' at 'destinationConfig.onFailure.destination' failed to satisfy constraint: Member must satisfy regular expression pattern: ^$|arn:(aws[a-zA-Z0-9-]*):([a-zA-Z0-9\\-])+:([a-z]2((-gov)|(-iso(b?)))?-[a-z]+-\\d1)?:(\\d12)?:(.*)" ... (expected → actual)
3566

3567
    def _validate_destination_config(
1✔
3568
        self, store: LambdaStore, function_name: str, destination_config: DestinationConfig
3569
    ):
3570
        def _validate_destination_arn(destination_arn) -> bool:
1✔
3571
            if not api_utils.DESTINATION_ARN_PATTERN.match(destination_arn):
1✔
3572
                # technically we shouldn't handle this in the provider
3573
                raise ValidationException(
1✔
3574
                    "1 validation error detected: Value '"
3575
                    + destination_arn
3576
                    + "' at 'destinationConfig.onFailure.destination' failed to satisfy constraint: Member must satisfy regular expression pattern: "
3577
                    + "$|kafka://([^.]([a-zA-Z0-9\\-_.]{0,248}))|arn:(aws[a-zA-Z0-9-]*):([a-zA-Z0-9\\-])+:((eusc-)?[a-z]{2}((-gov)|(-iso([a-z]?)))?-[a-z]+-\\d{1})?:(\\d{12})?:(.*)"
3578
                )
3579

3580
            match destination_arn.split(":")[2]:
1✔
3581
                case "lambda":
1✔
3582
                    fn_parts = api_utils.FULL_FN_ARN_PATTERN.search(destination_arn).groupdict()
1✔
3583
                    if fn_parts:
1✔
3584
                        # check if it exists
3585
                        fn = store.functions.get(fn_parts["function_name"])
1✔
3586
                        if not fn:
1✔
3587
                            raise InvalidParameterValueException(
1✔
3588
                                f"The destination ARN {destination_arn} is invalid.", Type="User"
3589
                            )
3590
                        if fn_parts["function_name"] == function_name:
1✔
3591
                            raise InvalidParameterValueException(
1✔
3592
                                "You can't specify the function as a destination for itself.",
3593
                                Type="User",
3594
                            )
3595
                case "sns" | "sqs" | "events":
1✔
3596
                    pass
1✔
3597
                case _:
1✔
3598
                    return False
1✔
3599
            return True
1✔
3600

3601
        validation_err = False
1✔
3602

3603
        failure_destination = destination_config.get("OnFailure", {}).get("Destination")
1✔
3604
        if failure_destination:
1✔
3605
            validation_err = validation_err or not _validate_destination_arn(failure_destination)
1✔
3606

3607
        success_destination = destination_config.get("OnSuccess", {}).get("Destination")
1✔
3608
        if success_destination:
1✔
3609
            validation_err = validation_err or not _validate_destination_arn(success_destination)
1✔
3610

3611
        if validation_err:
1✔
3612
            on_success_part = (
1✔
3613
                f"OnSuccess(destination={success_destination})" if success_destination else "null"
3614
            )
3615
            on_failure_part = (
1✔
3616
                f"OnFailure(destination={failure_destination})" if failure_destination else "null"
3617
            )
3618
            raise InvalidParameterValueException(
1✔
3619
                f"The provided destination config DestinationConfig(onSuccess={on_success_part}, onFailure={on_failure_part}) is invalid.",
3620
                Type="User",
3621
            )
3622

3623
    def put_function_event_invoke_config(
1✔
3624
        self,
3625
        context: RequestContext,
3626
        function_name: FunctionName,
3627
        qualifier: NumericLatestPublishedOrAliasQualifier = None,
3628
        maximum_retry_attempts: MaximumRetryAttempts = None,
3629
        maximum_event_age_in_seconds: MaximumEventAgeInSeconds = None,
3630
        destination_config: DestinationConfig = None,
3631
        **kwargs,
3632
    ) -> FunctionEventInvokeConfig:
3633
        """
3634
        Destination ARNs can be:
3635
        * SQS arn
3636
        * SNS arn
3637
        * Lambda arn
3638
        * EventBridge arn
3639

3640
        Differences between put_ and update_:
3641
            * put overwrites any existing config
3642
            * update allows changes only single values while keeping the rest of existing ones
3643
            * update fails on non-existing configs
3644

3645
        Differences between destination and DLQ
3646
            * "However, a dead-letter queue is part of a function's version-specific configuration, so it is locked in when you publish a version."
3647
            *  "On-failure destinations also support additional targets and include details about the function's response in the invocation record."
3648

3649
        """
3650
        if (
1✔
3651
            maximum_event_age_in_seconds is None
3652
            and maximum_retry_attempts is None
3653
            and destination_config is None
3654
        ):
3655
            raise InvalidParameterValueException(
1✔
3656
                "You must specify at least one of error handling or destination setting.",
3657
                Type="User",
3658
            )
3659
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3660
        state = lambda_stores[account_id][region]
1✔
3661
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
3662
            function_name, qualifier, context
3663
        )
3664
        fn = state.functions.get(function_name)
1✔
3665
        if not fn or (qualifier and not (qualifier in fn.aliases or qualifier in fn.versions)):
1✔
3666
            raise ResourceNotFoundException("The function doesn't exist.", Type="User")
1✔
3667

3668
        qualifier = qualifier or "$LATEST"
1✔
3669

3670
        # validate and normalize destination config
3671
        if destination_config:
1✔
3672
            self._validate_destination_config(state, function_name, destination_config)
1✔
3673

3674
        destination_config = DestinationConfig(
1✔
3675
            OnSuccess=OnSuccess(
3676
                Destination=(destination_config or {}).get("OnSuccess", {}).get("Destination")
3677
            ),
3678
            OnFailure=OnFailure(
3679
                Destination=(destination_config or {}).get("OnFailure", {}).get("Destination")
3680
            ),
3681
        )
3682

3683
        config = EventInvokeConfig(
1✔
3684
            function_name=function_name,
3685
            qualifier=qualifier,
3686
            maximum_event_age_in_seconds=maximum_event_age_in_seconds,
3687
            maximum_retry_attempts=maximum_retry_attempts,
3688
            last_modified=api_utils.generate_lambda_date(),
3689
            destination_config=destination_config,
3690
        )
3691
        fn.event_invoke_configs[qualifier] = config
1✔
3692

3693
        return FunctionEventInvokeConfig(
1✔
3694
            LastModified=datetime.datetime.strptime(
3695
                config.last_modified, api_utils.LAMBDA_DATE_FORMAT
3696
            ),
3697
            FunctionArn=api_utils.qualified_lambda_arn(
3698
                function_name, qualifier or "$LATEST", account_id, region
3699
            ),
3700
            DestinationConfig=destination_config,
3701
            MaximumEventAgeInSeconds=maximum_event_age_in_seconds,
3702
            MaximumRetryAttempts=maximum_retry_attempts,
3703
        )
3704

3705
    def get_function_event_invoke_config(
1✔
3706
        self,
3707
        context: RequestContext,
3708
        function_name: FunctionName,
3709
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
3710
        **kwargs,
3711
    ) -> FunctionEventInvokeConfig:
3712
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3713
        state = lambda_stores[account_id][region]
1✔
3714
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
3715
            function_name, qualifier, context
3716
        )
3717

3718
        qualifier = qualifier or "$LATEST"
1✔
3719
        fn = state.functions.get(function_name)
1✔
3720
        if not fn:
1✔
3721
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
1✔
3722
            raise ResourceNotFoundException(
1✔
3723
                f"The function {fn_arn} doesn't have an EventInvokeConfig", Type="User"
3724
            )
3725

3726
        config = fn.event_invoke_configs.get(qualifier)
1✔
3727
        if not config:
1✔
3728
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
1✔
3729
            raise ResourceNotFoundException(
1✔
3730
                f"The function {fn_arn} doesn't have an EventInvokeConfig", Type="User"
3731
            )
3732

3733
        return FunctionEventInvokeConfig(
1✔
3734
            LastModified=datetime.datetime.strptime(
3735
                config.last_modified, api_utils.LAMBDA_DATE_FORMAT
3736
            ),
3737
            FunctionArn=api_utils.qualified_lambda_arn(
3738
                function_name, qualifier, account_id, region
3739
            ),
3740
            DestinationConfig=config.destination_config,
3741
            MaximumEventAgeInSeconds=config.maximum_event_age_in_seconds,
3742
            MaximumRetryAttempts=config.maximum_retry_attempts,
3743
        )
3744

3745
    def list_function_event_invoke_configs(
1✔
3746
        self,
3747
        context: RequestContext,
3748
        function_name: FunctionName,
3749
        marker: String = None,
3750
        max_items: MaxFunctionEventInvokeConfigListItems = None,
3751
        **kwargs,
3752
    ) -> ListFunctionEventInvokeConfigsResponse:
3753
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3754
        state = lambda_stores[account_id][region]
1✔
3755
        fn = state.functions.get(function_name)
1✔
3756
        if not fn:
1✔
3757
            raise ResourceNotFoundException("The function doesn't exist.", Type="User")
1✔
3758

3759
        event_invoke_configs = [
1✔
3760
            FunctionEventInvokeConfig(
3761
                LastModified=c.last_modified,
3762
                FunctionArn=api_utils.qualified_lambda_arn(
3763
                    function_name, c.qualifier, account_id, region
3764
                ),
3765
                MaximumEventAgeInSeconds=c.maximum_event_age_in_seconds,
3766
                MaximumRetryAttempts=c.maximum_retry_attempts,
3767
                DestinationConfig=c.destination_config,
3768
            )
3769
            for c in fn.event_invoke_configs.values()
3770
        ]
3771

3772
        event_invoke_configs = PaginatedList(event_invoke_configs)
1✔
3773
        page, token = event_invoke_configs.get_page(
1✔
3774
            lambda x: x["FunctionArn"],
3775
            marker,
3776
            max_items,
3777
        )
3778
        return ListFunctionEventInvokeConfigsResponse(
1✔
3779
            FunctionEventInvokeConfigs=page, NextMarker=token
3780
        )
3781

3782
    def delete_function_event_invoke_config(
1✔
3783
        self,
3784
        context: RequestContext,
3785
        function_name: FunctionName,
3786
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
3787
        **kwargs,
3788
    ) -> None:
3789
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3790
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
3791
            function_name, qualifier, context
3792
        )
3793
        state = lambda_stores[account_id][region]
1✔
3794
        fn = state.functions.get(function_name)
1✔
3795
        resolved_qualifier = qualifier or "$LATEST"
1✔
3796
        fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
1✔
3797
        if not fn:
1✔
3798
            raise ResourceNotFoundException(
1✔
3799
                f"The function {fn_arn} doesn't have an EventInvokeConfig", Type="User"
3800
            )
3801

3802
        config = fn.event_invoke_configs.get(resolved_qualifier)
1✔
3803
        if not config:
1✔
3804
            raise ResourceNotFoundException(
1✔
3805
                f"The function {fn_arn} doesn't have an EventInvokeConfig", Type="User"
3806
            )
3807

3808
        del fn.event_invoke_configs[resolved_qualifier]
1✔
3809

3810
    def update_function_event_invoke_config(
1✔
3811
        self,
3812
        context: RequestContext,
3813
        function_name: FunctionName,
3814
        qualifier: NumericLatestPublishedOrAliasQualifier = None,
3815
        maximum_retry_attempts: MaximumRetryAttempts = None,
3816
        maximum_event_age_in_seconds: MaximumEventAgeInSeconds = None,
3817
        destination_config: DestinationConfig = None,
3818
        **kwargs,
3819
    ) -> FunctionEventInvokeConfig:
3820
        # like put but only update single fields via replace
3821
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3822
        state = lambda_stores[account_id][region]
1✔
3823
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
3824
            function_name, qualifier, context
3825
        )
3826

3827
        if (
1✔
3828
            maximum_event_age_in_seconds is None
3829
            and maximum_retry_attempts is None
3830
            and destination_config is None
3831
        ):
3832
            raise InvalidParameterValueException(
×
3833
                "You must specify at least one of error handling or destination setting.",
3834
                Type="User",
3835
            )
3836

3837
        fn = state.functions.get(function_name)
1✔
3838
        if not fn or (qualifier and not (qualifier in fn.aliases or qualifier in fn.versions)):
1✔
3839
            raise ResourceNotFoundException("The function doesn't exist.", Type="User")
1✔
3840

3841
        qualifier = qualifier or "$LATEST"
1✔
3842

3843
        config = fn.event_invoke_configs.get(qualifier)
1✔
3844
        if not config:
1✔
3845
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
1✔
3846
            raise ResourceNotFoundException(
1✔
3847
                f"The function {fn_arn} doesn't have an EventInvokeConfig", Type="User"
3848
            )
3849

3850
        if destination_config:
1✔
3851
            self._validate_destination_config(state, function_name, destination_config)
×
3852

3853
        optional_kwargs = {
1✔
3854
            k: v
3855
            for k, v in {
3856
                "destination_config": destination_config,
3857
                "maximum_retry_attempts": maximum_retry_attempts,
3858
                "maximum_event_age_in_seconds": maximum_event_age_in_seconds,
3859
            }.items()
3860
            if v is not None
3861
        }
3862

3863
        new_config = dataclasses.replace(
1✔
3864
            config, last_modified=api_utils.generate_lambda_date(), **optional_kwargs
3865
        )
3866
        fn.event_invoke_configs[qualifier] = new_config
1✔
3867

3868
        return FunctionEventInvokeConfig(
1✔
3869
            LastModified=datetime.datetime.strptime(
3870
                new_config.last_modified, api_utils.LAMBDA_DATE_FORMAT
3871
            ),
3872
            FunctionArn=api_utils.qualified_lambda_arn(
3873
                function_name, qualifier or "$LATEST", account_id, region
3874
            ),
3875
            DestinationConfig=new_config.destination_config,
3876
            MaximumEventAgeInSeconds=new_config.maximum_event_age_in_seconds,
3877
            MaximumRetryAttempts=new_config.maximum_retry_attempts,
3878
        )
3879

3880
    # =======================================
3881
    # ======  Layer & Layer Versions  =======
3882
    # =======================================
3883

3884
    @staticmethod
1✔
3885
    def _resolve_layer(
1✔
3886
        layer_name_or_arn: str, context: RequestContext
3887
    ) -> tuple[str, str, str, str | None]:
3888
        """
3889
        Return locator attributes for a given Lambda layer.
3890

3891
        :param layer_name_or_arn: Layer name or ARN
3892
        :param context: Request context
3893
        :return: Tuple of region, account ID, layer name, layer version
3894
        """
3895
        if api_utils.is_layer_arn(layer_name_or_arn):
1✔
3896
            return api_utils.parse_layer_arn(layer_name_or_arn)
1✔
3897

3898
        return context.region, context.account_id, layer_name_or_arn, None
1✔
3899

3900
    def publish_layer_version(
1✔
3901
        self,
3902
        context: RequestContext,
3903
        layer_name: LayerName,
3904
        content: LayerVersionContentInput,
3905
        description: Description | None = None,
3906
        compatible_runtimes: CompatibleRuntimes | None = None,
3907
        license_info: LicenseInfo | None = None,
3908
        compatible_architectures: CompatibleArchitectures | None = None,
3909
        **kwargs,
3910
    ) -> PublishLayerVersionResponse:
3911
        """
3912
        On first use of a LayerName a new layer is created and for each subsequent call with the same LayerName a new version is created.
3913
        Note that there are no $LATEST versions with layers!
3914

3915
        """
3916
        account = context.account_id
1✔
3917
        region = context.region
1✔
3918

3919
        validation_errors = api_utils.validate_layer_runtimes_and_architectures(
1✔
3920
            compatible_runtimes, compatible_architectures
3921
        )
3922
        if validation_errors:
1✔
3923
            raise ValidationException(
1✔
3924
                f"{len(validation_errors)} validation error{'s' if len(validation_errors) > 1 else ''} detected: {'; '.join(validation_errors)}"
3925
            )
3926

3927
        state = lambda_stores[account][region]
1✔
3928
        with self.create_layer_lock:
1✔
3929
            if layer_name not in state.layers:
1✔
3930
                # we don't have a version so create new layer object
3931
                # lock is required to avoid creating two v1 objects for the same name
3932
                layer = Layer(
1✔
3933
                    arn=api_utils.layer_arn(layer_name=layer_name, account=account, region=region)
3934
                )
3935
                state.layers[layer_name] = layer
1✔
3936

3937
        layer = state.layers[layer_name]
1✔
3938
        with layer.next_version_lock:
1✔
3939
            next_version = LambdaLayerVersionIdentifier(
1✔
3940
                account_id=account, region=region, layer_name=layer_name
3941
            ).generate(next_version=layer.next_version)
3942
            # When creating a layer with user defined layer version, it is possible that we
3943
            # create layer versions out of order.
3944
            # ie. a user could replicate layer v2 then layer v1. It is important to always keep the maximum possible
3945
            # value for next layer to avoid overwriting existing versions
3946
            if layer.next_version <= next_version:
1✔
3947
                # We don't need to update layer.next_version if the created version is lower than the "next in line"
3948
                layer.next_version = max(next_version, layer.next_version) + 1
1✔
3949

3950
        # creating a new layer
3951
        if content.get("ZipFile"):
1✔
3952
            code = store_lambda_archive(
1✔
3953
                archive_file=content["ZipFile"],
3954
                function_name=layer_name,
3955
                region_name=region,
3956
                account_id=account,
3957
            )
3958
        else:
3959
            code = store_s3_bucket_archive(
1✔
3960
                archive_bucket=content["S3Bucket"],
3961
                archive_key=content["S3Key"],
3962
                archive_version=content.get("S3ObjectVersion"),
3963
                function_name=layer_name,
3964
                region_name=region,
3965
                account_id=account,
3966
            )
3967

3968
        new_layer_version = LayerVersion(
1✔
3969
            layer_version_arn=api_utils.layer_version_arn(
3970
                layer_name=layer_name,
3971
                account=account,
3972
                region=region,
3973
                version=str(next_version),
3974
            ),
3975
            layer_arn=layer.arn,
3976
            version=next_version,
3977
            description=description or "",
3978
            license_info=license_info,
3979
            compatible_runtimes=compatible_runtimes,
3980
            compatible_architectures=compatible_architectures,
3981
            created=api_utils.generate_lambda_date(),
3982
            code=code,
3983
        )
3984

3985
        layer.layer_versions[str(next_version)] = new_layer_version
1✔
3986

3987
        return api_utils.map_layer_out(new_layer_version)
1✔
3988

3989
    def get_layer_version(
1✔
3990
        self,
3991
        context: RequestContext,
3992
        layer_name: LayerName,
3993
        version_number: LayerVersionNumber,
3994
        **kwargs,
3995
    ) -> GetLayerVersionResponse:
3996
        # TODO: handle layer_name as an ARN
3997

3998
        region_name, account_id, layer_name, _ = LambdaProvider._resolve_layer(layer_name, context)
1✔
3999
        state = lambda_stores[account_id][region_name]
1✔
4000

4001
        layer = state.layers.get(layer_name)
1✔
4002
        if version_number < 1:
1✔
4003
            raise InvalidParameterValueException("Layer Version Cannot be less than 1", Type="User")
1✔
4004
        if layer is None:
1✔
4005
            raise ResourceNotFoundException(
1✔
4006
                "The resource you requested does not exist.", Type="User"
4007
            )
4008
        layer_version = layer.layer_versions.get(str(version_number))
1✔
4009
        if layer_version is None:
1✔
4010
            raise ResourceNotFoundException(
1✔
4011
                "The resource you requested does not exist.", Type="User"
4012
            )
4013
        return api_utils.map_layer_out(layer_version)
1✔
4014

4015
    def get_layer_version_by_arn(
1✔
4016
        self, context: RequestContext, arn: LayerVersionArn, **kwargs
4017
    ) -> GetLayerVersionResponse:
4018
        region_name, account_id, layer_name, layer_version = LambdaProvider._resolve_layer(
1✔
4019
            arn, context
4020
        )
4021

4022
        if not layer_version:
1✔
4023
            raise ValidationException(
1✔
4024
                f"1 validation error detected: Value '{arn}' at 'arn' failed to satisfy constraint: Member must satisfy regular expression pattern: "
4025
                + "(arn:(aws[a-zA-Z-]*)?:lambda:(eusc-)?[a-z]{2}((-gov)|(-iso([a-z]?)))?-[a-z]+-\\d{1}:\\d{12}:layer:[a-zA-Z0-9-_]+:[0-9]+)|(arn:[a-zA-Z0-9-]+:lambda:::awslayer:[a-zA-Z0-9-_]+)"
4026
            )
4027

4028
        store = lambda_stores[account_id][region_name]
1✔
4029
        if not (layers := store.layers.get(layer_name)):
1✔
4030
            raise ResourceNotFoundException(
×
4031
                "The resource you requested does not exist.", Type="User"
4032
            )
4033

4034
        layer_version = layers.layer_versions.get(layer_version)
1✔
4035

4036
        if not layer_version:
1✔
4037
            raise ResourceNotFoundException(
1✔
4038
                "The resource you requested does not exist.", Type="User"
4039
            )
4040

4041
        return api_utils.map_layer_out(layer_version)
1✔
4042

4043
    def list_layers(
1✔
4044
        self,
4045
        context: RequestContext,
4046
        compatible_runtime: Runtime | None = None,
4047
        marker: String | None = None,
4048
        max_items: MaxLayerListItems | None = None,
4049
        compatible_architecture: Architecture | None = None,
4050
        **kwargs,
4051
    ) -> ListLayersResponse:
4052
        validation_errors = []
1✔
4053

4054
        validation_error_arch = api_utils.validate_layer_architecture(compatible_architecture)
1✔
4055
        if validation_error_arch:
1✔
4056
            validation_errors.append(validation_error_arch)
1✔
4057

4058
        validation_error_runtime = api_utils.validate_layer_runtime(compatible_runtime)
1✔
4059
        if validation_error_runtime:
1✔
4060
            validation_errors.append(validation_error_runtime)
1✔
4061

4062
        if validation_errors:
1✔
4063
            raise ValidationException(
1✔
4064
                f"{len(validation_errors)} validation error{'s' if len(validation_errors) > 1 else ''} detected: {';'.join(validation_errors)}"
4065
            )
4066
        # TODO: handle filter: compatible_runtime
4067
        # TODO: handle filter: compatible_architecture
4068

4069
        state = lambda_stores[context.account_id][context.region]
×
4070
        layers = state.layers
×
4071

4072
        # TODO: test how filters behave together with only returning layers here? Does it return the latest "matching" layer, i.e. does it ignore later layer versions that don't match?
4073

4074
        responses: list[LayersListItem] = []
×
4075
        for layer_name, layer in layers.items():
×
4076
            # fetch latest version
4077
            layer_versions = list(layer.layer_versions.values())
×
4078
            sorted(layer_versions, key=lambda x: x.version)
×
4079
            latest_layer_version = layer_versions[-1]
×
4080
            responses.append(
×
4081
                LayersListItem(
4082
                    LayerName=layer_name,
4083
                    LayerArn=layer.arn,
4084
                    LatestMatchingVersion=api_utils.map_layer_out(latest_layer_version),
4085
                )
4086
            )
4087

4088
        responses = PaginatedList(responses)
×
4089
        page, token = responses.get_page(
×
4090
            lambda version: version,
4091
            marker,
4092
            max_items,
4093
        )
4094

4095
        return ListLayersResponse(NextMarker=token, Layers=page)
×
4096

4097
    def list_layer_versions(
1✔
4098
        self,
4099
        context: RequestContext,
4100
        layer_name: LayerName,
4101
        compatible_runtime: Runtime | None = None,
4102
        marker: String | None = None,
4103
        max_items: MaxLayerListItems | None = None,
4104
        compatible_architecture: Architecture | None = None,
4105
        **kwargs,
4106
    ) -> ListLayerVersionsResponse:
4107
        validation_errors = api_utils.validate_layer_runtimes_and_architectures(
1✔
4108
            [compatible_runtime] if compatible_runtime else [],
4109
            [compatible_architecture] if compatible_architecture else [],
4110
        )
4111
        if validation_errors:
1✔
4112
            raise ValidationException(
×
4113
                f"{len(validation_errors)} validation error{'s' if len(validation_errors) > 1 else ''} detected: {';'.join(validation_errors)}"
4114
            )
4115

4116
        region_name, account_id, layer_name, layer_version = LambdaProvider._resolve_layer(
1✔
4117
            layer_name, context
4118
        )
4119
        state = lambda_stores[account_id][region_name]
1✔
4120

4121
        # TODO: Test & handle filter: compatible_runtime
4122
        # TODO: Test & handle filter: compatible_architecture
4123
        all_layer_versions = []
1✔
4124
        layer = state.layers.get(layer_name)
1✔
4125
        if layer is not None:
1✔
4126
            for layer_version in layer.layer_versions.values():
1✔
4127
                all_layer_versions.append(api_utils.map_layer_out(layer_version))
1✔
4128

4129
        all_layer_versions.sort(key=lambda x: x["Version"], reverse=True)
1✔
4130
        all_layer_versions = PaginatedList(all_layer_versions)
1✔
4131
        page, token = all_layer_versions.get_page(
1✔
4132
            lambda version: version["LayerVersionArn"],
4133
            marker,
4134
            max_items,
4135
        )
4136
        return ListLayerVersionsResponse(NextMarker=token, LayerVersions=page)
1✔
4137

4138
    def delete_layer_version(
1✔
4139
        self,
4140
        context: RequestContext,
4141
        layer_name: LayerName,
4142
        version_number: LayerVersionNumber,
4143
        **kwargs,
4144
    ) -> None:
4145
        if version_number < 1:
1✔
4146
            raise InvalidParameterValueException("Layer Version Cannot be less than 1", Type="User")
1✔
4147

4148
        region_name, account_id, layer_name, layer_version = LambdaProvider._resolve_layer(
1✔
4149
            layer_name, context
4150
        )
4151

4152
        store = lambda_stores[account_id][region_name]
1✔
4153
        layer = store.layers.get(layer_name, {})
1✔
4154
        if layer:
1✔
4155
            layer.layer_versions.pop(str(version_number), None)
1✔
4156

4157
    # =======================================
4158
    # =====  Layer Version Permissions  =====
4159
    # =======================================
4160
    # TODO: lock updates that change revision IDs
4161

4162
    def add_layer_version_permission(
1✔
4163
        self,
4164
        context: RequestContext,
4165
        layer_name: LayerName,
4166
        version_number: LayerVersionNumber,
4167
        statement_id: StatementId,
4168
        action: LayerPermissionAllowedAction,
4169
        principal: LayerPermissionAllowedPrincipal,
4170
        organization_id: OrganizationId = None,
4171
        revision_id: String = None,
4172
        **kwargs,
4173
    ) -> AddLayerVersionPermissionResponse:
4174
        # `layer_name` can either be layer name or ARN. It is used to generate error messages.
4175
        # `layer_n` contains the layer name.
4176
        region_name, account_id, layer_n, _ = LambdaProvider._resolve_layer(layer_name, context)
1✔
4177

4178
        if action != "lambda:GetLayerVersion":
1✔
4179
            raise ValidationException(
1✔
4180
                f"1 validation error detected: Value '{action}' at 'action' failed to satisfy constraint: Member must satisfy regular expression pattern: lambda:GetLayerVersion"
4181
            )
4182

4183
        store = lambda_stores[account_id][region_name]
1✔
4184
        layer = store.layers.get(layer_n)
1✔
4185

4186
        layer_version_arn = api_utils.layer_version_arn(
1✔
4187
            layer_name, account_id, region_name, str(version_number)
4188
        )
4189

4190
        if layer is None:
1✔
4191
            raise ResourceNotFoundException(
1✔
4192
                f"Layer version {layer_version_arn} does not exist.", Type="User"
4193
            )
4194
        layer_version = layer.layer_versions.get(str(version_number))
1✔
4195
        if layer_version is None:
1✔
4196
            raise ResourceNotFoundException(
1✔
4197
                f"Layer version {layer_version_arn} does not exist.", Type="User"
4198
            )
4199
        # do we have a policy? if not set one
4200
        if layer_version.policy is None:
1✔
4201
            layer_version.policy = LayerPolicy()
1✔
4202

4203
        if statement_id in layer_version.policy.statements:
1✔
4204
            raise ResourceConflictException(
1✔
4205
                f"The statement id ({statement_id}) provided already exists. Please provide a new statement id, or remove the existing statement.",
4206
                Type="User",
4207
            )
4208

4209
        if revision_id and layer_version.policy.revision_id != revision_id:
1✔
4210
            raise PreconditionFailedException(
1✔
4211
                "The Revision Id provided does not match the latest Revision Id. "
4212
                "Call the GetLayerPolicy API to retrieve the latest Revision Id",
4213
                Type="User",
4214
            )
4215

4216
        statement = LayerPolicyStatement(
1✔
4217
            sid=statement_id, action=action, principal=principal, organization_id=organization_id
4218
        )
4219

4220
        old_statements = layer_version.policy.statements
1✔
4221
        layer_version.policy = dataclasses.replace(
1✔
4222
            layer_version.policy, statements={**old_statements, statement_id: statement}
4223
        )
4224

4225
        return AddLayerVersionPermissionResponse(
1✔
4226
            Statement=json.dumps(
4227
                {
4228
                    "Sid": statement.sid,
4229
                    "Effect": "Allow",
4230
                    "Principal": statement.principal,
4231
                    "Action": statement.action,
4232
                    "Resource": layer_version.layer_version_arn,
4233
                }
4234
            ),
4235
            RevisionId=layer_version.policy.revision_id,
4236
        )
4237

4238
    def remove_layer_version_permission(
1✔
4239
        self,
4240
        context: RequestContext,
4241
        layer_name: LayerName,
4242
        version_number: LayerVersionNumber,
4243
        statement_id: StatementId,
4244
        revision_id: String = None,
4245
        **kwargs,
4246
    ) -> None:
4247
        # `layer_name` can either be layer name or ARN. It is used to generate error messages.
4248
        # `layer_n` contains the layer name.
4249
        region_name, account_id, layer_n, layer_version = LambdaProvider._resolve_layer(
1✔
4250
            layer_name, context
4251
        )
4252

4253
        layer_version_arn = api_utils.layer_version_arn(
1✔
4254
            layer_name, account_id, region_name, str(version_number)
4255
        )
4256

4257
        state = lambda_stores[account_id][region_name]
1✔
4258
        layer = state.layers.get(layer_n)
1✔
4259
        if layer is None:
1✔
4260
            raise ResourceNotFoundException(
1✔
4261
                f"Layer version {layer_version_arn} does not exist.", Type="User"
4262
            )
4263
        layer_version = layer.layer_versions.get(str(version_number))
1✔
4264
        if layer_version is None:
1✔
4265
            raise ResourceNotFoundException(
1✔
4266
                f"Layer version {layer_version_arn} does not exist.", Type="User"
4267
            )
4268

4269
        if revision_id and layer_version.policy.revision_id != revision_id:
1✔
4270
            raise PreconditionFailedException(
1✔
4271
                "The Revision Id provided does not match the latest Revision Id. "
4272
                "Call the GetLayerPolicy API to retrieve the latest Revision Id",
4273
                Type="User",
4274
            )
4275

4276
        if statement_id not in layer_version.policy.statements:
1✔
4277
            raise ResourceNotFoundException(
1✔
4278
                f"Statement {statement_id} is not found in resource policy.", Type="User"
4279
            )
4280

4281
        old_statements = layer_version.policy.statements
1✔
4282
        layer_version.policy = dataclasses.replace(
1✔
4283
            layer_version.policy,
4284
            statements={k: v for k, v in old_statements.items() if k != statement_id},
4285
        )
4286

4287
    def get_layer_version_policy(
1✔
4288
        self,
4289
        context: RequestContext,
4290
        layer_name: LayerName,
4291
        version_number: LayerVersionNumber,
4292
        **kwargs,
4293
    ) -> GetLayerVersionPolicyResponse:
4294
        # `layer_name` can either be layer name or ARN. It is used to generate error messages.
4295
        # `layer_n` contains the layer name.
4296
        region_name, account_id, layer_n, _ = LambdaProvider._resolve_layer(layer_name, context)
1✔
4297

4298
        layer_version_arn = api_utils.layer_version_arn(
1✔
4299
            layer_name, account_id, region_name, str(version_number)
4300
        )
4301

4302
        store = lambda_stores[account_id][region_name]
1✔
4303
        layer = store.layers.get(layer_n)
1✔
4304

4305
        if layer is None:
1✔
4306
            raise ResourceNotFoundException(
1✔
4307
                f"Layer version {layer_version_arn} does not exist.", Type="User"
4308
            )
4309

4310
        layer_version = layer.layer_versions.get(str(version_number))
1✔
4311
        if layer_version is None:
1✔
4312
            raise ResourceNotFoundException(
1✔
4313
                f"Layer version {layer_version_arn} does not exist.", Type="User"
4314
            )
4315

4316
        if layer_version.policy is None:
1✔
4317
            raise ResourceNotFoundException(
1✔
4318
                "No policy is associated with the given resource.", Type="User"
4319
            )
4320

4321
        return GetLayerVersionPolicyResponse(
1✔
4322
            Policy=json.dumps(
4323
                {
4324
                    "Version": layer_version.policy.version,
4325
                    "Id": layer_version.policy.id,
4326
                    "Statement": [
4327
                        {
4328
                            "Sid": ps.sid,
4329
                            "Effect": "Allow",
4330
                            "Principal": ps.principal,
4331
                            "Action": ps.action,
4332
                            "Resource": layer_version.layer_version_arn,
4333
                        }
4334
                        for ps in layer_version.policy.statements.values()
4335
                    ],
4336
                }
4337
            ),
4338
            RevisionId=layer_version.policy.revision_id,
4339
        )
4340

4341
    # =======================================
4342
    # =======  Function Concurrency  ========
4343
    # =======================================
4344
    # (Reserved) function concurrency is scoped to the whole function
4345

4346
    def get_function_concurrency(
1✔
4347
        self, context: RequestContext, function_name: FunctionName, **kwargs
4348
    ) -> GetFunctionConcurrencyResponse:
4349
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
4350
        function_name = api_utils.get_function_name(function_name, context)
1✔
4351
        fn = self._get_function(function_name=function_name, region=region, account_id=account_id)
1✔
4352
        return GetFunctionConcurrencyResponse(
1✔
4353
            ReservedConcurrentExecutions=fn.reserved_concurrent_executions
4354
        )
4355

4356
    def put_function_concurrency(
1✔
4357
        self,
4358
        context: RequestContext,
4359
        function_name: FunctionName,
4360
        reserved_concurrent_executions: ReservedConcurrentExecutions,
4361
        **kwargs,
4362
    ) -> Concurrency:
4363
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
4364

4365
        function_name, qualifier = api_utils.get_name_and_qualifier(function_name, None, context)
1✔
4366
        if qualifier:
1✔
4367
            raise InvalidParameterValueException(
1✔
4368
                "This operation is permitted on Lambda functions only. Aliases and versions do not support this operation. Please specify either a function name or an unqualified function ARN.",
4369
                Type="User",
4370
            )
4371

4372
        store = lambda_stores[account_id][region]
1✔
4373
        fn = store.functions.get(function_name)
1✔
4374
        if not fn:
1✔
4375
            fn_arn = api_utils.qualified_lambda_arn(
1✔
4376
                function_name,
4377
                qualifier="$LATEST",
4378
                account=account_id,
4379
                region=region,
4380
            )
4381
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")
1✔
4382

4383
        settings = self.get_account_settings(context)
1✔
4384
        unreserved_concurrent_executions = settings["AccountLimit"][
1✔
4385
            "UnreservedConcurrentExecutions"
4386
        ]
4387

4388
        # The existing reserved concurrent executions for the same function are already deduced in
4389
        # unreserved_concurrent_executions but must not count because the new one will replace the existing one.
4390
        # Joel tested this behavior manually against AWS (2023-11-28).
4391
        existing_reserved_concurrent_executions = (
1✔
4392
            fn.reserved_concurrent_executions if fn.reserved_concurrent_executions else 0
4393
        )
4394
        if (
1✔
4395
            unreserved_concurrent_executions
4396
            - reserved_concurrent_executions
4397
            + existing_reserved_concurrent_executions
4398
        ) < config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY:
4399
            raise InvalidParameterValueException(
1✔
4400
                f"Specified ReservedConcurrentExecutions for function decreases account's UnreservedConcurrentExecution below its minimum value of [{config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY}]."
4401
            )
4402

4403
        total_provisioned_concurrency = sum(
1✔
4404
            [
4405
                provisioned_configs.provisioned_concurrent_executions
4406
                for provisioned_configs in fn.provisioned_concurrency_configs.values()
4407
            ]
4408
        )
4409
        if total_provisioned_concurrency > reserved_concurrent_executions:
1✔
4410
            raise InvalidParameterValueException(
1✔
4411
                f" ReservedConcurrentExecutions  {reserved_concurrent_executions} should not be lower than function's total provisioned concurrency [{total_provisioned_concurrency}]."
4412
            )
4413

4414
        fn.reserved_concurrent_executions = reserved_concurrent_executions
1✔
4415

4416
        return Concurrency(ReservedConcurrentExecutions=fn.reserved_concurrent_executions)
1✔
4417

4418
    def delete_function_concurrency(
1✔
4419
        self, context: RequestContext, function_name: FunctionName, **kwargs
4420
    ) -> None:
4421
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
4422
        function_name, qualifier = api_utils.get_name_and_qualifier(function_name, None, context)
1✔
4423
        store = lambda_stores[account_id][region]
1✔
4424
        fn = store.functions.get(function_name)
1✔
4425
        fn.reserved_concurrent_executions = None
1✔
4426

4427
    # =======================================
4428
    # ===============  TAGS   ===============
4429
    # =======================================
4430
    # only Function, Event Source Mapping, and Code Signing Config (not currently supported by LocalStack) ARNs
4431
    # are available for tagging in AWS
4432

4433
    @staticmethod
1✔
4434
    def _update_resource_tags(
1✔
4435
        resource_arn: str, account_id: str, region: str, tags: dict[str, str]
4436
    ) -> None:
4437
        lambda_stores[account_id][region].tags.update_tags(resource_arn, tags)
1✔
4438

4439
    @staticmethod
1✔
4440
    def _list_resource_tags(resource_arn: str, account_id: str, region: str) -> dict[str, str]:
1✔
4441
        return lambda_stores[account_id][region].tags.get_tags(resource_arn)
1✔
4442

4443
    @staticmethod
1✔
4444
    def _remove_resource_tags(
1✔
4445
        resource_arn: str, account_id: str, region: str, keys: TagKeyList
4446
    ) -> None:
4447
        lambda_stores[account_id][region].tags.delete_tags(resource_arn, keys)
1✔
4448

4449
    @staticmethod
1✔
4450
    def _remove_all_resource_tags(resource_arn: str, account_id: str, region: str) -> None:
1✔
4451
        lambda_stores[account_id][region].tags.delete_all_tags(resource_arn)
1✔
4452

4453
    def _get_tags(self, resource: TaggableResource) -> dict[str, str]:
1✔
4454
        account_id, region = self._get_account_id_and_region_for_taggable_resource(resource)
1✔
4455
        tags = self._list_resource_tags(resource_arn=resource, account_id=account_id, region=region)
1✔
4456
        return tags
1✔
4457

4458
    def _store_tags(self, resource: TaggableResource, tags: dict[str, str]) -> None:
1✔
4459
        account_id, region = self._get_account_id_and_region_for_taggable_resource(resource)
1✔
4460
        existing_tags = self._list_resource_tags(
1✔
4461
            resource_arn=resource, account_id=account_id, region=region
4462
        )
4463
        if len({**existing_tags, **tags}) > LAMBDA_TAG_LIMIT_PER_RESOURCE:
1✔
4464
            # note: we cannot use | on `ImmutableDict` and regular `dict`
4465
            raise InvalidParameterValueException(
1✔
4466
                "Number of tags exceeds resource tag limit.", Type="User"
4467
            )
4468
        self._update_resource_tags(
1✔
4469
            resource_arn=resource,
4470
            account_id=account_id,
4471
            region=region,
4472
            tags=tags,
4473
        )
4474

4475
    def _remove_tags(self, resource: TaggableResource, keys: TagKeyList) -> None:
1✔
4476
        account_id, region = self._get_account_id_and_region_for_taggable_resource(resource)
1✔
4477
        self._remove_resource_tags(
1✔
4478
            resource_arn=resource, account_id=account_id, region=region, keys=keys
4479
        )
4480

4481
    def _remove_all_tags(self, resource: TaggableResource) -> None:
1✔
4482
        account_id, region = self._get_account_id_and_region_for_taggable_resource(resource)
1✔
4483
        self._remove_all_resource_tags(resource_arn=resource, account_id=account_id, region=region)
1✔
4484

4485
    def _get_account_id_and_region_for_taggable_resource(
1✔
4486
        self, resource: TaggableResource
4487
    ) -> tuple[str, str]:
4488
        """
4489
        Takes a resource ARN for a TaggableResource (Lambda Function, Event Source Mapping, Code Signing Config, or Capacity Provider) and returns a corresponding
4490
        LambdaStore for its region and account.
4491

4492
        In addition, this function validates that the ARN is a valid TaggableResource type, and that the TaggableResource exists.
4493

4494
        Raises:
4495
            ValidationException: If the resource ARN is not a full ARN for a TaggableResource.
4496
            ResourceNotFoundException: If the specified resource does not exist.
4497
            InvalidParameterValueException: If the resource ARN is a qualified Lambda Function.
4498
        """
4499

4500
        def _raise_validation_exception():
1✔
4501
            raise ValidationException(
1✔
4502
                f"1 validation error detected: Value '{resource}' at 'resource' failed to satisfy constraint: Member must satisfy regular expression pattern: {api_utils.TAGGABLE_RESOURCE_ARN_PATTERN}"
4503
            )
4504

4505
        # Check whether the ARN we have been passed is correctly formatted
4506
        parsed_resource_arn: ArnData = None
1✔
4507
        try:
1✔
4508
            parsed_resource_arn = parse_arn(resource)
1✔
4509
        except Exception:
1✔
4510
            _raise_validation_exception()
1✔
4511

4512
        # TODO: Should we be checking whether this is a full ARN?
4513
        region, account_id, resource_type = map(
1✔
4514
            parsed_resource_arn.get, ("region", "account", "resource")
4515
        )
4516

4517
        if not all((region, account_id, resource_type)):
1✔
4518
            _raise_validation_exception()
×
4519

4520
        if not (parts := resource_type.split(":")):
1✔
4521
            _raise_validation_exception()
×
4522

4523
        resource_type, resource_identifier, *qualifier = parts
1✔
4524

4525
        # Qualifier validation raises before checking for NotFound
4526
        if qualifier:
1✔
4527
            if resource_type == "function":
1✔
4528
                raise InvalidParameterValueException(
1✔
4529
                    "Tags on function aliases and versions are not supported. Please specify a function ARN.",
4530
                    Type="User",
4531
                )
4532
            _raise_validation_exception()
1✔
4533

4534
        if resource_type == "event-source-mapping":
1✔
4535
            self._get_esm(resource_identifier, account_id, region)
1✔
4536
        elif resource_type == "code-signing-config":
1✔
4537
            raise NotImplementedError("Resource tagging on CSC not yet implemented.")
4538
        elif resource_type == "function":
1✔
4539
            self._get_function(
1✔
4540
                function_name=resource_identifier, account_id=account_id, region=region
4541
            )
4542
        elif resource_type == "capacity-provider":
1✔
4543
            self._get_capacity_provider(resource_identifier, account_id, region)
1✔
4544
        else:
4545
            _raise_validation_exception()
1✔
4546

4547
        # If no exceptions are raised, assume ARN and referenced resource is valid for tag operations
4548
        return account_id, region
1✔
4549

4550
    def tag_resource(
1✔
4551
        self, context: RequestContext, resource: TaggableResource, tags: Tags, **kwargs
4552
    ) -> None:
4553
        if not tags:
1✔
4554
            raise InvalidParameterValueException(
1✔
4555
                "An error occurred and the request cannot be processed.", Type="User"
4556
            )
4557
        self._store_tags(resource, tags)
1✔
4558

4559
        if (resource_id := extract_resource_from_arn(resource)) and resource_id.startswith(
1✔
4560
            "function"
4561
        ):
4562
            name, _, account, region = function_locators_from_arn(resource)
1✔
4563
            function = self._get_function(name, account, region)
1✔
4564
            with function.lock:
1✔
4565
                # dirty hack for changed revision id, should reevaluate model to prevent this:
4566
                latest_version = function.versions["$LATEST"]
1✔
4567
                function.versions["$LATEST"] = dataclasses.replace(
1✔
4568
                    latest_version, config=dataclasses.replace(latest_version.config)
4569
                )
4570

4571
    def list_tags(
1✔
4572
        self, context: RequestContext, resource: TaggableResource, **kwargs
4573
    ) -> ListTagsResponse:
4574
        tags = self._get_tags(resource)
1✔
4575
        return ListTagsResponse(Tags=tags)
1✔
4576

4577
    def untag_resource(
1✔
4578
        self, context: RequestContext, resource: TaggableResource, tag_keys: TagKeyList, **kwargs
4579
    ) -> None:
4580
        if not tag_keys:
1✔
4581
            raise ValidationException(
1✔
4582
                "1 validation error detected: Value null at 'tagKeys' failed to satisfy constraint: Member must not be null"
4583
            )  # should probably be generalized a bit
4584

4585
        self._remove_tags(resource, tag_keys)
1✔
4586

4587
        if (resource_id := extract_resource_from_arn(resource)) and resource_id.startswith(
1✔
4588
            "function"
4589
        ):
4590
            name, _, account, region = function_locators_from_arn(resource)
1✔
4591
            function = self._get_function(name, account, region)
1✔
4592
            # TODO: Potential race condition
4593
            with function.lock:
1✔
4594
                # dirty hack for changed revision id, should reevaluate model to prevent this:
4595
                latest_version = function.versions["$LATEST"]
1✔
4596
                function.versions["$LATEST"] = dataclasses.replace(
1✔
4597
                    latest_version, config=dataclasses.replace(latest_version.config)
4598
                )
4599

4600
    # =======================================
4601
    # =======  LEGACY / DEPRECATED   ========
4602
    # =======================================
4603

4604
    def invoke_async(
1✔
4605
        self,
4606
        context: RequestContext,
4607
        function_name: NamespacedFunctionName,
4608
        invoke_args: IO[BlobStream],
4609
        **kwargs,
4610
    ) -> InvokeAsyncResponse:
4611
        """LEGACY API endpoint. Even AWS heavily discourages its usage."""
4612
        raise NotImplementedError
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc