• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 21502801941

29 Jan 2026 06:53PM UTC coverage: 86.962% (+0.007%) from 86.955%
21502801941

push

github

web-flow
Route53: add `Update` support for `RecordSet` resource (#13627)

26 of 28 new or added lines in 1 file covered. (92.86%)

163 existing lines in 4 files now uncovered.

70379 of 80931 relevant lines covered (86.96%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.5
/localstack-core/localstack/services/lambda_/provider.py
1
import base64
1✔
2
import dataclasses
1✔
3
import datetime
1✔
4
import itertools
1✔
5
import json
1✔
6
import logging
1✔
7
import re
1✔
8
import threading
1✔
9
import time
1✔
10
from typing import IO, Any
1✔
11

12
from botocore.exceptions import ClientError
1✔
13

14
from localstack import config
1✔
15
from localstack.aws.api import RequestContext, ServiceException, handler
1✔
16
from localstack.aws.api.lambda_ import (
1✔
17
    AccountLimit,
18
    AccountUsage,
19
    AddLayerVersionPermissionResponse,
20
    AddPermissionRequest,
21
    AddPermissionResponse,
22
    Alias,
23
    AliasConfiguration,
24
    AliasRoutingConfiguration,
25
    AllowedPublishers,
26
    Architecture,
27
    Arn,
28
    Blob,
29
    BlobStream,
30
    CapacityProviderConfig,
31
    CodeSigningConfigArn,
32
    CodeSigningConfigNotFoundException,
33
    CodeSigningPolicies,
34
    CompatibleArchitectures,
35
    CompatibleRuntimes,
36
    Concurrency,
37
    Cors,
38
    CreateCodeSigningConfigResponse,
39
    CreateEventSourceMappingRequest,
40
    CreateFunctionRequest,
41
    CreateFunctionUrlConfigResponse,
42
    DeleteCodeSigningConfigResponse,
43
    DeleteFunctionResponse,
44
    Description,
45
    DestinationConfig,
46
    DurableExecutionName,
47
    EventSourceMappingConfiguration,
48
    FunctionCodeLocation,
49
    FunctionConfiguration,
50
    FunctionEventInvokeConfig,
51
    FunctionName,
52
    FunctionUrlAuthType,
53
    FunctionUrlQualifier,
54
    FunctionVersionLatestPublished,
55
    GetAccountSettingsResponse,
56
    GetCodeSigningConfigResponse,
57
    GetFunctionCodeSigningConfigResponse,
58
    GetFunctionConcurrencyResponse,
59
    GetFunctionRecursionConfigResponse,
60
    GetFunctionResponse,
61
    GetFunctionUrlConfigResponse,
62
    GetLayerVersionPolicyResponse,
63
    GetLayerVersionResponse,
64
    GetPolicyResponse,
65
    GetProvisionedConcurrencyConfigResponse,
66
    InvalidParameterValueException,
67
    InvocationResponse,
68
    InvocationType,
69
    InvokeAsyncResponse,
70
    InvokeMode,
71
    LambdaApi,
72
    LambdaManagedInstancesCapacityProviderConfig,
73
    LastUpdateStatus,
74
    LayerName,
75
    LayerPermissionAllowedAction,
76
    LayerPermissionAllowedPrincipal,
77
    LayersListItem,
78
    LayerVersionArn,
79
    LayerVersionContentInput,
80
    LayerVersionNumber,
81
    LicenseInfo,
82
    ListAliasesResponse,
83
    ListCodeSigningConfigsResponse,
84
    ListEventSourceMappingsResponse,
85
    ListFunctionEventInvokeConfigsResponse,
86
    ListFunctionsByCodeSigningConfigResponse,
87
    ListFunctionsResponse,
88
    ListFunctionUrlConfigsResponse,
89
    ListLayersResponse,
90
    ListLayerVersionsResponse,
91
    ListProvisionedConcurrencyConfigsResponse,
92
    ListTagsResponse,
93
    ListVersionsByFunctionResponse,
94
    LogFormat,
95
    LoggingConfig,
96
    LogType,
97
    MasterRegion,
98
    MaxFunctionEventInvokeConfigListItems,
99
    MaximumEventAgeInSeconds,
100
    MaximumRetryAttempts,
101
    MaxItems,
102
    MaxLayerListItems,
103
    MaxListItems,
104
    MaxProvisionedConcurrencyConfigListItems,
105
    NamespacedFunctionName,
106
    NamespacedStatementId,
107
    NumericLatestPublishedOrAliasQualifier,
108
    OnFailure,
109
    OnSuccess,
110
    OrganizationId,
111
    PackageType,
112
    PositiveInteger,
113
    PreconditionFailedException,
114
    ProvisionedConcurrencyConfigListItem,
115
    ProvisionedConcurrencyConfigNotFoundException,
116
    ProvisionedConcurrencyStatusEnum,
117
    PublishLayerVersionResponse,
118
    PutFunctionCodeSigningConfigResponse,
119
    PutFunctionRecursionConfigResponse,
120
    PutProvisionedConcurrencyConfigResponse,
121
    Qualifier,
122
    RecursiveLoop,
123
    ReservedConcurrentExecutions,
124
    ResourceConflictException,
125
    ResourceNotFoundException,
126
    Runtime,
127
    RuntimeVersionConfig,
128
    SnapStart,
129
    SnapStartApplyOn,
130
    SnapStartOptimizationStatus,
131
    SnapStartResponse,
132
    State,
133
    StatementId,
134
    StateReasonCode,
135
    String,
136
    TaggableResource,
137
    TagKeyList,
138
    Tags,
139
    TenantId,
140
    TracingMode,
141
    UnqualifiedFunctionName,
142
    UpdateCodeSigningConfigResponse,
143
    UpdateEventSourceMappingRequest,
144
    UpdateFunctionCodeRequest,
145
    UpdateFunctionConfigurationRequest,
146
    UpdateFunctionUrlConfigResponse,
147
    VersionWithLatestPublished,
148
)
149
from localstack.aws.api.lambda_ import FunctionVersion as FunctionVersionApi
1✔
150
from localstack.aws.api.lambda_ import ServiceException as LambdaServiceException
1✔
151
from localstack.aws.api.pipes import (
1✔
152
    DynamoDBStreamStartPosition,
153
    KinesisStreamStartPosition,
154
)
155
from localstack.aws.connect import connect_to
1✔
156
from localstack.aws.spec import load_service
1✔
157
from localstack.services.edge import ROUTER
1✔
158
from localstack.services.lambda_ import api_utils
1✔
159
from localstack.services.lambda_ import hooks as lambda_hooks
1✔
160
from localstack.services.lambda_.analytics import (
1✔
161
    FunctionInitializationType,
162
    FunctionOperation,
163
    FunctionStatus,
164
    function_counter,
165
)
166
from localstack.services.lambda_.api_utils import (
1✔
167
    ARCHITECTURES,
168
    STATEMENT_ID_REGEX,
169
    SUBNET_ID_REGEX,
170
    function_locators_from_arn,
171
)
172
from localstack.services.lambda_.event_source_mapping.esm_config_factory import (
1✔
173
    EsmConfigFactory,
174
)
175
from localstack.services.lambda_.event_source_mapping.esm_worker import (
1✔
176
    EsmState,
177
    EsmWorker,
178
)
179
from localstack.services.lambda_.event_source_mapping.esm_worker_factory import (
1✔
180
    EsmWorkerFactory,
181
)
182
from localstack.services.lambda_.event_source_mapping.pipe_utils import get_internal_client
1✔
183
from localstack.services.lambda_.invocation import AccessDeniedException
1✔
184
from localstack.services.lambda_.invocation.execution_environment import (
1✔
185
    EnvironmentStartupTimeoutException,
186
)
187
from localstack.services.lambda_.invocation.lambda_models import (
1✔
188
    AliasRoutingConfig,
189
    CodeSigningConfig,
190
    EventInvokeConfig,
191
    Function,
192
    FunctionResourcePolicy,
193
    FunctionUrlConfig,
194
    FunctionVersion,
195
    ImageConfig,
196
    LambdaEphemeralStorage,
197
    Layer,
198
    LayerPolicy,
199
    LayerPolicyStatement,
200
    LayerVersion,
201
    ProvisionedConcurrencyConfiguration,
202
    RequestEntityTooLargeException,
203
    ResourcePolicy,
204
    UpdateStatus,
205
    ValidationException,
206
    VersionAlias,
207
    VersionFunctionConfiguration,
208
    VersionIdentifier,
209
    VersionState,
210
    VpcConfig,
211
)
212
from localstack.services.lambda_.invocation.lambda_service import (
1✔
213
    LambdaService,
214
    create_image_code,
215
    destroy_code_if_not_used,
216
    lambda_stores,
217
    store_lambda_archive,
218
    store_s3_bucket_archive,
219
)
220
from localstack.services.lambda_.invocation.models import CapacityProvider as CapacityProviderModel
1✔
221
from localstack.services.lambda_.invocation.models import LambdaStore
1✔
222
from localstack.services.lambda_.invocation.runtime_executor import get_runtime_executor
1✔
223
from localstack.services.lambda_.lambda_utils import HINT_LOG
1✔
224
from localstack.services.lambda_.layerfetcher.layer_fetcher import LayerFetcher
1✔
225
from localstack.services.lambda_.provider_utils import (
1✔
226
    LambdaLayerVersionIdentifier,
227
    get_function_version,
228
    get_function_version_from_arn,
229
)
230
from localstack.services.lambda_.runtimes import (
1✔
231
    ALL_RUNTIMES,
232
    DEPRECATED_RUNTIMES,
233
    DEPRECATED_RUNTIMES_UPGRADES,
234
    RUNTIMES_AGGREGATED,
235
    SNAP_START_SUPPORTED_RUNTIMES,
236
    VALID_MANAGED_INSTANCE_RUNTIMES,
237
    VALID_RUNTIMES,
238
)
239
from localstack.services.lambda_.urlrouter import FunctionUrlRouter
1✔
240
from localstack.services.plugins import ServiceLifecycleHook
1✔
241
from localstack.state import StateVisitor
1✔
242
from localstack.utils.aws.arns import (
1✔
243
    ArnData,
244
    capacity_provider_arn,
245
    extract_resource_from_arn,
246
    extract_service_from_arn,
247
    get_partition,
248
    lambda_event_source_mapping_arn,
249
    parse_arn,
250
)
251
from localstack.utils.aws.client_types import ServicePrincipal
1✔
252
from localstack.utils.bootstrap import is_api_enabled
1✔
253
from localstack.utils.collections import PaginatedList, merge_recursive
1✔
254
from localstack.utils.event_matcher import validate_event_pattern
1✔
255
from localstack.utils.strings import get_random_hex, short_uid, to_bytes, to_str
1✔
256
from localstack.utils.sync import poll_condition
1✔
257
from localstack.utils.urls import localstack_host
1✔
258

259
LOG = logging.getLogger(__name__)
1✔
260

261
CAPACITY_PROVIDER_ARN_NAME = "arn:aws[a-zA-Z-]*:lambda:(eusc-)?[a-z]{2}((-gov)|(-iso([a-z]?)))?-[a-z]+-\\d{1}:\\d{12}:capacity-provider:[a-zA-Z0-9-_]+"
1✔
262
LAMBDA_DEFAULT_TIMEOUT = 3
1✔
263
LAMBDA_DEFAULT_MEMORY_SIZE = 128
1✔
264

265
LAMBDA_TAG_LIMIT_PER_RESOURCE = 50
1✔
266
LAMBDA_LAYERS_LIMIT_PER_FUNCTION = 5
1✔
267

268
TAG_KEY_CUSTOM_URL = "_custom_id_"
1✔
269
# Requirements (from RFC3986 & co): not longer than 63, first char must be
270
# alpha, then alphanumeric or hyphen, except cannot start or end with hyphen
271
TAG_KEY_CUSTOM_URL_VALIDATOR = re.compile(r"^[A-Za-z]([A-Za-z0-9\-]{0,61}[A-Za-z0-9])?$")
1✔
272

273

274
class LambdaProvider(LambdaApi, ServiceLifecycleHook):
1✔
275
    lambda_service: LambdaService
1✔
276
    create_fn_lock: threading.RLock
1✔
277
    create_layer_lock: threading.RLock
1✔
278
    router: FunctionUrlRouter
1✔
279
    esm_workers: dict[str, EsmWorker]
1✔
280
    layer_fetcher: LayerFetcher | None
1✔
281

282
    def __init__(self) -> None:
1✔
283
        self.lambda_service = LambdaService()
1✔
284
        self.create_fn_lock = threading.RLock()
1✔
285
        self.create_layer_lock = threading.RLock()
1✔
286
        self.router = FunctionUrlRouter(ROUTER, self.lambda_service)
1✔
287
        self.esm_workers = {}
1✔
288
        self.layer_fetcher = None
1✔
289
        lambda_hooks.inject_layer_fetcher.run(self)
1✔
290

291
    def accept_state_visitor(self, visitor: StateVisitor):
1✔
UNCOV
292
        visitor.visit(lambda_stores)
×
293

294
    def on_before_state_reset(self):
1✔
UNCOV
295
        self.lambda_service.stop()
×
296

297
    def on_after_state_reset(self):
1✔
UNCOV
298
        self.router.lambda_service = self.lambda_service = LambdaService()
×
299

300
    def on_before_state_load(self):
1✔
UNCOV
301
        self.lambda_service.stop()
×
302

303
    def on_after_state_load(self):
1✔
304
        self.lambda_service = LambdaService()
×
UNCOV
305
        self.router.lambda_service = self.lambda_service
×
306

307
        for account_id, account_bundle in lambda_stores.items():
×
308
            for region_name, state in account_bundle.items():
×
309
                for fn in state.functions.values():
×
310
                    for fn_version in fn.versions.values():
×
UNCOV
311
                        try:
×
312
                            # $LATEST is not invokable for Lambda functions with a capacity provider
313
                            # and has a different State (i.e., ActiveNonInvokable)
UNCOV
314
                            is_capacity_provider_latest = (
×
315
                                fn_version.config.CapacityProviderConfig
316
                                and fn_version.id.qualifier == "$LATEST"
317
                            )
UNCOV
318
                            if not is_capacity_provider_latest:
×
319
                                # Restore the "Pending" state for the function version and start it
UNCOV
320
                                new_state = VersionState(
×
321
                                    state=State.Pending,
322
                                    code=StateReasonCode.Creating,
323
                                    reason="The function is being created.",
324
                                )
325
                                new_config = dataclasses.replace(fn_version.config, state=new_state)
×
326
                                new_version = dataclasses.replace(fn_version, config=new_config)
×
327
                                fn.versions[fn_version.id.qualifier] = new_version
×
UNCOV
328
                                self.lambda_service.create_function_version(fn_version).result(
×
329
                                    timeout=5
330
                                )
331
                        except Exception:
×
UNCOV
332
                            LOG.warning(
×
333
                                "Failed to restore function version %s",
334
                                fn_version.id.qualified_arn(),
335
                                exc_info=LOG.isEnabledFor(logging.DEBUG),
336
                            )
337
                    # restore provisioned concurrency per function considering both versions and aliases
UNCOV
338
                    for (
×
339
                        provisioned_qualifier,
340
                        provisioned_config,
341
                    ) in fn.provisioned_concurrency_configs.items():
342
                        fn_arn = None
×
343
                        try:
×
344
                            if api_utils.qualifier_is_alias(provisioned_qualifier):
×
345
                                alias = fn.aliases.get(provisioned_qualifier)
×
346
                                resolved_version = fn.versions.get(alias.function_version)
×
347
                                fn_arn = resolved_version.id.qualified_arn()
×
348
                            elif api_utils.qualifier_is_version(provisioned_qualifier):
×
349
                                fn_version = fn.versions.get(provisioned_qualifier)
×
UNCOV
350
                                fn_arn = fn_version.id.qualified_arn()
×
351
                            else:
UNCOV
352
                                raise InvalidParameterValueException(
×
353
                                    "Invalid qualifier type:"
354
                                    " Qualifier can only be an alias or a version for provisioned concurrency."
355
                                )
356

357
                            manager = self.lambda_service.get_lambda_version_manager(fn_arn)
×
UNCOV
358
                            manager.update_provisioned_concurrency_config(
×
359
                                provisioned_config.provisioned_concurrent_executions
360
                            )
361
                        except Exception:
×
UNCOV
362
                            LOG.warning(
×
363
                                "Failed to restore provisioned concurrency %s for function %s",
364
                                provisioned_config,
365
                                fn_arn,
366
                                exc_info=LOG.isEnabledFor(logging.DEBUG),
367
                            )
368

UNCOV
369
                for esm in state.event_source_mappings.values():
×
370
                    # Restores event source workers
UNCOV
371
                    function_arn = esm.get("FunctionArn")
×
372

373
                    # TODO: How do we know the event source is up?
374
                    # A basic poll to see if the mapped Lambda function is active/failed
UNCOV
375
                    if not poll_condition(
×
376
                        lambda: get_function_version_from_arn(function_arn).config.state.state
377
                        in [State.Active, State.Failed],
378
                        timeout=10,
379
                    ):
UNCOV
380
                        LOG.warning(
×
381
                            "Creating ESM for Lambda that is not in running state: %s",
382
                            function_arn,
383
                        )
384

385
                    function_version = get_function_version_from_arn(function_arn)
×
UNCOV
386
                    function_role = function_version.config.role
×
387

UNCOV
388
                    is_esm_enabled = esm.get("State", EsmState.DISABLED) not in (
×
389
                        EsmState.DISABLED,
390
                        EsmState.DISABLING,
391
                    )
UNCOV
392
                    esm_worker = EsmWorkerFactory(
×
393
                        esm, function_role, is_esm_enabled
394
                    ).get_esm_worker()
395

396
                    # Note: a worker is created in the DISABLED state if not enabled
UNCOV
397
                    esm_worker.create()
×
398
                    # TODO: assigning the esm_worker to the dict only works after .create(). Could it cause a race
399
                    #  condition if we get a shutdown here and have a worker thread spawned but not accounted for?
UNCOV
400
                    self.esm_workers[esm_worker.uuid] = esm_worker
×
401

402
    def on_after_init(self):
1✔
403
        self.router.register_routes()
1✔
404
        get_runtime_executor().validate_environment()
1✔
405

406
    def on_before_stop(self) -> None:
1✔
407
        for esm_worker in self.esm_workers.values():
1✔
408
            esm_worker.stop_for_shutdown()
1✔
409

410
        # TODO: should probably unregister routes?
411
        self.lambda_service.stop()
1✔
412

413
    @staticmethod
1✔
414
    def _get_function(function_name: str, account_id: str, region: str) -> Function:
1✔
415
        state = lambda_stores[account_id][region]
1✔
416
        function = state.functions.get(function_name)
1✔
417
        if not function:
1✔
418
            arn = api_utils.unqualified_lambda_arn(
1✔
419
                function_name=function_name,
420
                account=account_id,
421
                region=region,
422
            )
423
            raise ResourceNotFoundException(
1✔
424
                f"Function not found: {arn}",
425
                Type="User",
426
            )
427
        return function
1✔
428

429
    @staticmethod
1✔
430
    def _get_esm(uuid: str, account_id: str, region: str) -> EventSourceMappingConfiguration:
1✔
431
        state = lambda_stores[account_id][region]
1✔
432
        esm = state.event_source_mappings.get(uuid)
1✔
433
        if not esm:
1✔
434
            arn = lambda_event_source_mapping_arn(uuid, account_id, region)
1✔
435
            raise ResourceNotFoundException(
1✔
436
                f"Event source mapping not found: {arn}",
437
                Type="User",
438
            )
439
        return esm
1✔
440

441
    @staticmethod
1✔
442
    def _get_capacity_provider(
1✔
443
        capacity_provider_name: str,
444
        account_id: str,
445
        region: str,
446
        error_msg_template: str = "Capacity provider not found: {}",
447
    ) -> CapacityProviderModel:
448
        state = lambda_stores[account_id][region]
1✔
449
        cp = state.capacity_providers.get(capacity_provider_name)
1✔
450
        if not cp:
1✔
451
            arn = capacity_provider_arn(capacity_provider_name, account_id, region)
1✔
452
            raise ResourceNotFoundException(
1✔
453
                error_msg_template.format(arn),
454
                Type="User",
455
            )
UNCOV
456
        return cp
×
457

458
    @staticmethod
1✔
459
    def _validate_qualifier_expression(qualifier: str) -> None:
1✔
460
        if error_messages := api_utils.validate_qualifier(qualifier):
1✔
UNCOV
461
            raise ValidationException(
×
462
                message=api_utils.construct_validation_exception_message(error_messages)
463
            )
464

465
    @staticmethod
1✔
466
    def _validate_publish_to(publish_to: str):
1✔
467
        if publish_to != FunctionVersionLatestPublished.LATEST_PUBLISHED:
×
UNCOV
468
            raise ValidationException(
×
469
                message=f"1 validation error detected: Value '{publish_to}' at 'publishTo' failed to satisfy constraint: Member must satisfy enum value set: [LATEST_PUBLISHED]"
470
            )
471

472
    @staticmethod
1✔
473
    def _resolve_fn_qualifier(resolved_fn: Function, qualifier: str | None) -> tuple[str, str]:
1✔
474
        """Attempts to resolve a given qualifier and returns a qualifier that exists or
475
        raises an appropriate ResourceNotFoundException.
476

477
        :param resolved_fn: The resolved lambda function
478
        :param qualifier: The qualifier to be resolved or None
479
        :return: Tuple of (resolved qualifier, function arn either qualified or unqualified)"""
480
        function_name = resolved_fn.function_name
1✔
481
        # assuming function versions need to live in the same account and region
482
        account_id = resolved_fn.latest().id.account
1✔
483
        region = resolved_fn.latest().id.region
1✔
484
        fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
1✔
485
        if qualifier is not None:
1✔
486
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
1✔
487
            if api_utils.qualifier_is_alias(qualifier):
1✔
488
                if qualifier not in resolved_fn.aliases:
1✔
489
                    raise ResourceNotFoundException(f"Cannot find alias arn: {fn_arn}", Type="User")
1✔
490
            elif api_utils.qualifier_is_version(qualifier) or qualifier == "$LATEST":
1✔
491
                if qualifier not in resolved_fn.versions:
1✔
492
                    raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")
1✔
493
            else:
494
                # matches qualifier pattern but invalid alias or version
495
                raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")
1✔
496
        resolved_qualifier = qualifier or "$LATEST"
1✔
497
        return resolved_qualifier, fn_arn
1✔
498

499
    @staticmethod
1✔
500
    def _function_revision_id(resolved_fn: Function, resolved_qualifier: str) -> str:
1✔
501
        if api_utils.qualifier_is_alias(resolved_qualifier):
1✔
502
            return resolved_fn.aliases[resolved_qualifier].revision_id
1✔
503
        # Assumes that a non-alias is a version
504
        else:
505
            return resolved_fn.versions[resolved_qualifier].config.revision_id
1✔
506

507
    def _resolve_vpc_id(self, account_id: str, region_name: str, subnet_id: str) -> str:
1✔
508
        ec2_client = connect_to(aws_access_key_id=account_id, region_name=region_name).ec2
1✔
509
        try:
1✔
510
            return ec2_client.describe_subnets(SubnetIds=[subnet_id])["Subnets"][0]["VpcId"]
1✔
511
        except ec2_client.exceptions.ClientError as e:
1✔
512
            code = e.response["Error"]["Code"]
1✔
513
            message = e.response["Error"]["Message"]
1✔
514
            raise InvalidParameterValueException(
1✔
515
                f"Error occurred while DescribeSubnets. EC2 Error Code: {code}. EC2 Error Message: {message}",
516
                Type="User",
517
            )
518

519
    def _build_vpc_config(
1✔
520
        self,
521
        account_id: str,
522
        region_name: str,
523
        vpc_config: dict | None = None,
524
    ) -> VpcConfig | None:
525
        if not vpc_config or not is_api_enabled("ec2"):
1✔
526
            return None
1✔
527

528
        subnet_ids = vpc_config.get("SubnetIds", [])
1✔
529
        if subnet_ids is not None and len(subnet_ids) == 0:
1✔
530
            return VpcConfig(vpc_id="", security_group_ids=[], subnet_ids=[])
1✔
531

532
        subnet_id = subnet_ids[0]
1✔
533
        if not bool(SUBNET_ID_REGEX.match(subnet_id)):
1✔
534
            raise ValidationException(
1✔
535
                f"1 validation error detected: Value '[{subnet_id}]' at 'vpcConfig.subnetIds' failed to satisfy constraint: Member must satisfy constraint: [Member must have length less than or equal to 1024, Member must have length greater than or equal to 0, Member must satisfy regular expression pattern: subnet-[0-9a-z]*]"
536
            )
537

538
        return VpcConfig(
1✔
539
            vpc_id=self._resolve_vpc_id(account_id, region_name, subnet_id),
540
            security_group_ids=vpc_config.get("SecurityGroupIds", []),
541
            subnet_ids=subnet_ids,
542
        )
543

544
    def _create_version_model(
1✔
545
        self,
546
        function_name: str,
547
        region: str,
548
        account_id: str,
549
        description: str | None = None,
550
        revision_id: str | None = None,
551
        code_sha256: str | None = None,
552
        publish_to: FunctionVersionLatestPublished | None = None,
553
        is_active: bool = False,
554
    ) -> tuple[FunctionVersion, bool]:
555
        """
556
        Release a new version to the model if all restrictions are met.
557
        Restrictions:
558
          - CodeSha256, if provided, must equal the current latest version code hash
559
          - RevisionId, if provided, must equal the current latest version revision id
560
          - Some changes have been done to the latest version since last publish
561
        Will return a tuple of the version, and whether the version was published (True) or the latest available version was taken (False).
562
        This can happen if the latest version has not been changed since the last version publish, in this case the last version will be returned.
563

564
        :param function_name: Function name to be published
565
        :param region: Region of the function
566
        :param account_id: Account of the function
567
        :param description: new description of the version (will be the description of the function if missing)
568
        :param revision_id: Revision id, function will raise error if it does not match latest revision id
569
        :param code_sha256: Code sha256, function will raise error if it does not match latest code hash
570
        :return: Tuple of (published version, whether version was released or last released version returned, since nothing changed)
571
        """
572
        current_latest_version = get_function_version(
1✔
573
            function_name=function_name, qualifier="$LATEST", account_id=account_id, region=region
574
        )
575
        if revision_id and current_latest_version.config.revision_id != revision_id:
1✔
576
            raise PreconditionFailedException(
1✔
577
                "The Revision Id provided does not match the latest Revision Id. Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
578
                Type="User",
579
            )
580

581
        # check if code hashes match if they are specified
582
        current_hash = (
1✔
583
            current_latest_version.config.code.code_sha256
584
            if current_latest_version.config.package_type == PackageType.Zip
585
            else current_latest_version.config.image.code_sha256
586
        )
587
        # if the code is a zip package and hot reloaded (hot reloading is currently only supported for zip packagetypes)
588
        # we cannot enforce the codesha256 check
589
        is_hot_reloaded_zip_package = (
1✔
590
            current_latest_version.config.package_type == PackageType.Zip
591
            and current_latest_version.config.code.is_hot_reloading()
592
        )
593
        if code_sha256 and current_hash != code_sha256 and not is_hot_reloaded_zip_package:
1✔
594
            raise InvalidParameterValueException(
1✔
595
                f"CodeSHA256 ({code_sha256}) is different from current CodeSHA256 in $LATEST ({current_hash}). Please try again with the CodeSHA256 in $LATEST.",
596
                Type="User",
597
            )
598

599
        state = lambda_stores[account_id][region]
1✔
600
        function = state.functions.get(function_name)
1✔
601
        changes = {}
1✔
602
        if description is not None:
1✔
603
            changes["description"] = description
1✔
604
        # TODO copy environment instead of restarting one, get rid of all the "Pending"s
605

606
        with function.lock:
1✔
607
            if function.next_version > 1 and (
1✔
608
                prev_version := function.versions.get(str(function.next_version - 1))
609
            ):
610
                if (
1✔
611
                    prev_version.config.internal_revision
612
                    == current_latest_version.config.internal_revision
613
                ):
614
                    return prev_version, False
1✔
615
            # TODO check if there was a change since last version
616
            if publish_to == FunctionVersionLatestPublished.LATEST_PUBLISHED:
1✔
UNCOV
617
                qualifier = "$LATEST.PUBLISHED"
×
618
            else:
619
                qualifier = str(function.next_version)
1✔
620
                function.next_version += 1
1✔
621
            new_id = VersionIdentifier(
1✔
622
                function_name=function_name,
623
                qualifier=qualifier,
624
                region=region,
625
                account=account_id,
626
            )
627

628
            if current_latest_version.config.CapacityProviderConfig:
1✔
629
                # for lambda managed functions, snap start is not supported
UNCOV
630
                snap_start = None
×
631
            else:
632
                apply_on = current_latest_version.config.snap_start["ApplyOn"]
1✔
633
                optimization_status = SnapStartOptimizationStatus.Off
1✔
634
                if apply_on == SnapStartApplyOn.PublishedVersions:
1✔
UNCOV
635
                    optimization_status = SnapStartOptimizationStatus.On
×
636
                snap_start = SnapStartResponse(
1✔
637
                    ApplyOn=apply_on,
638
                    OptimizationStatus=optimization_status,
639
                )
640

641
            last_update = None
1✔
642
            new_state = VersionState(
1✔
643
                state=State.Pending,
644
                code=StateReasonCode.Creating,
645
                reason="The function is being created.",
646
            )
647
            if publish_to == FunctionVersionLatestPublished.LATEST_PUBLISHED:
1✔
UNCOV
648
                last_update = UpdateStatus(
×
649
                    status=LastUpdateStatus.InProgress,
650
                    code="Updating",
651
                    reason="The function is being updated.",
652
                )
653
                if is_active:
×
UNCOV
654
                    new_state = VersionState(state=State.Active)
×
655
            new_version = dataclasses.replace(
1✔
656
                current_latest_version,
657
                config=dataclasses.replace(
658
                    current_latest_version.config,
659
                    last_update=last_update,
660
                    state=new_state,
661
                    snap_start=snap_start,
662
                    **changes,
663
                ),
664
                id=new_id,
665
            )
666
            function.versions[qualifier] = new_version
1✔
667
        return new_version, True
1✔
668

669
    def _publish_version_from_existing_version(
1✔
670
        self,
671
        function_name: str,
672
        region: str,
673
        account_id: str,
674
        description: str | None = None,
675
        revision_id: str | None = None,
676
        code_sha256: str | None = None,
677
        publish_to: FunctionVersionLatestPublished | None = None,
678
    ) -> FunctionVersion:
679
        """
680
        Publish version from an existing, already initialized LATEST
681

682
        :param function_name: Function name
683
        :param region: region
684
        :param account_id: account id
685
        :param description: description
686
        :param revision_id: revision id (check if current version matches)
687
        :param code_sha256: code sha (check if current code matches)
688
        :return: new version
689
        """
690
        is_active = True if publish_to == FunctionVersionLatestPublished.LATEST_PUBLISHED else False
1✔
691
        new_version, changed = self._create_version_model(
1✔
692
            function_name=function_name,
693
            region=region,
694
            account_id=account_id,
695
            description=description,
696
            revision_id=revision_id,
697
            code_sha256=code_sha256,
698
            publish_to=publish_to,
699
            is_active=is_active,
700
        )
701
        if not changed:
1✔
702
            return new_version
1✔
703

704
        if new_version.config.CapacityProviderConfig:
1✔
UNCOV
705
            self.lambda_service.publish_version_async(new_version)
×
706
        else:
707
            self.lambda_service.publish_version(new_version)
1✔
708
        state = lambda_stores[account_id][region]
1✔
709
        function = state.functions.get(function_name)
1✔
710

711
        # Update revision id for $LATEST version
712
        # TODO: re-evaluate data model to prevent this dirty hack just for bumping the revision id
713
        latest_version = function.versions["$LATEST"]
1✔
714
        function.versions["$LATEST"] = dataclasses.replace(
1✔
715
            latest_version, config=dataclasses.replace(latest_version.config)
716
        )
717
        if new_version.config.CapacityProviderConfig:
1✔
718
            # publish_version happens async for functions with a capacity provider.
719
            # Therefore, we return the new_version with State=Pending or LastUpdateStatus=InProgress ($LATEST.PUBLISHED)
UNCOV
720
            return new_version
×
721
        else:
722
            # Regular functions yield an Active state modified during `publish_version` (sync).
723
            # Therefore, we need to get the updated version from the store.
724
            updated_version = function.versions.get(new_version.id.qualifier)
1✔
725
            return updated_version
1✔
726

727
    def _publish_version_with_changes(
1✔
728
        self,
729
        function_name: str,
730
        region: str,
731
        account_id: str,
732
        description: str | None = None,
733
        revision_id: str | None = None,
734
        code_sha256: str | None = None,
735
        publish_to: FunctionVersionLatestPublished | None = None,
736
        is_active: bool = False,
737
    ) -> FunctionVersion:
738
        """
739
        Publish version together with a new latest version (publish on create / update)
740

741
        :param function_name: Function name
742
        :param region: region
743
        :param account_id: account id
744
        :param description: description
745
        :param revision_id: revision id (check if current version matches)
746
        :param code_sha256: code sha (check if current code matches)
747
        :return: new version
748
        """
749
        new_version, changed = self._create_version_model(
1✔
750
            function_name=function_name,
751
            region=region,
752
            account_id=account_id,
753
            description=description,
754
            revision_id=revision_id,
755
            code_sha256=code_sha256,
756
            publish_to=publish_to,
757
            is_active=is_active,
758
        )
759
        if not changed:
1✔
UNCOV
760
            return new_version
×
761
        self.lambda_service.create_function_version(new_version)
1✔
762
        return new_version
1✔
763

764
    @staticmethod
1✔
765
    def _verify_env_variables(env_vars: dict[str, str]):
1✔
766
        dumped_env_vars = json.dumps(env_vars, separators=(",", ":"))
1✔
767
        if (
1✔
768
            len(dumped_env_vars.encode("utf-8"))
769
            > config.LAMBDA_LIMITS_MAX_FUNCTION_ENVVAR_SIZE_BYTES
770
        ):
771
            raise InvalidParameterValueException(
1✔
772
                f"Lambda was unable to configure your environment variables because the environment variables you have provided exceeded the 4KB limit. String measured: {dumped_env_vars}",
773
                Type="User",
774
            )
775

776
    @staticmethod
1✔
777
    def _validate_snapstart(snap_start: SnapStart, runtime: Runtime):
1✔
778
        apply_on = snap_start.get("ApplyOn")
1✔
779
        if apply_on not in [
1✔
780
            SnapStartApplyOn.PublishedVersions,
781
            SnapStartApplyOn.None_,
782
        ]:
783
            raise ValidationException(
1✔
784
                f"1 validation error detected: Value '{apply_on}' at 'snapStart.applyOn' failed to satisfy constraint: Member must satisfy enum value set: [PublishedVersions, None]"
785
            )
786

787
        if runtime not in SNAP_START_SUPPORTED_RUNTIMES:
1✔
UNCOV
788
            raise InvalidParameterValueException(
×
789
                f"{runtime} is not supported for SnapStart enabled functions.", Type="User"
790
            )
791

792
    def _validate_layers(self, new_layers: list[str], region: str, account_id: str):
1✔
793
        if len(new_layers) > LAMBDA_LAYERS_LIMIT_PER_FUNCTION:
1✔
794
            raise InvalidParameterValueException(
1✔
795
                "Cannot reference more than 5 layers.", Type="User"
796
            )
797

798
        visited_layers = {}
1✔
799
        for layer_version_arn in new_layers:
1✔
800
            (
1✔
801
                layer_region,
802
                layer_account_id,
803
                layer_name,
804
                layer_version_str,
805
            ) = api_utils.parse_layer_arn(layer_version_arn)
806
            if layer_version_str is None:
1✔
807
                raise ValidationException(
1✔
808
                    f"1 validation error detected: Value '[{layer_version_arn}]'"
809
                    + " at 'layers' failed to satisfy constraint: Member must satisfy constraint: [Member must have length less than or equal to 2048, Member must have length greater than or equal to 1, Member must satisfy regular expression pattern: "
810
                    + "(arn:(aws[a-zA-Z-]*)?:lambda:(eusc-)?[a-z]{2}((-gov)|(-iso([a-z]?)))?-[a-z]+-\\d{1}:\\d{12}:layer:[a-zA-Z0-9-_]+:[0-9]+)|(arn:[a-zA-Z0-9-]+:lambda:::awslayer:[a-zA-Z0-9-_]+), Member must not be null]",
811
                )
812

813
            state = lambda_stores[layer_account_id][layer_region]
1✔
814
            layer = state.layers.get(layer_name)
1✔
815
            layer_version = None
1✔
816
            if layer is not None:
1✔
817
                layer_version = layer.layer_versions.get(layer_version_str)
1✔
818
            if layer_account_id == account_id:
1✔
819
                if region and layer_region != region:
1✔
820
                    raise InvalidParameterValueException(
1✔
821
                        f"Layers are not in the same region as the function. "
822
                        f"Layers are expected to be in region {region}.",
823
                        Type="User",
824
                    )
825
                if layer is None or layer.layer_versions.get(layer_version_str) is None:
1✔
826
                    raise InvalidParameterValueException(
1✔
827
                        f"Layer version {layer_version_arn} does not exist.", Type="User"
828
                    )
829
            else:  # External layer from other account
830
                # TODO: validate IAM layer policy here, allowing access by default for now and only checking region
UNCOV
831
                if region and layer_region != region:
×
832
                    # TODO: detect user or role from context when IAM users are implemented
833
                    user = "user/localstack-testing"
×
UNCOV
834
                    raise AccessDeniedException(
×
835
                        f"User: arn:{get_partition(region)}:iam::{account_id}:{user} is not authorized to perform: lambda:GetLayerVersion on resource: {layer_version_arn} because no resource-based policy allows the lambda:GetLayerVersion action"
836
                    )
UNCOV
837
                if layer is None or layer_version is None:
×
838
                    # Limitation: cannot fetch external layers when using the same account id as the target layer
839
                    # because we do not want to trigger the layer fetcher for every non-existing layer.
UNCOV
840
                    if self.layer_fetcher is None:
×
841
                        raise NotImplementedError(
842
                            "Fetching shared layers from AWS is a pro feature."
843
                        )
844

845
                    layer = self.layer_fetcher.fetch_layer(layer_version_arn)
×
UNCOV
846
                    if layer is None:
×
847
                        # TODO: detect user or role from context when IAM users are implemented
848
                        user = "user/localstack-testing"
×
UNCOV
849
                        raise AccessDeniedException(
×
850
                            f"User: arn:{get_partition(region)}:iam::{account_id}:{user} is not authorized to perform: lambda:GetLayerVersion on resource: {layer_version_arn} because no resource-based policy allows the lambda:GetLayerVersion action"
851
                        )
852

853
                    # Distinguish between new layer and new layer version
UNCOV
854
                    if layer_version is None:
×
855
                        # Create whole layer from scratch
UNCOV
856
                        state.layers[layer_name] = layer
×
857
                    else:
858
                        # Create layer version if another version of the same layer already exists
UNCOV
859
                        state.layers[layer_name].layer_versions[layer_version_str] = (
×
860
                            layer.layer_versions.get(layer_version_str)
861
                        )
862

863
            # only the first two matches in the array are considered for the error message
864
            layer_arn = ":".join(layer_version_arn.split(":")[:-1])
1✔
865
            if layer_arn in visited_layers:
1✔
866
                conflict_layer_version_arn = visited_layers[layer_arn]
1✔
867
                raise InvalidParameterValueException(
1✔
868
                    f"Two different versions of the same layer are not allowed to be referenced in the same function. {conflict_layer_version_arn} and {layer_version_arn} are versions of the same layer.",
869
                    Type="User",
870
                )
871
            visited_layers[layer_arn] = layer_version_arn
1✔
872

873
    def _validate_capacity_provider_config(
1✔
874
        self, capacity_provider_config: CapacityProviderConfig, context: RequestContext
875
    ):
876
        if not capacity_provider_config.get("LambdaManagedInstancesCapacityProviderConfig"):
×
UNCOV
877
            raise ValidationException(
×
878
                "1 validation error detected: Value null at 'capacityProviderConfig.lambdaManagedInstancesCapacityProviderConfig' failed to satisfy constraint: Member must not be null"
879
            )
880

UNCOV
881
        capacity_provider_arn = capacity_provider_config.get(
×
882
            "LambdaManagedInstancesCapacityProviderConfig", {}
883
        ).get("CapacityProviderArn")
884
        if not capacity_provider_arn:
×
UNCOV
885
            raise ValidationException(
×
886
                "1 validation error detected: Value null at 'capacityProviderConfig.lambdaManagedInstancesCapacityProviderConfig.capacityProviderArn' failed to satisfy constraint: Member must not be null"
887
            )
888

889
        if not re.match(CAPACITY_PROVIDER_ARN_NAME, capacity_provider_arn):
×
UNCOV
890
            raise ValidationException(
×
891
                f"1 validation error detected: Value '{capacity_provider_arn}' at 'capacityProviderConfig.lambdaManagedInstancesCapacityProviderConfig.capacityProviderArn' failed to satisfy constraint: Member must satisfy regular expression pattern: {CAPACITY_PROVIDER_ARN_NAME}"
892
            )
893

894
        capacity_provider_name = capacity_provider_arn.split(":")[-1]
×
UNCOV
895
        self.get_capacity_provider(context, capacity_provider_name)
×
896

897
    @staticmethod
1✔
898
    def map_layers(new_layers: list[str]) -> list[LayerVersion]:
1✔
899
        layers = []
1✔
900
        for layer_version_arn in new_layers:
1✔
901
            region_name, account_id, layer_name, layer_version = api_utils.parse_layer_arn(
1✔
902
                layer_version_arn
903
            )
904
            layer = lambda_stores[account_id][region_name].layers.get(layer_name)
1✔
905
            layer_version = layer.layer_versions.get(layer_version)
1✔
906
            layers.append(layer_version)
1✔
907
        return layers
1✔
908

909
    def get_function_recursion_config(
1✔
910
        self,
911
        context: RequestContext,
912
        function_name: UnqualifiedFunctionName,
913
        **kwargs,
914
    ) -> GetFunctionRecursionConfigResponse:
915
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
916
        function_name = api_utils.get_function_name(function_name, context)
1✔
917
        fn = self._get_function(function_name=function_name, region=region, account_id=account_id)
1✔
918
        return GetFunctionRecursionConfigResponse(RecursiveLoop=fn.recursive_loop)
1✔
919

920
    def put_function_recursion_config(
1✔
921
        self,
922
        context: RequestContext,
923
        function_name: UnqualifiedFunctionName,
924
        recursive_loop: RecursiveLoop,
925
        **kwargs,
926
    ) -> PutFunctionRecursionConfigResponse:
927
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
928
        function_name = api_utils.get_function_name(function_name, context)
1✔
929

930
        fn = self._get_function(function_name=function_name, region=region, account_id=account_id)
1✔
931

932
        allowed_values = list(RecursiveLoop.__members__.values())
1✔
933
        if recursive_loop not in allowed_values:
1✔
934
            raise ValidationException(
1✔
935
                f"1 validation error detected: Value '{recursive_loop}' at 'recursiveLoop' failed to satisfy constraint: "
936
                f"Member must satisfy enum value set: [Terminate, Allow]"
937
            )
938

939
        fn.recursive_loop = recursive_loop
1✔
940
        return PutFunctionRecursionConfigResponse(RecursiveLoop=fn.recursive_loop)
1✔
941

942
    @handler(operation="CreateFunction", expand=False)
1✔
943
    def create_function(
1✔
944
        self,
945
        context: RequestContext,
946
        request: CreateFunctionRequest,
947
    ) -> FunctionConfiguration:
948
        context_region = context.region
1✔
949
        context_account_id = context.account_id
1✔
950

951
        zip_file = request.get("Code", {}).get("ZipFile")
1✔
952
        if zip_file and len(zip_file) > config.LAMBDA_LIMITS_CODE_SIZE_ZIPPED:
1✔
953
            raise RequestEntityTooLargeException(
1✔
954
                f"Zipped size must be smaller than {config.LAMBDA_LIMITS_CODE_SIZE_ZIPPED} bytes"
955
            )
956

957
        if context.request.content_length > config.LAMBDA_LIMITS_CREATE_FUNCTION_REQUEST_SIZE:
1✔
958
            raise RequestEntityTooLargeException(
1✔
959
                f"Request must be smaller than {config.LAMBDA_LIMITS_CREATE_FUNCTION_REQUEST_SIZE} bytes for the CreateFunction operation"
960
            )
961

962
        if architectures := request.get("Architectures"):
1✔
963
            if len(architectures) != 1:
1✔
964
                raise ValidationException(
1✔
965
                    f"1 validation error detected: Value '[{', '.join(architectures)}]' at 'architectures' failed to "
966
                    f"satisfy constraint: Member must have length less than or equal to 1",
967
                )
968
            if architectures[0] not in ARCHITECTURES:
1✔
969
                raise ValidationException(
1✔
970
                    f"1 validation error detected: Value '[{', '.join(architectures)}]' at 'architectures' failed to "
971
                    f"satisfy constraint: Member must satisfy constraint: [Member must satisfy enum value set: "
972
                    f"[x86_64, arm64], Member must not be null]",
973
                )
974

975
        if env_vars := request.get("Environment", {}).get("Variables"):
1✔
976
            self._verify_env_variables(env_vars)
1✔
977

978
        if layers := request.get("Layers", []):
1✔
979
            self._validate_layers(layers, region=context_region, account_id=context_account_id)
1✔
980

981
        if not api_utils.is_role_arn(request.get("Role")):
1✔
982
            raise ValidationException(
1✔
983
                f"1 validation error detected: Value '{request.get('Role')}'"
984
                + " at 'role' failed to satisfy constraint: Member must satisfy regular expression pattern: arn:(aws[a-zA-Z-]*)?:iam::\\d{12}:role/?[a-zA-Z_0-9+=,.@\\-_/]+"
985
            )
986
        if not self.lambda_service.can_assume_role(request.get("Role"), context.region):
1✔
UNCOV
987
            raise InvalidParameterValueException(
×
988
                "The role defined for the function cannot be assumed by Lambda.", Type="User"
989
            )
990
        package_type = request.get("PackageType", PackageType.Zip)
1✔
991
        runtime = request.get("Runtime")
1✔
992
        self._validate_runtime(package_type, runtime)
1✔
993

994
        request_function_name = request.get("FunctionName")
1✔
995

996
        function_name, *_ = api_utils.get_name_and_qualifier(
1✔
997
            function_arn_or_name=request_function_name,
998
            qualifier=None,
999
            context=context,
1000
        )
1001

1002
        if runtime in DEPRECATED_RUNTIMES:
1✔
1003
            LOG.warning(
1✔
1004
                "The Lambda runtime %s} is deprecated. "
1005
                "Please upgrade the runtime for the function %s: "
1006
                "https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtimes.html",
1007
                runtime,
1008
                function_name,
1009
            )
1010
        if snap_start := request.get("SnapStart"):
1✔
1011
            self._validate_snapstart(snap_start, runtime)
1✔
1012
        if publish_to := request.get("PublishTo"):
1✔
UNCOV
1013
            self._validate_publish_to(publish_to)
×
1014
        state = lambda_stores[context_account_id][context_region]
1✔
1015

1016
        with self.create_fn_lock:
1✔
1017
            if function_name in state.functions:
1✔
UNCOV
1018
                raise ResourceConflictException(f"Function already exist: {function_name}")
×
1019
            fn = Function(function_name=function_name)
1✔
1020
            arn = VersionIdentifier(
1✔
1021
                function_name=function_name,
1022
                qualifier="$LATEST",
1023
                region=context_region,
1024
                account=context_account_id,
1025
            )
1026
            # save function code to s3
1027
            code = None
1✔
1028
            image = None
1✔
1029
            image_config = None
1✔
1030
            runtime_version_config = RuntimeVersionConfig(
1✔
1031
                # Limitation: the runtime id (presumably sha256 of image) is currently hardcoded
1032
                # Potential implementation: provide (cached) sha256 hash of used Docker image
1033
                RuntimeVersionArn=f"arn:{context.partition}:lambda:{context_region}::runtime:8eeff65f6809a3ce81507fe733fe09b835899b99481ba22fd75b5a7338290ec1"
1034
            )
1035
            request_code = request.get("Code")
1✔
1036
            if package_type == PackageType.Zip:
1✔
1037
                # TODO verify if correct combination of code is set
1038
                if zip_file := request_code.get("ZipFile"):
1✔
1039
                    code = store_lambda_archive(
1✔
1040
                        archive_file=zip_file,
1041
                        function_name=function_name,
1042
                        region_name=context_region,
1043
                        account_id=context_account_id,
1044
                    )
1045
                elif s3_bucket := request_code.get("S3Bucket"):
1✔
1046
                    s3_key = request_code["S3Key"]
1✔
1047
                    s3_object_version = request_code.get("S3ObjectVersion")
1✔
1048
                    code = store_s3_bucket_archive(
1✔
1049
                        archive_bucket=s3_bucket,
1050
                        archive_key=s3_key,
1051
                        archive_version=s3_object_version,
1052
                        function_name=function_name,
1053
                        region_name=context_region,
1054
                        account_id=context_account_id,
1055
                    )
1056
                else:
UNCOV
1057
                    raise LambdaServiceException("A ZIP file or S3 bucket is required")
×
1058
            elif package_type == PackageType.Image:
1✔
1059
                image = request_code.get("ImageUri")
1✔
1060
                if not image:
1✔
UNCOV
1061
                    raise LambdaServiceException(
×
1062
                        "An image is required when the package type is set to 'image'"
1063
                    )
1064
                image = create_image_code(image_uri=image)
1✔
1065

1066
                image_config_req = request.get("ImageConfig", {})
1✔
1067
                image_config = ImageConfig(
1✔
1068
                    command=image_config_req.get("Command"),
1069
                    entrypoint=image_config_req.get("EntryPoint"),
1070
                    working_directory=image_config_req.get("WorkingDirectory"),
1071
                )
1072
                # Runtime management controls are not available when providing a custom image
1073
                runtime_version_config = None
1✔
1074

1075
            capacity_provider_config = None
1✔
1076
            memory_size = request.get("MemorySize", LAMBDA_DEFAULT_MEMORY_SIZE)
1✔
1077
            if "CapacityProviderConfig" in request:
1✔
1078
                capacity_provider_config = request["CapacityProviderConfig"]
×
UNCOV
1079
                self._validate_capacity_provider_config(capacity_provider_config, context)
×
1080
                self._validate_managed_instances_runtime(runtime)
×
1081

UNCOV
1082
                default_config = CapacityProviderConfig(
×
1083
                    LambdaManagedInstancesCapacityProviderConfig=LambdaManagedInstancesCapacityProviderConfig(
1084
                        ExecutionEnvironmentMemoryGiBPerVCpu=2.0,
1085
                        PerExecutionEnvironmentMaxConcurrency=16,
1086
                    )
1087
                )
1088
                capacity_provider_config = merge_recursive(default_config, capacity_provider_config)
×
1089
                memory_size = 2048
×
UNCOV
1090
                if request.get("LoggingConfig", {}).get("LogFormat") == LogFormat.Text:
×
UNCOV
1091
                    raise InvalidParameterValueException(
×
1092
                        'LogLevel is not supported when LogFormat is set to "Text". Remove LogLevel from your request or change the LogFormat to "JSON" and try again.',
1093
                        Type="User",
1094
                    )
1095
            if "LoggingConfig" in request:
1✔
1096
                logging_config = request["LoggingConfig"]
1✔
1097
                LOG.warning(
1✔
1098
                    "Advanced Lambda Logging Configuration is currently mocked "
1099
                    "and will not impact the logging behavior. "
1100
                    "Please create a feature request if needed."
1101
                )
1102

1103
                # when switching to JSON, app and system level log is auto set to INFO
1104
                if logging_config.get("LogFormat", None) == LogFormat.JSON:
1✔
1105
                    logging_config = {
1✔
1106
                        "ApplicationLogLevel": "INFO",
1107
                        "SystemLogLevel": "INFO",
1108
                        "LogGroup": f"/aws/lambda/{function_name}",
1109
                    } | logging_config
1110
                else:
UNCOV
1111
                    logging_config = (
×
1112
                        LoggingConfig(
1113
                            LogFormat=LogFormat.Text, LogGroup=f"/aws/lambda/{function_name}"
1114
                        )
1115
                        | logging_config
1116
                    )
1117

1118
            elif capacity_provider_config:
1✔
UNCOV
1119
                logging_config = LoggingConfig(
×
1120
                    LogFormat=LogFormat.JSON,
1121
                    LogGroup=f"/aws/lambda/{function_name}",
1122
                    ApplicationLogLevel="INFO",
1123
                    SystemLogLevel="INFO",
1124
                )
1125
            else:
1126
                logging_config = LoggingConfig(
1✔
1127
                    LogFormat=LogFormat.Text, LogGroup=f"/aws/lambda/{function_name}"
1128
                )
1129
            snap_start = (
1✔
1130
                None
1131
                if capacity_provider_config
1132
                else SnapStartResponse(
1133
                    ApplyOn=request.get("SnapStart", {}).get("ApplyOn", SnapStartApplyOn.None_),
1134
                    OptimizationStatus=SnapStartOptimizationStatus.Off,
1135
                )
1136
            )
1137
            version = FunctionVersion(
1✔
1138
                id=arn,
1139
                config=VersionFunctionConfiguration(
1140
                    last_modified=api_utils.format_lambda_date(datetime.datetime.now()),
1141
                    description=request.get("Description", ""),
1142
                    role=request["Role"],
1143
                    timeout=request.get("Timeout", LAMBDA_DEFAULT_TIMEOUT),
1144
                    runtime=request.get("Runtime"),
1145
                    memory_size=memory_size,
1146
                    handler=request.get("Handler"),
1147
                    package_type=package_type,
1148
                    environment=env_vars,
1149
                    architectures=request.get("Architectures") or [Architecture.x86_64],
1150
                    tracing_config_mode=request.get("TracingConfig", {}).get(
1151
                        "Mode", TracingMode.PassThrough
1152
                    ),
1153
                    image=image,
1154
                    image_config=image_config,
1155
                    code=code,
1156
                    layers=self.map_layers(layers),
1157
                    internal_revision=short_uid(),
1158
                    ephemeral_storage=LambdaEphemeralStorage(
1159
                        size=request.get("EphemeralStorage", {}).get("Size", 512)
1160
                    ),
1161
                    snap_start=snap_start,
1162
                    runtime_version_config=runtime_version_config,
1163
                    dead_letter_arn=request.get("DeadLetterConfig", {}).get("TargetArn"),
1164
                    vpc_config=self._build_vpc_config(
1165
                        context_account_id, context_region, request.get("VpcConfig")
1166
                    ),
1167
                    state=VersionState(
1168
                        state=State.Pending,
1169
                        code=StateReasonCode.Creating,
1170
                        reason="The function is being created.",
1171
                    ),
1172
                    logging_config=logging_config,
1173
                    # TODO: might need something like **optional_kwargs if None
1174
                    #   -> Test with regular GetFunction (i.e., without a capacity provider)
1175
                    CapacityProviderConfig=capacity_provider_config,
1176
                ),
1177
            )
1178
            version_post_response = None
1✔
1179
            if capacity_provider_config:
1✔
UNCOV
1180
                version_post_response = dataclasses.replace(
×
1181
                    version,
1182
                    config=dataclasses.replace(
1183
                        version.config,
1184
                        last_update=UpdateStatus(status=LastUpdateStatus.Successful),
1185
                        state=VersionState(state=State.ActiveNonInvocable),
1186
                    ),
1187
                )
1188
            fn.versions["$LATEST"] = version_post_response or version
1✔
1189
            state.functions[function_name] = fn
1✔
1190
        initialization_type = (
1✔
1191
            FunctionInitializationType.lambda_managed_instances
1192
            if capacity_provider_config
1193
            else FunctionInitializationType.on_demand
1194
        )
1195
        function_counter.labels(
1✔
1196
            operation=FunctionOperation.create,
1197
            runtime=runtime or "n/a",
1198
            status=FunctionStatus.success,
1199
            invocation_type="n/a",
1200
            package_type=package_type,
1201
            initialization_type=initialization_type,
1202
        )
1203
        # TODO: consider potential other side effects of not having a function version for $LATEST
1204
        # Provisioning happens upon publishing for functions using a capacity provider
1205
        if not capacity_provider_config:
1✔
1206
            self.lambda_service.create_function_version(version)
1✔
1207

1208
        if tags := request.get("Tags"):
1✔
1209
            # This will check whether the function exists.
1210
            self._store_tags(arn.unqualified_arn(), tags)
1✔
1211

1212
        if request.get("Publish"):
1✔
1213
            version = self._publish_version_with_changes(
1✔
1214
                function_name=function_name,
1215
                region=context_region,
1216
                account_id=context_account_id,
1217
                publish_to=request.get("PublishTo"),
1218
            )
1219

1220
        if config.LAMBDA_SYNCHRONOUS_CREATE:
1✔
1221
            # block via retrying until "terminal" condition reached before returning
UNCOV
1222
            if not poll_condition(
×
1223
                lambda: get_function_version(
1224
                    function_name, version.id.qualifier, version.id.account, version.id.region
1225
                ).config.state.state
1226
                in [State.Active, State.ActiveNonInvocable, State.Failed],
1227
                timeout=10,
1228
            ):
UNCOV
1229
                LOG.warning(
×
1230
                    "LAMBDA_SYNCHRONOUS_CREATE is active, but waiting for %s reached timeout.",
1231
                    function_name,
1232
                )
1233

1234
        return api_utils.map_config_out(
1✔
1235
            version, return_qualified_arn=False, return_update_status=False
1236
        )
1237

1238
    def _validate_runtime(self, package_type, runtime):
1✔
1239
        runtimes = ALL_RUNTIMES
1✔
1240
        if config.LAMBDA_RUNTIME_VALIDATION:
1✔
1241
            runtimes = list(itertools.chain(RUNTIMES_AGGREGATED.values()))
1✔
1242

1243
        if package_type == PackageType.Zip and runtime not in runtimes:
1✔
1244
            # deprecated runtimes have different error
1245
            if runtime in DEPRECATED_RUNTIMES:
1✔
1246
                HINT_LOG.info(
1✔
1247
                    "Set env variable LAMBDA_RUNTIME_VALIDATION to 0"
1248
                    " in order to allow usage of deprecated runtimes"
1249
                )
1250
                self._check_for_recomended_migration_target(runtime)
1✔
1251

1252
            raise InvalidParameterValueException(
1✔
1253
                f"Value {runtime} at 'runtime' failed to satisfy constraint: Member must satisfy enum value set: {VALID_RUNTIMES} or be a valid ARN",
1254
                Type="User",
1255
            )
1256

1257
    def _validate_managed_instances_runtime(self, runtime):
1✔
UNCOV
1258
        if runtime not in VALID_MANAGED_INSTANCE_RUNTIMES:
×
UNCOV
1259
            raise InvalidParameterValueException(
×
1260
                f"Runtime Enum {runtime} does not support specified feature: Lambda Managed Instances"
1261
            )
1262

1263
    def _check_for_recomended_migration_target(self, deprecated_runtime):
1✔
1264
        # AWS offers recommended runtime for migration for "newly" deprecated runtimes
1265
        # in order to preserve parity with error messages we need the code bellow
1266
        latest_runtime = DEPRECATED_RUNTIMES_UPGRADES.get(deprecated_runtime)
1✔
1267

1268
        if latest_runtime is not None:
1✔
1269
            LOG.debug(
1✔
1270
                "The Lambda runtime %s is deprecated. Please upgrade to a supported Lambda runtime such as %s.",
1271
                deprecated_runtime,
1272
                latest_runtime,
1273
            )
1274
            raise InvalidParameterValueException(
1✔
1275
                f"The runtime parameter of {deprecated_runtime} is no longer supported for creating or updating AWS Lambda functions. We recommend you use a supported runtime while creating or updating functions.",
1276
                Type="User",
1277
            )
1278

1279
    @handler(operation="UpdateFunctionConfiguration", expand=False)
1✔
1280
    def update_function_configuration(
1✔
1281
        self, context: RequestContext, request: UpdateFunctionConfigurationRequest
1282
    ) -> FunctionConfiguration:
1283
        """updates the $LATEST version of the function"""
1284
        function_name = request.get("FunctionName")
1✔
1285

1286
        # in case we got ARN or partial ARN
1287
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1288
        function_name, qualifier = api_utils.get_name_and_qualifier(function_name, None, context)
1✔
1289
        state = lambda_stores[account_id][region]
1✔
1290

1291
        if function_name not in state.functions:
1✔
UNCOV
1292
            raise ResourceNotFoundException(
×
1293
                f"Function not found: {api_utils.unqualified_lambda_arn(function_name=function_name, region=region, account=account_id)}",
1294
                Type="User",
1295
            )
1296
        function = state.functions[function_name]
1✔
1297

1298
        # TODO: lock modification of latest version
1299
        # TODO: notify service for changes relevant to re-provisioning of $LATEST
1300
        latest_version = function.latest()
1✔
1301
        latest_version_config = latest_version.config
1✔
1302

1303
        revision_id = request.get("RevisionId")
1✔
1304
        if revision_id and revision_id != latest_version.config.revision_id:
1✔
1305
            raise PreconditionFailedException(
1✔
1306
                "The Revision Id provided does not match the latest Revision Id. "
1307
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
1308
                Type="User",
1309
            )
1310

1311
        replace_kwargs = {}
1✔
1312
        if "EphemeralStorage" in request:
1✔
UNCOV
1313
            replace_kwargs["ephemeral_storage"] = LambdaEphemeralStorage(
×
1314
                request.get("EphemeralStorage", {}).get("Size", 512)
1315
            )  # TODO: do defaults here apply as well?
1316

1317
        if "Role" in request:
1✔
1318
            if not api_utils.is_role_arn(request["Role"]):
1✔
1319
                raise ValidationException(
1✔
1320
                    f"1 validation error detected: Value '{request.get('Role')}'"
1321
                    + " at 'role' failed to satisfy constraint: Member must satisfy regular expression pattern: arn:(aws[a-zA-Z-]*)?:iam::\\d{12}:role/?[a-zA-Z_0-9+=,.@\\-_/]+"
1322
                )
1323
            replace_kwargs["role"] = request["Role"]
1✔
1324

1325
        if "Description" in request:
1✔
1326
            replace_kwargs["description"] = request["Description"]
1✔
1327

1328
        if "Timeout" in request:
1✔
1329
            replace_kwargs["timeout"] = request["Timeout"]
1✔
1330

1331
        if "MemorySize" in request:
1✔
1332
            replace_kwargs["memory_size"] = request["MemorySize"]
1✔
1333

1334
        if "DeadLetterConfig" in request:
1✔
1335
            replace_kwargs["dead_letter_arn"] = request.get("DeadLetterConfig", {}).get("TargetArn")
1✔
1336

1337
        if vpc_config := request.get("VpcConfig"):
1✔
1338
            replace_kwargs["vpc_config"] = self._build_vpc_config(account_id, region, vpc_config)
1✔
1339

1340
        if "Handler" in request:
1✔
1341
            replace_kwargs["handler"] = request["Handler"]
1✔
1342

1343
        if "Runtime" in request:
1✔
1344
            runtime = request["Runtime"]
1✔
1345

1346
            if runtime not in ALL_RUNTIMES:
1✔
1347
                raise InvalidParameterValueException(
1✔
1348
                    f"Value {runtime} at 'runtime' failed to satisfy constraint: Member must satisfy enum value set: {VALID_RUNTIMES} or be a valid ARN",
1349
                    Type="User",
1350
                )
1351
            if runtime in DEPRECATED_RUNTIMES:
1✔
UNCOV
1352
                LOG.warning(
×
1353
                    "The Lambda runtime %s is deprecated. "
1354
                    "Please upgrade the runtime for the function %s: "
1355
                    "https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtimes.html",
1356
                    runtime,
1357
                    function_name,
1358
                )
1359
            replace_kwargs["runtime"] = request["Runtime"]
1✔
1360

1361
        if snap_start := request.get("SnapStart"):
1✔
1362
            runtime = replace_kwargs.get("runtime") or latest_version_config.runtime
1✔
1363
            self._validate_snapstart(snap_start, runtime)
1✔
1364
            replace_kwargs["snap_start"] = SnapStartResponse(
1✔
1365
                ApplyOn=snap_start.get("ApplyOn", SnapStartApplyOn.None_),
1366
                OptimizationStatus=SnapStartOptimizationStatus.Off,
1367
            )
1368

1369
        if "Environment" in request:
1✔
1370
            if env_vars := request.get("Environment", {}).get("Variables", {}):
1✔
1371
                self._verify_env_variables(env_vars)
1✔
1372
            replace_kwargs["environment"] = env_vars
1✔
1373

1374
        if "Layers" in request:
1✔
1375
            new_layers = request["Layers"]
1✔
1376
            if new_layers:
1✔
1377
                self._validate_layers(new_layers, region=region, account_id=account_id)
1✔
1378
            replace_kwargs["layers"] = self.map_layers(new_layers)
1✔
1379

1380
        if "ImageConfig" in request:
1✔
1381
            new_image_config = request["ImageConfig"]
1✔
1382
            replace_kwargs["image_config"] = ImageConfig(
1✔
1383
                command=new_image_config.get("Command"),
1384
                entrypoint=new_image_config.get("EntryPoint"),
1385
                working_directory=new_image_config.get("WorkingDirectory"),
1386
            )
1387

1388
        if "LoggingConfig" in request:
1✔
1389
            logging_config = request["LoggingConfig"]
1✔
1390
            LOG.warning(
1✔
1391
                "Advanced Lambda Logging Configuration is currently mocked "
1392
                "and will not impact the logging behavior. "
1393
                "Please create a feature request if needed."
1394
            )
1395

1396
            # when switching to JSON, app and system level log is auto set to INFO
1397
            if logging_config.get("LogFormat", None) == LogFormat.JSON:
1✔
1398
                logging_config = {
1✔
1399
                    "ApplicationLogLevel": "INFO",
1400
                    "SystemLogLevel": "INFO",
1401
                } | logging_config
1402

1403
            last_config = latest_version_config.logging_config
1✔
1404

1405
            # add partial update
1406
            new_logging_config = last_config | logging_config
1✔
1407

1408
            # in case we switched from JSON to Text we need to remove LogLevel keys
1409
            if (
1✔
1410
                new_logging_config.get("LogFormat") == LogFormat.Text
1411
                and last_config.get("LogFormat") == LogFormat.JSON
1412
            ):
1413
                new_logging_config.pop("ApplicationLogLevel", None)
1✔
1414
                new_logging_config.pop("SystemLogLevel", None)
1✔
1415

1416
            replace_kwargs["logging_config"] = new_logging_config
1✔
1417

1418
        if "TracingConfig" in request:
1✔
1419
            new_mode = request.get("TracingConfig", {}).get("Mode")
×
UNCOV
1420
            if new_mode:
×
UNCOV
1421
                replace_kwargs["tracing_config_mode"] = new_mode
×
1422

1423
        if "CapacityProviderConfig" in request:
1✔
UNCOV
1424
            capacity_provider_config = request["CapacityProviderConfig"]
×
1425
            self._validate_capacity_provider_config(capacity_provider_config, context)
×
1426

UNCOV
1427
            if latest_version.config.CapacityProviderConfig and not request[
×
1428
                "CapacityProviderConfig"
1429
            ].get("LambdaManagedInstancesCapacityProviderConfig"):
UNCOV
1430
                raise ValidationException(
×
1431
                    "1 validation error detected: Value null at 'capacityProviderConfig.lambdaManagedInstancesCapacityProviderConfig' failed to satisfy constraint: Member must not be null"
1432
                )
UNCOV
1433
            if not latest_version.config.CapacityProviderConfig:
×
UNCOV
1434
                raise InvalidParameterValueException(
×
1435
                    "CapacityProviderConfig isn't supported for Lambda Default functions.",
1436
                    Type="User",
1437
                )
1438

1439
        new_latest_version = dataclasses.replace(
1✔
1440
            latest_version,
1441
            config=dataclasses.replace(
1442
                latest_version_config,
1443
                last_modified=api_utils.generate_lambda_date(),
1444
                internal_revision=short_uid(),
1445
                last_update=UpdateStatus(
1446
                    status=LastUpdateStatus.InProgress,
1447
                    code="Creating",
1448
                    reason="The function is being created.",
1449
                ),
1450
                **replace_kwargs,
1451
            ),
1452
        )
1453
        function.versions["$LATEST"] = new_latest_version  # TODO: notify
1✔
1454
        self.lambda_service.update_version(new_version=new_latest_version)
1✔
1455

1456
        return api_utils.map_config_out(new_latest_version)
1✔
1457

1458
    @handler(operation="UpdateFunctionCode", expand=False)
1✔
1459
    def update_function_code(
1✔
1460
        self, context: RequestContext, request: UpdateFunctionCodeRequest
1461
    ) -> FunctionConfiguration:
1462
        """updates the $LATEST version of the function"""
1463
        # only supports normal zip packaging atm
1464
        # if request.get("Publish"):
1465
        #     self.lambda_service.create_function_version()
1466

1467
        function_name = request.get("FunctionName")
1✔
1468
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1469
        function_name, qualifier = api_utils.get_name_and_qualifier(function_name, None, context)
1✔
1470

1471
        store = lambda_stores[account_id][region]
1✔
1472
        if function_name not in store.functions:
1✔
UNCOV
1473
            raise ResourceNotFoundException(
×
1474
                f"Function not found: {api_utils.unqualified_lambda_arn(function_name=function_name, region=region, account=account_id)}",
1475
                Type="User",
1476
            )
1477
        function = store.functions[function_name]
1✔
1478

1479
        revision_id = request.get("RevisionId")
1✔
1480
        if revision_id and revision_id != function.latest().config.revision_id:
1✔
1481
            raise PreconditionFailedException(
1✔
1482
                "The Revision Id provided does not match the latest Revision Id. "
1483
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
1484
                Type="User",
1485
            )
1486

1487
        # TODO verify if correct combination of code is set
1488
        image = None
1✔
1489
        if (
1✔
1490
            request.get("ZipFile") or request.get("S3Bucket")
1491
        ) and function.latest().config.package_type == PackageType.Image:
1492
            raise InvalidParameterValueException(
1✔
1493
                "Please provide ImageUri when updating a function with packageType Image.",
1494
                Type="User",
1495
            )
1496
        elif request.get("ImageUri") and function.latest().config.package_type == PackageType.Zip:
1✔
1497
            raise InvalidParameterValueException(
1✔
1498
                "Please don't provide ImageUri when updating a function with packageType Zip.",
1499
                Type="User",
1500
            )
1501

1502
        if publish_to := request.get("PublishTo"):
1✔
UNCOV
1503
            self._validate_publish_to(publish_to)
×
1504

1505
        if zip_file := request.get("ZipFile"):
1✔
1506
            code = store_lambda_archive(
1✔
1507
                archive_file=zip_file,
1508
                function_name=function_name,
1509
                region_name=region,
1510
                account_id=account_id,
1511
            )
1512
        elif s3_bucket := request.get("S3Bucket"):
1✔
1513
            s3_key = request["S3Key"]
1✔
1514
            s3_object_version = request.get("S3ObjectVersion")
1✔
1515
            code = store_s3_bucket_archive(
1✔
1516
                archive_bucket=s3_bucket,
1517
                archive_key=s3_key,
1518
                archive_version=s3_object_version,
1519
                function_name=function_name,
1520
                region_name=region,
1521
                account_id=account_id,
1522
            )
1523
        elif image := request.get("ImageUri"):
1✔
1524
            code = None
1✔
1525
            image = create_image_code(image_uri=image)
1✔
1526
        else:
UNCOV
1527
            raise LambdaServiceException("A ZIP file, S3 bucket, or image is required")
×
1528

1529
        old_function_version = function.versions.get("$LATEST")
1✔
1530
        replace_kwargs = {"code": code} if code else {"image": image}
1✔
1531

1532
        if architectures := request.get("Architectures"):
1✔
1533
            if len(architectures) != 1:
×
UNCOV
1534
                raise ValidationException(
×
1535
                    f"1 validation error detected: Value '[{', '.join(architectures)}]' at 'architectures' failed to "
1536
                    f"satisfy constraint: Member must have length less than or equal to 1",
1537
                )
1538
            # An empty list of architectures is also forbidden. Further exceptions are tested here for create_function:
1539
            # tests.aws.services.lambda_.test_lambda_api.TestLambdaFunction.test_create_lambda_exceptions
UNCOV
1540
            if architectures[0] not in ARCHITECTURES:
×
UNCOV
1541
                raise ValidationException(
×
1542
                    f"1 validation error detected: Value '[{', '.join(architectures)}]' at 'architectures' failed to "
1543
                    f"satisfy constraint: Member must satisfy constraint: [Member must satisfy enum value set: "
1544
                    f"[x86_64, arm64], Member must not be null]",
1545
                )
UNCOV
1546
            replace_kwargs["architectures"] = architectures
×
1547

1548
        config = dataclasses.replace(
1✔
1549
            old_function_version.config,
1550
            internal_revision=short_uid(),
1551
            last_modified=api_utils.generate_lambda_date(),
1552
            last_update=UpdateStatus(
1553
                status=LastUpdateStatus.InProgress,
1554
                code="Creating",
1555
                reason="The function is being created.",
1556
            ),
1557
            **replace_kwargs,
1558
        )
1559
        function_version = dataclasses.replace(old_function_version, config=config)
1✔
1560
        function.versions["$LATEST"] = function_version
1✔
1561

1562
        self.lambda_service.update_version(new_version=function_version)
1✔
1563
        if request.get("Publish"):
1✔
1564
            function_version = self._publish_version_with_changes(
1✔
1565
                function_name=function_name,
1566
                region=region,
1567
                account_id=account_id,
1568
                publish_to=publish_to,
1569
                is_active=True,
1570
            )
1571
        return api_utils.map_config_out(
1✔
1572
            function_version, return_qualified_arn=bool(request.get("Publish"))
1573
        )
1574

1575
    # TODO: does deleting the latest published version affect the next versions number?
1576
    # TODO: what happens when we call this with a qualifier and a fully qualified ARN? (+ conflicts?)
1577
    # TODO: test different ARN patterns (shorthand ARN?)
1578
    # TODO: test deleting across regions?
1579
    # TODO: test mismatch between context region and region in ARN
1580
    # TODO: test qualifier $LATEST, alias-name and version
1581
    def delete_function(
1✔
1582
        self,
1583
        context: RequestContext,
1584
        function_name: NamespacedFunctionName,
1585
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
1586
        **kwargs,
1587
    ) -> DeleteFunctionResponse:
1588
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1589
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
1590
            function_name, qualifier, context
1591
        )
1592

1593
        if qualifier and api_utils.qualifier_is_alias(qualifier):
1✔
UNCOV
1594
            raise InvalidParameterValueException(
×
1595
                "Deletion of aliases is not currently supported.",
1596
                Type="User",
1597
            )
1598

1599
        store = lambda_stores[account_id][region]
1✔
1600
        if qualifier == "$LATEST":
1✔
1601
            raise InvalidParameterValueException(
1✔
1602
                "$LATEST version cannot be deleted without deleting the function.", Type="User"
1603
            )
1604

1605
        if function_name not in store.functions:
1✔
1606
            e = ResourceNotFoundException(
1✔
1607
                f"Function not found: {api_utils.unqualified_lambda_arn(function_name=function_name, region=region, account=account_id)}",
1608
                Type="User",
1609
            )
1610
            raise e
1✔
1611
        function = store.functions.get(function_name)
1✔
1612

1613
        function_has_capacity_provider = False
1✔
1614
        if qualifier:
1✔
1615
            # delete a version of the function
1616
            version = function.versions.pop(qualifier, None)
1✔
1617
            if version:
1✔
1618
                if version.config.CapacityProviderConfig:
1✔
UNCOV
1619
                    function_has_capacity_provider = True
×
1620
                self.lambda_service.stop_version(version.id.qualified_arn())
1✔
1621
                destroy_code_if_not_used(code=version.config.code, function=function)
1✔
1622
        else:
1623
            # delete the whole function
1624
            # TODO: introduce locking for safe deletion: We could create a new version at the API layer before
1625
            #  the old version gets cleaned up in the internal lambda service.
1626
            function = store.functions.pop(function_name)
1✔
1627
            for version in function.versions.values():
1✔
1628
                # Functions with a capacity provider do NOT have a version manager for $LATEST because only
1629
                # published versions are invokable.
1630
                if version.config.CapacityProviderConfig:
1✔
UNCOV
1631
                    function_has_capacity_provider = True
×
UNCOV
1632
                    if version.id.qualifier == "$LATEST":
×
UNCOV
1633
                        pass
×
1634
                else:
1635
                    self.lambda_service.stop_version(qualified_arn=version.id.qualified_arn())
1✔
1636
                # we can safely destroy the code here
1637
                if version.config.code:
1✔
1638
                    version.config.code.destroy()
1✔
1639

1640
        return DeleteFunctionResponse(StatusCode=202 if function_has_capacity_provider else 204)
1✔
1641

1642
    def list_functions(
1✔
1643
        self,
1644
        context: RequestContext,
1645
        master_region: MasterRegion = None,  # (only relevant for lambda@edge)
1646
        function_version: FunctionVersionApi = None,
1647
        marker: String = None,
1648
        max_items: MaxListItems = None,
1649
        **kwargs,
1650
    ) -> ListFunctionsResponse:
1651
        state = lambda_stores[context.account_id][context.region]
1✔
1652

1653
        if function_version and function_version != FunctionVersionApi.ALL:
1✔
1654
            raise ValidationException(
1✔
1655
                f"1 validation error detected: Value '{function_version}'"
1656
                + " at 'functionVersion' failed to satisfy constraint: Member must satisfy enum value set: [ALL]"
1657
            )
1658

1659
        if function_version == FunctionVersionApi.ALL:
1✔
1660
            # include all versions for all function
1661
            versions = [v for f in state.functions.values() for v in f.versions.values()]
1✔
1662
            return_qualified_arn = True
1✔
1663
        else:
1664
            versions = [f.latest() for f in state.functions.values()]
1✔
1665
            return_qualified_arn = False
1✔
1666

1667
        versions = [
1✔
1668
            api_utils.map_to_list_response(
1669
                api_utils.map_config_out(fc, return_qualified_arn=return_qualified_arn)
1670
            )
1671
            for fc in versions
1672
        ]
1673
        versions = PaginatedList(versions)
1✔
1674
        page, token = versions.get_page(
1✔
1675
            lambda version: version["FunctionArn"],
1676
            marker,
1677
            max_items,
1678
        )
1679
        return ListFunctionsResponse(Functions=page, NextMarker=token)
1✔
1680

1681
    def get_function(
1✔
1682
        self,
1683
        context: RequestContext,
1684
        function_name: NamespacedFunctionName,
1685
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
1686
        **kwargs,
1687
    ) -> GetFunctionResponse:
1688
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1689
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
1690
            function_name, qualifier, context
1691
        )
1692

1693
        fn = lambda_stores[account_id][region].functions.get(function_name)
1✔
1694
        if fn is None:
1✔
1695
            if qualifier is None:
1✔
1696
                raise ResourceNotFoundException(
1✔
1697
                    f"Function not found: {api_utils.unqualified_lambda_arn(function_name, account_id, region)}",
1698
                    Type="User",
1699
                )
1700
            else:
1701
                raise ResourceNotFoundException(
1✔
1702
                    f"Function not found: {api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)}",
1703
                    Type="User",
1704
                )
1705
        alias_name = None
1✔
1706
        if qualifier and api_utils.qualifier_is_alias(qualifier):
1✔
1707
            if qualifier not in fn.aliases:
1✔
1708
                alias_arn = api_utils.qualified_lambda_arn(
1✔
1709
                    function_name, qualifier, account_id, region
1710
                )
1711
                raise ResourceNotFoundException(f"Function not found: {alias_arn}", Type="User")
1✔
1712
            alias_name = qualifier
1✔
1713
            qualifier = fn.aliases[alias_name].function_version
1✔
1714

1715
        version = get_function_version(
1✔
1716
            function_name=function_name,
1717
            qualifier=qualifier,
1718
            account_id=account_id,
1719
            region=region,
1720
        )
1721
        tags = self._get_tags(api_utils.unqualified_lambda_arn(function_name, account_id, region))
1✔
1722
        additional_fields = {}
1✔
1723
        if tags:
1✔
1724
            additional_fields["Tags"] = tags
1✔
1725
        code_location = None
1✔
1726
        if code := version.config.code:
1✔
1727
            code_location = FunctionCodeLocation(
1✔
1728
                Location=code.generate_presigned_url(endpoint_url=config.external_service_url()),
1729
                RepositoryType="S3",
1730
            )
1731
        elif image := version.config.image:
1✔
1732
            code_location = FunctionCodeLocation(
1✔
1733
                ImageUri=image.image_uri,
1734
                RepositoryType=image.repository_type,
1735
                ResolvedImageUri=image.resolved_image_uri,
1736
            )
1737
        concurrency = None
1✔
1738
        if fn.reserved_concurrent_executions:
1✔
1739
            concurrency = Concurrency(
1✔
1740
                ReservedConcurrentExecutions=fn.reserved_concurrent_executions
1741
            )
1742

1743
        return GetFunctionResponse(
1✔
1744
            Configuration=api_utils.map_config_out(
1745
                version, return_qualified_arn=bool(qualifier), alias_name=alias_name
1746
            ),
1747
            Code=code_location,  # TODO
1748
            Concurrency=concurrency,
1749
            **additional_fields,
1750
        )
1751

1752
    def get_function_configuration(
1✔
1753
        self,
1754
        context: RequestContext,
1755
        function_name: NamespacedFunctionName,
1756
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
1757
        **kwargs,
1758
    ) -> FunctionConfiguration:
1759
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1760
        # CAVE: THIS RETURN VALUE IS *NOT* THE SAME AS IN get_function (!) but seems to be only configuration part?
1761
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
1762
            function_name, qualifier, context
1763
        )
1764
        version = get_function_version(
1✔
1765
            function_name=function_name,
1766
            qualifier=qualifier,
1767
            account_id=account_id,
1768
            region=region,
1769
        )
1770
        return api_utils.map_config_out(version, return_qualified_arn=bool(qualifier))
1✔
1771

1772
    def invoke(
1✔
1773
        self,
1774
        context: RequestContext,
1775
        function_name: NamespacedFunctionName,
1776
        invocation_type: InvocationType | None = None,
1777
        log_type: LogType | None = None,
1778
        client_context: String | None = None,
1779
        durable_execution_name: DurableExecutionName | None = None,
1780
        payload: IO[Blob] | None = None,
1781
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
1782
        tenant_id: TenantId | None = None,
1783
        **kwargs,
1784
    ) -> InvocationResponse:
1785
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1786
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
1787
            function_name, qualifier, context
1788
        )
1789

1790
        user_agent = context.request.user_agent.string
1✔
1791

1792
        time_before = time.perf_counter()
1✔
1793
        try:
1✔
1794
            invocation_result = self.lambda_service.invoke(
1✔
1795
                function_name=function_name,
1796
                qualifier=qualifier,
1797
                region=region,
1798
                account_id=account_id,
1799
                invocation_type=invocation_type,
1800
                client_context=client_context,
1801
                request_id=context.request_id,
1802
                trace_context=context.trace_context,
1803
                payload=payload.read() if payload else None,
1804
                user_agent=user_agent,
1805
            )
1806
        except ServiceException:
1✔
1807
            raise
1✔
1808
        except EnvironmentStartupTimeoutException as e:
1✔
1809
            raise LambdaServiceException(
1✔
1810
                f"[{context.request_id}] Timeout while starting up lambda environment for function {function_name}:{qualifier}"
1811
            ) from e
1812
        except Exception as e:
1✔
1813
            LOG.error(
1✔
1814
                "[%s] Error while invoking lambda %s",
1815
                context.request_id,
1816
                function_name,
1817
                exc_info=LOG.isEnabledFor(logging.DEBUG),
1818
            )
1819
            raise LambdaServiceException(
1✔
1820
                f"[{context.request_id}] Internal error while executing lambda {function_name}:{qualifier}. Caused by {type(e).__name__}: {e}"
1821
            ) from e
1822

1823
        if invocation_type == InvocationType.Event:
1✔
1824
            # This happens when invocation type is event
1825
            return InvocationResponse(StatusCode=202)
1✔
1826
        if invocation_type == InvocationType.DryRun:
1✔
1827
            # This happens when invocation type is dryrun
1828
            return InvocationResponse(StatusCode=204)
1✔
1829
        LOG.debug("Lambda invocation duration: %0.2fms", (time.perf_counter() - time_before) * 1000)
1✔
1830

1831
        response = InvocationResponse(
1✔
1832
            StatusCode=200,
1833
            Payload=invocation_result.payload,
1834
            ExecutedVersion=invocation_result.executed_version,
1835
        )
1836

1837
        if invocation_result.is_error:
1✔
1838
            response["FunctionError"] = "Unhandled"
1✔
1839

1840
        if log_type == LogType.Tail:
1✔
1841
            response["LogResult"] = to_str(
1✔
1842
                base64.b64encode(to_bytes(invocation_result.logs)[-4096:])
1843
            )
1844

1845
        return response
1✔
1846

1847
    # Version operations
1848
    def publish_version(
1✔
1849
        self,
1850
        context: RequestContext,
1851
        function_name: FunctionName,
1852
        code_sha256: String | None = None,
1853
        description: Description | None = None,
1854
        revision_id: String | None = None,
1855
        publish_to: FunctionVersionLatestPublished | None = None,
1856
        **kwargs,
1857
    ) -> FunctionConfiguration:
1858
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1859
        function_name = api_utils.get_function_name(function_name, context)
1✔
1860
        if publish_to:
1✔
UNCOV
1861
            self._validate_publish_to(publish_to)
×
1862
        new_version = self._publish_version_from_existing_version(
1✔
1863
            function_name=function_name,
1864
            description=description,
1865
            account_id=account_id,
1866
            region=region,
1867
            revision_id=revision_id,
1868
            code_sha256=code_sha256,
1869
            publish_to=publish_to,
1870
        )
1871
        return api_utils.map_config_out(new_version, return_qualified_arn=True)
1✔
1872

1873
    def list_versions_by_function(
1✔
1874
        self,
1875
        context: RequestContext,
1876
        function_name: NamespacedFunctionName,
1877
        marker: String = None,
1878
        max_items: MaxListItems = None,
1879
        **kwargs,
1880
    ) -> ListVersionsByFunctionResponse:
1881
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1882
        function_name = api_utils.get_function_name(function_name, context)
1✔
1883
        function = self._get_function(
1✔
1884
            function_name=function_name, region=region, account_id=account_id
1885
        )
1886
        versions = [
1✔
1887
            api_utils.map_to_list_response(
1888
                api_utils.map_config_out(version=version, return_qualified_arn=True)
1889
            )
1890
            for version in function.versions.values()
1891
        ]
1892
        items = PaginatedList(versions)
1✔
1893
        page, token = items.get_page(
1✔
1894
            lambda item: item,
1895
            marker,
1896
            max_items,
1897
        )
1898
        return ListVersionsByFunctionResponse(Versions=page, NextMarker=token)
1✔
1899

1900
    # Alias
1901

1902
    def _create_routing_config_model(
1✔
1903
        self, routing_config_dict: dict[str, float], function_version: FunctionVersion
1904
    ):
1905
        if len(routing_config_dict) > 1:
1✔
1906
            raise InvalidParameterValueException(
1✔
1907
                "Number of items in AdditionalVersionWeights cannot be greater than 1",
1908
                Type="User",
1909
            )
1910
        # should be exactly one item here, still iterating, might be supported in the future
1911
        for key, value in routing_config_dict.items():
1✔
1912
            if value < 0.0 or value >= 1.0:
1✔
1913
                raise ValidationException(
1✔
1914
                    f"1 validation error detected: Value '{{{key}={value}}}' at 'routingConfig.additionalVersionWeights' failed to satisfy constraint: Map value must satisfy constraint: [Member must have value less than or equal to 1.0, Member must have value greater than or equal to 0.0, Member must not be null]"
1915
                )
1916
            if key == function_version.id.qualifier:
1✔
1917
                raise InvalidParameterValueException(
1✔
1918
                    f"Invalid function version {function_version.id.qualifier}. Function version {function_version.id.qualifier} is already included in routing configuration.",
1919
                    Type="User",
1920
                )
1921
            # check if version target is latest, then no routing config is allowed
1922
            if function_version.id.qualifier == "$LATEST":
1✔
1923
                raise InvalidParameterValueException(
1✔
1924
                    "$LATEST is not supported for an alias pointing to more than 1 version"
1925
                )
1926
            if not api_utils.qualifier_is_version(key):
1✔
1927
                raise ValidationException(
1✔
1928
                    f"1 validation error detected: Value '{{{key}={value}}}' at 'routingConfig.additionalVersionWeights' failed to satisfy constraint: Map keys must satisfy constraint: [Member must have length less than or equal to 1024, Member must have length greater than or equal to 1, Member must satisfy regular expression pattern: [0-9]+]"
1929
                )
1930

1931
            # checking if the version in the config exists
1932
            get_function_version(
1✔
1933
                function_name=function_version.id.function_name,
1934
                qualifier=key,
1935
                region=function_version.id.region,
1936
                account_id=function_version.id.account,
1937
            )
1938
        return AliasRoutingConfig(version_weights=routing_config_dict)
1✔
1939

1940
    def create_alias(
1✔
1941
        self,
1942
        context: RequestContext,
1943
        function_name: FunctionName,
1944
        name: Alias,
1945
        function_version: VersionWithLatestPublished,
1946
        description: Description = None,
1947
        routing_config: AliasRoutingConfiguration = None,
1948
        **kwargs,
1949
    ) -> AliasConfiguration:
1950
        if not api_utils.qualifier_is_alias(name):
1✔
1951
            raise ValidationException(
1✔
1952
                f"1 validation error detected: Value '{name}' at 'name' failed to satisfy constraint: Member must satisfy regular expression pattern: (?!^[0-9]+$)([a-zA-Z0-9-_]+)"
1953
            )
1954

1955
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1956
        function_name = api_utils.get_function_name(function_name, context)
1✔
1957
        target_version = get_function_version(
1✔
1958
            function_name=function_name,
1959
            qualifier=function_version,
1960
            region=region,
1961
            account_id=account_id,
1962
        )
1963
        function = self._get_function(
1✔
1964
            function_name=function_name, region=region, account_id=account_id
1965
        )
1966
        # description is always present, if not specified it's an empty string
1967
        description = description or ""
1✔
1968
        with function.lock:
1✔
1969
            if existing_alias := function.aliases.get(name):
1✔
1970
                raise ResourceConflictException(
1✔
1971
                    f"Alias already exists: {api_utils.map_alias_out(alias=existing_alias, function=function)['AliasArn']}",
1972
                    Type="User",
1973
                )
1974
            # checking if the version exists
1975
            routing_configuration = None
1✔
1976
            if routing_config and (
1✔
1977
                routing_config_dict := routing_config.get("AdditionalVersionWeights")
1978
            ):
1979
                routing_configuration = self._create_routing_config_model(
1✔
1980
                    routing_config_dict, target_version
1981
                )
1982

1983
            alias = VersionAlias(
1✔
1984
                name=name,
1985
                function_version=function_version,
1986
                description=description,
1987
                routing_configuration=routing_configuration,
1988
            )
1989
            function.aliases[name] = alias
1✔
1990
        return api_utils.map_alias_out(alias=alias, function=function)
1✔
1991

1992
    def list_aliases(
1✔
1993
        self,
1994
        context: RequestContext,
1995
        function_name: FunctionName,
1996
        function_version: VersionWithLatestPublished = None,
1997
        marker: String = None,
1998
        max_items: MaxListItems = None,
1999
        **kwargs,
2000
    ) -> ListAliasesResponse:
2001
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2002
        function_name = api_utils.get_function_name(function_name, context)
1✔
2003
        function = self._get_function(
1✔
2004
            function_name=function_name, region=region, account_id=account_id
2005
        )
2006
        aliases = [
1✔
2007
            api_utils.map_alias_out(alias, function)
2008
            for alias in function.aliases.values()
2009
            if function_version is None or alias.function_version == function_version
2010
        ]
2011

2012
        aliases = PaginatedList(aliases)
1✔
2013
        page, token = aliases.get_page(
1✔
2014
            lambda alias: alias["AliasArn"],
2015
            marker,
2016
            max_items,
2017
        )
2018

2019
        return ListAliasesResponse(Aliases=page, NextMarker=token)
1✔
2020

2021
    def delete_alias(
1✔
2022
        self, context: RequestContext, function_name: FunctionName, name: Alias, **kwargs
2023
    ) -> None:
2024
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2025
        function_name = api_utils.get_function_name(function_name, context)
1✔
2026
        function = self._get_function(
1✔
2027
            function_name=function_name, region=region, account_id=account_id
2028
        )
2029
        version_alias = function.aliases.pop(name, None)
1✔
2030

2031
        # cleanup related resources
2032
        if name in function.provisioned_concurrency_configs:
1✔
2033
            function.provisioned_concurrency_configs.pop(name)
1✔
2034

2035
        # TODO: Allow for deactivating/unregistering specific Lambda URLs
2036
        if version_alias and name in function.function_url_configs:
1✔
2037
            url_config = function.function_url_configs.pop(name)
1✔
2038
            LOG.debug(
1✔
2039
                "Stopping aliased Lambda Function URL %s for %s",
2040
                url_config.url,
2041
                url_config.function_name,
2042
            )
2043

2044
    def get_alias(
1✔
2045
        self, context: RequestContext, function_name: FunctionName, name: Alias, **kwargs
2046
    ) -> AliasConfiguration:
2047
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2048
        function_name = api_utils.get_function_name(function_name, context)
1✔
2049
        function = self._get_function(
1✔
2050
            function_name=function_name, region=region, account_id=account_id
2051
        )
2052
        if not (alias := function.aliases.get(name)):
1✔
2053
            raise ResourceNotFoundException(
1✔
2054
                f"Cannot find alias arn: {api_utils.qualified_lambda_arn(function_name=function_name, qualifier=name, region=region, account=account_id)}",
2055
                Type="User",
2056
            )
2057
        return api_utils.map_alias_out(alias=alias, function=function)
1✔
2058

2059
    def update_alias(
1✔
2060
        self,
2061
        context: RequestContext,
2062
        function_name: FunctionName,
2063
        name: Alias,
2064
        function_version: VersionWithLatestPublished = None,
2065
        description: Description = None,
2066
        routing_config: AliasRoutingConfiguration = None,
2067
        revision_id: String = None,
2068
        **kwargs,
2069
    ) -> AliasConfiguration:
2070
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2071
        function_name = api_utils.get_function_name(function_name, context)
1✔
2072
        function = self._get_function(
1✔
2073
            function_name=function_name, region=region, account_id=account_id
2074
        )
2075
        if not (alias := function.aliases.get(name)):
1✔
2076
            fn_arn = api_utils.qualified_lambda_arn(function_name, name, account_id, region)
1✔
2077
            raise ResourceNotFoundException(
1✔
2078
                f"Alias not found: {fn_arn}",
2079
                Type="User",
2080
            )
2081
        if revision_id and alias.revision_id != revision_id:
1✔
2082
            raise PreconditionFailedException(
1✔
2083
                "The Revision Id provided does not match the latest Revision Id. "
2084
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
2085
                Type="User",
2086
            )
2087
        changes = {}
1✔
2088
        if function_version is not None:
1✔
2089
            changes |= {"function_version": function_version}
1✔
2090
        if description is not None:
1✔
2091
            changes |= {"description": description}
1✔
2092
        if routing_config is not None:
1✔
2093
            # if it is an empty dict or AdditionalVersionWeights is empty, set routing config to None
2094
            new_routing_config = None
1✔
2095
            if routing_config_dict := routing_config.get("AdditionalVersionWeights"):
1✔
UNCOV
2096
                new_routing_config = self._create_routing_config_model(routing_config_dict)
×
2097
            changes |= {"routing_configuration": new_routing_config}
1✔
2098
        # even if no changes are done, we have to update revision id for some reason
2099
        old_alias = alias
1✔
2100
        alias = dataclasses.replace(alias, **changes)
1✔
2101
        function.aliases[name] = alias
1✔
2102

2103
        # TODO: signal lambda service that pointer potentially changed
2104
        self.lambda_service.update_alias(old_alias=old_alias, new_alias=alias, function=function)
1✔
2105

2106
        return api_utils.map_alias_out(alias=alias, function=function)
1✔
2107

2108
    # =======================================
2109
    # ======= EVENT SOURCE MAPPINGS =========
2110
    # =======================================
2111
    def check_service_resource_exists(
1✔
2112
        self, service: str, resource_arn: str, function_arn: str, function_role_arn: str
2113
    ):
2114
        """
2115
        Check if the service resource exists and if the function has access to it.
2116

2117
        Raises:
2118
            InvalidParameterValueException: If the service resource does not exist or the function does not have access to it.
2119
        """
2120
        arn = parse_arn(resource_arn)
1✔
2121
        source_client = get_internal_client(
1✔
2122
            arn=resource_arn,
2123
            role_arn=function_role_arn,
2124
            service_principal=ServicePrincipal.lambda_,
2125
            source_arn=function_arn,
2126
        )
2127
        if service in ["sqs", "sqs-fifo"]:
1✔
2128
            try:
1✔
2129
                # AWS uses `GetQueueAttributes` internally to verify the queue existence, but we need the `QueueUrl`
2130
                # which is not given directly. We build out a dummy `QueueUrl` which can be parsed by SQS to return
2131
                # the right value
2132
                queue_name = arn["resource"].split("/")[-1]
1✔
2133
                queue_url = f"http://sqs.{arn['region']}.domain/{arn['account']}/{queue_name}"
1✔
2134
                source_client.get_queue_attributes(QueueUrl=queue_url)
1✔
2135
            except ClientError as e:
1✔
2136
                error_code = e.response["Error"]["Code"]
1✔
2137
                if error_code == "AWS.SimpleQueueService.NonExistentQueue":
1✔
2138
                    raise InvalidParameterValueException(
1✔
2139
                        f"Error occurred while ReceiveMessage. SQS Error Code: {error_code}. SQS Error Message: {e.response['Error']['Message']}",
2140
                        Type="User",
2141
                    )
UNCOV
2142
                raise e
×
2143
        elif service in ["kinesis"]:
1✔
2144
            try:
1✔
2145
                source_client.describe_stream(StreamARN=resource_arn)
1✔
2146
            except ClientError as e:
1✔
2147
                if e.response["Error"]["Code"] == "ResourceNotFoundException":
1✔
2148
                    raise InvalidParameterValueException(
1✔
2149
                        f"Stream not found: {resource_arn}",
2150
                        Type="User",
2151
                    )
UNCOV
2152
                raise e
×
2153
        elif service in ["dynamodb"]:
1✔
2154
            try:
1✔
2155
                source_client.describe_stream(StreamArn=resource_arn)
1✔
2156
            except ClientError as e:
1✔
2157
                if e.response["Error"]["Code"] == "ResourceNotFoundException":
1✔
2158
                    raise InvalidParameterValueException(
1✔
2159
                        f"Stream not found: {resource_arn}",
2160
                        Type="User",
2161
                    )
UNCOV
2162
                raise e
×
2163

2164
    @handler("CreateEventSourceMapping", expand=False)
1✔
2165
    def create_event_source_mapping(
1✔
2166
        self,
2167
        context: RequestContext,
2168
        request: CreateEventSourceMappingRequest,
2169
    ) -> EventSourceMappingConfiguration:
2170
        return self.create_event_source_mapping_v2(context, request)
1✔
2171

2172
    def create_event_source_mapping_v2(
1✔
2173
        self,
2174
        context: RequestContext,
2175
        request: CreateEventSourceMappingRequest,
2176
    ) -> EventSourceMappingConfiguration:
2177
        # Validations
2178
        function_arn, function_name, state, function_version, function_role = (
1✔
2179
            self.validate_event_source_mapping(context, request)
2180
        )
2181

2182
        esm_config = EsmConfigFactory(request, context, function_arn).get_esm_config()
1✔
2183

2184
        # Copy esm_config to avoid a race condition with potential async update in the store
2185
        state.event_source_mappings[esm_config["UUID"]] = esm_config.copy()
1✔
2186
        enabled = request.get("Enabled", True)
1✔
2187
        # TODO: check for potential async race condition update -> think about locking
2188
        esm_worker = EsmWorkerFactory(esm_config, function_role, enabled).get_esm_worker()
1✔
2189
        self.esm_workers[esm_worker.uuid] = esm_worker
1✔
2190
        # TODO: check StateTransitionReason, LastModified, LastProcessingResult (concurrent updates requires locking!)
2191
        if tags := request.get("Tags"):
1✔
2192
            self._store_tags(esm_config.get("EventSourceMappingArn"), tags)
1✔
2193
        esm_worker.create()
1✔
2194
        return esm_config
1✔
2195

2196
    def validate_event_source_mapping(self, context, request):
1✔
2197
        # TODO: test whether stream ARNs are valid sources for Pipes or ESM or whether only DynamoDB table ARNs work
2198
        # TODO: Validate MaxRecordAgeInSeconds (i.e cannot subceed 60s but can be -1) and MaxRetryAttempts parameters.
2199
        # See https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-maximumrecordageinseconds
2200
        is_create_esm_request = context.operation.name == self.create_event_source_mapping.operation
1✔
2201

2202
        if destination_config := request.get("DestinationConfig"):
1✔
2203
            if "OnSuccess" in destination_config:
1✔
2204
                raise InvalidParameterValueException(
1✔
2205
                    "Unsupported DestinationConfig parameter for given event source mapping type.",
2206
                    Type="User",
2207
                )
2208

2209
        service = None
1✔
2210
        if "SelfManagedEventSource" in request:
1✔
UNCOV
2211
            service = "kafka"
×
UNCOV
2212
            if "SourceAccessConfigurations" not in request:
×
UNCOV
2213
                raise InvalidParameterValueException(
×
2214
                    "Required 'sourceAccessConfigurations' parameter is missing.", Type="User"
2215
                )
2216
        if service is None and "EventSourceArn" not in request:
1✔
2217
            raise InvalidParameterValueException("Unrecognized event source.", Type="User")
1✔
2218
        if service is None:
1✔
2219
            service = extract_service_from_arn(request["EventSourceArn"])
1✔
2220

2221
        batch_size = api_utils.validate_and_set_batch_size(service, request.get("BatchSize"))
1✔
2222
        if service in ["dynamodb", "kinesis"]:
1✔
2223
            starting_position = request.get("StartingPosition")
1✔
2224
            if not starting_position:
1✔
2225
                raise InvalidParameterValueException(
1✔
2226
                    "1 validation error detected: Value null at 'startingPosition' failed to satisfy constraint: Member must not be null.",
2227
                    Type="User",
2228
                )
2229

2230
            if starting_position not in KinesisStreamStartPosition.__members__:
1✔
2231
                raise ValidationException(
1✔
2232
                    f"1 validation error detected: Value '{starting_position}' at 'startingPosition' failed to satisfy constraint: Member must satisfy enum value set: [LATEST, AT_TIMESTAMP, TRIM_HORIZON]"
2233
                )
2234
            # AT_TIMESTAMP is not allowed for DynamoDB Streams
2235
            elif (
1✔
2236
                service == "dynamodb"
2237
                and starting_position not in DynamoDBStreamStartPosition.__members__
2238
            ):
2239
                raise InvalidParameterValueException(
1✔
2240
                    f"Unsupported starting position for arn type: {request['EventSourceArn']}",
2241
                    Type="User",
2242
                )
2243

2244
        if service in ["sqs", "sqs-fifo"]:
1✔
2245
            if batch_size > 10 and request.get("MaximumBatchingWindowInSeconds", 0) == 0:
1✔
2246
                raise InvalidParameterValueException(
1✔
2247
                    "Maximum batch window in seconds must be greater than 0 if maximum batch size is greater than 10",
2248
                    Type="User",
2249
                )
2250

2251
        if (filter_criteria := request.get("FilterCriteria")) is not None:
1✔
2252
            for filter_ in filter_criteria.get("Filters", []):
1✔
2253
                pattern_str = filter_.get("Pattern")
1✔
2254
                if not pattern_str or not isinstance(pattern_str, str):
1✔
UNCOV
2255
                    raise InvalidParameterValueException(
×
2256
                        "Invalid filter pattern definition.", Type="User"
2257
                    )
2258

2259
                if not validate_event_pattern(pattern_str):
1✔
2260
                    raise InvalidParameterValueException(
1✔
2261
                        "Invalid filter pattern definition.", Type="User"
2262
                    )
2263

2264
        # Can either have a FunctionName (i.e CreateEventSourceMapping request) or
2265
        # an internal EventSourceMappingConfiguration representation
2266
        request_function_name = request.get("FunctionName") or request.get("FunctionArn")
1✔
2267
        # can be either a partial arn or a full arn for the version/alias
2268
        function_name, qualifier, account, region = function_locators_from_arn(
1✔
2269
            request_function_name
2270
        )
2271
        # TODO: validate `context.region` vs. `region(request["FunctionName"])` vs. `region(request["EventSourceArn"])`
2272
        account = account or context.account_id
1✔
2273
        region = region or context.region
1✔
2274
        state = lambda_stores[account][region]
1✔
2275
        fn = state.functions.get(function_name)
1✔
2276
        if not fn:
1✔
2277
            raise InvalidParameterValueException("Function does not exist", Type="User")
1✔
2278

2279
        if qualifier:
1✔
2280
            # make sure the function version/alias exists
2281
            if api_utils.qualifier_is_alias(qualifier):
1✔
2282
                fn_alias = fn.aliases.get(qualifier)
1✔
2283
                if not fn_alias:
1✔
2284
                    raise Exception("unknown alias")  # TODO: cover via test
×
2285
            elif api_utils.qualifier_is_version(qualifier):
1✔
2286
                fn_version = fn.versions.get(qualifier)
1✔
2287
                if not fn_version:
1✔
UNCOV
2288
                    raise Exception("unknown version")  # TODO: cover via test
×
2289
            elif qualifier == "$LATEST":
1✔
2290
                pass
1✔
UNCOV
2291
            elif qualifier == "$LATEST.PUBLISHED":
×
UNCOV
2292
                if fn.versions.get(qualifier):
×
UNCOV
2293
                    pass
×
2294
            else:
UNCOV
2295
                raise Exception("invalid functionname")  # TODO: cover via test
×
2296
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account, region)
1✔
2297

2298
        else:
2299
            fn_arn = api_utils.unqualified_lambda_arn(function_name, account, region)
1✔
2300

2301
        function_version = get_function_version_from_arn(fn_arn)
1✔
2302
        function_role = function_version.config.role
1✔
2303

2304
        if source_arn := request.get("EventSourceArn"):
1✔
2305
            self.check_service_resource_exists(service, source_arn, fn_arn, function_role)
1✔
2306
        # Check we are validating a CreateEventSourceMapping request
2307
        if is_create_esm_request:
1✔
2308

2309
            def _get_mapping_sources(mapping: dict[str, Any]) -> list[str]:
1✔
2310
                if event_source_arn := mapping.get("EventSourceArn"):
1✔
2311
                    return [event_source_arn]
1✔
UNCOV
2312
                return (
×
2313
                    mapping.get("SelfManagedEventSource", {})
2314
                    .get("Endpoints", {})
2315
                    .get("KAFKA_BOOTSTRAP_SERVERS", [])
2316
                )
2317

2318
            # check for event source duplicates
2319
            # TODO: currently validated for sqs, kinesis, and dynamodb
2320
            service_id = load_service(service).service_id
1✔
2321
            for uuid, mapping in state.event_source_mappings.items():
1✔
2322
                mapping_sources = _get_mapping_sources(mapping)
1✔
2323
                request_sources = _get_mapping_sources(request)
1✔
2324
                if mapping["FunctionArn"] == fn_arn and (
1✔
2325
                    set(mapping_sources).intersection(request_sources)
2326
                ):
2327
                    if service == "sqs":
1✔
2328
                        # *shakes fist at SQS*
2329
                        raise ResourceConflictException(
1✔
2330
                            f'An event source mapping with {service_id} arn (" {mapping["EventSourceArn"]} ") '
2331
                            f'and function (" {function_name} ") already exists. Please update or delete the '
2332
                            f"existing mapping with UUID {uuid}",
2333
                            Type="User",
2334
                        )
2335
                    elif service == "kafka":
1✔
UNCOV
2336
                        if set(mapping["Topics"]).intersection(request["Topics"]):
×
UNCOV
2337
                            raise ResourceConflictException(
×
2338
                                f'An event source mapping with event source ("{",".join(request_sources)}"), '
2339
                                f'function ("{fn_arn}"), '
2340
                                f'topics ("{",".join(request["Topics"])}") already exists. Please update or delete the '
2341
                                f"existing mapping with UUID {uuid}",
2342
                                Type="User",
2343
                            )
2344
                    else:
2345
                        raise ResourceConflictException(
1✔
2346
                            f'The event source arn (" {mapping["EventSourceArn"]} ") and function '
2347
                            f'(" {function_name} ") provided mapping already exists. Please update or delete the '
2348
                            f"existing mapping with UUID {uuid}",
2349
                            Type="User",
2350
                        )
2351
        return fn_arn, function_name, state, function_version, function_role
1✔
2352

2353
    @handler("UpdateEventSourceMapping", expand=False)
1✔
2354
    def update_event_source_mapping(
1✔
2355
        self,
2356
        context: RequestContext,
2357
        request: UpdateEventSourceMappingRequest,
2358
    ) -> EventSourceMappingConfiguration:
2359
        return self.update_event_source_mapping_v2(context, request)
1✔
2360

2361
    def update_event_source_mapping_v2(
1✔
2362
        self,
2363
        context: RequestContext,
2364
        request: UpdateEventSourceMappingRequest,
2365
    ) -> EventSourceMappingConfiguration:
2366
        # TODO: test and implement this properly (quite complex with many validations and limitations!)
2367
        LOG.warning(
1✔
2368
            "Updating Lambda Event Source Mapping is in experimental state and not yet fully tested."
2369
        )
2370
        state = lambda_stores[context.account_id][context.region]
1✔
2371
        request_data = {**request}
1✔
2372
        uuid = request_data.pop("UUID", None)
1✔
2373
        if not uuid:
1✔
UNCOV
2374
            raise ResourceNotFoundException(
×
2375
                "The resource you requested does not exist.", Type="User"
2376
            )
2377
        old_event_source_mapping = state.event_source_mappings.get(uuid)
1✔
2378
        esm_worker = self.esm_workers.get(uuid)
1✔
2379
        if old_event_source_mapping is None or esm_worker is None:
1✔
2380
            raise ResourceNotFoundException(
1✔
2381
                "The resource you requested does not exist.", Type="User"
2382
            )  # TODO: test?
2383

2384
        # normalize values to overwrite
2385
        event_source_mapping = old_event_source_mapping | request_data
1✔
2386

2387
        temp_params = {}  # values only set for the returned response, not saved internally (e.g. transient state)
1✔
2388

2389
        # Validate the newly updated ESM object. We ignore the output here since we only care whether an Exception is raised.
2390
        function_arn, _, _, function_version, function_role = self.validate_event_source_mapping(
1✔
2391
            context, event_source_mapping
2392
        )
2393

2394
        # remove the FunctionName field
2395
        event_source_mapping.pop("FunctionName", None)
1✔
2396

2397
        if function_arn:
1✔
2398
            event_source_mapping["FunctionArn"] = function_arn
1✔
2399

2400
        # Only apply update if the desired state differs
2401
        enabled = request.get("Enabled")
1✔
2402
        if enabled is not None:
1✔
2403
            if enabled and old_event_source_mapping["State"] != EsmState.ENABLED:
1✔
2404
                event_source_mapping["State"] = EsmState.ENABLING
1✔
2405
            # TODO: What happens when trying to update during an update or failed state?!
2406
            elif not enabled and old_event_source_mapping["State"] == EsmState.ENABLED:
1✔
2407
                event_source_mapping["State"] = EsmState.DISABLING
1✔
2408
        else:
2409
            event_source_mapping["State"] = EsmState.UPDATING
1✔
2410

2411
        # To ensure parity, certain responses need to be immediately returned
2412
        temp_params["State"] = event_source_mapping["State"]
1✔
2413

2414
        state.event_source_mappings[uuid] = event_source_mapping
1✔
2415

2416
        # TODO: Currently, we re-create the entire ESM worker. Look into approach with better performance.
2417
        worker_factory = EsmWorkerFactory(
1✔
2418
            event_source_mapping, function_role, request.get("Enabled", esm_worker.enabled)
2419
        )
2420

2421
        # Get a new ESM worker object but do not active it, since the factory holds all logic for creating new worker from configuration.
2422
        updated_esm_worker = worker_factory.get_esm_worker()
1✔
2423
        self.esm_workers[uuid] = updated_esm_worker
1✔
2424

2425
        # We should stop() the worker since the delete() will remove the ESM from the state mapping.
2426
        esm_worker.stop()
1✔
2427
        # This will either create an EsmWorker in the CREATING state if enabled. Otherwise, the DISABLING state is set.
2428
        updated_esm_worker.create()
1✔
2429

2430
        return {**event_source_mapping, **temp_params}
1✔
2431

2432
    def delete_event_source_mapping(
1✔
2433
        self, context: RequestContext, uuid: String, **kwargs
2434
    ) -> EventSourceMappingConfiguration:
2435
        state = lambda_stores[context.account_id][context.region]
1✔
2436
        event_source_mapping = state.event_source_mappings.get(uuid)
1✔
2437
        if not event_source_mapping:
1✔
2438
            raise ResourceNotFoundException(
1✔
2439
                "The resource you requested does not exist.", Type="User"
2440
            )
2441
        esm = state.event_source_mappings[uuid]
1✔
2442
        # TODO: add proper locking
2443
        esm_worker = self.esm_workers.pop(uuid, None)
1✔
2444
        # Asynchronous delete in v2
2445
        if not esm_worker:
1✔
UNCOV
2446
            raise ResourceNotFoundException(
×
2447
                "The resource you requested does not exist.", Type="User"
2448
            )
2449
        esm_worker.delete()
1✔
2450
        return {**esm, "State": EsmState.DELETING}
1✔
2451

2452
    def get_event_source_mapping(
1✔
2453
        self, context: RequestContext, uuid: String, **kwargs
2454
    ) -> EventSourceMappingConfiguration:
2455
        state = lambda_stores[context.account_id][context.region]
1✔
2456
        event_source_mapping = state.event_source_mappings.get(uuid)
1✔
2457
        if not event_source_mapping:
1✔
2458
            raise ResourceNotFoundException(
1✔
2459
                "The resource you requested does not exist.", Type="User"
2460
            )
2461
        esm_worker = self.esm_workers.get(uuid)
1✔
2462
        if not esm_worker:
1✔
UNCOV
2463
            raise ResourceNotFoundException(
×
2464
                "The resource you requested does not exist.", Type="User"
2465
            )
2466
        event_source_mapping["State"] = esm_worker.current_state
1✔
2467
        event_source_mapping["StateTransitionReason"] = esm_worker.state_transition_reason
1✔
2468
        return event_source_mapping
1✔
2469

2470
    def list_event_source_mappings(
1✔
2471
        self,
2472
        context: RequestContext,
2473
        event_source_arn: Arn = None,
2474
        function_name: FunctionName = None,
2475
        marker: String = None,
2476
        max_items: MaxListItems = None,
2477
        **kwargs,
2478
    ) -> ListEventSourceMappingsResponse:
2479
        state = lambda_stores[context.account_id][context.region]
1✔
2480

2481
        esms = state.event_source_mappings.values()
1✔
2482
        # TODO: update and test State and StateTransitionReason for ESM v2
2483

2484
        if event_source_arn:  # TODO: validate pattern
1✔
2485
            esms = [e for e in esms if e.get("EventSourceArn") == event_source_arn]
1✔
2486

2487
        if function_name:
1✔
2488
            esms = [e for e in esms if function_name in e["FunctionArn"]]
1✔
2489

2490
        esms = PaginatedList(esms)
1✔
2491
        page, token = esms.get_page(
1✔
2492
            lambda x: x["UUID"],
2493
            marker,
2494
            max_items,
2495
        )
2496
        return ListEventSourceMappingsResponse(EventSourceMappings=page, NextMarker=token)
1✔
2497

2498
    def get_source_type_from_request(self, request: dict[str, Any]) -> str:
1✔
UNCOV
2499
        if event_source_arn := request.get("EventSourceArn", ""):
×
UNCOV
2500
            service = extract_service_from_arn(event_source_arn)
×
UNCOV
2501
            if service == "sqs" and "fifo" in event_source_arn:
×
UNCOV
2502
                service = "sqs-fifo"
×
UNCOV
2503
            return service
×
UNCOV
2504
        elif request.get("SelfManagedEventSource"):
×
UNCOV
2505
            return "kafka"
×
2506

2507
    # =======================================
2508
    # ============ FUNCTION URLS ============
2509
    # =======================================
2510

2511
    @staticmethod
1✔
2512
    def _validate_qualifier(qualifier: str) -> None:
1✔
2513
        if qualifier in ["$LATEST", "$LATEST.PUBLISHED"] or (
1✔
2514
            qualifier and api_utils.qualifier_is_version(qualifier)
2515
        ):
2516
            raise ValidationException(
1✔
2517
                f"1 validation error detected: Value '{qualifier}' at 'qualifier' failed to satisfy constraint: Member must satisfy regular expression pattern: ((?!^\\d+$)^[0-9a-zA-Z-_]+$)"
2518
            )
2519

2520
    @staticmethod
1✔
2521
    def _validate_invoke_mode(invoke_mode: str) -> None:
1✔
2522
        if invoke_mode and invoke_mode not in [InvokeMode.BUFFERED, InvokeMode.RESPONSE_STREAM]:
1✔
2523
            raise ValidationException(
1✔
2524
                f"1 validation error detected: Value '{invoke_mode}' at 'invokeMode' failed to satisfy constraint: Member must satisfy enum value set: [RESPONSE_STREAM, BUFFERED]"
2525
            )
2526
        if invoke_mode == InvokeMode.RESPONSE_STREAM:
1✔
2527
            # TODO should we actually fail for setting RESPONSE_STREAM?
2528
            #  It should trigger InvokeWithResponseStream which is not implemented
2529
            LOG.warning(
1✔
2530
                "The invokeMode 'RESPONSE_STREAM' is not yet supported on LocalStack. The property is only mocked, the execution will still be 'BUFFERED'"
2531
            )
2532

2533
    # TODO: what happens if function state is not active?
2534
    def create_function_url_config(
1✔
2535
        self,
2536
        context: RequestContext,
2537
        function_name: FunctionName,
2538
        auth_type: FunctionUrlAuthType,
2539
        qualifier: FunctionUrlQualifier = None,
2540
        cors: Cors = None,
2541
        invoke_mode: InvokeMode = None,
2542
        **kwargs,
2543
    ) -> CreateFunctionUrlConfigResponse:
2544
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2545
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
2546
            function_name, qualifier, context
2547
        )
2548
        state = lambda_stores[account_id][region]
1✔
2549
        self._validate_qualifier(qualifier)
1✔
2550
        self._validate_invoke_mode(invoke_mode)
1✔
2551

2552
        fn = state.functions.get(function_name)
1✔
2553
        if fn is None:
1✔
2554
            raise ResourceNotFoundException("Function does not exist", Type="User")
1✔
2555

2556
        url_config = fn.function_url_configs.get(qualifier or "$LATEST")
1✔
2557
        if url_config:
1✔
2558
            raise ResourceConflictException(
1✔
2559
                f"Failed to create function url config for [functionArn = {url_config.function_arn}]. Error message:  FunctionUrlConfig exists for this Lambda function",
2560
                Type="User",
2561
            )
2562

2563
        if qualifier and qualifier != "$LATEST" and qualifier not in fn.aliases:
1✔
2564
            raise ResourceNotFoundException("Function does not exist", Type="User")
1✔
2565

2566
        normalized_qualifier = qualifier or "$LATEST"
1✔
2567

2568
        function_arn = (
1✔
2569
            api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
2570
            if qualifier
2571
            else api_utils.unqualified_lambda_arn(function_name, account_id, region)
2572
        )
2573

2574
        custom_id: str | None = None
1✔
2575

2576
        tags = self._get_tags(api_utils.unqualified_lambda_arn(function_name, account_id, region))
1✔
2577
        if TAG_KEY_CUSTOM_URL in tags:
1✔
2578
            # Note: I really wanted to add verification here that the
2579
            # url_id is unique, so we could surface that to the user ASAP.
2580
            # However, it seems like that information isn't available yet,
2581
            # since (as far as I can tell) we call
2582
            # self.router.register_routes() once, in a single shot, for all
2583
            # of the routes -- and we need to verify that it's unique not
2584
            # just for this particular lambda function, but for the entire
2585
            # lambda provider. Therefore... that idea proved non-trivial!
2586
            custom_id_tag_value = (
1✔
2587
                f"{tags[TAG_KEY_CUSTOM_URL]}-{qualifier}" if qualifier else tags[TAG_KEY_CUSTOM_URL]
2588
            )
2589
            if TAG_KEY_CUSTOM_URL_VALIDATOR.match(custom_id_tag_value):
1✔
2590
                custom_id = custom_id_tag_value
1✔
2591

2592
            else:
2593
                # Note: we're logging here instead of raising to prioritize
2594
                # strict parity with AWS over the localstack-only custom_id
2595
                LOG.warning(
1✔
2596
                    "Invalid custom ID tag value for lambda URL (%s=%s). "
2597
                    "Replaced with default (random id)",
2598
                    TAG_KEY_CUSTOM_URL,
2599
                    custom_id_tag_value,
2600
                )
2601

2602
        # The url_id is the subdomain used for the URL we're creating. This
2603
        # is either created randomly (as in AWS), or can be passed as a tag
2604
        # to the lambda itself (localstack-only).
2605
        url_id: str
2606
        if custom_id is None:
1✔
2607
            url_id = api_utils.generate_random_url_id()
1✔
2608
        else:
2609
            url_id = custom_id
1✔
2610

2611
        host_definition = localstack_host(custom_port=config.GATEWAY_LISTEN[0].port)
1✔
2612
        fn.function_url_configs[normalized_qualifier] = FunctionUrlConfig(
1✔
2613
            function_arn=function_arn,
2614
            function_name=function_name,
2615
            cors=cors,
2616
            url_id=url_id,
2617
            url=f"http://{url_id}.lambda-url.{context.region}.{host_definition.host_and_port()}/",  # TODO: https support
2618
            auth_type=auth_type,
2619
            creation_time=api_utils.generate_lambda_date(),
2620
            last_modified_time=api_utils.generate_lambda_date(),
2621
            invoke_mode=invoke_mode,
2622
        )
2623

2624
        # persist and start URL
2625
        # TODO: implement URL invoke
2626
        api_url_config = api_utils.map_function_url_config(
1✔
2627
            fn.function_url_configs[normalized_qualifier]
2628
        )
2629

2630
        return CreateFunctionUrlConfigResponse(
1✔
2631
            FunctionUrl=api_url_config["FunctionUrl"],
2632
            FunctionArn=api_url_config["FunctionArn"],
2633
            AuthType=api_url_config["AuthType"],
2634
            Cors=api_url_config["Cors"],
2635
            CreationTime=api_url_config["CreationTime"],
2636
            InvokeMode=api_url_config["InvokeMode"],
2637
        )
2638

2639
    def get_function_url_config(
1✔
2640
        self,
2641
        context: RequestContext,
2642
        function_name: FunctionName,
2643
        qualifier: FunctionUrlQualifier = None,
2644
        **kwargs,
2645
    ) -> GetFunctionUrlConfigResponse:
2646
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2647
        state = lambda_stores[account_id][region]
1✔
2648

2649
        fn_name, qualifier = api_utils.get_name_and_qualifier(function_name, qualifier, context)
1✔
2650

2651
        self._validate_qualifier(qualifier)
1✔
2652

2653
        resolved_fn = state.functions.get(fn_name)
1✔
2654
        if not resolved_fn:
1✔
2655
            raise ResourceNotFoundException(
1✔
2656
                "The resource you requested does not exist.", Type="User"
2657
            )
2658

2659
        qualifier = qualifier or "$LATEST"
1✔
2660
        url_config = resolved_fn.function_url_configs.get(qualifier)
1✔
2661
        if not url_config:
1✔
2662
            raise ResourceNotFoundException(
1✔
2663
                "The resource you requested does not exist.", Type="User"
2664
            )
2665

2666
        return api_utils.map_function_url_config(url_config)
1✔
2667

2668
    def update_function_url_config(
1✔
2669
        self,
2670
        context: RequestContext,
2671
        function_name: FunctionName,
2672
        qualifier: FunctionUrlQualifier = None,
2673
        auth_type: FunctionUrlAuthType = None,
2674
        cors: Cors = None,
2675
        invoke_mode: InvokeMode = None,
2676
        **kwargs,
2677
    ) -> UpdateFunctionUrlConfigResponse:
2678
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2679
        state = lambda_stores[account_id][region]
1✔
2680

2681
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
2682
            function_name, qualifier, context
2683
        )
2684
        self._validate_qualifier(qualifier)
1✔
2685
        self._validate_invoke_mode(invoke_mode)
1✔
2686

2687
        fn = state.functions.get(function_name)
1✔
2688
        if not fn:
1✔
2689
            raise ResourceNotFoundException("Function does not exist", Type="User")
1✔
2690

2691
        normalized_qualifier = qualifier or "$LATEST"
1✔
2692

2693
        if (
1✔
2694
            api_utils.qualifier_is_alias(normalized_qualifier)
2695
            and normalized_qualifier not in fn.aliases
2696
        ):
2697
            raise ResourceNotFoundException("Function does not exist", Type="User")
1✔
2698

2699
        url_config = fn.function_url_configs.get(normalized_qualifier)
1✔
2700
        if not url_config:
1✔
2701
            raise ResourceNotFoundException(
1✔
2702
                "The resource you requested does not exist.", Type="User"
2703
            )
2704

2705
        changes = {
1✔
2706
            "last_modified_time": api_utils.generate_lambda_date(),
2707
            **({"cors": cors} if cors is not None else {}),
2708
            **({"auth_type": auth_type} if auth_type is not None else {}),
2709
        }
2710

2711
        if invoke_mode:
1✔
2712
            changes["invoke_mode"] = invoke_mode
1✔
2713

2714
        new_url_config = dataclasses.replace(url_config, **changes)
1✔
2715
        fn.function_url_configs[normalized_qualifier] = new_url_config
1✔
2716

2717
        return UpdateFunctionUrlConfigResponse(
1✔
2718
            FunctionUrl=new_url_config.url,
2719
            FunctionArn=new_url_config.function_arn,
2720
            AuthType=new_url_config.auth_type,
2721
            Cors=new_url_config.cors,
2722
            CreationTime=new_url_config.creation_time,
2723
            LastModifiedTime=new_url_config.last_modified_time,
2724
            InvokeMode=new_url_config.invoke_mode,
2725
        )
2726

2727
    def delete_function_url_config(
1✔
2728
        self,
2729
        context: RequestContext,
2730
        function_name: FunctionName,
2731
        qualifier: FunctionUrlQualifier = None,
2732
        **kwargs,
2733
    ) -> None:
2734
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2735
        state = lambda_stores[account_id][region]
1✔
2736

2737
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
2738
            function_name, qualifier, context
2739
        )
2740
        self._validate_qualifier(qualifier)
1✔
2741

2742
        resolved_fn = state.functions.get(function_name)
1✔
2743
        if not resolved_fn:
1✔
2744
            raise ResourceNotFoundException(
1✔
2745
                "The resource you requested does not exist.", Type="User"
2746
            )
2747

2748
        qualifier = qualifier or "$LATEST"
1✔
2749
        url_config = resolved_fn.function_url_configs.get(qualifier)
1✔
2750
        if not url_config:
1✔
2751
            raise ResourceNotFoundException(
1✔
2752
                "The resource you requested does not exist.", Type="User"
2753
            )
2754

2755
        del resolved_fn.function_url_configs[qualifier]
1✔
2756

2757
    def list_function_url_configs(
1✔
2758
        self,
2759
        context: RequestContext,
2760
        function_name: FunctionName,
2761
        marker: String = None,
2762
        max_items: MaxItems = None,
2763
        **kwargs,
2764
    ) -> ListFunctionUrlConfigsResponse:
2765
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2766
        state = lambda_stores[account_id][region]
1✔
2767

2768
        fn_name = api_utils.get_function_name(function_name, context)
1✔
2769
        resolved_fn = state.functions.get(fn_name)
1✔
2770
        if not resolved_fn:
1✔
2771
            raise ResourceNotFoundException("Function does not exist", Type="User")
1✔
2772

2773
        url_configs = [
1✔
2774
            api_utils.map_function_url_config(fn_conf)
2775
            for fn_conf in resolved_fn.function_url_configs.values()
2776
        ]
2777
        url_configs = PaginatedList(url_configs)
1✔
2778
        page, token = url_configs.get_page(
1✔
2779
            lambda url_config: url_config["FunctionArn"],
2780
            marker,
2781
            max_items,
2782
        )
2783
        url_configs = page
1✔
2784
        return ListFunctionUrlConfigsResponse(FunctionUrlConfigs=url_configs, NextMarker=token)
1✔
2785

2786
    # =======================================
2787
    # ============  Permissions  ============
2788
    # =======================================
2789

2790
    @handler("AddPermission", expand=False)
1✔
2791
    def add_permission(
1✔
2792
        self,
2793
        context: RequestContext,
2794
        request: AddPermissionRequest,
2795
    ) -> AddPermissionResponse:
2796
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
2797
            request.get("FunctionName"), request.get("Qualifier"), context
2798
        )
2799

2800
        # validate qualifier
2801
        if qualifier is not None:
1✔
2802
            self._validate_qualifier_expression(qualifier)
1✔
2803
            if qualifier == "$LATEST":
1✔
2804
                raise InvalidParameterValueException(
1✔
2805
                    "We currently do not support adding policies for $LATEST.", Type="User"
2806
                )
2807
        account_id, region = api_utils.get_account_and_region(request.get("FunctionName"), context)
1✔
2808

2809
        resolved_fn = self._get_function(function_name, account_id, region)
1✔
2810
        resolved_qualifier, fn_arn = self._resolve_fn_qualifier(resolved_fn, qualifier)
1✔
2811

2812
        revision_id = request.get("RevisionId")
1✔
2813
        if revision_id:
1✔
2814
            fn_revision_id = self._function_revision_id(resolved_fn, resolved_qualifier)
1✔
2815
            if revision_id != fn_revision_id:
1✔
2816
                raise PreconditionFailedException(
1✔
2817
                    "The Revision Id provided does not match the latest Revision Id. "
2818
                    "Call the GetPolicy API to retrieve the latest Revision Id",
2819
                    Type="User",
2820
                )
2821

2822
        request_sid = request["StatementId"]
1✔
2823
        if not bool(STATEMENT_ID_REGEX.match(request_sid)):
1✔
2824
            raise ValidationException(
1✔
2825
                f"1 validation error detected: Value '{request_sid}' at 'statementId' failed to satisfy constraint: Member must satisfy regular expression pattern: ([a-zA-Z0-9-_]+)"
2826
            )
2827
        # check for an already existing policy and any conflicts in existing statements
2828
        existing_policy = resolved_fn.permissions.get(resolved_qualifier)
1✔
2829
        if existing_policy:
1✔
2830
            if request_sid in [s["Sid"] for s in existing_policy.policy.Statement]:
1✔
2831
                # uniqueness scope: statement id needs to be unique per qualified function ($LATEST, version, or alias)
2832
                # Counterexample: the same sid can exist within $LATEST, version, and alias
2833
                raise ResourceConflictException(
1✔
2834
                    f"The statement id ({request_sid}) provided already exists. Please provide a new statement id, or remove the existing statement.",
2835
                    Type="User",
2836
                )
2837

2838
        permission_statement = api_utils.build_statement(
1✔
2839
            partition=context.partition,
2840
            resource_arn=fn_arn,
2841
            statement_id=request["StatementId"],
2842
            action=request["Action"],
2843
            principal=request["Principal"],
2844
            source_arn=request.get("SourceArn"),
2845
            source_account=request.get("SourceAccount"),
2846
            principal_org_id=request.get("PrincipalOrgID"),
2847
            event_source_token=request.get("EventSourceToken"),
2848
            auth_type=request.get("FunctionUrlAuthType"),
2849
        )
2850
        new_policy = existing_policy
1✔
2851
        if not existing_policy:
1✔
2852
            new_policy = FunctionResourcePolicy(
1✔
2853
                policy=ResourcePolicy(Version="2012-10-17", Id="default", Statement=[])
2854
            )
2855
        new_policy.policy.Statement.append(permission_statement)
1✔
2856
        if not existing_policy:
1✔
2857
            resolved_fn.permissions[resolved_qualifier] = new_policy
1✔
2858

2859
        # Update revision id of alias or version
2860
        # TODO: re-evaluate data model to prevent this dirty hack just for bumping the revision id
2861
        # TODO: does that need a `with function.lock` for atomic updates of the policy + revision_id?
2862
        if api_utils.qualifier_is_alias(resolved_qualifier):
1✔
2863
            resolved_alias = resolved_fn.aliases[resolved_qualifier]
1✔
2864
            resolved_fn.aliases[resolved_qualifier] = dataclasses.replace(resolved_alias)
1✔
2865
        # Assumes that a non-alias is a version
2866
        else:
2867
            resolved_version = resolved_fn.versions[resolved_qualifier]
1✔
2868
            resolved_fn.versions[resolved_qualifier] = dataclasses.replace(
1✔
2869
                resolved_version, config=dataclasses.replace(resolved_version.config)
2870
            )
2871
        return AddPermissionResponse(Statement=json.dumps(permission_statement))
1✔
2872

2873
    def remove_permission(
1✔
2874
        self,
2875
        context: RequestContext,
2876
        function_name: NamespacedFunctionName,
2877
        statement_id: NamespacedStatementId,
2878
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
2879
        revision_id: String | None = None,
2880
        **kwargs,
2881
    ) -> None:
2882
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2883
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
2884
            function_name, qualifier, context
2885
        )
2886
        if qualifier is not None:
1✔
2887
            self._validate_qualifier_expression(qualifier)
1✔
2888

2889
        state = lambda_stores[account_id][region]
1✔
2890
        resolved_fn = state.functions.get(function_name)
1✔
2891
        if resolved_fn is None:
1✔
2892
            fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
1✔
2893
            raise ResourceNotFoundException(f"No policy found for: {fn_arn}", Type="User")
1✔
2894

2895
        resolved_qualifier, _ = self._resolve_fn_qualifier(resolved_fn, qualifier)
1✔
2896
        function_permission = resolved_fn.permissions.get(resolved_qualifier)
1✔
2897
        if not function_permission:
1✔
2898
            raise ResourceNotFoundException(
1✔
2899
                "No policy is associated with the given resource.", Type="User"
2900
            )
2901

2902
        # try to find statement in policy and delete it
2903
        statement = None
1✔
2904
        for s in function_permission.policy.Statement:
1✔
2905
            if s["Sid"] == statement_id:
1✔
2906
                statement = s
1✔
2907
                break
1✔
2908

2909
        if not statement:
1✔
2910
            raise ResourceNotFoundException(
1✔
2911
                f"Statement {statement_id} is not found in resource policy.", Type="User"
2912
            )
2913
        fn_revision_id = self._function_revision_id(resolved_fn, resolved_qualifier)
1✔
2914
        if revision_id and revision_id != fn_revision_id:
1✔
UNCOV
2915
            raise PreconditionFailedException(
×
2916
                "The Revision Id provided does not match the latest Revision Id. "
2917
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
2918
                Type="User",
2919
            )
2920
        function_permission.policy.Statement.remove(statement)
1✔
2921

2922
        # Update revision id for alias or version
2923
        # TODO: re-evaluate data model to prevent this dirty hack just for bumping the revision id
2924
        # TODO: does that need a `with function.lock` for atomic updates of the policy + revision_id?
2925
        if api_utils.qualifier_is_alias(resolved_qualifier):
1✔
UNCOV
2926
            resolved_alias = resolved_fn.aliases[resolved_qualifier]
×
UNCOV
2927
            resolved_fn.aliases[resolved_qualifier] = dataclasses.replace(resolved_alias)
×
2928
        # Assumes that a non-alias is a version
2929
        else:
2930
            resolved_version = resolved_fn.versions[resolved_qualifier]
1✔
2931
            resolved_fn.versions[resolved_qualifier] = dataclasses.replace(
1✔
2932
                resolved_version, config=dataclasses.replace(resolved_version.config)
2933
            )
2934

2935
        # remove the policy as a whole when there's no statement left in it
2936
        if len(function_permission.policy.Statement) == 0:
1✔
2937
            del resolved_fn.permissions[resolved_qualifier]
1✔
2938

2939
    def get_policy(
1✔
2940
        self,
2941
        context: RequestContext,
2942
        function_name: NamespacedFunctionName,
2943
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
2944
        **kwargs,
2945
    ) -> GetPolicyResponse:
2946
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2947
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
2948
            function_name, qualifier, context
2949
        )
2950

2951
        if qualifier is not None:
1✔
2952
            self._validate_qualifier_expression(qualifier)
1✔
2953

2954
        resolved_fn = self._get_function(function_name, account_id, region)
1✔
2955

2956
        resolved_qualifier = qualifier or "$LATEST"
1✔
2957
        function_permission = resolved_fn.permissions.get(resolved_qualifier)
1✔
2958
        if not function_permission:
1✔
2959
            raise ResourceNotFoundException(
1✔
2960
                "The resource you requested does not exist.", Type="User"
2961
            )
2962

2963
        fn_revision_id = None
1✔
2964
        if api_utils.qualifier_is_alias(resolved_qualifier):
1✔
2965
            resolved_alias = resolved_fn.aliases[resolved_qualifier]
1✔
2966
            fn_revision_id = resolved_alias.revision_id
1✔
2967
        # Assumes that a non-alias is a version
2968
        else:
2969
            resolved_version = resolved_fn.versions[resolved_qualifier]
1✔
2970
            fn_revision_id = resolved_version.config.revision_id
1✔
2971

2972
        return GetPolicyResponse(
1✔
2973
            Policy=json.dumps(dataclasses.asdict(function_permission.policy)),
2974
            RevisionId=fn_revision_id,
2975
        )
2976

2977
    # =======================================
2978
    # ========  Code signing config  ========
2979
    # =======================================
2980

2981
    def create_code_signing_config(
1✔
2982
        self,
2983
        context: RequestContext,
2984
        allowed_publishers: AllowedPublishers,
2985
        description: Description | None = None,
2986
        code_signing_policies: CodeSigningPolicies | None = None,
2987
        tags: Tags | None = None,
2988
        **kwargs,
2989
    ) -> CreateCodeSigningConfigResponse:
2990
        account = context.account_id
1✔
2991
        region = context.region
1✔
2992

2993
        state = lambda_stores[account][region]
1✔
2994
        # TODO: can there be duplicates?
2995
        csc_id = f"csc-{get_random_hex(17)}"  # e.g. 'csc-077c33b4c19e26036'
1✔
2996
        csc_arn = f"arn:{context.partition}:lambda:{region}:{account}:code-signing-config:{csc_id}"
1✔
2997
        csc = CodeSigningConfig(
1✔
2998
            csc_id=csc_id,
2999
            arn=csc_arn,
3000
            allowed_publishers=allowed_publishers,
3001
            policies=code_signing_policies,
3002
            last_modified=api_utils.generate_lambda_date(),
3003
            description=description,
3004
        )
3005
        state.code_signing_configs[csc_arn] = csc
1✔
3006
        return CreateCodeSigningConfigResponse(CodeSigningConfig=api_utils.map_csc(csc))
1✔
3007

3008
    def put_function_code_signing_config(
1✔
3009
        self,
3010
        context: RequestContext,
3011
        code_signing_config_arn: CodeSigningConfigArn,
3012
        function_name: NamespacedFunctionName,
3013
        **kwargs,
3014
    ) -> PutFunctionCodeSigningConfigResponse:
3015
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3016
        state = lambda_stores[account_id][region]
1✔
3017
        function_name = api_utils.get_function_name(function_name, context)
1✔
3018

3019
        csc = state.code_signing_configs.get(code_signing_config_arn)
1✔
3020
        if not csc:
1✔
3021
            raise CodeSigningConfigNotFoundException(
1✔
3022
                f"The code signing configuration cannot be found. Check that the provided configuration is not deleted: {code_signing_config_arn}.",
3023
                Type="User",
3024
            )
3025

3026
        fn = state.functions.get(function_name)
1✔
3027
        fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
1✔
3028
        if not fn:
1✔
3029
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")
1✔
3030

3031
        fn.code_signing_config_arn = code_signing_config_arn
1✔
3032
        return PutFunctionCodeSigningConfigResponse(
1✔
3033
            CodeSigningConfigArn=code_signing_config_arn, FunctionName=function_name
3034
        )
3035

3036
    def update_code_signing_config(
1✔
3037
        self,
3038
        context: RequestContext,
3039
        code_signing_config_arn: CodeSigningConfigArn,
3040
        description: Description = None,
3041
        allowed_publishers: AllowedPublishers = None,
3042
        code_signing_policies: CodeSigningPolicies = None,
3043
        **kwargs,
3044
    ) -> UpdateCodeSigningConfigResponse:
3045
        state = lambda_stores[context.account_id][context.region]
1✔
3046
        csc = state.code_signing_configs.get(code_signing_config_arn)
1✔
3047
        if not csc:
1✔
3048
            raise ResourceNotFoundException(
1✔
3049
                f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
3050
            )
3051

3052
        changes = {
1✔
3053
            **(
3054
                {"allowed_publishers": allowed_publishers} if allowed_publishers is not None else {}
3055
            ),
3056
            **({"policies": code_signing_policies} if code_signing_policies is not None else {}),
3057
            **({"description": description} if description is not None else {}),
3058
        }
3059
        new_csc = dataclasses.replace(
1✔
3060
            csc, last_modified=api_utils.generate_lambda_date(), **changes
3061
        )
3062
        state.code_signing_configs[code_signing_config_arn] = new_csc
1✔
3063

3064
        return UpdateCodeSigningConfigResponse(CodeSigningConfig=api_utils.map_csc(new_csc))
1✔
3065

3066
    def get_code_signing_config(
1✔
3067
        self, context: RequestContext, code_signing_config_arn: CodeSigningConfigArn, **kwargs
3068
    ) -> GetCodeSigningConfigResponse:
3069
        state = lambda_stores[context.account_id][context.region]
1✔
3070
        csc = state.code_signing_configs.get(code_signing_config_arn)
1✔
3071
        if not csc:
1✔
3072
            raise ResourceNotFoundException(
1✔
3073
                f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
3074
            )
3075

3076
        return GetCodeSigningConfigResponse(CodeSigningConfig=api_utils.map_csc(csc))
1✔
3077

3078
    def get_function_code_signing_config(
1✔
3079
        self, context: RequestContext, function_name: NamespacedFunctionName, **kwargs
3080
    ) -> GetFunctionCodeSigningConfigResponse:
3081
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3082
        state = lambda_stores[account_id][region]
1✔
3083
        function_name = api_utils.get_function_name(function_name, context)
1✔
3084
        fn = state.functions.get(function_name)
1✔
3085
        fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
1✔
3086
        if not fn:
1✔
3087
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")
1✔
3088

3089
        if fn.code_signing_config_arn:
1✔
3090
            return GetFunctionCodeSigningConfigResponse(
1✔
3091
                CodeSigningConfigArn=fn.code_signing_config_arn, FunctionName=function_name
3092
            )
3093

3094
        return GetFunctionCodeSigningConfigResponse()
1✔
3095

3096
    def delete_function_code_signing_config(
1✔
3097
        self, context: RequestContext, function_name: NamespacedFunctionName, **kwargs
3098
    ) -> None:
3099
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3100
        state = lambda_stores[account_id][region]
1✔
3101
        function_name = api_utils.get_function_name(function_name, context)
1✔
3102
        fn = state.functions.get(function_name)
1✔
3103
        fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
1✔
3104
        if not fn:
1✔
3105
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")
1✔
3106

3107
        fn.code_signing_config_arn = None
1✔
3108

3109
    def delete_code_signing_config(
1✔
3110
        self, context: RequestContext, code_signing_config_arn: CodeSigningConfigArn, **kwargs
3111
    ) -> DeleteCodeSigningConfigResponse:
3112
        state = lambda_stores[context.account_id][context.region]
1✔
3113

3114
        csc = state.code_signing_configs.get(code_signing_config_arn)
1✔
3115
        if not csc:
1✔
3116
            raise ResourceNotFoundException(
1✔
3117
                f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
3118
            )
3119

3120
        del state.code_signing_configs[code_signing_config_arn]
1✔
3121

3122
        return DeleteCodeSigningConfigResponse()
1✔
3123

3124
    def list_code_signing_configs(
1✔
3125
        self,
3126
        context: RequestContext,
3127
        marker: String = None,
3128
        max_items: MaxListItems = None,
3129
        **kwargs,
3130
    ) -> ListCodeSigningConfigsResponse:
3131
        state = lambda_stores[context.account_id][context.region]
1✔
3132

3133
        cscs = [api_utils.map_csc(csc) for csc in state.code_signing_configs.values()]
1✔
3134
        cscs = PaginatedList(cscs)
1✔
3135
        page, token = cscs.get_page(
1✔
3136
            lambda csc: csc["CodeSigningConfigId"],
3137
            marker,
3138
            max_items,
3139
        )
3140
        return ListCodeSigningConfigsResponse(CodeSigningConfigs=page, NextMarker=token)
1✔
3141

3142
    def list_functions_by_code_signing_config(
1✔
3143
        self,
3144
        context: RequestContext,
3145
        code_signing_config_arn: CodeSigningConfigArn,
3146
        marker: String = None,
3147
        max_items: MaxListItems = None,
3148
        **kwargs,
3149
    ) -> ListFunctionsByCodeSigningConfigResponse:
3150
        account = context.account_id
1✔
3151
        region = context.region
1✔
3152

3153
        state = lambda_stores[account][region]
1✔
3154

3155
        if code_signing_config_arn not in state.code_signing_configs:
1✔
3156
            raise ResourceNotFoundException(
1✔
3157
                f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
3158
            )
3159

3160
        fn_arns = [
1✔
3161
            api_utils.unqualified_lambda_arn(fn.function_name, account, region)
3162
            for fn in state.functions.values()
3163
            if fn.code_signing_config_arn == code_signing_config_arn
3164
        ]
3165

3166
        cscs = PaginatedList(fn_arns)
1✔
3167
        page, token = cscs.get_page(
1✔
3168
            lambda x: x,
3169
            marker,
3170
            max_items,
3171
        )
3172
        return ListFunctionsByCodeSigningConfigResponse(FunctionArns=page, NextMarker=token)
1✔
3173

3174
    # =======================================
3175
    # =========  Account Settings   =========
3176
    # =======================================
3177

3178
    # CAVE: these settings & usages are *per* region!
3179
    # Lambda quotas: https://docs.aws.amazon.com/lambda/latest/dg/gettingstarted-limits.html
3180
    def get_account_settings(self, context: RequestContext, **kwargs) -> GetAccountSettingsResponse:
1✔
3181
        state = lambda_stores[context.account_id][context.region]
1✔
3182

3183
        fn_count = 0
1✔
3184
        code_size_sum = 0
1✔
3185
        reserved_concurrency_sum = 0
1✔
3186
        for fn in state.functions.values():
1✔
3187
            fn_count += 1
1✔
3188
            for fn_version in fn.versions.values():
1✔
3189
                # Image-based Lambdas do not have a code attribute and count against the ECR quotas instead
3190
                if fn_version.config.package_type == PackageType.Zip:
1✔
3191
                    code_size_sum += fn_version.config.code.code_size
1✔
3192
            if fn.reserved_concurrent_executions is not None:
1✔
3193
                reserved_concurrency_sum += fn.reserved_concurrent_executions
1✔
3194
            for c in fn.provisioned_concurrency_configs.values():
1✔
3195
                reserved_concurrency_sum += c.provisioned_concurrent_executions
1✔
3196
        for layer in state.layers.values():
1✔
3197
            for layer_version in layer.layer_versions.values():
1✔
3198
                code_size_sum += layer_version.code.code_size
1✔
3199
        return GetAccountSettingsResponse(
1✔
3200
            AccountLimit=AccountLimit(
3201
                TotalCodeSize=config.LAMBDA_LIMITS_TOTAL_CODE_SIZE,
3202
                CodeSizeZipped=config.LAMBDA_LIMITS_CODE_SIZE_ZIPPED,
3203
                CodeSizeUnzipped=config.LAMBDA_LIMITS_CODE_SIZE_UNZIPPED,
3204
                ConcurrentExecutions=config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS,
3205
                UnreservedConcurrentExecutions=config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS
3206
                - reserved_concurrency_sum,
3207
            ),
3208
            AccountUsage=AccountUsage(
3209
                TotalCodeSize=code_size_sum,
3210
                FunctionCount=fn_count,
3211
            ),
3212
        )
3213

3214
    # =======================================
3215
    # ==  Provisioned Concurrency Config   ==
3216
    # =======================================
3217

3218
    def _get_provisioned_config(
1✔
3219
        self, context: RequestContext, function_name: str, qualifier: str
3220
    ) -> ProvisionedConcurrencyConfiguration | None:
3221
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3222
        state = lambda_stores[account_id][region]
1✔
3223
        function_name = api_utils.get_function_name(function_name, context)
1✔
3224
        fn = state.functions.get(function_name)
1✔
3225
        if api_utils.qualifier_is_alias(qualifier):
1✔
3226
            fn_alias = None
1✔
3227
            if fn:
1✔
3228
                fn_alias = fn.aliases.get(qualifier)
1✔
3229
            if fn_alias is None:
1✔
3230
                raise ResourceNotFoundException(
1✔
3231
                    f"Cannot find alias arn: {api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)}",
3232
                    Type="User",
3233
                )
3234
        elif api_utils.qualifier_is_version(qualifier):
1✔
3235
            fn_version = None
1✔
3236
            if fn:
1✔
3237
                fn_version = fn.versions.get(qualifier)
1✔
3238
            if fn_version is None:
1✔
3239
                raise ResourceNotFoundException(
1✔
3240
                    f"Function not found: {api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)}",
3241
                    Type="User",
3242
                )
3243

3244
        return fn.provisioned_concurrency_configs.get(qualifier)
1✔
3245

3246
    def put_provisioned_concurrency_config(
1✔
3247
        self,
3248
        context: RequestContext,
3249
        function_name: FunctionName,
3250
        qualifier: Qualifier,
3251
        provisioned_concurrent_executions: PositiveInteger,
3252
        **kwargs,
3253
    ) -> PutProvisionedConcurrencyConfigResponse:
3254
        if provisioned_concurrent_executions <= 0:
1✔
3255
            raise ValidationException(
1✔
3256
                f"1 validation error detected: Value '{provisioned_concurrent_executions}' at 'provisionedConcurrentExecutions' failed to satisfy constraint: Member must have value greater than or equal to 1"
3257
            )
3258

3259
        if qualifier == "$LATEST":
1✔
3260
            raise InvalidParameterValueException(
1✔
3261
                "Provisioned Concurrency Configs cannot be applied to unpublished function versions.",
3262
                Type="User",
3263
            )
3264
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3265
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
3266
            function_name, qualifier, context
3267
        )
3268
        state = lambda_stores[account_id][region]
1✔
3269
        fn = state.functions.get(function_name)
1✔
3270

3271
        provisioned_config = self._get_provisioned_config(context, function_name, qualifier)
1✔
3272

3273
        if provisioned_config:  # TODO: merge?
1✔
3274
            # TODO: add a test for partial updates (if possible)
3275
            LOG.warning(
1✔
3276
                "Partial update of provisioned concurrency config is currently not supported."
3277
            )
3278

3279
        other_provisioned_sum = sum(
1✔
3280
            [
3281
                provisioned_configs.provisioned_concurrent_executions
3282
                for provisioned_qualifier, provisioned_configs in fn.provisioned_concurrency_configs.items()
3283
                if provisioned_qualifier != qualifier
3284
            ]
3285
        )
3286

3287
        if (
1✔
3288
            fn.reserved_concurrent_executions is not None
3289
            and fn.reserved_concurrent_executions
3290
            < other_provisioned_sum + provisioned_concurrent_executions
3291
        ):
3292
            raise InvalidParameterValueException(
1✔
3293
                "Requested Provisioned Concurrency should not be greater than the reservedConcurrentExecution for function",
3294
                Type="User",
3295
            )
3296

3297
        if provisioned_concurrent_executions > config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS:
1✔
3298
            raise InvalidParameterValueException(
1✔
3299
                f"Specified ConcurrentExecutions for function is greater than account's unreserved concurrency"
3300
                f" [{config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS}]."
3301
            )
3302

3303
        settings = self.get_account_settings(context)
1✔
3304
        unreserved_concurrent_executions = settings["AccountLimit"][
1✔
3305
            "UnreservedConcurrentExecutions"
3306
        ]
3307
        if (
1✔
3308
            unreserved_concurrent_executions - provisioned_concurrent_executions
3309
            < config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY
3310
        ):
3311
            raise InvalidParameterValueException(
1✔
3312
                f"Specified ConcurrentExecutions for function decreases account's UnreservedConcurrentExecution below"
3313
                f" its minimum value of [{config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY}]."
3314
            )
3315

3316
        provisioned_config = ProvisionedConcurrencyConfiguration(
1✔
3317
            provisioned_concurrent_executions, api_utils.generate_lambda_date()
3318
        )
3319
        fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
1✔
3320

3321
        if api_utils.qualifier_is_alias(qualifier):
1✔
3322
            alias = fn.aliases.get(qualifier)
1✔
3323
            resolved_version = fn.versions.get(alias.function_version)
1✔
3324

3325
            if (
1✔
3326
                resolved_version
3327
                and fn.provisioned_concurrency_configs.get(alias.function_version) is not None
3328
            ):
3329
                raise ResourceConflictException(
1✔
3330
                    "Alias can't be used for Provisioned Concurrency configuration on an already Provisioned version",
3331
                    Type="User",
3332
                )
3333
            fn_arn = resolved_version.id.qualified_arn()
1✔
3334
        elif api_utils.qualifier_is_version(qualifier):
1✔
3335
            fn_version = fn.versions.get(qualifier)
1✔
3336

3337
            # TODO: might be useful other places, utilize
3338
            pointing_aliases = []
1✔
3339
            for alias in fn.aliases.values():
1✔
3340
                if (
1✔
3341
                    alias.function_version == qualifier
3342
                    and fn.provisioned_concurrency_configs.get(alias.name) is not None
3343
                ):
3344
                    pointing_aliases.append(alias.name)
1✔
3345
            if pointing_aliases:
1✔
3346
                raise ResourceConflictException(
1✔
3347
                    "Version is pointed by a Provisioned Concurrency alias", Type="User"
3348
                )
3349

3350
            fn_arn = fn_version.id.qualified_arn()
1✔
3351

3352
        manager = self.lambda_service.get_lambda_version_manager(fn_arn)
1✔
3353

3354
        fn.provisioned_concurrency_configs[qualifier] = provisioned_config
1✔
3355

3356
        manager.update_provisioned_concurrency_config(
1✔
3357
            provisioned_config.provisioned_concurrent_executions
3358
        )
3359

3360
        return PutProvisionedConcurrencyConfigResponse(
1✔
3361
            RequestedProvisionedConcurrentExecutions=provisioned_config.provisioned_concurrent_executions,
3362
            AvailableProvisionedConcurrentExecutions=0,
3363
            AllocatedProvisionedConcurrentExecutions=0,
3364
            Status=ProvisionedConcurrencyStatusEnum.IN_PROGRESS,
3365
            # StatusReason=manager.provisioned_state.status_reason,
3366
            LastModified=provisioned_config.last_modified,  # TODO: does change with configuration or also with state changes?
3367
        )
3368

3369
    def get_provisioned_concurrency_config(
1✔
3370
        self, context: RequestContext, function_name: FunctionName, qualifier: Qualifier, **kwargs
3371
    ) -> GetProvisionedConcurrencyConfigResponse:
3372
        if qualifier == "$LATEST":
1✔
3373
            raise InvalidParameterValueException(
1✔
3374
                "The function resource provided must be an alias or a published version.",
3375
                Type="User",
3376
            )
3377
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3378
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
3379
            function_name, qualifier, context
3380
        )
3381

3382
        provisioned_config = self._get_provisioned_config(context, function_name, qualifier)
1✔
3383
        if not provisioned_config:
1✔
3384
            raise ProvisionedConcurrencyConfigNotFoundException(
1✔
3385
                "No Provisioned Concurrency Config found for this function", Type="User"
3386
            )
3387

3388
        # TODO: make this compatible with alias pointer migration on update
3389
        if api_utils.qualifier_is_alias(qualifier):
1✔
3390
            state = lambda_stores[account_id][region]
1✔
3391
            fn = state.functions.get(function_name)
1✔
3392
            alias = fn.aliases.get(qualifier)
1✔
3393
            fn_arn = api_utils.qualified_lambda_arn(
1✔
3394
                function_name, alias.function_version, account_id, region
3395
            )
3396
        else:
3397
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
1✔
3398

3399
        ver_manager = self.lambda_service.get_lambda_version_manager(fn_arn)
1✔
3400

3401
        return GetProvisionedConcurrencyConfigResponse(
1✔
3402
            RequestedProvisionedConcurrentExecutions=provisioned_config.provisioned_concurrent_executions,
3403
            LastModified=provisioned_config.last_modified,
3404
            AvailableProvisionedConcurrentExecutions=ver_manager.provisioned_state.available,
3405
            AllocatedProvisionedConcurrentExecutions=ver_manager.provisioned_state.allocated,
3406
            Status=ver_manager.provisioned_state.status,
3407
            StatusReason=ver_manager.provisioned_state.status_reason,
3408
        )
3409

3410
    def list_provisioned_concurrency_configs(
1✔
3411
        self,
3412
        context: RequestContext,
3413
        function_name: FunctionName,
3414
        marker: String = None,
3415
        max_items: MaxProvisionedConcurrencyConfigListItems = None,
3416
        **kwargs,
3417
    ) -> ListProvisionedConcurrencyConfigsResponse:
3418
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3419
        state = lambda_stores[account_id][region]
1✔
3420

3421
        function_name = api_utils.get_function_name(function_name, context)
1✔
3422
        fn = state.functions.get(function_name)
1✔
3423
        if fn is None:
1✔
3424
            raise ResourceNotFoundException(
1✔
3425
                f"Function not found: {api_utils.unqualified_lambda_arn(function_name, account_id, region)}",
3426
                Type="User",
3427
            )
3428

3429
        configs = []
1✔
3430
        for qualifier, pc_config in fn.provisioned_concurrency_configs.items():
1✔
UNCOV
3431
            if api_utils.qualifier_is_alias(qualifier):
×
UNCOV
3432
                alias = fn.aliases.get(qualifier)
×
3433
                fn_arn = api_utils.qualified_lambda_arn(
×
3434
                    function_name, alias.function_version, account_id, region
3435
                )
3436
            else:
UNCOV
3437
                fn_arn = api_utils.qualified_lambda_arn(
×
3438
                    function_name, qualifier, account_id, region
3439
                )
3440

UNCOV
3441
            manager = self.lambda_service.get_lambda_version_manager(fn_arn)
×
3442

UNCOV
3443
            configs.append(
×
3444
                ProvisionedConcurrencyConfigListItem(
3445
                    FunctionArn=api_utils.qualified_lambda_arn(
3446
                        function_name, qualifier, account_id, region
3447
                    ),
3448
                    RequestedProvisionedConcurrentExecutions=pc_config.provisioned_concurrent_executions,
3449
                    AvailableProvisionedConcurrentExecutions=manager.provisioned_state.available,
3450
                    AllocatedProvisionedConcurrentExecutions=manager.provisioned_state.allocated,
3451
                    Status=manager.provisioned_state.status,
3452
                    StatusReason=manager.provisioned_state.status_reason,
3453
                    LastModified=pc_config.last_modified,
3454
                )
3455
            )
3456

3457
        provisioned_concurrency_configs = configs
1✔
3458
        provisioned_concurrency_configs = PaginatedList(provisioned_concurrency_configs)
1✔
3459
        page, token = provisioned_concurrency_configs.get_page(
1✔
3460
            lambda x: x,
3461
            marker,
3462
            max_items,
3463
        )
3464
        return ListProvisionedConcurrencyConfigsResponse(
1✔
3465
            ProvisionedConcurrencyConfigs=page, NextMarker=token
3466
        )
3467

3468
    def delete_provisioned_concurrency_config(
1✔
3469
        self, context: RequestContext, function_name: FunctionName, qualifier: Qualifier, **kwargs
3470
    ) -> None:
3471
        if qualifier == "$LATEST":
1✔
3472
            raise InvalidParameterValueException(
1✔
3473
                "The function resource provided must be an alias or a published version.",
3474
                Type="User",
3475
            )
3476
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3477
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
3478
            function_name, qualifier, context
3479
        )
3480
        state = lambda_stores[account_id][region]
1✔
3481
        fn = state.functions.get(function_name)
1✔
3482

3483
        provisioned_config = self._get_provisioned_config(context, function_name, qualifier)
1✔
3484
        # delete is idempotent and doesn't actually care about the provisioned concurrency config not existing
3485
        if provisioned_config:
1✔
3486
            fn.provisioned_concurrency_configs.pop(qualifier)
1✔
3487
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
1✔
3488
            manager = self.lambda_service.get_lambda_version_manager(fn_arn)
1✔
3489
            manager.update_provisioned_concurrency_config(0)
1✔
3490

3491
    # =======================================
3492
    # =======  Event Invoke Config   ========
3493
    # =======================================
3494

3495
    # "1 validation error detected: Value 'arn:aws:_-/!lambda:<region>:111111111111:function:<function-name:1>' at 'destinationConfig.onFailure.destination' failed to satisfy constraint: Member must satisfy regular expression pattern: ^$|arn:(aws[a-zA-Z0-9-]*):([a-zA-Z0-9\\-])+:([a-z]{2}((-gov)|(-iso(b?)))?-[a-z]+-\\d{1})?:(\\d{12})?:(.*)"
3496
    # "1 validation error detected: Value 'arn:aws:_-/!lambda:<region>:111111111111:function:<function-name:1>' at 'destinationConfig.onFailure.destination' failed to satisfy constraint: Member must satisfy regular expression pattern: ^$|arn:(aws[a-zA-Z0-9-]*):([a-zA-Z0-9\\-])+:([a-z]2((-gov)|(-iso(b?)))?-[a-z]+-\\d1)?:(\\d12)?:(.*)" ... (expected → actual)
3497

3498
    def _validate_destination_config(
1✔
3499
        self, store: LambdaStore, function_name: str, destination_config: DestinationConfig
3500
    ):
3501
        def _validate_destination_arn(destination_arn) -> bool:
1✔
3502
            if not api_utils.DESTINATION_ARN_PATTERN.match(destination_arn):
1✔
3503
                # technically we shouldn't handle this in the provider
3504
                raise ValidationException(
1✔
3505
                    "1 validation error detected: Value '"
3506
                    + destination_arn
3507
                    + "' at 'destinationConfig.onFailure.destination' failed to satisfy constraint: Member must satisfy regular expression pattern: "
3508
                    + "$|kafka://([^.]([a-zA-Z0-9\\-_.]{0,248}))|arn:(aws[a-zA-Z0-9-]*):([a-zA-Z0-9\\-])+:((eusc-)?[a-z]{2}((-gov)|(-iso([a-z]?)))?-[a-z]+-\\d{1})?:(\\d{12})?:(.*)"
3509
                )
3510

3511
            match destination_arn.split(":")[2]:
1✔
3512
                case "lambda":
1✔
3513
                    fn_parts = api_utils.FULL_FN_ARN_PATTERN.search(destination_arn).groupdict()
1✔
3514
                    if fn_parts:
1✔
3515
                        # check if it exists
3516
                        fn = store.functions.get(fn_parts["function_name"])
1✔
3517
                        if not fn:
1✔
3518
                            raise InvalidParameterValueException(
1✔
3519
                                f"The destination ARN {destination_arn} is invalid.", Type="User"
3520
                            )
3521
                        if fn_parts["function_name"] == function_name:
1✔
3522
                            raise InvalidParameterValueException(
1✔
3523
                                "You can't specify the function as a destination for itself.",
3524
                                Type="User",
3525
                            )
3526
                case "sns" | "sqs" | "events":
1✔
3527
                    pass
1✔
3528
                case _:
1✔
3529
                    return False
1✔
3530
            return True
1✔
3531

3532
        validation_err = False
1✔
3533

3534
        failure_destination = destination_config.get("OnFailure", {}).get("Destination")
1✔
3535
        if failure_destination:
1✔
3536
            validation_err = validation_err or not _validate_destination_arn(failure_destination)
1✔
3537

3538
        success_destination = destination_config.get("OnSuccess", {}).get("Destination")
1✔
3539
        if success_destination:
1✔
3540
            validation_err = validation_err or not _validate_destination_arn(success_destination)
1✔
3541

3542
        if validation_err:
1✔
3543
            on_success_part = (
1✔
3544
                f"OnSuccess(destination={success_destination})" if success_destination else "null"
3545
            )
3546
            on_failure_part = (
1✔
3547
                f"OnFailure(destination={failure_destination})" if failure_destination else "null"
3548
            )
3549
            raise InvalidParameterValueException(
1✔
3550
                f"The provided destination config DestinationConfig(onSuccess={on_success_part}, onFailure={on_failure_part}) is invalid.",
3551
                Type="User",
3552
            )
3553

3554
    def put_function_event_invoke_config(
1✔
3555
        self,
3556
        context: RequestContext,
3557
        function_name: FunctionName,
3558
        qualifier: NumericLatestPublishedOrAliasQualifier = None,
3559
        maximum_retry_attempts: MaximumRetryAttempts = None,
3560
        maximum_event_age_in_seconds: MaximumEventAgeInSeconds = None,
3561
        destination_config: DestinationConfig = None,
3562
        **kwargs,
3563
    ) -> FunctionEventInvokeConfig:
3564
        """
3565
        Destination ARNs can be:
3566
        * SQS arn
3567
        * SNS arn
3568
        * Lambda arn
3569
        * EventBridge arn
3570

3571
        Differences between put_ and update_:
3572
            * put overwrites any existing config
3573
            * update allows changes only single values while keeping the rest of existing ones
3574
            * update fails on non-existing configs
3575

3576
        Differences between destination and DLQ
3577
            * "However, a dead-letter queue is part of a function's version-specific configuration, so it is locked in when you publish a version."
3578
            *  "On-failure destinations also support additional targets and include details about the function's response in the invocation record."
3579

3580
        """
3581
        if (
1✔
3582
            maximum_event_age_in_seconds is None
3583
            and maximum_retry_attempts is None
3584
            and destination_config is None
3585
        ):
3586
            raise InvalidParameterValueException(
1✔
3587
                "You must specify at least one of error handling or destination setting.",
3588
                Type="User",
3589
            )
3590
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3591
        state = lambda_stores[account_id][region]
1✔
3592
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
3593
            function_name, qualifier, context
3594
        )
3595
        fn = state.functions.get(function_name)
1✔
3596
        if not fn or (qualifier and not (qualifier in fn.aliases or qualifier in fn.versions)):
1✔
3597
            raise ResourceNotFoundException("The function doesn't exist.", Type="User")
1✔
3598

3599
        qualifier = qualifier or "$LATEST"
1✔
3600

3601
        # validate and normalize destination config
3602
        if destination_config:
1✔
3603
            self._validate_destination_config(state, function_name, destination_config)
1✔
3604

3605
        destination_config = DestinationConfig(
1✔
3606
            OnSuccess=OnSuccess(
3607
                Destination=(destination_config or {}).get("OnSuccess", {}).get("Destination")
3608
            ),
3609
            OnFailure=OnFailure(
3610
                Destination=(destination_config or {}).get("OnFailure", {}).get("Destination")
3611
            ),
3612
        )
3613

3614
        config = EventInvokeConfig(
1✔
3615
            function_name=function_name,
3616
            qualifier=qualifier,
3617
            maximum_event_age_in_seconds=maximum_event_age_in_seconds,
3618
            maximum_retry_attempts=maximum_retry_attempts,
3619
            last_modified=api_utils.generate_lambda_date(),
3620
            destination_config=destination_config,
3621
        )
3622
        fn.event_invoke_configs[qualifier] = config
1✔
3623

3624
        return FunctionEventInvokeConfig(
1✔
3625
            LastModified=datetime.datetime.strptime(
3626
                config.last_modified, api_utils.LAMBDA_DATE_FORMAT
3627
            ),
3628
            FunctionArn=api_utils.qualified_lambda_arn(
3629
                function_name, qualifier or "$LATEST", account_id, region
3630
            ),
3631
            DestinationConfig=destination_config,
3632
            MaximumEventAgeInSeconds=maximum_event_age_in_seconds,
3633
            MaximumRetryAttempts=maximum_retry_attempts,
3634
        )
3635

3636
    def get_function_event_invoke_config(
1✔
3637
        self,
3638
        context: RequestContext,
3639
        function_name: FunctionName,
3640
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
3641
        **kwargs,
3642
    ) -> FunctionEventInvokeConfig:
3643
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3644
        state = lambda_stores[account_id][region]
1✔
3645
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
3646
            function_name, qualifier, context
3647
        )
3648

3649
        qualifier = qualifier or "$LATEST"
1✔
3650
        fn = state.functions.get(function_name)
1✔
3651
        if not fn:
1✔
3652
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
1✔
3653
            raise ResourceNotFoundException(
1✔
3654
                f"The function {fn_arn} doesn't have an EventInvokeConfig", Type="User"
3655
            )
3656

3657
        config = fn.event_invoke_configs.get(qualifier)
1✔
3658
        if not config:
1✔
3659
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
1✔
3660
            raise ResourceNotFoundException(
1✔
3661
                f"The function {fn_arn} doesn't have an EventInvokeConfig", Type="User"
3662
            )
3663

3664
        return FunctionEventInvokeConfig(
1✔
3665
            LastModified=datetime.datetime.strptime(
3666
                config.last_modified, api_utils.LAMBDA_DATE_FORMAT
3667
            ),
3668
            FunctionArn=api_utils.qualified_lambda_arn(
3669
                function_name, qualifier, account_id, region
3670
            ),
3671
            DestinationConfig=config.destination_config,
3672
            MaximumEventAgeInSeconds=config.maximum_event_age_in_seconds,
3673
            MaximumRetryAttempts=config.maximum_retry_attempts,
3674
        )
3675

3676
    def list_function_event_invoke_configs(
1✔
3677
        self,
3678
        context: RequestContext,
3679
        function_name: FunctionName,
3680
        marker: String = None,
3681
        max_items: MaxFunctionEventInvokeConfigListItems = None,
3682
        **kwargs,
3683
    ) -> ListFunctionEventInvokeConfigsResponse:
3684
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3685
        state = lambda_stores[account_id][region]
1✔
3686
        fn = state.functions.get(function_name)
1✔
3687
        if not fn:
1✔
3688
            raise ResourceNotFoundException("The function doesn't exist.", Type="User")
1✔
3689

3690
        event_invoke_configs = [
1✔
3691
            FunctionEventInvokeConfig(
3692
                LastModified=c.last_modified,
3693
                FunctionArn=api_utils.qualified_lambda_arn(
3694
                    function_name, c.qualifier, account_id, region
3695
                ),
3696
                MaximumEventAgeInSeconds=c.maximum_event_age_in_seconds,
3697
                MaximumRetryAttempts=c.maximum_retry_attempts,
3698
                DestinationConfig=c.destination_config,
3699
            )
3700
            for c in fn.event_invoke_configs.values()
3701
        ]
3702

3703
        event_invoke_configs = PaginatedList(event_invoke_configs)
1✔
3704
        page, token = event_invoke_configs.get_page(
1✔
3705
            lambda x: x["FunctionArn"],
3706
            marker,
3707
            max_items,
3708
        )
3709
        return ListFunctionEventInvokeConfigsResponse(
1✔
3710
            FunctionEventInvokeConfigs=page, NextMarker=token
3711
        )
3712

3713
    def delete_function_event_invoke_config(
1✔
3714
        self,
3715
        context: RequestContext,
3716
        function_name: FunctionName,
3717
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
3718
        **kwargs,
3719
    ) -> None:
3720
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3721
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
3722
            function_name, qualifier, context
3723
        )
3724
        state = lambda_stores[account_id][region]
1✔
3725
        fn = state.functions.get(function_name)
1✔
3726
        resolved_qualifier = qualifier or "$LATEST"
1✔
3727
        fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
1✔
3728
        if not fn:
1✔
3729
            raise ResourceNotFoundException(
1✔
3730
                f"The function {fn_arn} doesn't have an EventInvokeConfig", Type="User"
3731
            )
3732

3733
        config = fn.event_invoke_configs.get(resolved_qualifier)
1✔
3734
        if not config:
1✔
3735
            raise ResourceNotFoundException(
1✔
3736
                f"The function {fn_arn} doesn't have an EventInvokeConfig", Type="User"
3737
            )
3738

3739
        del fn.event_invoke_configs[resolved_qualifier]
1✔
3740

3741
    def update_function_event_invoke_config(
1✔
3742
        self,
3743
        context: RequestContext,
3744
        function_name: FunctionName,
3745
        qualifier: NumericLatestPublishedOrAliasQualifier = None,
3746
        maximum_retry_attempts: MaximumRetryAttempts = None,
3747
        maximum_event_age_in_seconds: MaximumEventAgeInSeconds = None,
3748
        destination_config: DestinationConfig = None,
3749
        **kwargs,
3750
    ) -> FunctionEventInvokeConfig:
3751
        # like put but only update single fields via replace
3752
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3753
        state = lambda_stores[account_id][region]
1✔
3754
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
3755
            function_name, qualifier, context
3756
        )
3757

3758
        if (
1✔
3759
            maximum_event_age_in_seconds is None
3760
            and maximum_retry_attempts is None
3761
            and destination_config is None
3762
        ):
UNCOV
3763
            raise InvalidParameterValueException(
×
3764
                "You must specify at least one of error handling or destination setting.",
3765
                Type="User",
3766
            )
3767

3768
        fn = state.functions.get(function_name)
1✔
3769
        if not fn or (qualifier and not (qualifier in fn.aliases or qualifier in fn.versions)):
1✔
3770
            raise ResourceNotFoundException("The function doesn't exist.", Type="User")
1✔
3771

3772
        qualifier = qualifier or "$LATEST"
1✔
3773

3774
        config = fn.event_invoke_configs.get(qualifier)
1✔
3775
        if not config:
1✔
3776
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
1✔
3777
            raise ResourceNotFoundException(
1✔
3778
                f"The function {fn_arn} doesn't have an EventInvokeConfig", Type="User"
3779
            )
3780

3781
        if destination_config:
1✔
UNCOV
3782
            self._validate_destination_config(state, function_name, destination_config)
×
3783

3784
        optional_kwargs = {
1✔
3785
            k: v
3786
            for k, v in {
3787
                "destination_config": destination_config,
3788
                "maximum_retry_attempts": maximum_retry_attempts,
3789
                "maximum_event_age_in_seconds": maximum_event_age_in_seconds,
3790
            }.items()
3791
            if v is not None
3792
        }
3793

3794
        new_config = dataclasses.replace(
1✔
3795
            config, last_modified=api_utils.generate_lambda_date(), **optional_kwargs
3796
        )
3797
        fn.event_invoke_configs[qualifier] = new_config
1✔
3798

3799
        return FunctionEventInvokeConfig(
1✔
3800
            LastModified=datetime.datetime.strptime(
3801
                new_config.last_modified, api_utils.LAMBDA_DATE_FORMAT
3802
            ),
3803
            FunctionArn=api_utils.qualified_lambda_arn(
3804
                function_name, qualifier or "$LATEST", account_id, region
3805
            ),
3806
            DestinationConfig=new_config.destination_config,
3807
            MaximumEventAgeInSeconds=new_config.maximum_event_age_in_seconds,
3808
            MaximumRetryAttempts=new_config.maximum_retry_attempts,
3809
        )
3810

3811
    # =======================================
3812
    # ======  Layer & Layer Versions  =======
3813
    # =======================================
3814

3815
    @staticmethod
1✔
3816
    def _resolve_layer(
1✔
3817
        layer_name_or_arn: str, context: RequestContext
3818
    ) -> tuple[str, str, str, str | None]:
3819
        """
3820
        Return locator attributes for a given Lambda layer.
3821

3822
        :param layer_name_or_arn: Layer name or ARN
3823
        :param context: Request context
3824
        :return: Tuple of region, account ID, layer name, layer version
3825
        """
3826
        if api_utils.is_layer_arn(layer_name_or_arn):
1✔
3827
            return api_utils.parse_layer_arn(layer_name_or_arn)
1✔
3828

3829
        return context.region, context.account_id, layer_name_or_arn, None
1✔
3830

3831
    def publish_layer_version(
1✔
3832
        self,
3833
        context: RequestContext,
3834
        layer_name: LayerName,
3835
        content: LayerVersionContentInput,
3836
        description: Description | None = None,
3837
        compatible_runtimes: CompatibleRuntimes | None = None,
3838
        license_info: LicenseInfo | None = None,
3839
        compatible_architectures: CompatibleArchitectures | None = None,
3840
        **kwargs,
3841
    ) -> PublishLayerVersionResponse:
3842
        """
3843
        On first use of a LayerName a new layer is created and for each subsequent call with the same LayerName a new version is created.
3844
        Note that there are no $LATEST versions with layers!
3845

3846
        """
3847
        account = context.account_id
1✔
3848
        region = context.region
1✔
3849

3850
        validation_errors = api_utils.validate_layer_runtimes_and_architectures(
1✔
3851
            compatible_runtimes, compatible_architectures
3852
        )
3853
        if validation_errors:
1✔
3854
            raise ValidationException(
1✔
3855
                f"{len(validation_errors)} validation error{'s' if len(validation_errors) > 1 else ''} detected: {'; '.join(validation_errors)}"
3856
            )
3857

3858
        state = lambda_stores[account][region]
1✔
3859
        with self.create_layer_lock:
1✔
3860
            if layer_name not in state.layers:
1✔
3861
                # we don't have a version so create new layer object
3862
                # lock is required to avoid creating two v1 objects for the same name
3863
                layer = Layer(
1✔
3864
                    arn=api_utils.layer_arn(layer_name=layer_name, account=account, region=region)
3865
                )
3866
                state.layers[layer_name] = layer
1✔
3867

3868
        layer = state.layers[layer_name]
1✔
3869
        with layer.next_version_lock:
1✔
3870
            next_version = LambdaLayerVersionIdentifier(
1✔
3871
                account_id=account, region=region, layer_name=layer_name
3872
            ).generate(next_version=layer.next_version)
3873
            # When creating a layer with user defined layer version, it is possible that we
3874
            # create layer versions out of order.
3875
            # ie. a user could replicate layer v2 then layer v1. It is important to always keep the maximum possible
3876
            # value for next layer to avoid overwriting existing versions
3877
            if layer.next_version <= next_version:
1✔
3878
                # We don't need to update layer.next_version if the created version is lower than the "next in line"
3879
                layer.next_version = max(next_version, layer.next_version) + 1
1✔
3880

3881
        # creating a new layer
3882
        if content.get("ZipFile"):
1✔
3883
            code = store_lambda_archive(
1✔
3884
                archive_file=content["ZipFile"],
3885
                function_name=layer_name,
3886
                region_name=region,
3887
                account_id=account,
3888
            )
3889
        else:
3890
            code = store_s3_bucket_archive(
1✔
3891
                archive_bucket=content["S3Bucket"],
3892
                archive_key=content["S3Key"],
3893
                archive_version=content.get("S3ObjectVersion"),
3894
                function_name=layer_name,
3895
                region_name=region,
3896
                account_id=account,
3897
            )
3898

3899
        new_layer_version = LayerVersion(
1✔
3900
            layer_version_arn=api_utils.layer_version_arn(
3901
                layer_name=layer_name,
3902
                account=account,
3903
                region=region,
3904
                version=str(next_version),
3905
            ),
3906
            layer_arn=layer.arn,
3907
            version=next_version,
3908
            description=description or "",
3909
            license_info=license_info,
3910
            compatible_runtimes=compatible_runtimes,
3911
            compatible_architectures=compatible_architectures,
3912
            created=api_utils.generate_lambda_date(),
3913
            code=code,
3914
        )
3915

3916
        layer.layer_versions[str(next_version)] = new_layer_version
1✔
3917

3918
        return api_utils.map_layer_out(new_layer_version)
1✔
3919

3920
    def get_layer_version(
1✔
3921
        self,
3922
        context: RequestContext,
3923
        layer_name: LayerName,
3924
        version_number: LayerVersionNumber,
3925
        **kwargs,
3926
    ) -> GetLayerVersionResponse:
3927
        # TODO: handle layer_name as an ARN
3928

3929
        region_name, account_id, layer_name, _ = LambdaProvider._resolve_layer(layer_name, context)
1✔
3930
        state = lambda_stores[account_id][region_name]
1✔
3931

3932
        layer = state.layers.get(layer_name)
1✔
3933
        if version_number < 1:
1✔
3934
            raise InvalidParameterValueException("Layer Version Cannot be less than 1", Type="User")
1✔
3935
        if layer is None:
1✔
3936
            raise ResourceNotFoundException(
1✔
3937
                "The resource you requested does not exist.", Type="User"
3938
            )
3939
        layer_version = layer.layer_versions.get(str(version_number))
1✔
3940
        if layer_version is None:
1✔
3941
            raise ResourceNotFoundException(
1✔
3942
                "The resource you requested does not exist.", Type="User"
3943
            )
3944
        return api_utils.map_layer_out(layer_version)
1✔
3945

3946
    def get_layer_version_by_arn(
1✔
3947
        self, context: RequestContext, arn: LayerVersionArn, **kwargs
3948
    ) -> GetLayerVersionResponse:
3949
        region_name, account_id, layer_name, layer_version = LambdaProvider._resolve_layer(
1✔
3950
            arn, context
3951
        )
3952

3953
        if not layer_version:
1✔
3954
            raise ValidationException(
1✔
3955
                f"1 validation error detected: Value '{arn}' at 'arn' failed to satisfy constraint: Member must satisfy regular expression pattern: "
3956
                + "(arn:(aws[a-zA-Z-]*)?:lambda:(eusc-)?[a-z]{2}((-gov)|(-iso([a-z]?)))?-[a-z]+-\\d{1}:\\d{12}:layer:[a-zA-Z0-9-_]+:[0-9]+)|(arn:[a-zA-Z0-9-]+:lambda:::awslayer:[a-zA-Z0-9-_]+)"
3957
            )
3958

3959
        store = lambda_stores[account_id][region_name]
1✔
3960
        if not (layers := store.layers.get(layer_name)):
1✔
UNCOV
3961
            raise ResourceNotFoundException(
×
3962
                "The resource you requested does not exist.", Type="User"
3963
            )
3964

3965
        layer_version = layers.layer_versions.get(layer_version)
1✔
3966

3967
        if not layer_version:
1✔
3968
            raise ResourceNotFoundException(
1✔
3969
                "The resource you requested does not exist.", Type="User"
3970
            )
3971

3972
        return api_utils.map_layer_out(layer_version)
1✔
3973

3974
    def list_layers(
1✔
3975
        self,
3976
        context: RequestContext,
3977
        compatible_runtime: Runtime | None = None,
3978
        marker: String | None = None,
3979
        max_items: MaxLayerListItems | None = None,
3980
        compatible_architecture: Architecture | None = None,
3981
        **kwargs,
3982
    ) -> ListLayersResponse:
3983
        validation_errors = []
1✔
3984

3985
        validation_error_arch = api_utils.validate_layer_architecture(compatible_architecture)
1✔
3986
        if validation_error_arch:
1✔
3987
            validation_errors.append(validation_error_arch)
1✔
3988

3989
        validation_error_runtime = api_utils.validate_layer_runtime(compatible_runtime)
1✔
3990
        if validation_error_runtime:
1✔
3991
            validation_errors.append(validation_error_runtime)
1✔
3992

3993
        if validation_errors:
1✔
3994
            raise ValidationException(
1✔
3995
                f"{len(validation_errors)} validation error{'s' if len(validation_errors) > 1 else ''} detected: {';'.join(validation_errors)}"
3996
            )
3997
        # TODO: handle filter: compatible_runtime
3998
        # TODO: handle filter: compatible_architecture
3999

4000
        state = lambda_stores[context.account_id][context.region]
×
4001
        layers = state.layers
×
4002

4003
        # TODO: test how filters behave together with only returning layers here? Does it return the latest "matching" layer, i.e. does it ignore later layer versions that don't match?
4004

UNCOV
4005
        responses: list[LayersListItem] = []
×
UNCOV
4006
        for layer_name, layer in layers.items():
×
4007
            # fetch latest version
UNCOV
4008
            layer_versions = list(layer.layer_versions.values())
×
UNCOV
4009
            sorted(layer_versions, key=lambda x: x.version)
×
UNCOV
4010
            latest_layer_version = layer_versions[-1]
×
4011
            responses.append(
×
4012
                LayersListItem(
4013
                    LayerName=layer_name,
4014
                    LayerArn=layer.arn,
4015
                    LatestMatchingVersion=api_utils.map_layer_out(latest_layer_version),
4016
                )
4017
            )
4018

UNCOV
4019
        responses = PaginatedList(responses)
×
UNCOV
4020
        page, token = responses.get_page(
×
4021
            lambda version: version,
4022
            marker,
4023
            max_items,
4024
        )
4025

UNCOV
4026
        return ListLayersResponse(NextMarker=token, Layers=page)
×
4027

4028
    def list_layer_versions(
1✔
4029
        self,
4030
        context: RequestContext,
4031
        layer_name: LayerName,
4032
        compatible_runtime: Runtime | None = None,
4033
        marker: String | None = None,
4034
        max_items: MaxLayerListItems | None = None,
4035
        compatible_architecture: Architecture | None = None,
4036
        **kwargs,
4037
    ) -> ListLayerVersionsResponse:
4038
        validation_errors = api_utils.validate_layer_runtimes_and_architectures(
1✔
4039
            [compatible_runtime] if compatible_runtime else [],
4040
            [compatible_architecture] if compatible_architecture else [],
4041
        )
4042
        if validation_errors:
1✔
UNCOV
4043
            raise ValidationException(
×
4044
                f"{len(validation_errors)} validation error{'s' if len(validation_errors) > 1 else ''} detected: {';'.join(validation_errors)}"
4045
            )
4046

4047
        region_name, account_id, layer_name, layer_version = LambdaProvider._resolve_layer(
1✔
4048
            layer_name, context
4049
        )
4050
        state = lambda_stores[account_id][region_name]
1✔
4051

4052
        # TODO: Test & handle filter: compatible_runtime
4053
        # TODO: Test & handle filter: compatible_architecture
4054
        all_layer_versions = []
1✔
4055
        layer = state.layers.get(layer_name)
1✔
4056
        if layer is not None:
1✔
4057
            for layer_version in layer.layer_versions.values():
1✔
4058
                all_layer_versions.append(api_utils.map_layer_out(layer_version))
1✔
4059

4060
        all_layer_versions.sort(key=lambda x: x["Version"], reverse=True)
1✔
4061
        all_layer_versions = PaginatedList(all_layer_versions)
1✔
4062
        page, token = all_layer_versions.get_page(
1✔
4063
            lambda version: version["LayerVersionArn"],
4064
            marker,
4065
            max_items,
4066
        )
4067
        return ListLayerVersionsResponse(NextMarker=token, LayerVersions=page)
1✔
4068

4069
    def delete_layer_version(
1✔
4070
        self,
4071
        context: RequestContext,
4072
        layer_name: LayerName,
4073
        version_number: LayerVersionNumber,
4074
        **kwargs,
4075
    ) -> None:
4076
        if version_number < 1:
1✔
4077
            raise InvalidParameterValueException("Layer Version Cannot be less than 1", Type="User")
1✔
4078

4079
        region_name, account_id, layer_name, layer_version = LambdaProvider._resolve_layer(
1✔
4080
            layer_name, context
4081
        )
4082

4083
        store = lambda_stores[account_id][region_name]
1✔
4084
        layer = store.layers.get(layer_name, {})
1✔
4085
        if layer:
1✔
4086
            layer.layer_versions.pop(str(version_number), None)
1✔
4087

4088
    # =======================================
4089
    # =====  Layer Version Permissions  =====
4090
    # =======================================
4091
    # TODO: lock updates that change revision IDs
4092

4093
    def add_layer_version_permission(
1✔
4094
        self,
4095
        context: RequestContext,
4096
        layer_name: LayerName,
4097
        version_number: LayerVersionNumber,
4098
        statement_id: StatementId,
4099
        action: LayerPermissionAllowedAction,
4100
        principal: LayerPermissionAllowedPrincipal,
4101
        organization_id: OrganizationId = None,
4102
        revision_id: String = None,
4103
        **kwargs,
4104
    ) -> AddLayerVersionPermissionResponse:
4105
        # `layer_name` can either be layer name or ARN. It is used to generate error messages.
4106
        # `layer_n` contains the layer name.
4107
        region_name, account_id, layer_n, _ = LambdaProvider._resolve_layer(layer_name, context)
1✔
4108

4109
        if action != "lambda:GetLayerVersion":
1✔
4110
            raise ValidationException(
1✔
4111
                f"1 validation error detected: Value '{action}' at 'action' failed to satisfy constraint: Member must satisfy regular expression pattern: lambda:GetLayerVersion"
4112
            )
4113

4114
        store = lambda_stores[account_id][region_name]
1✔
4115
        layer = store.layers.get(layer_n)
1✔
4116

4117
        layer_version_arn = api_utils.layer_version_arn(
1✔
4118
            layer_name, account_id, region_name, str(version_number)
4119
        )
4120

4121
        if layer is None:
1✔
4122
            raise ResourceNotFoundException(
1✔
4123
                f"Layer version {layer_version_arn} does not exist.", Type="User"
4124
            )
4125
        layer_version = layer.layer_versions.get(str(version_number))
1✔
4126
        if layer_version is None:
1✔
4127
            raise ResourceNotFoundException(
1✔
4128
                f"Layer version {layer_version_arn} does not exist.", Type="User"
4129
            )
4130
        # do we have a policy? if not set one
4131
        if layer_version.policy is None:
1✔
4132
            layer_version.policy = LayerPolicy()
1✔
4133

4134
        if statement_id in layer_version.policy.statements:
1✔
4135
            raise ResourceConflictException(
1✔
4136
                f"The statement id ({statement_id}) provided already exists. Please provide a new statement id, or remove the existing statement.",
4137
                Type="User",
4138
            )
4139

4140
        if revision_id and layer_version.policy.revision_id != revision_id:
1✔
4141
            raise PreconditionFailedException(
1✔
4142
                "The Revision Id provided does not match the latest Revision Id. "
4143
                "Call the GetLayerPolicy API to retrieve the latest Revision Id",
4144
                Type="User",
4145
            )
4146

4147
        statement = LayerPolicyStatement(
1✔
4148
            sid=statement_id, action=action, principal=principal, organization_id=organization_id
4149
        )
4150

4151
        old_statements = layer_version.policy.statements
1✔
4152
        layer_version.policy = dataclasses.replace(
1✔
4153
            layer_version.policy, statements={**old_statements, statement_id: statement}
4154
        )
4155

4156
        return AddLayerVersionPermissionResponse(
1✔
4157
            Statement=json.dumps(
4158
                {
4159
                    "Sid": statement.sid,
4160
                    "Effect": "Allow",
4161
                    "Principal": statement.principal,
4162
                    "Action": statement.action,
4163
                    "Resource": layer_version.layer_version_arn,
4164
                }
4165
            ),
4166
            RevisionId=layer_version.policy.revision_id,
4167
        )
4168

4169
    def remove_layer_version_permission(
1✔
4170
        self,
4171
        context: RequestContext,
4172
        layer_name: LayerName,
4173
        version_number: LayerVersionNumber,
4174
        statement_id: StatementId,
4175
        revision_id: String = None,
4176
        **kwargs,
4177
    ) -> None:
4178
        # `layer_name` can either be layer name or ARN. It is used to generate error messages.
4179
        # `layer_n` contains the layer name.
4180
        region_name, account_id, layer_n, layer_version = LambdaProvider._resolve_layer(
1✔
4181
            layer_name, context
4182
        )
4183

4184
        layer_version_arn = api_utils.layer_version_arn(
1✔
4185
            layer_name, account_id, region_name, str(version_number)
4186
        )
4187

4188
        state = lambda_stores[account_id][region_name]
1✔
4189
        layer = state.layers.get(layer_n)
1✔
4190
        if layer is None:
1✔
4191
            raise ResourceNotFoundException(
1✔
4192
                f"Layer version {layer_version_arn} does not exist.", Type="User"
4193
            )
4194
        layer_version = layer.layer_versions.get(str(version_number))
1✔
4195
        if layer_version is None:
1✔
4196
            raise ResourceNotFoundException(
1✔
4197
                f"Layer version {layer_version_arn} does not exist.", Type="User"
4198
            )
4199

4200
        if revision_id and layer_version.policy.revision_id != revision_id:
1✔
4201
            raise PreconditionFailedException(
1✔
4202
                "The Revision Id provided does not match the latest Revision Id. "
4203
                "Call the GetLayerPolicy API to retrieve the latest Revision Id",
4204
                Type="User",
4205
            )
4206

4207
        if statement_id not in layer_version.policy.statements:
1✔
4208
            raise ResourceNotFoundException(
1✔
4209
                f"Statement {statement_id} is not found in resource policy.", Type="User"
4210
            )
4211

4212
        old_statements = layer_version.policy.statements
1✔
4213
        layer_version.policy = dataclasses.replace(
1✔
4214
            layer_version.policy,
4215
            statements={k: v for k, v in old_statements.items() if k != statement_id},
4216
        )
4217

4218
    def get_layer_version_policy(
1✔
4219
        self,
4220
        context: RequestContext,
4221
        layer_name: LayerName,
4222
        version_number: LayerVersionNumber,
4223
        **kwargs,
4224
    ) -> GetLayerVersionPolicyResponse:
4225
        # `layer_name` can either be layer name or ARN. It is used to generate error messages.
4226
        # `layer_n` contains the layer name.
4227
        region_name, account_id, layer_n, _ = LambdaProvider._resolve_layer(layer_name, context)
1✔
4228

4229
        layer_version_arn = api_utils.layer_version_arn(
1✔
4230
            layer_name, account_id, region_name, str(version_number)
4231
        )
4232

4233
        store = lambda_stores[account_id][region_name]
1✔
4234
        layer = store.layers.get(layer_n)
1✔
4235

4236
        if layer is None:
1✔
4237
            raise ResourceNotFoundException(
1✔
4238
                f"Layer version {layer_version_arn} does not exist.", Type="User"
4239
            )
4240

4241
        layer_version = layer.layer_versions.get(str(version_number))
1✔
4242
        if layer_version is None:
1✔
4243
            raise ResourceNotFoundException(
1✔
4244
                f"Layer version {layer_version_arn} does not exist.", Type="User"
4245
            )
4246

4247
        if layer_version.policy is None:
1✔
4248
            raise ResourceNotFoundException(
1✔
4249
                "No policy is associated with the given resource.", Type="User"
4250
            )
4251

4252
        return GetLayerVersionPolicyResponse(
1✔
4253
            Policy=json.dumps(
4254
                {
4255
                    "Version": layer_version.policy.version,
4256
                    "Id": layer_version.policy.id,
4257
                    "Statement": [
4258
                        {
4259
                            "Sid": ps.sid,
4260
                            "Effect": "Allow",
4261
                            "Principal": ps.principal,
4262
                            "Action": ps.action,
4263
                            "Resource": layer_version.layer_version_arn,
4264
                        }
4265
                        for ps in layer_version.policy.statements.values()
4266
                    ],
4267
                }
4268
            ),
4269
            RevisionId=layer_version.policy.revision_id,
4270
        )
4271

4272
    # =======================================
4273
    # =======  Function Concurrency  ========
4274
    # =======================================
4275
    # (Reserved) function concurrency is scoped to the whole function
4276

4277
    def get_function_concurrency(
1✔
4278
        self, context: RequestContext, function_name: FunctionName, **kwargs
4279
    ) -> GetFunctionConcurrencyResponse:
4280
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
4281
        function_name = api_utils.get_function_name(function_name, context)
1✔
4282
        fn = self._get_function(function_name=function_name, region=region, account_id=account_id)
1✔
4283
        return GetFunctionConcurrencyResponse(
1✔
4284
            ReservedConcurrentExecutions=fn.reserved_concurrent_executions
4285
        )
4286

4287
    def put_function_concurrency(
1✔
4288
        self,
4289
        context: RequestContext,
4290
        function_name: FunctionName,
4291
        reserved_concurrent_executions: ReservedConcurrentExecutions,
4292
        **kwargs,
4293
    ) -> Concurrency:
4294
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
4295

4296
        function_name, qualifier = api_utils.get_name_and_qualifier(function_name, None, context)
1✔
4297
        if qualifier:
1✔
4298
            raise InvalidParameterValueException(
1✔
4299
                "This operation is permitted on Lambda functions only. Aliases and versions do not support this operation. Please specify either a function name or an unqualified function ARN.",
4300
                Type="User",
4301
            )
4302

4303
        store = lambda_stores[account_id][region]
1✔
4304
        fn = store.functions.get(function_name)
1✔
4305
        if not fn:
1✔
4306
            fn_arn = api_utils.qualified_lambda_arn(
1✔
4307
                function_name,
4308
                qualifier="$LATEST",
4309
                account=account_id,
4310
                region=region,
4311
            )
4312
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")
1✔
4313

4314
        settings = self.get_account_settings(context)
1✔
4315
        unreserved_concurrent_executions = settings["AccountLimit"][
1✔
4316
            "UnreservedConcurrentExecutions"
4317
        ]
4318

4319
        # The existing reserved concurrent executions for the same function are already deduced in
4320
        # unreserved_concurrent_executions but must not count because the new one will replace the existing one.
4321
        # Joel tested this behavior manually against AWS (2023-11-28).
4322
        existing_reserved_concurrent_executions = (
1✔
4323
            fn.reserved_concurrent_executions if fn.reserved_concurrent_executions else 0
4324
        )
4325
        if (
1✔
4326
            unreserved_concurrent_executions
4327
            - reserved_concurrent_executions
4328
            + existing_reserved_concurrent_executions
4329
        ) < config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY:
4330
            raise InvalidParameterValueException(
1✔
4331
                f"Specified ReservedConcurrentExecutions for function decreases account's UnreservedConcurrentExecution below its minimum value of [{config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY}]."
4332
            )
4333

4334
        total_provisioned_concurrency = sum(
1✔
4335
            [
4336
                provisioned_configs.provisioned_concurrent_executions
4337
                for provisioned_configs in fn.provisioned_concurrency_configs.values()
4338
            ]
4339
        )
4340
        if total_provisioned_concurrency > reserved_concurrent_executions:
1✔
4341
            raise InvalidParameterValueException(
1✔
4342
                f" ReservedConcurrentExecutions  {reserved_concurrent_executions} should not be lower than function's total provisioned concurrency [{total_provisioned_concurrency}]."
4343
            )
4344

4345
        fn.reserved_concurrent_executions = reserved_concurrent_executions
1✔
4346

4347
        return Concurrency(ReservedConcurrentExecutions=fn.reserved_concurrent_executions)
1✔
4348

4349
    def delete_function_concurrency(
1✔
4350
        self, context: RequestContext, function_name: FunctionName, **kwargs
4351
    ) -> None:
4352
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
4353
        function_name, qualifier = api_utils.get_name_and_qualifier(function_name, None, context)
1✔
4354
        store = lambda_stores[account_id][region]
1✔
4355
        fn = store.functions.get(function_name)
1✔
4356
        fn.reserved_concurrent_executions = None
1✔
4357

4358
    # =======================================
4359
    # ===============  TAGS   ===============
4360
    # =======================================
4361
    # only Function, Event Source Mapping, and Code Signing Config (not currently supported by LocalStack) ARNs an are available for tagging in AWS
4362

4363
    def _get_tags(self, resource: TaggableResource) -> dict[str, str]:
1✔
4364
        state = self.fetch_lambda_store_for_tagging(resource)
1✔
4365
        lambda_adapted_tags = {
1✔
4366
            tag["Key"]: tag["Value"]
4367
            for tag in state.TAGS.list_tags_for_resource(resource).get("Tags")
4368
        }
4369
        return lambda_adapted_tags
1✔
4370

4371
    def _store_tags(self, resource: TaggableResource, tags: dict[str, str]):
1✔
4372
        state = self.fetch_lambda_store_for_tagging(resource)
1✔
4373
        if len(state.TAGS.tags.get(resource, {}) | tags) > LAMBDA_TAG_LIMIT_PER_RESOURCE:
1✔
4374
            raise InvalidParameterValueException(
1✔
4375
                "Number of tags exceeds resource tag limit.", Type="User"
4376
            )
4377

4378
        tag_svc_adapted_tags = [{"Key": key, "Value": value} for key, value in tags.items()]
1✔
4379
        state.TAGS.tag_resource(resource, tag_svc_adapted_tags)
1✔
4380

4381
    def fetch_lambda_store_for_tagging(self, resource: TaggableResource) -> LambdaStore:
1✔
4382
        """
4383
        Takes a resource ARN for a TaggableResource (Lambda Function, Event Source Mapping, Code Signing Config, or Capacity Provider) and returns a corresponding
4384
        LambdaStore for its region and account.
4385

4386
        In addition, this function validates that the ARN is a valid TaggableResource type, and that the TaggableResource exists.
4387

4388
        Raises:
4389
            ValidationException: If the resource ARN is not a full ARN for a TaggableResource.
4390
            ResourceNotFoundException: If the specified resource does not exist.
4391
            InvalidParameterValueException: If the resource ARN is a qualified Lambda Function.
4392
        """
4393

4394
        def _raise_validation_exception():
1✔
4395
            raise ValidationException(
1✔
4396
                f"1 validation error detected: Value '{resource}' at 'resource' failed to satisfy constraint: Member must satisfy regular expression pattern: {api_utils.TAGGABLE_RESOURCE_ARN_PATTERN}"
4397
            )
4398

4399
        # Check whether the ARN we have been passed is correctly formatted
4400
        parsed_resource_arn: ArnData = None
1✔
4401
        try:
1✔
4402
            parsed_resource_arn = parse_arn(resource)
1✔
4403
        except Exception:
1✔
4404
            _raise_validation_exception()
1✔
4405

4406
        # TODO: Should we be checking whether this is a full ARN?
4407
        region, account_id, resource_type = map(
1✔
4408
            parsed_resource_arn.get, ("region", "account", "resource")
4409
        )
4410

4411
        if not all((region, account_id, resource_type)):
1✔
UNCOV
4412
            _raise_validation_exception()
×
4413

4414
        if not (parts := resource_type.split(":")):
1✔
UNCOV
4415
            _raise_validation_exception()
×
4416

4417
        resource_type, resource_identifier, *qualifier = parts
1✔
4418

4419
        # Qualifier validation raises before checking for NotFound
4420
        if qualifier:
1✔
4421
            if resource_type == "function":
1✔
4422
                raise InvalidParameterValueException(
1✔
4423
                    "Tags on function aliases and versions are not supported. Please specify a function ARN.",
4424
                    Type="User",
4425
                )
4426
            _raise_validation_exception()
1✔
4427

4428
        if resource_type == "event-source-mapping":
1✔
4429
            self._get_esm(resource_identifier, account_id, region)
1✔
4430
        elif resource_type == "code-signing-config":
1✔
4431
            raise NotImplementedError("Resource tagging on CSC not yet implemented.")
4432
        elif resource_type == "function":
1✔
4433
            self._get_function(
1✔
4434
                function_name=resource_identifier, account_id=account_id, region=region
4435
            )
4436
        elif resource_type == "capacity-provider":
1✔
4437
            self._get_capacity_provider(resource_identifier, account_id, region)
1✔
4438
        else:
4439
            _raise_validation_exception()
1✔
4440

4441
        # If no exceptions are raised, assume ARN and referenced resource is valid for tag operations
4442
        return lambda_stores[account_id][region]
1✔
4443

4444
    def tag_resource(
1✔
4445
        self, context: RequestContext, resource: TaggableResource, tags: Tags, **kwargs
4446
    ) -> None:
4447
        if not tags:
1✔
4448
            raise InvalidParameterValueException(
1✔
4449
                "An error occurred and the request cannot be processed.", Type="User"
4450
            )
4451
        self._store_tags(resource, tags)
1✔
4452

4453
        if (resource_id := extract_resource_from_arn(resource)) and resource_id.startswith(
1✔
4454
            "function"
4455
        ):
4456
            name, _, account, region = function_locators_from_arn(resource)
1✔
4457
            function = self._get_function(name, account, region)
1✔
4458
            with function.lock:
1✔
4459
                # dirty hack for changed revision id, should reevaluate model to prevent this:
4460
                latest_version = function.versions["$LATEST"]
1✔
4461
                function.versions["$LATEST"] = dataclasses.replace(
1✔
4462
                    latest_version, config=dataclasses.replace(latest_version.config)
4463
                )
4464

4465
    def list_tags(
1✔
4466
        self, context: RequestContext, resource: TaggableResource, **kwargs
4467
    ) -> ListTagsResponse:
4468
        tags = self._get_tags(resource)
1✔
4469
        return ListTagsResponse(Tags=tags)
1✔
4470

4471
    def untag_resource(
1✔
4472
        self, context: RequestContext, resource: TaggableResource, tag_keys: TagKeyList, **kwargs
4473
    ) -> None:
4474
        if not tag_keys:
1✔
4475
            raise ValidationException(
1✔
4476
                "1 validation error detected: Value null at 'tagKeys' failed to satisfy constraint: Member must not be null"
4477
            )  # should probably be generalized a bit
4478

4479
        state = self.fetch_lambda_store_for_tagging(resource)
1✔
4480
        state.TAGS.untag_resource(resource, tag_keys)
1✔
4481

4482
        if (resource_id := extract_resource_from_arn(resource)) and resource_id.startswith(
1✔
4483
            "function"
4484
        ):
4485
            name, _, account, region = function_locators_from_arn(resource)
1✔
4486
            function = self._get_function(name, account, region)
1✔
4487
            # TODO: Potential race condition
4488
            with function.lock:
1✔
4489
                # dirty hack for changed revision id, should reevaluate model to prevent this:
4490
                latest_version = function.versions["$LATEST"]
1✔
4491
                function.versions["$LATEST"] = dataclasses.replace(
1✔
4492
                    latest_version, config=dataclasses.replace(latest_version.config)
4493
                )
4494

4495
    # =======================================
4496
    # =======  LEGACY / DEPRECATED   ========
4497
    # =======================================
4498

4499
    def invoke_async(
1✔
4500
        self,
4501
        context: RequestContext,
4502
        function_name: NamespacedFunctionName,
4503
        invoke_args: IO[BlobStream],
4504
        **kwargs,
4505
    ) -> InvokeAsyncResponse:
4506
        """LEGACY API endpoint. Even AWS heavily discourages its usage."""
4507
        raise NotImplementedError
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc