• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 274ae585-9ad2-4b5f-8087-866ef08d3d6e

24 Apr 2025 05:15PM UTC coverage: 85.262% (-1.0%) from 86.266%
274ae585-9ad2-4b5f-8087-866ef08d3d6e

push

circleci

web-flow
CFn v2: support outputs (#12536)

10 of 29 new or added lines in 3 files covered. (34.48%)

1105 existing lines in 26 files now uncovered.

63256 of 74190 relevant lines covered (85.26%)

0.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.82
/localstack-core/localstack/services/lambda_/provider.py
1
import base64
1✔
2
import dataclasses
1✔
3
import datetime
1✔
4
import itertools
1✔
5
import json
1✔
6
import logging
1✔
7
import re
1✔
8
import threading
1✔
9
import time
1✔
10
from typing import IO, Any, Optional, Tuple
1✔
11

12
from botocore.exceptions import ClientError
1✔
13

14
from localstack import config
1✔
15
from localstack.aws.api import RequestContext, ServiceException, handler
1✔
16
from localstack.aws.api.lambda_ import (
1✔
17
    AccountLimit,
18
    AccountUsage,
19
    AddLayerVersionPermissionResponse,
20
    AddPermissionRequest,
21
    AddPermissionResponse,
22
    Alias,
23
    AliasConfiguration,
24
    AliasRoutingConfiguration,
25
    AllowedPublishers,
26
    Architecture,
27
    Arn,
28
    Blob,
29
    BlobStream,
30
    CodeSigningConfigArn,
31
    CodeSigningConfigNotFoundException,
32
    CodeSigningPolicies,
33
    CompatibleArchitectures,
34
    CompatibleRuntimes,
35
    Concurrency,
36
    Cors,
37
    CreateCodeSigningConfigResponse,
38
    CreateEventSourceMappingRequest,
39
    CreateFunctionRequest,
40
    CreateFunctionUrlConfigResponse,
41
    DeleteCodeSigningConfigResponse,
42
    Description,
43
    DestinationConfig,
44
    EventSourceMappingConfiguration,
45
    FunctionCodeLocation,
46
    FunctionConfiguration,
47
    FunctionEventInvokeConfig,
48
    FunctionName,
49
    FunctionUrlAuthType,
50
    FunctionUrlQualifier,
51
    GetAccountSettingsResponse,
52
    GetCodeSigningConfigResponse,
53
    GetFunctionCodeSigningConfigResponse,
54
    GetFunctionConcurrencyResponse,
55
    GetFunctionRecursionConfigResponse,
56
    GetFunctionResponse,
57
    GetFunctionUrlConfigResponse,
58
    GetLayerVersionPolicyResponse,
59
    GetLayerVersionResponse,
60
    GetPolicyResponse,
61
    GetProvisionedConcurrencyConfigResponse,
62
    InvalidParameterValueException,
63
    InvocationResponse,
64
    InvocationType,
65
    InvokeAsyncResponse,
66
    InvokeMode,
67
    LambdaApi,
68
    LastUpdateStatus,
69
    LayerName,
70
    LayerPermissionAllowedAction,
71
    LayerPermissionAllowedPrincipal,
72
    LayersListItem,
73
    LayerVersionArn,
74
    LayerVersionContentInput,
75
    LayerVersionNumber,
76
    LicenseInfo,
77
    ListAliasesResponse,
78
    ListCodeSigningConfigsResponse,
79
    ListEventSourceMappingsResponse,
80
    ListFunctionEventInvokeConfigsResponse,
81
    ListFunctionsByCodeSigningConfigResponse,
82
    ListFunctionsResponse,
83
    ListFunctionUrlConfigsResponse,
84
    ListLayersResponse,
85
    ListLayerVersionsResponse,
86
    ListProvisionedConcurrencyConfigsResponse,
87
    ListTagsResponse,
88
    ListVersionsByFunctionResponse,
89
    LogFormat,
90
    LoggingConfig,
91
    LogType,
92
    MasterRegion,
93
    MaxFunctionEventInvokeConfigListItems,
94
    MaximumEventAgeInSeconds,
95
    MaximumRetryAttempts,
96
    MaxItems,
97
    MaxLayerListItems,
98
    MaxListItems,
99
    MaxProvisionedConcurrencyConfigListItems,
100
    NamespacedFunctionName,
101
    NamespacedStatementId,
102
    OnFailure,
103
    OnSuccess,
104
    OrganizationId,
105
    PackageType,
106
    PositiveInteger,
107
    PreconditionFailedException,
108
    ProvisionedConcurrencyConfigListItem,
109
    ProvisionedConcurrencyConfigNotFoundException,
110
    ProvisionedConcurrencyStatusEnum,
111
    PublishLayerVersionResponse,
112
    PutFunctionCodeSigningConfigResponse,
113
    PutFunctionRecursionConfigResponse,
114
    PutProvisionedConcurrencyConfigResponse,
115
    Qualifier,
116
    RecursiveLoop,
117
    ReservedConcurrentExecutions,
118
    ResourceConflictException,
119
    ResourceNotFoundException,
120
    Runtime,
121
    RuntimeVersionConfig,
122
    SnapStart,
123
    SnapStartApplyOn,
124
    SnapStartOptimizationStatus,
125
    SnapStartResponse,
126
    State,
127
    StatementId,
128
    StateReasonCode,
129
    String,
130
    TaggableResource,
131
    TagKeyList,
132
    Tags,
133
    TracingMode,
134
    UnqualifiedFunctionName,
135
    UpdateCodeSigningConfigResponse,
136
    UpdateEventSourceMappingRequest,
137
    UpdateFunctionCodeRequest,
138
    UpdateFunctionConfigurationRequest,
139
    UpdateFunctionUrlConfigResponse,
140
    Version,
141
)
142
from localstack.aws.api.lambda_ import FunctionVersion as FunctionVersionApi
1✔
143
from localstack.aws.api.lambda_ import ServiceException as LambdaServiceException
1✔
144
from localstack.aws.api.pipes import (
1✔
145
    DynamoDBStreamStartPosition,
146
    KinesisStreamStartPosition,
147
)
148
from localstack.aws.connect import connect_to
1✔
149
from localstack.aws.spec import load_service
1✔
150
from localstack.services.edge import ROUTER
1✔
151
from localstack.services.lambda_ import api_utils
1✔
152
from localstack.services.lambda_ import hooks as lambda_hooks
1✔
153
from localstack.services.lambda_.analytics import (
1✔
154
    FunctionOperation,
155
    FunctionStatus,
156
    function_counter,
157
)
158
from localstack.services.lambda_.api_utils import (
1✔
159
    ARCHITECTURES,
160
    STATEMENT_ID_REGEX,
161
    SUBNET_ID_REGEX,
162
    function_locators_from_arn,
163
)
164
from localstack.services.lambda_.event_source_mapping.esm_config_factory import (
1✔
165
    EsmConfigFactory,
166
)
167
from localstack.services.lambda_.event_source_mapping.esm_worker import (
1✔
168
    EsmState,
169
    EsmWorker,
170
)
171
from localstack.services.lambda_.event_source_mapping.esm_worker_factory import (
1✔
172
    EsmWorkerFactory,
173
)
174
from localstack.services.lambda_.event_source_mapping.pipe_utils import get_internal_client
1✔
175
from localstack.services.lambda_.invocation import AccessDeniedException
1✔
176
from localstack.services.lambda_.invocation.execution_environment import (
1✔
177
    EnvironmentStartupTimeoutException,
178
)
179
from localstack.services.lambda_.invocation.lambda_models import (
1✔
180
    AliasRoutingConfig,
181
    CodeSigningConfig,
182
    EventInvokeConfig,
183
    Function,
184
    FunctionResourcePolicy,
185
    FunctionUrlConfig,
186
    FunctionVersion,
187
    ImageConfig,
188
    LambdaEphemeralStorage,
189
    Layer,
190
    LayerPolicy,
191
    LayerPolicyStatement,
192
    LayerVersion,
193
    ProvisionedConcurrencyConfiguration,
194
    RequestEntityTooLargeException,
195
    ResourcePolicy,
196
    UpdateStatus,
197
    ValidationException,
198
    VersionAlias,
199
    VersionFunctionConfiguration,
200
    VersionIdentifier,
201
    VersionState,
202
    VpcConfig,
203
)
204
from localstack.services.lambda_.invocation.lambda_service import (
1✔
205
    LambdaService,
206
    create_image_code,
207
    destroy_code_if_not_used,
208
    lambda_stores,
209
    store_lambda_archive,
210
    store_s3_bucket_archive,
211
)
212
from localstack.services.lambda_.invocation.models import LambdaStore
1✔
213
from localstack.services.lambda_.invocation.runtime_executor import get_runtime_executor
1✔
214
from localstack.services.lambda_.lambda_utils import HINT_LOG
1✔
215
from localstack.services.lambda_.layerfetcher.layer_fetcher import LayerFetcher
1✔
216
from localstack.services.lambda_.provider_utils import (
1✔
217
    LambdaLayerVersionIdentifier,
218
    get_function_version,
219
    get_function_version_from_arn,
220
)
221
from localstack.services.lambda_.runtimes import (
1✔
222
    ALL_RUNTIMES,
223
    DEPRECATED_RUNTIMES,
224
    DEPRECATED_RUNTIMES_UPGRADES,
225
    RUNTIMES_AGGREGATED,
226
    SNAP_START_SUPPORTED_RUNTIMES,
227
    VALID_RUNTIMES,
228
)
229
from localstack.services.lambda_.urlrouter import FunctionUrlRouter
1✔
230
from localstack.services.plugins import ServiceLifecycleHook
1✔
231
from localstack.state import StateVisitor
1✔
232
from localstack.utils.aws.arns import (
1✔
233
    ArnData,
234
    extract_resource_from_arn,
235
    extract_service_from_arn,
236
    get_partition,
237
    lambda_event_source_mapping_arn,
238
    parse_arn,
239
)
240
from localstack.utils.aws.client_types import ServicePrincipal
1✔
241
from localstack.utils.bootstrap import is_api_enabled
1✔
242
from localstack.utils.collections import PaginatedList
1✔
243
from localstack.utils.event_matcher import validate_event_pattern
1✔
244
from localstack.utils.lambda_debug_mode.lambda_debug_mode_session import LambdaDebugModeSession
1✔
245
from localstack.utils.strings import get_random_hex, short_uid, to_bytes, to_str
1✔
246
from localstack.utils.sync import poll_condition
1✔
247
from localstack.utils.urls import localstack_host
1✔
248

249
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
1✔
250

251
# Default timeout and memory size; units are presumably seconds and MB respectively — TODO confirm.
LAMBDA_DEFAULT_TIMEOUT = 3
1✔
252
LAMBDA_DEFAULT_MEMORY_SIZE = 128
1✔
253

254
# Numeric limits; the layers limit is the one checked in LambdaProvider._validate_layers.
LAMBDA_TAG_LIMIT_PER_RESOURCE = 50
1✔
255
LAMBDA_LAYERS_LIMIT_PER_FUNCTION = 5
1✔
256

257
# Tag key whose value is checked against TAG_KEY_CUSTOM_URL_VALIDATOR below
# (presumably a custom function URL id — verify against the callers).
TAG_KEY_CUSTOM_URL = "_custom_id_"
1✔
258
# Requirements (from RFC3986 & co): not longer than 63, first char must be
259
# alpha, then alphanumeric or hyphen, except cannot start or end with hyphen
260
TAG_KEY_CUSTOM_URL_VALIDATOR = re.compile(r"^[A-Za-z]([A-Za-z0-9\-]{0,61}[A-Za-z0-9])?$")
1✔
261

262

263
class LambdaProvider(LambdaApi, ServiceLifecycleHook):
1✔
264
    lambda_service: LambdaService
1✔
265
    create_fn_lock: threading.RLock
1✔
266
    create_layer_lock: threading.RLock
1✔
267
    router: FunctionUrlRouter
1✔
268
    esm_workers: dict[str, EsmWorker]
1✔
269
    layer_fetcher: LayerFetcher | None
1✔
270

271
    def __init__(self) -> None:
        """Create the Lambda service, the creation locks, the URL router and the ESM worker registry.

        The layer fetcher starts out as ``None``; the ``inject_layer_fetcher`` hook is run last and
        receives this provider instance.
        """
        service = LambdaService()
        self.lambda_service = service

        # Re-entrant locks for function and layer creation
        self.create_fn_lock = threading.RLock()
        self.create_layer_lock = threading.RLock()

        # The function URL router works on the same LambdaService instance as the provider
        self.router = FunctionUrlRouter(ROUTER, service)

        self.esm_workers = {}
        self.layer_fetcher = None
        lambda_hooks.inject_layer_fetcher.run(self)
1✔
279

280
    def accept_state_visitor(self, visitor: StateVisitor):
        """Pass the Lambda stores to the given state visitor.

        :param visitor: Visitor whose ``visit`` method is called with ``lambda_stores``
        """
        stores = lambda_stores
        visitor.visit(stores)
×
282

283
    def on_before_start(self):
        """Make sure the Lambda Debug Mode session is running.

        Any exception raised while doing so is logged as an error and not propagated.
        """
        try:
            LambdaDebugModeSession.get().ensure_running()
        except Exception as ex:
            LOG.error(
                "Unexpected error encountered when attempting to initialise Lambda Debug Mode '%s'.",
                ex,
            )
293

294
    def on_before_state_reset(self):
        """Stop the current Lambda service before the state is reset."""
        current_service = self.lambda_service
        current_service.stop()
×
296

297
    def on_after_state_reset(self):
        """Create a fresh Lambda service and hand the same instance to the function URL router."""
        fresh_service = LambdaService()
        self.router.lambda_service = fresh_service
        self.lambda_service = fresh_service
×
299

300
    def on_before_state_load(self):
        """Stop the current Lambda service before a state is loaded."""
        current_service = self.lambda_service
        current_service.stop()
×
302

303
    def on_after_state_load(self):
        """Rebuild the runtime objects for everything found in the loaded Lambda stores.

        In order:
          1. A new LambdaService replaces the old one and is shared with the router.
          2. Every stored function version is replaced in the store by a copy in the "Pending"
             state, and the service is asked to create the version (``.result(timeout=5)``).
          3. Every provisioned concurrency config is re-applied to the version manager of the
             version (or alias target) it refers to.
          4. For every event source mapping, the mapped function is polled (timeout 10) until it
             is Active or Failed, then an ESM worker is created and kept in ``self.esm_workers``.

        Failures in steps 2 and 3 are logged as warnings and do not stop the restore.
        """
        self.lambda_service = LambdaService()
        self.router.lambda_service = self.lambda_service

        for _account_id, regions in lambda_stores.items():
            for _region_name, store in regions.items():
                for function in store.functions.values():
                    for version in function.versions.values():
                        # restore the "Pending" state for every function version and start it
                        try:
                            pending_state = VersionState(
                                state=State.Pending,
                                code=StateReasonCode.Creating,
                                reason="The function is being created.",
                            )
                            pending_version = dataclasses.replace(
                                version,
                                config=dataclasses.replace(version.config, state=pending_state),
                            )
                            function.versions[version.id.qualifier] = pending_version
                            # the originally stored version object is what gets handed to the service
                            creation = self.lambda_service.create_function_version(version)
                            creation.result(timeout=5)
                        except Exception:
                            LOG.warning(
                                "Failed to restore function version %s",
                                version.id.qualified_arn(),
                                exc_info=True,
                            )

                    # restore provisioned concurrency per function considering both versions and aliases
                    provisioned_items = function.provisioned_concurrency_configs.items()
                    for provisioned_qualifier, provisioned_config in provisioned_items:
                        target_arn = None
                        try:
                            if api_utils.qualifier_is_alias(provisioned_qualifier):
                                alias = function.aliases.get(provisioned_qualifier)
                                aliased_version = function.versions.get(alias.function_version)
                                target_arn = aliased_version.id.qualified_arn()
                            elif api_utils.qualifier_is_version(provisioned_qualifier):
                                target_version = function.versions.get(provisioned_qualifier)
                                target_arn = target_version.id.qualified_arn()
                            else:
                                raise InvalidParameterValueException(
                                    "Invalid qualifier type:"
                                    " Qualifier can only be an alias or a version for provisioned concurrency."
                                )

                            version_manager = self.lambda_service.get_lambda_version_manager(
                                target_arn
                            )
                            version_manager.update_provisioned_concurrency_config(
                                provisioned_config.provisioned_concurrent_executions
                            )
                        except Exception:
                            LOG.warning(
                                "Failed to restore provisioned concurrency %s for function %s",
                                provisioned_config,
                                target_arn,
                                exc_info=True,
                            )

                for esm in store.event_source_mappings.values():
                    # Restores event source workers
                    function_arn = esm.get("FunctionArn")

                    def _function_settled() -> bool:
                        # True once the mapped function is either active or failed
                        current = get_function_version_from_arn(function_arn).config.state.state
                        return current in [State.Active, State.Failed]

                    # TODO: How do we know the event source is up?
                    # A basic poll to see if the mapped Lambda function is active/failed
                    if not poll_condition(_function_settled, timeout=10):
                        LOG.warning(
                            "Creating ESM for Lambda that is not in running state: %s",
                            function_arn,
                        )

                    function_role = get_function_version_from_arn(function_arn).config.role
                    disabled_states = (EsmState.DISABLED, EsmState.DISABLING)
                    is_esm_enabled = esm.get("State", EsmState.DISABLED) not in disabled_states

                    worker_factory = EsmWorkerFactory(esm, function_role, is_esm_enabled)
                    worker = worker_factory.get_esm_worker()

                    # Note: a worker is created in the DISABLED state if not enabled
                    worker.create()
                    # TODO: assigning the esm_worker to the dict only works after .create(). Could it cause a race
                    #  condition if we get a shutdown here and have a worker thread spawned but not accounted for?
                    self.esm_workers[worker.uuid] = worker
×
394

395
    def on_after_init(self):
        """Register the router's routes, then validate the runtime executor's environment."""
        self.router.register_routes()
        runtime_executor = get_runtime_executor()
        runtime_executor.validate_environment()
1✔
398

399
    def on_before_stop(self) -> None:
        """Shut down ESM workers and the Lambda service, then signal Lambda Debug Mode to stop.

        An exception raised while signalling the debug mode session is logged as an error and
        not propagated.
        """
        for worker in self.esm_workers.values():
            worker.stop_for_shutdown()

        # TODO: should probably unregister routes?
        self.lambda_service.stop()

        try:
            LambdaDebugModeSession.get().signal_stop()
        except Exception as ex:
            LOG.error(
                "Unexpected error encountered when attempting to signal Lambda Debug Mode to stop '%s'.",
                ex,
            )
414

415
    @staticmethod
1✔
416
    def _get_function(function_name: str, account_id: str, region: str) -> Function:
        """Look up a function in the store of the given account and region.

        :param function_name: Name of the function
        :param account_id: Account whose store is searched
        :param region: Region whose store is searched
        :return: The stored function
        :raises ResourceNotFoundException: if the store has no (truthy) entry for the name
        """
        found = lambda_stores[account_id][region].functions.get(function_name)
        if found:
            return found

        arn = api_utils.unqualified_lambda_arn(
            function_name=function_name,
            account=account_id,
            region=region,
        )
        raise ResourceNotFoundException(
            f"Function not found: {arn}",
            Type="User",
        )
1✔
430

431
    @staticmethod
1✔
432
    def _get_esm(uuid: str, account_id: str, region: str) -> EventSourceMappingConfiguration:
        """Look up an event source mapping in the store of the given account and region.

        :param uuid: UUID of the event source mapping
        :param account_id: Account whose store is searched
        :param region: Region whose store is searched
        :return: The stored event source mapping configuration
        :raises ResourceNotFoundException: if the store has no (truthy) entry for the UUID
        """
        found = lambda_stores[account_id][region].event_source_mappings.get(uuid)
        if found:
            return found

        arn = lambda_event_source_mapping_arn(uuid, account_id, region)
        raise ResourceNotFoundException(
            f"Event source mapping not found: {arn}",
            Type="User",
        )
1✔
442

443
    @staticmethod
1✔
444
    def _validate_qualifier_expression(qualifier: str) -> None:
        """Raise if ``api_utils.validate_qualifier`` reports errors for the qualifier.

        :param qualifier: Qualifier to validate
        :raises ValidationException: with a message built from the reported errors
        """
        error_messages = api_utils.validate_qualifier(qualifier)
        if not error_messages:
            return

        message = api_utils.construct_validation_exception_message(error_messages)
        raise ValidationException(message=message)
449

450
    @staticmethod
1✔
451
    def _resolve_fn_qualifier(resolved_fn: Function, qualifier: str | None) -> tuple[str, str]:
        """Check that a qualifier exists on the function and return it together with the matching ARN.

        Without a qualifier, "$LATEST" and the unqualified ARN are returned.

        :param resolved_fn: The resolved lambda function
        :param qualifier: The qualifier to be resolved or None
        :return: Tuple of (resolved qualifier, function arn either qualified or unqualified)
        :raises ResourceNotFoundException: if the alias or version does not exist, or the qualifier
            is neither an alias, a version nor "$LATEST"
        """
        function_name = resolved_fn.function_name
        # assuming function versions need to live in the same account and region
        account_id = resolved_fn.latest().id.account
        region = resolved_fn.latest().id.region

        fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
        if qualifier is None:
            return "$LATEST", fn_arn

        fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
        if api_utils.qualifier_is_alias(qualifier):
            exists = qualifier in resolved_fn.aliases
            not_found_message = f"Cannot find alias arn: {fn_arn}"
        elif api_utils.qualifier_is_version(qualifier) or qualifier == "$LATEST":
            exists = qualifier in resolved_fn.versions
            not_found_message = f"Function not found: {fn_arn}"
        else:
            # matches qualifier pattern but invalid alias or version
            exists = False
            not_found_message = f"Function not found: {fn_arn}"

        if not exists:
            raise ResourceNotFoundException(not_found_message, Type="User")
        return qualifier or "$LATEST", fn_arn
1✔
476

477
    @staticmethod
1✔
478
    def _function_revision_id(resolved_fn: Function, resolved_qualifier: str) -> str:
        """Return the revision id of the alias or version the qualifier refers to.

        :param resolved_fn: The resolved lambda function
        :param resolved_qualifier: An alias name or version qualifier present on the function
        :return: The alias' revision id, or the revision id of the version's config
        """
        if not api_utils.qualifier_is_alias(resolved_qualifier):
            # Assumes that a non-alias is a version
            return resolved_fn.versions[resolved_qualifier].config.revision_id
        return resolved_fn.aliases[resolved_qualifier].revision_id
1✔
484

485
    def _resolve_vpc_id(self, account_id: str, region_name: str, subnet_id: str) -> str:
        """Return the VPC id of a subnet, as reported by EC2 ``DescribeSubnets``.

        :param account_id: Account used for the EC2 client
        :param region_name: Region used for the EC2 client
        :param subnet_id: Subnet to look up
        :return: ``VpcId`` of the first subnet in the response
        :raises InvalidParameterValueException: if EC2 answers with a client error
        """
        ec2_client = connect_to(aws_access_key_id=account_id, region_name=region_name).ec2
        try:
            response = ec2_client.describe_subnets(SubnetIds=[subnet_id])
            return response["Subnets"][0]["VpcId"]
        except ec2_client.exceptions.ClientError as e:
            error = e.response["Error"]
            code = error["Code"]
            message = error["Message"]
            raise InvalidParameterValueException(
                f"Error occurred while DescribeSubnets. EC2 Error Code: {code}. EC2 Error Message: {message}",
                Type="User",
            )
496

497
    def _build_vpc_config(
        self,
        account_id: str,
        region_name: str,
        vpc_config: Optional[dict] = None,
    ) -> VpcConfig | None:
        """Build the internal VPC configuration from a request's VPC config dict.

        :param account_id: Account used for the EC2 subnet lookup
        :param region_name: Region used for the EC2 subnet lookup
        :param vpc_config: Request dict; the keys ``SubnetIds`` and ``SecurityGroupIds`` are read
        :return: ``None`` if no config was given or the EC2 API is not enabled; an empty
            ``VpcConfig`` if no subnet ids were given; otherwise a ``VpcConfig`` whose VPC id is
            resolved from the first subnet id
        :raises ValidationException: if the first subnet id does not match ``SUBNET_ID_REGEX``
        :raises InvalidParameterValueException: if the EC2 subnet lookup fails
        """
        if not vpc_config or not is_api_enabled("ec2"):
            return None

        # A missing key and an explicit None both mean "no subnets". Previously an explicit None
        # passed the emptiness check and crashed with a TypeError when indexing below.
        subnet_ids = vpc_config.get("SubnetIds") or []
        if not subnet_ids:
            return VpcConfig(vpc_id="", security_group_ids=[], subnet_ids=[])

        # Only the first subnet id is validated and used to resolve the VPC id
        subnet_id = subnet_ids[0]
        if not SUBNET_ID_REGEX.match(subnet_id):
            raise ValidationException(
                f"1 validation error detected: Value '[{subnet_id}]' at 'vpcConfig.subnetIds' failed to satisfy constraint: Member must satisfy constraint: [Member must have length less than or equal to 1024, Member must have length greater than or equal to 0, Member must satisfy regular expression pattern: ^subnet-[0-9a-z]*$]"
            )

        return VpcConfig(
            vpc_id=self._resolve_vpc_id(account_id, region_name, subnet_id),
            security_group_ids=vpc_config.get("SecurityGroupIds", []),
            subnet_ids=subnet_ids,
        )
521

522
    def _create_version_model(
        self,
        function_name: str,
        region: str,
        account_id: str,
        description: str | None = None,
        revision_id: str | None = None,
        code_sha256: str | None = None,
    ) -> tuple[FunctionVersion, bool]:
        """
        Add a new numbered version to the function model, based on the current $LATEST version.

        Preconditions checked before anything is changed:
          - RevisionId, if provided, must equal the revision id of $LATEST
          - CodeSha256, if provided, must equal the code hash of $LATEST
            (not enforced for hot-reloaded zip packages)

        If the most recently published version has the same internal revision as $LATEST, nothing is
        published and that previous version is returned together with ``False``.

        :param function_name: Function name to be published
        :param region: Region of the function
        :param account_id: Account of the function
        :param description: new description of the version (the one of $LATEST is kept if missing)
        :param revision_id: Revision id, function will raise error if it does not match latest revision id
        :param code_sha256: Code sha256, function will raise error if it does not match latest code hash
        :return: Tuple of (version, True if it was newly added / False if the previous version was reused)
        :raises PreconditionFailedException: if the revision id does not match
        :raises InvalidParameterValueException: if the code hash does not match
        """
        latest = get_function_version(
            function_name=function_name, qualifier="$LATEST", account_id=account_id, region=region
        )
        latest_config = latest.config

        if revision_id and latest_config.revision_id != revision_id:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
                Type="User",
            )

        # check if code hashes match if they are specified
        is_zip_package = latest_config.package_type == PackageType.Zip
        if is_zip_package:
            current_hash = latest_config.code.code_sha256
        else:
            current_hash = latest_config.image.code_sha256
        # if the code is a zip package and hot reloaded (hot reloading is currently only supported for zip packagetypes)
        # we cannot enforce the codesha256 check
        is_hot_reloaded_zip_package = is_zip_package and latest_config.code.is_hot_reloading()
        if code_sha256 and current_hash != code_sha256 and not is_hot_reloaded_zip_package:
            raise InvalidParameterValueException(
                f"CodeSHA256 ({code_sha256}) is different from current CodeSHA256 in $LATEST ({current_hash}). Please try again with the CodeSHA256 in $LATEST.",
                Type="User",
            )

        function = lambda_stores[account_id][region].functions.get(function_name)
        overrides = {} if description is None else {"description": description}
        # TODO copy environment instead of restarting one, get rid of all the "Pending"s

        with function.lock:
            if function.next_version > 1:
                previous = function.versions.get(str(function.next_version - 1))
                if previous and (
                    previous.config.internal_revision == latest_config.internal_revision
                ):
                    return previous, False
            # TODO check if there was a change since last version
            version_qualifier = str(function.next_version)
            function.next_version += 1
            version_id = VersionIdentifier(
                function_name=function_name,
                qualifier=version_qualifier,
                region=region,
                account=account_id,
            )

            apply_on = latest_config.snap_start["ApplyOn"]
            if apply_on == SnapStartApplyOn.PublishedVersions:
                optimization_status = SnapStartOptimizationStatus.On
            else:
                optimization_status = SnapStartOptimizationStatus.Off
            snap_start = SnapStartResponse(
                ApplyOn=apply_on,
                OptimizationStatus=optimization_status,
            )

            pending_state = VersionState(
                state=State.Pending,
                code=StateReasonCode.Creating,
                reason="The function is being created.",
            )
            published_config = dataclasses.replace(
                latest_config,
                last_update=None,  # versions never have a last update status
                state=pending_state,
                snap_start=snap_start,
                **overrides,
            )
            published = dataclasses.replace(latest, config=published_config, id=version_id)
            function.versions[version_qualifier] = published
        return published, True
1✔
625

626
    def _publish_version_from_existing_version(
        self,
        function_name: str,
        region: str,
        account_id: str,
        description: str | None = None,
        revision_id: str | None = None,
        code_sha256: str | None = None,
    ) -> FunctionVersion:
        """
        Publish a version based on a $LATEST that already exists and is initialized.

        If the version model reports that nothing was newly added, the returned previous version is
        passed through unchanged. Otherwise the version is published through the Lambda service and
        the stored $LATEST is replaced by a copy of itself.

        :param function_name: Function name
        :param region: region
        :param account_id: account id
        :param description: description
        :param revision_id: revision id (check if current version matches)
        :param code_sha256: code sha (check if current code matches)
        :return: new version
        """
        version, newly_added = self._create_version_model(
            function_name=function_name,
            region=region,
            account_id=account_id,
            description=description,
            revision_id=revision_id,
            code_sha256=code_sha256,
        )
        if newly_added:
            self.lambda_service.publish_version(version)
            function = lambda_stores[account_id][region].functions.get(function_name)
            # TODO: re-evaluate data model to prevent this dirty hack just for bumping the revision id
            latest = function.versions["$LATEST"]
            refreshed_config = dataclasses.replace(latest.config)
            function.versions["$LATEST"] = dataclasses.replace(latest, config=refreshed_config)
            return function.versions.get(version.id.qualifier)
        return version
1✔
665

666
    def _publish_version_with_changes(
        self,
        function_name: str,
        region: str,
        account_id: str,
        description: str | None = None,
        revision_id: str | None = None,
        code_sha256: str | None = None,
    ) -> FunctionVersion:
        """
        Publish a version alongside a new $LATEST version (publish on create / update).

        The Lambda service is only asked to create the function version if the version model
        reports that a version was newly added.

        :param function_name: Function name
        :param region: region
        :param account_id: account id
        :param description: description
        :param revision_id: revision id (check if current version matches)
        :param code_sha256: code sha (check if current code matches)
        :return: new version
        """
        version, newly_added = self._create_version_model(
            function_name=function_name,
            region=region,
            account_id=account_id,
            description=description,
            revision_id=revision_id,
            code_sha256=code_sha256,
        )
        if newly_added:
            self.lambda_service.create_function_version(version)
        return version
1✔
698

699
    @staticmethod
    def _verify_env_variables(env_vars: dict[str, str]):
        """
        Ensure the environment variables fit into the configured size limit.

        The variables are serialized as compact JSON (no whitespace after separators) and
        the UTF-8 byte length of that string is compared against
        ``config.LAMBDA_LIMITS_MAX_FUNCTION_ENVVAR_SIZE_BYTES``.

        :param env_vars: mapping of environment variable names to values
        :raises InvalidParameterValueException: if the serialized variables exceed the limit
        """
        serialized = json.dumps(env_vars, separators=(",", ":"))
        size_in_bytes = len(serialized.encode("utf-8"))
        if size_in_bytes <= config.LAMBDA_LIMITS_MAX_FUNCTION_ENVVAR_SIZE_BYTES:
            return

        raise InvalidParameterValueException(
            f"Lambda was unable to configure your environment variables because the environment variables you have provided exceeded the 4KB limit. String measured: {serialized}",
            Type="User",
        )
710

711
    @staticmethod
    def _validate_snapstart(snap_start: SnapStart, runtime: Runtime):
        """
        Validate a SnapStart configuration against the given runtime.

        :param snap_start: SnapStart request structure; only its ``ApplyOn`` entry is read
        :param runtime: runtime the function uses
        :raises ValidationException: if ``ApplyOn`` is not one of the accepted enum values
        :raises InvalidParameterValueException: if the runtime is not in
            ``SNAP_START_SUPPORTED_RUNTIMES``
        """
        requested_mode = snap_start.get("ApplyOn")
        accepted_modes = (SnapStartApplyOn.PublishedVersions, SnapStartApplyOn.None_)
        if requested_mode not in accepted_modes:
            raise ValidationException(
                f"1 validation error detected: Value '{requested_mode}' at 'snapStart.applyOn' failed to satisfy constraint: Member must satisfy enum value set: [PublishedVersions, None]"
            )

        if runtime in SNAP_START_SUPPORTED_RUNTIMES:
            return

        raise InvalidParameterValueException(
            f"{runtime} is not supported for SnapStart enabled functions.", Type="User"
        )
726

727
    def _validate_layers(self, new_layers: list[str], region: str, account_id: str):
        """
        Validate the layer version ARNs a function wants to reference.

        Checks performed, in order, for every ARN:

        * the ARN must contain a layer version,
        * layers of the calling account must live in ``region`` and must already exist in the store,
        * layers of other accounts must live in ``region``; if the version is not in the store it
          is retrieved through ``self.layer_fetcher`` and added to the store,
        * no two ARNs may point to different versions of the same layer.

        :param new_layers: layer version ARNs
        :param region: region of the function; region checks are skipped if falsy
        :param account_id: account id of the function
        :raises InvalidParameterValueException: too many layers, wrong region, unknown layer version,
            or two versions of the same layer
        :raises ValidationException: an ARN without a version component
        :raises AccessDeniedException: an external layer in another region or one that cannot be fetched
        :raises NotImplementedError: an external layer must be fetched but no layer fetcher is configured
        """
        if len(new_layers) > LAMBDA_LAYERS_LIMIT_PER_FUNCTION:
            raise InvalidParameterValueException(
                "Cannot reference more than 5 layers.", Type="User"
            )

        # maps the unversioned layer ARN to the first versioned ARN seen for it
        visited_layers = {}
        for layer_version_arn in new_layers:
            (
                layer_region,
                layer_account_id,
                layer_name,
                layer_version_str,
            ) = api_utils.parse_layer_arn(layer_version_arn)
            if layer_version_str is None:
                raise ValidationException(
                    f"1 validation error detected: Value '[{layer_version_arn}]'"
                    + r" at 'layers' failed to satisfy constraint: Member must satisfy constraint: [Member must have length less than or equal to 140, Member must have length greater than or equal to 1, Member must satisfy regular expression pattern: (arn:[a-zA-Z0-9-]+:lambda:[a-z]{2}((-gov)|(-iso(b?)))?-[a-z]+-\d{1}:\d{12}:layer:[a-zA-Z0-9-_]+:[0-9]+)|(arn:[a-zA-Z0-9-]+:lambda:::awslayer:[a-zA-Z0-9-_]+), Member must not be null]",
                )

            state = lambda_stores[layer_account_id][layer_region]
            # `layer` is what the store currently holds; it stays untouched by the fetch below
            layer = state.layers.get(layer_name)
            layer_version = None
            if layer is not None:
                layer_version = layer.layer_versions.get(layer_version_str)
            if layer_account_id == account_id:
                if region and layer_region != region:
                    raise InvalidParameterValueException(
                        f"Layers are not in the same region as the function. "
                        f"Layers are expected to be in region {region}.",
                        Type="User",
                    )
                # layer_version is None both when the layer and when only the version is missing
                if layer_version is None:
                    raise InvalidParameterValueException(
                        f"Layer version {layer_version_arn} does not exist.", Type="User"
                    )
            else:  # External layer from other account
                # TODO: validate IAM layer policy here, allowing access by default for now and only checking region
                if region and layer_region != region:
                    # TODO: detect user or role from context when IAM users are implemented
                    user = "user/localstack-testing"
                    raise AccessDeniedException(
                        f"User: arn:{get_partition(region)}:iam::{account_id}:{user} is not authorized to perform: lambda:GetLayerVersion on resource: {layer_version_arn} because no resource-based policy allows the lambda:GetLayerVersion action"
                    )
                if layer_version is None:
                    # Limitation: cannot fetch external layers when using the same account id as the target layer
                    # because we do not want to trigger the layer fetcher for every non-existing layer.
                    if self.layer_fetcher is None:
                        raise NotImplementedError(
                            "Fetching shared layers from AWS is a pro feature."
                        )

                    fetched_layer = self.layer_fetcher.fetch_layer(layer_version_arn)
                    if fetched_layer is None:
                        # TODO: detect user or role from context when IAM users are implemented
                        user = "user/localstack-testing"
                        raise AccessDeniedException(
                            f"User: arn:{get_partition(region)}:iam::{account_id}:{user} is not authorized to perform: lambda:GetLayerVersion on resource: {layer_version_arn} because no resource-based policy allows the lambda:GetLayerVersion action"
                        )

                    # Distinguish between new layer and new layer version.
                    # The decision must be based on the layer already in the store: the requested
                    # version is always missing at this point, so testing it would replace an
                    # existing layer (and drop its other versions) every time.
                    if layer is None:
                        # Create whole layer from scratch
                        state.layers[layer_name] = fetched_layer
                    else:
                        # Create layer version if another version of the same layer already exists
                        layer.layer_versions[layer_version_str] = (
                            fetched_layer.layer_versions.get(layer_version_str)
                        )

            # only the first two matches in the array are considered for the error message
            layer_arn = ":".join(layer_version_arn.split(":")[:-1])
            if layer_arn in visited_layers:
                conflict_layer_version_arn = visited_layers[layer_arn]
                raise InvalidParameterValueException(
                    f"Two different versions of the same layer are not allowed to be referenced in the same function. {conflict_layer_version_arn} and {layer_version_arn} are versions of the same layer.",
                    Type="User",
                )
            visited_layers[layer_arn] = layer_version_arn
1✔
806

807
    @staticmethod
    def map_layers(new_layers: list[str]) -> list[LayerVersion]:
        """
        Look up the stored layer version object for each layer version ARN.

        The layer itself is expected to be present in the store of the account and region
        encoded in the ARN; a missing layer version yields ``None`` in the result.

        :param new_layers: layer version ARNs
        :return: the looked-up layer versions, in the order of ``new_layers``
        """
        resolved_versions = []
        for arn in new_layers:
            region_name, account_id, layer_name, version_key = api_utils.parse_layer_arn(arn)
            stored_layer = lambda_stores[account_id][region_name].layers.get(layer_name)
            resolved_versions.append(stored_layer.layer_versions.get(version_key))
        return resolved_versions
1✔
818

819
    def get_function_recursion_config(
        self,
        context: RequestContext,
        function_name: UnqualifiedFunctionName,
        **kwargs,
    ) -> GetFunctionRecursionConfigResponse:
        """
        Return the recursive loop setting stored on a function.

        :param context: request context, used to resolve account, region and function name
        :param function_name: function name as passed in the request
        :return: response carrying the function's ``recursive_loop`` value
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        resolved_name = api_utils.get_function_name(function_name, context)
        function = self._get_function(
            function_name=resolved_name, region=region, account_id=account_id
        )
        return GetFunctionRecursionConfigResponse(RecursiveLoop=function.recursive_loop)
1✔
829

830
    def put_function_recursion_config(
        self,
        context: RequestContext,
        function_name: UnqualifiedFunctionName,
        recursive_loop: RecursiveLoop,
        **kwargs,
    ) -> PutFunctionRecursionConfigResponse:
        """
        Store a new recursive loop setting on a function.

        The function is looked up before the value is validated, so a lookup failure
        takes precedence over an invalid ``recursive_loop``.

        :param context: request context, used to resolve account, region and function name
        :param function_name: function name as passed in the request
        :param recursive_loop: requested setting; must be a member of ``RecursiveLoop``
        :return: response carrying the stored value
        :raises ValidationException: if ``recursive_loop`` is not a valid enum value
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        resolved_name = api_utils.get_function_name(function_name, context)
        function = self._get_function(
            function_name=resolved_name, region=region, account_id=account_id
        )

        valid_settings = tuple(RecursiveLoop.__members__.values())
        if recursive_loop not in valid_settings:
            raise ValidationException(
                f"1 validation error detected: Value '{recursive_loop}' at 'recursiveLoop' failed to satisfy constraint: "
                f"Member must satisfy enum value set: [Terminate, Allow]"
            )

        function.recursive_loop = recursive_loop
        return PutFunctionRecursionConfigResponse(RecursiveLoop=function.recursive_loop)
1✔
851

852
    @handler(operation="CreateFunction", expand=False)
    def create_function(
        self,
        context: RequestContext,
        request: CreateFunctionRequest,
    ) -> FunctionConfiguration:
        """
        Handle the CreateFunction operation.

        The request is validated first (payload sizes, architectures, environment variables,
        layers, role, runtime, SnapStart). The ``$LATEST`` version is then assembled and stored
        while holding ``self.create_fn_lock``, handed to the lambda service, optionally tagged
        and published, and finally mapped to the API response shape.

        :param context: request context providing region, account id, partition and raw request
        :param request: the CreateFunction request
        :return: configuration of the created (or, with ``Publish``, the published) version
        :raises RequestEntityTooLargeException: zipped code or the whole request is too large
        :raises ValidationException: invalid architectures or role ARN
        :raises InvalidParameterValueException: the role cannot be assumed by Lambda
        :raises ResourceConflictException: a function with this name already exists
        :raises LambdaServiceException: the code location for the package type is missing
        """
        context_region = context.region
        context_account_id = context.account_id

        zip_file = request.get("Code", {}).get("ZipFile")
        if zip_file and len(zip_file) > config.LAMBDA_LIMITS_CODE_SIZE_ZIPPED:
            raise RequestEntityTooLargeException(
                f"Zipped size must be smaller than {config.LAMBDA_LIMITS_CODE_SIZE_ZIPPED} bytes"
            )

        if context.request.content_length > config.LAMBDA_LIMITS_CREATE_FUNCTION_REQUEST_SIZE:
            raise RequestEntityTooLargeException(
                f"Request must be smaller than {config.LAMBDA_LIMITS_CREATE_FUNCTION_REQUEST_SIZE} bytes for the CreateFunction operation"
            )

        if architectures := request.get("Architectures"):
            if len(architectures) != 1:
                raise ValidationException(
                    f"1 validation error detected: Value '[{', '.join(architectures)}]' at 'architectures' failed to "
                    f"satisfy constraint: Member must have length less than or equal to 1",
                )
            if architectures[0] not in ARCHITECTURES:
                raise ValidationException(
                    f"1 validation error detected: Value '[{', '.join(architectures)}]' at 'architectures' failed to "
                    f"satisfy constraint: Member must satisfy constraint: [Member must satisfy enum value set: "
                    f"[x86_64, arm64], Member must not be null]",
                )

        # env_vars and layers are bound by the walrus even when falsy; both are reused below
        if env_vars := request.get("Environment", {}).get("Variables"):
            self._verify_env_variables(env_vars)

        if layers := request.get("Layers", []):
            self._validate_layers(layers, region=context_region, account_id=context_account_id)

        if not api_utils.is_role_arn(request.get("Role")):
            raise ValidationException(
                f"1 validation error detected: Value '{request.get('Role')}'"
                + " at 'role' failed to satisfy constraint: Member must satisfy regular expression pattern: arn:(aws[a-zA-Z-]*)?:iam::\\d{12}:role/?[a-zA-Z_0-9+=,.@\\-_/]+"
            )
        if not self.lambda_service.can_assume_role(request.get("Role"), context.region):
            raise InvalidParameterValueException(
                "The role defined for the function cannot be assumed by Lambda.", Type="User"
            )
        package_type = request.get("PackageType", PackageType.Zip)
        runtime = request.get("Runtime")
        self._validate_runtime(package_type, runtime)

        request_function_name = request.get("FunctionName")

        function_name, *_ = api_utils.get_name_and_qualifier(
            function_arn_or_name=request_function_name,
            qualifier=None,
            context=context,
        )

        if runtime in DEPRECATED_RUNTIMES:
            LOG.warning(
                "The Lambda runtime %s is deprecated. "
                "Please upgrade the runtime for the function %s: "
                "https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtimes.html",
                runtime,
                function_name,
            )
        if snap_start := request.get("SnapStart"):
            self._validate_snapstart(snap_start, runtime)
        state = lambda_stores[context_account_id][context_region]

        # The existence check and the insertion into the store happen under the same lock.
        with self.create_fn_lock:
            if function_name in state.functions:
                raise ResourceConflictException(f"Function already exist: {function_name}")
            fn = Function(function_name=function_name)
            arn = VersionIdentifier(
                function_name=function_name,
                qualifier="$LATEST",
                region=context_region,
                account=context_account_id,
            )
            # save function code to s3
            code = None
            image = None
            image_config = None
            runtime_version_config = RuntimeVersionConfig(
                # Limitation: the runtime id (presumably sha256 of image) is currently hardcoded
                # Potential implementation: provide (cached) sha256 hash of used Docker image
                RuntimeVersionArn=f"arn:{context.partition}:lambda:{context_region}::runtime:8eeff65f6809a3ce81507fe733fe09b835899b99481ba22fd75b5a7338290ec1"
            )
            request_code = request.get("Code")
            if package_type == PackageType.Zip:
                # TODO verify if correct combination of code is set
                if zip_file := request_code.get("ZipFile"):
                    code = store_lambda_archive(
                        archive_file=zip_file,
                        function_name=function_name,
                        region_name=context_region,
                        account_id=context_account_id,
                    )
                elif s3_bucket := request_code.get("S3Bucket"):
                    s3_key = request_code["S3Key"]
                    s3_object_version = request_code.get("S3ObjectVersion")
                    code = store_s3_bucket_archive(
                        archive_bucket=s3_bucket,
                        archive_key=s3_key,
                        archive_version=s3_object_version,
                        function_name=function_name,
                        region_name=context_region,
                        account_id=context_account_id,
                    )
                else:
                    raise LambdaServiceException("Gotta have s3 bucket or zip file")
            elif package_type == PackageType.Image:
                image = request_code.get("ImageUri")
                if not image:
                    raise LambdaServiceException("Gotta have an image when package type is image")
                image = create_image_code(image_uri=image)

                image_config_req = request.get("ImageConfig", {})
                image_config = ImageConfig(
                    command=image_config_req.get("Command"),
                    entrypoint=image_config_req.get("EntryPoint"),
                    working_directory=image_config_req.get("WorkingDirectory"),
                )
                # Runtime management controls are not available when providing a custom image
                runtime_version_config = None
            if "LoggingConfig" in request:
                logging_config = request["LoggingConfig"]
                LOG.warning(
                    "Advanced Lambda Logging Configuration is currently mocked "
                    "and will not impact the logging behavior. "
                    "Please create a feature request if needed."
                )

                # when switching to JSON, app and system level log is auto set to INFO
                # (defaults on the left of `|`, so values from the request win)
                if logging_config.get("LogFormat", None) == LogFormat.JSON:
                    logging_config = {
                        "ApplicationLogLevel": "INFO",
                        "SystemLogLevel": "INFO",
                        "LogGroup": f"/aws/lambda/{function_name}",
                    } | logging_config
                else:
                    logging_config = (
                        LoggingConfig(
                            LogFormat=LogFormat.Text, LogGroup=f"/aws/lambda/{function_name}"
                        )
                        | logging_config
                    )

            else:
                logging_config = LoggingConfig(
                    LogFormat=LogFormat.Text, LogGroup=f"/aws/lambda/{function_name}"
                )

            version = FunctionVersion(
                id=arn,
                config=VersionFunctionConfiguration(
                    last_modified=api_utils.format_lambda_date(datetime.datetime.now()),
                    description=request.get("Description", ""),
                    role=request["Role"],
                    timeout=request.get("Timeout", LAMBDA_DEFAULT_TIMEOUT),
                    runtime=request.get("Runtime"),
                    memory_size=request.get("MemorySize", LAMBDA_DEFAULT_MEMORY_SIZE),
                    handler=request.get("Handler"),
                    package_type=package_type,
                    environment=env_vars,
                    architectures=request.get("Architectures") or [Architecture.x86_64],
                    tracing_config_mode=request.get("TracingConfig", {}).get(
                        "Mode", TracingMode.PassThrough
                    ),
                    image=image,
                    image_config=image_config,
                    code=code,
                    layers=self.map_layers(layers),
                    internal_revision=short_uid(),
                    ephemeral_storage=LambdaEphemeralStorage(
                        size=request.get("EphemeralStorage", {}).get("Size", 512)
                    ),
                    snap_start=SnapStartResponse(
                        ApplyOn=request.get("SnapStart", {}).get("ApplyOn", SnapStartApplyOn.None_),
                        OptimizationStatus=SnapStartOptimizationStatus.Off,
                    ),
                    runtime_version_config=runtime_version_config,
                    dead_letter_arn=request.get("DeadLetterConfig", {}).get("TargetArn"),
                    vpc_config=self._build_vpc_config(
                        context_account_id, context_region, request.get("VpcConfig")
                    ),
                    state=VersionState(
                        state=State.Pending,
                        code=StateReasonCode.Creating,
                        reason="The function is being created.",
                    ),
                    logging_config=logging_config,
                ),
            )
            fn.versions["$LATEST"] = version
            state.functions[function_name] = fn
        # NOTE(review): only `.labels(...)` is called here and its result is discarded —
        # confirm whether an explicit increment call is required to record the event.
        function_counter.labels(
            operation=FunctionOperation.create,
            runtime=runtime or "n/a",
            status=FunctionStatus.success,
            invocation_type="n/a",
            package_type=package_type,
        )
        self.lambda_service.create_function_version(version)

        if tags := request.get("Tags"):
            # This will check whether the function exists.
            self._store_tags(arn.unqualified_arn(), tags)

        if request.get("Publish"):
            version = self._publish_version_with_changes(
                function_name=function_name, region=context_region, account_id=context_account_id
            )

        if config.LAMBDA_SYNCHRONOUS_CREATE:
            # block via retrying until "terminal" condition reached before returning
            if not poll_condition(
                lambda: get_function_version(
                    function_name, version.id.qualifier, version.id.account, version.id.region
                ).config.state.state
                in [State.Active, State.Failed],
                timeout=10,
            ):
                LOG.warning(
                    "LAMBDA_SYNCHRONOUS_CREATE is active, but waiting for %s reached timeout.",
                    function_name,
                )

        return api_utils.map_config_out(
            version, return_qualified_arn=False, return_update_status=False
        )
1086

1087
    def _validate_runtime(self, package_type, runtime):
        """
        Ensure that a Zip-packaged function uses an accepted runtime.

        By default the runtime is checked against ``ALL_RUNTIMES``. With
        ``config.LAMBDA_RUNTIME_VALIDATION`` enabled it is checked against the runtimes listed
        in ``RUNTIMES_AGGREGATED`` instead. Other package types are not checked.

        :param package_type: package type of the function
        :param runtime: requested runtime
        :raises InvalidParameterValueException: if the runtime is not accepted; deprecated
            runtimes with a known upgrade target get a dedicated message
        """
        runtimes = ALL_RUNTIMES
        if config.LAMBDA_RUNTIME_VALIDATION:
            # Flatten the grouped runtimes into one list. `itertools.chain(values)` with a single
            # argument would merely re-yield the groups themselves, so a runtime could never be
            # found in the result.
            # NOTE(review): assumes every value of RUNTIMES_AGGREGATED is a collection of
            # runtimes — confirm against its definition.
            runtimes = list(itertools.chain.from_iterable(RUNTIMES_AGGREGATED.values()))

        if package_type == PackageType.Zip and runtime not in runtimes:
            # deprecated runtimes have different error
            if runtime in DEPRECATED_RUNTIMES:
                HINT_LOG.info(
                    "Set env variable LAMBDA_RUNTIME_VALIDATION to 0"
                    " in order to allow usage of deprecated runtimes"
                )
                # raises on its own if an upgrade target is known for this runtime
                self._check_for_recomended_migration_target(runtime)

            raise InvalidParameterValueException(
                f"Value {runtime} at 'runtime' failed to satisfy constraint: Member must satisfy enum value set: {VALID_RUNTIMES} or be a valid ARN",
                Type="User",
            )
1105

1106
    def _check_for_recomended_migration_target(self, deprecated_runtime):
        """
        Raise the "no longer supported" error for deprecated runtimes with a known upgrade target.

        AWS recommends a migration runtime for "newly" deprecated runtimes; to keep parity with
        its error messages those runtimes get their own exception. Runtimes without an entry in
        ``DEPRECATED_RUNTIMES_UPGRADES`` pass through silently.

        :param deprecated_runtime: the deprecated runtime that was requested
        :raises InvalidParameterValueException: if an upgrade target is registered for the runtime
        """
        suggested_runtime = DEPRECATED_RUNTIMES_UPGRADES.get(deprecated_runtime)
        if suggested_runtime is None:
            return

        LOG.debug(
            "The Lambda runtime %s is deprecated. Please upgrade to a supported Lambda runtime such as %s.",
            deprecated_runtime,
            suggested_runtime,
        )
        raise InvalidParameterValueException(
            f"The runtime parameter of {deprecated_runtime} is no longer supported for creating or updating AWS Lambda functions. We recommend you use a supported runtime while creating or updating functions.",
            Type="User",
        )
1121

1122
    @handler(operation="UpdateFunctionConfiguration", expand=False)
    def update_function_configuration(
        self, context: RequestContext, request: UpdateFunctionConfigurationRequest
    ) -> FunctionConfiguration:
        """
        Update the configuration of the ``$LATEST`` version of a function.

        Every setting present in the request is validated and collected; afterwards a copy of
        the current ``$LATEST`` version with those settings applied replaces the stored one and
        is passed to the lambda service.

        :param context: request context, used to resolve account, region and function name
        :param request: the UpdateFunctionConfiguration request
        :return: configuration of the new ``$LATEST`` version
        :raises ResourceNotFoundException: the function does not exist
        :raises PreconditionFailedException: ``RevisionId`` does not match the latest revision
        :raises ValidationException: invalid role ARN
        :raises InvalidParameterValueException: unknown runtime
        """
        requested_name = request.get("FunctionName")

        # in case we got ARN or partial ARN
        account_id, region = api_utils.get_account_and_region(requested_name, context)
        function_name, _qualifier = api_utils.get_name_and_qualifier(requested_name, None, context)
        state = lambda_stores[account_id][region]

        if function_name not in state.functions:
            missing_arn = api_utils.unqualified_lambda_arn(
                function_name=function_name, region=region, account=account_id
            )
            raise ResourceNotFoundException(
                f"Function not found: {missing_arn}",
                Type="User",
            )
        function = state.functions[function_name]

        # TODO: lock modification of latest version
        # TODO: notify service for changes relevant to re-provisioning of $LATEST
        latest_version = function.latest()
        latest_version_config = latest_version.config

        expected_revision = request.get("RevisionId")
        if expected_revision and expected_revision != latest_version.config.revision_id:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
                Type="User",
            )

        # keyword arguments for dataclasses.replace on the version configuration
        config_changes = {}
        if "EphemeralStorage" in request:
            config_changes["ephemeral_storage"] = LambdaEphemeralStorage(
                request["EphemeralStorage"].get("Size", 512)
            )  # TODO: do defaults here apply as well?

        if "Role" in request:
            new_role = request["Role"]
            if not api_utils.is_role_arn(new_role):
                raise ValidationException(
                    f"1 validation error detected: Value '{request.get('Role')}'"
                    + " at 'role' failed to satisfy constraint: Member must satisfy regular expression pattern: arn:(aws[a-zA-Z-]*)?:iam::\\d{12}:role/?[a-zA-Z_0-9+=,.@\\-_/]+"
                )
            config_changes["role"] = new_role

        # settings that are copied over without any validation
        passthrough_fields = (
            ("Description", "description"),
            ("Timeout", "timeout"),
            ("MemorySize", "memory_size"),
            ("Handler", "handler"),
        )
        for request_key, config_field in passthrough_fields:
            if request_key in request:
                config_changes[config_field] = request[request_key]

        if "DeadLetterConfig" in request:
            config_changes["dead_letter_arn"] = request["DeadLetterConfig"].get("TargetArn")

        if vpc_config := request.get("VpcConfig"):
            config_changes["vpc_config"] = self._build_vpc_config(account_id, region, vpc_config)

        if "Runtime" in request:
            runtime = request["Runtime"]

            if runtime not in ALL_RUNTIMES:
                raise InvalidParameterValueException(
                    f"Value {runtime} at 'runtime' failed to satisfy constraint: Member must satisfy enum value set: {VALID_RUNTIMES} or be a valid ARN",
                    Type="User",
                )
            if runtime in DEPRECATED_RUNTIMES:
                LOG.warning(
                    "The Lambda runtime %s is deprecated. "
                    "Please upgrade the runtime for the function %s: "
                    "https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtimes.html",
                    runtime,
                    function_name,
                )
            config_changes["runtime"] = runtime

        if snap_start := request.get("SnapStart"):
            # validate against the runtime from this request, falling back to the current one
            runtime = config_changes.get("runtime") or latest_version_config.runtime
            self._validate_snapstart(snap_start, runtime)
            config_changes["snap_start"] = SnapStartResponse(
                ApplyOn=snap_start.get("ApplyOn", SnapStartApplyOn.None_),
                OptimizationStatus=SnapStartOptimizationStatus.Off,
            )

        if "Environment" in request:
            env_vars = request["Environment"].get("Variables", {})
            if env_vars:
                self._verify_env_variables(env_vars)
            config_changes["environment"] = env_vars

        if "Layers" in request:
            new_layers = request["Layers"]
            if new_layers:
                self._validate_layers(new_layers, region=region, account_id=account_id)
            config_changes["layers"] = self.map_layers(new_layers)

        if "ImageConfig" in request:
            image_config_req = request["ImageConfig"]
            config_changes["image_config"] = ImageConfig(
                command=image_config_req.get("Command"),
                entrypoint=image_config_req.get("EntryPoint"),
                working_directory=image_config_req.get("WorkingDirectory"),
            )

        if "LoggingConfig" in request:
            requested_logging = request["LoggingConfig"]
            LOG.warning(
                "Advanced Lambda Logging Configuration is currently mocked "
                "and will not impact the logging behavior. "
                "Please create a feature request if needed."
            )

            # when switching to JSON, app and system level log is auto set to INFO
            if requested_logging.get("LogFormat", None) == LogFormat.JSON:
                json_defaults = {
                    "ApplicationLogLevel": "INFO",
                    "SystemLogLevel": "INFO",
                }
                requested_logging = json_defaults | requested_logging

            previous_logging = latest_version_config.logging_config

            # partial update: requested values override the previous ones
            merged_logging = previous_logging | requested_logging

            # in case we switched from JSON to Text we need to remove LogLevel keys
            switched_to_text = (
                merged_logging.get("LogFormat") == LogFormat.Text
                and previous_logging.get("LogFormat") == LogFormat.JSON
            )
            if switched_to_text:
                merged_logging.pop("ApplicationLogLevel", None)
                merged_logging.pop("SystemLogLevel", None)

            config_changes["logging_config"] = merged_logging

        if "TracingConfig" in request:
            if new_mode := request["TracingConfig"].get("Mode"):
                config_changes["tracing_config_mode"] = new_mode

        updated_config = dataclasses.replace(
            latest_version_config,
            last_modified=api_utils.generate_lambda_date(),
            internal_revision=short_uid(),
            last_update=UpdateStatus(
                status=LastUpdateStatus.InProgress,
                code="Creating",
                reason="The function is being created.",
            ),
            **config_changes,
        )
        new_latest_version = dataclasses.replace(latest_version, config=updated_config)
        function.versions["$LATEST"] = new_latest_version  # TODO: notify
        self.lambda_service.update_version(new_version=new_latest_version)

        return api_utils.map_config_out(new_latest_version)
1✔
1284

1285
    @handler(operation="UpdateFunctionCode", expand=False)
    def update_function_code(
        self, context: RequestContext, request: UpdateFunctionCodeRequest
    ) -> FunctionConfiguration:
        """
        Swap the code (zip archive, S3 object or container image) of the function's $LATEST version.

        The code source is taken from the request in this order of precedence: ``ZipFile``,
        ``S3Bucket`` (with ``S3Key`` and optional ``S3ObjectVersion``), ``ImageUri``. The new version
        is stored as ``$LATEST`` and handed to the lambda service; if ``Publish`` is set,
        ``_publish_version_with_changes`` is called afterwards and its result is returned with a
        qualified ARN.

        Raises:
            ResourceNotFoundException: If the function is not in the store.
            PreconditionFailedException: If ``RevisionId`` is set and differs from the revision of $LATEST.
            InvalidParameterValueException: If the code source does not fit the function's package type.
            ValidationException: If a non-empty ``Architectures`` does not hold exactly one supported value.
            LambdaServiceException: If the request carries no code source at all.
        """
        requested_name = request.get("FunctionName")
        account_id, region = api_utils.get_account_and_region(requested_name, context)
        # a qualifier contained in the name/ARN is parsed but unused, the update always targets $LATEST
        function_name, _ = api_utils.get_name_and_qualifier(requested_name, None, context)

        state = lambda_stores[account_id][region]
        if function_name not in state.functions:
            missing_arn = api_utils.unqualified_lambda_arn(
                function_name=function_name, region=region, account=account_id
            )
            raise ResourceNotFoundException(f"Function not found: {missing_arn}", Type="User")
        function = state.functions[function_name]

        expected_revision = request.get("RevisionId")
        if expected_revision and expected_revision != function.latest().config.revision_id:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
                Type="User",
            )

        # TODO verify if correct combination of code is set
        archive_requested = request.get("ZipFile") or request.get("S3Bucket")
        if archive_requested and function.latest().config.package_type == PackageType.Image:
            raise InvalidParameterValueException(
                "Please provide ImageUri when updating a function with packageType Image.",
                Type="User",
            )
        if request.get("ImageUri") and function.latest().config.package_type == PackageType.Zip:
            raise InvalidParameterValueException(
                "Please don't provide ImageUri when updating a function with packageType Zip.",
                Type="User",
            )

        zip_file = request.get("ZipFile")
        s3_bucket = request.get("S3Bucket")
        image_uri = request.get("ImageUri")
        code = None
        image = None
        if zip_file:
            code = store_lambda_archive(
                archive_file=zip_file,
                function_name=function_name,
                region_name=region,
                account_id=account_id,
            )
        elif s3_bucket:
            code = store_s3_bucket_archive(
                archive_bucket=s3_bucket,
                archive_key=request["S3Key"],
                archive_version=request.get("S3ObjectVersion"),
                function_name=function_name,
                region_name=region,
                account_id=account_id,
            )
        elif image_uri:
            image = create_image_code(image_uri=image_uri)
        else:
            raise LambdaServiceException("Gotta have s3 bucket or zip file or image")

        previous_latest = function.versions.get("$LATEST")
        if code:
            replace_kwargs = {"code": code}
        else:
            replace_kwargs = {"image": image}

        architectures = request.get("Architectures")
        if architectures:
            if len(architectures) != 1:
                raise ValidationException(
                    f"1 validation error detected: Value '[{', '.join(architectures)}]' at 'architectures' failed to "
                    f"satisfy constraint: Member must have length less than or equal to 1",
                )
            # An empty list of architectures is also forbidden. Further exceptions are tested here for create_function:
            # tests.aws.services.lambda_.test_lambda_api.TestLambdaFunction.test_create_lambda_exceptions
            if architectures[0] not in ARCHITECTURES:
                raise ValidationException(
                    f"1 validation error detected: Value '[{', '.join(architectures)}]' at 'architectures' failed to "
                    f"satisfy constraint: Member must satisfy constraint: [Member must satisfy enum value set: "
                    f"[x86_64, arm64], Member must not be null]",
                )
            replace_kwargs["architectures"] = architectures

        # the new $LATEST starts out "InProgress"; the lambda service is notified right after storing it
        pending_update = UpdateStatus(
            status=LastUpdateStatus.InProgress,
            code="Creating",
            reason="The function is being created.",
        )
        new_config = dataclasses.replace(
            previous_latest.config,
            internal_revision=short_uid(),
            last_modified=api_utils.generate_lambda_date(),
            last_update=pending_update,
            **replace_kwargs,
        )
        function_version = dataclasses.replace(previous_latest, config=new_config)
        function.versions["$LATEST"] = function_version
        self.lambda_service.update_version(new_version=function_version)

        publish = request.get("Publish")
        if publish:
            function_version = self._publish_version_with_changes(
                function_name=function_name, region=region, account_id=account_id
            )
        return api_utils.map_config_out(function_version, return_qualified_arn=bool(publish))
1394

1395
    # TODO: does deleting the latest published version affect the next versions number?
1396
    # TODO: what happens when we call this with a qualifier and a fully qualified ARN? (+ conflicts?)
1397
    # TODO: test different ARN patterns (shorthand ARN?)
1398
    # TODO: test deleting across regions?
1399
    # TODO: test mismatch between context region and region in ARN
1400
    # TODO: test qualifier $LATEST, alias-name and version
1401
    def delete_function(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: Qualifier = None,
        **kwargs,
    ) -> None:
        """
        Delete a whole function (no qualifier) or a single numbered version of it.

        Raises:
            InvalidParameterValueException: If the qualifier is an alias or ``$LATEST``.
            ResourceNotFoundException: If the function is not in the store.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )

        if qualifier and api_utils.qualifier_is_alias(qualifier):
            raise InvalidParameterValueException(
                "Deletion of aliases is not currently supported.",
                Type="User",
            )

        state = lambda_stores[account_id][region]
        if qualifier == "$LATEST":
            raise InvalidParameterValueException(
                "$LATEST version cannot be deleted without deleting the function.", Type="User"
            )

        if function_name not in state.functions:
            missing_arn = api_utils.unqualified_lambda_arn(
                function_name=function_name, region=region, account=account_id
            )
            raise ResourceNotFoundException(f"Function not found: {missing_arn}", Type="User")

        if not qualifier:
            # delete the whole function
            # TODO: introduce locking for safe deletion: We could create a new version at the API layer before
            #  the old version gets cleaned up in the internal lambda service.
            removed_function = state.functions.pop(function_name)
            for fn_version in removed_function.versions.values():
                self.lambda_service.stop_version(qualified_arn=fn_version.id.qualified_arn())
                # the function is gone from the store, so the code can be destroyed unconditionally
                if fn_version.config.code:
                    fn_version.config.code.destroy()
            return

        # delete only the requested version; an unknown version is silently ignored
        function = state.functions.get(function_name)
        removed_version = function.versions.pop(qualifier, None)
        if removed_version:
            self.lambda_service.stop_version(removed_version.id.qualified_arn())
            destroy_code_if_not_used(code=removed_version.config.code, function=function)
1449

1450
    def list_functions(
        self,
        context: RequestContext,
        master_region: MasterRegion = None,  # (only relevant for lambda@edge)
        function_version: FunctionVersionApi = None,
        marker: String = None,
        max_items: MaxListItems = None,
        **kwargs,
    ) -> ListFunctionsResponse:
        """
        List the functions of the caller's account and region, one page at a time.

        Without ``function_version`` only the $LATEST version of each function is returned (with
        unqualified ARNs); with ``ALL`` every stored version is returned (with qualified ARNs).

        Raises:
            ValidationException: If ``function_version`` is set to anything other than ``ALL``.
        """
        state = lambda_stores[context.account_id][context.region]

        list_all_versions = function_version == FunctionVersionApi.ALL
        if function_version and not list_all_versions:
            raise ValidationException(
                f"1 validation error detected: Value '{function_version}'"
                " at 'functionVersion' failed to satisfy constraint: Member must satisfy enum value set: [ALL]"
            )

        if list_all_versions:
            # include all versions for all function
            selected = [
                fn_version
                for function in state.functions.values()
                for fn_version in function.versions.values()
            ]
        else:
            selected = [function.latest() for function in state.functions.values()]

        entries = PaginatedList(
            [
                api_utils.map_to_list_response(
                    api_utils.map_config_out(fn_version, return_qualified_arn=list_all_versions)
                )
                for fn_version in selected
            ]
        )
        # the function ARN of the first entry of the next page serves as pagination marker
        page, next_marker = entries.get_page(
            lambda entry: entry["FunctionArn"],
            marker,
            max_items,
        )
        return ListFunctionsResponse(Functions=page, NextMarker=next_marker)
1488

1489
    def get_function(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        qualifier: Qualifier = None,
        **kwargs,
    ) -> GetFunctionResponse:
        """
        Return configuration, code location and tags of a function version.

        An alias qualifier is resolved to the version the alias points to; the alias name is passed
        on to the configuration mapping.

        Raises:
            ResourceNotFoundException: If the function or the given alias does not exist.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )

        fn = lambda_stores[account_id][region].functions.get(function_name)
        if fn is None:
            # the ARN in the message is qualified only if the caller supplied a qualifier
            if qualifier is None:
                missing_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
            else:
                missing_arn = api_utils.qualified_lambda_arn(
                    function_name, qualifier, account_id, region
                )
            raise ResourceNotFoundException(f"Function not found: {missing_arn}", Type="User")

        alias_name = None
        version_qualifier = qualifier
        if qualifier and api_utils.qualifier_is_alias(qualifier):
            if qualifier not in fn.aliases:
                alias_arn = api_utils.qualified_lambda_arn(
                    function_name, qualifier, account_id, region
                )
                raise ResourceNotFoundException(f"Function not found: {alias_arn}", Type="User")
            alias_name = qualifier
            version_qualifier = fn.aliases[alias_name].function_version

        version = get_function_version(
            function_name=function_name,
            qualifier=version_qualifier,
            account_id=account_id,
            region=region,
        )
        tags = self._get_tags(api_utils.unqualified_lambda_arn(function_name, account_id, region))

        # zip based functions expose a presigned URL, image based functions their image coordinates
        code_location = None
        archive = version.config.code
        if archive:
            code_location = FunctionCodeLocation(
                Location=archive.generate_presigned_url(), RepositoryType="S3"
            )
        else:
            image = version.config.image
            if image:
                code_location = FunctionCodeLocation(
                    ImageUri=image.image_uri,
                    RepositoryType=image.repository_type,
                    ResolvedImageUri=image.resolved_image_uri,
                )

        response = GetFunctionResponse(
            Configuration=api_utils.map_config_out(
                version, return_qualified_arn=bool(version_qualifier), alias_name=alias_name
            ),
            Code=code_location,  # TODO
            # Concurrency={},  # TODO
        )
        if tags:
            response["Tags"] = tags
        return response
1553

1554
    def get_function_configuration(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        qualifier: Qualifier = None,
        **kwargs,
    ) -> FunctionConfiguration:
        """
        Return only the configuration of a function version.

        CAVE: the return value is *not* the same as in ``get_function`` but seems to be only the
        configuration part of it. The ARN is qualified whenever a qualifier was resolved.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        resolved_name, resolved_qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        fn_version = get_function_version(
            function_name=resolved_name,
            qualifier=resolved_qualifier,
            account_id=account_id,
            region=region,
        )
        return api_utils.map_config_out(fn_version, return_qualified_arn=bool(resolved_qualifier))
1573

1574
    def invoke(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        invocation_type: InvocationType = None,
        log_type: LogType = None,
        client_context: String = None,
        payload: IO[Blob] = None,
        qualifier: Qualifier = None,
        **kwargs,
    ) -> InvocationResponse:
        """
        Invoke a function through the lambda service and map the result to an API response.

        ``Event`` invocations answer with status 202, ``DryRun`` invocations with 204; everything
        else answers with 200 plus payload and executed version. With ``log_type`` ``Tail`` the last
        4096 bytes of the logs are attached base64 encoded.

        Raises:
            ServiceException: Passed through unchanged when raised by the lambda service.
            LambdaServiceException: For an environment startup timeout or any other invocation error.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )

        started_at = time.perf_counter()
        try:
            invocation_result = self.lambda_service.invoke(
                function_name=function_name,
                qualifier=qualifier,
                region=region,
                account_id=account_id,
                invocation_type=invocation_type,
                client_context=client_context,
                request_id=context.request_id,
                trace_context=context.trace_context,
                payload=payload.read() if payload else None,
            )
        except ServiceException:
            # API level errors already have the right shape
            raise
        except EnvironmentStartupTimeoutException as timeout_error:
            raise LambdaServiceException(
                f"[{context.request_id}] Timeout while starting up lambda environment for function {function_name}:{qualifier}"
            ) from timeout_error
        except Exception as e:
            # the traceback is only logged when debug logging is enabled
            LOG.error(
                "[%s] Error while invoking lambda %s",
                context.request_id,
                function_name,
                exc_info=LOG.isEnabledFor(logging.DEBUG),
            )
            raise LambdaServiceException(
                f"[{context.request_id}] Internal error while executing lambda {function_name}:{qualifier}. Caused by {type(e).__name__}: {e}"
            ) from e

        # asynchronous invocations and dry runs carry no result
        if invocation_type == InvocationType.Event:
            return InvocationResponse(StatusCode=202)
        if invocation_type == InvocationType.DryRun:
            return InvocationResponse(StatusCode=204)

        elapsed_ms = (time.perf_counter() - started_at) * 1000
        LOG.debug("Lambda invocation duration: %0.2fms", elapsed_ms)

        response = InvocationResponse(
            StatusCode=200,
            Payload=invocation_result.payload,
            ExecutedVersion=invocation_result.executed_version,
        )
        if invocation_result.is_error:
            response["FunctionError"] = "Unhandled"
        if log_type == LogType.Tail:
            log_tail = to_bytes(invocation_result.logs)[-4096:]
            response["LogResult"] = to_str(base64.b64encode(log_tail))

        return response
1643

1644
    # Version operations
1645
    def publish_version(
        self,
        context: RequestContext,
        function_name: FunctionName,
        code_sha256: String = None,
        description: Description = None,
        revision_id: String = None,
        **kwargs,
    ) -> FunctionConfiguration:
        """
        Publish a new version of the function and return its configuration with a qualified ARN.

        The actual work is delegated to ``_publish_version_from_existing_version``.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        resolved_name = api_utils.get_function_name(function_name, context)
        published = self._publish_version_from_existing_version(
            function_name=resolved_name,
            description=description,
            account_id=account_id,
            region=region,
            revision_id=revision_id,
            code_sha256=code_sha256,
        )
        return api_utils.map_config_out(published, return_qualified_arn=True)
1665

1666
    def list_versions_by_function(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        marker: String = None,
        max_items: MaxListItems = None,
        **kwargs,
    ) -> ListVersionsByFunctionResponse:
        """
        List all versions of a function (with qualified ARNs), one page at a time.

        Args:
            function_name: Name or ARN of the function.
            marker: Pagination marker returned as ``NextMarker`` by a previous call.
            max_items: Maximum number of versions per page.

        Returns:
            The requested page of versions and, if more versions follow, the marker of the next page.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name = api_utils.get_function_name(function_name, context)
        function = self._get_function(
            function_name=function_name, region=region, account_id=account_id
        )
        versions = [
            api_utils.map_to_list_response(
                api_utils.map_config_out(version=version, return_qualified_arn=True)
            )
            for version in function.versions.values()
        ]
        items = PaginatedList(versions)
        # The marker has to be a string a client can send back. The qualified function ARN is unique
        # per version and is the same key list_functions uses; returning the item itself would turn
        # the whole version dict into the marker.
        page, token = items.get_page(
            lambda item: item["FunctionArn"],
            marker,
            max_items,
        )
        return ListVersionsByFunctionResponse(Versions=page, NextMarker=token)
1692

1693
    # Alias
1694

1695
    def _create_routing_config_model(
        self, routing_config_dict: dict[str, float], function_version: FunctionVersion
    ):
        """
        Validate the additional version weights of an alias and wrap them in an ``AliasRoutingConfig``.

        ``routing_config_dict`` maps a version qualifier to its traffic weight, ``function_version``
        is the version the alias itself points to.

        Raises:
            InvalidParameterValueException: If more than one weight is given, if the weighted version
                is the alias' own target, or if the alias targets ``$LATEST``.
            ValidationException: If a weight is below 0.0 or not below 1.0, or a key is no version number.
        """
        if len(routing_config_dict) > 1:
            raise InvalidParameterValueException(
                "Number of items in AdditionalVersionWeights cannot be greater than 1",
                Type="User",
            )

        primary_qualifier = function_version.id.qualifier
        # should be exactly one item here, still iterating, might be supported in the future
        for version_key, weight in routing_config_dict.items():
            if weight < 0.0 or weight >= 1.0:
                raise ValidationException(
                    f"1 validation error detected: Value '{{{version_key}={weight}}}' at 'routingConfig.additionalVersionWeights' failed to satisfy constraint: Map value must satisfy constraint: [Member must have value less than or equal to 1.0, Member must have value greater than or equal to 0.0, Member must not be null]"
                )
            if version_key == primary_qualifier:
                raise InvalidParameterValueException(
                    f"Invalid function version {primary_qualifier}. Function version {primary_qualifier} is already included in routing configuration.",
                    Type="User",
                )
            # check if version target is latest, then no routing config is allowed
            if primary_qualifier == "$LATEST":
                raise InvalidParameterValueException(
                    "$LATEST is not supported for an alias pointing to more than 1 version"
                )
            if not api_utils.qualifier_is_version(version_key):
                raise ValidationException(
                    f"1 validation error detected: Value '{{{version_key}={weight}}}' at 'routingConfig.additionalVersionWeights' failed to satisfy constraint: Map keys must satisfy constraint: [Member must have length less than or equal to 1024, Member must have length greater than or equal to 1, Member must satisfy regular expression pattern: [0-9]+, Member must not be null]"
                )

            # checking if the version in the config exists; only the lookup matters, not its result
            get_function_version(
                function_name=function_version.id.function_name,
                qualifier=version_key,
                region=function_version.id.region,
                account_id=function_version.id.account,
            )
        return AliasRoutingConfig(version_weights=routing_config_dict)
1732

1733
    def create_alias(
        self,
        context: RequestContext,
        function_name: FunctionName,
        name: Alias,
        function_version: Version,
        description: Description = None,
        routing_config: AliasRoutingConfiguration = None,
        **kwargs,
    ) -> AliasConfiguration:
        """
        Create an alias pointing to an existing version of a function.

        Raises:
            ValidationException: If ``name`` is not a valid alias name.
            ResourceConflictException: If the function already has an alias of that name.
        """
        if not api_utils.qualifier_is_alias(name):
            raise ValidationException(
                f"1 validation error detected: Value '{name}' at 'name' failed to satisfy constraint: Member must satisfy regular expression pattern: (?!^[0-9]+$)([a-zA-Z0-9-_]+)"
            )

        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name = api_utils.get_function_name(function_name, context)
        # looked up first so that the version the alias should point to is resolved before anything else
        target_version = get_function_version(
            function_name=function_name,
            qualifier=function_version,
            region=region,
            account_id=account_id,
        )
        function = self._get_function(
            function_name=function_name, region=region, account_id=account_id
        )
        # description is always present, if not specified it's an empty string
        if not description:
            description = ""

        with function.lock:
            existing_alias = function.aliases.get(name)
            if existing_alias:
                existing_arn = api_utils.map_alias_out(alias=existing_alias, function=function)[
                    "AliasArn"
                ]
                raise ResourceConflictException(
                    f"Alias already exists: {existing_arn}",
                    Type="User",
                )

            # a routing configuration is only built for non-empty additional weights
            routing_configuration = None
            if routing_config:
                additional_weights = routing_config.get("AdditionalVersionWeights")
                if additional_weights:
                    routing_configuration = self._create_routing_config_model(
                        additional_weights, target_version
                    )

            alias = VersionAlias(
                name=name,
                function_version=function_version,
                description=description,
                routing_configuration=routing_configuration,
            )
            function.aliases[name] = alias
        return api_utils.map_alias_out(alias=alias, function=function)
1784

1785
    def list_aliases(
        self,
        context: RequestContext,
        function_name: FunctionName,
        function_version: Version = None,
        marker: String = None,
        max_items: MaxListItems = None,
        **kwargs,
    ) -> ListAliasesResponse:
        """
        List the aliases of a function, one page at a time.

        If ``function_version`` is given, only aliases pointing to exactly that version are listed.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name = api_utils.get_function_name(function_name, context)
        function = self._get_function(
            function_name=function_name, region=region, account_id=account_id
        )

        mapped_aliases = []
        for alias in function.aliases.values():
            if function_version is not None and alias.function_version != function_version:
                continue
            mapped_aliases.append(api_utils.map_alias_out(alias, function))

        # the alias ARN of the first entry of the next page serves as pagination marker
        page, next_marker = PaginatedList(mapped_aliases).get_page(
            lambda entry: entry["AliasArn"],
            marker,
            max_items,
        )

        return ListAliasesResponse(Aliases=page, NextMarker=next_marker)
1813

1814
    def delete_alias(
        self, context: RequestContext, function_name: FunctionName, name: Alias, **kwargs
    ) -> None:
        """
        Remove an alias of a function together with the resources registered under its name.

        Deleting an alias that does not exist is not an error.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name = api_utils.get_function_name(function_name, context)
        function = self._get_function(
            function_name=function_name, region=region, account_id=account_id
        )
        removed_alias = function.aliases.pop(name, None)

        # cleanup related resources
        function.provisioned_concurrency_configs.pop(name, None)

        # TODO: Allow for deactivating/unregistering specific Lambda URLs
        if not removed_alias or name not in function.function_url_configs:
            return
        url_config = function.function_url_configs.pop(name)
        LOG.debug(
            "Stopping aliased Lambda Function URL %s for %s",
            url_config.url,
            url_config.function_name,
        )
1836

1837
    def get_alias(
        self, context: RequestContext, function_name: FunctionName, name: Alias, **kwargs
    ) -> AliasConfiguration:
        """
        Return the configuration of a single alias of a function.

        Raises:
            ResourceNotFoundException: If the function has no alias of that name.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name = api_utils.get_function_name(function_name, context)
        function = self._get_function(
            function_name=function_name, region=region, account_id=account_id
        )
        alias = function.aliases.get(name)
        if not alias:
            alias_arn = api_utils.qualified_lambda_arn(
                function_name=function_name, qualifier=name, region=region, account=account_id
            )
            raise ResourceNotFoundException(
                f"Cannot find alias arn: {alias_arn}",
                Type="User",
            )
        return api_utils.map_alias_out(alias=alias, function=function)
1851

1852
    def update_alias(
        self,
        context: RequestContext,
        function_name: FunctionName,
        name: Alias,
        function_version: Version = None,
        description: Description = None,
        routing_config: AliasRoutingConfiguration = None,
        revision_id: String = None,
        **kwargs,
    ) -> AliasConfiguration:
        """
        Update target version, description and/or routing configuration of an existing alias.

        Only the fields that are passed (i.e. are not ``None``) are changed. Passing a
        ``routing_config`` without (or with empty) ``AdditionalVersionWeights`` removes the routing
        configuration of the alias.

        Args:
            function_name: Name or ARN of the function the alias belongs to.
            name: Name of the alias.
            function_version: New version the alias should point to.
            description: New description of the alias.
            routing_config: New routing configuration of the alias.
            revision_id: If set, the update is only applied if it matches the alias' revision id.

        Returns:
            The configuration of the updated alias.

        Raises:
            ResourceNotFoundException: If the function has no alias of that name.
            PreconditionFailedException: If ``revision_id`` does not match the alias' revision id.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name = api_utils.get_function_name(function_name, context)
        function = self._get_function(
            function_name=function_name, region=region, account_id=account_id
        )
        if not (alias := function.aliases.get(name)):
            fn_arn = api_utils.qualified_lambda_arn(function_name, name, account_id, region)
            raise ResourceNotFoundException(
                f"Alias not found: {fn_arn}",
                Type="User",
            )
        if revision_id and alias.revision_id != revision_id:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
                Type="User",
            )
        changes = {}
        if function_version is not None:
            changes |= {"function_version": function_version}
        if description is not None:
            changes |= {"description": description}
        if routing_config is not None:
            # if it is an empty dict or AdditionalVersionWeights is empty, set routing config to None
            new_routing_config = None
            if routing_config_dict := routing_config.get("AdditionalVersionWeights"):
                # The weights are validated against the version the alias points to after this
                # update: the newly requested one, or the current target if it stays untouched.
                target_qualifier = (
                    function_version if function_version is not None else alias.function_version
                )
                target_version = get_function_version(
                    function_name=function_name,
                    qualifier=target_qualifier,
                    region=region,
                    account_id=account_id,
                )
                new_routing_config = self._create_routing_config_model(
                    routing_config_dict, target_version
                )
            changes |= {"routing_configuration": new_routing_config}
        # even if no changes are done, we have to update revision id for some reason
        old_alias = alias
        alias = dataclasses.replace(alias, **changes)
        function.aliases[name] = alias

        # TODO: signal lambda service that pointer potentially changed
        self.lambda_service.update_alias(old_alias=old_alias, new_alias=alias, function=function)

        return api_utils.map_alias_out(alias=alias, function=function)
1900

1901
    # =======================================
1902
    # ======= EVENT SOURCE MAPPINGS =========
1903
    # =======================================
1904
    def check_service_resource_exists(
        self, service: str, resource_arn: str, function_arn: str, function_role_arn: str
    ):
        """
        Probe the event source identified by ``resource_arn`` using a client created for the function role.

        SQS sources are probed with ``GetQueueAttributes``; Kinesis and DynamoDB sources with ``DescribeStream``.
        Any other ``service`` value is not probed at all.

        Raises:
            InvalidParameterValueException: If the probe reports that the queue or stream does not exist.
            ClientError: Any other client error raised by the probe is propagated unchanged.
        """
        parsed_arn = parse_arn(resource_arn)
        client = get_internal_client(
            arn=resource_arn,
            role_arn=function_role_arn,
            service_principal=ServicePrincipal.lambda_,
            source_arn=function_arn,
        )

        if service in ("sqs", "sqs-fifo"):
            # AWS uses `GetQueueAttributes` internally to verify the queue existence, but we need the `QueueUrl`
            # which is not given directly. We build out a dummy `QueueUrl` which can be parsed by SQS to return
            # the right value
            queue_name = parsed_arn["resource"].split("/")[-1]
            queue_url = (
                f"http://sqs.{parsed_arn['region']}.domain/{parsed_arn['account']}/{queue_name}"
            )
            try:
                client.get_queue_attributes(QueueUrl=queue_url)
            except ClientError as e:
                error = e.response["Error"]
                error_code = error["Code"]
                if error_code != "AWS.SimpleQueueService.NonExistentQueue":
                    raise e
                raise InvalidParameterValueException(
                    f"Error occurred while ReceiveMessage. SQS Error Code: {error_code}. SQS Error Message: {error['Message']}",
                    Type="User",
                )
            return

        # Both stream services expose `describe_stream`; only the casing of the ARN parameter differs.
        stream_arn_parameter = {"kinesis": "StreamARN", "dynamodb": "StreamArn"}.get(service)
        if stream_arn_parameter is None:
            return

        try:
            client.describe_stream(**{stream_arn_parameter: resource_arn})
        except ClientError as e:
            if e.response["Error"]["Code"] != "ResourceNotFoundException":
                raise e
            raise InvalidParameterValueException(
                f"Stream not found: {resource_arn}",
                Type="User",
            )
1956

1957
    @handler("CreateEventSourceMapping", expand=False)
    def create_event_source_mapping(
        self,
        context: RequestContext,
        request: CreateEventSourceMappingRequest,
    ) -> EventSourceMappingConfiguration:
        """Handle ``CreateEventSourceMapping`` by delegating to ``create_event_source_mapping_v2``."""
        esm_configuration = self.create_event_source_mapping_v2(context, request)
        return esm_configuration
1964

1965
    def create_event_source_mapping_v2(
        self,
        context: RequestContext,
        request: CreateEventSourceMappingRequest,
    ) -> EventSourceMappingConfiguration:
        """
        Validate the request, store the resulting ESM configuration and create its worker.

        :param context: request context of the incoming call
        :param request: the ``CreateEventSourceMapping`` request payload
        :return: the ESM configuration produced by ``EsmConfigFactory``
        """
        # Validations (function name and function version are not needed here)
        validated = self.validate_event_source_mapping(context, request)
        function_arn, _, store, _, role_arn = validated

        config_factory = EsmConfigFactory(request, context, function_arn)
        esm_config = config_factory.get_esm_config()

        # Copy esm_config to avoid a race condition with potential async update in the store
        store.event_source_mappings[esm_config["UUID"]] = esm_config.copy()

        # TODO: check for potential async race condition update -> think about locking
        is_enabled = request.get("Enabled", True)
        worker = EsmWorkerFactory(esm_config, role_arn, is_enabled).get_esm_worker()
        self.esm_workers[worker.uuid] = worker

        # TODO: check StateTransitionReason, LastModified, LastProcessingResult (concurrent updates requires locking!)
        request_tags = request.get("Tags")
        if request_tags:
            self._store_tags(esm_config.get("EventSourceMappingArn"), request_tags)

        worker.create()
        return esm_config
1988

1989
    def validate_event_source_mapping(self, context, request):
        """
        Validate an event source mapping payload and resolve the function it targets.

        :param context: request context; its operation name decides whether the duplicate check runs, and its
            account/region are used when the function reference does not carry them
        :param request: either a ``CreateEventSourceMapping`` request (``FunctionName``) or an internal
            ``EventSourceMappingConfiguration`` representation (``FunctionArn``)
        :return: tuple ``(function ARN, function name, store, function version, function role)``
        :raises InvalidParameterValueException: for unsupported/missing parameters, an unknown function, or a
            missing event source
        :raises ValidationException: if ``StartingPosition`` is not a known enum member
        :raises ResourceConflictException: if a create request duplicates an existing mapping
        """
        # TODO: test whether stream ARNs are valid sources for Pipes or ESM or whether only DynamoDB table ARNs work
        # TODO: Validate MaxRecordAgeInSeconds (i.e cannot subceed 60s but can be -1) and MaxRetryAttempts parameters.
        # See https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-maximumrecordageinseconds
        is_create_esm_request = context.operation.name == self.create_event_source_mapping.operation

        if destination_config := request.get("DestinationConfig"):
            if "OnSuccess" in destination_config:
                raise InvalidParameterValueException(
                    "Unsupported DestinationConfig parameter for given event source mapping type.",
                    Type="User",
                )

        service = None
        if "SelfManagedEventSource" in request:
            service = "kafka"
            if "SourceAccessConfigurations" not in request:
                raise InvalidParameterValueException(
                    "Required 'sourceAccessConfigurations' parameter is missing.", Type="User"
                )
        if service is None and "EventSourceArn" not in request:
            raise InvalidParameterValueException("Unrecognized event source.", Type="User")
        if service is None:
            service = extract_service_from_arn(request["EventSourceArn"])

        batch_size = api_utils.validate_and_set_batch_size(service, request.get("BatchSize"))
        if service in ["dynamodb", "kinesis"]:
            starting_position = request.get("StartingPosition")
            if not starting_position:
                raise InvalidParameterValueException(
                    "1 validation error detected: Value null at 'startingPosition' failed to satisfy constraint: Member must not be null.",
                    Type="User",
                )

            if starting_position not in KinesisStreamStartPosition.__members__:
                raise ValidationException(
                    f"1 validation error detected: Value '{starting_position}' at 'startingPosition' failed to satisfy constraint: Member must satisfy enum value set: [LATEST, AT_TIMESTAMP, TRIM_HORIZON]"
                )
            # AT_TIMESTAMP is not allowed for DynamoDB Streams
            elif (
                service == "dynamodb"
                and starting_position not in DynamoDBStreamStartPosition.__members__
            ):
                raise InvalidParameterValueException(
                    f"Unsupported starting position for arn type: {request['EventSourceArn']}",
                    Type="User",
                )

        if service in ["sqs", "sqs-fifo"]:
            if batch_size > 10 and request.get("MaximumBatchingWindowInSeconds", 0) == 0:
                raise InvalidParameterValueException(
                    "Maximum batch window in seconds must be greater than 0 if maximum batch size is greater than 10",
                    Type="User",
                )

        if (filter_criteria := request.get("FilterCriteria")) is not None:
            for filter_ in filter_criteria.get("Filters", []):
                pattern_str = filter_.get("Pattern")
                # A missing, non-string, or unparsable pattern is reported with the same error.
                if (
                    not pattern_str
                    or not isinstance(pattern_str, str)
                    or not validate_event_pattern(pattern_str)
                ):
                    raise InvalidParameterValueException(
                        "Invalid filter pattern definition.", Type="User"
                    )

        # Can either have a FunctionName (i.e CreateEventSourceMapping request) or
        # an internal EventSourceMappingConfiguration representation
        request_function_name = request.get("FunctionName") or request.get("FunctionArn")
        # can be either a partial arn or a full arn for the version/alias
        function_name, qualifier, account, region = function_locators_from_arn(
            request_function_name
        )
        # TODO: validate `context.region` vs. `region(request["FunctionName"])` vs. `region(request["EventSourceArn"])`
        account = account or context.account_id
        region = region or context.region
        state = lambda_stores[account][region]
        fn = state.functions.get(function_name)
        if not fn:
            raise InvalidParameterValueException("Function does not exist", Type="User")

        if qualifier:
            # make sure the function version/alias exists
            if api_utils.qualifier_is_alias(qualifier):
                fn_alias = fn.aliases.get(qualifier)
                if not fn_alias:
                    raise Exception("unknown alias")  # TODO: cover via test
            elif api_utils.qualifier_is_version(qualifier):
                fn_version = fn.versions.get(qualifier)
                if not fn_version:
                    raise Exception("unknown version")  # TODO: cover via test
            elif qualifier == "$LATEST":
                pass
            else:
                raise Exception("invalid functionname")  # TODO: cover via test
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account, region)

        else:
            fn_arn = api_utils.unqualified_lambda_arn(function_name, account, region)

        function_version = get_function_version_from_arn(fn_arn)
        function_role = function_version.config.role

        if source_arn := request.get("EventSourceArn"):
            self.check_service_resource_exists(service, source_arn, fn_arn, function_role)
        # Check we are validating a CreateEventSourceMapping request
        if is_create_esm_request:

            def _get_mapping_sources(mapping: dict[str, Any]) -> list[str]:
                """Return the event source ARN, or the Kafka bootstrap servers for self-managed sources."""
                if event_source_arn := mapping.get("EventSourceArn"):
                    return [event_source_arn]
                return (
                    mapping.get("SelfManagedEventSource", {})
                    .get("Endpoints", {})
                    .get("KAFKA_BOOTSTRAP_SERVERS", [])
                )

            # check for event source duplicates
            # TODO: currently validated for sqs, kinesis, and dynamodb
            service_id = load_service(service).service_id
            # The request does not change while scanning the store, so resolve its sources only once.
            request_sources = _get_mapping_sources(request)
            for uuid, mapping in state.event_source_mappings.items():
                mapping_sources = _get_mapping_sources(mapping)
                if mapping["FunctionArn"] == fn_arn and (
                    set(mapping_sources).intersection(request_sources)
                ):
                    if service == "sqs":
                        # *shakes fist at SQS*
                        raise ResourceConflictException(
                            f'An event source mapping with {service_id} arn (" {mapping["EventSourceArn"]} ") '
                            f'and function (" {function_name} ") already exists. Please update or delete the '
                            f"existing mapping with UUID {uuid}",
                            Type="User",
                        )
                    elif service == "kafka":
                        # Kafka mappings only conflict when they also share at least one topic.
                        if set(mapping["Topics"]).intersection(request["Topics"]):
                            raise ResourceConflictException(
                                f'An event source mapping with event source ("{",".join(request_sources)}"), '
                                f'function ("{fn_arn}"), '
                                f'topics ("{",".join(request["Topics"])}") already exists. Please update or delete the '
                                f"existing mapping with UUID {uuid}",
                                Type="User",
                            )
                    else:
                        raise ResourceConflictException(
                            f'The event source arn (" {mapping["EventSourceArn"]} ") and function '
                            f'(" {function_name} ") provided mapping already exists. Please update or delete the '
                            f"existing mapping with UUID {uuid}",
                            Type="User",
                        )
        return fn_arn, function_name, state, function_version, function_role
2142

2143
    @handler("UpdateEventSourceMapping", expand=False)
    def update_event_source_mapping(
        self,
        context: RequestContext,
        request: UpdateEventSourceMappingRequest,
    ) -> EventSourceMappingConfiguration:
        """Handle ``UpdateEventSourceMapping`` by delegating to ``update_event_source_mapping_v2``."""
        esm_configuration = self.update_event_source_mapping_v2(context, request)
        return esm_configuration
2150

2151
    def update_event_source_mapping_v2(
        self,
        context: RequestContext,
        request: UpdateEventSourceMappingRequest,
    ) -> EventSourceMappingConfiguration:
        """
        Merge the request into the stored ESM, re-validate it and replace the ESM worker.

        :param context: request context; its account and region select the store
        :param request: the ``UpdateEventSourceMapping`` request payload (must carry ``UUID``)
        :return: the updated ESM configuration, with ``State`` set to the value chosen by this call
        :raises ResourceNotFoundException: if the UUID is missing or no mapping/worker exists for it
        """
        # TODO: test and implement this properly (quite complex with many validations and limitations!)
        LOG.warning(
            "Updating Lambda Event Source Mapping is in experimental state and not yet fully tested."
        )
        store = lambda_stores[context.account_id][context.region]

        requested_changes = dict(request)
        uuid = requested_changes.pop("UUID", None)
        if not uuid:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        previous_mapping = store.event_source_mappings.get(uuid)
        previous_worker = self.esm_workers.get(uuid)
        if previous_mapping is None or previous_worker is None:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )  # TODO: test?

        # normalize values to overwrite
        merged_mapping = previous_mapping | requested_changes

        # Validate the newly updated ESM object; only the resolved function ARN and role are used afterwards.
        function_arn, _, _, _, function_role = self.validate_event_source_mapping(
            context, merged_mapping
        )

        # remove the FunctionName field
        merged_mapping.pop("FunctionName", None)
        if function_arn:
            merged_mapping["FunctionArn"] = function_arn

        # Only apply update if the desired state differs
        requested_enabled = request.get("Enabled")
        if requested_enabled is None:
            merged_mapping["State"] = EsmState.UPDATING
        else:
            was_enabled = previous_mapping["State"] == EsmState.ENABLED
            # TODO: What happens when trying to update during an update or failed state?!
            if requested_enabled and not was_enabled:
                merged_mapping["State"] = EsmState.ENABLING
            elif not requested_enabled and was_enabled:
                merged_mapping["State"] = EsmState.DISABLING

        # To ensure parity, certain responses need to be immediately returned. The value is captured now and is
        # only set for the returned response, not saved internally (e.g. transient state).
        immediate_state = merged_mapping["State"]

        store.event_source_mappings[uuid] = merged_mapping

        # TODO: Currently, we re-create the entire ESM worker. Look into approach with better performance.
        # Get a new ESM worker object but do not activate it yet, since the factory holds all logic for creating
        # a new worker from configuration.
        replacement_worker = EsmWorkerFactory(
            merged_mapping, function_role, request.get("Enabled", previous_worker.enabled)
        ).get_esm_worker()
        self.esm_workers[uuid] = replacement_worker

        # We should stop() the worker since the delete() will remove the ESM from the state mapping.
        previous_worker.stop()
        # This will either create an EsmWorker in the CREATING state if enabled. Otherwise, the DISABLING state is set.
        replacement_worker.create()

        return {**merged_mapping, "State": immediate_state}
2221

2222
    def delete_event_source_mapping(
        self, context: RequestContext, uuid: String, **kwargs
    ) -> EventSourceMappingConfiguration:
        """
        Remove the worker registered for ``uuid`` and ask it to delete the event source mapping.

        :return: the stored ESM configuration with ``State`` overridden to ``EsmState.DELETING``
        :raises ResourceNotFoundException: if no mapping or no worker is registered for ``uuid``
        """
        store = lambda_stores[context.account_id][context.region]
        esm = store.event_source_mappings.get(uuid)
        if not esm:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        # TODO: add proper locking
        worker = self.esm_workers.pop(uuid, None)
        if not worker:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        # Asynchronous delete in v2
        worker.delete()
        return {**esm, "State": EsmState.DELETING}
2241

2242
    def get_event_source_mapping(
        self, context: RequestContext, uuid: String, **kwargs
    ) -> EventSourceMappingConfiguration:
        """
        Return the stored ESM configuration for ``uuid``, refreshed with the worker's current state.

        Note that ``State`` and ``StateTransitionReason`` are written into the stored configuration itself.

        :raises ResourceNotFoundException: if no mapping or no worker is registered for ``uuid``
        """
        store = lambda_stores[context.account_id][context.region]
        esm = store.event_source_mappings.get(uuid)
        # Only look up the worker once the mapping itself is known to exist.
        worker = self.esm_workers.get(uuid) if esm else None
        if not esm or not worker:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        esm["State"] = worker.current_state
        esm["StateTransitionReason"] = worker.state_transition_reason
        return esm
2259

2260
    def list_event_source_mappings(
        self,
        context: RequestContext,
        event_source_arn: Arn = None,
        function_name: FunctionName = None,
        marker: String = None,
        max_items: MaxListItems = None,
        **kwargs,
    ) -> ListEventSourceMappingsResponse:
        """
        List the event source mappings of the caller's account and region, optionally filtered.

        :param event_source_arn: if given, only mappings with exactly this ``EventSourceArn`` are returned
        :param function_name: if given (name, partial ARN or full ARN), only mappings targeting that function
            are returned; qualifier, account and region are compared only when the reference carries them
        :param marker: pagination marker returned by a previous call
        :param max_items: maximum number of mappings per page
        :return: one page of mappings plus the marker for the next page
        """
        state = lambda_stores[context.account_id][context.region]

        esms = state.event_source_mappings.values()
        # TODO: update and test State and StateTransitionReason for ESM v2

        if event_source_arn:  # TODO: validate pattern
            esms = [e for e in esms if e.get("EventSourceArn") == event_source_arn]

        if function_name:
            # Compare parsed locators instead of testing for a substring of the ARN: a substring test would
            # also return mappings of other functions whose name merely contains the requested one.
            wanted_name, wanted_qualifier, wanted_account, wanted_region = (
                function_locators_from_arn(function_name)
            )

            def _targets_requested_function(esm: dict[str, Any]) -> bool:
                """Check whether the mapping's function ARN matches the requested function reference."""
                name, qualifier, account, region = function_locators_from_arn(esm["FunctionArn"])
                return (
                    wanted_name is not None
                    and name == wanted_name
                    and (not wanted_qualifier or qualifier == wanted_qualifier)
                    and (not wanted_account or account == wanted_account)
                    and (not wanted_region or region == wanted_region)
                )

            esms = [e for e in esms if _targets_requested_function(e)]

        esms = PaginatedList(esms)
        page, token = esms.get_page(
            lambda x: x["UUID"],
            marker,
            max_items,
        )
        return ListEventSourceMappingsResponse(EventSourceMappings=page, NextMarker=token)
2287

2288
    def get_source_type_from_request(self, request: dict[str, Any]) -> Optional[str]:
        """
        Derive the event source type of an event source mapping request.

        :param request: request or ESM configuration dict
        :return: the service extracted from ``EventSourceArn`` (``"sqs-fifo"`` for SQS FIFO queues),
            ``"kafka"`` for self-managed event sources, or ``None`` if neither field is set
        """
        if event_source_arn := request.get("EventSourceArn", ""):
            service = extract_service_from_arn(event_source_arn)
            # FIFO queue names always end in ".fifo"; testing for the bare substring "fifo" would also
            # classify a standard queue such as "fifo-backlog" as FIFO.
            if service == "sqs" and event_source_arn.endswith(".fifo"):
                return "sqs-fifo"
            return service
        if request.get("SelfManagedEventSource"):
            return "kafka"
        return None
2296

2297
    # =======================================
2298
    # ============ FUNCTION URLS ============
2299
    # =======================================
2300

2301
    @staticmethod
    def _validate_qualifier(qualifier: str) -> None:
        """
        Reject qualifiers that cannot be used for function URL configs.

        ``$LATEST`` and version qualifiers are rejected; an empty/``None`` qualifier and any other value pass.

        :raises ValidationException: if the qualifier is ``$LATEST`` or a version
        """
        if qualifier != "$LATEST":
            # Empty qualifiers are accepted without consulting the version check.
            if not qualifier or not api_utils.qualifier_is_version(qualifier):
                return
        raise ValidationException(
            f"1 validation error detected: Value '{qualifier}' at 'qualifier' failed to satisfy constraint: Member must satisfy regular expression pattern: ((?!^\\d+$)^[0-9a-zA-Z-_]+$)"
        )
2307

2308
    @staticmethod
    def _validate_invoke_mode(invoke_mode: str) -> None:
        """
        Validate the invoke mode of a function URL config.

        An empty/``None`` value is accepted. ``RESPONSE_STREAM`` is accepted but only logged as unsupported.

        :raises ValidationException: if a non-empty value is neither ``BUFFERED`` nor ``RESPONSE_STREAM``
        """
        supported_modes = [InvokeMode.BUFFERED, InvokeMode.RESPONSE_STREAM]
        if invoke_mode and invoke_mode not in supported_modes:
            raise ValidationException(
                f"1 validation error detected: Value '{invoke_mode}' at 'invokeMode' failed to satisfy constraint: Member must satisfy enum value set: [RESPONSE_STREAM, BUFFERED]"
            )

        if invoke_mode != InvokeMode.RESPONSE_STREAM:
            return

        # TODO should we actually fail for setting RESPONSE_STREAM?
        #  It should trigger InvokeWithResponseStream which is not implemented
        LOG.warning(
            "The invokeMode 'RESPONSE_STREAM' is not yet supported on LocalStack. The property is only mocked, the execution will still be 'BUFFERED'"
        )
2320

2321
    # TODO: what happens if function state is not active?
2322
    def create_function_url_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        auth_type: FunctionUrlAuthType,
        qualifier: FunctionUrlQualifier = None,
        cors: Cors = None,
        invoke_mode: InvokeMode = None,
        **kwargs,
    ) -> CreateFunctionUrlConfigResponse:
        """
        Create a function URL config for a function (``$LATEST``) or one of its aliases.

        :param function_name: function name or ARN
        :param auth_type: auth type stored on the URL config
        :param qualifier: optional alias name; ``$LATEST`` and version qualifiers are rejected
        :param cors: optional CORS settings stored on the URL config
        :param invoke_mode: optional invoke mode (``RESPONSE_STREAM`` is only mocked)
        :return: the created URL config mapped to the API response shape
        :raises ValidationException: if the qualifier or invoke mode is invalid
        :raises ResourceNotFoundException: if the function or the alias does not exist
        :raises ResourceConflictException: if a URL config already exists for the qualifier
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        state = lambda_stores[account_id][region]
        self._validate_qualifier(qualifier)
        self._validate_invoke_mode(invoke_mode)

        fn = state.functions.get(function_name)
        if fn is None:
            raise ResourceNotFoundException("Function does not exist", Type="User")

        # URL configs without a qualifier are stored under "$LATEST".
        normalized_qualifier = qualifier or "$LATEST"

        url_config = fn.function_url_configs.get(normalized_qualifier)
        if url_config:
            raise ResourceConflictException(
                f"Failed to create function url config for [functionArn = {url_config.function_arn}]. Error message:  FunctionUrlConfig exists for this Lambda function",
                Type="User",
            )

        if qualifier and qualifier != "$LATEST" and qualifier not in fn.aliases:
            raise ResourceNotFoundException("Function does not exist", Type="User")

        unqualified_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
        function_arn = (
            api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
            if qualifier
            else unqualified_arn
        )

        custom_id: str | None = None

        tags = self._get_tags(unqualified_arn)
        if TAG_KEY_CUSTOM_URL in tags:
            # Note: I really wanted to add verification here that the
            # url_id is unique, so we could surface that to the user ASAP.
            # However, it seems like that information isn't available yet,
            # since (as far as I can tell) we call
            # self.router.register_routes() once, in a single shot, for all
            # of the routes -- and we need to verify that it's unique not
            # just for this particular lambda function, but for the entire
            # lambda provider. Therefore... that idea proved non-trivial!
            custom_id_tag_value = (
                f"{tags[TAG_KEY_CUSTOM_URL]}-{qualifier}" if qualifier else tags[TAG_KEY_CUSTOM_URL]
            )
            if TAG_KEY_CUSTOM_URL_VALIDATOR.match(custom_id_tag_value):
                custom_id = custom_id_tag_value

            else:
                # Note: we're logging here instead of raising to prioritize
                # strict parity with AWS over the localstack-only custom_id
                LOG.warning(
                    "Invalid custom ID tag value for lambda URL (%s=%s). "
                    "Replaced with default (random id)",
                    TAG_KEY_CUSTOM_URL,
                    custom_id_tag_value,
                )

        # The url_id is the subdomain used for the URL we're creating. This
        # is either created randomly (as in AWS), or can be passed as a tag
        # to the lambda itself (localstack-only).
        url_id: str
        if custom_id is None:
            url_id = api_utils.generate_random_url_id()
        else:
            url_id = custom_id

        host_definition = localstack_host(custom_port=config.GATEWAY_LISTEN[0].port)
        # Use a single timestamp so a freshly created config never reports a modification time that
        # differs from its creation time.
        created_at = api_utils.generate_lambda_date()
        # NOTE(review): the URL embeds context.region rather than the region resolved from function_name;
        # confirm this is intended when an ARN from another region is passed.
        fn.function_url_configs[normalized_qualifier] = FunctionUrlConfig(
            function_arn=function_arn,
            function_name=function_name,
            cors=cors,
            url_id=url_id,
            url=f"http://{url_id}.lambda-url.{context.region}.{host_definition.host_and_port()}/",  # TODO: https support
            auth_type=auth_type,
            creation_time=created_at,
            last_modified_time=created_at,
            invoke_mode=invoke_mode,
        )

        # persist and start URL
        # TODO: implement URL invoke
        api_url_config = api_utils.map_function_url_config(
            fn.function_url_configs[normalized_qualifier]
        )

        return CreateFunctionUrlConfigResponse(
            FunctionUrl=api_url_config["FunctionUrl"],
            FunctionArn=api_url_config["FunctionArn"],
            AuthType=api_url_config["AuthType"],
            Cors=api_url_config["Cors"],
            CreationTime=api_url_config["CreationTime"],
            InvokeMode=api_url_config["InvokeMode"],
        )
2426

2427
    def get_function_url_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: FunctionUrlQualifier = None,
        **kwargs,
    ) -> GetFunctionUrlConfigResponse:
        """
        Return the URL config stored for the function and qualifier (``$LATEST`` when none is given).

        :raises ValidationException: if the qualifier is ``$LATEST`` or a version
        :raises ResourceNotFoundException: if the function or its URL config does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]

        fn_name, qualifier = api_utils.get_name_and_qualifier(function_name, qualifier, context)
        self._validate_qualifier(qualifier)

        function = store.functions.get(fn_name)
        # A missing function and a missing URL config are reported identically.
        stored_config = (
            function.function_url_configs.get(qualifier or "$LATEST") if function else None
        )
        if not stored_config:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        return api_utils.map_function_url_config(stored_config)
2455

2456
    def update_function_url_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: FunctionUrlQualifier = None,
        auth_type: FunctionUrlAuthType = None,
        cors: Cors = None,
        invoke_mode: InvokeMode = None,
        **kwargs,
    ) -> UpdateFunctionUrlConfigResponse:
        """
        Update the stored URL config of a function or alias with the provided fields.

        Only ``cors`` and ``auth_type`` values that are not ``None`` and a truthy ``invoke_mode`` are applied;
        ``last_modified_time`` is always refreshed.

        :raises ValidationException: if the qualifier or invoke mode is invalid
        :raises ResourceNotFoundException: if the function, alias or URL config does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]

        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        self._validate_qualifier(qualifier)
        self._validate_invoke_mode(invoke_mode)

        function = store.functions.get(function_name)
        if not function:
            raise ResourceNotFoundException("Function does not exist", Type="User")

        config_key = qualifier or "$LATEST"

        is_unknown_alias = (
            api_utils.qualifier_is_alias(config_key) and config_key not in function.aliases
        )
        if is_unknown_alias:
            raise ResourceNotFoundException("Function does not exist", Type="User")

        current_config = function.function_url_configs.get(config_key)
        if not current_config:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        # Collect only the fields the caller actually supplied.
        updates = {"last_modified_time": api_utils.generate_lambda_date()}
        if cors is not None:
            updates["cors"] = cors
        if auth_type is not None:
            updates["auth_type"] = auth_type
        if invoke_mode:
            updates["invoke_mode"] = invoke_mode

        updated_config = dataclasses.replace(current_config, **updates)
        function.function_url_configs[config_key] = updated_config

        return UpdateFunctionUrlConfigResponse(
            FunctionUrl=updated_config.url,
            FunctionArn=updated_config.function_arn,
            AuthType=updated_config.auth_type,
            Cors=updated_config.cors,
            CreationTime=updated_config.creation_time,
            LastModifiedTime=updated_config.last_modified_time,
            InvokeMode=updated_config.invoke_mode,
        )
2514

2515
    def delete_function_url_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: FunctionUrlQualifier = None,
        **kwargs,
    ) -> None:
        """
        Delete the URL config stored for the function and qualifier (``$LATEST`` when none is given).

        :raises ValidationException: if the qualifier is ``$LATEST`` or a version
        :raises ResourceNotFoundException: if the function or its URL config does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]

        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        self._validate_qualifier(qualifier)

        function = store.functions.get(function_name)
        config_key = qualifier or "$LATEST"
        # A missing function and a missing URL config are reported identically.
        stored_config = function.function_url_configs.get(config_key) if function else None
        if not stored_config:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        del function.function_url_configs[config_key]
2544

2545
    def list_function_url_configs(
        self,
        context: RequestContext,
        function_name: FunctionName,
        marker: String = None,
        max_items: MaxItems = None,
        **kwargs,
    ) -> ListFunctionUrlConfigsResponse:
        """Return one page of the URL configs stored for a function.

        :raises ResourceNotFoundException: if the function is not in the store.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]

        fn_name = api_utils.get_function_name(function_name, context)
        fn = store.functions.get(fn_name)
        if not fn:
            raise ResourceNotFoundException("Function does not exist", Type="User")

        def _page_key(url_config):
            # Pagination tokens are derived from the function ARN of each entry.
            return url_config["FunctionArn"]

        all_configs = PaginatedList(
            [
                api_utils.map_function_url_config(stored_config)
                for stored_config in fn.function_url_configs.values()
            ]
        )
        current_page, next_marker = all_configs.get_page(_page_key, marker, max_items)
        return ListFunctionUrlConfigsResponse(
            FunctionUrlConfigs=current_page, NextMarker=next_marker
        )
1✔
2573

2574
    # =======================================
2575
    # ============  Permissions  ============
2576
    # =======================================
2577

2578
    @handler("AddPermission", expand=False)
    def add_permission(
        self,
        context: RequestContext,
        request: AddPermissionRequest,
    ) -> AddPermissionResponse:
        """Append a statement to the resource policy of a qualified function.

        The statement is stored under the resolved qualifier, and the revision id of
        the targeted alias or version is bumped afterwards.

        :raises InvalidParameterValueException: if the qualifier is ``$LATEST``.
        :raises PreconditionFailedException: if a ``RevisionId`` is supplied and does not
            match the current revision id.
        :raises ValidationException: if the statement id does not match ``STATEMENT_ID_REGEX``.
        :raises ResourceConflictException: if the statement id already exists in the policy.
        """
        raw_function_name = request.get("FunctionName")
        function_name, qualifier = api_utils.get_name_and_qualifier(
            raw_function_name, request.get("Qualifier"), context
        )

        # validate qualifier
        if qualifier is not None:
            self._validate_qualifier_expression(qualifier)
            if qualifier == "$LATEST":
                raise InvalidParameterValueException(
                    "We currently do not support adding policies for $LATEST.", Type="User"
                )
        account_id, region = api_utils.get_account_and_region(request.get("FunctionName"), context)

        resolved_fn = self._get_function(function_name, account_id, region)
        resolved_qualifier, fn_arn = self._resolve_fn_qualifier(resolved_fn, qualifier)

        # The current revision id is only looked up when the caller supplied one.
        expected_revision = request.get("RevisionId")
        if expected_revision and expected_revision != self._function_revision_id(
            resolved_fn, resolved_qualifier
        ):
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
                Type="User",
            )

        request_sid = request["StatementId"]
        if STATEMENT_ID_REGEX.match(request_sid) is None:
            raise ValidationException(
                f"1 validation error detected: Value '{request_sid}' at 'statementId' failed to satisfy constraint: Member must satisfy regular expression pattern: ([a-zA-Z0-9-_]+)"
            )

        # check for an already existing policy and any conflicts in existing statements
        existing_policy = resolved_fn.permissions.get(resolved_qualifier)
        if existing_policy:
            existing_sids = [stmt["Sid"] for stmt in existing_policy.policy.Statement]
            if request_sid in existing_sids:
                # uniqueness scope: statement id needs to be unique per qualified function ($LATEST, version, or alias)
                # Counterexample: the same sid can exist within $LATEST, version, and alias
                raise ResourceConflictException(
                    f"The statement id ({request_sid}) provided already exists. Please provide a new statement id, or remove the existing statement.",
                    Type="User",
                )

        permission_statement = api_utils.build_statement(
            partition=context.partition,
            resource_arn=fn_arn,
            statement_id=request_sid,
            action=request["Action"],
            principal=request["Principal"],
            source_arn=request.get("SourceArn"),
            source_account=request.get("SourceAccount"),
            principal_org_id=request.get("PrincipalOrgID"),
            event_source_token=request.get("EventSourceToken"),
            auth_type=request.get("FunctionUrlAuthType"),
        )

        # Reuse the stored policy when there is one; otherwise start an empty one and
        # register it under the resolved qualifier once the statement has been added.
        target_policy = existing_policy or FunctionResourcePolicy(
            policy=ResourcePolicy(Version="2012-10-17", Id="default", Statement=[])
        )
        target_policy.policy.Statement.append(permission_statement)
        if target_policy is not existing_policy:
            resolved_fn.permissions[resolved_qualifier] = target_policy

        # Update revision id of alias or version
        # TODO: re-evaluate data model to prevent this dirty hack just for bumping the revision id
        # TODO: does that need a `with function.lock` for atomic updates of the policy + revision_id?
        if api_utils.qualifier_is_alias(resolved_qualifier):
            current_alias = resolved_fn.aliases[resolved_qualifier]
            resolved_fn.aliases[resolved_qualifier] = dataclasses.replace(current_alias)
        else:
            # Assumes that a non-alias is a version
            current_version = resolved_fn.versions[resolved_qualifier]
            refreshed_config = dataclasses.replace(current_version.config)
            resolved_fn.versions[resolved_qualifier] = dataclasses.replace(
                current_version, config=refreshed_config
            )
        return AddPermissionResponse(Statement=json.dumps(permission_statement))
1✔
2660

2661
    def remove_permission(
        self,
        context: RequestContext,
        function_name: FunctionName,
        statement_id: NamespacedStatementId,
        qualifier: Qualifier = None,
        revision_id: String = None,
        **kwargs,
    ) -> None:
        """Delete one statement from the resource policy of a qualified function.

        The revision id of the targeted alias or version is bumped, and the policy entry
        is dropped entirely once its last statement has been removed.

        :raises ResourceNotFoundException: if the function, its policy, or the statement
            cannot be found.
        :raises PreconditionFailedException: if ``revision_id`` is supplied and does not
            match the current revision id.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        if qualifier is not None:
            self._validate_qualifier_expression(qualifier)

        store = lambda_stores[account_id][region]
        resolved_fn = store.functions.get(function_name)
        if resolved_fn is None:
            fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
            raise ResourceNotFoundException(f"No policy found for: {fn_arn}", Type="User")

        resolved_qualifier, _ = self._resolve_fn_qualifier(resolved_fn, qualifier)
        function_permission = resolved_fn.permissions.get(resolved_qualifier)
        if not function_permission:
            raise ResourceNotFoundException(
                "No policy is associated with the given resource.", Type="User"
            )

        # try to find statement in policy and delete it
        statements = function_permission.policy.Statement
        matching_statement = next(
            (stmt for stmt in statements if stmt["Sid"] == statement_id), None
        )
        if not matching_statement:
            raise ResourceNotFoundException(
                f"Statement {statement_id} is not found in resource policy.", Type="User"
            )

        current_revision_id = self._function_revision_id(resolved_fn, resolved_qualifier)
        if revision_id and revision_id != current_revision_id:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
                Type="User",
            )
        statements.remove(matching_statement)

        # Update revision id for alias or version
        # TODO: re-evaluate data model to prevent this dirty hack just for bumping the revision id
        # TODO: does that need a `with function.lock` for atomic updates of the policy + revision_id?
        if api_utils.qualifier_is_alias(resolved_qualifier):
            current_alias = resolved_fn.aliases[resolved_qualifier]
            resolved_fn.aliases[resolved_qualifier] = dataclasses.replace(current_alias)
        else:
            # Assumes that a non-alias is a version
            current_version = resolved_fn.versions[resolved_qualifier]
            refreshed_config = dataclasses.replace(current_version.config)
            resolved_fn.versions[resolved_qualifier] = dataclasses.replace(
                current_version, config=refreshed_config
            )

        # remove the policy as a whole when there's no statement left in it
        if not function_permission.policy.Statement:
            del resolved_fn.permissions[resolved_qualifier]
1✔
2726

2727
    def get_policy(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        qualifier: Qualifier = None,
        **kwargs,
    ) -> GetPolicyResponse:
        """Return the JSON-serialized resource policy and revision id of a qualified function.

        Without a qualifier, the policy stored under ``$LATEST`` is looked up.

        :raises ResourceNotFoundException: if no policy is stored for the qualifier.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )

        if qualifier is not None:
            self._validate_qualifier_expression(qualifier)

        resolved_fn = self._get_function(function_name, account_id, region)

        resolved_qualifier = qualifier or "$LATEST"
        function_permission = resolved_fn.permissions.get(resolved_qualifier)
        if not function_permission:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        if api_utils.qualifier_is_alias(resolved_qualifier):
            revision_id = resolved_fn.aliases[resolved_qualifier].revision_id
        else:
            # Assumes that a non-alias is a version
            revision_id = resolved_fn.versions[resolved_qualifier].config.revision_id

        serialized_policy = json.dumps(dataclasses.asdict(function_permission.policy))
        return GetPolicyResponse(Policy=serialized_policy, RevisionId=revision_id)
2764

2765
    # =======================================
2766
    # ========  Code signing config  ========
2767
    # =======================================
2768

2769
    def create_code_signing_config(
        self,
        context: RequestContext,
        allowed_publishers: AllowedPublishers,
        description: Description = None,
        code_signing_policies: CodeSigningPolicies = None,
        tags: Tags = None,
        **kwargs,
    ) -> CreateCodeSigningConfigResponse:
        """Create a code signing config in the caller's account/region store and return it.

        The config is stored under a freshly generated ARN. ``tags`` is accepted but not
        used by this implementation.
        """
        account_id, region_name = context.account_id, context.region
        store = lambda_stores[account_id][region_name]

        # TODO: can there be duplicates?
        csc_id = f"csc-{get_random_hex(17)}"  # e.g. 'csc-077c33b4c19e26036'
        csc_arn = (
            f"arn:{context.partition}:lambda:{region_name}:{account_id}:code-signing-config:{csc_id}"
        )
        new_config = CodeSigningConfig(
            csc_id=csc_id,
            arn=csc_arn,
            allowed_publishers=allowed_publishers,
            policies=code_signing_policies,
            last_modified=api_utils.generate_lambda_date(),
            description=description,
        )
        store.code_signing_configs[csc_arn] = new_config
        return CreateCodeSigningConfigResponse(CodeSigningConfig=api_utils.map_csc(new_config))
1✔
2795

2796
    def put_function_code_signing_config(
        self,
        context: RequestContext,
        code_signing_config_arn: CodeSigningConfigArn,
        function_name: FunctionName,
        **kwargs,
    ) -> PutFunctionCodeSigningConfigResponse:
        """Attach an existing code signing config to a function.

        The config is checked before the function, so a missing config is reported first.

        :raises CodeSigningConfigNotFoundException: if the config ARN is not in the store.
        :raises ResourceNotFoundException: if the function is not in the store.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]
        function_name = api_utils.get_function_name(function_name, context)

        if not store.code_signing_configs.get(code_signing_config_arn):
            raise CodeSigningConfigNotFoundException(
                f"The code signing configuration cannot be found. Check that the provided configuration is not deleted: {code_signing_config_arn}.",
                Type="User",
            )

        target_fn = store.functions.get(function_name)
        if not target_fn:
            missing_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
            raise ResourceNotFoundException(f"Function not found: {missing_arn}", Type="User")

        target_fn.code_signing_config_arn = code_signing_config_arn
        return PutFunctionCodeSigningConfigResponse(
            CodeSigningConfigArn=code_signing_config_arn, FunctionName=function_name
        )
2823

2824
    def update_code_signing_config(
        self,
        context: RequestContext,
        code_signing_config_arn: CodeSigningConfigArn,
        description: Description = None,
        allowed_publishers: AllowedPublishers = None,
        code_signing_policies: CodeSigningPolicies = None,
        **kwargs,
    ) -> UpdateCodeSigningConfigResponse:
        """Apply the supplied (non-``None``) fields to a stored code signing config.

        The stored entry is replaced by an updated copy with a refreshed ``last_modified``.

        :raises ResourceNotFoundException: if the config ARN is not in the store.
        """
        store = lambda_stores[context.account_id][context.region]
        current = store.code_signing_configs.get(code_signing_config_arn)
        if not current:
            raise ResourceNotFoundException(
                f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
            )

        # Only fields the caller actually provided overwrite the stored values.
        requested_fields = {
            "allowed_publishers": allowed_publishers,
            "policies": code_signing_policies,
            "description": description,
        }
        changes = {
            field: value for field, value in requested_fields.items() if value is not None
        }
        updated = dataclasses.replace(
            current, last_modified=api_utils.generate_lambda_date(), **changes
        )
        store.code_signing_configs[code_signing_config_arn] = updated

        return UpdateCodeSigningConfigResponse(CodeSigningConfig=api_utils.map_csc(updated))
1✔
2853

2854
    def get_code_signing_config(
        self, context: RequestContext, code_signing_config_arn: CodeSigningConfigArn, **kwargs
    ) -> GetCodeSigningConfigResponse:
        """Look up a code signing config by ARN in the caller's account/region store.

        :raises ResourceNotFoundException: if the config ARN is not in the store.
        """
        store = lambda_stores[context.account_id][context.region]
        found = store.code_signing_configs.get(code_signing_config_arn)
        if found:
            return GetCodeSigningConfigResponse(CodeSigningConfig=api_utils.map_csc(found))

        raise ResourceNotFoundException(
            f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
        )
1✔
2865

2866
    def get_function_code_signing_config(
        self, context: RequestContext, function_name: FunctionName, **kwargs
    ) -> GetFunctionCodeSigningConfigResponse:
        """Return the code signing config ARN attached to a function, if any.

        An empty response is returned when the function has no code signing config.

        :raises ResourceNotFoundException: if the function is not in the store.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]
        function_name = api_utils.get_function_name(function_name, context)

        target_fn = store.functions.get(function_name)
        if not target_fn:
            missing_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
            raise ResourceNotFoundException(f"Function not found: {missing_arn}", Type="User")

        attached_arn = target_fn.code_signing_config_arn
        if not attached_arn:
            return GetFunctionCodeSigningConfigResponse()

        return GetFunctionCodeSigningConfigResponse(
            CodeSigningConfigArn=attached_arn, FunctionName=function_name
        )
1✔
2883

2884
    def delete_function_code_signing_config(
        self, context: RequestContext, function_name: FunctionName, **kwargs
    ) -> None:
        """Detach the code signing config from a function by clearing its stored ARN.

        :raises ResourceNotFoundException: if the function is not in the store.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]
        function_name = api_utils.get_function_name(function_name, context)

        target_fn = store.functions.get(function_name)
        if not target_fn:
            missing_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
            raise ResourceNotFoundException(f"Function not found: {missing_arn}", Type="User")

        target_fn.code_signing_config_arn = None
1✔
2896

2897
    def delete_code_signing_config(
        self, context: RequestContext, code_signing_config_arn: CodeSigningConfigArn, **kwargs
    ) -> DeleteCodeSigningConfigResponse:
        """Remove a code signing config from the caller's account/region store.

        :raises ResourceNotFoundException: if the config ARN is not in the store.
        """
        store = lambda_stores[context.account_id][context.region]

        if not store.code_signing_configs.get(code_signing_config_arn):
            raise ResourceNotFoundException(
                f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
            )

        del store.code_signing_configs[code_signing_config_arn]
        return DeleteCodeSigningConfigResponse()
1✔
2911

2912
    def list_code_signing_configs(
        self,
        context: RequestContext,
        marker: String = None,
        max_items: MaxListItems = None,
        **kwargs,
    ) -> ListCodeSigningConfigsResponse:
        """Return one page of the code signing configs in the caller's account/region store."""
        store = lambda_stores[context.account_id][context.region]

        def _page_key(mapped_csc):
            # Pagination tokens are derived from the config id of each entry.
            return mapped_csc["CodeSigningConfigId"]

        mapped_configs = PaginatedList(
            [api_utils.map_csc(stored) for stored in store.code_signing_configs.values()]
        )
        current_page, next_marker = mapped_configs.get_page(_page_key, marker, max_items)
        return ListCodeSigningConfigsResponse(
            CodeSigningConfigs=current_page, NextMarker=next_marker
        )
1✔
2929

2930
    def list_functions_by_code_signing_config(
        self,
        context: RequestContext,
        code_signing_config_arn: CodeSigningConfigArn,
        marker: String = None,
        max_items: MaxListItems = None,
        **kwargs,
    ) -> ListFunctionsByCodeSigningConfigResponse:
        """Return one page of unqualified ARNs of functions using a code signing config.

        :raises ResourceNotFoundException: if the config ARN is not in the store.
        """
        account_id, region_name = context.account_id, context.region
        store = lambda_stores[account_id][region_name]

        if code_signing_config_arn not in store.code_signing_configs:
            raise ResourceNotFoundException(
                f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
            )

        attached_fn_arns = PaginatedList(
            [
                api_utils.unqualified_lambda_arn(candidate.function_name, account_id, region_name)
                for candidate in store.functions.values()
                if candidate.code_signing_config_arn == code_signing_config_arn
            ]
        )
        # Each ARN serves as its own pagination key.
        current_page, next_marker = attached_fn_arns.get_page(
            lambda arn: arn,
            marker,
            max_items,
        )
        return ListFunctionsByCodeSigningConfigResponse(
            FunctionArns=current_page, NextMarker=next_marker
        )
1✔
2961

2962
    # =======================================
2963
    # =========  Account Settings   =========
2964
    # =======================================
2965

2966
    # CAVE: these settings & usages are *per* region!
2967
    # Lambda quotas: https://docs.aws.amazon.com/lambda/latest/dg/gettingstarted-limits.html
2968
    def get_account_settings(self, context: RequestContext, **kwargs) -> GetAccountSettingsResponse:
        """Report configured limits and current usage for the caller's account/region store.

        Usage sums the code size of all Zip-packaged function versions and all layer
        versions. Unreserved concurrency is the configured concurrency limit minus every
        function's reserved and provisioned concurrency.
        """
        store = lambda_stores[context.account_id][context.region]
        functions = store.functions.values()

        # Image-based Lambdas do not have a code attribute and count against the ECR quotas instead
        function_code_size = sum(
            version.config.code.code_size
            for fn in functions
            for version in fn.versions.values()
            if version.config.package_type == PackageType.Zip
        )
        layer_code_size = sum(
            layer_version.code.code_size
            for layer in store.layers.values()
            for layer_version in layer.layer_versions.values()
        )
        reserved_total = sum(
            fn.reserved_concurrent_executions
            for fn in functions
            if fn.reserved_concurrent_executions is not None
        )
        provisioned_total = sum(
            provisioned.provisioned_concurrent_executions
            for fn in functions
            for provisioned in fn.provisioned_concurrency_configs.values()
        )
        claimed_concurrency = reserved_total + provisioned_total

        limits = AccountLimit(
            TotalCodeSize=config.LAMBDA_LIMITS_TOTAL_CODE_SIZE,
            CodeSizeZipped=config.LAMBDA_LIMITS_CODE_SIZE_ZIPPED,
            CodeSizeUnzipped=config.LAMBDA_LIMITS_CODE_SIZE_UNZIPPED,
            ConcurrentExecutions=config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS,
            UnreservedConcurrentExecutions=config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS
            - claimed_concurrency,
        )
        usage = AccountUsage(
            TotalCodeSize=function_code_size + layer_code_size,
            FunctionCount=len(functions),
        )
        return GetAccountSettingsResponse(AccountLimit=limits, AccountUsage=usage)
3001

3002
    # =======================================
3003
    # ==  Provisioned Concurrency Config   ==
3004
    # =======================================
3005

3006
    def _get_provisioned_config(
        self, context: RequestContext, function_name: str, qualifier: str
    ) -> ProvisionedConcurrencyConfiguration | None:
        """Fetch the provisioned concurrency config stored for a function qualifier.

        :param context: request context used to resolve account, region and function name
        :param function_name: function name or ARN as supplied by the caller
        :param qualifier: alias name or version the config is stored under
        :return: the stored config, or ``None`` if the function has none for ``qualifier``
        :raises ResourceNotFoundException: if the function does not exist, or the alias or
            version named by ``qualifier`` does not exist on it
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        state = lambda_stores[account_id][region]
        function_name = api_utils.get_function_name(function_name, context)
        fn = state.functions.get(function_name)

        if api_utils.qualifier_is_alias(qualifier):
            if not fn or fn.aliases.get(qualifier) is None:
                raise ResourceNotFoundException(
                    f"Cannot find alias arn: {api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)}",
                    Type="User",
                )
        elif api_utils.qualifier_is_version(qualifier):
            if not fn or fn.versions.get(qualifier) is None:
                raise ResourceNotFoundException(
                    f"Function not found: {api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)}",
                    Type="User",
                )

        # A qualifier that is neither an alias nor a version skips both checks above, so a
        # missing function must be reported here instead of failing on attribute access.
        if not fn:
            fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")

        return fn.provisioned_concurrency_configs.get(qualifier)
1✔
3033

3034
    def put_provisioned_concurrency_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: Qualifier,
        provisioned_concurrent_executions: PositiveInteger,
        **kwargs,
    ) -> PutProvisionedConcurrencyConfigResponse:
        """Store a provisioned concurrency config for an alias or published version.

        The request is checked against the function's reserved concurrency and the
        configured concurrency limits. The config is then stored under ``qualifier`` and
        passed to the version manager of the targeted version.

        :raises ValidationException: if the requested value is not positive.
        :raises InvalidParameterValueException: if the qualifier is ``$LATEST`` or one of
            the concurrency limits would be violated.
        :raises ResourceConflictException: if an alias targets a version that already has
            a config, or a version is targeted by an alias that already has one.
        """
        if provisioned_concurrent_executions <= 0:
            raise ValidationException(
                f"1 validation error detected: Value '{provisioned_concurrent_executions}' at 'provisionedConcurrentExecutions' failed to satisfy constraint: Member must have value greater than or equal to 1"
            )

        if qualifier == "$LATEST":
            raise InvalidParameterValueException(
                "Provisioned Concurrency Configs cannot be applied to unpublished function versions.",
                Type="User",
            )
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        store = lambda_stores[account_id][region]
        fn = store.functions.get(function_name)

        existing_config = self._get_provisioned_config(context, function_name, qualifier)

        if existing_config:  # TODO: merge?
            # TODO: add a test for partial updates (if possible)
            LOG.warning(
                "Partial update of provisioned concurrency config is currently not supported."
            )

        # Provisioned concurrency already claimed by the function's other qualifiers.
        other_provisioned_sum = sum(
            other_config.provisioned_concurrent_executions
            for other_qualifier, other_config in fn.provisioned_concurrency_configs.items()
            if other_qualifier != qualifier
        )
        requested_total = other_provisioned_sum + provisioned_concurrent_executions

        reserved_limit = fn.reserved_concurrent_executions
        if reserved_limit is not None and reserved_limit < requested_total:
            raise InvalidParameterValueException(
                "Requested Provisioned Concurrency should not be greater than the reservedConcurrentExecution for function",
                Type="User",
            )

        if provisioned_concurrent_executions > config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS:
            raise InvalidParameterValueException(
                "Specified ConcurrentExecutions for function is greater than account's unreserved concurrency"
                f" [{config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS}]."
            )

        account_settings = self.get_account_settings(context)
        unreserved_concurrent_executions = account_settings["AccountLimit"][
            "UnreservedConcurrentExecutions"
        ]
        remaining_unreserved = unreserved_concurrent_executions - provisioned_concurrent_executions
        if remaining_unreserved < config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY:
            raise InvalidParameterValueException(
                "Specified ConcurrentExecutions for function decreases account's UnreservedConcurrentExecution below"
                f" its minimum value of [{config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY}]."
            )

        provisioned_config = ProvisionedConcurrencyConfiguration(
            provisioned_concurrent_executions, api_utils.generate_lambda_date()
        )
        # Fallback ARN; replaced below by the ARN of the concrete version being targeted.
        fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)

        if api_utils.qualifier_is_alias(qualifier):
            alias = fn.aliases.get(qualifier)
            target_version = fn.versions.get(alias.function_version)

            version_already_provisioned = (
                fn.provisioned_concurrency_configs.get(alias.function_version) is not None
            )
            if target_version and version_already_provisioned:
                raise ResourceConflictException(
                    "Alias can't be used for Provisioned Concurrency configuration on an already Provisioned version",
                    Type="User",
                )
            fn_arn = target_version.id.qualified_arn()
        elif api_utils.qualifier_is_version(qualifier):
            fn_version = fn.versions.get(qualifier)

            # TODO: might be useful other places, utilize
            provisioned_alias_points_here = any(
                candidate.function_version == qualifier
                and fn.provisioned_concurrency_configs.get(candidate.name) is not None
                for candidate in fn.aliases.values()
            )
            if provisioned_alias_points_here:
                raise ResourceConflictException(
                    "Version is pointed by a Provisioned Concurrency alias", Type="User"
                )

            fn_arn = fn_version.id.qualified_arn()

        manager = self.lambda_service.get_lambda_version_manager(fn_arn)

        fn.provisioned_concurrency_configs[qualifier] = provisioned_config

        manager.update_provisioned_concurrency_config(
            provisioned_config.provisioned_concurrent_executions
        )

        return PutProvisionedConcurrencyConfigResponse(
            RequestedProvisionedConcurrentExecutions=provisioned_config.provisioned_concurrent_executions,
            AvailableProvisionedConcurrentExecutions=0,
            AllocatedProvisionedConcurrentExecutions=0,
            Status=ProvisionedConcurrencyStatusEnum.IN_PROGRESS,
            # StatusReason=manager.provisioned_state.status_reason,
            LastModified=provisioned_config.last_modified,  # TODO: does change with configuration or also with state changes?
        )
3156

3157
    def get_provisioned_concurrency_config(
        self, context: RequestContext, function_name: FunctionName, qualifier: Qualifier, **kwargs
    ) -> GetProvisionedConcurrencyConfigResponse:
        """
        Return the stored provisioned concurrency config of a version or alias, combined with the
        provisioning state reported by the version manager serving that qualifier.

        :param context: Request context
        :param function_name: Function name or ARN
        :param qualifier: Published version or alias name ("$LATEST" is rejected)
        :return: Requested/available/allocated executions plus status information
        :raises InvalidParameterValueException: if the qualifier is "$LATEST"
        :raises ProvisionedConcurrencyConfigNotFoundException: if no config is stored for the qualifier
        """
        if qualifier == "$LATEST":
            raise InvalidParameterValueException(
                "The function resource provided must be an alias or a published version.",
                Type="User",
            )
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )

        pc_config = self._get_provisioned_config(context, function_name, qualifier)
        if not pc_config:
            raise ProvisionedConcurrencyConfigNotFoundException(
                "No Provisioned Concurrency Config found for this function", Type="User"
            )

        # TODO: make this compatible with alias pointer migration on update
        # Version managers are looked up by version ARN, so an alias is first resolved to the
        # version it points to.
        version_qualifier = qualifier
        if api_utils.qualifier_is_alias(qualifier):
            function = lambda_stores[account_id][region].functions.get(function_name)
            version_qualifier = function.aliases.get(qualifier).function_version

        version_arn = api_utils.qualified_lambda_arn(
            function_name, version_qualifier, account_id, region
        )
        version_manager = self.lambda_service.get_lambda_version_manager(version_arn)

        return GetProvisionedConcurrencyConfigResponse(
            RequestedProvisionedConcurrentExecutions=pc_config.provisioned_concurrent_executions,
            LastModified=pc_config.last_modified,
            AvailableProvisionedConcurrentExecutions=version_manager.provisioned_state.available,
            AllocatedProvisionedConcurrentExecutions=version_manager.provisioned_state.allocated,
            Status=version_manager.provisioned_state.status,
            StatusReason=version_manager.provisioned_state.status_reason,
        )
3197

3198
    def list_provisioned_concurrency_configs(
        self,
        context: RequestContext,
        function_name: FunctionName,
        marker: String = None,
        max_items: MaxProvisionedConcurrencyConfigListItems = None,
        **kwargs,
    ) -> ListProvisionedConcurrencyConfigsResponse:
        """
        List all provisioned concurrency configs (for versions and aliases) of a function.

        :param context: Request context
        :param function_name: Function name or ARN
        :param marker: Pagination marker returned by a previous call
        :param max_items: Maximum number of items per page
        :return: One page of config items and the marker for the next page, if any
        :raises ResourceNotFoundException: if the function does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        state = lambda_stores[account_id][region]

        function_name = api_utils.get_function_name(function_name, context)
        fn = state.functions.get(function_name)
        if fn is None:
            raise ResourceNotFoundException(
                f"Function not found: {api_utils.unqualified_lambda_arn(function_name, account_id, region)}",
                Type="User",
            )

        configs = []
        for qualifier, pc_config in fn.provisioned_concurrency_configs.items():
            # The version manager is registered under the version ARN, so resolve aliases first.
            if api_utils.qualifier_is_alias(qualifier):
                alias = fn.aliases.get(qualifier)
                fn_arn = api_utils.qualified_lambda_arn(
                    function_name, alias.function_version, account_id, region
                )
            else:
                fn_arn = api_utils.qualified_lambda_arn(
                    function_name, qualifier, account_id, region
                )

            manager = self.lambda_service.get_lambda_version_manager(fn_arn)

            configs.append(
                ProvisionedConcurrencyConfigListItem(
                    # The reported ARN keeps the original qualifier (alias name or version).
                    FunctionArn=api_utils.qualified_lambda_arn(
                        function_name, qualifier, account_id, region
                    ),
                    RequestedProvisionedConcurrentExecutions=pc_config.provisioned_concurrent_executions,
                    AvailableProvisionedConcurrentExecutions=manager.provisioned_state.available,
                    AllocatedProvisionedConcurrentExecutions=manager.provisioned_state.allocated,
                    Status=manager.provisioned_state.status,
                    StatusReason=manager.provisioned_state.status_reason,
                    LastModified=pc_config.last_modified,
                )
            )

        provisioned_concurrency_configs = PaginatedList(configs)
        # The marker is a string, so the page token must be a string key of the item
        # (the qualified ARN is unique per config), not the item itself.
        page, token = provisioned_concurrency_configs.get_page(
            lambda item: item["FunctionArn"],
            marker,
            max_items,
        )
        return ListProvisionedConcurrencyConfigsResponse(
            ProvisionedConcurrencyConfigs=page, NextMarker=token
        )
3255

3256
    def delete_provisioned_concurrency_config(
        self, context: RequestContext, function_name: FunctionName, qualifier: Qualifier, **kwargs
    ) -> None:
        """
        Remove the provisioned concurrency config of a version or alias and reset the provisioned
        concurrency of the version manager backing it to 0.

        :param context: Request context
        :param function_name: Function name or ARN
        :param qualifier: Published version or alias name ("$LATEST" is rejected)
        :raises InvalidParameterValueException: if the qualifier is "$LATEST"
        """
        if qualifier == "$LATEST":
            raise InvalidParameterValueException(
                "The function resource provided must be an alias or a published version.",
                Type="User",
            )
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        state = lambda_stores[account_id][region]
        fn = state.functions.get(function_name)

        provisioned_config = self._get_provisioned_config(context, function_name, qualifier)
        # delete is idempotent and doesn't actually care about the provisioned concurrency config not existing
        if provisioned_config:
            fn.provisioned_concurrency_configs.pop(qualifier)

            # Version managers are keyed by version ARN (put/get/list resolve aliases the same
            # way), so an alias has to be mapped to the version it points to before the lookup.
            version_qualifier = qualifier
            if api_utils.qualifier_is_alias(qualifier):
                alias = fn.aliases.get(qualifier)
                if alias is not None:
                    version_qualifier = alias.function_version

            fn_arn = api_utils.qualified_lambda_arn(
                function_name, version_qualifier, account_id, region
            )
            manager = self.lambda_service.get_lambda_version_manager(fn_arn)
            manager.update_provisioned_concurrency_config(0)
1✔
3278

3279
    # =======================================
3280
    # =======  Event Invoke Config   ========
3281
    # =======================================
3282

3283
    # "1 validation error detected: Value 'arn:aws:_-/!lambda:<region>:111111111111:function:<function-name:1>' at 'destinationConfig.onFailure.destination' failed to satisfy constraint: Member must satisfy regular expression pattern: ^$|arn:(aws[a-zA-Z0-9-]*):([a-zA-Z0-9\\-])+:([a-z]{2}((-gov)|(-iso(b?)))?-[a-z]+-\\d{1})?:(\\d{12})?:(.*)"
3284
    # "1 validation error detected: Value 'arn:aws:_-/!lambda:<region>:111111111111:function:<function-name:1>' at 'destinationConfig.onFailure.destination' failed to satisfy constraint: Member must satisfy regular expression pattern: ^$|arn:(aws[a-zA-Z0-9-]*):([a-zA-Z0-9\\-])+:([a-z]2((-gov)|(-iso(b?)))?-[a-z]+-\\d1)?:(\\d12)?:(.*)" ... (expected → actual)
3285

3286
    def _validate_destination_config(
1✔
3287
        self, store: LambdaStore, function_name: str, destination_config: DestinationConfig
3288
    ):
3289
        def _validate_destination_arn(destination_arn) -> bool:
1✔
3290
            if not api_utils.DESTINATION_ARN_PATTERN.match(destination_arn):
1✔
3291
                # technically we shouldn't handle this in the provider
3292
                raise ValidationException(
1✔
3293
                    "1 validation error detected: Value '"
3294
                    + destination_arn
3295
                    + r"' at 'destinationConfig.onFailure.destination' failed to satisfy constraint: Member must satisfy regular expression pattern: ^$|arn:(aws[a-zA-Z0-9-]*):([a-zA-Z0-9\-])+:([a-z]{2}((-gov)|(-iso(b?)))?-[a-z]+-\d{1})?:(\d{12})?:(.*)"
3296
                )
3297

3298
            match destination_arn.split(":")[2]:
1✔
3299
                case "lambda":
1✔
3300
                    fn_parts = api_utils.FULL_FN_ARN_PATTERN.search(destination_arn).groupdict()
1✔
3301
                    if fn_parts:
1✔
3302
                        # check if it exists
3303
                        fn = store.functions.get(fn_parts["function_name"])
1✔
3304
                        if not fn:
1✔
3305
                            raise InvalidParameterValueException(
1✔
3306
                                f"The destination ARN {destination_arn} is invalid.", Type="User"
3307
                            )
3308
                        if fn_parts["function_name"] == function_name:
1✔
3309
                            raise InvalidParameterValueException(
1✔
3310
                                "You can't specify the function as a destination for itself.",
3311
                                Type="User",
3312
                            )
3313
                case "sns" | "sqs" | "events":
1✔
3314
                    pass
1✔
3315
                case _:
1✔
3316
                    return False
1✔
3317
            return True
1✔
3318

3319
        validation_err = False
1✔
3320

3321
        failure_destination = destination_config.get("OnFailure", {}).get("Destination")
1✔
3322
        if failure_destination:
1✔
3323
            validation_err = validation_err or not _validate_destination_arn(failure_destination)
1✔
3324

3325
        success_destination = destination_config.get("OnSuccess", {}).get("Destination")
1✔
3326
        if success_destination:
1✔
3327
            validation_err = validation_err or not _validate_destination_arn(success_destination)
1✔
3328

3329
        if validation_err:
1✔
3330
            on_success_part = (
1✔
3331
                f"OnSuccess(destination={success_destination})" if success_destination else "null"
3332
            )
3333
            on_failure_part = (
1✔
3334
                f"OnFailure(destination={failure_destination})" if failure_destination else "null"
3335
            )
3336
            raise InvalidParameterValueException(
1✔
3337
                f"The provided destination config DestinationConfig(onSuccess={on_success_part}, onFailure={on_failure_part}) is invalid.",
3338
                Type="User",
3339
            )
3340

3341
    def put_function_event_invoke_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: Qualifier = None,
        maximum_retry_attempts: MaximumRetryAttempts = None,
        maximum_event_age_in_seconds: MaximumEventAgeInSeconds = None,
        destination_config: DestinationConfig = None,
        **kwargs,
    ) -> FunctionEventInvokeConfig:
        """
        Create or overwrite the event invoke config of a function, version or alias.

        Supported destination ARNs:
        * SQS arn
        * SNS arn
        * Lambda arn
        * EventBridge arn

        put_ vs. update_:
            * put replaces an existing config as a whole
            * update changes only the given values and keeps the remaining ones
            * update fails if no config exists yet

        Destination vs. DLQ:
            * "However, a dead-letter queue is part of a function's version-specific configuration, so it is locked in when you publish a version."
            *  "On-failure destinations also support additional targets and include details about the function's response in the invocation record."

        :raises InvalidParameterValueException: if none of the settings is specified
        :raises ResourceNotFoundException: if the function or the given qualifier does not exist
        """
        nothing_specified = (
            maximum_event_age_in_seconds is None
            and maximum_retry_attempts is None
            and destination_config is None
        )
        if nothing_specified:
            raise InvalidParameterValueException(
                "You must specify at least one of error handling or destination setting.",
                Type="User",
            )

        account_id, region = api_utils.get_account_and_region(function_name, context)
        state = lambda_stores[account_id][region]
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        fn = state.functions.get(function_name)
        # A given qualifier has to name an existing alias or version of the function.
        if not fn or (
            qualifier and qualifier not in fn.aliases and qualifier not in fn.versions
        ):
            raise ResourceNotFoundException("The function doesn't exist.", Type="User")

        qualifier = qualifier or "$LATEST"

        # validate and normalize destination config
        if destination_config:
            self._validate_destination_config(state, function_name, destination_config)

        requested_destinations = destination_config or {}
        destination_config = DestinationConfig(
            OnSuccess=OnSuccess(
                Destination=requested_destinations.get("OnSuccess", {}).get("Destination")
            ),
            OnFailure=OnFailure(
                Destination=requested_destinations.get("OnFailure", {}).get("Destination")
            ),
        )

        invoke_config = EventInvokeConfig(
            function_name=function_name,
            qualifier=qualifier,
            maximum_event_age_in_seconds=maximum_event_age_in_seconds,
            maximum_retry_attempts=maximum_retry_attempts,
            last_modified=api_utils.generate_lambda_date(),
            destination_config=destination_config,
        )
        fn.event_invoke_configs[qualifier] = invoke_config

        last_modified = datetime.datetime.strptime(
            invoke_config.last_modified, api_utils.LAMBDA_DATE_FORMAT
        )
        # qualifier was already defaulted to "$LATEST" above
        function_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
        return FunctionEventInvokeConfig(
            LastModified=last_modified,
            FunctionArn=function_arn,
            DestinationConfig=destination_config,
            MaximumEventAgeInSeconds=maximum_event_age_in_seconds,
            MaximumRetryAttempts=maximum_retry_attempts,
        )
3422

3423
    def get_function_event_invoke_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: Qualifier = None,
        **kwargs,
    ) -> FunctionEventInvokeConfig:
        """
        Return the event invoke config stored for a function qualifier (default: "$LATEST").

        :param context: Request context
        :param function_name: Function name or ARN
        :param qualifier: Optional version or alias
        :return: The stored event invoke config
        :raises ResourceNotFoundException: if the function or a config for the qualifier is missing
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        state = lambda_stores[account_id][region]
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        qualifier = qualifier or "$LATEST"

        # A missing function and a missing config are reported with the same error.
        fn = state.functions.get(function_name)
        invoke_config = fn.event_invoke_configs.get(qualifier) if fn else None
        if not invoke_config:
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
            raise ResourceNotFoundException(
                f"The function {fn_arn} doesn't have an EventInvokeConfig", Type="User"
            )

        last_modified = datetime.datetime.strptime(
            invoke_config.last_modified, api_utils.LAMBDA_DATE_FORMAT
        )
        function_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
        return FunctionEventInvokeConfig(
            LastModified=last_modified,
            FunctionArn=function_arn,
            DestinationConfig=invoke_config.destination_config,
            MaximumEventAgeInSeconds=invoke_config.maximum_event_age_in_seconds,
            MaximumRetryAttempts=invoke_config.maximum_retry_attempts,
        )
3462

3463
    def list_function_event_invoke_configs(
        self,
        context: RequestContext,
        function_name: FunctionName,
        marker: String = None,
        max_items: MaxFunctionEventInvokeConfigListItems = None,
        **kwargs,
    ) -> ListFunctionEventInvokeConfigsResponse:
        """
        List the event invoke configs of all qualifiers of a function.

        :param context: Request context
        :param function_name: Function name or ARN
        :param marker: Pagination marker returned by a previous call
        :param max_items: Maximum number of items per page
        :return: One page of event invoke configs and the marker for the next page, if any
        :raises ResourceNotFoundException: if the function does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        state = lambda_stores[account_id][region]
        # Functions are stored by plain name; normalize ARN input as the sibling list_* call does.
        function_name = api_utils.get_function_name(function_name, context)
        fn = state.functions.get(function_name)
        if not fn:
            raise ResourceNotFoundException("The function doesn't exist.", Type="User")

        event_invoke_configs = [
            FunctionEventInvokeConfig(
                # Parse the stored date string, matching what put/get/update return.
                LastModified=datetime.datetime.strptime(
                    c.last_modified, api_utils.LAMBDA_DATE_FORMAT
                ),
                FunctionArn=api_utils.qualified_lambda_arn(
                    function_name, c.qualifier, account_id, region
                ),
                MaximumEventAgeInSeconds=c.maximum_event_age_in_seconds,
                MaximumRetryAttempts=c.maximum_retry_attempts,
                DestinationConfig=c.destination_config,
            )
            for c in fn.event_invoke_configs.values()
        ]

        event_invoke_configs = PaginatedList(event_invoke_configs)
        page, token = event_invoke_configs.get_page(
            lambda x: x["FunctionArn"],
            marker,
            max_items,
        )
        return ListFunctionEventInvokeConfigsResponse(
            FunctionEventInvokeConfigs=page, NextMarker=token
        )
3499

3500
    def delete_function_event_invoke_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: Qualifier = None,
        **kwargs,
    ) -> None:
        """
        Delete the event invoke config stored for a function qualifier (default: "$LATEST").

        :param context: Request context
        :param function_name: Function name or ARN
        :param qualifier: Optional version or alias
        :raises ResourceNotFoundException: if the function or a config for the qualifier is missing
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        fn = lambda_stores[account_id][region].functions.get(function_name)
        config_key = qualifier or "$LATEST"
        # The ARN used in the error message is built from the qualifier as given.
        fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)

        # A missing function and a missing config are reported with the same error.
        if not fn or not fn.event_invoke_configs.get(config_key):
            raise ResourceNotFoundException(
                f"The function {fn_arn} doesn't have an EventInvokeConfig", Type="User"
            )

        del fn.event_invoke_configs[config_key]
1✔
3527

3528
    def update_function_event_invoke_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: Qualifier = None,
        maximum_retry_attempts: MaximumRetryAttempts = None,
        maximum_event_age_in_seconds: MaximumEventAgeInSeconds = None,
        destination_config: DestinationConfig = None,
        **kwargs,
    ) -> FunctionEventInvokeConfig:
        """
        Update individual fields of an existing event invoke config; unspecified fields are kept.

        :param context: Request context
        :param function_name: Function name or ARN
        :param qualifier: Optional version or alias (default: "$LATEST")
        :return: The updated event invoke config
        :raises InvalidParameterValueException: if none of the settings is specified
        :raises ResourceNotFoundException: if the function, the qualifier, or an existing config
            for the qualifier is missing
        """
        # like put but only update single fields via replace
        account_id, region = api_utils.get_account_and_region(function_name, context)
        state = lambda_stores[account_id][region]
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )

        nothing_specified = (
            maximum_event_age_in_seconds is None
            and maximum_retry_attempts is None
            and destination_config is None
        )
        if nothing_specified:
            raise InvalidParameterValueException(
                "You must specify at least one of error handling or destination setting.",
                Type="User",
            )

        fn = state.functions.get(function_name)
        # A given qualifier has to name an existing alias or version of the function.
        if not fn or (
            qualifier and qualifier not in fn.aliases and qualifier not in fn.versions
        ):
            raise ResourceNotFoundException("The function doesn't exist.", Type="User")

        qualifier = qualifier or "$LATEST"

        existing_config = fn.event_invoke_configs.get(qualifier)
        if not existing_config:
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
            raise ResourceNotFoundException(
                f"The function {fn_arn} doesn't have an EventInvokeConfig", Type="User"
            )

        if destination_config:
            self._validate_destination_config(state, function_name, destination_config)

        # Only explicitly provided (non-None) values replace the stored ones.
        changes = {}
        if destination_config is not None:
            changes["destination_config"] = destination_config
        if maximum_retry_attempts is not None:
            changes["maximum_retry_attempts"] = maximum_retry_attempts
        if maximum_event_age_in_seconds is not None:
            changes["maximum_event_age_in_seconds"] = maximum_event_age_in_seconds

        updated_config = dataclasses.replace(
            existing_config, last_modified=api_utils.generate_lambda_date(), **changes
        )
        fn.event_invoke_configs[qualifier] = updated_config

        last_modified = datetime.datetime.strptime(
            updated_config.last_modified, api_utils.LAMBDA_DATE_FORMAT
        )
        # qualifier was already defaulted to "$LATEST" above
        function_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
        return FunctionEventInvokeConfig(
            LastModified=last_modified,
            FunctionArn=function_arn,
            DestinationConfig=updated_config.destination_config,
            MaximumEventAgeInSeconds=updated_config.maximum_event_age_in_seconds,
            MaximumRetryAttempts=updated_config.maximum_retry_attempts,
        )
3597

3598
    # =======================================
3599
    # ======  Layer & Layer Versions  =======
3600
    # =======================================
3601

3602
    @staticmethod
    def _resolve_layer(
        layer_name_or_arn: str, context: RequestContext
    ) -> Tuple[str, str, str, Optional[str]]:
        """
        Determine where a Lambda layer lives, based on its name or ARN.

        A plain layer name is located in the region and account of the request and carries no
        version; an ARN is handed to ``api_utils.parse_layer_arn``.

        :param layer_name_or_arn: Layer name or ARN
        :param context: Request context
        :return: Tuple of region, account ID, layer name, layer version
        """
        if not api_utils.is_layer_arn(layer_name_or_arn):
            return context.region, context.account_id, layer_name_or_arn, None

        return api_utils.parse_layer_arn(layer_name_or_arn)
1✔
3617

3618
    def publish_layer_version(
        self,
        context: RequestContext,
        layer_name: LayerName,
        content: LayerVersionContentInput,
        description: Description = None,
        compatible_runtimes: CompatibleRuntimes = None,
        license_info: LicenseInfo = None,
        compatible_architectures: CompatibleArchitectures = None,
        **kwargs,
    ) -> PublishLayerVersionResponse:
        """
        Publish a new version of a layer.

        The first call for a LayerName creates the layer; every further call with the same
        LayerName adds another version. Layers have no $LATEST version.

        :raises ValidationException: if the compatible runtimes or architectures are invalid
        """
        account = context.account_id
        region = context.region

        validation_errors = api_utils.validate_layer_runtimes_and_architectures(
            compatible_runtimes, compatible_architectures
        )
        if validation_errors:
            error_count = len(validation_errors)
            plural_suffix = "s" if error_count > 1 else ""
            raise ValidationException(
                f"{error_count} validation error{plural_suffix} detected: {'; '.join(validation_errors)}"
            )

        state = lambda_stores[account][region]
        with self.create_layer_lock:
            # lock is required to avoid creating two v1 objects for the same name
            if layer_name not in state.layers:
                # no layer object for this name yet, so create one
                state.layers[layer_name] = Layer(
                    arn=api_utils.layer_arn(layer_name=layer_name, account=account, region=region)
                )

        layer = state.layers[layer_name]
        with layer.next_version_lock:
            version_identifier = LambdaLayerVersionIdentifier(
                account_id=account, region=region, layer_name=layer_name
            )
            next_version = version_identifier.generate(next_version=layer.next_version)
            # With user defined layer versions, versions may be created out of order, e.g. a user
            # could replicate layer v2 and then layer v1. The counter must never move backwards,
            # otherwise existing versions could be overwritten; a version below the
            # "next in line" value therefore leaves the counter untouched.
            if next_version >= layer.next_version:
                layer.next_version = next_version + 1

        # store the code of the new layer version
        if content.get("ZipFile"):
            code = store_lambda_archive(
                archive_file=content["ZipFile"],
                function_name=layer_name,
                region_name=region,
                account_id=account,
            )
        else:
            code = store_s3_bucket_archive(
                archive_bucket=content["S3Bucket"],
                archive_key=content["S3Key"],
                archive_version=content.get("S3ObjectVersion"),
                function_name=layer_name,
                region_name=region,
                account_id=account,
            )

        version_key = str(next_version)
        new_layer_version = LayerVersion(
            layer_version_arn=api_utils.layer_version_arn(
                layer_name=layer_name,
                account=account,
                region=region,
                version=version_key,
            ),
            layer_arn=layer.arn,
            version=next_version,
            description=description or "",
            license_info=license_info,
            compatible_runtimes=compatible_runtimes,
            compatible_architectures=compatible_architectures,
            created=api_utils.generate_lambda_date(),
            code=code,
        )
        layer.layer_versions[version_key] = new_layer_version

        return api_utils.map_layer_out(new_layer_version)
1✔
3706

3707
    def get_layer_version(
        self,
        context: RequestContext,
        layer_name: LayerName,
        version_number: LayerVersionNumber,
        **kwargs,
    ) -> GetLayerVersionResponse:
        """
        Return a single version of a layer.

        :param context: Request context
        :param layer_name: Layer name or ARN
        :param version_number: Version of the layer, must be at least 1
        :raises InvalidParameterValueException: if the version number is lower than 1
        :raises ResourceNotFoundException: if the layer or the layer version does not exist
        """
        # TODO: handle layer_name as an ARN

        region_name, account_id, layer_name, _ = LambdaProvider._resolve_layer(layer_name, context)
        layer = lambda_stores[account_id][region_name].layers.get(layer_name)

        # The version number is checked before the existence of the layer is reported.
        if version_number < 1:
            raise InvalidParameterValueException("Layer Version Cannot be less than 1", Type="User")

        # An unknown layer and an unknown version are reported with the same error.
        layer_version = None
        if layer is not None:
            layer_version = layer.layer_versions.get(str(version_number))
        if layer_version is None:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        return api_utils.map_layer_out(layer_version)
1✔
3732

3733
    def get_layer_version_by_arn(
        self, context: RequestContext, arn: LayerVersionArn, **kwargs
    ) -> GetLayerVersionResponse:
        """
        Return the layer version identified by a layer version ARN.

        :param context: Request context
        :param arn: ARN of the layer version
        :raises ValidationException: if no layer version could be resolved from the ARN
        :raises ResourceNotFoundException: if the layer or the layer version does not exist
        """
        region_name, account_id, layer_name, version_key = LambdaProvider._resolve_layer(
            arn, context
        )

        if not version_key:
            raise ValidationException(
                f"1 validation error detected: Value '{arn}' at 'arn' failed to satisfy constraint: Member must satisfy regular expression pattern: "
                + "(arn:(aws[a-zA-Z-]*)?:lambda:[a-z]{2}((-gov)|(-iso([a-z]?)))?-[a-z]+-\\d{1}:\\d{12}:layer:[a-zA-Z0-9-_]+:[0-9]+)|(arn:[a-zA-Z0-9-]+:lambda:::awslayer:[a-zA-Z0-9-_]+)"
            )

        # An unknown layer and an unknown version are reported with the same error.
        layer = lambda_stores[account_id][region_name].layers.get(layer_name)
        resolved_version = layer.layer_versions.get(version_key) if layer else None
        if not resolved_version:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        return api_utils.map_layer_out(resolved_version)
1✔
3760

3761
    def list_layers(
        self,
        context: RequestContext,
        compatible_runtime: Runtime = None,
        marker: String = None,
        max_items: MaxLayerListItems = None,
        compatible_architecture: Architecture = None,
        **kwargs,
    ) -> ListLayersResponse:
        """
        List the layers of the request's account and region together with their latest version.

        The runtime and architecture filters are validated but not applied yet (see TODOs).

        :param context: Request context
        :param compatible_runtime: Runtime filter (validated only)
        :param marker: Pagination marker returned by a previous call
        :param max_items: Maximum number of items per page
        :param compatible_architecture: Architecture filter (validated only)
        :return: One page of layers and the marker for the next page, if any
        :raises ValidationException: if the runtime or architecture filter is invalid
        """
        validation_errors = []

        validation_error_arch = api_utils.validate_layer_architecture(compatible_architecture)
        if validation_error_arch:
            validation_errors.append(validation_error_arch)

        validation_error_runtime = api_utils.validate_layer_runtime(compatible_runtime)
        if validation_error_runtime:
            validation_errors.append(validation_error_runtime)

        if validation_errors:
            raise ValidationException(
                f"{len(validation_errors)} validation error{'s' if len(validation_errors) > 1 else ''} detected: {';'.join(validation_errors)}"
            )
        # TODO: handle filter: compatible_runtime
        # TODO: handle filter: compatible_architecture

        state = lambda_stores[context.account_id][context.region]
        layers = state.layers

        # TODO: test how filters behave together with only returning layers here? Does it return the latest "matching" layer, i.e. does it ignore later layer versions that don't match?

        responses: list[LayersListItem] = []
        for layer_name, layer in layers.items():
            layer_versions = list(layer.layer_versions.values())
            if not layer_versions:
                # a layer without any version has no "latest" version to report
                continue
            # Versions can be created out of order, so pick the highest version number instead
            # of relying on insertion order.
            latest_layer_version = max(layer_versions, key=lambda x: x.version)
            responses.append(
                LayersListItem(
                    LayerName=layer_name,
                    LayerArn=layer.arn,
                    LatestMatchingVersion=api_utils.map_layer_out(latest_layer_version),
                )
            )

        responses = PaginatedList(responses)
        # The marker is a string, so the page token must be a string key of the item
        # (the layer ARN is unique per layer), not the item itself.
        page, token = responses.get_page(
            lambda item: item["LayerArn"],
            marker,
            max_items,
        )

        return ListLayersResponse(NextMarker=token, Layers=page)
×
3814

3815
    def list_layer_versions(
        self,
        context: RequestContext,
        layer_name: LayerName,
        compatible_runtime: Runtime = None,
        marker: String = None,
        max_items: MaxLayerListItems = None,
        compatible_architecture: Architecture = None,
        **kwargs,
    ) -> ListLayerVersionsResponse:
        """
        List the versions of a single layer, newest version first.

        The runtime and architecture filters are validated, but not yet applied to the result.
        An unknown layer yields an empty page rather than an error.

        Args:
            context: Request context used to resolve the layer.
            layer_name: Layer identifier passed to ``_resolve_layer``.
            compatible_runtime: Runtime filter (currently validated only).
            marker: Pagination marker returned by a previous call.
            max_items: Maximum number of versions to return per page.
            compatible_architecture: Architecture filter (currently validated only).

        Returns:
            One page of layer versions and the marker for the next page, if any.

        Raises:
            ValidationException: If the runtime or the architecture filter is invalid.
        """
        runtime_filters = [compatible_runtime] if compatible_runtime else []
        architecture_filters = [compatible_architecture] if compatible_architecture else []
        validation_errors = api_utils.validate_layer_runtimes_and_architectures(
            runtime_filters, architecture_filters
        )
        if validation_errors:
            raise ValidationException(
                f"{len(validation_errors)} validation error{'s' if len(validation_errors) > 1 else ''} detected: {';'.join(validation_errors)}"
            )

        region_name, account_id, resolved_name, _ = LambdaProvider._resolve_layer(
            layer_name, context
        )
        store = lambda_stores[account_id][region_name]

        # TODO: Test & handle filter: compatible_runtime
        # TODO: Test & handle filter: compatible_architecture
        layer = store.layers.get(resolved_name)
        if layer is None:
            mapped_versions = []
        else:
            mapped_versions = [
                api_utils.map_layer_out(stored_version)
                for stored_version in layer.layer_versions.values()
            ]

        newest_first = sorted(mapped_versions, key=lambda entry: entry["Version"], reverse=True)
        page, token = PaginatedList(newest_first).get_page(
            lambda version: version["LayerVersionArn"],
            marker,
            max_items,
        )
        return ListLayerVersionsResponse(NextMarker=token, LayerVersions=page)
1✔
3855

3856
    def delete_layer_version(
        self,
        context: RequestContext,
        layer_name: LayerName,
        version_number: LayerVersionNumber,
        **kwargs,
    ) -> None:
        """
        Remove one version of a layer; unknown layers and versions are silently ignored.

        Only the version entry is removed, the layer itself stays in the store.

        Args:
            context: Request context used to resolve the layer.
            layer_name: Layer identifier passed to ``_resolve_layer``.
            version_number: Version to delete; must be at least 1.

        Raises:
            InvalidParameterValueException: If ``version_number`` is lower than 1.
        """
        if version_number < 1:
            raise InvalidParameterValueException("Layer Version Cannot be less than 1", Type="User")

        region_name, account_id, resolved_name, _ = LambdaProvider._resolve_layer(
            layer_name, context
        )

        layer = lambda_stores[account_id][region_name].layers.get(resolved_name, {})
        if not layer:
            return

        # versions are keyed by their number as a string
        layer.layer_versions.pop(str(version_number), None)
1✔
3874

3875
    # =======================================
3876
    # =====  Layer Version Permissions  =====
3877
    # =======================================
3878
    # TODO: lock updates that change revision IDs
3879

3880
    def add_layer_version_permission(
        self,
        context: RequestContext,
        layer_name: LayerName,
        version_number: LayerVersionNumber,
        statement_id: StatementId,
        action: LayerPermissionAllowedAction,
        principal: LayerPermissionAllowedPrincipal,
        organization_id: OrganizationId = None,
        revision_id: String = None,
        **kwargs,
    ) -> AddLayerVersionPermissionResponse:
        """
        Add a statement to the resource policy of a layer version.

        A policy is created on the layer version if it does not have one yet.

        Args:
            context: Request context used to resolve the layer.
            layer_name: Layer name or ARN; used verbatim when building error messages.
            version_number: Version of the layer the permission applies to.
            statement_id: Identifier of the new statement; must not exist in the policy yet.
            action: Must be ``lambda:GetLayerVersion``.
            principal: Principal stored on the statement.
            organization_id: Optional organization ID stored on the statement.
            revision_id: If given, must match the current revision ID of the policy.

        Returns:
            The JSON-encoded statement and the revision ID of the updated policy.

        Raises:
            ValidationException: If ``action`` is not ``lambda:GetLayerVersion``.
            ResourceNotFoundException: If the layer or the layer version does not exist.
            ResourceConflictException: If ``statement_id`` already exists in the policy.
            PreconditionFailedException: If ``revision_id`` does not match the policy.
        """
        # `layer_n` is the bare layer name, whereas `layer_name` may also be an ARN.
        region_name, account_id, layer_n, _ = LambdaProvider._resolve_layer(layer_name, context)

        if action != "lambda:GetLayerVersion":
            raise ValidationException(
                f"1 validation error detected: Value '{action}' at 'action' failed to satisfy constraint: Member must satisfy regular expression pattern: lambda:GetLayerVersion"
            )

        layer = lambda_stores[account_id][region_name].layers.get(layer_n)
        version_key = str(version_number)
        layer_version_arn = api_utils.layer_version_arn(
            layer_name, account_id, region_name, version_key
        )

        # a missing layer and a missing version are reported with the same error
        layer_version = None if layer is None else layer.layer_versions.get(version_key)
        if layer_version is None:
            raise ResourceNotFoundException(
                f"Layer version {layer_version_arn} does not exist.", Type="User"
            )

        # make sure there is a policy to extend
        if layer_version.policy is None:
            layer_version.policy = LayerPolicy()
        current_policy = layer_version.policy

        if statement_id in current_policy.statements:
            raise ResourceConflictException(
                f"The statement id ({statement_id}) provided already exists. Please provide a new statement id, or remove the existing statement.",
                Type="User",
            )

        if revision_id and current_policy.revision_id != revision_id:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetLayerPolicy API to retrieve the latest Revision Id",
                Type="User",
            )

        statement = LayerPolicyStatement(
            sid=statement_id, action=action, principal=principal, organization_id=organization_id
        )

        # the policy object is replaced rather than mutated in place
        updated_statements = dict(current_policy.statements)
        updated_statements[statement_id] = statement
        layer_version.policy = dataclasses.replace(current_policy, statements=updated_statements)

        statement_document = {
            "Sid": statement.sid,
            "Effect": "Allow",
            "Principal": statement.principal,
            "Action": statement.action,
            "Resource": layer_version.layer_version_arn,
        }
        return AddLayerVersionPermissionResponse(
            Statement=json.dumps(statement_document),
            RevisionId=layer_version.policy.revision_id,
        )
3955

3956
    def remove_layer_version_permission(
        self,
        context: RequestContext,
        layer_name: LayerName,
        version_number: LayerVersionNumber,
        statement_id: StatementId,
        revision_id: String = None,
        **kwargs,
    ) -> None:
        """
        Remove a statement from the resource policy of a layer version.

        Args:
            context: Request context used to resolve the layer.
            layer_name: Layer name or ARN; used verbatim when building error messages.
            version_number: Version of the layer the permission applies to.
            statement_id: Identifier of the statement to remove.
            revision_id: If given, must match the current revision ID of the policy.

        Raises:
            ResourceNotFoundException: If the layer, the layer version, or the statement does
                not exist. A layer version without any policy is treated as not containing
                the statement.
            PreconditionFailedException: If ``revision_id`` does not match the policy.
        """
        # `layer_name` can either be layer name or ARN. It is used to generate error messages.
        # `layer_n` contains the layer name.
        region_name, account_id, layer_n, _ = LambdaProvider._resolve_layer(layer_name, context)

        layer_version_arn = api_utils.layer_version_arn(
            layer_name, account_id, region_name, str(version_number)
        )

        state = lambda_stores[account_id][region_name]
        layer = state.layers.get(layer_n)
        if layer is None:
            raise ResourceNotFoundException(
                f"Layer version {layer_version_arn} does not exist.", Type="User"
            )
        layer_version = layer.layer_versions.get(str(version_number))
        if layer_version is None:
            raise ResourceNotFoundException(
                f"Layer version {layer_version_arn} does not exist.", Type="User"
            )

        # The policy is only created by the first add_layer_version_permission call, so it may
        # still be None here; guard against it instead of failing with an AttributeError.
        policy = layer_version.policy

        if revision_id and policy is not None and policy.revision_id != revision_id:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetLayerPolicy API to retrieve the latest Revision Id",
                Type="User",
            )

        old_statements = policy.statements if policy is not None else {}
        if statement_id not in old_statements:
            raise ResourceNotFoundException(
                f"Statement {statement_id} is not found in resource policy.", Type="User"
            )

        # the policy object is replaced rather than mutated in place
        layer_version.policy = dataclasses.replace(
            policy,
            statements={k: v for k, v in old_statements.items() if k != statement_id},
        )
4004

4005
    def get_layer_version_policy(
        self,
        context: RequestContext,
        layer_name: LayerName,
        version_number: LayerVersionNumber,
        **kwargs,
    ) -> GetLayerVersionPolicyResponse:
        """
        Return the resource policy of a layer version as a JSON document.

        Args:
            context: Request context used to resolve the layer.
            layer_name: Layer name or ARN; used verbatim when building error messages.
            version_number: Version of the layer whose policy is requested.

        Returns:
            The JSON-encoded policy and its revision ID.

        Raises:
            ResourceNotFoundException: If the layer or the layer version does not exist, or
                if the layer version has no policy.
        """
        # `layer_n` is the bare layer name, whereas `layer_name` may also be an ARN.
        region_name, account_id, layer_n, _ = LambdaProvider._resolve_layer(layer_name, context)

        version_key = str(version_number)
        layer_version_arn = api_utils.layer_version_arn(
            layer_name, account_id, region_name, version_key
        )

        layer = lambda_stores[account_id][region_name].layers.get(layer_n)

        # a missing layer and a missing version are reported with the same error
        layer_version = None if layer is None else layer.layer_versions.get(version_key)
        if layer_version is None:
            raise ResourceNotFoundException(
                f"Layer version {layer_version_arn} does not exist.", Type="User"
            )

        policy = layer_version.policy
        if policy is None:
            raise ResourceNotFoundException(
                "No policy is associated with the given resource.", Type="User"
            )

        statement_documents = []
        for ps in policy.statements.values():
            statement_documents.append(
                {
                    "Sid": ps.sid,
                    "Effect": "Allow",
                    "Principal": ps.principal,
                    "Action": ps.action,
                    "Resource": layer_version.layer_version_arn,
                }
            )

        policy_document = {
            "Version": policy.version,
            "Id": policy.id,
            "Statement": statement_documents,
        }
        return GetLayerVersionPolicyResponse(
            Policy=json.dumps(policy_document),
            RevisionId=policy.revision_id,
        )
4058

4059
    # =======================================
4060
    # =======  Function Concurrency  ========
4061
    # =======================================
4062
    # (Reserved) function concurrency is scoped to the whole function
4063

4064
    def get_function_concurrency(
        self, context: RequestContext, function_name: FunctionName, **kwargs
    ) -> GetFunctionConcurrencyResponse:
        """
        Return the reserved concurrent executions configured for a function.

        Args:
            context: Request context used to resolve account, region and function name.
            function_name: Function identifier as passed by the caller.

        Returns:
            Response carrying the function's ``reserved_concurrent_executions`` value.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        resolved_name = api_utils.get_function_name(function_name, context)
        function = self._get_function(
            function_name=resolved_name, region=region, account_id=account_id
        )
        reserved = function.reserved_concurrent_executions
        return GetFunctionConcurrencyResponse(ReservedConcurrentExecutions=reserved)
4073

4074
    def put_function_concurrency(
        self,
        context: RequestContext,
        function_name: FunctionName,
        reserved_concurrent_executions: ReservedConcurrentExecutions,
        **kwargs,
    ) -> Concurrency:
        """
        Set the reserved concurrent executions of a function.

        Args:
            context: Request context used to resolve account, region and function name.
            function_name: Unqualified function name or ARN.
            reserved_concurrent_executions: New reserved concurrency for the function.

        Returns:
            The reserved concurrency now stored on the function.

        Raises:
            InvalidParameterValueException: If a qualifier is given, if the account's
                unreserved concurrency would fall below the configured minimum, or if the
                value is lower than the function's total provisioned concurrency.
            ResourceNotFoundException: If the function does not exist.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)

        function_name, qualifier = api_utils.get_name_and_qualifier(function_name, None, context)
        if qualifier:
            raise InvalidParameterValueException(
                "This operation is permitted on Lambda functions only. Aliases and versions do not support this operation. Please specify either a function name or an unqualified function ARN.",
                Type="User",
            )

        fn = lambda_stores[account_id][region].functions.get(function_name)
        if not fn:
            fn_arn = api_utils.qualified_lambda_arn(
                function_name,
                qualifier="$LATEST",
                account=account_id,
                region=region,
            )
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")

        account_limit = self.get_account_settings(context)["AccountLimit"]
        unreserved_concurrent_executions = account_limit["UnreservedConcurrentExecutions"]

        # The function's current reservation is already subtracted from the account's
        # unreserved pool, but it is about to be replaced, so it is added back before the
        # comparison. The original note states this was verified manually against AWS
        # (2023-11-28).
        currently_reserved = fn.reserved_concurrent_executions or 0
        remaining_unreserved = (
            unreserved_concurrent_executions - reserved_concurrent_executions + currently_reserved
        )
        if remaining_unreserved < config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY:
            raise InvalidParameterValueException(
                f"Specified ReservedConcurrentExecutions for function decreases account's UnreservedConcurrentExecution below its minimum value of [{config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY}]."
            )

        total_provisioned_concurrency = sum(
            provisioned_config.provisioned_concurrent_executions
            for provisioned_config in fn.provisioned_concurrency_configs.values()
        )
        if total_provisioned_concurrency > reserved_concurrent_executions:
            raise InvalidParameterValueException(
                f" ReservedConcurrentExecutions  {reserved_concurrent_executions} should not be lower than function's total provisioned concurrency [{total_provisioned_concurrency}]."
            )

        fn.reserved_concurrent_executions = reserved_concurrent_executions

        return Concurrency(ReservedConcurrentExecutions=fn.reserved_concurrent_executions)
1✔
4135

4136
    def delete_function_concurrency(
        self, context: RequestContext, function_name: FunctionName, **kwargs
    ) -> None:
        """
        Remove the reserved concurrent executions setting from a function.

        Args:
            context: Request context used to resolve account, region and function name.
            function_name: Function name or ARN.

        Raises:
            ResourceNotFoundException: If the function does not exist.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, _ = api_utils.get_name_and_qualifier(function_name, None, context)
        store = lambda_stores[account_id][region]
        fn = store.functions.get(function_name)
        if fn is None:
            # Report an unknown function the same way put_function_concurrency does instead
            # of failing with an AttributeError on None.
            fn_arn = api_utils.qualified_lambda_arn(
                function_name,
                qualifier="$LATEST",
                account=account_id,
                region=region,
            )
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")
        fn.reserved_concurrent_executions = None
1✔
4144

4145
    # =======================================
4146
    # ===============  TAGS   ===============
4147
    # =======================================
4148
    # only Function, Event Source Mapping, and Code Signing Config (not currently supported by LocalStack) ARNs are available for tagging in AWS
4149

4150
    def _get_tags(self, resource: TaggableResource) -> dict[str, str]:
        """
        Return the tags of a resource as a plain key/value mapping.

        The tag store returns a list of ``{"Key": ..., "Value": ...}`` entries, which is
        flattened into a dict here. The resource ARN is validated as a side effect of
        ``fetch_lambda_store_for_tagging``.
        """
        store = self.fetch_lambda_store_for_tagging(resource)
        tag_entries = store.TAGS.list_tags_for_resource(resource).get("Tags")
        return {entry["Key"]: entry["Value"] for entry in tag_entries}
1✔
4157

4158
    def _store_tags(self, resource: TaggableResource, tags: dict[str, str]):
        """
        Store tags on a resource, enforcing the per-resource tag limit.

        Raises:
            InvalidParameterValueException: If the existing and the new tags combined exceed
                ``LAMBDA_TAG_LIMIT_PER_RESOURCE``.
        """
        store = self.fetch_lambda_store_for_tagging(resource)

        # new tags with an already-existing key overwrite it and therefore count only once
        combined_tags = store.TAGS.tags.get(resource, {}) | tags
        if len(combined_tags) > LAMBDA_TAG_LIMIT_PER_RESOURCE:
            raise InvalidParameterValueException(
                "Number of tags exceeds resource tag limit.", Type="User"
            )

        # the tag store expects a list of Key/Value entries
        tag_entries = [{"Key": tag_key, "Value": tag_value} for tag_key, tag_value in tags.items()]
        store.TAGS.tag_resource(resource, tag_entries)
1✔
4167

4168
    def fetch_lambda_store_for_tagging(self, resource: TaggableResource) -> LambdaStore:
1✔
4169
        """
4170
        Takes a resource ARN for a TaggableResource (Lambda Function, Event Source Mapping, or Code Signing Config) and returns a corresponding
4171
        LambdaStore for its region and account.
4172

4173
        In addition, this function validates that the ARN is a valid TaggableResource type, and that the TaggableResource exists.
4174

4175
        Raises:
4176
            ValidationException: If the resource ARN is not a full ARN for a TaggableResource.
4177
            ResourceNotFoundException: If the specified resource does not exist.
4178
            InvalidParameterValueException: If the resource ARN is a qualified Lambda Function.
4179
        """
4180

4181
        def _raise_validation_exception():
1✔
4182
            raise ValidationException(
1✔
4183
                f"1 validation error detected: Value '{resource}' at 'resource' failed to satisfy constraint: Member must satisfy regular expression pattern: {api_utils.TAGGABLE_RESOURCE_ARN_PATTERN}"
4184
            )
4185

4186
        # Check whether the ARN we have been passed is correctly formatted
4187
        parsed_resource_arn: ArnData = None
1✔
4188
        try:
1✔
4189
            parsed_resource_arn = parse_arn(resource)
1✔
4190
        except Exception:
1✔
4191
            _raise_validation_exception()
1✔
4192

4193
        # TODO: Should we be checking whether this is a full ARN?
4194
        region, account_id, resource_type = map(
1✔
4195
            parsed_resource_arn.get, ("region", "account", "resource")
4196
        )
4197

4198
        if not all((region, account_id, resource_type)):
1✔
UNCOV
4199
            _raise_validation_exception()
×
4200

4201
        if not (parts := resource_type.split(":")):
1✔
UNCOV
4202
            _raise_validation_exception()
×
4203

4204
        resource_type, resource_identifier, *qualifier = parts
1✔
4205
        if resource_type not in {"event-source-mapping", "code-signing-config", "function"}:
1✔
4206
            _raise_validation_exception()
1✔
4207

4208
        if qualifier:
1✔
4209
            if resource_type == "function":
1✔
4210
                raise InvalidParameterValueException(
1✔
4211
                    "Tags on function aliases and versions are not supported. Please specify a function ARN.",
4212
                    Type="User",
4213
                )
4214
            _raise_validation_exception()
1✔
4215

4216
        match resource_type:
1✔
4217
            case "event-source-mapping":
1✔
4218
                self._get_esm(resource_identifier, account_id, region)
1✔
4219
            case "code-signing-config":
1✔
4220
                raise NotImplementedError("Resource tagging on CSC not yet implemented.")
4221
            case "function":
1✔
4222
                self._get_function(
1✔
4223
                    function_name=resource_identifier, account_id=account_id, region=region
4224
                )
4225

4226
        # If no exceptions are raised, assume ARN and referenced resource is valid for tag operations
4227
        return lambda_stores[account_id][region]
1✔
4228

4229
    def tag_resource(
        self, context: RequestContext, resource: TaggableResource, tags: Tags, **kwargs
    ) -> None:
        """
        Add tags to a taggable resource.

        For function ARNs, the ``$LATEST`` version is additionally re-created from a copy of
        itself after the tags are stored.

        Raises:
            InvalidParameterValueException: If no tags are given, or if storing them would
                exceed the per-resource tag limit (raised by ``_store_tags``).
        """
        if not tags:
            raise InvalidParameterValueException(
                "An error occurred and the request cannot be processed.", Type="User"
            )
        self._store_tags(resource, tags)

        resource_id = extract_resource_from_arn(resource)
        if not resource_id or not resource_id.startswith("function"):
            return

        name, _, account, region = function_locators_from_arn(resource)
        function = self._get_function(name, account, region)
        with function.lock:
            # Workaround noted in the original code as a "dirty hack for changed revision id":
            # $LATEST is replaced by a copy carrying a freshly copied config.
            latest_version = function.versions["$LATEST"]
            copied_config = dataclasses.replace(latest_version.config)
            function.versions["$LATEST"] = dataclasses.replace(
                latest_version, config=copied_config
            )
4249

4250
    def list_tags(
        self, context: RequestContext, resource: TaggableResource, **kwargs
    ) -> ListTagsResponse:
        """Return all tags of the given resource as a key/value mapping."""
        return ListTagsResponse(Tags=self._get_tags(resource))
1✔
4255

4256
    def untag_resource(
        self, context: RequestContext, resource: TaggableResource, tag_keys: TagKeyList, **kwargs
    ) -> None:
        """
        Remove the given tag keys from a taggable resource.

        For function ARNs, the ``$LATEST`` version is additionally re-created from a copy of
        itself after the tags are removed.

        Raises:
            ValidationException: If no tag keys are given.
        """
        if not tag_keys:
            raise ValidationException(
                "1 validation error detected: Value null at 'tagKeys' failed to satisfy constraint: Member must not be null"
            )  # should probably be generalized a bit

        store = self.fetch_lambda_store_for_tagging(resource)
        store.TAGS.untag_resource(resource, tag_keys)

        resource_id = extract_resource_from_arn(resource)
        if not resource_id or not resource_id.startswith("function"):
            return

        name, _, account, region = function_locators_from_arn(resource)
        function = self._get_function(name, account, region)
        # TODO: Potential race condition
        with function.lock:
            # Workaround noted in the original code as a "dirty hack for changed revision id":
            # $LATEST is replaced by a copy carrying a freshly copied config.
            latest_version = function.versions["$LATEST"]
            copied_config = dataclasses.replace(latest_version.config)
            function.versions["$LATEST"] = dataclasses.replace(
                latest_version, config=copied_config
            )
4279

4280
    # =======================================
4281
    # =======  LEGACY / DEPRECATED   ========
4282
    # =======================================
4283

4284
    def invoke_async(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        invoke_args: IO[BlobStream],
        **kwargs,
    ) -> InvokeAsyncResponse:
        """
        Legacy API endpoint whose usage is heavily discouraged, even by AWS.

        Raises:
            NotImplementedError: Always; the operation is not implemented.
        """
        raise NotImplementedError
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc