• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 19880423371

02 Dec 2025 08:25PM UTC coverage: 86.905% (-0.04%) from 86.945%
19880423371

push

github

web-flow
fix/external client CA bundle (#13451)

1 of 5 new or added lines in 1 file covered. (20.0%)

414 existing lines in 19 files now uncovered.

69738 of 80246 relevant lines covered (86.91%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.26
/localstack-core/localstack/services/lambda_/provider.py
1
import base64
1✔
2
import dataclasses
1✔
3
import datetime
1✔
4
import itertools
1✔
5
import json
1✔
6
import logging
1✔
7
import re
1✔
8
import threading
1✔
9
import time
1✔
10
from typing import IO, Any
1✔
11

12
from botocore.exceptions import ClientError
1✔
13

14
from localstack import config
1✔
15
from localstack.aws.api import RequestContext, ServiceException, handler
1✔
16
from localstack.aws.api.lambda_ import (
1✔
17
    AccountLimit,
18
    AccountUsage,
19
    AddLayerVersionPermissionResponse,
20
    AddPermissionRequest,
21
    AddPermissionResponse,
22
    Alias,
23
    AliasConfiguration,
24
    AliasRoutingConfiguration,
25
    AllowedPublishers,
26
    Architecture,
27
    Arn,
28
    Blob,
29
    BlobStream,
30
    CapacityProviderConfig,
31
    CodeSigningConfigArn,
32
    CodeSigningConfigNotFoundException,
33
    CodeSigningPolicies,
34
    CompatibleArchitectures,
35
    CompatibleRuntimes,
36
    Concurrency,
37
    Cors,
38
    CreateCodeSigningConfigResponse,
39
    CreateEventSourceMappingRequest,
40
    CreateFunctionRequest,
41
    CreateFunctionUrlConfigResponse,
42
    DeleteCodeSigningConfigResponse,
43
    DeleteFunctionResponse,
44
    Description,
45
    DestinationConfig,
46
    EventSourceMappingConfiguration,
47
    FunctionCodeLocation,
48
    FunctionConfiguration,
49
    FunctionEventInvokeConfig,
50
    FunctionName,
51
    FunctionUrlAuthType,
52
    FunctionUrlQualifier,
53
    FunctionVersionLatestPublished,
54
    GetAccountSettingsResponse,
55
    GetCodeSigningConfigResponse,
56
    GetFunctionCodeSigningConfigResponse,
57
    GetFunctionConcurrencyResponse,
58
    GetFunctionRecursionConfigResponse,
59
    GetFunctionResponse,
60
    GetFunctionUrlConfigResponse,
61
    GetLayerVersionPolicyResponse,
62
    GetLayerVersionResponse,
63
    GetPolicyResponse,
64
    GetProvisionedConcurrencyConfigResponse,
65
    InvalidParameterValueException,
66
    InvocationResponse,
67
    InvocationType,
68
    InvokeAsyncResponse,
69
    InvokeMode,
70
    LambdaApi,
71
    LambdaManagedInstancesCapacityProviderConfig,
72
    LastUpdateStatus,
73
    LayerName,
74
    LayerPermissionAllowedAction,
75
    LayerPermissionAllowedPrincipal,
76
    LayersListItem,
77
    LayerVersionArn,
78
    LayerVersionContentInput,
79
    LayerVersionNumber,
80
    LicenseInfo,
81
    ListAliasesResponse,
82
    ListCodeSigningConfigsResponse,
83
    ListEventSourceMappingsResponse,
84
    ListFunctionEventInvokeConfigsResponse,
85
    ListFunctionsByCodeSigningConfigResponse,
86
    ListFunctionsResponse,
87
    ListFunctionUrlConfigsResponse,
88
    ListLayersResponse,
89
    ListLayerVersionsResponse,
90
    ListProvisionedConcurrencyConfigsResponse,
91
    ListTagsResponse,
92
    ListVersionsByFunctionResponse,
93
    LogFormat,
94
    LoggingConfig,
95
    LogType,
96
    MasterRegion,
97
    MaxFunctionEventInvokeConfigListItems,
98
    MaximumEventAgeInSeconds,
99
    MaximumRetryAttempts,
100
    MaxItems,
101
    MaxLayerListItems,
102
    MaxListItems,
103
    MaxProvisionedConcurrencyConfigListItems,
104
    NamespacedFunctionName,
105
    NamespacedStatementId,
106
    NumericLatestPublishedOrAliasQualifier,
107
    OnFailure,
108
    OnSuccess,
109
    OrganizationId,
110
    PackageType,
111
    PositiveInteger,
112
    PreconditionFailedException,
113
    ProvisionedConcurrencyConfigListItem,
114
    ProvisionedConcurrencyConfigNotFoundException,
115
    ProvisionedConcurrencyStatusEnum,
116
    PublishLayerVersionResponse,
117
    PutFunctionCodeSigningConfigResponse,
118
    PutFunctionRecursionConfigResponse,
119
    PutProvisionedConcurrencyConfigResponse,
120
    Qualifier,
121
    RecursiveLoop,
122
    ReservedConcurrentExecutions,
123
    ResourceConflictException,
124
    ResourceNotFoundException,
125
    Runtime,
126
    RuntimeVersionConfig,
127
    SnapStart,
128
    SnapStartApplyOn,
129
    SnapStartOptimizationStatus,
130
    SnapStartResponse,
131
    State,
132
    StatementId,
133
    StateReasonCode,
134
    String,
135
    TaggableResource,
136
    TagKeyList,
137
    Tags,
138
    TenantId,
139
    TracingMode,
140
    UnqualifiedFunctionName,
141
    UpdateCodeSigningConfigResponse,
142
    UpdateEventSourceMappingRequest,
143
    UpdateFunctionCodeRequest,
144
    UpdateFunctionConfigurationRequest,
145
    UpdateFunctionUrlConfigResponse,
146
    VersionWithLatestPublished,
147
)
148
from localstack.aws.api.lambda_ import FunctionVersion as FunctionVersionApi
1✔
149
from localstack.aws.api.lambda_ import ServiceException as LambdaServiceException
1✔
150
from localstack.aws.api.pipes import (
1✔
151
    DynamoDBStreamStartPosition,
152
    KinesisStreamStartPosition,
153
)
154
from localstack.aws.connect import connect_to
1✔
155
from localstack.aws.spec import load_service
1✔
156
from localstack.services.edge import ROUTER
1✔
157
from localstack.services.lambda_ import api_utils
1✔
158
from localstack.services.lambda_ import hooks as lambda_hooks
1✔
159
from localstack.services.lambda_.analytics import (
1✔
160
    FunctionOperation,
161
    FunctionStatus,
162
    function_counter,
163
)
164
from localstack.services.lambda_.api_utils import (
1✔
165
    ARCHITECTURES,
166
    STATEMENT_ID_REGEX,
167
    SUBNET_ID_REGEX,
168
    function_locators_from_arn,
169
)
170
from localstack.services.lambda_.event_source_mapping.esm_config_factory import (
1✔
171
    EsmConfigFactory,
172
)
173
from localstack.services.lambda_.event_source_mapping.esm_worker import (
1✔
174
    EsmState,
175
    EsmWorker,
176
)
177
from localstack.services.lambda_.event_source_mapping.esm_worker_factory import (
1✔
178
    EsmWorkerFactory,
179
)
180
from localstack.services.lambda_.event_source_mapping.pipe_utils import get_internal_client
1✔
181
from localstack.services.lambda_.invocation import AccessDeniedException
1✔
182
from localstack.services.lambda_.invocation.execution_environment import (
1✔
183
    EnvironmentStartupTimeoutException,
184
)
185
from localstack.services.lambda_.invocation.lambda_models import (
1✔
186
    AliasRoutingConfig,
187
    CodeSigningConfig,
188
    EventInvokeConfig,
189
    Function,
190
    FunctionResourcePolicy,
191
    FunctionUrlConfig,
192
    FunctionVersion,
193
    ImageConfig,
194
    LambdaEphemeralStorage,
195
    Layer,
196
    LayerPolicy,
197
    LayerPolicyStatement,
198
    LayerVersion,
199
    ProvisionedConcurrencyConfiguration,
200
    RequestEntityTooLargeException,
201
    ResourcePolicy,
202
    UpdateStatus,
203
    ValidationException,
204
    VersionAlias,
205
    VersionFunctionConfiguration,
206
    VersionIdentifier,
207
    VersionState,
208
    VpcConfig,
209
)
210
from localstack.services.lambda_.invocation.lambda_service import (
1✔
211
    LambdaService,
212
    create_image_code,
213
    destroy_code_if_not_used,
214
    lambda_stores,
215
    store_lambda_archive,
216
    store_s3_bucket_archive,
217
)
218
from localstack.services.lambda_.invocation.models import CapacityProvider as CapacityProviderModel
1✔
219
from localstack.services.lambda_.invocation.models import LambdaStore
1✔
220
from localstack.services.lambda_.invocation.runtime_executor import get_runtime_executor
1✔
221
from localstack.services.lambda_.lambda_utils import HINT_LOG
1✔
222
from localstack.services.lambda_.layerfetcher.layer_fetcher import LayerFetcher
1✔
223
from localstack.services.lambda_.provider_utils import (
1✔
224
    LambdaLayerVersionIdentifier,
225
    get_function_version,
226
    get_function_version_from_arn,
227
)
228
from localstack.services.lambda_.runtimes import (
1✔
229
    ALL_RUNTIMES,
230
    DEPRECATED_RUNTIMES,
231
    DEPRECATED_RUNTIMES_UPGRADES,
232
    RUNTIMES_AGGREGATED,
233
    SNAP_START_SUPPORTED_RUNTIMES,
234
    VALID_RUNTIMES,
235
)
236
from localstack.services.lambda_.urlrouter import FunctionUrlRouter
1✔
237
from localstack.services.plugins import ServiceLifecycleHook
1✔
238
from localstack.state import StateVisitor
1✔
239
from localstack.utils.aws.arns import (
1✔
240
    ArnData,
241
    capacity_provider_arn,
242
    extract_resource_from_arn,
243
    extract_service_from_arn,
244
    get_partition,
245
    lambda_event_source_mapping_arn,
246
    parse_arn,
247
)
248
from localstack.utils.aws.client_types import ServicePrincipal
1✔
249
from localstack.utils.bootstrap import is_api_enabled
1✔
250
from localstack.utils.collections import PaginatedList, merge_recursive
1✔
251
from localstack.utils.event_matcher import validate_event_pattern
1✔
252
from localstack.utils.strings import get_random_hex, short_uid, to_bytes, to_str
1✔
253
from localstack.utils.sync import poll_condition
1✔
254
from localstack.utils.urls import localstack_host
1✔
255

256
LOG = logging.getLogger(__name__)
1✔
257

258
CAPACITY_PROVIDER_ARN_NAME = "arn:aws[a-zA-Z-]*:lambda:(eusc-)?[a-z]{2}((-gov)|(-iso([a-z]?)))?-[a-z]+-\\d{1}:\\d{12}:capacity-provider:[a-zA-Z0-9-_]+"
1✔
259
LAMBDA_DEFAULT_TIMEOUT = 3
1✔
260
LAMBDA_DEFAULT_MEMORY_SIZE = 128
1✔
261

262
LAMBDA_TAG_LIMIT_PER_RESOURCE = 50
1✔
263
LAMBDA_LAYERS_LIMIT_PER_FUNCTION = 5
1✔
264

265
TAG_KEY_CUSTOM_URL = "_custom_id_"
1✔
266
# Requirements (from RFC3986 & co): not longer than 63, first char must be
267
# alpha, then alphanumeric or hyphen, except cannot start or end with hyphen
268
TAG_KEY_CUSTOM_URL_VALIDATOR = re.compile(r"^[A-Za-z]([A-Za-z0-9\-]{0,61}[A-Za-z0-9])?$")
1✔
269

270

271
class LambdaProvider(LambdaApi, ServiceLifecycleHook):
1✔
272
    lambda_service: LambdaService
1✔
273
    create_fn_lock: threading.RLock
1✔
274
    create_layer_lock: threading.RLock
1✔
275
    router: FunctionUrlRouter
1✔
276
    esm_workers: dict[str, EsmWorker]
1✔
277
    layer_fetcher: LayerFetcher | None
1✔
278

279
    def __init__(self) -> None:
1✔
280
        self.lambda_service = LambdaService()
1✔
281
        self.create_fn_lock = threading.RLock()
1✔
282
        self.create_layer_lock = threading.RLock()
1✔
283
        self.router = FunctionUrlRouter(ROUTER, self.lambda_service)
1✔
284
        self.esm_workers = {}
1✔
285
        self.layer_fetcher = None
1✔
286
        lambda_hooks.inject_layer_fetcher.run(self)
1✔
287

288
    def accept_state_visitor(self, visitor: StateVisitor):
1✔
UNCOV
289
        visitor.visit(lambda_stores)
×
290

291
    def on_before_state_reset(self):
1✔
UNCOV
292
        self.lambda_service.stop()
×
293

294
    def on_after_state_reset(self):
1✔
UNCOV
295
        self.router.lambda_service = self.lambda_service = LambdaService()
×
296

297
    def on_before_state_load(self):
1✔
UNCOV
298
        self.lambda_service.stop()
×
299

300
    def on_after_state_load(self):
1✔
301
        self.lambda_service = LambdaService()
×
UNCOV
302
        self.router.lambda_service = self.lambda_service
×
303

304
        for account_id, account_bundle in lambda_stores.items():
×
305
            for region_name, state in account_bundle.items():
×
306
                for fn in state.functions.values():
×
UNCOV
307
                    for fn_version in fn.versions.values():
×
308
                        # restore the "Pending" state for every function version and start it
309
                        try:
×
UNCOV
310
                            new_state = VersionState(
×
311
                                state=State.Pending,
312
                                code=StateReasonCode.Creating,
313
                                reason="The function is being created.",
314
                            )
315
                            new_config = dataclasses.replace(fn_version.config, state=new_state)
×
316
                            new_version = dataclasses.replace(fn_version, config=new_config)
×
UNCOV
317
                            fn.versions[fn_version.id.qualifier] = new_version
×
318
                            # TODO: consider skipping this for $LATEST versions of functions with a capacity provider
UNCOV
319
                            self.lambda_service.create_function_version(fn_version).result(
×
320
                                timeout=5
321
                            )
322
                        except Exception:
×
UNCOV
323
                            LOG.warning(
×
324
                                "Failed to restore function version %s",
325
                                fn_version.id.qualified_arn(),
326
                                exc_info=LOG.isEnabledFor(logging.DEBUG),
327
                            )
328
                    # restore provisioned concurrency per function considering both versions and aliases
UNCOV
329
                    for (
×
330
                        provisioned_qualifier,
331
                        provisioned_config,
332
                    ) in fn.provisioned_concurrency_configs.items():
333
                        fn_arn = None
×
334
                        try:
×
335
                            if api_utils.qualifier_is_alias(provisioned_qualifier):
×
336
                                alias = fn.aliases.get(provisioned_qualifier)
×
337
                                resolved_version = fn.versions.get(alias.function_version)
×
338
                                fn_arn = resolved_version.id.qualified_arn()
×
339
                            elif api_utils.qualifier_is_version(provisioned_qualifier):
×
340
                                fn_version = fn.versions.get(provisioned_qualifier)
×
UNCOV
341
                                fn_arn = fn_version.id.qualified_arn()
×
342
                            else:
UNCOV
343
                                raise InvalidParameterValueException(
×
344
                                    "Invalid qualifier type:"
345
                                    " Qualifier can only be an alias or a version for provisioned concurrency."
346
                                )
347

348
                            manager = self.lambda_service.get_lambda_version_manager(fn_arn)
×
UNCOV
349
                            manager.update_provisioned_concurrency_config(
×
350
                                provisioned_config.provisioned_concurrent_executions
351
                            )
352
                        except Exception:
×
UNCOV
353
                            LOG.warning(
×
354
                                "Failed to restore provisioned concurrency %s for function %s",
355
                                provisioned_config,
356
                                fn_arn,
357
                                exc_info=LOG.isEnabledFor(logging.DEBUG),
358
                            )
359

UNCOV
360
                for esm in state.event_source_mappings.values():
×
361
                    # Restores event source workers
UNCOV
362
                    function_arn = esm.get("FunctionArn")
×
363

364
                    # TODO: How do we know the event source is up?
365
                    # A basic poll to see if the mapped Lambda function is active/failed
UNCOV
366
                    if not poll_condition(
×
367
                        lambda: get_function_version_from_arn(function_arn).config.state.state
368
                        in [State.Active, State.Failed],
369
                        timeout=10,
370
                    ):
UNCOV
371
                        LOG.warning(
×
372
                            "Creating ESM for Lambda that is not in running state: %s",
373
                            function_arn,
374
                        )
375

376
                    function_version = get_function_version_from_arn(function_arn)
×
UNCOV
377
                    function_role = function_version.config.role
×
378

UNCOV
379
                    is_esm_enabled = esm.get("State", EsmState.DISABLED) not in (
×
380
                        EsmState.DISABLED,
381
                        EsmState.DISABLING,
382
                    )
UNCOV
383
                    esm_worker = EsmWorkerFactory(
×
384
                        esm, function_role, is_esm_enabled
385
                    ).get_esm_worker()
386

387
                    # Note: a worker is created in the DISABLED state if not enabled
UNCOV
388
                    esm_worker.create()
×
389
                    # TODO: assigning the esm_worker to the dict only works after .create(). Could it cause a race
390
                    #  condition if we get a shutdown here and have a worker thread spawned but not accounted for?
UNCOV
391
                    self.esm_workers[esm_worker.uuid] = esm_worker
×
392

393
    def on_after_init(self):
1✔
394
        self.router.register_routes()
1✔
395
        get_runtime_executor().validate_environment()
1✔
396

397
    def on_before_stop(self) -> None:
1✔
398
        for esm_worker in self.esm_workers.values():
1✔
399
            esm_worker.stop_for_shutdown()
1✔
400

401
        # TODO: should probably unregister routes?
402
        self.lambda_service.stop()
1✔
403

404
    @staticmethod
1✔
405
    def _get_function(function_name: str, account_id: str, region: str) -> Function:
1✔
406
        state = lambda_stores[account_id][region]
1✔
407
        function = state.functions.get(function_name)
1✔
408
        if not function:
1✔
409
            arn = api_utils.unqualified_lambda_arn(
1✔
410
                function_name=function_name,
411
                account=account_id,
412
                region=region,
413
            )
414
            raise ResourceNotFoundException(
1✔
415
                f"Function not found: {arn}",
416
                Type="User",
417
            )
418
        return function
1✔
419

420
    @staticmethod
1✔
421
    def _get_esm(uuid: str, account_id: str, region: str) -> EventSourceMappingConfiguration:
1✔
422
        state = lambda_stores[account_id][region]
1✔
423
        esm = state.event_source_mappings.get(uuid)
1✔
424
        if not esm:
1✔
425
            arn = lambda_event_source_mapping_arn(uuid, account_id, region)
1✔
426
            raise ResourceNotFoundException(
1✔
427
                f"Event source mapping not found: {arn}",
428
                Type="User",
429
            )
430
        return esm
1✔
431

432
    @staticmethod
1✔
433
    def _get_capacity_provider(
1✔
434
        capacity_provider_name: str,
435
        account_id: str,
436
        region: str,
437
        error_msg_template: str = "Capacity provider not found: {}",
438
    ) -> CapacityProviderModel:
439
        state = lambda_stores[account_id][region]
1✔
440
        cp = state.capacity_providers.get(capacity_provider_name)
1✔
441
        if not cp:
1✔
442
            arn = capacity_provider_arn(capacity_provider_name, account_id, region)
1✔
443
            raise ResourceNotFoundException(
1✔
444
                error_msg_template.format(arn),
445
                Type="User",
446
            )
UNCOV
447
        return cp
×
448

449
    @staticmethod
1✔
450
    def _validate_qualifier_expression(qualifier: str) -> None:
1✔
451
        if error_messages := api_utils.validate_qualifier(qualifier):
1✔
UNCOV
452
            raise ValidationException(
×
453
                message=api_utils.construct_validation_exception_message(error_messages)
454
            )
455

456
    @staticmethod
1✔
457
    def _resolve_fn_qualifier(resolved_fn: Function, qualifier: str | None) -> tuple[str, str]:
1✔
458
        """Attempts to resolve a given qualifier and returns a qualifier that exists or
459
        raises an appropriate ResourceNotFoundException.
460

461
        :param resolved_fn: The resolved lambda function
462
        :param qualifier: The qualifier to be resolved or None
463
        :return: Tuple of (resolved qualifier, function arn either qualified or unqualified)"""
464
        function_name = resolved_fn.function_name
1✔
465
        # assuming function versions need to live in the same account and region
466
        account_id = resolved_fn.latest().id.account
1✔
467
        region = resolved_fn.latest().id.region
1✔
468
        fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
1✔
469
        if qualifier is not None:
1✔
470
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
1✔
471
            if api_utils.qualifier_is_alias(qualifier):
1✔
472
                if qualifier not in resolved_fn.aliases:
1✔
473
                    raise ResourceNotFoundException(f"Cannot find alias arn: {fn_arn}", Type="User")
1✔
474
            elif api_utils.qualifier_is_version(qualifier) or qualifier == "$LATEST":
1✔
475
                if qualifier not in resolved_fn.versions:
1✔
476
                    raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")
1✔
477
            else:
478
                # matches qualifier pattern but invalid alias or version
479
                raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")
1✔
480
        resolved_qualifier = qualifier or "$LATEST"
1✔
481
        return resolved_qualifier, fn_arn
1✔
482

483
    @staticmethod
1✔
484
    def _function_revision_id(resolved_fn: Function, resolved_qualifier: str) -> str:
1✔
485
        if api_utils.qualifier_is_alias(resolved_qualifier):
1✔
486
            return resolved_fn.aliases[resolved_qualifier].revision_id
1✔
487
        # Assumes that a non-alias is a version
488
        else:
489
            return resolved_fn.versions[resolved_qualifier].config.revision_id
1✔
490

491
    def _resolve_vpc_id(self, account_id: str, region_name: str, subnet_id: str) -> str:
1✔
492
        ec2_client = connect_to(aws_access_key_id=account_id, region_name=region_name).ec2
1✔
493
        try:
1✔
494
            return ec2_client.describe_subnets(SubnetIds=[subnet_id])["Subnets"][0]["VpcId"]
1✔
495
        except ec2_client.exceptions.ClientError as e:
1✔
496
            code = e.response["Error"]["Code"]
1✔
497
            message = e.response["Error"]["Message"]
1✔
498
            raise InvalidParameterValueException(
1✔
499
                f"Error occurred while DescribeSubnets. EC2 Error Code: {code}. EC2 Error Message: {message}",
500
                Type="User",
501
            )
502

503
    def _build_vpc_config(
1✔
504
        self,
505
        account_id: str,
506
        region_name: str,
507
        vpc_config: dict | None = None,
508
    ) -> VpcConfig | None:
509
        if not vpc_config or not is_api_enabled("ec2"):
1✔
510
            return None
1✔
511

512
        subnet_ids = vpc_config.get("SubnetIds", [])
1✔
513
        if subnet_ids is not None and len(subnet_ids) == 0:
1✔
514
            return VpcConfig(vpc_id="", security_group_ids=[], subnet_ids=[])
1✔
515

516
        subnet_id = subnet_ids[0]
1✔
517
        if not bool(SUBNET_ID_REGEX.match(subnet_id)):
1✔
518
            raise ValidationException(
1✔
519
                f"1 validation error detected: Value '[{subnet_id}]' at 'vpcConfig.subnetIds' failed to satisfy constraint: Member must satisfy constraint: [Member must have length less than or equal to 1024, Member must have length greater than or equal to 0, Member must satisfy regular expression pattern: subnet-[0-9a-z]*]"
520
            )
521

522
        return VpcConfig(
1✔
523
            vpc_id=self._resolve_vpc_id(account_id, region_name, subnet_id),
524
            security_group_ids=vpc_config.get("SecurityGroupIds", []),
525
            subnet_ids=subnet_ids,
526
        )
527

528
    def _create_version_model(
1✔
529
        self,
530
        function_name: str,
531
        region: str,
532
        account_id: str,
533
        description: str | None = None,
534
        revision_id: str | None = None,
535
        code_sha256: str | None = None,
536
        publish_to: FunctionVersionLatestPublished | None = None,
537
        is_active: bool = False,
538
    ) -> tuple[FunctionVersion, bool]:
539
        """
540
        Release a new version to the model if all restrictions are met.
541
        Restrictions:
542
          - CodeSha256, if provided, must equal the current latest version code hash
543
          - RevisionId, if provided, must equal the current latest version revision id
544
          - Some changes have been done to the latest version since last publish
545
        Will return a tuple of the version, and whether the version was published (True) or the latest available version was taken (False).
546
        This can happen if the latest version has not been changed since the last version publish, in this case the last version will be returned.
547

548
        :param function_name: Function name to be published
549
        :param region: Region of the function
550
        :param account_id: Account of the function
551
        :param description: new description of the version (will be the description of the function if missing)
552
        :param revision_id: Revision id, function will raise error if it does not match latest revision id
553
        :param code_sha256: Code sha256, function will raise error if it does not match latest code hash
554
        :return: Tuple of (published version, whether version was released or last released version returned, since nothing changed)
555
        """
556
        current_latest_version = get_function_version(
1✔
557
            function_name=function_name, qualifier="$LATEST", account_id=account_id, region=region
558
        )
559
        if revision_id and current_latest_version.config.revision_id != revision_id:
1✔
560
            raise PreconditionFailedException(
1✔
561
                "The Revision Id provided does not match the latest Revision Id. Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
562
                Type="User",
563
            )
564

565
        # check if code hashes match if they are specified
566
        current_hash = (
1✔
567
            current_latest_version.config.code.code_sha256
568
            if current_latest_version.config.package_type == PackageType.Zip
569
            else current_latest_version.config.image.code_sha256
570
        )
571
        # if the code is a zip package and hot reloaded (hot reloading is currently only supported for zip packagetypes)
572
        # we cannot enforce the codesha256 check
573
        is_hot_reloaded_zip_package = (
1✔
574
            current_latest_version.config.package_type == PackageType.Zip
575
            and current_latest_version.config.code.is_hot_reloading()
576
        )
577
        if code_sha256 and current_hash != code_sha256 and not is_hot_reloaded_zip_package:
1✔
578
            raise InvalidParameterValueException(
1✔
579
                f"CodeSHA256 ({code_sha256}) is different from current CodeSHA256 in $LATEST ({current_hash}). Please try again with the CodeSHA256 in $LATEST.",
580
                Type="User",
581
            )
582

583
        state = lambda_stores[account_id][region]
1✔
584
        function = state.functions.get(function_name)
1✔
585
        changes = {}
1✔
586
        if description is not None:
1✔
587
            changes["description"] = description
1✔
588
        # TODO copy environment instead of restarting one, get rid of all the "Pending"s
589

590
        with function.lock:
1✔
591
            if function.next_version > 1 and (
1✔
592
                prev_version := function.versions.get(str(function.next_version - 1))
593
            ):
594
                if (
1✔
595
                    prev_version.config.internal_revision
596
                    == current_latest_version.config.internal_revision
597
                ):
598
                    return prev_version, False
1✔
599
            # TODO check if there was a change since last version
600
            if publish_to == FunctionVersionLatestPublished.LATEST_PUBLISHED:
1✔
UNCOV
601
                qualifier = "$LATEST.PUBLISHED"
×
602
            else:
603
                qualifier = str(function.next_version)
1✔
604
                function.next_version += 1
1✔
605
            new_id = VersionIdentifier(
1✔
606
                function_name=function_name,
607
                qualifier=qualifier,
608
                region=region,
609
                account=account_id,
610
            )
611

612
            if current_latest_version.config.CapacityProviderConfig:
1✔
613
                # for lambda managed functions, snap start is not supported
UNCOV
614
                snap_start = None
×
615
            else:
616
                apply_on = current_latest_version.config.snap_start["ApplyOn"]
1✔
617
                optimization_status = SnapStartOptimizationStatus.Off
1✔
618
                if apply_on == SnapStartApplyOn.PublishedVersions:
1✔
UNCOV
619
                    optimization_status = SnapStartOptimizationStatus.On
×
620
                snap_start = SnapStartResponse(
1✔
621
                    ApplyOn=apply_on,
622
                    OptimizationStatus=optimization_status,
623
                )
624

625
            last_update = None
1✔
626
            new_state = VersionState(
1✔
627
                state=State.Pending,
628
                code=StateReasonCode.Creating,
629
                reason="The function is being created.",
630
            )
631
            if publish_to == FunctionVersionLatestPublished.LATEST_PUBLISHED:
1✔
UNCOV
632
                last_update = UpdateStatus(
×
633
                    status=LastUpdateStatus.InProgress,
634
                    code="Updating",
635
                    reason="The function is being updated.",
636
                )
637
                if is_active:
×
UNCOV
638
                    new_state = VersionState(state=State.Active)
×
639
            new_version = dataclasses.replace(
1✔
640
                current_latest_version,
641
                config=dataclasses.replace(
642
                    current_latest_version.config,
643
                    last_update=last_update,
644
                    state=new_state,
645
                    snap_start=snap_start,
646
                    **changes,
647
                ),
648
                id=new_id,
649
            )
650
            function.versions[qualifier] = new_version
1✔
651
        return new_version, True
1✔
652

653
    def _publish_version_from_existing_version(
1✔
654
        self,
655
        function_name: str,
656
        region: str,
657
        account_id: str,
658
        description: str | None = None,
659
        revision_id: str | None = None,
660
        code_sha256: str | None = None,
661
        publish_to: FunctionVersionLatestPublished | None = None,
662
    ) -> FunctionVersion:
663
        """
664
        Publish version from an existing, already initialized LATEST
665

666
        :param function_name: Function name
667
        :param region: region
668
        :param account_id: account id
669
        :param description: description
670
        :param revision_id: revision id (check if current version matches)
671
        :param code_sha256: code sha (check if current code matches)
672
        :return: new version
673
        """
674
        is_active = True if publish_to == FunctionVersionLatestPublished.LATEST_PUBLISHED else False
1✔
675
        new_version, changed = self._create_version_model(
1✔
676
            function_name=function_name,
677
            region=region,
678
            account_id=account_id,
679
            description=description,
680
            revision_id=revision_id,
681
            code_sha256=code_sha256,
682
            publish_to=publish_to,
683
            is_active=is_active,
684
        )
685
        if not changed:
1✔
686
            return new_version
1✔
687

688
        if new_version.config.CapacityProviderConfig:
1✔
UNCOV
689
            self.lambda_service.publish_version_async(new_version)
×
690
        else:
691
            self.lambda_service.publish_version(new_version)
1✔
692
        state = lambda_stores[account_id][region]
1✔
693
        function = state.functions.get(function_name)
1✔
694

695
        # Update revision id for $LATEST version
696
        # TODO: re-evaluate data model to prevent this dirty hack just for bumping the revision id
697
        latest_version = function.versions["$LATEST"]
1✔
698
        function.versions["$LATEST"] = dataclasses.replace(
1✔
699
            latest_version, config=dataclasses.replace(latest_version.config)
700
        )
701
        if new_version.config.CapacityProviderConfig:
1✔
702
            # publish_version happens async for functions with a capacity provider.
703
            # Therefore, we return the new_version with State=Pending or LastUpdateStatus=InProgress ($LATEST.PUBLISHED)
UNCOV
704
            return new_version
×
705
        else:
706
            # Regular functions yield an Active state modified during `publish_version` (sync).
707
            # Therefore, we need to get the updated version from the store.
708
            updated_version = function.versions.get(new_version.id.qualifier)
1✔
709
            return updated_version
1✔
710

711
    def _publish_version_with_changes(
1✔
712
        self,
713
        function_name: str,
714
        region: str,
715
        account_id: str,
716
        description: str | None = None,
717
        revision_id: str | None = None,
718
        code_sha256: str | None = None,
719
        publish_to: FunctionVersionLatestPublished | None = None,
720
        is_active: bool = False,
721
    ) -> FunctionVersion:
722
        """
723
        Publish version together with a new latest version (publish on create / update)
724

725
        :param function_name: Function name
726
        :param region: region
727
        :param account_id: account id
728
        :param description: description
729
        :param revision_id: revision id (check if current version matches)
730
        :param code_sha256: code sha (check if current code matches)
731
        :return: new version
732
        """
733
        new_version, changed = self._create_version_model(
1✔
734
            function_name=function_name,
735
            region=region,
736
            account_id=account_id,
737
            description=description,
738
            revision_id=revision_id,
739
            code_sha256=code_sha256,
740
            publish_to=publish_to,
741
            is_active=is_active,
742
        )
743
        if not changed:
1✔
UNCOV
744
            return new_version
×
745
        self.lambda_service.create_function_version(new_version)
1✔
746
        return new_version
1✔
747

748
    @staticmethod
1✔
749
    def _verify_env_variables(env_vars: dict[str, str]):
1✔
750
        dumped_env_vars = json.dumps(env_vars, separators=(",", ":"))
1✔
751
        if (
1✔
752
            len(dumped_env_vars.encode("utf-8"))
753
            > config.LAMBDA_LIMITS_MAX_FUNCTION_ENVVAR_SIZE_BYTES
754
        ):
755
            raise InvalidParameterValueException(
1✔
756
                f"Lambda was unable to configure your environment variables because the environment variables you have provided exceeded the 4KB limit. String measured: {dumped_env_vars}",
757
                Type="User",
758
            )
759

760
    @staticmethod
1✔
761
    def _validate_snapstart(snap_start: SnapStart, runtime: Runtime):
1✔
762
        apply_on = snap_start.get("ApplyOn")
1✔
763
        if apply_on not in [
1✔
764
            SnapStartApplyOn.PublishedVersions,
765
            SnapStartApplyOn.None_,
766
        ]:
767
            raise ValidationException(
1✔
768
                f"1 validation error detected: Value '{apply_on}' at 'snapStart.applyOn' failed to satisfy constraint: Member must satisfy enum value set: [PublishedVersions, None]"
769
            )
770

771
        if runtime not in SNAP_START_SUPPORTED_RUNTIMES:
1✔
UNCOV
772
            raise InvalidParameterValueException(
×
773
                f"{runtime} is not supported for SnapStart enabled functions.", Type="User"
774
            )
775

776
    def _validate_layers(self, new_layers: list[str], region: str, account_id: str):
1✔
777
        if len(new_layers) > LAMBDA_LAYERS_LIMIT_PER_FUNCTION:
1✔
778
            raise InvalidParameterValueException(
1✔
779
                "Cannot reference more than 5 layers.", Type="User"
780
            )
781

782
        visited_layers = {}
1✔
783
        for layer_version_arn in new_layers:
1✔
784
            (
1✔
785
                layer_region,
786
                layer_account_id,
787
                layer_name,
788
                layer_version_str,
789
            ) = api_utils.parse_layer_arn(layer_version_arn)
790
            if layer_version_str is None:
1✔
791
                raise ValidationException(
1✔
792
                    f"1 validation error detected: Value '[{layer_version_arn}]'"
793
                    + " at 'layers' failed to satisfy constraint: Member must satisfy constraint: [Member must have length less than or equal to 2048, Member must have length greater than or equal to 1, Member must satisfy regular expression pattern: "
794
                    + "(arn:(aws[a-zA-Z-]*)?:lambda:(eusc-)?[a-z]{2}((-gov)|(-iso([a-z]?)))?-[a-z]+-\\d{1}:\\d{12}:layer:[a-zA-Z0-9-_]+:[0-9]+)|(arn:[a-zA-Z0-9-]+:lambda:::awslayer:[a-zA-Z0-9-_]+), Member must not be null]",
795
                )
796

797
            state = lambda_stores[layer_account_id][layer_region]
1✔
798
            layer = state.layers.get(layer_name)
1✔
799
            layer_version = None
1✔
800
            if layer is not None:
1✔
801
                layer_version = layer.layer_versions.get(layer_version_str)
1✔
802
            if layer_account_id == account_id:
1✔
803
                if region and layer_region != region:
1✔
804
                    raise InvalidParameterValueException(
1✔
805
                        f"Layers are not in the same region as the function. "
806
                        f"Layers are expected to be in region {region}.",
807
                        Type="User",
808
                    )
809
                if layer is None or layer.layer_versions.get(layer_version_str) is None:
1✔
810
                    raise InvalidParameterValueException(
1✔
811
                        f"Layer version {layer_version_arn} does not exist.", Type="User"
812
                    )
813
            else:  # External layer from other account
814
                # TODO: validate IAM layer policy here, allowing access by default for now and only checking region
UNCOV
815
                if region and layer_region != region:
×
816
                    # TODO: detect user or role from context when IAM users are implemented
817
                    user = "user/localstack-testing"
×
UNCOV
818
                    raise AccessDeniedException(
×
819
                        f"User: arn:{get_partition(region)}:iam::{account_id}:{user} is not authorized to perform: lambda:GetLayerVersion on resource: {layer_version_arn} because no resource-based policy allows the lambda:GetLayerVersion action"
820
                    )
UNCOV
821
                if layer is None or layer_version is None:
×
822
                    # Limitation: cannot fetch external layers when using the same account id as the target layer
823
                    # because we do not want to trigger the layer fetcher for every non-existing layer.
UNCOV
824
                    if self.layer_fetcher is None:
×
825
                        raise NotImplementedError(
826
                            "Fetching shared layers from AWS is a pro feature."
827
                        )
828

829
                    layer = self.layer_fetcher.fetch_layer(layer_version_arn)
×
UNCOV
830
                    if layer is None:
×
831
                        # TODO: detect user or role from context when IAM users are implemented
832
                        user = "user/localstack-testing"
×
UNCOV
833
                        raise AccessDeniedException(
×
834
                            f"User: arn:{get_partition(region)}:iam::{account_id}:{user} is not authorized to perform: lambda:GetLayerVersion on resource: {layer_version_arn} because no resource-based policy allows the lambda:GetLayerVersion action"
835
                        )
836

837
                    # Distinguish between new layer and new layer version
UNCOV
838
                    if layer_version is None:
×
839
                        # Create whole layer from scratch
UNCOV
840
                        state.layers[layer_name] = layer
×
841
                    else:
842
                        # Create layer version if another version of the same layer already exists
UNCOV
843
                        state.layers[layer_name].layer_versions[layer_version_str] = (
×
844
                            layer.layer_versions.get(layer_version_str)
845
                        )
846

847
            # only the first two matches in the array are considered for the error message
848
            layer_arn = ":".join(layer_version_arn.split(":")[:-1])
1✔
849
            if layer_arn in visited_layers:
1✔
850
                conflict_layer_version_arn = visited_layers[layer_arn]
1✔
851
                raise InvalidParameterValueException(
1✔
852
                    f"Two different versions of the same layer are not allowed to be referenced in the same function. {conflict_layer_version_arn} and {layer_version_arn} are versions of the same layer.",
853
                    Type="User",
854
                )
855
            visited_layers[layer_arn] = layer_version_arn
1✔
856

857
    def _validate_capacity_provider_config(
1✔
858
        self, capacity_provider_config: CapacityProviderConfig, context: RequestContext
859
    ):
UNCOV
860
        if not capacity_provider_config.get("LambdaManagedInstancesCapacityProviderConfig"):
×
UNCOV
861
            raise ValidationException(
×
862
                "1 validation error detected: Value null at 'capacityProviderConfig.lambdaManagedInstancesCapacityProviderConfig' failed to satisfy constraint: Member must not be null"
863
            )
864

UNCOV
865
        capacity_provider_arn = capacity_provider_config.get(
×
866
            "LambdaManagedInstancesCapacityProviderConfig", {}
867
        ).get("CapacityProviderArn")
UNCOV
868
        if not capacity_provider_arn:
×
UNCOV
869
            raise ValidationException(
×
870
                "1 validation error detected: Value null at 'capacityProviderConfig.lambdaManagedInstancesCapacityProviderConfig.capacityProviderArn' failed to satisfy constraint: Member must not be null"
871
            )
872

UNCOV
873
        if not re.match(CAPACITY_PROVIDER_ARN_NAME, capacity_provider_arn):
×
UNCOV
874
            raise ValidationException(
×
875
                f"1 validation error detected: Value '{capacity_provider_arn}' at 'capacityProviderConfig.lambdaManagedInstancesCapacityProviderConfig.capacityProviderArn' failed to satisfy constraint: Member must satisfy regular expression pattern: {CAPACITY_PROVIDER_ARN_NAME}"
876
            )
877

UNCOV
878
        capacity_provider_name = capacity_provider_arn.split(":")[-1]
×
UNCOV
879
        self.get_capacity_provider(context, capacity_provider_name)
×
880

881
    @staticmethod
1✔
882
    def map_layers(new_layers: list[str]) -> list[LayerVersion]:
1✔
883
        layers = []
1✔
884
        for layer_version_arn in new_layers:
1✔
885
            region_name, account_id, layer_name, layer_version = api_utils.parse_layer_arn(
1✔
886
                layer_version_arn
887
            )
888
            layer = lambda_stores[account_id][region_name].layers.get(layer_name)
1✔
889
            layer_version = layer.layer_versions.get(layer_version)
1✔
890
            layers.append(layer_version)
1✔
891
        return layers
1✔
892

893
    def get_function_recursion_config(
1✔
894
        self,
895
        context: RequestContext,
896
        function_name: UnqualifiedFunctionName,
897
        **kwargs,
898
    ) -> GetFunctionRecursionConfigResponse:
899
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
900
        function_name = api_utils.get_function_name(function_name, context)
1✔
901
        fn = self._get_function(function_name=function_name, region=region, account_id=account_id)
1✔
902
        return GetFunctionRecursionConfigResponse(RecursiveLoop=fn.recursive_loop)
1✔
903

904
    def put_function_recursion_config(
1✔
905
        self,
906
        context: RequestContext,
907
        function_name: UnqualifiedFunctionName,
908
        recursive_loop: RecursiveLoop,
909
        **kwargs,
910
    ) -> PutFunctionRecursionConfigResponse:
911
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
912
        function_name = api_utils.get_function_name(function_name, context)
1✔
913

914
        fn = self._get_function(function_name=function_name, region=region, account_id=account_id)
1✔
915

916
        allowed_values = list(RecursiveLoop.__members__.values())
1✔
917
        if recursive_loop not in allowed_values:
1✔
918
            raise ValidationException(
1✔
919
                f"1 validation error detected: Value '{recursive_loop}' at 'recursiveLoop' failed to satisfy constraint: "
920
                f"Member must satisfy enum value set: [Terminate, Allow]"
921
            )
922

923
        fn.recursive_loop = recursive_loop
1✔
924
        return PutFunctionRecursionConfigResponse(RecursiveLoop=fn.recursive_loop)
1✔
925

926
    @handler(operation="CreateFunction", expand=False)
1✔
927
    def create_function(
1✔
928
        self,
929
        context: RequestContext,
930
        request: CreateFunctionRequest,
931
    ) -> FunctionConfiguration:
932
        context_region = context.region
1✔
933
        context_account_id = context.account_id
1✔
934

935
        zip_file = request.get("Code", {}).get("ZipFile")
1✔
936
        if zip_file and len(zip_file) > config.LAMBDA_LIMITS_CODE_SIZE_ZIPPED:
1✔
937
            raise RequestEntityTooLargeException(
1✔
938
                f"Zipped size must be smaller than {config.LAMBDA_LIMITS_CODE_SIZE_ZIPPED} bytes"
939
            )
940

941
        if context.request.content_length > config.LAMBDA_LIMITS_CREATE_FUNCTION_REQUEST_SIZE:
1✔
942
            raise RequestEntityTooLargeException(
1✔
943
                f"Request must be smaller than {config.LAMBDA_LIMITS_CREATE_FUNCTION_REQUEST_SIZE} bytes for the CreateFunction operation"
944
            )
945

946
        if architectures := request.get("Architectures"):
1✔
947
            if len(architectures) != 1:
1✔
948
                raise ValidationException(
1✔
949
                    f"1 validation error detected: Value '[{', '.join(architectures)}]' at 'architectures' failed to "
950
                    f"satisfy constraint: Member must have length less than or equal to 1",
951
                )
952
            if architectures[0] not in ARCHITECTURES:
1✔
953
                raise ValidationException(
1✔
954
                    f"1 validation error detected: Value '[{', '.join(architectures)}]' at 'architectures' failed to "
955
                    f"satisfy constraint: Member must satisfy constraint: [Member must satisfy enum value set: "
956
                    f"[x86_64, arm64], Member must not be null]",
957
                )
958

959
        if env_vars := request.get("Environment", {}).get("Variables"):
1✔
960
            self._verify_env_variables(env_vars)
1✔
961

962
        if layers := request.get("Layers", []):
1✔
963
            self._validate_layers(layers, region=context_region, account_id=context_account_id)
1✔
964

965
        if not api_utils.is_role_arn(request.get("Role")):
1✔
966
            raise ValidationException(
1✔
967
                f"1 validation error detected: Value '{request.get('Role')}'"
968
                + " at 'role' failed to satisfy constraint: Member must satisfy regular expression pattern: arn:(aws[a-zA-Z-]*)?:iam::\\d{12}:role/?[a-zA-Z_0-9+=,.@\\-_/]+"
969
            )
970
        if not self.lambda_service.can_assume_role(request.get("Role"), context.region):
1✔
UNCOV
971
            raise InvalidParameterValueException(
×
972
                "The role defined for the function cannot be assumed by Lambda.", Type="User"
973
            )
974
        package_type = request.get("PackageType", PackageType.Zip)
1✔
975
        runtime = request.get("Runtime")
1✔
976
        self._validate_runtime(package_type, runtime)
1✔
977

978
        request_function_name = request.get("FunctionName")
1✔
979

980
        function_name, *_ = api_utils.get_name_and_qualifier(
1✔
981
            function_arn_or_name=request_function_name,
982
            qualifier=None,
983
            context=context,
984
        )
985

986
        if runtime in DEPRECATED_RUNTIMES:
1✔
987
            LOG.warning(
1✔
988
                "The Lambda runtime %s} is deprecated. "
989
                "Please upgrade the runtime for the function %s: "
990
                "https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtimes.html",
991
                runtime,
992
                function_name,
993
            )
994
        if snap_start := request.get("SnapStart"):
1✔
995
            self._validate_snapstart(snap_start, runtime)
1✔
996
        state = lambda_stores[context_account_id][context_region]
1✔
997

998
        with self.create_fn_lock:
1✔
999
            if function_name in state.functions:
1✔
UNCOV
1000
                raise ResourceConflictException(f"Function already exist: {function_name}")
×
1001
            fn = Function(function_name=function_name)
1✔
1002
            arn = VersionIdentifier(
1✔
1003
                function_name=function_name,
1004
                qualifier="$LATEST",
1005
                region=context_region,
1006
                account=context_account_id,
1007
            )
1008
            # save function code to s3
1009
            code = None
1✔
1010
            image = None
1✔
1011
            image_config = None
1✔
1012
            runtime_version_config = RuntimeVersionConfig(
1✔
1013
                # Limitation: the runtime id (presumably sha256 of image) is currently hardcoded
1014
                # Potential implementation: provide (cached) sha256 hash of used Docker image
1015
                RuntimeVersionArn=f"arn:{context.partition}:lambda:{context_region}::runtime:8eeff65f6809a3ce81507fe733fe09b835899b99481ba22fd75b5a7338290ec1"
1016
            )
1017
            request_code = request.get("Code")
1✔
1018
            if package_type == PackageType.Zip:
1✔
1019
                # TODO verify if correct combination of code is set
1020
                if zip_file := request_code.get("ZipFile"):
1✔
1021
                    code = store_lambda_archive(
1✔
1022
                        archive_file=zip_file,
1023
                        function_name=function_name,
1024
                        region_name=context_region,
1025
                        account_id=context_account_id,
1026
                    )
1027
                elif s3_bucket := request_code.get("S3Bucket"):
1✔
1028
                    s3_key = request_code["S3Key"]
1✔
1029
                    s3_object_version = request_code.get("S3ObjectVersion")
1✔
1030
                    code = store_s3_bucket_archive(
1✔
1031
                        archive_bucket=s3_bucket,
1032
                        archive_key=s3_key,
1033
                        archive_version=s3_object_version,
1034
                        function_name=function_name,
1035
                        region_name=context_region,
1036
                        account_id=context_account_id,
1037
                    )
1038
                else:
UNCOV
1039
                    raise LambdaServiceException("A ZIP file or S3 bucket is required")
×
1040
            elif package_type == PackageType.Image:
1✔
1041
                image = request_code.get("ImageUri")
1✔
1042
                if not image:
1✔
1043
                    raise LambdaServiceException(
×
1044
                        "An image is required when the package type is set to 'image'"
1045
                    )
1046
                image = create_image_code(image_uri=image)
1✔
1047

1048
                image_config_req = request.get("ImageConfig", {})
1✔
1049
                image_config = ImageConfig(
1✔
1050
                    command=image_config_req.get("Command"),
1051
                    entrypoint=image_config_req.get("EntryPoint"),
1052
                    working_directory=image_config_req.get("WorkingDirectory"),
1053
                )
1054
                # Runtime management controls are not available when providing a custom image
1055
                runtime_version_config = None
1✔
1056

1057
            capacity_provider_config = None
1✔
1058
            memory_size = request.get("MemorySize", LAMBDA_DEFAULT_MEMORY_SIZE)
1✔
1059
            if "CapacityProviderConfig" in request:
1✔
UNCOV
1060
                capacity_provider_config = request["CapacityProviderConfig"]
×
UNCOV
1061
                self._validate_capacity_provider_config(capacity_provider_config, context)
×
1062

UNCOV
1063
                default_config = CapacityProviderConfig(
×
1064
                    LambdaManagedInstancesCapacityProviderConfig=LambdaManagedInstancesCapacityProviderConfig(
1065
                        ExecutionEnvironmentMemoryGiBPerVCpu=2.0,
1066
                        PerExecutionEnvironmentMaxConcurrency=16,
1067
                    )
1068
                )
UNCOV
1069
                capacity_provider_config = merge_recursive(default_config, capacity_provider_config)
×
UNCOV
1070
                memory_size = 2048
×
UNCOV
1071
                if request.get("LoggingConfig", {}).get("LogFormat") == LogFormat.Text:
×
UNCOV
1072
                    raise InvalidParameterValueException(
×
1073
                        'LogLevel is not supported when LogFormat is set to "Text". Remove LogLevel from your request or change the LogFormat to "JSON" and try again.',
1074
                        Type="User",
1075
                    )
1076
            if "LoggingConfig" in request:
1✔
1077
                logging_config = request["LoggingConfig"]
1✔
1078
                LOG.warning(
1✔
1079
                    "Advanced Lambda Logging Configuration is currently mocked "
1080
                    "and will not impact the logging behavior. "
1081
                    "Please create a feature request if needed."
1082
                )
1083

1084
                # when switching to JSON, app and system level log is auto set to INFO
1085
                if logging_config.get("LogFormat", None) == LogFormat.JSON:
1✔
1086
                    logging_config = {
1✔
1087
                        "ApplicationLogLevel": "INFO",
1088
                        "SystemLogLevel": "INFO",
1089
                        "LogGroup": f"/aws/lambda/{function_name}",
1090
                    } | logging_config
1091
                else:
UNCOV
1092
                    logging_config = (
×
1093
                        LoggingConfig(
1094
                            LogFormat=LogFormat.Text, LogGroup=f"/aws/lambda/{function_name}"
1095
                        )
1096
                        | logging_config
1097
                    )
1098

1099
            elif capacity_provider_config:
1✔
UNCOV
1100
                logging_config = LoggingConfig(
×
1101
                    LogFormat=LogFormat.JSON,
1102
                    LogGroup=f"/aws/lambda/{function_name}",
1103
                    ApplicationLogLevel="INFO",
1104
                    SystemLogLevel="INFO",
1105
                )
1106
            else:
1107
                logging_config = LoggingConfig(
1✔
1108
                    LogFormat=LogFormat.Text, LogGroup=f"/aws/lambda/{function_name}"
1109
                )
1110
            snap_start = (
1✔
1111
                None
1112
                if capacity_provider_config
1113
                else SnapStartResponse(
1114
                    ApplyOn=request.get("SnapStart", {}).get("ApplyOn", SnapStartApplyOn.None_),
1115
                    OptimizationStatus=SnapStartOptimizationStatus.Off,
1116
                )
1117
            )
1118
            version = FunctionVersion(
1✔
1119
                id=arn,
1120
                config=VersionFunctionConfiguration(
1121
                    last_modified=api_utils.format_lambda_date(datetime.datetime.now()),
1122
                    description=request.get("Description", ""),
1123
                    role=request["Role"],
1124
                    timeout=request.get("Timeout", LAMBDA_DEFAULT_TIMEOUT),
1125
                    runtime=request.get("Runtime"),
1126
                    memory_size=memory_size,
1127
                    handler=request.get("Handler"),
1128
                    package_type=package_type,
1129
                    environment=env_vars,
1130
                    architectures=request.get("Architectures") or [Architecture.x86_64],
1131
                    tracing_config_mode=request.get("TracingConfig", {}).get(
1132
                        "Mode", TracingMode.PassThrough
1133
                    ),
1134
                    image=image,
1135
                    image_config=image_config,
1136
                    code=code,
1137
                    layers=self.map_layers(layers),
1138
                    internal_revision=short_uid(),
1139
                    ephemeral_storage=LambdaEphemeralStorage(
1140
                        size=request.get("EphemeralStorage", {}).get("Size", 512)
1141
                    ),
1142
                    snap_start=snap_start,
1143
                    runtime_version_config=runtime_version_config,
1144
                    dead_letter_arn=request.get("DeadLetterConfig", {}).get("TargetArn"),
1145
                    vpc_config=self._build_vpc_config(
1146
                        context_account_id, context_region, request.get("VpcConfig")
1147
                    ),
1148
                    state=VersionState(
1149
                        state=State.Pending,
1150
                        code=StateReasonCode.Creating,
1151
                        reason="The function is being created.",
1152
                    ),
1153
                    logging_config=logging_config,
1154
                    # TODO: might need something like **optional_kwargs if None
1155
                    #   -> Test with regular GetFunction (i.e., without a capacity provider)
1156
                    CapacityProviderConfig=capacity_provider_config,
1157
                ),
1158
            )
1159
            version_post_response = None
1✔
1160
            if capacity_provider_config:
1✔
UNCOV
1161
                version_post_response = dataclasses.replace(
×
1162
                    version,
1163
                    config=dataclasses.replace(
1164
                        version.config,
1165
                        last_update=UpdateStatus(status=LastUpdateStatus.Successful),
1166
                        state=VersionState(state=State.ActiveNonInvocable),
1167
                    ),
1168
                )
1169
            fn.versions["$LATEST"] = version_post_response or version
1✔
1170
            state.functions[function_name] = fn
1✔
1171
        function_counter.labels(
1✔
1172
            operation=FunctionOperation.create,
1173
            runtime=runtime or "n/a",
1174
            status=FunctionStatus.success,
1175
            invocation_type="n/a",
1176
            package_type=package_type,
1177
        )
1178
        # TODO: consider potential other side effects of not having a function version for $LATEST
1179
        # Provisioning happens upon publishing for functions using a capacity provider
1180
        if not capacity_provider_config:
1✔
1181
            self.lambda_service.create_function_version(version)
1✔
1182

1183
        if tags := request.get("Tags"):
1✔
1184
            # This will check whether the function exists.
1185
            self._store_tags(arn.unqualified_arn(), tags)
1✔
1186

1187
        if request.get("Publish"):
1✔
1188
            version = self._publish_version_with_changes(
1✔
1189
                function_name=function_name,
1190
                region=context_region,
1191
                account_id=context_account_id,
1192
                publish_to=request.get("PublishTo"),
1193
            )
1194

1195
        if config.LAMBDA_SYNCHRONOUS_CREATE:
1✔
1196
            # block via retrying until "terminal" condition reached before returning
UNCOV
1197
            if not poll_condition(
×
1198
                lambda: get_function_version(
1199
                    function_name, version.id.qualifier, version.id.account, version.id.region
1200
                ).config.state.state
1201
                in [State.Active, State.ActiveNonInvocable, State.Failed],
1202
                timeout=10,
1203
            ):
UNCOV
1204
                LOG.warning(
×
1205
                    "LAMBDA_SYNCHRONOUS_CREATE is active, but waiting for %s reached timeout.",
1206
                    function_name,
1207
                )
1208

1209
        return api_utils.map_config_out(
1✔
1210
            version, return_qualified_arn=False, return_update_status=False
1211
        )
1212

1213
    def _validate_runtime(self, package_type, runtime):
1✔
1214
        runtimes = ALL_RUNTIMES
1✔
1215
        if config.LAMBDA_RUNTIME_VALIDATION:
1✔
1216
            runtimes = list(itertools.chain(RUNTIMES_AGGREGATED.values()))
1✔
1217

1218
        if package_type == PackageType.Zip and runtime not in runtimes:
1✔
1219
            # deprecated runtimes have different error
1220
            if runtime in DEPRECATED_RUNTIMES:
1✔
1221
                HINT_LOG.info(
1✔
1222
                    "Set env variable LAMBDA_RUNTIME_VALIDATION to 0"
1223
                    " in order to allow usage of deprecated runtimes"
1224
                )
1225
                self._check_for_recomended_migration_target(runtime)
1✔
1226

1227
            raise InvalidParameterValueException(
1✔
1228
                f"Value {runtime} at 'runtime' failed to satisfy constraint: Member must satisfy enum value set: {VALID_RUNTIMES} or be a valid ARN",
1229
                Type="User",
1230
            )
1231

1232
    def _check_for_recomended_migration_target(self, deprecated_runtime):
1✔
1233
        # AWS offers recommended runtime for migration for "newly" deprecated runtimes
1234
        # in order to preserve parity with error messages we need the code bellow
1235
        latest_runtime = DEPRECATED_RUNTIMES_UPGRADES.get(deprecated_runtime)
1✔
1236

1237
        if latest_runtime is not None:
1✔
1238
            LOG.debug(
1✔
1239
                "The Lambda runtime %s is deprecated. Please upgrade to a supported Lambda runtime such as %s.",
1240
                deprecated_runtime,
1241
                latest_runtime,
1242
            )
1243
            raise InvalidParameterValueException(
1✔
1244
                f"The runtime parameter of {deprecated_runtime} is no longer supported for creating or updating AWS Lambda functions. We recommend you use a supported runtime while creating or updating functions.",
1245
                Type="User",
1246
            )
1247

1248
    @handler(operation="UpdateFunctionConfiguration", expand=False)
1✔
1249
    def update_function_configuration(
1✔
1250
        self, context: RequestContext, request: UpdateFunctionConfigurationRequest
1251
    ) -> FunctionConfiguration:
1252
        """updates the $LATEST version of the function"""
1253
        function_name = request.get("FunctionName")
1✔
1254

1255
        # in case we got ARN or partial ARN
1256
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1257
        function_name, qualifier = api_utils.get_name_and_qualifier(function_name, None, context)
1✔
1258
        state = lambda_stores[account_id][region]
1✔
1259

1260
        if function_name not in state.functions:
1✔
UNCOV
1261
            raise ResourceNotFoundException(
×
1262
                f"Function not found: {api_utils.unqualified_lambda_arn(function_name=function_name, region=region, account=account_id)}",
1263
                Type="User",
1264
            )
1265
        function = state.functions[function_name]
1✔
1266

1267
        # TODO: lock modification of latest version
1268
        # TODO: notify service for changes relevant to re-provisioning of $LATEST
1269
        latest_version = function.latest()
1✔
1270
        latest_version_config = latest_version.config
1✔
1271

1272
        revision_id = request.get("RevisionId")
1✔
1273
        if revision_id and revision_id != latest_version.config.revision_id:
1✔
1274
            raise PreconditionFailedException(
1✔
1275
                "The Revision Id provided does not match the latest Revision Id. "
1276
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
1277
                Type="User",
1278
            )
1279

1280
        replace_kwargs = {}
1✔
1281
        if "EphemeralStorage" in request:
1✔
UNCOV
1282
            replace_kwargs["ephemeral_storage"] = LambdaEphemeralStorage(
×
1283
                request.get("EphemeralStorage", {}).get("Size", 512)
1284
            )  # TODO: do defaults here apply as well?
1285

1286
        if "Role" in request:
1✔
1287
            if not api_utils.is_role_arn(request["Role"]):
1✔
1288
                raise ValidationException(
1✔
1289
                    f"1 validation error detected: Value '{request.get('Role')}'"
1290
                    + " at 'role' failed to satisfy constraint: Member must satisfy regular expression pattern: arn:(aws[a-zA-Z-]*)?:iam::\\d{12}:role/?[a-zA-Z_0-9+=,.@\\-_/]+"
1291
                )
1292
            replace_kwargs["role"] = request["Role"]
1✔
1293

1294
        if "Description" in request:
1✔
1295
            replace_kwargs["description"] = request["Description"]
1✔
1296

1297
        if "Timeout" in request:
1✔
1298
            replace_kwargs["timeout"] = request["Timeout"]
1✔
1299

1300
        if "MemorySize" in request:
1✔
1301
            replace_kwargs["memory_size"] = request["MemorySize"]
1✔
1302

1303
        if "DeadLetterConfig" in request:
1✔
1304
            replace_kwargs["dead_letter_arn"] = request.get("DeadLetterConfig", {}).get("TargetArn")
1✔
1305

1306
        if vpc_config := request.get("VpcConfig"):
1✔
1307
            replace_kwargs["vpc_config"] = self._build_vpc_config(account_id, region, vpc_config)
1✔
1308

1309
        if "Handler" in request:
1✔
1310
            replace_kwargs["handler"] = request["Handler"]
1✔
1311

1312
        if "Runtime" in request:
1✔
1313
            runtime = request["Runtime"]
1✔
1314

1315
            if runtime not in ALL_RUNTIMES:
1✔
1316
                raise InvalidParameterValueException(
1✔
1317
                    f"Value {runtime} at 'runtime' failed to satisfy constraint: Member must satisfy enum value set: {VALID_RUNTIMES} or be a valid ARN",
1318
                    Type="User",
1319
                )
1320
            if runtime in DEPRECATED_RUNTIMES:
1✔
UNCOV
1321
                LOG.warning(
×
1322
                    "The Lambda runtime %s is deprecated. "
1323
                    "Please upgrade the runtime for the function %s: "
1324
                    "https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtimes.html",
1325
                    runtime,
1326
                    function_name,
1327
                )
1328
            replace_kwargs["runtime"] = request["Runtime"]
1✔
1329

1330
        if snap_start := request.get("SnapStart"):
1✔
1331
            runtime = replace_kwargs.get("runtime") or latest_version_config.runtime
1✔
1332
            self._validate_snapstart(snap_start, runtime)
1✔
1333
            replace_kwargs["snap_start"] = SnapStartResponse(
1✔
1334
                ApplyOn=snap_start.get("ApplyOn", SnapStartApplyOn.None_),
1335
                OptimizationStatus=SnapStartOptimizationStatus.Off,
1336
            )
1337

1338
        if "Environment" in request:
1✔
1339
            if env_vars := request.get("Environment", {}).get("Variables", {}):
1✔
1340
                self._verify_env_variables(env_vars)
1✔
1341
            replace_kwargs["environment"] = env_vars
1✔
1342

1343
        if "Layers" in request:
1✔
1344
            new_layers = request["Layers"]
1✔
1345
            if new_layers:
1✔
1346
                self._validate_layers(new_layers, region=region, account_id=account_id)
1✔
1347
            replace_kwargs["layers"] = self.map_layers(new_layers)
1✔
1348

1349
        if "ImageConfig" in request:
1✔
1350
            new_image_config = request["ImageConfig"]
1✔
1351
            replace_kwargs["image_config"] = ImageConfig(
1✔
1352
                command=new_image_config.get("Command"),
1353
                entrypoint=new_image_config.get("EntryPoint"),
1354
                working_directory=new_image_config.get("WorkingDirectory"),
1355
            )
1356

1357
        if "LoggingConfig" in request:
1✔
1358
            logging_config = request["LoggingConfig"]
1✔
1359
            LOG.warning(
1✔
1360
                "Advanced Lambda Logging Configuration is currently mocked "
1361
                "and will not impact the logging behavior. "
1362
                "Please create a feature request if needed."
1363
            )
1364

1365
            # when switching to JSON, app and system level log is auto set to INFO
1366
            if logging_config.get("LogFormat", None) == LogFormat.JSON:
1✔
1367
                logging_config = {
1✔
1368
                    "ApplicationLogLevel": "INFO",
1369
                    "SystemLogLevel": "INFO",
1370
                } | logging_config
1371

1372
            last_config = latest_version_config.logging_config
1✔
1373

1374
            # add partial update
1375
            new_logging_config = last_config | logging_config
1✔
1376

1377
            # in case we switched from JSON to Text we need to remove LogLevel keys
1378
            if (
1✔
1379
                new_logging_config.get("LogFormat") == LogFormat.Text
1380
                and last_config.get("LogFormat") == LogFormat.JSON
1381
            ):
1382
                new_logging_config.pop("ApplicationLogLevel", None)
1✔
1383
                new_logging_config.pop("SystemLogLevel", None)
1✔
1384

1385
            replace_kwargs["logging_config"] = new_logging_config
1✔
1386

1387
        if "TracingConfig" in request:
1✔
UNCOV
1388
            new_mode = request.get("TracingConfig", {}).get("Mode")
×
UNCOV
1389
            if new_mode:
×
UNCOV
1390
                replace_kwargs["tracing_config_mode"] = new_mode
×
1391

1392
        if "CapacityProviderConfig" in request:
1✔
UNCOV
1393
            if latest_version.config.CapacityProviderConfig and not request[
×
1394
                "CapacityProviderConfig"
1395
            ].get("LambdaManagedInstancesCapacityProviderConfig"):
UNCOV
1396
                raise ValidationException(
×
1397
                    "1 validation error detected: Value null at 'capacityProviderConfig.lambdaManagedInstancesCapacityProviderConfig' failed to satisfy constraint: Member must not be null"
1398
                )
UNCOV
1399
            if not latest_version.config.CapacityProviderConfig:
×
1400
                raise InvalidParameterValueException(
×
1401
                    "CapacityProviderConfig isn't supported for Lambda Default functions.",
1402
                    Type="User",
1403
                )
1404

1405
        new_latest_version = dataclasses.replace(
1✔
1406
            latest_version,
1407
            config=dataclasses.replace(
1408
                latest_version_config,
1409
                last_modified=api_utils.generate_lambda_date(),
1410
                internal_revision=short_uid(),
1411
                last_update=UpdateStatus(
1412
                    status=LastUpdateStatus.InProgress,
1413
                    code="Creating",
1414
                    reason="The function is being created.",
1415
                ),
1416
                **replace_kwargs,
1417
            ),
1418
        )
1419
        function.versions["$LATEST"] = new_latest_version  # TODO: notify
1✔
1420
        self.lambda_service.update_version(new_version=new_latest_version)
1✔
1421

1422
        return api_utils.map_config_out(new_latest_version)
1✔
1423

1424
    @handler(operation="UpdateFunctionCode", expand=False)
1✔
1425
    def update_function_code(
1✔
1426
        self, context: RequestContext, request: UpdateFunctionCodeRequest
1427
    ) -> FunctionConfiguration:
1428
        """updates the $LATEST version of the function"""
1429
        # only supports normal zip packaging atm
1430
        # if request.get("Publish"):
1431
        #     self.lambda_service.create_function_version()
1432

1433
        function_name = request.get("FunctionName")
1✔
1434
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1435
        function_name, qualifier = api_utils.get_name_and_qualifier(function_name, None, context)
1✔
1436

1437
        store = lambda_stores[account_id][region]
1✔
1438
        if function_name not in store.functions:
1✔
UNCOV
1439
            raise ResourceNotFoundException(
×
1440
                f"Function not found: {api_utils.unqualified_lambda_arn(function_name=function_name, region=region, account=account_id)}",
1441
                Type="User",
1442
            )
1443
        function = store.functions[function_name]
1✔
1444

1445
        revision_id = request.get("RevisionId")
1✔
1446
        if revision_id and revision_id != function.latest().config.revision_id:
1✔
1447
            raise PreconditionFailedException(
1✔
1448
                "The Revision Id provided does not match the latest Revision Id. "
1449
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
1450
                Type="User",
1451
            )
1452

1453
        # TODO verify if correct combination of code is set
1454
        image = None
1✔
1455
        if (
1✔
1456
            request.get("ZipFile") or request.get("S3Bucket")
1457
        ) and function.latest().config.package_type == PackageType.Image:
1458
            raise InvalidParameterValueException(
1✔
1459
                "Please provide ImageUri when updating a function with packageType Image.",
1460
                Type="User",
1461
            )
1462
        elif request.get("ImageUri") and function.latest().config.package_type == PackageType.Zip:
1✔
1463
            raise InvalidParameterValueException(
1✔
1464
                "Please don't provide ImageUri when updating a function with packageType Zip.",
1465
                Type="User",
1466
            )
1467

1468
        if zip_file := request.get("ZipFile"):
1✔
1469
            code = store_lambda_archive(
1✔
1470
                archive_file=zip_file,
1471
                function_name=function_name,
1472
                region_name=region,
1473
                account_id=account_id,
1474
            )
1475
        elif s3_bucket := request.get("S3Bucket"):
1✔
1476
            s3_key = request["S3Key"]
1✔
1477
            s3_object_version = request.get("S3ObjectVersion")
1✔
1478
            code = store_s3_bucket_archive(
1✔
1479
                archive_bucket=s3_bucket,
1480
                archive_key=s3_key,
1481
                archive_version=s3_object_version,
1482
                function_name=function_name,
1483
                region_name=region,
1484
                account_id=account_id,
1485
            )
1486
        elif image := request.get("ImageUri"):
1✔
1487
            code = None
1✔
1488
            image = create_image_code(image_uri=image)
1✔
1489
        else:
UNCOV
1490
            raise LambdaServiceException("A ZIP file, S3 bucket, or image is required")
×
1491

1492
        old_function_version = function.versions.get("$LATEST")
1✔
1493
        replace_kwargs = {"code": code} if code else {"image": image}
1✔
1494

1495
        if architectures := request.get("Architectures"):
1✔
UNCOV
1496
            if len(architectures) != 1:
×
UNCOV
1497
                raise ValidationException(
×
1498
                    f"1 validation error detected: Value '[{', '.join(architectures)}]' at 'architectures' failed to "
1499
                    f"satisfy constraint: Member must have length less than or equal to 1",
1500
                )
1501
            # An empty list of architectures is also forbidden. Further exceptions are tested here for create_function:
1502
            # tests.aws.services.lambda_.test_lambda_api.TestLambdaFunction.test_create_lambda_exceptions
UNCOV
1503
            if architectures[0] not in ARCHITECTURES:
×
UNCOV
1504
                raise ValidationException(
×
1505
                    f"1 validation error detected: Value '[{', '.join(architectures)}]' at 'architectures' failed to "
1506
                    f"satisfy constraint: Member must satisfy constraint: [Member must satisfy enum value set: "
1507
                    f"[x86_64, arm64], Member must not be null]",
1508
                )
UNCOV
1509
            replace_kwargs["architectures"] = architectures
×
1510

1511
        config = dataclasses.replace(
1✔
1512
            old_function_version.config,
1513
            internal_revision=short_uid(),
1514
            last_modified=api_utils.generate_lambda_date(),
1515
            last_update=UpdateStatus(
1516
                status=LastUpdateStatus.InProgress,
1517
                code="Creating",
1518
                reason="The function is being created.",
1519
            ),
1520
            **replace_kwargs,
1521
        )
1522
        function_version = dataclasses.replace(old_function_version, config=config)
1✔
1523
        function.versions["$LATEST"] = function_version
1✔
1524

1525
        self.lambda_service.update_version(new_version=function_version)
1✔
1526
        if request.get("Publish"):
1✔
1527
            function_version = self._publish_version_with_changes(
1✔
1528
                function_name=function_name,
1529
                region=region,
1530
                account_id=account_id,
1531
                # TODO: validations for PublishTo without Publish=True
1532
                publish_to=request.get("PublishTo"),
1533
                is_active=True,
1534
            )
1535
        return api_utils.map_config_out(
1✔
1536
            function_version, return_qualified_arn=bool(request.get("Publish"))
1537
        )
1538

1539
    # TODO: does deleting the latest published version affect the next versions number?
1540
    # TODO: what happens when we call this with a qualifier and a fully qualified ARN? (+ conflicts?)
1541
    # TODO: test different ARN patterns (shorthand ARN?)
1542
    # TODO: test deleting across regions?
1543
    # TODO: test mismatch between context region and region in ARN
1544
    # TODO: test qualifier $LATEST, alias-name and version
1545
    def delete_function(
1✔
1546
        self,
1547
        context: RequestContext,
1548
        function_name: NamespacedFunctionName,
1549
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
1550
        **kwargs,
1551
    ) -> DeleteFunctionResponse:
1552
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1553
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
1554
            function_name, qualifier, context
1555
        )
1556

1557
        if qualifier and api_utils.qualifier_is_alias(qualifier):
1✔
1558
            raise InvalidParameterValueException(
×
1559
                "Deletion of aliases is not currently supported.",
1560
                Type="User",
1561
            )
1562

1563
        store = lambda_stores[account_id][region]
1✔
1564
        if qualifier == "$LATEST":
1✔
1565
            raise InvalidParameterValueException(
1✔
1566
                "$LATEST version cannot be deleted without deleting the function.", Type="User"
1567
            )
1568

1569
        if function_name not in store.functions:
1✔
1570
            e = ResourceNotFoundException(
1✔
1571
                f"Function not found: {api_utils.unqualified_lambda_arn(function_name=function_name, region=region, account=account_id)}",
1572
                Type="User",
1573
            )
1574
            raise e
1✔
1575
        function = store.functions.get(function_name)
1✔
1576

1577
        function_has_capacity_provider = False
1✔
1578
        if qualifier:
1✔
1579
            # delete a version of the function
1580
            version = function.versions.pop(qualifier, None)
1✔
1581
            if version:
1✔
1582
                if version.config.CapacityProviderConfig:
1✔
UNCOV
1583
                    function_has_capacity_provider = True
×
1584
                self.lambda_service.stop_version(version.id.qualified_arn())
1✔
1585
                destroy_code_if_not_used(code=version.config.code, function=function)
1✔
1586
        else:
1587
            # delete the whole function
1588
            # TODO: introduce locking for safe deletion: We could create a new version at the API layer before
1589
            #  the old version gets cleaned up in the internal lambda service.
1590
            function = store.functions.pop(function_name)
1✔
1591
            for version in function.versions.values():
1✔
1592
                # Functions with a capacity provider do NOT have a version manager for $LATEST because only
1593
                # published versions are invokable.
1594
                if version.config.CapacityProviderConfig:
1✔
UNCOV
1595
                    function_has_capacity_provider = True
×
UNCOV
1596
                    if version.id.qualifier == "$LATEST":
×
UNCOV
1597
                        pass
×
1598
                else:
1599
                    self.lambda_service.stop_version(qualified_arn=version.id.qualified_arn())
1✔
1600
                # we can safely destroy the code here
1601
                if version.config.code:
1✔
1602
                    version.config.code.destroy()
1✔
1603

1604
        return DeleteFunctionResponse(StatusCode=202 if function_has_capacity_provider else 204)
1✔
1605

1606
    def list_functions(
1✔
1607
        self,
1608
        context: RequestContext,
1609
        master_region: MasterRegion = None,  # (only relevant for lambda@edge)
1610
        function_version: FunctionVersionApi = None,
1611
        marker: String = None,
1612
        max_items: MaxListItems = None,
1613
        **kwargs,
1614
    ) -> ListFunctionsResponse:
1615
        state = lambda_stores[context.account_id][context.region]
1✔
1616

1617
        if function_version and function_version != FunctionVersionApi.ALL:
1✔
1618
            raise ValidationException(
1✔
1619
                f"1 validation error detected: Value '{function_version}'"
1620
                + " at 'functionVersion' failed to satisfy constraint: Member must satisfy enum value set: [ALL]"
1621
            )
1622

1623
        if function_version == FunctionVersionApi.ALL:
1✔
1624
            # include all versions for all function
1625
            versions = [v for f in state.functions.values() for v in f.versions.values()]
1✔
1626
            return_qualified_arn = True
1✔
1627
        else:
1628
            versions = [f.latest() for f in state.functions.values()]
1✔
1629
            return_qualified_arn = False
1✔
1630

1631
        versions = [
1✔
1632
            api_utils.map_to_list_response(
1633
                api_utils.map_config_out(fc, return_qualified_arn=return_qualified_arn)
1634
            )
1635
            for fc in versions
1636
        ]
1637
        versions = PaginatedList(versions)
1✔
1638
        page, token = versions.get_page(
1✔
1639
            lambda version: version["FunctionArn"],
1640
            marker,
1641
            max_items,
1642
        )
1643
        return ListFunctionsResponse(Functions=page, NextMarker=token)
1✔
1644

1645
    def get_function(
1✔
1646
        self,
1647
        context: RequestContext,
1648
        function_name: NamespacedFunctionName,
1649
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
1650
        **kwargs,
1651
    ) -> GetFunctionResponse:
1652
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1653
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
1654
            function_name, qualifier, context
1655
        )
1656

1657
        fn = lambda_stores[account_id][region].functions.get(function_name)
1✔
1658
        if fn is None:
1✔
1659
            if qualifier is None:
1✔
1660
                raise ResourceNotFoundException(
1✔
1661
                    f"Function not found: {api_utils.unqualified_lambda_arn(function_name, account_id, region)}",
1662
                    Type="User",
1663
                )
1664
            else:
1665
                raise ResourceNotFoundException(
1✔
1666
                    f"Function not found: {api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)}",
1667
                    Type="User",
1668
                )
1669
        alias_name = None
1✔
1670
        if qualifier and api_utils.qualifier_is_alias(qualifier):
1✔
1671
            if qualifier not in fn.aliases:
1✔
1672
                alias_arn = api_utils.qualified_lambda_arn(
1✔
1673
                    function_name, qualifier, account_id, region
1674
                )
1675
                raise ResourceNotFoundException(f"Function not found: {alias_arn}", Type="User")
1✔
1676
            alias_name = qualifier
1✔
1677
            qualifier = fn.aliases[alias_name].function_version
1✔
1678

1679
        version = get_function_version(
1✔
1680
            function_name=function_name,
1681
            qualifier=qualifier,
1682
            account_id=account_id,
1683
            region=region,
1684
        )
1685
        tags = self._get_tags(api_utils.unqualified_lambda_arn(function_name, account_id, region))
1✔
1686
        additional_fields = {}
1✔
1687
        if tags:
1✔
1688
            additional_fields["Tags"] = tags
1✔
1689
        code_location = None
1✔
1690
        if code := version.config.code:
1✔
1691
            code_location = FunctionCodeLocation(
1✔
1692
                Location=code.generate_presigned_url(endpoint_url=config.external_service_url()),
1693
                RepositoryType="S3",
1694
            )
1695
        elif image := version.config.image:
1✔
1696
            code_location = FunctionCodeLocation(
1✔
1697
                ImageUri=image.image_uri,
1698
                RepositoryType=image.repository_type,
1699
                ResolvedImageUri=image.resolved_image_uri,
1700
            )
1701
        concurrency = None
1✔
1702
        if fn.reserved_concurrent_executions:
1✔
1703
            concurrency = Concurrency(
1✔
1704
                ReservedConcurrentExecutions=fn.reserved_concurrent_executions
1705
            )
1706

1707
        return GetFunctionResponse(
1✔
1708
            Configuration=api_utils.map_config_out(
1709
                version, return_qualified_arn=bool(qualifier), alias_name=alias_name
1710
            ),
1711
            Code=code_location,  # TODO
1712
            Concurrency=concurrency,
1713
            **additional_fields,
1714
        )
1715

1716
    def get_function_configuration(
1✔
1717
        self,
1718
        context: RequestContext,
1719
        function_name: NamespacedFunctionName,
1720
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
1721
        **kwargs,
1722
    ) -> FunctionConfiguration:
1723
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1724
        # CAVE: THIS RETURN VALUE IS *NOT* THE SAME AS IN get_function (!) but seems to be only configuration part?
1725
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
1726
            function_name, qualifier, context
1727
        )
1728
        version = get_function_version(
1✔
1729
            function_name=function_name,
1730
            qualifier=qualifier,
1731
            account_id=account_id,
1732
            region=region,
1733
        )
1734
        return api_utils.map_config_out(version, return_qualified_arn=bool(qualifier))
1✔
1735

1736
    def invoke(
1✔
1737
        self,
1738
        context: RequestContext,
1739
        function_name: NamespacedFunctionName,
1740
        invocation_type: InvocationType | None = None,
1741
        log_type: LogType | None = None,
1742
        client_context: String | None = None,
1743
        payload: IO[Blob] | None = None,
1744
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
1745
        tenant_id: TenantId | None = None,
1746
        **kwargs,
1747
    ) -> InvocationResponse:
1748
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1749
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
1750
            function_name, qualifier, context
1751
        )
1752

1753
        user_agent = context.request.user_agent.string
1✔
1754

1755
        time_before = time.perf_counter()
1✔
1756
        try:
1✔
1757
            invocation_result = self.lambda_service.invoke(
1✔
1758
                function_name=function_name,
1759
                qualifier=qualifier,
1760
                region=region,
1761
                account_id=account_id,
1762
                invocation_type=invocation_type,
1763
                client_context=client_context,
1764
                request_id=context.request_id,
1765
                trace_context=context.trace_context,
1766
                payload=payload.read() if payload else None,
1767
                user_agent=user_agent,
1768
            )
1769
        except ServiceException:
1✔
1770
            raise
1✔
1771
        except EnvironmentStartupTimeoutException as e:
1✔
1772
            raise LambdaServiceException(
1✔
1773
                f"[{context.request_id}] Timeout while starting up lambda environment for function {function_name}:{qualifier}"
1774
            ) from e
1775
        except Exception as e:
1✔
1776
            LOG.error(
1✔
1777
                "[%s] Error while invoking lambda %s",
1778
                context.request_id,
1779
                function_name,
1780
                exc_info=LOG.isEnabledFor(logging.DEBUG),
1781
            )
1782
            raise LambdaServiceException(
1✔
1783
                f"[{context.request_id}] Internal error while executing lambda {function_name}:{qualifier}. Caused by {type(e).__name__}: {e}"
1784
            ) from e
1785

1786
        if invocation_type == InvocationType.Event:
1✔
1787
            # This happens when invocation type is event
1788
            return InvocationResponse(StatusCode=202)
1✔
1789
        if invocation_type == InvocationType.DryRun:
1✔
1790
            # This happens when invocation type is dryrun
1791
            return InvocationResponse(StatusCode=204)
1✔
1792
        LOG.debug("Lambda invocation duration: %0.2fms", (time.perf_counter() - time_before) * 1000)
1✔
1793

1794
        response = InvocationResponse(
1✔
1795
            StatusCode=200,
1796
            Payload=invocation_result.payload,
1797
            ExecutedVersion=invocation_result.executed_version,
1798
        )
1799

1800
        if invocation_result.is_error:
1✔
1801
            response["FunctionError"] = "Unhandled"
1✔
1802

1803
        if log_type == LogType.Tail:
1✔
1804
            response["LogResult"] = to_str(
1✔
1805
                base64.b64encode(to_bytes(invocation_result.logs)[-4096:])
1806
            )
1807

1808
        return response
1✔
1809

1810
    # Version operations
1811
    def publish_version(
1✔
1812
        self,
1813
        context: RequestContext,
1814
        function_name: FunctionName,
1815
        code_sha256: String | None = None,
1816
        description: Description | None = None,
1817
        revision_id: String | None = None,
1818
        publish_to: FunctionVersionLatestPublished | None = None,
1819
        **kwargs,
1820
    ) -> FunctionConfiguration:
1821
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1822
        function_name = api_utils.get_function_name(function_name, context)
1✔
1823
        new_version = self._publish_version_from_existing_version(
1✔
1824
            function_name=function_name,
1825
            description=description,
1826
            account_id=account_id,
1827
            region=region,
1828
            revision_id=revision_id,
1829
            code_sha256=code_sha256,
1830
            publish_to=publish_to,
1831
        )
1832
        return api_utils.map_config_out(new_version, return_qualified_arn=True)
1✔
1833

1834
    def list_versions_by_function(
1✔
1835
        self,
1836
        context: RequestContext,
1837
        function_name: NamespacedFunctionName,
1838
        marker: String = None,
1839
        max_items: MaxListItems = None,
1840
        **kwargs,
1841
    ) -> ListVersionsByFunctionResponse:
1842
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1843
        function_name = api_utils.get_function_name(function_name, context)
1✔
1844
        function = self._get_function(
1✔
1845
            function_name=function_name, region=region, account_id=account_id
1846
        )
1847
        versions = [
1✔
1848
            api_utils.map_to_list_response(
1849
                api_utils.map_config_out(version=version, return_qualified_arn=True)
1850
            )
1851
            for version in function.versions.values()
1852
        ]
1853
        items = PaginatedList(versions)
1✔
1854
        page, token = items.get_page(
1✔
1855
            lambda item: item,
1856
            marker,
1857
            max_items,
1858
        )
1859
        return ListVersionsByFunctionResponse(Versions=page, NextMarker=token)
1✔
1860

1861
    # Alias
1862

1863
    def _create_routing_config_model(
1✔
1864
        self, routing_config_dict: dict[str, float], function_version: FunctionVersion
1865
    ):
1866
        if len(routing_config_dict) > 1:
1✔
1867
            raise InvalidParameterValueException(
1✔
1868
                "Number of items in AdditionalVersionWeights cannot be greater than 1",
1869
                Type="User",
1870
            )
1871
        # should be exactly one item here, still iterating, might be supported in the future
1872
        for key, value in routing_config_dict.items():
1✔
1873
            if value < 0.0 or value >= 1.0:
1✔
1874
                raise ValidationException(
1✔
1875
                    f"1 validation error detected: Value '{{{key}={value}}}' at 'routingConfig.additionalVersionWeights' failed to satisfy constraint: Map value must satisfy constraint: [Member must have value less than or equal to 1.0, Member must have value greater than or equal to 0.0, Member must not be null]"
1876
                )
1877
            if key == function_version.id.qualifier:
1✔
1878
                raise InvalidParameterValueException(
1✔
1879
                    f"Invalid function version {function_version.id.qualifier}. Function version {function_version.id.qualifier} is already included in routing configuration.",
1880
                    Type="User",
1881
                )
1882
            # check if version target is latest, then no routing config is allowed
1883
            if function_version.id.qualifier == "$LATEST":
1✔
1884
                raise InvalidParameterValueException(
1✔
1885
                    "$LATEST is not supported for an alias pointing to more than 1 version"
1886
                )
1887
            if not api_utils.qualifier_is_version(key):
1✔
1888
                raise ValidationException(
1✔
1889
                    f"1 validation error detected: Value '{{{key}={value}}}' at 'routingConfig.additionalVersionWeights' failed to satisfy constraint: Map keys must satisfy constraint: [Member must have length less than or equal to 1024, Member must have length greater than or equal to 1, Member must satisfy regular expression pattern: [0-9]+]"
1890
                )
1891

1892
            # checking if the version in the config exists
1893
            get_function_version(
1✔
1894
                function_name=function_version.id.function_name,
1895
                qualifier=key,
1896
                region=function_version.id.region,
1897
                account_id=function_version.id.account,
1898
            )
1899
        return AliasRoutingConfig(version_weights=routing_config_dict)
1✔
1900

1901
    def create_alias(
1✔
1902
        self,
1903
        context: RequestContext,
1904
        function_name: FunctionName,
1905
        name: Alias,
1906
        function_version: VersionWithLatestPublished,
1907
        description: Description = None,
1908
        routing_config: AliasRoutingConfiguration = None,
1909
        **kwargs,
1910
    ) -> AliasConfiguration:
1911
        if not api_utils.qualifier_is_alias(name):
1✔
1912
            raise ValidationException(
1✔
1913
                f"1 validation error detected: Value '{name}' at 'name' failed to satisfy constraint: Member must satisfy regular expression pattern: (?!^[0-9]+$)([a-zA-Z0-9-_]+)"
1914
            )
1915

1916
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1917
        function_name = api_utils.get_function_name(function_name, context)
1✔
1918
        target_version = get_function_version(
1✔
1919
            function_name=function_name,
1920
            qualifier=function_version,
1921
            region=region,
1922
            account_id=account_id,
1923
        )
1924
        function = self._get_function(
1✔
1925
            function_name=function_name, region=region, account_id=account_id
1926
        )
1927
        # description is always present, if not specified it's an empty string
1928
        description = description or ""
1✔
1929
        with function.lock:
1✔
1930
            if existing_alias := function.aliases.get(name):
1✔
1931
                raise ResourceConflictException(
1✔
1932
                    f"Alias already exists: {api_utils.map_alias_out(alias=existing_alias, function=function)['AliasArn']}",
1933
                    Type="User",
1934
                )
1935
            # checking if the version exists
1936
            routing_configuration = None
1✔
1937
            if routing_config and (
1✔
1938
                routing_config_dict := routing_config.get("AdditionalVersionWeights")
1939
            ):
1940
                routing_configuration = self._create_routing_config_model(
1✔
1941
                    routing_config_dict, target_version
1942
                )
1943

1944
            alias = VersionAlias(
1✔
1945
                name=name,
1946
                function_version=function_version,
1947
                description=description,
1948
                routing_configuration=routing_configuration,
1949
            )
1950
            function.aliases[name] = alias
1✔
1951
        return api_utils.map_alias_out(alias=alias, function=function)
1✔
1952

1953
    def list_aliases(
1✔
1954
        self,
1955
        context: RequestContext,
1956
        function_name: FunctionName,
1957
        function_version: VersionWithLatestPublished = None,
1958
        marker: String = None,
1959
        max_items: MaxListItems = None,
1960
        **kwargs,
1961
    ) -> ListAliasesResponse:
1962
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1963
        function_name = api_utils.get_function_name(function_name, context)
1✔
1964
        function = self._get_function(
1✔
1965
            function_name=function_name, region=region, account_id=account_id
1966
        )
1967
        aliases = [
1✔
1968
            api_utils.map_alias_out(alias, function)
1969
            for alias in function.aliases.values()
1970
            if function_version is None or alias.function_version == function_version
1971
        ]
1972

1973
        aliases = PaginatedList(aliases)
1✔
1974
        page, token = aliases.get_page(
1✔
1975
            lambda alias: alias["AliasArn"],
1976
            marker,
1977
            max_items,
1978
        )
1979

1980
        return ListAliasesResponse(Aliases=page, NextMarker=token)
1✔
1981

1982
    def delete_alias(
1✔
1983
        self, context: RequestContext, function_name: FunctionName, name: Alias, **kwargs
1984
    ) -> None:
1985
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1986
        function_name = api_utils.get_function_name(function_name, context)
1✔
1987
        function = self._get_function(
1✔
1988
            function_name=function_name, region=region, account_id=account_id
1989
        )
1990
        version_alias = function.aliases.pop(name, None)
1✔
1991

1992
        # cleanup related resources
1993
        if name in function.provisioned_concurrency_configs:
1✔
1994
            function.provisioned_concurrency_configs.pop(name)
1✔
1995

1996
        # TODO: Allow for deactivating/unregistering specific Lambda URLs
1997
        if version_alias and name in function.function_url_configs:
1✔
1998
            url_config = function.function_url_configs.pop(name)
1✔
1999
            LOG.debug(
1✔
2000
                "Stopping aliased Lambda Function URL %s for %s",
2001
                url_config.url,
2002
                url_config.function_name,
2003
            )
2004

2005
    def get_alias(
1✔
2006
        self, context: RequestContext, function_name: FunctionName, name: Alias, **kwargs
2007
    ) -> AliasConfiguration:
2008
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2009
        function_name = api_utils.get_function_name(function_name, context)
1✔
2010
        function = self._get_function(
1✔
2011
            function_name=function_name, region=region, account_id=account_id
2012
        )
2013
        if not (alias := function.aliases.get(name)):
1✔
2014
            raise ResourceNotFoundException(
1✔
2015
                f"Cannot find alias arn: {api_utils.qualified_lambda_arn(function_name=function_name, qualifier=name, region=region, account=account_id)}",
2016
                Type="User",
2017
            )
2018
        return api_utils.map_alias_out(alias=alias, function=function)
1✔
2019

2020
    def update_alias(
1✔
2021
        self,
2022
        context: RequestContext,
2023
        function_name: FunctionName,
2024
        name: Alias,
2025
        function_version: VersionWithLatestPublished = None,
2026
        description: Description = None,
2027
        routing_config: AliasRoutingConfiguration = None,
2028
        revision_id: String = None,
2029
        **kwargs,
2030
    ) -> AliasConfiguration:
2031
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2032
        function_name = api_utils.get_function_name(function_name, context)
1✔
2033
        function = self._get_function(
1✔
2034
            function_name=function_name, region=region, account_id=account_id
2035
        )
2036
        if not (alias := function.aliases.get(name)):
1✔
2037
            fn_arn = api_utils.qualified_lambda_arn(function_name, name, account_id, region)
1✔
2038
            raise ResourceNotFoundException(
1✔
2039
                f"Alias not found: {fn_arn}",
2040
                Type="User",
2041
            )
2042
        if revision_id and alias.revision_id != revision_id:
1✔
2043
            raise PreconditionFailedException(
1✔
2044
                "The Revision Id provided does not match the latest Revision Id. "
2045
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
2046
                Type="User",
2047
            )
2048
        changes = {}
1✔
2049
        if function_version is not None:
1✔
2050
            changes |= {"function_version": function_version}
1✔
2051
        if description is not None:
1✔
2052
            changes |= {"description": description}
1✔
2053
        if routing_config is not None:
1✔
2054
            # if it is an empty dict or AdditionalVersionWeights is empty, set routing config to None
2055
            new_routing_config = None
1✔
2056
            if routing_config_dict := routing_config.get("AdditionalVersionWeights"):
1✔
UNCOV
2057
                new_routing_config = self._create_routing_config_model(routing_config_dict)
×
2058
            changes |= {"routing_configuration": new_routing_config}
1✔
2059
        # even if no changes are done, we have to update revision id for some reason
2060
        old_alias = alias
1✔
2061
        alias = dataclasses.replace(alias, **changes)
1✔
2062
        function.aliases[name] = alias
1✔
2063

2064
        # TODO: signal lambda service that pointer potentially changed
2065
        self.lambda_service.update_alias(old_alias=old_alias, new_alias=alias, function=function)
1✔
2066

2067
        return api_utils.map_alias_out(alias=alias, function=function)
1✔
2068

2069
    # =======================================
2070
    # ======= EVENT SOURCE MAPPINGS =========
2071
    # =======================================
2072
    def check_service_resource_exists(
1✔
2073
        self, service: str, resource_arn: str, function_arn: str, function_role_arn: str
2074
    ):
2075
        """
2076
        Check if the service resource exists and if the function has access to it.
2077

2078
        Raises:
2079
            InvalidParameterValueException: If the service resource does not exist or the function does not have access to it.
2080
        """
2081
        arn = parse_arn(resource_arn)
1✔
2082
        source_client = get_internal_client(
1✔
2083
            arn=resource_arn,
2084
            role_arn=function_role_arn,
2085
            service_principal=ServicePrincipal.lambda_,
2086
            source_arn=function_arn,
2087
        )
2088
        if service in ["sqs", "sqs-fifo"]:
1✔
2089
            try:
1✔
2090
                # AWS uses `GetQueueAttributes` internally to verify the queue existence, but we need the `QueueUrl`
2091
                # which is not given directly. We build out a dummy `QueueUrl` which can be parsed by SQS to return
2092
                # the right value
2093
                queue_name = arn["resource"].split("/")[-1]
1✔
2094
                queue_url = f"http://sqs.{arn['region']}.domain/{arn['account']}/{queue_name}"
1✔
2095
                source_client.get_queue_attributes(QueueUrl=queue_url)
1✔
2096
            except ClientError as e:
1✔
2097
                error_code = e.response["Error"]["Code"]
1✔
2098
                if error_code == "AWS.SimpleQueueService.NonExistentQueue":
1✔
2099
                    raise InvalidParameterValueException(
1✔
2100
                        f"Error occurred while ReceiveMessage. SQS Error Code: {error_code}. SQS Error Message: {e.response['Error']['Message']}",
2101
                        Type="User",
2102
                    )
UNCOV
2103
                raise e
×
2104
        elif service in ["kinesis"]:
1✔
2105
            try:
1✔
2106
                source_client.describe_stream(StreamARN=resource_arn)
1✔
2107
            except ClientError as e:
1✔
2108
                if e.response["Error"]["Code"] == "ResourceNotFoundException":
1✔
2109
                    raise InvalidParameterValueException(
1✔
2110
                        f"Stream not found: {resource_arn}",
2111
                        Type="User",
2112
                    )
UNCOV
2113
                raise e
×
2114
        elif service in ["dynamodb"]:
1✔
2115
            try:
1✔
2116
                source_client.describe_stream(StreamArn=resource_arn)
1✔
2117
            except ClientError as e:
1✔
2118
                if e.response["Error"]["Code"] == "ResourceNotFoundException":
1✔
2119
                    raise InvalidParameterValueException(
1✔
2120
                        f"Stream not found: {resource_arn}",
2121
                        Type="User",
2122
                    )
UNCOV
2123
                raise e
×
2124

2125
    @handler("CreateEventSourceMapping", expand=False)
1✔
2126
    def create_event_source_mapping(
1✔
2127
        self,
2128
        context: RequestContext,
2129
        request: CreateEventSourceMappingRequest,
2130
    ) -> EventSourceMappingConfiguration:
2131
        return self.create_event_source_mapping_v2(context, request)
1✔
2132

2133
    def create_event_source_mapping_v2(
1✔
2134
        self,
2135
        context: RequestContext,
2136
        request: CreateEventSourceMappingRequest,
2137
    ) -> EventSourceMappingConfiguration:
2138
        # Validations
2139
        function_arn, function_name, state, function_version, function_role = (
1✔
2140
            self.validate_event_source_mapping(context, request)
2141
        )
2142

2143
        esm_config = EsmConfigFactory(request, context, function_arn).get_esm_config()
1✔
2144

2145
        # Copy esm_config to avoid a race condition with potential async update in the store
2146
        state.event_source_mappings[esm_config["UUID"]] = esm_config.copy()
1✔
2147
        enabled = request.get("Enabled", True)
1✔
2148
        # TODO: check for potential async race condition update -> think about locking
2149
        esm_worker = EsmWorkerFactory(esm_config, function_role, enabled).get_esm_worker()
1✔
2150
        self.esm_workers[esm_worker.uuid] = esm_worker
1✔
2151
        # TODO: check StateTransitionReason, LastModified, LastProcessingResult (concurrent updates requires locking!)
2152
        if tags := request.get("Tags"):
1✔
2153
            self._store_tags(esm_config.get("EventSourceMappingArn"), tags)
1✔
2154
        esm_worker.create()
1✔
2155
        return esm_config
1✔
2156

2157
    def validate_event_source_mapping(self, context, request):
1✔
2158
        # TODO: test whether stream ARNs are valid sources for Pipes or ESM or whether only DynamoDB table ARNs work
2159
        # TODO: Validate MaxRecordAgeInSeconds (i.e cannot subceed 60s but can be -1) and MaxRetryAttempts parameters.
2160
        # See https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-maximumrecordageinseconds
2161
        is_create_esm_request = context.operation.name == self.create_event_source_mapping.operation
1✔
2162

2163
        if destination_config := request.get("DestinationConfig"):
1✔
2164
            if "OnSuccess" in destination_config:
1✔
2165
                raise InvalidParameterValueException(
1✔
2166
                    "Unsupported DestinationConfig parameter for given event source mapping type.",
2167
                    Type="User",
2168
                )
2169

2170
        service = None
1✔
2171
        if "SelfManagedEventSource" in request:
1✔
UNCOV
2172
            service = "kafka"
×
UNCOV
2173
            if "SourceAccessConfigurations" not in request:
×
UNCOV
2174
                raise InvalidParameterValueException(
×
2175
                    "Required 'sourceAccessConfigurations' parameter is missing.", Type="User"
2176
                )
2177
        if service is None and "EventSourceArn" not in request:
1✔
2178
            raise InvalidParameterValueException("Unrecognized event source.", Type="User")
1✔
2179
        if service is None:
1✔
2180
            service = extract_service_from_arn(request["EventSourceArn"])
1✔
2181

2182
        batch_size = api_utils.validate_and_set_batch_size(service, request.get("BatchSize"))
1✔
2183
        if service in ["dynamodb", "kinesis"]:
1✔
2184
            starting_position = request.get("StartingPosition")
1✔
2185
            if not starting_position:
1✔
2186
                raise InvalidParameterValueException(
1✔
2187
                    "1 validation error detected: Value null at 'startingPosition' failed to satisfy constraint: Member must not be null.",
2188
                    Type="User",
2189
                )
2190

2191
            if starting_position not in KinesisStreamStartPosition.__members__:
1✔
2192
                raise ValidationException(
1✔
2193
                    f"1 validation error detected: Value '{starting_position}' at 'startingPosition' failed to satisfy constraint: Member must satisfy enum value set: [LATEST, AT_TIMESTAMP, TRIM_HORIZON]"
2194
                )
2195
            # AT_TIMESTAMP is not allowed for DynamoDB Streams
2196
            elif (
1✔
2197
                service == "dynamodb"
2198
                and starting_position not in DynamoDBStreamStartPosition.__members__
2199
            ):
2200
                raise InvalidParameterValueException(
1✔
2201
                    f"Unsupported starting position for arn type: {request['EventSourceArn']}",
2202
                    Type="User",
2203
                )
2204

2205
        if service in ["sqs", "sqs-fifo"]:
1✔
2206
            if batch_size > 10 and request.get("MaximumBatchingWindowInSeconds", 0) == 0:
1✔
2207
                raise InvalidParameterValueException(
1✔
2208
                    "Maximum batch window in seconds must be greater than 0 if maximum batch size is greater than 10",
2209
                    Type="User",
2210
                )
2211

2212
        if (filter_criteria := request.get("FilterCriteria")) is not None:
1✔
2213
            for filter_ in filter_criteria.get("Filters", []):
1✔
2214
                pattern_str = filter_.get("Pattern")
1✔
2215
                if not pattern_str or not isinstance(pattern_str, str):
1✔
UNCOV
2216
                    raise InvalidParameterValueException(
×
2217
                        "Invalid filter pattern definition.", Type="User"
2218
                    )
2219

2220
                if not validate_event_pattern(pattern_str):
1✔
2221
                    raise InvalidParameterValueException(
1✔
2222
                        "Invalid filter pattern definition.", Type="User"
2223
                    )
2224

2225
        # Can either have a FunctionName (i.e CreateEventSourceMapping request) or
2226
        # an internal EventSourceMappingConfiguration representation
2227
        request_function_name = request.get("FunctionName") or request.get("FunctionArn")
1✔
2228
        # can be either a partial arn or a full arn for the version/alias
2229
        function_name, qualifier, account, region = function_locators_from_arn(
1✔
2230
            request_function_name
2231
        )
2232
        # TODO: validate `context.region` vs. `region(request["FunctionName"])` vs. `region(request["EventSourceArn"])`
2233
        account = account or context.account_id
1✔
2234
        region = region or context.region
1✔
2235
        state = lambda_stores[account][region]
1✔
2236
        fn = state.functions.get(function_name)
1✔
2237
        if not fn:
1✔
2238
            raise InvalidParameterValueException("Function does not exist", Type="User")
1✔
2239

2240
        if qualifier:
1✔
2241
            # make sure the function version/alias exists
2242
            if api_utils.qualifier_is_alias(qualifier):
1✔
2243
                fn_alias = fn.aliases.get(qualifier)
1✔
2244
                if not fn_alias:
1✔
UNCOV
2245
                    raise Exception("unknown alias")  # TODO: cover via test
×
2246
            elif api_utils.qualifier_is_version(qualifier):
1✔
2247
                fn_version = fn.versions.get(qualifier)
1✔
2248
                if not fn_version:
1✔
UNCOV
2249
                    raise Exception("unknown version")  # TODO: cover via test
×
2250
            elif qualifier == "$LATEST":
1✔
2251
                pass
1✔
2252
            else:
UNCOV
2253
                raise Exception("invalid functionname")  # TODO: cover via test
×
2254
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account, region)
1✔
2255

2256
        else:
2257
            fn_arn = api_utils.unqualified_lambda_arn(function_name, account, region)
1✔
2258

2259
        function_version = get_function_version_from_arn(fn_arn)
1✔
2260
        function_role = function_version.config.role
1✔
2261

2262
        if source_arn := request.get("EventSourceArn"):
1✔
2263
            self.check_service_resource_exists(service, source_arn, fn_arn, function_role)
1✔
2264
        # Check we are validating a CreateEventSourceMapping request
2265
        if is_create_esm_request:
1✔
2266

2267
            def _get_mapping_sources(mapping: dict[str, Any]) -> list[str]:
1✔
2268
                if event_source_arn := mapping.get("EventSourceArn"):
1✔
2269
                    return [event_source_arn]
1✔
UNCOV
2270
                return (
×
2271
                    mapping.get("SelfManagedEventSource", {})
2272
                    .get("Endpoints", {})
2273
                    .get("KAFKA_BOOTSTRAP_SERVERS", [])
2274
                )
2275

2276
            # check for event source duplicates
2277
            # TODO: currently validated for sqs, kinesis, and dynamodb
2278
            service_id = load_service(service).service_id
1✔
2279
            for uuid, mapping in state.event_source_mappings.items():
1✔
2280
                mapping_sources = _get_mapping_sources(mapping)
1✔
2281
                request_sources = _get_mapping_sources(request)
1✔
2282
                if mapping["FunctionArn"] == fn_arn and (
1✔
2283
                    set(mapping_sources).intersection(request_sources)
2284
                ):
2285
                    if service == "sqs":
1✔
2286
                        # *shakes fist at SQS*
2287
                        raise ResourceConflictException(
1✔
2288
                            f'An event source mapping with {service_id} arn (" {mapping["EventSourceArn"]} ") '
2289
                            f'and function (" {function_name} ") already exists. Please update or delete the '
2290
                            f"existing mapping with UUID {uuid}",
2291
                            Type="User",
2292
                        )
2293
                    elif service == "kafka":
1✔
UNCOV
2294
                        if set(mapping["Topics"]).intersection(request["Topics"]):
×
UNCOV
2295
                            raise ResourceConflictException(
×
2296
                                f'An event source mapping with event source ("{",".join(request_sources)}"), '
2297
                                f'function ("{fn_arn}"), '
2298
                                f'topics ("{",".join(request["Topics"])}") already exists. Please update or delete the '
2299
                                f"existing mapping with UUID {uuid}",
2300
                                Type="User",
2301
                            )
2302
                    else:
2303
                        raise ResourceConflictException(
1✔
2304
                            f'The event source arn (" {mapping["EventSourceArn"]} ") and function '
2305
                            f'(" {function_name} ") provided mapping already exists. Please update or delete the '
2306
                            f"existing mapping with UUID {uuid}",
2307
                            Type="User",
2308
                        )
2309
        return fn_arn, function_name, state, function_version, function_role
1✔
2310

2311
    @handler("UpdateEventSourceMapping", expand=False)
1✔
2312
    def update_event_source_mapping(
1✔
2313
        self,
2314
        context: RequestContext,
2315
        request: UpdateEventSourceMappingRequest,
2316
    ) -> EventSourceMappingConfiguration:
2317
        return self.update_event_source_mapping_v2(context, request)
1✔
2318

2319
    def update_event_source_mapping_v2(
1✔
2320
        self,
2321
        context: RequestContext,
2322
        request: UpdateEventSourceMappingRequest,
2323
    ) -> EventSourceMappingConfiguration:
2324
        # TODO: test and implement this properly (quite complex with many validations and limitations!)
2325
        LOG.warning(
1✔
2326
            "Updating Lambda Event Source Mapping is in experimental state and not yet fully tested."
2327
        )
2328
        state = lambda_stores[context.account_id][context.region]
1✔
2329
        request_data = {**request}
1✔
2330
        uuid = request_data.pop("UUID", None)
1✔
2331
        if not uuid:
1✔
UNCOV
2332
            raise ResourceNotFoundException(
×
2333
                "The resource you requested does not exist.", Type="User"
2334
            )
2335
        old_event_source_mapping = state.event_source_mappings.get(uuid)
1✔
2336
        esm_worker = self.esm_workers.get(uuid)
1✔
2337
        if old_event_source_mapping is None or esm_worker is None:
1✔
2338
            raise ResourceNotFoundException(
1✔
2339
                "The resource you requested does not exist.", Type="User"
2340
            )  # TODO: test?
2341

2342
        # normalize values to overwrite
2343
        event_source_mapping = old_event_source_mapping | request_data
1✔
2344

2345
        temp_params = {}  # values only set for the returned response, not saved internally (e.g. transient state)
1✔
2346

2347
        # Validate the newly updated ESM object. We ignore the output here since we only care whether an Exception is raised.
2348
        function_arn, _, _, function_version, function_role = self.validate_event_source_mapping(
1✔
2349
            context, event_source_mapping
2350
        )
2351

2352
        # remove the FunctionName field
2353
        event_source_mapping.pop("FunctionName", None)
1✔
2354

2355
        if function_arn:
1✔
2356
            event_source_mapping["FunctionArn"] = function_arn
1✔
2357

2358
        # Only apply update if the desired state differs
2359
        enabled = request.get("Enabled")
1✔
2360
        if enabled is not None:
1✔
2361
            if enabled and old_event_source_mapping["State"] != EsmState.ENABLED:
1✔
2362
                event_source_mapping["State"] = EsmState.ENABLING
1✔
2363
            # TODO: What happens when trying to update during an update or failed state?!
2364
            elif not enabled and old_event_source_mapping["State"] == EsmState.ENABLED:
1✔
2365
                event_source_mapping["State"] = EsmState.DISABLING
1✔
2366
        else:
2367
            event_source_mapping["State"] = EsmState.UPDATING
1✔
2368

2369
        # To ensure parity, certain responses need to be immediately returned
2370
        temp_params["State"] = event_source_mapping["State"]
1✔
2371

2372
        state.event_source_mappings[uuid] = event_source_mapping
1✔
2373

2374
        # TODO: Currently, we re-create the entire ESM worker. Look into approach with better performance.
2375
        worker_factory = EsmWorkerFactory(
1✔
2376
            event_source_mapping, function_role, request.get("Enabled", esm_worker.enabled)
2377
        )
2378

2379
        # Get a new ESM worker object but do not active it, since the factory holds all logic for creating new worker from configuration.
2380
        updated_esm_worker = worker_factory.get_esm_worker()
1✔
2381
        self.esm_workers[uuid] = updated_esm_worker
1✔
2382

2383
        # We should stop() the worker since the delete() will remove the ESM from the state mapping.
2384
        esm_worker.stop()
1✔
2385
        # This will either create an EsmWorker in the CREATING state if enabled. Otherwise, the DISABLING state is set.
2386
        updated_esm_worker.create()
1✔
2387

2388
        return {**event_source_mapping, **temp_params}
1✔
2389

2390
    def delete_event_source_mapping(
1✔
2391
        self, context: RequestContext, uuid: String, **kwargs
2392
    ) -> EventSourceMappingConfiguration:
2393
        state = lambda_stores[context.account_id][context.region]
1✔
2394
        event_source_mapping = state.event_source_mappings.get(uuid)
1✔
2395
        if not event_source_mapping:
1✔
2396
            raise ResourceNotFoundException(
1✔
2397
                "The resource you requested does not exist.", Type="User"
2398
            )
2399
        esm = state.event_source_mappings[uuid]
1✔
2400
        # TODO: add proper locking
2401
        esm_worker = self.esm_workers.pop(uuid, None)
1✔
2402
        # Asynchronous delete in v2
2403
        if not esm_worker:
1✔
UNCOV
2404
            raise ResourceNotFoundException(
×
2405
                "The resource you requested does not exist.", Type="User"
2406
            )
2407
        esm_worker.delete()
1✔
2408
        return {**esm, "State": EsmState.DELETING}
1✔
2409

2410
    def get_event_source_mapping(
1✔
2411
        self, context: RequestContext, uuid: String, **kwargs
2412
    ) -> EventSourceMappingConfiguration:
2413
        state = lambda_stores[context.account_id][context.region]
1✔
2414
        event_source_mapping = state.event_source_mappings.get(uuid)
1✔
2415
        if not event_source_mapping:
1✔
2416
            raise ResourceNotFoundException(
1✔
2417
                "The resource you requested does not exist.", Type="User"
2418
            )
2419
        esm_worker = self.esm_workers.get(uuid)
1✔
2420
        if not esm_worker:
1✔
2421
            raise ResourceNotFoundException(
×
2422
                "The resource you requested does not exist.", Type="User"
2423
            )
2424
        event_source_mapping["State"] = esm_worker.current_state
1✔
2425
        event_source_mapping["StateTransitionReason"] = esm_worker.state_transition_reason
1✔
2426
        return event_source_mapping
1✔
2427

2428
    def list_event_source_mappings(
1✔
2429
        self,
2430
        context: RequestContext,
2431
        event_source_arn: Arn = None,
2432
        function_name: FunctionName = None,
2433
        marker: String = None,
2434
        max_items: MaxListItems = None,
2435
        **kwargs,
2436
    ) -> ListEventSourceMappingsResponse:
2437
        state = lambda_stores[context.account_id][context.region]
1✔
2438

2439
        esms = state.event_source_mappings.values()
1✔
2440
        # TODO: update and test State and StateTransitionReason for ESM v2
2441

2442
        if event_source_arn:  # TODO: validate pattern
1✔
2443
            esms = [e for e in esms if e.get("EventSourceArn") == event_source_arn]
1✔
2444

2445
        if function_name:
1✔
2446
            esms = [e for e in esms if function_name in e["FunctionArn"]]
1✔
2447

2448
        esms = PaginatedList(esms)
1✔
2449
        page, token = esms.get_page(
1✔
2450
            lambda x: x["UUID"],
2451
            marker,
2452
            max_items,
2453
        )
2454
        return ListEventSourceMappingsResponse(EventSourceMappings=page, NextMarker=token)
1✔
2455

2456
    def get_source_type_from_request(self, request: dict[str, Any]) -> str:
1✔
UNCOV
2457
        if event_source_arn := request.get("EventSourceArn", ""):
×
UNCOV
2458
            service = extract_service_from_arn(event_source_arn)
×
UNCOV
2459
            if service == "sqs" and "fifo" in event_source_arn:
×
UNCOV
2460
                service = "sqs-fifo"
×
UNCOV
2461
            return service
×
UNCOV
2462
        elif request.get("SelfManagedEventSource"):
×
UNCOV
2463
            return "kafka"
×
2464

2465
    # =======================================
2466
    # ============ FUNCTION URLS ============
2467
    # =======================================
2468

2469
    @staticmethod
1✔
2470
    def _validate_qualifier(qualifier: str) -> None:
1✔
2471
        if qualifier == "$LATEST" or (qualifier and api_utils.qualifier_is_version(qualifier)):
1✔
2472
            raise ValidationException(
1✔
2473
                f"1 validation error detected: Value '{qualifier}' at 'qualifier' failed to satisfy constraint: Member must satisfy regular expression pattern: ((?!^\\d+$)^[0-9a-zA-Z-_]+$)"
2474
            )
2475

2476
    @staticmethod
1✔
2477
    def _validate_invoke_mode(invoke_mode: str) -> None:
1✔
2478
        if invoke_mode and invoke_mode not in [InvokeMode.BUFFERED, InvokeMode.RESPONSE_STREAM]:
1✔
2479
            raise ValidationException(
1✔
2480
                f"1 validation error detected: Value '{invoke_mode}' at 'invokeMode' failed to satisfy constraint: Member must satisfy enum value set: [RESPONSE_STREAM, BUFFERED]"
2481
            )
2482
        if invoke_mode == InvokeMode.RESPONSE_STREAM:
1✔
2483
            # TODO should we actually fail for setting RESPONSE_STREAM?
2484
            #  It should trigger InvokeWithResponseStream which is not implemented
2485
            LOG.warning(
1✔
2486
                "The invokeMode 'RESPONSE_STREAM' is not yet supported on LocalStack. The property is only mocked, the execution will still be 'BUFFERED'"
2487
            )
2488

2489
    # TODO: what happens if function state is not active?
2490
    def create_function_url_config(
1✔
2491
        self,
2492
        context: RequestContext,
2493
        function_name: FunctionName,
2494
        auth_type: FunctionUrlAuthType,
2495
        qualifier: FunctionUrlQualifier = None,
2496
        cors: Cors = None,
2497
        invoke_mode: InvokeMode = None,
2498
        **kwargs,
2499
    ) -> CreateFunctionUrlConfigResponse:
2500
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2501
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
2502
            function_name, qualifier, context
2503
        )
2504
        state = lambda_stores[account_id][region]
1✔
2505
        self._validate_qualifier(qualifier)
1✔
2506
        self._validate_invoke_mode(invoke_mode)
1✔
2507

2508
        fn = state.functions.get(function_name)
1✔
2509
        if fn is None:
1✔
2510
            raise ResourceNotFoundException("Function does not exist", Type="User")
1✔
2511

2512
        url_config = fn.function_url_configs.get(qualifier or "$LATEST")
1✔
2513
        if url_config:
1✔
2514
            raise ResourceConflictException(
1✔
2515
                f"Failed to create function url config for [functionArn = {url_config.function_arn}]. Error message:  FunctionUrlConfig exists for this Lambda function",
2516
                Type="User",
2517
            )
2518

2519
        if qualifier and qualifier != "$LATEST" and qualifier not in fn.aliases:
1✔
2520
            raise ResourceNotFoundException("Function does not exist", Type="User")
1✔
2521

2522
        normalized_qualifier = qualifier or "$LATEST"
1✔
2523

2524
        function_arn = (
1✔
2525
            api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
2526
            if qualifier
2527
            else api_utils.unqualified_lambda_arn(function_name, account_id, region)
2528
        )
2529

2530
        custom_id: str | None = None
1✔
2531

2532
        tags = self._get_tags(api_utils.unqualified_lambda_arn(function_name, account_id, region))
1✔
2533
        if TAG_KEY_CUSTOM_URL in tags:
1✔
2534
            # Note: I really wanted to add verification here that the
2535
            # url_id is unique, so we could surface that to the user ASAP.
2536
            # However, it seems like that information isn't available yet,
2537
            # since (as far as I can tell) we call
2538
            # self.router.register_routes() once, in a single shot, for all
2539
            # of the routes -- and we need to verify that it's unique not
2540
            # just for this particular lambda function, but for the entire
2541
            # lambda provider. Therefore... that idea proved non-trivial!
2542
            custom_id_tag_value = (
1✔
2543
                f"{tags[TAG_KEY_CUSTOM_URL]}-{qualifier}" if qualifier else tags[TAG_KEY_CUSTOM_URL]
2544
            )
2545
            if TAG_KEY_CUSTOM_URL_VALIDATOR.match(custom_id_tag_value):
1✔
2546
                custom_id = custom_id_tag_value
1✔
2547

2548
            else:
2549
                # Note: we're logging here instead of raising to prioritize
2550
                # strict parity with AWS over the localstack-only custom_id
2551
                LOG.warning(
1✔
2552
                    "Invalid custom ID tag value for lambda URL (%s=%s). "
2553
                    "Replaced with default (random id)",
2554
                    TAG_KEY_CUSTOM_URL,
2555
                    custom_id_tag_value,
2556
                )
2557

2558
        # The url_id is the subdomain used for the URL we're creating. This
2559
        # is either created randomly (as in AWS), or can be passed as a tag
2560
        # to the lambda itself (localstack-only).
2561
        url_id: str
2562
        if custom_id is None:
1✔
2563
            url_id = api_utils.generate_random_url_id()
1✔
2564
        else:
2565
            url_id = custom_id
1✔
2566

2567
        host_definition = localstack_host(custom_port=config.GATEWAY_LISTEN[0].port)
1✔
2568
        fn.function_url_configs[normalized_qualifier] = FunctionUrlConfig(
1✔
2569
            function_arn=function_arn,
2570
            function_name=function_name,
2571
            cors=cors,
2572
            url_id=url_id,
2573
            url=f"http://{url_id}.lambda-url.{context.region}.{host_definition.host_and_port()}/",  # TODO: https support
2574
            auth_type=auth_type,
2575
            creation_time=api_utils.generate_lambda_date(),
2576
            last_modified_time=api_utils.generate_lambda_date(),
2577
            invoke_mode=invoke_mode,
2578
        )
2579

2580
        # persist and start URL
2581
        # TODO: implement URL invoke
2582
        api_url_config = api_utils.map_function_url_config(
1✔
2583
            fn.function_url_configs[normalized_qualifier]
2584
        )
2585

2586
        return CreateFunctionUrlConfigResponse(
1✔
2587
            FunctionUrl=api_url_config["FunctionUrl"],
2588
            FunctionArn=api_url_config["FunctionArn"],
2589
            AuthType=api_url_config["AuthType"],
2590
            Cors=api_url_config["Cors"],
2591
            CreationTime=api_url_config["CreationTime"],
2592
            InvokeMode=api_url_config["InvokeMode"],
2593
        )
2594

2595
    def get_function_url_config(
1✔
2596
        self,
2597
        context: RequestContext,
2598
        function_name: FunctionName,
2599
        qualifier: FunctionUrlQualifier = None,
2600
        **kwargs,
2601
    ) -> GetFunctionUrlConfigResponse:
2602
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2603
        state = lambda_stores[account_id][region]
1✔
2604

2605
        fn_name, qualifier = api_utils.get_name_and_qualifier(function_name, qualifier, context)
1✔
2606

2607
        self._validate_qualifier(qualifier)
1✔
2608

2609
        resolved_fn = state.functions.get(fn_name)
1✔
2610
        if not resolved_fn:
1✔
2611
            raise ResourceNotFoundException(
1✔
2612
                "The resource you requested does not exist.", Type="User"
2613
            )
2614

2615
        qualifier = qualifier or "$LATEST"
1✔
2616
        url_config = resolved_fn.function_url_configs.get(qualifier)
1✔
2617
        if not url_config:
1✔
2618
            raise ResourceNotFoundException(
1✔
2619
                "The resource you requested does not exist.", Type="User"
2620
            )
2621

2622
        return api_utils.map_function_url_config(url_config)
1✔
2623

2624
    def update_function_url_config(
1✔
2625
        self,
2626
        context: RequestContext,
2627
        function_name: FunctionName,
2628
        qualifier: FunctionUrlQualifier = None,
2629
        auth_type: FunctionUrlAuthType = None,
2630
        cors: Cors = None,
2631
        invoke_mode: InvokeMode = None,
2632
        **kwargs,
2633
    ) -> UpdateFunctionUrlConfigResponse:
2634
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2635
        state = lambda_stores[account_id][region]
1✔
2636

2637
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
2638
            function_name, qualifier, context
2639
        )
2640
        self._validate_qualifier(qualifier)
1✔
2641
        self._validate_invoke_mode(invoke_mode)
1✔
2642

2643
        fn = state.functions.get(function_name)
1✔
2644
        if not fn:
1✔
2645
            raise ResourceNotFoundException("Function does not exist", Type="User")
1✔
2646

2647
        normalized_qualifier = qualifier or "$LATEST"
1✔
2648

2649
        if (
1✔
2650
            api_utils.qualifier_is_alias(normalized_qualifier)
2651
            and normalized_qualifier not in fn.aliases
2652
        ):
2653
            raise ResourceNotFoundException("Function does not exist", Type="User")
1✔
2654

2655
        url_config = fn.function_url_configs.get(normalized_qualifier)
1✔
2656
        if not url_config:
1✔
2657
            raise ResourceNotFoundException(
1✔
2658
                "The resource you requested does not exist.", Type="User"
2659
            )
2660

2661
        changes = {
1✔
2662
            "last_modified_time": api_utils.generate_lambda_date(),
2663
            **({"cors": cors} if cors is not None else {}),
2664
            **({"auth_type": auth_type} if auth_type is not None else {}),
2665
        }
2666

2667
        if invoke_mode:
1✔
2668
            changes["invoke_mode"] = invoke_mode
1✔
2669

2670
        new_url_config = dataclasses.replace(url_config, **changes)
1✔
2671
        fn.function_url_configs[normalized_qualifier] = new_url_config
1✔
2672

2673
        return UpdateFunctionUrlConfigResponse(
1✔
2674
            FunctionUrl=new_url_config.url,
2675
            FunctionArn=new_url_config.function_arn,
2676
            AuthType=new_url_config.auth_type,
2677
            Cors=new_url_config.cors,
2678
            CreationTime=new_url_config.creation_time,
2679
            LastModifiedTime=new_url_config.last_modified_time,
2680
            InvokeMode=new_url_config.invoke_mode,
2681
        )
2682

2683
    def delete_function_url_config(
1✔
2684
        self,
2685
        context: RequestContext,
2686
        function_name: FunctionName,
2687
        qualifier: FunctionUrlQualifier = None,
2688
        **kwargs,
2689
    ) -> None:
2690
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2691
        state = lambda_stores[account_id][region]
1✔
2692

2693
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
2694
            function_name, qualifier, context
2695
        )
2696
        self._validate_qualifier(qualifier)
1✔
2697

2698
        resolved_fn = state.functions.get(function_name)
1✔
2699
        if not resolved_fn:
1✔
2700
            raise ResourceNotFoundException(
1✔
2701
                "The resource you requested does not exist.", Type="User"
2702
            )
2703

2704
        qualifier = qualifier or "$LATEST"
1✔
2705
        url_config = resolved_fn.function_url_configs.get(qualifier)
1✔
2706
        if not url_config:
1✔
2707
            raise ResourceNotFoundException(
1✔
2708
                "The resource you requested does not exist.", Type="User"
2709
            )
2710

2711
        del resolved_fn.function_url_configs[qualifier]
1✔
2712

2713
    def list_function_url_configs(
1✔
2714
        self,
2715
        context: RequestContext,
2716
        function_name: FunctionName,
2717
        marker: String = None,
2718
        max_items: MaxItems = None,
2719
        **kwargs,
2720
    ) -> ListFunctionUrlConfigsResponse:
2721
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2722
        state = lambda_stores[account_id][region]
1✔
2723

2724
        fn_name = api_utils.get_function_name(function_name, context)
1✔
2725
        resolved_fn = state.functions.get(fn_name)
1✔
2726
        if not resolved_fn:
1✔
2727
            raise ResourceNotFoundException("Function does not exist", Type="User")
1✔
2728

2729
        url_configs = [
1✔
2730
            api_utils.map_function_url_config(fn_conf)
2731
            for fn_conf in resolved_fn.function_url_configs.values()
2732
        ]
2733
        url_configs = PaginatedList(url_configs)
1✔
2734
        page, token = url_configs.get_page(
1✔
2735
            lambda url_config: url_config["FunctionArn"],
2736
            marker,
2737
            max_items,
2738
        )
2739
        url_configs = page
1✔
2740
        return ListFunctionUrlConfigsResponse(FunctionUrlConfigs=url_configs, NextMarker=token)
1✔
2741

2742
    # =======================================
2743
    # ============  Permissions  ============
2744
    # =======================================
2745

2746
    @handler("AddPermission", expand=False)
1✔
2747
    def add_permission(
1✔
2748
        self,
2749
        context: RequestContext,
2750
        request: AddPermissionRequest,
2751
    ) -> AddPermissionResponse:
2752
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
2753
            request.get("FunctionName"), request.get("Qualifier"), context
2754
        )
2755

2756
        # validate qualifier
2757
        if qualifier is not None:
1✔
2758
            self._validate_qualifier_expression(qualifier)
1✔
2759
            if qualifier == "$LATEST":
1✔
2760
                raise InvalidParameterValueException(
1✔
2761
                    "We currently do not support adding policies for $LATEST.", Type="User"
2762
                )
2763
        account_id, region = api_utils.get_account_and_region(request.get("FunctionName"), context)
1✔
2764

2765
        resolved_fn = self._get_function(function_name, account_id, region)
1✔
2766
        resolved_qualifier, fn_arn = self._resolve_fn_qualifier(resolved_fn, qualifier)
1✔
2767

2768
        revision_id = request.get("RevisionId")
1✔
2769
        if revision_id:
1✔
2770
            fn_revision_id = self._function_revision_id(resolved_fn, resolved_qualifier)
1✔
2771
            if revision_id != fn_revision_id:
1✔
2772
                raise PreconditionFailedException(
1✔
2773
                    "The Revision Id provided does not match the latest Revision Id. "
2774
                    "Call the GetPolicy API to retrieve the latest Revision Id",
2775
                    Type="User",
2776
                )
2777

2778
        request_sid = request["StatementId"]
1✔
2779
        if not bool(STATEMENT_ID_REGEX.match(request_sid)):
1✔
2780
            raise ValidationException(
1✔
2781
                f"1 validation error detected: Value '{request_sid}' at 'statementId' failed to satisfy constraint: Member must satisfy regular expression pattern: ([a-zA-Z0-9-_]+)"
2782
            )
2783
        # check for an already existing policy and any conflicts in existing statements
2784
        existing_policy = resolved_fn.permissions.get(resolved_qualifier)
1✔
2785
        if existing_policy:
1✔
2786
            if request_sid in [s["Sid"] for s in existing_policy.policy.Statement]:
1✔
2787
                # uniqueness scope: statement id needs to be unique per qualified function ($LATEST, version, or alias)
2788
                # Counterexample: the same sid can exist within $LATEST, version, and alias
2789
                raise ResourceConflictException(
1✔
2790
                    f"The statement id ({request_sid}) provided already exists. Please provide a new statement id, or remove the existing statement.",
2791
                    Type="User",
2792
                )
2793

2794
        permission_statement = api_utils.build_statement(
1✔
2795
            partition=context.partition,
2796
            resource_arn=fn_arn,
2797
            statement_id=request["StatementId"],
2798
            action=request["Action"],
2799
            principal=request["Principal"],
2800
            source_arn=request.get("SourceArn"),
2801
            source_account=request.get("SourceAccount"),
2802
            principal_org_id=request.get("PrincipalOrgID"),
2803
            event_source_token=request.get("EventSourceToken"),
2804
            auth_type=request.get("FunctionUrlAuthType"),
2805
        )
2806
        new_policy = existing_policy
1✔
2807
        if not existing_policy:
1✔
2808
            new_policy = FunctionResourcePolicy(
1✔
2809
                policy=ResourcePolicy(Version="2012-10-17", Id="default", Statement=[])
2810
            )
2811
        new_policy.policy.Statement.append(permission_statement)
1✔
2812
        if not existing_policy:
1✔
2813
            resolved_fn.permissions[resolved_qualifier] = new_policy
1✔
2814

2815
        # Update revision id of alias or version
2816
        # TODO: re-evaluate data model to prevent this dirty hack just for bumping the revision id
2817
        # TODO: does that need a `with function.lock` for atomic updates of the policy + revision_id?
2818
        if api_utils.qualifier_is_alias(resolved_qualifier):
1✔
2819
            resolved_alias = resolved_fn.aliases[resolved_qualifier]
1✔
2820
            resolved_fn.aliases[resolved_qualifier] = dataclasses.replace(resolved_alias)
1✔
2821
        # Assumes that a non-alias is a version
2822
        else:
2823
            resolved_version = resolved_fn.versions[resolved_qualifier]
1✔
2824
            resolved_fn.versions[resolved_qualifier] = dataclasses.replace(
1✔
2825
                resolved_version, config=dataclasses.replace(resolved_version.config)
2826
            )
2827
        return AddPermissionResponse(Statement=json.dumps(permission_statement))
1✔
2828

2829
    def remove_permission(
1✔
2830
        self,
2831
        context: RequestContext,
2832
        function_name: NamespacedFunctionName,
2833
        statement_id: NamespacedStatementId,
2834
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
2835
        revision_id: String | None = None,
2836
        **kwargs,
2837
    ) -> None:
2838
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2839
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
2840
            function_name, qualifier, context
2841
        )
2842
        if qualifier is not None:
1✔
2843
            self._validate_qualifier_expression(qualifier)
1✔
2844

2845
        state = lambda_stores[account_id][region]
1✔
2846
        resolved_fn = state.functions.get(function_name)
1✔
2847
        if resolved_fn is None:
1✔
2848
            fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
1✔
2849
            raise ResourceNotFoundException(f"No policy found for: {fn_arn}", Type="User")
1✔
2850

2851
        resolved_qualifier, _ = self._resolve_fn_qualifier(resolved_fn, qualifier)
1✔
2852
        function_permission = resolved_fn.permissions.get(resolved_qualifier)
1✔
2853
        if not function_permission:
1✔
2854
            raise ResourceNotFoundException(
1✔
2855
                "No policy is associated with the given resource.", Type="User"
2856
            )
2857

2858
        # try to find statement in policy and delete it
2859
        statement = None
1✔
2860
        for s in function_permission.policy.Statement:
1✔
2861
            if s["Sid"] == statement_id:
1✔
2862
                statement = s
1✔
2863
                break
1✔
2864

2865
        if not statement:
1✔
2866
            raise ResourceNotFoundException(
1✔
2867
                f"Statement {statement_id} is not found in resource policy.", Type="User"
2868
            )
2869
        fn_revision_id = self._function_revision_id(resolved_fn, resolved_qualifier)
1✔
2870
        if revision_id and revision_id != fn_revision_id:
1✔
UNCOV
2871
            raise PreconditionFailedException(
×
2872
                "The Revision Id provided does not match the latest Revision Id. "
2873
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
2874
                Type="User",
2875
            )
2876
        function_permission.policy.Statement.remove(statement)
1✔
2877

2878
        # Update revision id for alias or version
2879
        # TODO: re-evaluate data model to prevent this dirty hack just for bumping the revision id
2880
        # TODO: does that need a `with function.lock` for atomic updates of the policy + revision_id?
2881
        if api_utils.qualifier_is_alias(resolved_qualifier):
1✔
UNCOV
2882
            resolved_alias = resolved_fn.aliases[resolved_qualifier]
×
UNCOV
2883
            resolved_fn.aliases[resolved_qualifier] = dataclasses.replace(resolved_alias)
×
2884
        # Assumes that a non-alias is a version
2885
        else:
2886
            resolved_version = resolved_fn.versions[resolved_qualifier]
1✔
2887
            resolved_fn.versions[resolved_qualifier] = dataclasses.replace(
1✔
2888
                resolved_version, config=dataclasses.replace(resolved_version.config)
2889
            )
2890

2891
        # remove the policy as a whole when there's no statement left in it
2892
        if len(function_permission.policy.Statement) == 0:
1✔
2893
            del resolved_fn.permissions[resolved_qualifier]
1✔
2894

2895
    def get_policy(
1✔
2896
        self,
2897
        context: RequestContext,
2898
        function_name: NamespacedFunctionName,
2899
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
2900
        **kwargs,
2901
    ) -> GetPolicyResponse:
2902
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2903
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
2904
            function_name, qualifier, context
2905
        )
2906

2907
        if qualifier is not None:
1✔
2908
            self._validate_qualifier_expression(qualifier)
1✔
2909

2910
        resolved_fn = self._get_function(function_name, account_id, region)
1✔
2911

2912
        resolved_qualifier = qualifier or "$LATEST"
1✔
2913
        function_permission = resolved_fn.permissions.get(resolved_qualifier)
1✔
2914
        if not function_permission:
1✔
2915
            raise ResourceNotFoundException(
1✔
2916
                "The resource you requested does not exist.", Type="User"
2917
            )
2918

2919
        fn_revision_id = None
1✔
2920
        if api_utils.qualifier_is_alias(resolved_qualifier):
1✔
2921
            resolved_alias = resolved_fn.aliases[resolved_qualifier]
1✔
2922
            fn_revision_id = resolved_alias.revision_id
1✔
2923
        # Assumes that a non-alias is a version
2924
        else:
2925
            resolved_version = resolved_fn.versions[resolved_qualifier]
1✔
2926
            fn_revision_id = resolved_version.config.revision_id
1✔
2927

2928
        return GetPolicyResponse(
1✔
2929
            Policy=json.dumps(dataclasses.asdict(function_permission.policy)),
2930
            RevisionId=fn_revision_id,
2931
        )
2932

2933
    # =======================================
2934
    # ========  Code signing config  ========
2935
    # =======================================
2936

2937
    def create_code_signing_config(
1✔
2938
        self,
2939
        context: RequestContext,
2940
        allowed_publishers: AllowedPublishers,
2941
        description: Description | None = None,
2942
        code_signing_policies: CodeSigningPolicies | None = None,
2943
        tags: Tags | None = None,
2944
        **kwargs,
2945
    ) -> CreateCodeSigningConfigResponse:
2946
        account = context.account_id
1✔
2947
        region = context.region
1✔
2948

2949
        state = lambda_stores[account][region]
1✔
2950
        # TODO: can there be duplicates?
2951
        csc_id = f"csc-{get_random_hex(17)}"  # e.g. 'csc-077c33b4c19e26036'
1✔
2952
        csc_arn = f"arn:{context.partition}:lambda:{region}:{account}:code-signing-config:{csc_id}"
1✔
2953
        csc = CodeSigningConfig(
1✔
2954
            csc_id=csc_id,
2955
            arn=csc_arn,
2956
            allowed_publishers=allowed_publishers,
2957
            policies=code_signing_policies,
2958
            last_modified=api_utils.generate_lambda_date(),
2959
            description=description,
2960
        )
2961
        state.code_signing_configs[csc_arn] = csc
1✔
2962
        return CreateCodeSigningConfigResponse(CodeSigningConfig=api_utils.map_csc(csc))
1✔
2963

2964
    def put_function_code_signing_config(
1✔
2965
        self,
2966
        context: RequestContext,
2967
        code_signing_config_arn: CodeSigningConfigArn,
2968
        function_name: NamespacedFunctionName,
2969
        **kwargs,
2970
    ) -> PutFunctionCodeSigningConfigResponse:
2971
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2972
        state = lambda_stores[account_id][region]
1✔
2973
        function_name = api_utils.get_function_name(function_name, context)
1✔
2974

2975
        csc = state.code_signing_configs.get(code_signing_config_arn)
1✔
2976
        if not csc:
1✔
2977
            raise CodeSigningConfigNotFoundException(
1✔
2978
                f"The code signing configuration cannot be found. Check that the provided configuration is not deleted: {code_signing_config_arn}.",
2979
                Type="User",
2980
            )
2981

2982
        fn = state.functions.get(function_name)
1✔
2983
        fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
1✔
2984
        if not fn:
1✔
2985
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")
1✔
2986

2987
        fn.code_signing_config_arn = code_signing_config_arn
1✔
2988
        return PutFunctionCodeSigningConfigResponse(
1✔
2989
            CodeSigningConfigArn=code_signing_config_arn, FunctionName=function_name
2990
        )
2991

2992
    def update_code_signing_config(
1✔
2993
        self,
2994
        context: RequestContext,
2995
        code_signing_config_arn: CodeSigningConfigArn,
2996
        description: Description = None,
2997
        allowed_publishers: AllowedPublishers = None,
2998
        code_signing_policies: CodeSigningPolicies = None,
2999
        **kwargs,
3000
    ) -> UpdateCodeSigningConfigResponse:
3001
        state = lambda_stores[context.account_id][context.region]
1✔
3002
        csc = state.code_signing_configs.get(code_signing_config_arn)
1✔
3003
        if not csc:
1✔
3004
            raise ResourceNotFoundException(
1✔
3005
                f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
3006
            )
3007

3008
        changes = {
1✔
3009
            **(
3010
                {"allowed_publishers": allowed_publishers} if allowed_publishers is not None else {}
3011
            ),
3012
            **({"policies": code_signing_policies} if code_signing_policies is not None else {}),
3013
            **({"description": description} if description is not None else {}),
3014
        }
3015
        new_csc = dataclasses.replace(
1✔
3016
            csc, last_modified=api_utils.generate_lambda_date(), **changes
3017
        )
3018
        state.code_signing_configs[code_signing_config_arn] = new_csc
1✔
3019

3020
        return UpdateCodeSigningConfigResponse(CodeSigningConfig=api_utils.map_csc(new_csc))
1✔
3021

3022
    def get_code_signing_config(
1✔
3023
        self, context: RequestContext, code_signing_config_arn: CodeSigningConfigArn, **kwargs
3024
    ) -> GetCodeSigningConfigResponse:
3025
        state = lambda_stores[context.account_id][context.region]
1✔
3026
        csc = state.code_signing_configs.get(code_signing_config_arn)
1✔
3027
        if not csc:
1✔
3028
            raise ResourceNotFoundException(
1✔
3029
                f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
3030
            )
3031

3032
        return GetCodeSigningConfigResponse(CodeSigningConfig=api_utils.map_csc(csc))
1✔
3033

3034
    def get_function_code_signing_config(
1✔
3035
        self, context: RequestContext, function_name: NamespacedFunctionName, **kwargs
3036
    ) -> GetFunctionCodeSigningConfigResponse:
3037
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3038
        state = lambda_stores[account_id][region]
1✔
3039
        function_name = api_utils.get_function_name(function_name, context)
1✔
3040
        fn = state.functions.get(function_name)
1✔
3041
        fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
1✔
3042
        if not fn:
1✔
3043
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")
1✔
3044

3045
        if fn.code_signing_config_arn:
1✔
3046
            return GetFunctionCodeSigningConfigResponse(
1✔
3047
                CodeSigningConfigArn=fn.code_signing_config_arn, FunctionName=function_name
3048
            )
3049

3050
        return GetFunctionCodeSigningConfigResponse()
1✔
3051

3052
    def delete_function_code_signing_config(
1✔
3053
        self, context: RequestContext, function_name: NamespacedFunctionName, **kwargs
3054
    ) -> None:
3055
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3056
        state = lambda_stores[account_id][region]
1✔
3057
        function_name = api_utils.get_function_name(function_name, context)
1✔
3058
        fn = state.functions.get(function_name)
1✔
3059
        fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
1✔
3060
        if not fn:
1✔
3061
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")
1✔
3062

3063
        fn.code_signing_config_arn = None
1✔
3064

3065
    def delete_code_signing_config(
1✔
3066
        self, context: RequestContext, code_signing_config_arn: CodeSigningConfigArn, **kwargs
3067
    ) -> DeleteCodeSigningConfigResponse:
3068
        state = lambda_stores[context.account_id][context.region]
1✔
3069

3070
        csc = state.code_signing_configs.get(code_signing_config_arn)
1✔
3071
        if not csc:
1✔
3072
            raise ResourceNotFoundException(
1✔
3073
                f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
3074
            )
3075

3076
        del state.code_signing_configs[code_signing_config_arn]
1✔
3077

3078
        return DeleteCodeSigningConfigResponse()
1✔
3079

3080
    def list_code_signing_configs(
1✔
3081
        self,
3082
        context: RequestContext,
3083
        marker: String = None,
3084
        max_items: MaxListItems = None,
3085
        **kwargs,
3086
    ) -> ListCodeSigningConfigsResponse:
3087
        state = lambda_stores[context.account_id][context.region]
1✔
3088

3089
        cscs = [api_utils.map_csc(csc) for csc in state.code_signing_configs.values()]
1✔
3090
        cscs = PaginatedList(cscs)
1✔
3091
        page, token = cscs.get_page(
1✔
3092
            lambda csc: csc["CodeSigningConfigId"],
3093
            marker,
3094
            max_items,
3095
        )
3096
        return ListCodeSigningConfigsResponse(CodeSigningConfigs=page, NextMarker=token)
1✔
3097

3098
    def list_functions_by_code_signing_config(
1✔
3099
        self,
3100
        context: RequestContext,
3101
        code_signing_config_arn: CodeSigningConfigArn,
3102
        marker: String = None,
3103
        max_items: MaxListItems = None,
3104
        **kwargs,
3105
    ) -> ListFunctionsByCodeSigningConfigResponse:
3106
        account = context.account_id
1✔
3107
        region = context.region
1✔
3108

3109
        state = lambda_stores[account][region]
1✔
3110

3111
        if code_signing_config_arn not in state.code_signing_configs:
1✔
3112
            raise ResourceNotFoundException(
1✔
3113
                f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
3114
            )
3115

3116
        fn_arns = [
1✔
3117
            api_utils.unqualified_lambda_arn(fn.function_name, account, region)
3118
            for fn in state.functions.values()
3119
            if fn.code_signing_config_arn == code_signing_config_arn
3120
        ]
3121

3122
        cscs = PaginatedList(fn_arns)
1✔
3123
        page, token = cscs.get_page(
1✔
3124
            lambda x: x,
3125
            marker,
3126
            max_items,
3127
        )
3128
        return ListFunctionsByCodeSigningConfigResponse(FunctionArns=page, NextMarker=token)
1✔
3129

3130
    # =======================================
3131
    # =========  Account Settings   =========
3132
    # =======================================
3133

3134
    # CAVE: these settings & usages are *per* region!
3135
    # Lambda quotas: https://docs.aws.amazon.com/lambda/latest/dg/gettingstarted-limits.html
3136
    def get_account_settings(self, context: RequestContext, **kwargs) -> GetAccountSettingsResponse:
1✔
3137
        state = lambda_stores[context.account_id][context.region]
1✔
3138

3139
        fn_count = 0
1✔
3140
        code_size_sum = 0
1✔
3141
        reserved_concurrency_sum = 0
1✔
3142
        for fn in state.functions.values():
1✔
3143
            fn_count += 1
1✔
3144
            for fn_version in fn.versions.values():
1✔
3145
                # Image-based Lambdas do not have a code attribute and count against the ECR quotas instead
3146
                if fn_version.config.package_type == PackageType.Zip:
1✔
3147
                    code_size_sum += fn_version.config.code.code_size
1✔
3148
            if fn.reserved_concurrent_executions is not None:
1✔
3149
                reserved_concurrency_sum += fn.reserved_concurrent_executions
1✔
3150
            for c in fn.provisioned_concurrency_configs.values():
1✔
3151
                reserved_concurrency_sum += c.provisioned_concurrent_executions
1✔
3152
        for layer in state.layers.values():
1✔
3153
            for layer_version in layer.layer_versions.values():
1✔
3154
                code_size_sum += layer_version.code.code_size
1✔
3155
        return GetAccountSettingsResponse(
1✔
3156
            AccountLimit=AccountLimit(
3157
                TotalCodeSize=config.LAMBDA_LIMITS_TOTAL_CODE_SIZE,
3158
                CodeSizeZipped=config.LAMBDA_LIMITS_CODE_SIZE_ZIPPED,
3159
                CodeSizeUnzipped=config.LAMBDA_LIMITS_CODE_SIZE_UNZIPPED,
3160
                ConcurrentExecutions=config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS,
3161
                UnreservedConcurrentExecutions=config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS
3162
                - reserved_concurrency_sum,
3163
            ),
3164
            AccountUsage=AccountUsage(
3165
                TotalCodeSize=code_size_sum,
3166
                FunctionCount=fn_count,
3167
            ),
3168
        )
3169

3170
    # =======================================
3171
    # ==  Provisioned Concurrency Config   ==
3172
    # =======================================
3173

3174
    def _get_provisioned_config(
1✔
3175
        self, context: RequestContext, function_name: str, qualifier: str
3176
    ) -> ProvisionedConcurrencyConfiguration | None:
3177
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3178
        state = lambda_stores[account_id][region]
1✔
3179
        function_name = api_utils.get_function_name(function_name, context)
1✔
3180
        fn = state.functions.get(function_name)
1✔
3181
        if api_utils.qualifier_is_alias(qualifier):
1✔
3182
            fn_alias = None
1✔
3183
            if fn:
1✔
3184
                fn_alias = fn.aliases.get(qualifier)
1✔
3185
            if fn_alias is None:
1✔
3186
                raise ResourceNotFoundException(
1✔
3187
                    f"Cannot find alias arn: {api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)}",
3188
                    Type="User",
3189
                )
3190
        elif api_utils.qualifier_is_version(qualifier):
1✔
3191
            fn_version = None
1✔
3192
            if fn:
1✔
3193
                fn_version = fn.versions.get(qualifier)
1✔
3194
            if fn_version is None:
1✔
3195
                raise ResourceNotFoundException(
1✔
3196
                    f"Function not found: {api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)}",
3197
                    Type="User",
3198
                )
3199

3200
        return fn.provisioned_concurrency_configs.get(qualifier)
1✔
3201

3202
    def put_provisioned_concurrency_config(
1✔
3203
        self,
3204
        context: RequestContext,
3205
        function_name: FunctionName,
3206
        qualifier: Qualifier,
3207
        provisioned_concurrent_executions: PositiveInteger,
3208
        **kwargs,
3209
    ) -> PutProvisionedConcurrencyConfigResponse:
3210
        if provisioned_concurrent_executions <= 0:
1✔
3211
            raise ValidationException(
1✔
3212
                f"1 validation error detected: Value '{provisioned_concurrent_executions}' at 'provisionedConcurrentExecutions' failed to satisfy constraint: Member must have value greater than or equal to 1"
3213
            )
3214

3215
        if qualifier == "$LATEST":
1✔
3216
            raise InvalidParameterValueException(
1✔
3217
                "Provisioned Concurrency Configs cannot be applied to unpublished function versions.",
3218
                Type="User",
3219
            )
3220
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3221
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
3222
            function_name, qualifier, context
3223
        )
3224
        state = lambda_stores[account_id][region]
1✔
3225
        fn = state.functions.get(function_name)
1✔
3226

3227
        provisioned_config = self._get_provisioned_config(context, function_name, qualifier)
1✔
3228

3229
        if provisioned_config:  # TODO: merge?
1✔
3230
            # TODO: add a test for partial updates (if possible)
3231
            LOG.warning(
1✔
3232
                "Partial update of provisioned concurrency config is currently not supported."
3233
            )
3234

3235
        other_provisioned_sum = sum(
1✔
3236
            [
3237
                provisioned_configs.provisioned_concurrent_executions
3238
                for provisioned_qualifier, provisioned_configs in fn.provisioned_concurrency_configs.items()
3239
                if provisioned_qualifier != qualifier
3240
            ]
3241
        )
3242

3243
        if (
1✔
3244
            fn.reserved_concurrent_executions is not None
3245
            and fn.reserved_concurrent_executions
3246
            < other_provisioned_sum + provisioned_concurrent_executions
3247
        ):
3248
            raise InvalidParameterValueException(
1✔
3249
                "Requested Provisioned Concurrency should not be greater than the reservedConcurrentExecution for function",
3250
                Type="User",
3251
            )
3252

3253
        if provisioned_concurrent_executions > config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS:
1✔
3254
            raise InvalidParameterValueException(
1✔
3255
                f"Specified ConcurrentExecutions for function is greater than account's unreserved concurrency"
3256
                f" [{config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS}]."
3257
            )
3258

3259
        settings = self.get_account_settings(context)
1✔
3260
        unreserved_concurrent_executions = settings["AccountLimit"][
1✔
3261
            "UnreservedConcurrentExecutions"
3262
        ]
3263
        if (
1✔
3264
            unreserved_concurrent_executions - provisioned_concurrent_executions
3265
            < config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY
3266
        ):
3267
            raise InvalidParameterValueException(
1✔
3268
                f"Specified ConcurrentExecutions for function decreases account's UnreservedConcurrentExecution below"
3269
                f" its minimum value of [{config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY}]."
3270
            )
3271

3272
        provisioned_config = ProvisionedConcurrencyConfiguration(
1✔
3273
            provisioned_concurrent_executions, api_utils.generate_lambda_date()
3274
        )
3275
        fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
1✔
3276

3277
        if api_utils.qualifier_is_alias(qualifier):
1✔
3278
            alias = fn.aliases.get(qualifier)
1✔
3279
            resolved_version = fn.versions.get(alias.function_version)
1✔
3280

3281
            if (
1✔
3282
                resolved_version
3283
                and fn.provisioned_concurrency_configs.get(alias.function_version) is not None
3284
            ):
3285
                raise ResourceConflictException(
1✔
3286
                    "Alias can't be used for Provisioned Concurrency configuration on an already Provisioned version",
3287
                    Type="User",
3288
                )
3289
            fn_arn = resolved_version.id.qualified_arn()
1✔
3290
        elif api_utils.qualifier_is_version(qualifier):
1✔
3291
            fn_version = fn.versions.get(qualifier)
1✔
3292

3293
            # TODO: might be useful other places, utilize
3294
            pointing_aliases = []
1✔
3295
            for alias in fn.aliases.values():
1✔
3296
                if (
1✔
3297
                    alias.function_version == qualifier
3298
                    and fn.provisioned_concurrency_configs.get(alias.name) is not None
3299
                ):
3300
                    pointing_aliases.append(alias.name)
1✔
3301
            if pointing_aliases:
1✔
3302
                raise ResourceConflictException(
1✔
3303
                    "Version is pointed by a Provisioned Concurrency alias", Type="User"
3304
                )
3305

3306
            fn_arn = fn_version.id.qualified_arn()
1✔
3307

3308
        manager = self.lambda_service.get_lambda_version_manager(fn_arn)
1✔
3309

3310
        fn.provisioned_concurrency_configs[qualifier] = provisioned_config
1✔
3311

3312
        manager.update_provisioned_concurrency_config(
1✔
3313
            provisioned_config.provisioned_concurrent_executions
3314
        )
3315

3316
        return PutProvisionedConcurrencyConfigResponse(
1✔
3317
            RequestedProvisionedConcurrentExecutions=provisioned_config.provisioned_concurrent_executions,
3318
            AvailableProvisionedConcurrentExecutions=0,
3319
            AllocatedProvisionedConcurrentExecutions=0,
3320
            Status=ProvisionedConcurrencyStatusEnum.IN_PROGRESS,
3321
            # StatusReason=manager.provisioned_state.status_reason,
3322
            LastModified=provisioned_config.last_modified,  # TODO: does change with configuration or also with state changes?
3323
        )
3324

3325
    def get_provisioned_concurrency_config(
1✔
3326
        self, context: RequestContext, function_name: FunctionName, qualifier: Qualifier, **kwargs
3327
    ) -> GetProvisionedConcurrencyConfigResponse:
3328
        if qualifier == "$LATEST":
1✔
3329
            raise InvalidParameterValueException(
1✔
3330
                "The function resource provided must be an alias or a published version.",
3331
                Type="User",
3332
            )
3333
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3334
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
3335
            function_name, qualifier, context
3336
        )
3337

3338
        provisioned_config = self._get_provisioned_config(context, function_name, qualifier)
1✔
3339
        if not provisioned_config:
1✔
3340
            raise ProvisionedConcurrencyConfigNotFoundException(
1✔
3341
                "No Provisioned Concurrency Config found for this function", Type="User"
3342
            )
3343

3344
        # TODO: make this compatible with alias pointer migration on update
3345
        if api_utils.qualifier_is_alias(qualifier):
1✔
3346
            state = lambda_stores[account_id][region]
1✔
3347
            fn = state.functions.get(function_name)
1✔
3348
            alias = fn.aliases.get(qualifier)
1✔
3349
            fn_arn = api_utils.qualified_lambda_arn(
1✔
3350
                function_name, alias.function_version, account_id, region
3351
            )
3352
        else:
3353
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
1✔
3354

3355
        ver_manager = self.lambda_service.get_lambda_version_manager(fn_arn)
1✔
3356

3357
        return GetProvisionedConcurrencyConfigResponse(
1✔
3358
            RequestedProvisionedConcurrentExecutions=provisioned_config.provisioned_concurrent_executions,
3359
            LastModified=provisioned_config.last_modified,
3360
            AvailableProvisionedConcurrentExecutions=ver_manager.provisioned_state.available,
3361
            AllocatedProvisionedConcurrentExecutions=ver_manager.provisioned_state.allocated,
3362
            Status=ver_manager.provisioned_state.status,
3363
            StatusReason=ver_manager.provisioned_state.status_reason,
3364
        )
3365

3366
    def list_provisioned_concurrency_configs(
1✔
3367
        self,
3368
        context: RequestContext,
3369
        function_name: FunctionName,
3370
        marker: String = None,
3371
        max_items: MaxProvisionedConcurrencyConfigListItems = None,
3372
        **kwargs,
3373
    ) -> ListProvisionedConcurrencyConfigsResponse:
3374
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3375
        state = lambda_stores[account_id][region]
1✔
3376

3377
        function_name = api_utils.get_function_name(function_name, context)
1✔
3378
        fn = state.functions.get(function_name)
1✔
3379
        if fn is None:
1✔
3380
            raise ResourceNotFoundException(
1✔
3381
                f"Function not found: {api_utils.unqualified_lambda_arn(function_name, account_id, region)}",
3382
                Type="User",
3383
            )
3384

3385
        configs = []
1✔
3386
        for qualifier, pc_config in fn.provisioned_concurrency_configs.items():
1✔
UNCOV
3387
            if api_utils.qualifier_is_alias(qualifier):
×
UNCOV
3388
                alias = fn.aliases.get(qualifier)
×
UNCOV
3389
                fn_arn = api_utils.qualified_lambda_arn(
×
3390
                    function_name, alias.function_version, account_id, region
3391
                )
3392
            else:
UNCOV
3393
                fn_arn = api_utils.qualified_lambda_arn(
×
3394
                    function_name, qualifier, account_id, region
3395
                )
3396

UNCOV
3397
            manager = self.lambda_service.get_lambda_version_manager(fn_arn)
×
3398

UNCOV
3399
            configs.append(
×
3400
                ProvisionedConcurrencyConfigListItem(
3401
                    FunctionArn=api_utils.qualified_lambda_arn(
3402
                        function_name, qualifier, account_id, region
3403
                    ),
3404
                    RequestedProvisionedConcurrentExecutions=pc_config.provisioned_concurrent_executions,
3405
                    AvailableProvisionedConcurrentExecutions=manager.provisioned_state.available,
3406
                    AllocatedProvisionedConcurrentExecutions=manager.provisioned_state.allocated,
3407
                    Status=manager.provisioned_state.status,
3408
                    StatusReason=manager.provisioned_state.status_reason,
3409
                    LastModified=pc_config.last_modified,
3410
                )
3411
            )
3412

3413
        provisioned_concurrency_configs = configs
1✔
3414
        provisioned_concurrency_configs = PaginatedList(provisioned_concurrency_configs)
1✔
3415
        page, token = provisioned_concurrency_configs.get_page(
1✔
3416
            lambda x: x,
3417
            marker,
3418
            max_items,
3419
        )
3420
        return ListProvisionedConcurrencyConfigsResponse(
1✔
3421
            ProvisionedConcurrencyConfigs=page, NextMarker=token
3422
        )
3423

3424
    def delete_provisioned_concurrency_config(
1✔
3425
        self, context: RequestContext, function_name: FunctionName, qualifier: Qualifier, **kwargs
3426
    ) -> None:
3427
        if qualifier == "$LATEST":
1✔
3428
            raise InvalidParameterValueException(
1✔
3429
                "The function resource provided must be an alias or a published version.",
3430
                Type="User",
3431
            )
3432
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3433
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
3434
            function_name, qualifier, context
3435
        )
3436
        state = lambda_stores[account_id][region]
1✔
3437
        fn = state.functions.get(function_name)
1✔
3438

3439
        provisioned_config = self._get_provisioned_config(context, function_name, qualifier)
1✔
3440
        # delete is idempotent and doesn't actually care about the provisioned concurrency config not existing
3441
        if provisioned_config:
1✔
3442
            fn.provisioned_concurrency_configs.pop(qualifier)
1✔
3443
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
1✔
3444
            manager = self.lambda_service.get_lambda_version_manager(fn_arn)
1✔
3445
            manager.update_provisioned_concurrency_config(0)
1✔
3446

3447
    # =======================================
3448
    # =======  Event Invoke Config   ========
3449
    # =======================================
3450

3451
    # "1 validation error detected: Value 'arn:aws:_-/!lambda:<region>:111111111111:function:<function-name:1>' at 'destinationConfig.onFailure.destination' failed to satisfy constraint: Member must satisfy regular expression pattern: ^$|arn:(aws[a-zA-Z0-9-]*):([a-zA-Z0-9\\-])+:([a-z]{2}((-gov)|(-iso(b?)))?-[a-z]+-\\d{1})?:(\\d{12})?:(.*)"
3452
    # "1 validation error detected: Value 'arn:aws:_-/!lambda:<region>:111111111111:function:<function-name:1>' at 'destinationConfig.onFailure.destination' failed to satisfy constraint: Member must satisfy regular expression pattern: ^$|arn:(aws[a-zA-Z0-9-]*):([a-zA-Z0-9\\-])+:([a-z]2((-gov)|(-iso(b?)))?-[a-z]+-\\d1)?:(\\d12)?:(.*)" ... (expected → actual)
3453

3454
    def _validate_destination_config(
1✔
3455
        self, store: LambdaStore, function_name: str, destination_config: DestinationConfig
3456
    ):
3457
        def _validate_destination_arn(destination_arn) -> bool:
1✔
3458
            if not api_utils.DESTINATION_ARN_PATTERN.match(destination_arn):
1✔
3459
                # technically we shouldn't handle this in the provider
3460
                raise ValidationException(
1✔
3461
                    "1 validation error detected: Value '"
3462
                    + destination_arn
3463
                    + "' at 'destinationConfig.onFailure.destination' failed to satisfy constraint: Member must satisfy regular expression pattern: "
3464
                    + "$|kafka://([^.]([a-zA-Z0-9\\-_.]{0,248}))|arn:(aws[a-zA-Z0-9-]*):([a-zA-Z0-9\\-])+:((eusc-)?[a-z]{2}((-gov)|(-iso([a-z]?)))?-[a-z]+-\\d{1})?:(\\d{12})?:(.*)"
3465
                )
3466

3467
            match destination_arn.split(":")[2]:
1✔
3468
                case "lambda":
1✔
3469
                    fn_parts = api_utils.FULL_FN_ARN_PATTERN.search(destination_arn).groupdict()
1✔
3470
                    if fn_parts:
1✔
3471
                        # check if it exists
3472
                        fn = store.functions.get(fn_parts["function_name"])
1✔
3473
                        if not fn:
1✔
3474
                            raise InvalidParameterValueException(
1✔
3475
                                f"The destination ARN {destination_arn} is invalid.", Type="User"
3476
                            )
3477
                        if fn_parts["function_name"] == function_name:
1✔
3478
                            raise InvalidParameterValueException(
1✔
3479
                                "You can't specify the function as a destination for itself.",
3480
                                Type="User",
3481
                            )
3482
                case "sns" | "sqs" | "events":
1✔
3483
                    pass
1✔
3484
                case _:
1✔
3485
                    return False
1✔
3486
            return True
1✔
3487

3488
        validation_err = False
1✔
3489

3490
        failure_destination = destination_config.get("OnFailure", {}).get("Destination")
1✔
3491
        if failure_destination:
1✔
3492
            validation_err = validation_err or not _validate_destination_arn(failure_destination)
1✔
3493

3494
        success_destination = destination_config.get("OnSuccess", {}).get("Destination")
1✔
3495
        if success_destination:
1✔
3496
            validation_err = validation_err or not _validate_destination_arn(success_destination)
1✔
3497

3498
        if validation_err:
1✔
3499
            on_success_part = (
1✔
3500
                f"OnSuccess(destination={success_destination})" if success_destination else "null"
3501
            )
3502
            on_failure_part = (
1✔
3503
                f"OnFailure(destination={failure_destination})" if failure_destination else "null"
3504
            )
3505
            raise InvalidParameterValueException(
1✔
3506
                f"The provided destination config DestinationConfig(onSuccess={on_success_part}, onFailure={on_failure_part}) is invalid.",
3507
                Type="User",
3508
            )
3509

3510
    def put_function_event_invoke_config(
1✔
3511
        self,
3512
        context: RequestContext,
3513
        function_name: FunctionName,
3514
        qualifier: NumericLatestPublishedOrAliasQualifier = None,
3515
        maximum_retry_attempts: MaximumRetryAttempts = None,
3516
        maximum_event_age_in_seconds: MaximumEventAgeInSeconds = None,
3517
        destination_config: DestinationConfig = None,
3518
        **kwargs,
3519
    ) -> FunctionEventInvokeConfig:
3520
        """
3521
        Destination ARNs can be:
3522
        * SQS arn
3523
        * SNS arn
3524
        * Lambda arn
3525
        * EventBridge arn
3526

3527
        Differences between put_ and update_:
3528
            * put overwrites any existing config
3529
            * update allows changes only single values while keeping the rest of existing ones
3530
            * update fails on non-existing configs
3531

3532
        Differences between destination and DLQ
3533
            * "However, a dead-letter queue is part of a function's version-specific configuration, so it is locked in when you publish a version."
3534
            *  "On-failure destinations also support additional targets and include details about the function's response in the invocation record."
3535

3536
        """
3537
        if (
1✔
3538
            maximum_event_age_in_seconds is None
3539
            and maximum_retry_attempts is None
3540
            and destination_config is None
3541
        ):
3542
            raise InvalidParameterValueException(
1✔
3543
                "You must specify at least one of error handling or destination setting.",
3544
                Type="User",
3545
            )
3546
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3547
        state = lambda_stores[account_id][region]
1✔
3548
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
3549
            function_name, qualifier, context
3550
        )
3551
        fn = state.functions.get(function_name)
1✔
3552
        if not fn or (qualifier and not (qualifier in fn.aliases or qualifier in fn.versions)):
1✔
3553
            raise ResourceNotFoundException("The function doesn't exist.", Type="User")
1✔
3554

3555
        qualifier = qualifier or "$LATEST"
1✔
3556

3557
        # validate and normalize destination config
3558
        if destination_config:
1✔
3559
            self._validate_destination_config(state, function_name, destination_config)
1✔
3560

3561
        destination_config = DestinationConfig(
1✔
3562
            OnSuccess=OnSuccess(
3563
                Destination=(destination_config or {}).get("OnSuccess", {}).get("Destination")
3564
            ),
3565
            OnFailure=OnFailure(
3566
                Destination=(destination_config or {}).get("OnFailure", {}).get("Destination")
3567
            ),
3568
        )
3569

3570
        config = EventInvokeConfig(
1✔
3571
            function_name=function_name,
3572
            qualifier=qualifier,
3573
            maximum_event_age_in_seconds=maximum_event_age_in_seconds,
3574
            maximum_retry_attempts=maximum_retry_attempts,
3575
            last_modified=api_utils.generate_lambda_date(),
3576
            destination_config=destination_config,
3577
        )
3578
        fn.event_invoke_configs[qualifier] = config
1✔
3579

3580
        return FunctionEventInvokeConfig(
1✔
3581
            LastModified=datetime.datetime.strptime(
3582
                config.last_modified, api_utils.LAMBDA_DATE_FORMAT
3583
            ),
3584
            FunctionArn=api_utils.qualified_lambda_arn(
3585
                function_name, qualifier or "$LATEST", account_id, region
3586
            ),
3587
            DestinationConfig=destination_config,
3588
            MaximumEventAgeInSeconds=maximum_event_age_in_seconds,
3589
            MaximumRetryAttempts=maximum_retry_attempts,
3590
        )
3591

3592
    def get_function_event_invoke_config(
1✔
3593
        self,
3594
        context: RequestContext,
3595
        function_name: FunctionName,
3596
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
3597
        **kwargs,
3598
    ) -> FunctionEventInvokeConfig:
3599
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3600
        state = lambda_stores[account_id][region]
1✔
3601
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
3602
            function_name, qualifier, context
3603
        )
3604

3605
        qualifier = qualifier or "$LATEST"
1✔
3606
        fn = state.functions.get(function_name)
1✔
3607
        if not fn:
1✔
3608
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
1✔
3609
            raise ResourceNotFoundException(
1✔
3610
                f"The function {fn_arn} doesn't have an EventInvokeConfig", Type="User"
3611
            )
3612

3613
        config = fn.event_invoke_configs.get(qualifier)
1✔
3614
        if not config:
1✔
3615
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
1✔
3616
            raise ResourceNotFoundException(
1✔
3617
                f"The function {fn_arn} doesn't have an EventInvokeConfig", Type="User"
3618
            )
3619

3620
        return FunctionEventInvokeConfig(
1✔
3621
            LastModified=datetime.datetime.strptime(
3622
                config.last_modified, api_utils.LAMBDA_DATE_FORMAT
3623
            ),
3624
            FunctionArn=api_utils.qualified_lambda_arn(
3625
                function_name, qualifier, account_id, region
3626
            ),
3627
            DestinationConfig=config.destination_config,
3628
            MaximumEventAgeInSeconds=config.maximum_event_age_in_seconds,
3629
            MaximumRetryAttempts=config.maximum_retry_attempts,
3630
        )
3631

3632
    def list_function_event_invoke_configs(
1✔
3633
        self,
3634
        context: RequestContext,
3635
        function_name: FunctionName,
3636
        marker: String = None,
3637
        max_items: MaxFunctionEventInvokeConfigListItems = None,
3638
        **kwargs,
3639
    ) -> ListFunctionEventInvokeConfigsResponse:
3640
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3641
        state = lambda_stores[account_id][region]
1✔
3642
        fn = state.functions.get(function_name)
1✔
3643
        if not fn:
1✔
3644
            raise ResourceNotFoundException("The function doesn't exist.", Type="User")
1✔
3645

3646
        event_invoke_configs = [
1✔
3647
            FunctionEventInvokeConfig(
3648
                LastModified=c.last_modified,
3649
                FunctionArn=api_utils.qualified_lambda_arn(
3650
                    function_name, c.qualifier, account_id, region
3651
                ),
3652
                MaximumEventAgeInSeconds=c.maximum_event_age_in_seconds,
3653
                MaximumRetryAttempts=c.maximum_retry_attempts,
3654
                DestinationConfig=c.destination_config,
3655
            )
3656
            for c in fn.event_invoke_configs.values()
3657
        ]
3658

3659
        event_invoke_configs = PaginatedList(event_invoke_configs)
1✔
3660
        page, token = event_invoke_configs.get_page(
1✔
3661
            lambda x: x["FunctionArn"],
3662
            marker,
3663
            max_items,
3664
        )
3665
        return ListFunctionEventInvokeConfigsResponse(
1✔
3666
            FunctionEventInvokeConfigs=page, NextMarker=token
3667
        )
3668

3669
    def delete_function_event_invoke_config(
1✔
3670
        self,
3671
        context: RequestContext,
3672
        function_name: FunctionName,
3673
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
3674
        **kwargs,
3675
    ) -> None:
3676
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3677
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
3678
            function_name, qualifier, context
3679
        )
3680
        state = lambda_stores[account_id][region]
1✔
3681
        fn = state.functions.get(function_name)
1✔
3682
        resolved_qualifier = qualifier or "$LATEST"
1✔
3683
        fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
1✔
3684
        if not fn:
1✔
3685
            raise ResourceNotFoundException(
1✔
3686
                f"The function {fn_arn} doesn't have an EventInvokeConfig", Type="User"
3687
            )
3688

3689
        config = fn.event_invoke_configs.get(resolved_qualifier)
1✔
3690
        if not config:
1✔
3691
            raise ResourceNotFoundException(
1✔
3692
                f"The function {fn_arn} doesn't have an EventInvokeConfig", Type="User"
3693
            )
3694

3695
        del fn.event_invoke_configs[resolved_qualifier]
1✔
3696

3697
    def update_function_event_invoke_config(
1✔
3698
        self,
3699
        context: RequestContext,
3700
        function_name: FunctionName,
3701
        qualifier: NumericLatestPublishedOrAliasQualifier = None,
3702
        maximum_retry_attempts: MaximumRetryAttempts = None,
3703
        maximum_event_age_in_seconds: MaximumEventAgeInSeconds = None,
3704
        destination_config: DestinationConfig = None,
3705
        **kwargs,
3706
    ) -> FunctionEventInvokeConfig:
3707
        # like put but only update single fields via replace
3708
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3709
        state = lambda_stores[account_id][region]
1✔
3710
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
3711
            function_name, qualifier, context
3712
        )
3713

3714
        if (
1✔
3715
            maximum_event_age_in_seconds is None
3716
            and maximum_retry_attempts is None
3717
            and destination_config is None
3718
        ):
UNCOV
3719
            raise InvalidParameterValueException(
×
3720
                "You must specify at least one of error handling or destination setting.",
3721
                Type="User",
3722
            )
3723

3724
        fn = state.functions.get(function_name)
1✔
3725
        if not fn or (qualifier and not (qualifier in fn.aliases or qualifier in fn.versions)):
1✔
3726
            raise ResourceNotFoundException("The function doesn't exist.", Type="User")
1✔
3727

3728
        qualifier = qualifier or "$LATEST"
1✔
3729

3730
        config = fn.event_invoke_configs.get(qualifier)
1✔
3731
        if not config:
1✔
3732
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
1✔
3733
            raise ResourceNotFoundException(
1✔
3734
                f"The function {fn_arn} doesn't have an EventInvokeConfig", Type="User"
3735
            )
3736

3737
        if destination_config:
1✔
UNCOV
3738
            self._validate_destination_config(state, function_name, destination_config)
×
3739

3740
        optional_kwargs = {
1✔
3741
            k: v
3742
            for k, v in {
3743
                "destination_config": destination_config,
3744
                "maximum_retry_attempts": maximum_retry_attempts,
3745
                "maximum_event_age_in_seconds": maximum_event_age_in_seconds,
3746
            }.items()
3747
            if v is not None
3748
        }
3749

3750
        new_config = dataclasses.replace(
1✔
3751
            config, last_modified=api_utils.generate_lambda_date(), **optional_kwargs
3752
        )
3753
        fn.event_invoke_configs[qualifier] = new_config
1✔
3754

3755
        return FunctionEventInvokeConfig(
1✔
3756
            LastModified=datetime.datetime.strptime(
3757
                new_config.last_modified, api_utils.LAMBDA_DATE_FORMAT
3758
            ),
3759
            FunctionArn=api_utils.qualified_lambda_arn(
3760
                function_name, qualifier or "$LATEST", account_id, region
3761
            ),
3762
            DestinationConfig=new_config.destination_config,
3763
            MaximumEventAgeInSeconds=new_config.maximum_event_age_in_seconds,
3764
            MaximumRetryAttempts=new_config.maximum_retry_attempts,
3765
        )
3766

3767
    # =======================================
3768
    # ======  Layer & Layer Versions  =======
3769
    # =======================================
3770

3771
    @staticmethod
1✔
3772
    def _resolve_layer(
1✔
3773
        layer_name_or_arn: str, context: RequestContext
3774
    ) -> tuple[str, str, str, str | None]:
3775
        """
3776
        Return locator attributes for a given Lambda layer.
3777

3778
        :param layer_name_or_arn: Layer name or ARN
3779
        :param context: Request context
3780
        :return: Tuple of region, account ID, layer name, layer version
3781
        """
3782
        if api_utils.is_layer_arn(layer_name_or_arn):
1✔
3783
            return api_utils.parse_layer_arn(layer_name_or_arn)
1✔
3784

3785
        return context.region, context.account_id, layer_name_or_arn, None
1✔
3786

3787
    def publish_layer_version(
1✔
3788
        self,
3789
        context: RequestContext,
3790
        layer_name: LayerName,
3791
        content: LayerVersionContentInput,
3792
        description: Description | None = None,
3793
        compatible_runtimes: CompatibleRuntimes | None = None,
3794
        license_info: LicenseInfo | None = None,
3795
        compatible_architectures: CompatibleArchitectures | None = None,
3796
        **kwargs,
3797
    ) -> PublishLayerVersionResponse:
3798
        """
3799
        On first use of a LayerName a new layer is created and for each subsequent call with the same LayerName a new version is created.
3800
        Note that there are no $LATEST versions with layers!
3801

3802
        """
3803
        account = context.account_id
1✔
3804
        region = context.region
1✔
3805

3806
        validation_errors = api_utils.validate_layer_runtimes_and_architectures(
1✔
3807
            compatible_runtimes, compatible_architectures
3808
        )
3809
        if validation_errors:
1✔
3810
            raise ValidationException(
1✔
3811
                f"{len(validation_errors)} validation error{'s' if len(validation_errors) > 1 else ''} detected: {'; '.join(validation_errors)}"
3812
            )
3813

3814
        state = lambda_stores[account][region]
1✔
3815
        with self.create_layer_lock:
1✔
3816
            if layer_name not in state.layers:
1✔
3817
                # we don't have a version so create new layer object
3818
                # lock is required to avoid creating two v1 objects for the same name
3819
                layer = Layer(
1✔
3820
                    arn=api_utils.layer_arn(layer_name=layer_name, account=account, region=region)
3821
                )
3822
                state.layers[layer_name] = layer
1✔
3823

3824
        layer = state.layers[layer_name]
1✔
3825
        with layer.next_version_lock:
1✔
3826
            next_version = LambdaLayerVersionIdentifier(
1✔
3827
                account_id=account, region=region, layer_name=layer_name
3828
            ).generate(next_version=layer.next_version)
3829
            # When creating a layer with user defined layer version, it is possible that we
3830
            # create layer versions out of order.
3831
            # ie. a user could replicate layer v2 then layer v1. It is important to always keep the maximum possible
3832
            # value for next layer to avoid overwriting existing versions
3833
            if layer.next_version <= next_version:
1✔
3834
                # We don't need to update layer.next_version if the created version is lower than the "next in line"
3835
                layer.next_version = max(next_version, layer.next_version) + 1
1✔
3836

3837
        # creating a new layer
3838
        if content.get("ZipFile"):
1✔
3839
            code = store_lambda_archive(
1✔
3840
                archive_file=content["ZipFile"],
3841
                function_name=layer_name,
3842
                region_name=region,
3843
                account_id=account,
3844
            )
3845
        else:
3846
            code = store_s3_bucket_archive(
1✔
3847
                archive_bucket=content["S3Bucket"],
3848
                archive_key=content["S3Key"],
3849
                archive_version=content.get("S3ObjectVersion"),
3850
                function_name=layer_name,
3851
                region_name=region,
3852
                account_id=account,
3853
            )
3854

3855
        new_layer_version = LayerVersion(
1✔
3856
            layer_version_arn=api_utils.layer_version_arn(
3857
                layer_name=layer_name,
3858
                account=account,
3859
                region=region,
3860
                version=str(next_version),
3861
            ),
3862
            layer_arn=layer.arn,
3863
            version=next_version,
3864
            description=description or "",
3865
            license_info=license_info,
3866
            compatible_runtimes=compatible_runtimes,
3867
            compatible_architectures=compatible_architectures,
3868
            created=api_utils.generate_lambda_date(),
3869
            code=code,
3870
        )
3871

3872
        layer.layer_versions[str(next_version)] = new_layer_version
1✔
3873

3874
        return api_utils.map_layer_out(new_layer_version)
1✔
3875

3876
    def get_layer_version(
1✔
3877
        self,
3878
        context: RequestContext,
3879
        layer_name: LayerName,
3880
        version_number: LayerVersionNumber,
3881
        **kwargs,
3882
    ) -> GetLayerVersionResponse:
3883
        # TODO: handle layer_name as an ARN
3884

3885
        region_name, account_id, layer_name, _ = LambdaProvider._resolve_layer(layer_name, context)
1✔
3886
        state = lambda_stores[account_id][region_name]
1✔
3887

3888
        layer = state.layers.get(layer_name)
1✔
3889
        if version_number < 1:
1✔
3890
            raise InvalidParameterValueException("Layer Version Cannot be less than 1", Type="User")
1✔
3891
        if layer is None:
1✔
3892
            raise ResourceNotFoundException(
1✔
3893
                "The resource you requested does not exist.", Type="User"
3894
            )
3895
        layer_version = layer.layer_versions.get(str(version_number))
1✔
3896
        if layer_version is None:
1✔
3897
            raise ResourceNotFoundException(
1✔
3898
                "The resource you requested does not exist.", Type="User"
3899
            )
3900
        return api_utils.map_layer_out(layer_version)
1✔
3901

3902
    def get_layer_version_by_arn(
1✔
3903
        self, context: RequestContext, arn: LayerVersionArn, **kwargs
3904
    ) -> GetLayerVersionResponse:
3905
        region_name, account_id, layer_name, layer_version = LambdaProvider._resolve_layer(
1✔
3906
            arn, context
3907
        )
3908

3909
        if not layer_version:
1✔
3910
            raise ValidationException(
1✔
3911
                f"1 validation error detected: Value '{arn}' at 'arn' failed to satisfy constraint: Member must satisfy regular expression pattern: "
3912
                + "(arn:(aws[a-zA-Z-]*)?:lambda:(eusc-)?[a-z]{2}((-gov)|(-iso([a-z]?)))?-[a-z]+-\\d{1}:\\d{12}:layer:[a-zA-Z0-9-_]+:[0-9]+)|(arn:[a-zA-Z0-9-]+:lambda:::awslayer:[a-zA-Z0-9-_]+)"
3913
            )
3914

3915
        store = lambda_stores[account_id][region_name]
1✔
3916
        if not (layers := store.layers.get(layer_name)):
1✔
3917
            raise ResourceNotFoundException(
×
3918
                "The resource you requested does not exist.", Type="User"
3919
            )
3920

3921
        layer_version = layers.layer_versions.get(layer_version)
1✔
3922

3923
        if not layer_version:
1✔
3924
            raise ResourceNotFoundException(
1✔
3925
                "The resource you requested does not exist.", Type="User"
3926
            )
3927

3928
        return api_utils.map_layer_out(layer_version)
1✔
3929

3930
    def list_layers(
1✔
3931
        self,
3932
        context: RequestContext,
3933
        compatible_runtime: Runtime | None = None,
3934
        marker: String | None = None,
3935
        max_items: MaxLayerListItems | None = None,
3936
        compatible_architecture: Architecture | None = None,
3937
        **kwargs,
3938
    ) -> ListLayersResponse:
3939
        validation_errors = []
1✔
3940

3941
        validation_error_arch = api_utils.validate_layer_architecture(compatible_architecture)
1✔
3942
        if validation_error_arch:
1✔
3943
            validation_errors.append(validation_error_arch)
1✔
3944

3945
        validation_error_runtime = api_utils.validate_layer_runtime(compatible_runtime)
1✔
3946
        if validation_error_runtime:
1✔
3947
            validation_errors.append(validation_error_runtime)
1✔
3948

3949
        if validation_errors:
1✔
3950
            raise ValidationException(
1✔
3951
                f"{len(validation_errors)} validation error{'s' if len(validation_errors) > 1 else ''} detected: {';'.join(validation_errors)}"
3952
            )
3953
        # TODO: handle filter: compatible_runtime
3954
        # TODO: handle filter: compatible_architecture
3955

UNCOV
3956
        state = lambda_stores[context.account_id][context.region]
×
UNCOV
3957
        layers = state.layers
×
3958

3959
        # TODO: test how filters behave together with only returning layers here? Does it return the latest "matching" layer, i.e. does it ignore later layer versions that don't match?
3960

UNCOV
3961
        responses: list[LayersListItem] = []
×
UNCOV
3962
        for layer_name, layer in layers.items():
×
3963
            # fetch latest version
UNCOV
3964
            layer_versions = list(layer.layer_versions.values())
×
UNCOV
3965
            sorted(layer_versions, key=lambda x: x.version)
×
UNCOV
3966
            latest_layer_version = layer_versions[-1]
×
UNCOV
3967
            responses.append(
×
3968
                LayersListItem(
3969
                    LayerName=layer_name,
3970
                    LayerArn=layer.arn,
3971
                    LatestMatchingVersion=api_utils.map_layer_out(latest_layer_version),
3972
                )
3973
            )
3974

UNCOV
3975
        responses = PaginatedList(responses)
×
UNCOV
3976
        page, token = responses.get_page(
×
3977
            lambda version: version,
3978
            marker,
3979
            max_items,
3980
        )
3981

UNCOV
3982
        return ListLayersResponse(NextMarker=token, Layers=page)
×
3983

3984
    def list_layer_versions(
1✔
3985
        self,
3986
        context: RequestContext,
3987
        layer_name: LayerName,
3988
        compatible_runtime: Runtime | None = None,
3989
        marker: String | None = None,
3990
        max_items: MaxLayerListItems | None = None,
3991
        compatible_architecture: Architecture | None = None,
3992
        **kwargs,
3993
    ) -> ListLayerVersionsResponse:
3994
        validation_errors = api_utils.validate_layer_runtimes_and_architectures(
1✔
3995
            [compatible_runtime] if compatible_runtime else [],
3996
            [compatible_architecture] if compatible_architecture else [],
3997
        )
3998
        if validation_errors:
1✔
UNCOV
3999
            raise ValidationException(
×
4000
                f"{len(validation_errors)} validation error{'s' if len(validation_errors) > 1 else ''} detected: {';'.join(validation_errors)}"
4001
            )
4002

4003
        region_name, account_id, layer_name, layer_version = LambdaProvider._resolve_layer(
1✔
4004
            layer_name, context
4005
        )
4006
        state = lambda_stores[account_id][region_name]
1✔
4007

4008
        # TODO: Test & handle filter: compatible_runtime
4009
        # TODO: Test & handle filter: compatible_architecture
4010
        all_layer_versions = []
1✔
4011
        layer = state.layers.get(layer_name)
1✔
4012
        if layer is not None:
1✔
4013
            for layer_version in layer.layer_versions.values():
1✔
4014
                all_layer_versions.append(api_utils.map_layer_out(layer_version))
1✔
4015

4016
        all_layer_versions.sort(key=lambda x: x["Version"], reverse=True)
1✔
4017
        all_layer_versions = PaginatedList(all_layer_versions)
1✔
4018
        page, token = all_layer_versions.get_page(
1✔
4019
            lambda version: version["LayerVersionArn"],
4020
            marker,
4021
            max_items,
4022
        )
4023
        return ListLayerVersionsResponse(NextMarker=token, LayerVersions=page)
1✔
4024

4025
    def delete_layer_version(
1✔
4026
        self,
4027
        context: RequestContext,
4028
        layer_name: LayerName,
4029
        version_number: LayerVersionNumber,
4030
        **kwargs,
4031
    ) -> None:
4032
        if version_number < 1:
1✔
4033
            raise InvalidParameterValueException("Layer Version Cannot be less than 1", Type="User")
1✔
4034

4035
        region_name, account_id, layer_name, layer_version = LambdaProvider._resolve_layer(
1✔
4036
            layer_name, context
4037
        )
4038

4039
        store = lambda_stores[account_id][region_name]
1✔
4040
        layer = store.layers.get(layer_name, {})
1✔
4041
        if layer:
1✔
4042
            layer.layer_versions.pop(str(version_number), None)
1✔
4043

4044
    # =======================================
4045
    # =====  Layer Version Permissions  =====
4046
    # =======================================
4047
    # TODO: lock updates that change revision IDs
4048

4049
    def add_layer_version_permission(
1✔
4050
        self,
4051
        context: RequestContext,
4052
        layer_name: LayerName,
4053
        version_number: LayerVersionNumber,
4054
        statement_id: StatementId,
4055
        action: LayerPermissionAllowedAction,
4056
        principal: LayerPermissionAllowedPrincipal,
4057
        organization_id: OrganizationId = None,
4058
        revision_id: String = None,
4059
        **kwargs,
4060
    ) -> AddLayerVersionPermissionResponse:
4061
        # `layer_name` can either be layer name or ARN. It is used to generate error messages.
4062
        # `layer_n` contains the layer name.
4063
        region_name, account_id, layer_n, _ = LambdaProvider._resolve_layer(layer_name, context)
1✔
4064

4065
        if action != "lambda:GetLayerVersion":
1✔
4066
            raise ValidationException(
1✔
4067
                f"1 validation error detected: Value '{action}' at 'action' failed to satisfy constraint: Member must satisfy regular expression pattern: lambda:GetLayerVersion"
4068
            )
4069

4070
        store = lambda_stores[account_id][region_name]
1✔
4071
        layer = store.layers.get(layer_n)
1✔
4072

4073
        layer_version_arn = api_utils.layer_version_arn(
1✔
4074
            layer_name, account_id, region_name, str(version_number)
4075
        )
4076

4077
        if layer is None:
1✔
4078
            raise ResourceNotFoundException(
1✔
4079
                f"Layer version {layer_version_arn} does not exist.", Type="User"
4080
            )
4081
        layer_version = layer.layer_versions.get(str(version_number))
1✔
4082
        if layer_version is None:
1✔
4083
            raise ResourceNotFoundException(
1✔
4084
                f"Layer version {layer_version_arn} does not exist.", Type="User"
4085
            )
4086
        # do we have a policy? if not set one
4087
        if layer_version.policy is None:
1✔
4088
            layer_version.policy = LayerPolicy()
1✔
4089

4090
        if statement_id in layer_version.policy.statements:
1✔
4091
            raise ResourceConflictException(
1✔
4092
                f"The statement id ({statement_id}) provided already exists. Please provide a new statement id, or remove the existing statement.",
4093
                Type="User",
4094
            )
4095

4096
        if revision_id and layer_version.policy.revision_id != revision_id:
1✔
4097
            raise PreconditionFailedException(
1✔
4098
                "The Revision Id provided does not match the latest Revision Id. "
4099
                "Call the GetLayerPolicy API to retrieve the latest Revision Id",
4100
                Type="User",
4101
            )
4102

4103
        statement = LayerPolicyStatement(
1✔
4104
            sid=statement_id, action=action, principal=principal, organization_id=organization_id
4105
        )
4106

4107
        old_statements = layer_version.policy.statements
1✔
4108
        layer_version.policy = dataclasses.replace(
1✔
4109
            layer_version.policy, statements={**old_statements, statement_id: statement}
4110
        )
4111

4112
        return AddLayerVersionPermissionResponse(
1✔
4113
            Statement=json.dumps(
4114
                {
4115
                    "Sid": statement.sid,
4116
                    "Effect": "Allow",
4117
                    "Principal": statement.principal,
4118
                    "Action": statement.action,
4119
                    "Resource": layer_version.layer_version_arn,
4120
                }
4121
            ),
4122
            RevisionId=layer_version.policy.revision_id,
4123
        )
4124

4125
    def remove_layer_version_permission(
1✔
4126
        self,
4127
        context: RequestContext,
4128
        layer_name: LayerName,
4129
        version_number: LayerVersionNumber,
4130
        statement_id: StatementId,
4131
        revision_id: String = None,
4132
        **kwargs,
4133
    ) -> None:
4134
        # `layer_name` can either be layer name or ARN. It is used to generate error messages.
4135
        # `layer_n` contains the layer name.
4136
        region_name, account_id, layer_n, layer_version = LambdaProvider._resolve_layer(
1✔
4137
            layer_name, context
4138
        )
4139

4140
        layer_version_arn = api_utils.layer_version_arn(
1✔
4141
            layer_name, account_id, region_name, str(version_number)
4142
        )
4143

4144
        state = lambda_stores[account_id][region_name]
1✔
4145
        layer = state.layers.get(layer_n)
1✔
4146
        if layer is None:
1✔
4147
            raise ResourceNotFoundException(
1✔
4148
                f"Layer version {layer_version_arn} does not exist.", Type="User"
4149
            )
4150
        layer_version = layer.layer_versions.get(str(version_number))
1✔
4151
        if layer_version is None:
1✔
4152
            raise ResourceNotFoundException(
1✔
4153
                f"Layer version {layer_version_arn} does not exist.", Type="User"
4154
            )
4155

4156
        if revision_id and layer_version.policy.revision_id != revision_id:
1✔
4157
            raise PreconditionFailedException(
1✔
4158
                "The Revision Id provided does not match the latest Revision Id. "
4159
                "Call the GetLayerPolicy API to retrieve the latest Revision Id",
4160
                Type="User",
4161
            )
4162

4163
        if statement_id not in layer_version.policy.statements:
1✔
4164
            raise ResourceNotFoundException(
1✔
4165
                f"Statement {statement_id} is not found in resource policy.", Type="User"
4166
            )
4167

4168
        old_statements = layer_version.policy.statements
1✔
4169
        layer_version.policy = dataclasses.replace(
1✔
4170
            layer_version.policy,
4171
            statements={k: v for k, v in old_statements.items() if k != statement_id},
4172
        )
4173

4174
    def get_layer_version_policy(
1✔
4175
        self,
4176
        context: RequestContext,
4177
        layer_name: LayerName,
4178
        version_number: LayerVersionNumber,
4179
        **kwargs,
4180
    ) -> GetLayerVersionPolicyResponse:
4181
        # `layer_name` can either be layer name or ARN. It is used to generate error messages.
4182
        # `layer_n` contains the layer name.
4183
        region_name, account_id, layer_n, _ = LambdaProvider._resolve_layer(layer_name, context)
1✔
4184

4185
        layer_version_arn = api_utils.layer_version_arn(
1✔
4186
            layer_name, account_id, region_name, str(version_number)
4187
        )
4188

4189
        store = lambda_stores[account_id][region_name]
1✔
4190
        layer = store.layers.get(layer_n)
1✔
4191

4192
        if layer is None:
1✔
4193
            raise ResourceNotFoundException(
1✔
4194
                f"Layer version {layer_version_arn} does not exist.", Type="User"
4195
            )
4196

4197
        layer_version = layer.layer_versions.get(str(version_number))
1✔
4198
        if layer_version is None:
1✔
4199
            raise ResourceNotFoundException(
1✔
4200
                f"Layer version {layer_version_arn} does not exist.", Type="User"
4201
            )
4202

4203
        if layer_version.policy is None:
1✔
4204
            raise ResourceNotFoundException(
1✔
4205
                "No policy is associated with the given resource.", Type="User"
4206
            )
4207

4208
        return GetLayerVersionPolicyResponse(
1✔
4209
            Policy=json.dumps(
4210
                {
4211
                    "Version": layer_version.policy.version,
4212
                    "Id": layer_version.policy.id,
4213
                    "Statement": [
4214
                        {
4215
                            "Sid": ps.sid,
4216
                            "Effect": "Allow",
4217
                            "Principal": ps.principal,
4218
                            "Action": ps.action,
4219
                            "Resource": layer_version.layer_version_arn,
4220
                        }
4221
                        for ps in layer_version.policy.statements.values()
4222
                    ],
4223
                }
4224
            ),
4225
            RevisionId=layer_version.policy.revision_id,
4226
        )
4227

4228
    # =======================================
4229
    # =======  Function Concurrency  ========
4230
    # =======================================
4231
    # (Reserved) function concurrency is scoped to the whole function
4232

4233
    def get_function_concurrency(
1✔
4234
        self, context: RequestContext, function_name: FunctionName, **kwargs
4235
    ) -> GetFunctionConcurrencyResponse:
4236
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
4237
        function_name = api_utils.get_function_name(function_name, context)
1✔
4238
        fn = self._get_function(function_name=function_name, region=region, account_id=account_id)
1✔
4239
        return GetFunctionConcurrencyResponse(
1✔
4240
            ReservedConcurrentExecutions=fn.reserved_concurrent_executions
4241
        )
4242

4243
    def put_function_concurrency(
1✔
4244
        self,
4245
        context: RequestContext,
4246
        function_name: FunctionName,
4247
        reserved_concurrent_executions: ReservedConcurrentExecutions,
4248
        **kwargs,
4249
    ) -> Concurrency:
4250
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
4251

4252
        function_name, qualifier = api_utils.get_name_and_qualifier(function_name, None, context)
1✔
4253
        if qualifier:
1✔
4254
            raise InvalidParameterValueException(
1✔
4255
                "This operation is permitted on Lambda functions only. Aliases and versions do not support this operation. Please specify either a function name or an unqualified function ARN.",
4256
                Type="User",
4257
            )
4258

4259
        store = lambda_stores[account_id][region]
1✔
4260
        fn = store.functions.get(function_name)
1✔
4261
        if not fn:
1✔
4262
            fn_arn = api_utils.qualified_lambda_arn(
1✔
4263
                function_name,
4264
                qualifier="$LATEST",
4265
                account=account_id,
4266
                region=region,
4267
            )
4268
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")
1✔
4269

4270
        settings = self.get_account_settings(context)
1✔
4271
        unreserved_concurrent_executions = settings["AccountLimit"][
1✔
4272
            "UnreservedConcurrentExecutions"
4273
        ]
4274

4275
        # The existing reserved concurrent executions for the same function are already deduced in
4276
        # unreserved_concurrent_executions but must not count because the new one will replace the existing one.
4277
        # Joel tested this behavior manually against AWS (2023-11-28).
4278
        existing_reserved_concurrent_executions = (
1✔
4279
            fn.reserved_concurrent_executions if fn.reserved_concurrent_executions else 0
4280
        )
4281
        if (
1✔
4282
            unreserved_concurrent_executions
4283
            - reserved_concurrent_executions
4284
            + existing_reserved_concurrent_executions
4285
        ) < config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY:
4286
            raise InvalidParameterValueException(
1✔
4287
                f"Specified ReservedConcurrentExecutions for function decreases account's UnreservedConcurrentExecution below its minimum value of [{config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY}]."
4288
            )
4289

4290
        total_provisioned_concurrency = sum(
1✔
4291
            [
4292
                provisioned_configs.provisioned_concurrent_executions
4293
                for provisioned_configs in fn.provisioned_concurrency_configs.values()
4294
            ]
4295
        )
4296
        if total_provisioned_concurrency > reserved_concurrent_executions:
1✔
4297
            raise InvalidParameterValueException(
1✔
4298
                f" ReservedConcurrentExecutions  {reserved_concurrent_executions} should not be lower than function's total provisioned concurrency [{total_provisioned_concurrency}]."
4299
            )
4300

4301
        fn.reserved_concurrent_executions = reserved_concurrent_executions
1✔
4302

4303
        return Concurrency(ReservedConcurrentExecutions=fn.reserved_concurrent_executions)
1✔
4304

4305
    def delete_function_concurrency(
1✔
4306
        self, context: RequestContext, function_name: FunctionName, **kwargs
4307
    ) -> None:
4308
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
4309
        function_name, qualifier = api_utils.get_name_and_qualifier(function_name, None, context)
1✔
4310
        store = lambda_stores[account_id][region]
1✔
4311
        fn = store.functions.get(function_name)
1✔
4312
        fn.reserved_concurrent_executions = None
1✔
4313

4314
    # =======================================
4315
    # ===============  TAGS   ===============
4316
    # =======================================
4317
    # only Function, Event Source Mapping, and Code Signing Config (not currently supported by LocalStack) ARNs an are available for tagging in AWS
4318

4319
    def _get_tags(self, resource: TaggableResource) -> dict[str, str]:
1✔
4320
        state = self.fetch_lambda_store_for_tagging(resource)
1✔
4321
        lambda_adapted_tags = {
1✔
4322
            tag["Key"]: tag["Value"]
4323
            for tag in state.TAGS.list_tags_for_resource(resource).get("Tags")
4324
        }
4325
        return lambda_adapted_tags
1✔
4326

4327
    def _store_tags(self, resource: TaggableResource, tags: dict[str, str]):
1✔
4328
        state = self.fetch_lambda_store_for_tagging(resource)
1✔
4329
        if len(state.TAGS.tags.get(resource, {}) | tags) > LAMBDA_TAG_LIMIT_PER_RESOURCE:
1✔
4330
            raise InvalidParameterValueException(
1✔
4331
                "Number of tags exceeds resource tag limit.", Type="User"
4332
            )
4333

4334
        tag_svc_adapted_tags = [{"Key": key, "Value": value} for key, value in tags.items()]
1✔
4335
        state.TAGS.tag_resource(resource, tag_svc_adapted_tags)
1✔
4336

4337
    def fetch_lambda_store_for_tagging(self, resource: TaggableResource) -> LambdaStore:
1✔
4338
        """
4339
        Takes a resource ARN for a TaggableResource (Lambda Function, Event Source Mapping, Code Signing Config, or Capacity Provider) and returns a corresponding
4340
        LambdaStore for its region and account.
4341

4342
        In addition, this function validates that the ARN is a valid TaggableResource type, and that the TaggableResource exists.
4343

4344
        Raises:
4345
            ValidationException: If the resource ARN is not a full ARN for a TaggableResource.
4346
            ResourceNotFoundException: If the specified resource does not exist.
4347
            InvalidParameterValueException: If the resource ARN is a qualified Lambda Function.
4348
        """
4349

4350
        def _raise_validation_exception():
1✔
4351
            raise ValidationException(
1✔
4352
                f"1 validation error detected: Value '{resource}' at 'resource' failed to satisfy constraint: Member must satisfy regular expression pattern: {api_utils.TAGGABLE_RESOURCE_ARN_PATTERN}"
4353
            )
4354

4355
        # Check whether the ARN we have been passed is correctly formatted
4356
        parsed_resource_arn: ArnData = None
1✔
4357
        try:
1✔
4358
            parsed_resource_arn = parse_arn(resource)
1✔
4359
        except Exception:
1✔
4360
            _raise_validation_exception()
1✔
4361

4362
        # TODO: Should we be checking whether this is a full ARN?
4363
        region, account_id, resource_type = map(
1✔
4364
            parsed_resource_arn.get, ("region", "account", "resource")
4365
        )
4366

4367
        if not all((region, account_id, resource_type)):
1✔
UNCOV
4368
            _raise_validation_exception()
×
4369

4370
        if not (parts := resource_type.split(":")):
1✔
UNCOV
4371
            _raise_validation_exception()
×
4372

4373
        resource_type, resource_identifier, *qualifier = parts
1✔
4374

4375
        # Qualifier validation raises before checking for NotFound
4376
        if qualifier:
1✔
4377
            if resource_type == "function":
1✔
4378
                raise InvalidParameterValueException(
1✔
4379
                    "Tags on function aliases and versions are not supported. Please specify a function ARN.",
4380
                    Type="User",
4381
                )
4382
            _raise_validation_exception()
1✔
4383

4384
        if resource_type == "event-source-mapping":
1✔
4385
            self._get_esm(resource_identifier, account_id, region)
1✔
4386
        elif resource_type == "code-signing-config":
1✔
4387
            raise NotImplementedError("Resource tagging on CSC not yet implemented.")
4388
        elif resource_type == "function":
1✔
4389
            self._get_function(
1✔
4390
                function_name=resource_identifier, account_id=account_id, region=region
4391
            )
4392
        elif resource_type == "capacity-provider":
1✔
4393
            self._get_capacity_provider(resource_identifier, account_id, region)
1✔
4394
        else:
4395
            _raise_validation_exception()
1✔
4396

4397
        # If no exceptions are raised, assume ARN and referenced resource is valid for tag operations
4398
        return lambda_stores[account_id][region]
1✔
4399

4400
    def tag_resource(
1✔
4401
        self, context: RequestContext, resource: TaggableResource, tags: Tags, **kwargs
4402
    ) -> None:
4403
        if not tags:
1✔
4404
            raise InvalidParameterValueException(
1✔
4405
                "An error occurred and the request cannot be processed.", Type="User"
4406
            )
4407
        self._store_tags(resource, tags)
1✔
4408

4409
        if (resource_id := extract_resource_from_arn(resource)) and resource_id.startswith(
1✔
4410
            "function"
4411
        ):
4412
            name, _, account, region = function_locators_from_arn(resource)
1✔
4413
            function = self._get_function(name, account, region)
1✔
4414
            with function.lock:
1✔
4415
                # dirty hack for changed revision id, should reevaluate model to prevent this:
4416
                latest_version = function.versions["$LATEST"]
1✔
4417
                function.versions["$LATEST"] = dataclasses.replace(
1✔
4418
                    latest_version, config=dataclasses.replace(latest_version.config)
4419
                )
4420

4421
    def list_tags(
1✔
4422
        self, context: RequestContext, resource: TaggableResource, **kwargs
4423
    ) -> ListTagsResponse:
4424
        tags = self._get_tags(resource)
1✔
4425
        return ListTagsResponse(Tags=tags)
1✔
4426

4427
    def untag_resource(
1✔
4428
        self, context: RequestContext, resource: TaggableResource, tag_keys: TagKeyList, **kwargs
4429
    ) -> None:
4430
        if not tag_keys:
1✔
4431
            raise ValidationException(
1✔
4432
                "1 validation error detected: Value null at 'tagKeys' failed to satisfy constraint: Member must not be null"
4433
            )  # should probably be generalized a bit
4434

4435
        state = self.fetch_lambda_store_for_tagging(resource)
1✔
4436
        state.TAGS.untag_resource(resource, tag_keys)
1✔
4437

4438
        if (resource_id := extract_resource_from_arn(resource)) and resource_id.startswith(
1✔
4439
            "function"
4440
        ):
4441
            name, _, account, region = function_locators_from_arn(resource)
1✔
4442
            function = self._get_function(name, account, region)
1✔
4443
            # TODO: Potential race condition
4444
            with function.lock:
1✔
4445
                # dirty hack for changed revision id, should reevaluate model to prevent this:
4446
                latest_version = function.versions["$LATEST"]
1✔
4447
                function.versions["$LATEST"] = dataclasses.replace(
1✔
4448
                    latest_version, config=dataclasses.replace(latest_version.config)
4449
                )
4450

4451
    # =======================================
4452
    # =======  LEGACY / DEPRECATED   ========
4453
    # =======================================
4454

4455
    def invoke_async(
1✔
4456
        self,
4457
        context: RequestContext,
4458
        function_name: NamespacedFunctionName,
4459
        invoke_args: IO[BlobStream],
4460
        **kwargs,
4461
    ) -> InvokeAsyncResponse:
4462
        """LEGACY API endpoint. Even AWS heavily discourages its usage."""
4463
        raise NotImplementedError
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc