• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 19690651035

25 Nov 2025 03:57PM UTC coverage: 86.873% (+0.006%) from 86.867%
19690651035

push

github

web-flow
fix(lambda): updated error messages for deployment artifacts (#13417)

0 of 3 new or added lines in 1 file covered. (0.0%)

50 existing lines in 6 files now uncovered.

68873 of 79280 relevant lines covered (86.87%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.02
/localstack-core/localstack/services/lambda_/provider.py
1
import base64
1✔
2
import dataclasses
1✔
3
import datetime
1✔
4
import itertools
1✔
5
import json
1✔
6
import logging
1✔
7
import re
1✔
8
import threading
1✔
9
import time
1✔
10
from typing import IO, Any
1✔
11

12
from botocore.exceptions import ClientError
1✔
13

14
from localstack import config
1✔
15
from localstack.aws.api import RequestContext, ServiceException, handler
1✔
16
from localstack.aws.api.lambda_ import (
1✔
17
    AccountLimit,
18
    AccountUsage,
19
    AddLayerVersionPermissionResponse,
20
    AddPermissionRequest,
21
    AddPermissionResponse,
22
    Alias,
23
    AliasConfiguration,
24
    AliasRoutingConfiguration,
25
    AllowedPublishers,
26
    Architecture,
27
    Arn,
28
    Blob,
29
    BlobStream,
30
    CodeSigningConfigArn,
31
    CodeSigningConfigNotFoundException,
32
    CodeSigningPolicies,
33
    CompatibleArchitectures,
34
    CompatibleRuntimes,
35
    Concurrency,
36
    Cors,
37
    CreateCodeSigningConfigResponse,
38
    CreateEventSourceMappingRequest,
39
    CreateFunctionRequest,
40
    CreateFunctionUrlConfigResponse,
41
    DeleteCodeSigningConfigResponse,
42
    Description,
43
    DestinationConfig,
44
    EventSourceMappingConfiguration,
45
    FunctionCodeLocation,
46
    FunctionConfiguration,
47
    FunctionEventInvokeConfig,
48
    FunctionName,
49
    FunctionUrlAuthType,
50
    FunctionUrlQualifier,
51
    GetAccountSettingsResponse,
52
    GetCodeSigningConfigResponse,
53
    GetFunctionCodeSigningConfigResponse,
54
    GetFunctionConcurrencyResponse,
55
    GetFunctionRecursionConfigResponse,
56
    GetFunctionResponse,
57
    GetFunctionUrlConfigResponse,
58
    GetLayerVersionPolicyResponse,
59
    GetLayerVersionResponse,
60
    GetPolicyResponse,
61
    GetProvisionedConcurrencyConfigResponse,
62
    InvalidParameterValueException,
63
    InvocationResponse,
64
    InvocationType,
65
    InvokeAsyncResponse,
66
    InvokeMode,
67
    LambdaApi,
68
    LastUpdateStatus,
69
    LayerName,
70
    LayerPermissionAllowedAction,
71
    LayerPermissionAllowedPrincipal,
72
    LayersListItem,
73
    LayerVersionArn,
74
    LayerVersionContentInput,
75
    LayerVersionNumber,
76
    LicenseInfo,
77
    ListAliasesResponse,
78
    ListCodeSigningConfigsResponse,
79
    ListEventSourceMappingsResponse,
80
    ListFunctionEventInvokeConfigsResponse,
81
    ListFunctionsByCodeSigningConfigResponse,
82
    ListFunctionsResponse,
83
    ListFunctionUrlConfigsResponse,
84
    ListLayersResponse,
85
    ListLayerVersionsResponse,
86
    ListProvisionedConcurrencyConfigsResponse,
87
    ListTagsResponse,
88
    ListVersionsByFunctionResponse,
89
    LogFormat,
90
    LoggingConfig,
91
    LogType,
92
    MasterRegion,
93
    MaxFunctionEventInvokeConfigListItems,
94
    MaximumEventAgeInSeconds,
95
    MaximumRetryAttempts,
96
    MaxItems,
97
    MaxLayerListItems,
98
    MaxListItems,
99
    MaxProvisionedConcurrencyConfigListItems,
100
    NamespacedFunctionName,
101
    NamespacedStatementId,
102
    OnFailure,
103
    OnSuccess,
104
    OrganizationId,
105
    PackageType,
106
    PositiveInteger,
107
    PreconditionFailedException,
108
    ProvisionedConcurrencyConfigListItem,
109
    ProvisionedConcurrencyConfigNotFoundException,
110
    ProvisionedConcurrencyStatusEnum,
111
    PublishLayerVersionResponse,
112
    PutFunctionCodeSigningConfigResponse,
113
    PutFunctionRecursionConfigResponse,
114
    PutProvisionedConcurrencyConfigResponse,
115
    Qualifier,
116
    RecursiveLoop,
117
    ReservedConcurrentExecutions,
118
    ResourceConflictException,
119
    ResourceNotFoundException,
120
    Runtime,
121
    RuntimeVersionConfig,
122
    SnapStart,
123
    SnapStartApplyOn,
124
    SnapStartOptimizationStatus,
125
    SnapStartResponse,
126
    State,
127
    StatementId,
128
    StateReasonCode,
129
    String,
130
    TaggableResource,
131
    TagKeyList,
132
    Tags,
133
    TenantId,
134
    TracingMode,
135
    UnqualifiedFunctionName,
136
    UpdateCodeSigningConfigResponse,
137
    UpdateEventSourceMappingRequest,
138
    UpdateFunctionCodeRequest,
139
    UpdateFunctionConfigurationRequest,
140
    UpdateFunctionUrlConfigResponse,
141
    Version,
142
)
143
from localstack.aws.api.lambda_ import FunctionVersion as FunctionVersionApi
1✔
144
from localstack.aws.api.lambda_ import ServiceException as LambdaServiceException
1✔
145
from localstack.aws.api.pipes import (
1✔
146
    DynamoDBStreamStartPosition,
147
    KinesisStreamStartPosition,
148
)
149
from localstack.aws.connect import connect_to
1✔
150
from localstack.aws.spec import load_service
1✔
151
from localstack.services.edge import ROUTER
1✔
152
from localstack.services.lambda_ import api_utils
1✔
153
from localstack.services.lambda_ import hooks as lambda_hooks
1✔
154
from localstack.services.lambda_.analytics import (
1✔
155
    FunctionOperation,
156
    FunctionStatus,
157
    function_counter,
158
)
159
from localstack.services.lambda_.api_utils import (
1✔
160
    ARCHITECTURES,
161
    STATEMENT_ID_REGEX,
162
    SUBNET_ID_REGEX,
163
    function_locators_from_arn,
164
)
165
from localstack.services.lambda_.event_source_mapping.esm_config_factory import (
1✔
166
    EsmConfigFactory,
167
)
168
from localstack.services.lambda_.event_source_mapping.esm_worker import (
1✔
169
    EsmState,
170
    EsmWorker,
171
)
172
from localstack.services.lambda_.event_source_mapping.esm_worker_factory import (
1✔
173
    EsmWorkerFactory,
174
)
175
from localstack.services.lambda_.event_source_mapping.pipe_utils import get_internal_client
1✔
176
from localstack.services.lambda_.invocation import AccessDeniedException
1✔
177
from localstack.services.lambda_.invocation.execution_environment import (
1✔
178
    EnvironmentStartupTimeoutException,
179
)
180
from localstack.services.lambda_.invocation.lambda_models import (
1✔
181
    AliasRoutingConfig,
182
    CodeSigningConfig,
183
    EventInvokeConfig,
184
    Function,
185
    FunctionResourcePolicy,
186
    FunctionUrlConfig,
187
    FunctionVersion,
188
    ImageConfig,
189
    LambdaEphemeralStorage,
190
    Layer,
191
    LayerPolicy,
192
    LayerPolicyStatement,
193
    LayerVersion,
194
    ProvisionedConcurrencyConfiguration,
195
    RequestEntityTooLargeException,
196
    ResourcePolicy,
197
    UpdateStatus,
198
    ValidationException,
199
    VersionAlias,
200
    VersionFunctionConfiguration,
201
    VersionIdentifier,
202
    VersionState,
203
    VpcConfig,
204
)
205
from localstack.services.lambda_.invocation.lambda_service import (
1✔
206
    LambdaService,
207
    create_image_code,
208
    destroy_code_if_not_used,
209
    lambda_stores,
210
    store_lambda_archive,
211
    store_s3_bucket_archive,
212
)
213
from localstack.services.lambda_.invocation.models import LambdaStore
1✔
214
from localstack.services.lambda_.invocation.runtime_executor import get_runtime_executor
1✔
215
from localstack.services.lambda_.lambda_utils import HINT_LOG
1✔
216
from localstack.services.lambda_.layerfetcher.layer_fetcher import LayerFetcher
1✔
217
from localstack.services.lambda_.provider_utils import (
1✔
218
    LambdaLayerVersionIdentifier,
219
    get_function_version,
220
    get_function_version_from_arn,
221
)
222
from localstack.services.lambda_.runtimes import (
1✔
223
    ALL_RUNTIMES,
224
    DEPRECATED_RUNTIMES,
225
    DEPRECATED_RUNTIMES_UPGRADES,
226
    RUNTIMES_AGGREGATED,
227
    SNAP_START_SUPPORTED_RUNTIMES,
228
    VALID_RUNTIMES,
229
)
230
from localstack.services.lambda_.urlrouter import FunctionUrlRouter
1✔
231
from localstack.services.plugins import ServiceLifecycleHook
1✔
232
from localstack.state import StateVisitor
1✔
233
from localstack.utils.aws.arns import (
1✔
234
    ArnData,
235
    extract_resource_from_arn,
236
    extract_service_from_arn,
237
    get_partition,
238
    lambda_event_source_mapping_arn,
239
    parse_arn,
240
)
241
from localstack.utils.aws.client_types import ServicePrincipal
1✔
242
from localstack.utils.bootstrap import is_api_enabled
1✔
243
from localstack.utils.collections import PaginatedList
1✔
244
from localstack.utils.event_matcher import validate_event_pattern
1✔
245
from localstack.utils.strings import get_random_hex, short_uid, to_bytes, to_str
1✔
246
from localstack.utils.sync import poll_condition
1✔
247
from localstack.utils.urls import localstack_host
1✔
248

249
LOG = logging.getLogger(__name__)
1✔
250

251
LAMBDA_DEFAULT_TIMEOUT = 3
1✔
252
LAMBDA_DEFAULT_MEMORY_SIZE = 128
1✔
253

254
LAMBDA_TAG_LIMIT_PER_RESOURCE = 50
1✔
255
LAMBDA_LAYERS_LIMIT_PER_FUNCTION = 5
1✔
256

257
TAG_KEY_CUSTOM_URL = "_custom_id_"
1✔
258
# Requirements (from RFC3986 & co): not longer than 63, first char must be
259
# alpha, then alphanumeric or hyphen, except cannot start or end with hyphen
260
TAG_KEY_CUSTOM_URL_VALIDATOR = re.compile(r"^[A-Za-z]([A-Za-z0-9\-]{0,61}[A-Za-z0-9])?$")
1✔
261

262

263
class LambdaProvider(LambdaApi, ServiceLifecycleHook):
1✔
264
    lambda_service: LambdaService
1✔
265
    create_fn_lock: threading.RLock
1✔
266
    create_layer_lock: threading.RLock
1✔
267
    router: FunctionUrlRouter
1✔
268
    esm_workers: dict[str, EsmWorker]
1✔
269
    layer_fetcher: LayerFetcher | None
1✔
270

271
    def __init__(self) -> None:
1✔
272
        self.lambda_service = LambdaService()
1✔
273
        self.create_fn_lock = threading.RLock()
1✔
274
        self.create_layer_lock = threading.RLock()
1✔
275
        self.router = FunctionUrlRouter(ROUTER, self.lambda_service)
1✔
276
        self.esm_workers = {}
1✔
277
        self.layer_fetcher = None
1✔
278
        lambda_hooks.inject_layer_fetcher.run(self)
1✔
279

280
    def accept_state_visitor(self, visitor: StateVisitor):
1✔
281
        visitor.visit(lambda_stores)
×
282

283
    def on_before_state_reset(self):
1✔
284
        self.lambda_service.stop()
×
285

286
    def on_after_state_reset(self):
1✔
287
        self.router.lambda_service = self.lambda_service = LambdaService()
×
288

289
    def on_before_state_load(self):
1✔
290
        self.lambda_service.stop()
×
291

292
    def on_after_state_load(self):
1✔
293
        self.lambda_service = LambdaService()
×
294
        self.router.lambda_service = self.lambda_service
×
295

296
        for account_id, account_bundle in lambda_stores.items():
×
297
            for region_name, state in account_bundle.items():
×
298
                for fn in state.functions.values():
×
299
                    for fn_version in fn.versions.values():
×
300
                        # restore the "Pending" state for every function version and start it
301
                        try:
×
302
                            new_state = VersionState(
×
303
                                state=State.Pending,
304
                                code=StateReasonCode.Creating,
305
                                reason="The function is being created.",
306
                            )
307
                            new_config = dataclasses.replace(fn_version.config, state=new_state)
×
308
                            new_version = dataclasses.replace(fn_version, config=new_config)
×
309
                            fn.versions[fn_version.id.qualifier] = new_version
×
310
                            self.lambda_service.create_function_version(fn_version).result(
×
311
                                timeout=5
312
                            )
313
                        except Exception:
×
314
                            LOG.warning(
×
315
                                "Failed to restore function version %s",
316
                                fn_version.id.qualified_arn(),
317
                                exc_info=LOG.isEnabledFor(logging.DEBUG),
318
                            )
319
                    # restore provisioned concurrency per function considering both versions and aliases
320
                    for (
×
321
                        provisioned_qualifier,
322
                        provisioned_config,
323
                    ) in fn.provisioned_concurrency_configs.items():
324
                        fn_arn = None
×
325
                        try:
×
326
                            if api_utils.qualifier_is_alias(provisioned_qualifier):
×
327
                                alias = fn.aliases.get(provisioned_qualifier)
×
328
                                resolved_version = fn.versions.get(alias.function_version)
×
329
                                fn_arn = resolved_version.id.qualified_arn()
×
330
                            elif api_utils.qualifier_is_version(provisioned_qualifier):
×
331
                                fn_version = fn.versions.get(provisioned_qualifier)
×
332
                                fn_arn = fn_version.id.qualified_arn()
×
333
                            else:
334
                                raise InvalidParameterValueException(
×
335
                                    "Invalid qualifier type:"
336
                                    " Qualifier can only be an alias or a version for provisioned concurrency."
337
                                )
338

339
                            manager = self.lambda_service.get_lambda_version_manager(fn_arn)
×
340
                            manager.update_provisioned_concurrency_config(
×
341
                                provisioned_config.provisioned_concurrent_executions
342
                            )
343
                        except Exception:
×
344
                            LOG.warning(
×
345
                                "Failed to restore provisioned concurrency %s for function %s",
346
                                provisioned_config,
347
                                fn_arn,
348
                                exc_info=LOG.isEnabledFor(logging.DEBUG),
349
                            )
350

351
                for esm in state.event_source_mappings.values():
×
352
                    # Restores event source workers
353
                    function_arn = esm.get("FunctionArn")
×
354

355
                    # TODO: How do we know the event source is up?
356
                    # A basic poll to see if the mapped Lambda function is active/failed
357
                    if not poll_condition(
×
358
                        lambda: get_function_version_from_arn(function_arn).config.state.state
359
                        in [State.Active, State.Failed],
360
                        timeout=10,
361
                    ):
362
                        LOG.warning(
×
363
                            "Creating ESM for Lambda that is not in running state: %s",
364
                            function_arn,
365
                        )
366

367
                    function_version = get_function_version_from_arn(function_arn)
×
368
                    function_role = function_version.config.role
×
369

370
                    is_esm_enabled = esm.get("State", EsmState.DISABLED) not in (
×
371
                        EsmState.DISABLED,
372
                        EsmState.DISABLING,
373
                    )
374
                    esm_worker = EsmWorkerFactory(
×
375
                        esm, function_role, is_esm_enabled
376
                    ).get_esm_worker()
377

378
                    # Note: a worker is created in the DISABLED state if not enabled
379
                    esm_worker.create()
×
380
                    # TODO: assigning the esm_worker to the dict only works after .create(). Could it cause a race
381
                    #  condition if we get a shutdown here and have a worker thread spawned but not accounted for?
382
                    self.esm_workers[esm_worker.uuid] = esm_worker
×
383

384
    def on_after_init(self):
1✔
385
        self.router.register_routes()
1✔
386
        get_runtime_executor().validate_environment()
1✔
387

388
    def on_before_stop(self) -> None:
1✔
389
        for esm_worker in self.esm_workers.values():
1✔
390
            esm_worker.stop_for_shutdown()
1✔
391

392
        # TODO: should probably unregister routes?
393
        self.lambda_service.stop()
1✔
394

395
    @staticmethod
1✔
396
    def _get_function(function_name: str, account_id: str, region: str) -> Function:
1✔
397
        state = lambda_stores[account_id][region]
1✔
398
        function = state.functions.get(function_name)
1✔
399
        if not function:
1✔
400
            arn = api_utils.unqualified_lambda_arn(
1✔
401
                function_name=function_name,
402
                account=account_id,
403
                region=region,
404
            )
405
            raise ResourceNotFoundException(
1✔
406
                f"Function not found: {arn}",
407
                Type="User",
408
            )
409
        return function
1✔
410

411
    @staticmethod
1✔
412
    def _get_esm(uuid: str, account_id: str, region: str) -> EventSourceMappingConfiguration:
1✔
413
        state = lambda_stores[account_id][region]
1✔
414
        esm = state.event_source_mappings.get(uuid)
1✔
415
        if not esm:
1✔
416
            arn = lambda_event_source_mapping_arn(uuid, account_id, region)
1✔
417
            raise ResourceNotFoundException(
1✔
418
                f"Event source mapping not found: {arn}",
419
                Type="User",
420
            )
421
        return esm
1✔
422

423
    @staticmethod
1✔
424
    def _validate_qualifier_expression(qualifier: str) -> None:
1✔
425
        if error_messages := api_utils.validate_qualifier(qualifier):
1✔
426
            raise ValidationException(
×
427
                message=api_utils.construct_validation_exception_message(error_messages)
428
            )
429

430
    @staticmethod
1✔
431
    def _resolve_fn_qualifier(resolved_fn: Function, qualifier: str | None) -> tuple[str, str]:
1✔
432
        """Attempts to resolve a given qualifier and returns a qualifier that exists or
433
        raises an appropriate ResourceNotFoundException.
434

435
        :param resolved_fn: The resolved lambda function
436
        :param qualifier: The qualifier to be resolved or None
437
        :return: Tuple of (resolved qualifier, function arn either qualified or unqualified)"""
438
        function_name = resolved_fn.function_name
1✔
439
        # assuming function versions need to live in the same account and region
440
        account_id = resolved_fn.latest().id.account
1✔
441
        region = resolved_fn.latest().id.region
1✔
442
        fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
1✔
443
        if qualifier is not None:
1✔
444
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
1✔
445
            if api_utils.qualifier_is_alias(qualifier):
1✔
446
                if qualifier not in resolved_fn.aliases:
1✔
447
                    raise ResourceNotFoundException(f"Cannot find alias arn: {fn_arn}", Type="User")
1✔
448
            elif api_utils.qualifier_is_version(qualifier) or qualifier == "$LATEST":
1✔
449
                if qualifier not in resolved_fn.versions:
1✔
450
                    raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")
1✔
451
            else:
452
                # matches qualifier pattern but invalid alias or version
453
                raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")
1✔
454
        resolved_qualifier = qualifier or "$LATEST"
1✔
455
        return resolved_qualifier, fn_arn
1✔
456

457
    @staticmethod
1✔
458
    def _function_revision_id(resolved_fn: Function, resolved_qualifier: str) -> str:
1✔
459
        if api_utils.qualifier_is_alias(resolved_qualifier):
1✔
460
            return resolved_fn.aliases[resolved_qualifier].revision_id
1✔
461
        # Assumes that a non-alias is a version
462
        else:
463
            return resolved_fn.versions[resolved_qualifier].config.revision_id
1✔
464

465
    def _resolve_vpc_id(self, account_id: str, region_name: str, subnet_id: str) -> str:
1✔
466
        ec2_client = connect_to(aws_access_key_id=account_id, region_name=region_name).ec2
1✔
467
        try:
1✔
468
            return ec2_client.describe_subnets(SubnetIds=[subnet_id])["Subnets"][0]["VpcId"]
1✔
469
        except ec2_client.exceptions.ClientError as e:
1✔
470
            code = e.response["Error"]["Code"]
1✔
471
            message = e.response["Error"]["Message"]
1✔
472
            raise InvalidParameterValueException(
1✔
473
                f"Error occurred while DescribeSubnets. EC2 Error Code: {code}. EC2 Error Message: {message}",
474
                Type="User",
475
            )
476

477
    def _build_vpc_config(
1✔
478
        self,
479
        account_id: str,
480
        region_name: str,
481
        vpc_config: dict | None = None,
482
    ) -> VpcConfig | None:
483
        if not vpc_config or not is_api_enabled("ec2"):
1✔
484
            return None
1✔
485

486
        subnet_ids = vpc_config.get("SubnetIds", [])
1✔
487
        if subnet_ids is not None and len(subnet_ids) == 0:
1✔
488
            return VpcConfig(vpc_id="", security_group_ids=[], subnet_ids=[])
1✔
489

490
        subnet_id = subnet_ids[0]
1✔
491
        if not bool(SUBNET_ID_REGEX.match(subnet_id)):
1✔
492
            raise ValidationException(
1✔
493
                f"1 validation error detected: Value '[{subnet_id}]' at 'vpcConfig.subnetIds' failed to satisfy constraint: Member must satisfy constraint: [Member must have length less than or equal to 1024, Member must have length greater than or equal to 0, Member must satisfy regular expression pattern: ^subnet-[0-9a-z]*$]"
494
            )
495

496
        return VpcConfig(
1✔
497
            vpc_id=self._resolve_vpc_id(account_id, region_name, subnet_id),
498
            security_group_ids=vpc_config.get("SecurityGroupIds", []),
499
            subnet_ids=subnet_ids,
500
        )
501

502
    def _create_version_model(
1✔
503
        self,
504
        function_name: str,
505
        region: str,
506
        account_id: str,
507
        description: str | None = None,
508
        revision_id: str | None = None,
509
        code_sha256: str | None = None,
510
    ) -> tuple[FunctionVersion, bool]:
511
        """
512
        Release a new version to the model if all restrictions are met.
513
        Restrictions:
514
          - CodeSha256, if provided, must equal the current latest version code hash
515
          - RevisionId, if provided, must equal the current latest version revision id
516
          - Some changes have been done to the latest version since last publish
517
        Will return a tuple of the version, and whether the version was published (True) or the latest available version was taken (False).
518
        This can happen if the latest version has not been changed since the last version publish, in this case the last version will be returned.
519

520
        :param function_name: Function name to be published
521
        :param region: Region of the function
522
        :param account_id: Account of the function
523
        :param description: new description of the version (will be the description of the function if missing)
524
        :param revision_id: Revision id, function will raise error if it does not match latest revision id
525
        :param code_sha256: Code sha256, function will raise error if it does not match latest code hash
526
        :return: Tuple of (published version, whether version was released or last released version returned, since nothing changed)
527
        """
528
        current_latest_version = get_function_version(
1✔
529
            function_name=function_name, qualifier="$LATEST", account_id=account_id, region=region
530
        )
531
        if revision_id and current_latest_version.config.revision_id != revision_id:
1✔
532
            raise PreconditionFailedException(
1✔
533
                "The Revision Id provided does not match the latest Revision Id. Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
534
                Type="User",
535
            )
536

537
        # check if code hashes match if they are specified
538
        current_hash = (
1✔
539
            current_latest_version.config.code.code_sha256
540
            if current_latest_version.config.package_type == PackageType.Zip
541
            else current_latest_version.config.image.code_sha256
542
        )
543
        # if the code is a zip package and hot reloaded (hot reloading is currently only supported for zip packagetypes)
544
        # we cannot enforce the codesha256 check
545
        is_hot_reloaded_zip_package = (
1✔
546
            current_latest_version.config.package_type == PackageType.Zip
547
            and current_latest_version.config.code.is_hot_reloading()
548
        )
549
        if code_sha256 and current_hash != code_sha256 and not is_hot_reloaded_zip_package:
1✔
550
            raise InvalidParameterValueException(
1✔
551
                f"CodeSHA256 ({code_sha256}) is different from current CodeSHA256 in $LATEST ({current_hash}). Please try again with the CodeSHA256 in $LATEST.",
552
                Type="User",
553
            )
554

555
        state = lambda_stores[account_id][region]
1✔
556
        function = state.functions.get(function_name)
1✔
557
        changes = {}
1✔
558
        if description is not None:
1✔
559
            changes["description"] = description
1✔
560
        # TODO copy environment instead of restarting one, get rid of all the "Pending"s
561

562
        with function.lock:
1✔
563
            if function.next_version > 1 and (
1✔
564
                prev_version := function.versions.get(str(function.next_version - 1))
565
            ):
566
                if (
1✔
567
                    prev_version.config.internal_revision
568
                    == current_latest_version.config.internal_revision
569
                ):
570
                    return prev_version, False
1✔
571
            # TODO check if there was a change since last version
572
            next_version = str(function.next_version)
1✔
573
            function.next_version += 1
1✔
574
            new_id = VersionIdentifier(
1✔
575
                function_name=function_name,
576
                qualifier=next_version,
577
                region=region,
578
                account=account_id,
579
            )
580
            apply_on = current_latest_version.config.snap_start["ApplyOn"]
1✔
581
            optimization_status = SnapStartOptimizationStatus.Off
1✔
582
            if apply_on == SnapStartApplyOn.PublishedVersions:
1✔
583
                optimization_status = SnapStartOptimizationStatus.On
×
584
            snap_start = SnapStartResponse(
1✔
585
                ApplyOn=apply_on,
586
                OptimizationStatus=optimization_status,
587
            )
588
            new_version = dataclasses.replace(
1✔
589
                current_latest_version,
590
                config=dataclasses.replace(
591
                    current_latest_version.config,
592
                    last_update=None,  # versions never have a last update status
593
                    state=VersionState(
594
                        state=State.Pending,
595
                        code=StateReasonCode.Creating,
596
                        reason="The function is being created.",
597
                    ),
598
                    snap_start=snap_start,
599
                    **changes,
600
                ),
601
                id=new_id,
602
            )
603
            function.versions[next_version] = new_version
1✔
604
        return new_version, True
1✔
605

606
    def _publish_version_from_existing_version(
1✔
607
        self,
608
        function_name: str,
609
        region: str,
610
        account_id: str,
611
        description: str | None = None,
612
        revision_id: str | None = None,
613
        code_sha256: str | None = None,
614
    ) -> FunctionVersion:
615
        """
616
        Publish version from an existing, already initialized LATEST
617

618
        :param function_name: Function name
619
        :param region: region
620
        :param account_id: account id
621
        :param description: description
622
        :param revision_id: revision id (check if current version matches)
623
        :param code_sha256: code sha (check if current code matches)
624
        :return: new version
625
        """
626
        new_version, changed = self._create_version_model(
1✔
627
            function_name=function_name,
628
            region=region,
629
            account_id=account_id,
630
            description=description,
631
            revision_id=revision_id,
632
            code_sha256=code_sha256,
633
        )
634
        if not changed:
1✔
635
            return new_version
1✔
636
        self.lambda_service.publish_version(new_version)
1✔
637
        state = lambda_stores[account_id][region]
1✔
638
        function = state.functions.get(function_name)
1✔
639
        # TODO: re-evaluate data model to prevent this dirty hack just for bumping the revision id
640
        latest_version = function.versions["$LATEST"]
1✔
641
        function.versions["$LATEST"] = dataclasses.replace(
1✔
642
            latest_version, config=dataclasses.replace(latest_version.config)
643
        )
644
        return function.versions.get(new_version.id.qualifier)
1✔
645

646
    def _publish_version_with_changes(
1✔
647
        self,
648
        function_name: str,
649
        region: str,
650
        account_id: str,
651
        description: str | None = None,
652
        revision_id: str | None = None,
653
        code_sha256: str | None = None,
654
    ) -> FunctionVersion:
655
        """
656
        Publish version together with a new latest version (publish on create / update)
657

658
        :param function_name: Function name
659
        :param region: region
660
        :param account_id: account id
661
        :param description: description
662
        :param revision_id: revision id (check if current version matches)
663
        :param code_sha256: code sha (check if current code matches)
664
        :return: new version
665
        """
666
        new_version, changed = self._create_version_model(
1✔
667
            function_name=function_name,
668
            region=region,
669
            account_id=account_id,
670
            description=description,
671
            revision_id=revision_id,
672
            code_sha256=code_sha256,
673
        )
674
        if not changed:
1✔
675
            return new_version
×
676
        self.lambda_service.create_function_version(new_version)
1✔
677
        return new_version
1✔
678

679
    @staticmethod
1✔
680
    def _verify_env_variables(env_vars: dict[str, str]):
1✔
681
        dumped_env_vars = json.dumps(env_vars, separators=(",", ":"))
1✔
682
        if (
1✔
683
            len(dumped_env_vars.encode("utf-8"))
684
            > config.LAMBDA_LIMITS_MAX_FUNCTION_ENVVAR_SIZE_BYTES
685
        ):
686
            raise InvalidParameterValueException(
1✔
687
                f"Lambda was unable to configure your environment variables because the environment variables you have provided exceeded the 4KB limit. String measured: {dumped_env_vars}",
688
                Type="User",
689
            )
690

691
    @staticmethod
1✔
692
    def _validate_snapstart(snap_start: SnapStart, runtime: Runtime):
1✔
693
        apply_on = snap_start.get("ApplyOn")
1✔
694
        if apply_on not in [
1✔
695
            SnapStartApplyOn.PublishedVersions,
696
            SnapStartApplyOn.None_,
697
        ]:
698
            raise ValidationException(
1✔
699
                f"1 validation error detected: Value '{apply_on}' at 'snapStart.applyOn' failed to satisfy constraint: Member must satisfy enum value set: [PublishedVersions, None]"
700
            )
701

702
        if runtime not in SNAP_START_SUPPORTED_RUNTIMES:
1✔
703
            raise InvalidParameterValueException(
×
704
                f"{runtime} is not supported for SnapStart enabled functions.", Type="User"
705
            )
706

707
    def _validate_layers(self, new_layers: list[str], region: str, account_id: str):
1✔
708
        if len(new_layers) > LAMBDA_LAYERS_LIMIT_PER_FUNCTION:
1✔
709
            raise InvalidParameterValueException(
1✔
710
                "Cannot reference more than 5 layers.", Type="User"
711
            )
712

713
        visited_layers = {}
1✔
714
        for layer_version_arn in new_layers:
1✔
715
            (
1✔
716
                layer_region,
717
                layer_account_id,
718
                layer_name,
719
                layer_version_str,
720
            ) = api_utils.parse_layer_arn(layer_version_arn)
721
            if layer_version_str is None:
1✔
722
                raise ValidationException(
1✔
723
                    f"1 validation error detected: Value '[{layer_version_arn}]'"
724
                    + r" at 'layers' failed to satisfy constraint: Member must satisfy constraint: [Member must have length less than or equal to 140, Member must have length greater than or equal to 1, Member must satisfy regular expression pattern: (arn:[a-zA-Z0-9-]+:lambda:[a-z]{2}((-gov)|(-iso(b?)))?-[a-z]+-\d{1}:\d{12}:layer:[a-zA-Z0-9-_]+:[0-9]+)|(arn:[a-zA-Z0-9-]+:lambda:::awslayer:[a-zA-Z0-9-_]+), Member must not be null]",
725
                )
726

727
            state = lambda_stores[layer_account_id][layer_region]
1✔
728
            layer = state.layers.get(layer_name)
1✔
729
            layer_version = None
1✔
730
            if layer is not None:
1✔
731
                layer_version = layer.layer_versions.get(layer_version_str)
1✔
732
            if layer_account_id == account_id:
1✔
733
                if region and layer_region != region:
1✔
734
                    raise InvalidParameterValueException(
1✔
735
                        f"Layers are not in the same region as the function. "
736
                        f"Layers are expected to be in region {region}.",
737
                        Type="User",
738
                    )
739
                if layer is None or layer.layer_versions.get(layer_version_str) is None:
1✔
740
                    raise InvalidParameterValueException(
1✔
741
                        f"Layer version {layer_version_arn} does not exist.", Type="User"
742
                    )
743
            else:  # External layer from other account
744
                # TODO: validate IAM layer policy here, allowing access by default for now and only checking region
745
                if region and layer_region != region:
×
746
                    # TODO: detect user or role from context when IAM users are implemented
747
                    user = "user/localstack-testing"
×
748
                    raise AccessDeniedException(
×
749
                        f"User: arn:{get_partition(region)}:iam::{account_id}:{user} is not authorized to perform: lambda:GetLayerVersion on resource: {layer_version_arn} because no resource-based policy allows the lambda:GetLayerVersion action"
750
                    )
751
                if layer is None or layer_version is None:
×
752
                    # Limitation: cannot fetch external layers when using the same account id as the target layer
753
                    # because we do not want to trigger the layer fetcher for every non-existing layer.
754
                    if self.layer_fetcher is None:
×
755
                        raise NotImplementedError(
756
                            "Fetching shared layers from AWS is a pro feature."
757
                        )
758

759
                    layer = self.layer_fetcher.fetch_layer(layer_version_arn)
×
760
                    if layer is None:
×
761
                        # TODO: detect user or role from context when IAM users are implemented
762
                        user = "user/localstack-testing"
×
763
                        raise AccessDeniedException(
×
764
                            f"User: arn:{get_partition(region)}:iam::{account_id}:{user} is not authorized to perform: lambda:GetLayerVersion on resource: {layer_version_arn} because no resource-based policy allows the lambda:GetLayerVersion action"
765
                        )
766

767
                    # Distinguish between new layer and new layer version
768
                    if layer_version is None:
×
769
                        # Create whole layer from scratch
770
                        state.layers[layer_name] = layer
×
771
                    else:
772
                        # Create layer version if another version of the same layer already exists
773
                        state.layers[layer_name].layer_versions[layer_version_str] = (
×
774
                            layer.layer_versions.get(layer_version_str)
775
                        )
776

777
            # only the first two matches in the array are considered for the error message
778
            layer_arn = ":".join(layer_version_arn.split(":")[:-1])
1✔
779
            if layer_arn in visited_layers:
1✔
780
                conflict_layer_version_arn = visited_layers[layer_arn]
1✔
781
                raise InvalidParameterValueException(
1✔
782
                    f"Two different versions of the same layer are not allowed to be referenced in the same function. {conflict_layer_version_arn} and {layer_version_arn} are versions of the same layer.",
783
                    Type="User",
784
                )
785
            visited_layers[layer_arn] = layer_version_arn
1✔
786

787
    @staticmethod
1✔
788
    def map_layers(new_layers: list[str]) -> list[LayerVersion]:
1✔
789
        layers = []
1✔
790
        for layer_version_arn in new_layers:
1✔
791
            region_name, account_id, layer_name, layer_version = api_utils.parse_layer_arn(
1✔
792
                layer_version_arn
793
            )
794
            layer = lambda_stores[account_id][region_name].layers.get(layer_name)
1✔
795
            layer_version = layer.layer_versions.get(layer_version)
1✔
796
            layers.append(layer_version)
1✔
797
        return layers
1✔
798

799
    def get_function_recursion_config(
1✔
800
        self,
801
        context: RequestContext,
802
        function_name: UnqualifiedFunctionName,
803
        **kwargs,
804
    ) -> GetFunctionRecursionConfigResponse:
805
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
806
        function_name = api_utils.get_function_name(function_name, context)
1✔
807
        fn = self._get_function(function_name=function_name, region=region, account_id=account_id)
1✔
808
        return GetFunctionRecursionConfigResponse(RecursiveLoop=fn.recursive_loop)
1✔
809

810
    def put_function_recursion_config(
1✔
811
        self,
812
        context: RequestContext,
813
        function_name: UnqualifiedFunctionName,
814
        recursive_loop: RecursiveLoop,
815
        **kwargs,
816
    ) -> PutFunctionRecursionConfigResponse:
817
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
818
        function_name = api_utils.get_function_name(function_name, context)
1✔
819

820
        fn = self._get_function(function_name=function_name, region=region, account_id=account_id)
1✔
821

822
        allowed_values = list(RecursiveLoop.__members__.values())
1✔
823
        if recursive_loop not in allowed_values:
1✔
824
            raise ValidationException(
1✔
825
                f"1 validation error detected: Value '{recursive_loop}' at 'recursiveLoop' failed to satisfy constraint: "
826
                f"Member must satisfy enum value set: [Terminate, Allow]"
827
            )
828

829
        fn.recursive_loop = recursive_loop
1✔
830
        return PutFunctionRecursionConfigResponse(RecursiveLoop=fn.recursive_loop)
1✔
831

832
    @handler(operation="CreateFunction", expand=False)
1✔
833
    def create_function(
1✔
834
        self,
835
        context: RequestContext,
836
        request: CreateFunctionRequest,
837
    ) -> FunctionConfiguration:
838
        context_region = context.region
1✔
839
        context_account_id = context.account_id
1✔
840

841
        zip_file = request.get("Code", {}).get("ZipFile")
1✔
842
        if zip_file and len(zip_file) > config.LAMBDA_LIMITS_CODE_SIZE_ZIPPED:
1✔
843
            raise RequestEntityTooLargeException(
1✔
844
                f"Zipped size must be smaller than {config.LAMBDA_LIMITS_CODE_SIZE_ZIPPED} bytes"
845
            )
846

847
        if context.request.content_length > config.LAMBDA_LIMITS_CREATE_FUNCTION_REQUEST_SIZE:
1✔
848
            raise RequestEntityTooLargeException(
1✔
849
                f"Request must be smaller than {config.LAMBDA_LIMITS_CREATE_FUNCTION_REQUEST_SIZE} bytes for the CreateFunction operation"
850
            )
851

852
        if architectures := request.get("Architectures"):
1✔
853
            if len(architectures) != 1:
1✔
854
                raise ValidationException(
1✔
855
                    f"1 validation error detected: Value '[{', '.join(architectures)}]' at 'architectures' failed to "
856
                    f"satisfy constraint: Member must have length less than or equal to 1",
857
                )
858
            if architectures[0] not in ARCHITECTURES:
1✔
859
                raise ValidationException(
1✔
860
                    f"1 validation error detected: Value '[{', '.join(architectures)}]' at 'architectures' failed to "
861
                    f"satisfy constraint: Member must satisfy constraint: [Member must satisfy enum value set: "
862
                    f"[x86_64, arm64], Member must not be null]",
863
                )
864

865
        if env_vars := request.get("Environment", {}).get("Variables"):
1✔
866
            self._verify_env_variables(env_vars)
1✔
867

868
        if layers := request.get("Layers", []):
1✔
869
            self._validate_layers(layers, region=context_region, account_id=context_account_id)
1✔
870

871
        if not api_utils.is_role_arn(request.get("Role")):
1✔
872
            raise ValidationException(
1✔
873
                f"1 validation error detected: Value '{request.get('Role')}'"
874
                + " at 'role' failed to satisfy constraint: Member must satisfy regular expression pattern: arn:(aws[a-zA-Z-]*)?:iam::\\d{12}:role/?[a-zA-Z_0-9+=,.@\\-_/]+"
875
            )
876
        if not self.lambda_service.can_assume_role(request.get("Role"), context.region):
1✔
877
            raise InvalidParameterValueException(
×
878
                "The role defined for the function cannot be assumed by Lambda.", Type="User"
879
            )
880
        package_type = request.get("PackageType", PackageType.Zip)
1✔
881
        runtime = request.get("Runtime")
1✔
882
        self._validate_runtime(package_type, runtime)
1✔
883

884
        request_function_name = request.get("FunctionName")
1✔
885

886
        function_name, *_ = api_utils.get_name_and_qualifier(
1✔
887
            function_arn_or_name=request_function_name,
888
            qualifier=None,
889
            context=context,
890
        )
891

892
        if runtime in DEPRECATED_RUNTIMES:
1✔
893
            LOG.warning(
1✔
894
                "The Lambda runtime %s} is deprecated. "
895
                "Please upgrade the runtime for the function %s: "
896
                "https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtimes.html",
897
                runtime,
898
                function_name,
899
            )
900
        if snap_start := request.get("SnapStart"):
1✔
901
            self._validate_snapstart(snap_start, runtime)
1✔
902
        state = lambda_stores[context_account_id][context_region]
1✔
903

904
        with self.create_fn_lock:
1✔
905
            if function_name in state.functions:
1✔
906
                raise ResourceConflictException(f"Function already exist: {function_name}")
×
907
            fn = Function(function_name=function_name)
1✔
908
            arn = VersionIdentifier(
1✔
909
                function_name=function_name,
910
                qualifier="$LATEST",
911
                region=context_region,
912
                account=context_account_id,
913
            )
914
            # save function code to s3
915
            code = None
1✔
916
            image = None
1✔
917
            image_config = None
1✔
918
            runtime_version_config = RuntimeVersionConfig(
1✔
919
                # Limitation: the runtime id (presumably sha256 of image) is currently hardcoded
920
                # Potential implementation: provide (cached) sha256 hash of used Docker image
921
                RuntimeVersionArn=f"arn:{context.partition}:lambda:{context_region}::runtime:8eeff65f6809a3ce81507fe733fe09b835899b99481ba22fd75b5a7338290ec1"
922
            )
923
            request_code = request.get("Code")
1✔
924
            if package_type == PackageType.Zip:
1✔
925
                # TODO verify if correct combination of code is set
926
                if zip_file := request_code.get("ZipFile"):
1✔
927
                    code = store_lambda_archive(
1✔
928
                        archive_file=zip_file,
929
                        function_name=function_name,
930
                        region_name=context_region,
931
                        account_id=context_account_id,
932
                    )
933
                elif s3_bucket := request_code.get("S3Bucket"):
1✔
934
                    s3_key = request_code["S3Key"]
1✔
935
                    s3_object_version = request_code.get("S3ObjectVersion")
1✔
936
                    code = store_s3_bucket_archive(
1✔
937
                        archive_bucket=s3_bucket,
938
                        archive_key=s3_key,
939
                        archive_version=s3_object_version,
940
                        function_name=function_name,
941
                        region_name=context_region,
942
                        account_id=context_account_id,
943
                    )
944
                else:
NEW
945
                    raise LambdaServiceException("A ZIP file or S3 bucket is required")
×
946
            elif package_type == PackageType.Image:
1✔
947
                image = request_code.get("ImageUri")
1✔
948
                if not image:
1✔
NEW
949
                    raise LambdaServiceException(
×
950
                        "An image is required when the package type is set to 'image'"
951
                    )
952
                image = create_image_code(image_uri=image)
1✔
953

954
                image_config_req = request.get("ImageConfig", {})
1✔
955
                image_config = ImageConfig(
1✔
956
                    command=image_config_req.get("Command"),
957
                    entrypoint=image_config_req.get("EntryPoint"),
958
                    working_directory=image_config_req.get("WorkingDirectory"),
959
                )
960
                # Runtime management controls are not available when providing a custom image
961
                runtime_version_config = None
1✔
962
            if "LoggingConfig" in request:
1✔
963
                logging_config = request["LoggingConfig"]
1✔
964
                LOG.warning(
1✔
965
                    "Advanced Lambda Logging Configuration is currently mocked "
966
                    "and will not impact the logging behavior. "
967
                    "Please create a feature request if needed."
968
                )
969

970
                # when switching to JSON, app and system level log is auto set to INFO
971
                if logging_config.get("LogFormat", None) == LogFormat.JSON:
1✔
972
                    logging_config = {
1✔
973
                        "ApplicationLogLevel": "INFO",
974
                        "SystemLogLevel": "INFO",
975
                        "LogGroup": f"/aws/lambda/{function_name}",
976
                    } | logging_config
977
                else:
978
                    logging_config = (
×
979
                        LoggingConfig(
980
                            LogFormat=LogFormat.Text, LogGroup=f"/aws/lambda/{function_name}"
981
                        )
982
                        | logging_config
983
                    )
984

985
            else:
986
                logging_config = LoggingConfig(
1✔
987
                    LogFormat=LogFormat.Text, LogGroup=f"/aws/lambda/{function_name}"
988
                )
989

990
            version = FunctionVersion(
1✔
991
                id=arn,
992
                config=VersionFunctionConfiguration(
993
                    last_modified=api_utils.format_lambda_date(datetime.datetime.now()),
994
                    description=request.get("Description", ""),
995
                    role=request["Role"],
996
                    timeout=request.get("Timeout", LAMBDA_DEFAULT_TIMEOUT),
997
                    runtime=request.get("Runtime"),
998
                    memory_size=request.get("MemorySize", LAMBDA_DEFAULT_MEMORY_SIZE),
999
                    handler=request.get("Handler"),
1000
                    package_type=package_type,
1001
                    environment=env_vars,
1002
                    architectures=request.get("Architectures") or [Architecture.x86_64],
1003
                    tracing_config_mode=request.get("TracingConfig", {}).get(
1004
                        "Mode", TracingMode.PassThrough
1005
                    ),
1006
                    image=image,
1007
                    image_config=image_config,
1008
                    code=code,
1009
                    layers=self.map_layers(layers),
1010
                    internal_revision=short_uid(),
1011
                    ephemeral_storage=LambdaEphemeralStorage(
1012
                        size=request.get("EphemeralStorage", {}).get("Size", 512)
1013
                    ),
1014
                    snap_start=SnapStartResponse(
1015
                        ApplyOn=request.get("SnapStart", {}).get("ApplyOn", SnapStartApplyOn.None_),
1016
                        OptimizationStatus=SnapStartOptimizationStatus.Off,
1017
                    ),
1018
                    runtime_version_config=runtime_version_config,
1019
                    dead_letter_arn=request.get("DeadLetterConfig", {}).get("TargetArn"),
1020
                    vpc_config=self._build_vpc_config(
1021
                        context_account_id, context_region, request.get("VpcConfig")
1022
                    ),
1023
                    state=VersionState(
1024
                        state=State.Pending,
1025
                        code=StateReasonCode.Creating,
1026
                        reason="The function is being created.",
1027
                    ),
1028
                    logging_config=logging_config,
1029
                ),
1030
            )
1031
            fn.versions["$LATEST"] = version
1✔
1032
            state.functions[function_name] = fn
1✔
1033
        function_counter.labels(
1✔
1034
            operation=FunctionOperation.create,
1035
            runtime=runtime or "n/a",
1036
            status=FunctionStatus.success,
1037
            invocation_type="n/a",
1038
            package_type=package_type,
1039
        )
1040
        self.lambda_service.create_function_version(version)
1✔
1041

1042
        if tags := request.get("Tags"):
1✔
1043
            # This will check whether the function exists.
1044
            self._store_tags(arn.unqualified_arn(), tags)
1✔
1045

1046
        if request.get("Publish"):
1✔
1047
            version = self._publish_version_with_changes(
1✔
1048
                function_name=function_name, region=context_region, account_id=context_account_id
1049
            )
1050

1051
        if config.LAMBDA_SYNCHRONOUS_CREATE:
1✔
1052
            # block via retrying until "terminal" condition reached before returning
1053
            if not poll_condition(
×
1054
                lambda: get_function_version(
1055
                    function_name, version.id.qualifier, version.id.account, version.id.region
1056
                ).config.state.state
1057
                in [State.Active, State.Failed],
1058
                timeout=10,
1059
            ):
1060
                LOG.warning(
×
1061
                    "LAMBDA_SYNCHRONOUS_CREATE is active, but waiting for %s reached timeout.",
1062
                    function_name,
1063
                )
1064

1065
        return api_utils.map_config_out(
1✔
1066
            version, return_qualified_arn=False, return_update_status=False
1067
        )
1068

1069
    def _validate_runtime(self, package_type, runtime):
1✔
1070
        runtimes = ALL_RUNTIMES
1✔
1071
        if config.LAMBDA_RUNTIME_VALIDATION:
1✔
1072
            runtimes = list(itertools.chain(RUNTIMES_AGGREGATED.values()))
1✔
1073

1074
        if package_type == PackageType.Zip and runtime not in runtimes:
1✔
1075
            # deprecated runtimes have different error
1076
            if runtime in DEPRECATED_RUNTIMES:
1✔
1077
                HINT_LOG.info(
1✔
1078
                    "Set env variable LAMBDA_RUNTIME_VALIDATION to 0"
1079
                    " in order to allow usage of deprecated runtimes"
1080
                )
1081
                self._check_for_recomended_migration_target(runtime)
1✔
1082

1083
            raise InvalidParameterValueException(
1✔
1084
                f"Value {runtime} at 'runtime' failed to satisfy constraint: Member must satisfy enum value set: {VALID_RUNTIMES} or be a valid ARN",
1085
                Type="User",
1086
            )
1087

1088
    def _check_for_recomended_migration_target(self, deprecated_runtime):
1✔
1089
        # AWS offers recommended runtime for migration for "newly" deprecated runtimes
1090
        # in order to preserve parity with error messages we need the code bellow
1091
        latest_runtime = DEPRECATED_RUNTIMES_UPGRADES.get(deprecated_runtime)
1✔
1092

1093
        if latest_runtime is not None:
1✔
1094
            LOG.debug(
1✔
1095
                "The Lambda runtime %s is deprecated. Please upgrade to a supported Lambda runtime such as %s.",
1096
                deprecated_runtime,
1097
                latest_runtime,
1098
            )
1099
            raise InvalidParameterValueException(
1✔
1100
                f"The runtime parameter of {deprecated_runtime} is no longer supported for creating or updating AWS Lambda functions. We recommend you use a supported runtime while creating or updating functions.",
1101
                Type="User",
1102
            )
1103

1104
    @handler(operation="UpdateFunctionConfiguration", expand=False)
1✔
1105
    def update_function_configuration(
1✔
1106
        self, context: RequestContext, request: UpdateFunctionConfigurationRequest
1107
    ) -> FunctionConfiguration:
1108
        """updates the $LATEST version of the function"""
1109
        function_name = request.get("FunctionName")
1✔
1110

1111
        # in case we got ARN or partial ARN
1112
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1113
        function_name, qualifier = api_utils.get_name_and_qualifier(function_name, None, context)
1✔
1114
        state = lambda_stores[account_id][region]
1✔
1115

1116
        if function_name not in state.functions:
1✔
1117
            raise ResourceNotFoundException(
×
1118
                f"Function not found: {api_utils.unqualified_lambda_arn(function_name=function_name, region=region, account=account_id)}",
1119
                Type="User",
1120
            )
1121
        function = state.functions[function_name]
1✔
1122

1123
        # TODO: lock modification of latest version
1124
        # TODO: notify service for changes relevant to re-provisioning of $LATEST
1125
        latest_version = function.latest()
1✔
1126
        latest_version_config = latest_version.config
1✔
1127

1128
        revision_id = request.get("RevisionId")
1✔
1129
        if revision_id and revision_id != latest_version.config.revision_id:
1✔
1130
            raise PreconditionFailedException(
1✔
1131
                "The Revision Id provided does not match the latest Revision Id. "
1132
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
1133
                Type="User",
1134
            )
1135

1136
        replace_kwargs = {}
1✔
1137
        if "EphemeralStorage" in request:
1✔
1138
            replace_kwargs["ephemeral_storage"] = LambdaEphemeralStorage(
×
1139
                request.get("EphemeralStorage", {}).get("Size", 512)
1140
            )  # TODO: do defaults here apply as well?
1141

1142
        if "Role" in request:
1✔
1143
            if not api_utils.is_role_arn(request["Role"]):
1✔
1144
                raise ValidationException(
1✔
1145
                    f"1 validation error detected: Value '{request.get('Role')}'"
1146
                    + " at 'role' failed to satisfy constraint: Member must satisfy regular expression pattern: arn:(aws[a-zA-Z-]*)?:iam::\\d{12}:role/?[a-zA-Z_0-9+=,.@\\-_/]+"
1147
                )
1148
            replace_kwargs["role"] = request["Role"]
1✔
1149

1150
        if "Description" in request:
1✔
1151
            replace_kwargs["description"] = request["Description"]
1✔
1152

1153
        if "Timeout" in request:
1✔
1154
            replace_kwargs["timeout"] = request["Timeout"]
1✔
1155

1156
        if "MemorySize" in request:
1✔
1157
            replace_kwargs["memory_size"] = request["MemorySize"]
1✔
1158

1159
        if "DeadLetterConfig" in request:
1✔
1160
            replace_kwargs["dead_letter_arn"] = request.get("DeadLetterConfig", {}).get("TargetArn")
1✔
1161

1162
        if vpc_config := request.get("VpcConfig"):
1✔
1163
            replace_kwargs["vpc_config"] = self._build_vpc_config(account_id, region, vpc_config)
1✔
1164

1165
        if "Handler" in request:
1✔
1166
            replace_kwargs["handler"] = request["Handler"]
1✔
1167

1168
        if "Runtime" in request:
1✔
1169
            runtime = request["Runtime"]
1✔
1170

1171
            if runtime not in ALL_RUNTIMES:
1✔
1172
                raise InvalidParameterValueException(
1✔
1173
                    f"Value {runtime} at 'runtime' failed to satisfy constraint: Member must satisfy enum value set: {VALID_RUNTIMES} or be a valid ARN",
1174
                    Type="User",
1175
                )
1176
            if runtime in DEPRECATED_RUNTIMES:
1✔
1177
                LOG.warning(
×
1178
                    "The Lambda runtime %s is deprecated. "
1179
                    "Please upgrade the runtime for the function %s: "
1180
                    "https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtimes.html",
1181
                    runtime,
1182
                    function_name,
1183
                )
1184
            replace_kwargs["runtime"] = request["Runtime"]
1✔
1185

1186
        if snap_start := request.get("SnapStart"):
1✔
1187
            runtime = replace_kwargs.get("runtime") or latest_version_config.runtime
1✔
1188
            self._validate_snapstart(snap_start, runtime)
1✔
1189
            replace_kwargs["snap_start"] = SnapStartResponse(
1✔
1190
                ApplyOn=snap_start.get("ApplyOn", SnapStartApplyOn.None_),
1191
                OptimizationStatus=SnapStartOptimizationStatus.Off,
1192
            )
1193

1194
        if "Environment" in request:
1✔
1195
            if env_vars := request.get("Environment", {}).get("Variables", {}):
1✔
1196
                self._verify_env_variables(env_vars)
1✔
1197
            replace_kwargs["environment"] = env_vars
1✔
1198

1199
        if "Layers" in request:
1✔
1200
            new_layers = request["Layers"]
1✔
1201
            if new_layers:
1✔
1202
                self._validate_layers(new_layers, region=region, account_id=account_id)
1✔
1203
            replace_kwargs["layers"] = self.map_layers(new_layers)
1✔
1204

1205
        if "ImageConfig" in request:
1✔
1206
            new_image_config = request["ImageConfig"]
1✔
1207
            replace_kwargs["image_config"] = ImageConfig(
1✔
1208
                command=new_image_config.get("Command"),
1209
                entrypoint=new_image_config.get("EntryPoint"),
1210
                working_directory=new_image_config.get("WorkingDirectory"),
1211
            )
1212

1213
        if "LoggingConfig" in request:
1✔
1214
            logging_config = request["LoggingConfig"]
1✔
1215
            LOG.warning(
1✔
1216
                "Advanced Lambda Logging Configuration is currently mocked "
1217
                "and will not impact the logging behavior. "
1218
                "Please create a feature request if needed."
1219
            )
1220

1221
            # when switching to JSON, app and system level log is auto set to INFO
1222
            if logging_config.get("LogFormat", None) == LogFormat.JSON:
1✔
1223
                logging_config = {
1✔
1224
                    "ApplicationLogLevel": "INFO",
1225
                    "SystemLogLevel": "INFO",
1226
                } | logging_config
1227

1228
            last_config = latest_version_config.logging_config
1✔
1229

1230
            # add partial update
1231
            new_logging_config = last_config | logging_config
1✔
1232

1233
            # in case we switched from JSON to Text we need to remove LogLevel keys
1234
            if (
1✔
1235
                new_logging_config.get("LogFormat") == LogFormat.Text
1236
                and last_config.get("LogFormat") == LogFormat.JSON
1237
            ):
1238
                new_logging_config.pop("ApplicationLogLevel", None)
1✔
1239
                new_logging_config.pop("SystemLogLevel", None)
1✔
1240

1241
            replace_kwargs["logging_config"] = new_logging_config
1✔
1242

1243
        if "TracingConfig" in request:
1✔
1244
            new_mode = request.get("TracingConfig", {}).get("Mode")
×
1245
            if new_mode:
×
1246
                replace_kwargs["tracing_config_mode"] = new_mode
×
1247

1248
        new_latest_version = dataclasses.replace(
1✔
1249
            latest_version,
1250
            config=dataclasses.replace(
1251
                latest_version_config,
1252
                last_modified=api_utils.generate_lambda_date(),
1253
                internal_revision=short_uid(),
1254
                last_update=UpdateStatus(
1255
                    status=LastUpdateStatus.InProgress,
1256
                    code="Creating",
1257
                    reason="The function is being created.",
1258
                ),
1259
                **replace_kwargs,
1260
            ),
1261
        )
1262
        function.versions["$LATEST"] = new_latest_version  # TODO: notify
1✔
1263
        self.lambda_service.update_version(new_version=new_latest_version)
1✔
1264

1265
        return api_utils.map_config_out(new_latest_version)
1✔
1266

1267
    @handler(operation="UpdateFunctionCode", expand=False)
1✔
1268
    def update_function_code(
1✔
1269
        self, context: RequestContext, request: UpdateFunctionCodeRequest
1270
    ) -> FunctionConfiguration:
1271
        """updates the $LATEST version of the function"""
1272
        # only supports normal zip packaging atm
1273
        # if request.get("Publish"):
1274
        #     self.lambda_service.create_function_version()
1275

1276
        function_name = request.get("FunctionName")
1✔
1277
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1278
        function_name, qualifier = api_utils.get_name_and_qualifier(function_name, None, context)
1✔
1279

1280
        store = lambda_stores[account_id][region]
1✔
1281
        if function_name not in store.functions:
1✔
1282
            raise ResourceNotFoundException(
×
1283
                f"Function not found: {api_utils.unqualified_lambda_arn(function_name=function_name, region=region, account=account_id)}",
1284
                Type="User",
1285
            )
1286
        function = store.functions[function_name]
1✔
1287

1288
        revision_id = request.get("RevisionId")
1✔
1289
        if revision_id and revision_id != function.latest().config.revision_id:
1✔
1290
            raise PreconditionFailedException(
1✔
1291
                "The Revision Id provided does not match the latest Revision Id. "
1292
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
1293
                Type="User",
1294
            )
1295

1296
        # TODO verify if correct combination of code is set
1297
        image = None
1✔
1298
        if (
1✔
1299
            request.get("ZipFile") or request.get("S3Bucket")
1300
        ) and function.latest().config.package_type == PackageType.Image:
1301
            raise InvalidParameterValueException(
1✔
1302
                "Please provide ImageUri when updating a function with packageType Image.",
1303
                Type="User",
1304
            )
1305
        elif request.get("ImageUri") and function.latest().config.package_type == PackageType.Zip:
1✔
1306
            raise InvalidParameterValueException(
1✔
1307
                "Please don't provide ImageUri when updating a function with packageType Zip.",
1308
                Type="User",
1309
            )
1310

1311
        if zip_file := request.get("ZipFile"):
1✔
1312
            code = store_lambda_archive(
1✔
1313
                archive_file=zip_file,
1314
                function_name=function_name,
1315
                region_name=region,
1316
                account_id=account_id,
1317
            )
1318
        elif s3_bucket := request.get("S3Bucket"):
1✔
1319
            s3_key = request["S3Key"]
1✔
1320
            s3_object_version = request.get("S3ObjectVersion")
1✔
1321
            code = store_s3_bucket_archive(
1✔
1322
                archive_bucket=s3_bucket,
1323
                archive_key=s3_key,
1324
                archive_version=s3_object_version,
1325
                function_name=function_name,
1326
                region_name=region,
1327
                account_id=account_id,
1328
            )
1329
        elif image := request.get("ImageUri"):
1✔
1330
            code = None
1✔
1331
            image = create_image_code(image_uri=image)
1✔
1332
        else:
NEW
1333
            raise LambdaServiceException("A ZIP file, S3 bucket, or image is required")
×
1334

1335
        old_function_version = function.versions.get("$LATEST")
1✔
1336
        replace_kwargs = {"code": code} if code else {"image": image}
1✔
1337

1338
        if architectures := request.get("Architectures"):
1✔
1339
            if len(architectures) != 1:
×
1340
                raise ValidationException(
×
1341
                    f"1 validation error detected: Value '[{', '.join(architectures)}]' at 'architectures' failed to "
1342
                    f"satisfy constraint: Member must have length less than or equal to 1",
1343
                )
1344
            # An empty list of architectures is also forbidden. Further exceptions are tested here for create_function:
1345
            # tests.aws.services.lambda_.test_lambda_api.TestLambdaFunction.test_create_lambda_exceptions
1346
            if architectures[0] not in ARCHITECTURES:
×
1347
                raise ValidationException(
×
1348
                    f"1 validation error detected: Value '[{', '.join(architectures)}]' at 'architectures' failed to "
1349
                    f"satisfy constraint: Member must satisfy constraint: [Member must satisfy enum value set: "
1350
                    f"[x86_64, arm64], Member must not be null]",
1351
                )
1352
            replace_kwargs["architectures"] = architectures
×
1353

1354
        config = dataclasses.replace(
1✔
1355
            old_function_version.config,
1356
            internal_revision=short_uid(),
1357
            last_modified=api_utils.generate_lambda_date(),
1358
            last_update=UpdateStatus(
1359
                status=LastUpdateStatus.InProgress,
1360
                code="Creating",
1361
                reason="The function is being created.",
1362
            ),
1363
            **replace_kwargs,
1364
        )
1365
        function_version = dataclasses.replace(old_function_version, config=config)
1✔
1366
        function.versions["$LATEST"] = function_version
1✔
1367

1368
        self.lambda_service.update_version(new_version=function_version)
1✔
1369
        if request.get("Publish"):
1✔
1370
            function_version = self._publish_version_with_changes(
1✔
1371
                function_name=function_name, region=region, account_id=account_id
1372
            )
1373
        return api_utils.map_config_out(
1✔
1374
            function_version, return_qualified_arn=bool(request.get("Publish"))
1375
        )
1376

1377
    # TODO: does deleting the latest published version affect the next versions number?
1378
    # TODO: what happens when we call this with a qualifier and a fully qualified ARN? (+ conflicts?)
1379
    # TODO: test different ARN patterns (shorthand ARN?)
1380
    # TODO: test deleting across regions?
1381
    # TODO: test mismatch between context region and region in ARN
1382
    # TODO: test qualifier $LATEST, alias-name and version
1383
    def delete_function(
1✔
1384
        self,
1385
        context: RequestContext,
1386
        function_name: FunctionName,
1387
        qualifier: Qualifier = None,
1388
        **kwargs,
1389
    ) -> None:
1390
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1391
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
1392
            function_name, qualifier, context
1393
        )
1394

1395
        if qualifier and api_utils.qualifier_is_alias(qualifier):
1✔
1396
            raise InvalidParameterValueException(
×
1397
                "Deletion of aliases is not currently supported.",
1398
                Type="User",
1399
            )
1400

1401
        store = lambda_stores[account_id][region]
1✔
1402
        if qualifier == "$LATEST":
1✔
1403
            raise InvalidParameterValueException(
1✔
1404
                "$LATEST version cannot be deleted without deleting the function.", Type="User"
1405
            )
1406

1407
        if function_name not in store.functions:
1✔
1408
            e = ResourceNotFoundException(
1✔
1409
                f"Function not found: {api_utils.unqualified_lambda_arn(function_name=function_name, region=region, account=account_id)}",
1410
                Type="User",
1411
            )
1412
            raise e
1✔
1413
        function = store.functions.get(function_name)
1✔
1414

1415
        if qualifier:
1✔
1416
            # delete a version of the function
1417
            version = function.versions.pop(qualifier, None)
1✔
1418
            if version:
1✔
1419
                self.lambda_service.stop_version(version.id.qualified_arn())
1✔
1420
                destroy_code_if_not_used(code=version.config.code, function=function)
1✔
1421
        else:
1422
            # delete the whole function
1423
            # TODO: introduce locking for safe deletion: We could create a new version at the API layer before
1424
            #  the old version gets cleaned up in the internal lambda service.
1425
            function = store.functions.pop(function_name)
1✔
1426
            for version in function.versions.values():
1✔
1427
                self.lambda_service.stop_version(qualified_arn=version.id.qualified_arn())
1✔
1428
                # we can safely destroy the code here
1429
                if version.config.code:
1✔
1430
                    version.config.code.destroy()
1✔
1431

1432
    def list_functions(
1✔
1433
        self,
1434
        context: RequestContext,
1435
        master_region: MasterRegion = None,  # (only relevant for lambda@edge)
1436
        function_version: FunctionVersionApi = None,
1437
        marker: String = None,
1438
        max_items: MaxListItems = None,
1439
        **kwargs,
1440
    ) -> ListFunctionsResponse:
1441
        state = lambda_stores[context.account_id][context.region]
1✔
1442

1443
        if function_version and function_version != FunctionVersionApi.ALL:
1✔
1444
            raise ValidationException(
1✔
1445
                f"1 validation error detected: Value '{function_version}'"
1446
                + " at 'functionVersion' failed to satisfy constraint: Member must satisfy enum value set: [ALL]"
1447
            )
1448

1449
        if function_version == FunctionVersionApi.ALL:
1✔
1450
            # include all versions for all function
1451
            versions = [v for f in state.functions.values() for v in f.versions.values()]
1✔
1452
            return_qualified_arn = True
1✔
1453
        else:
1454
            versions = [f.latest() for f in state.functions.values()]
1✔
1455
            return_qualified_arn = False
1✔
1456

1457
        versions = [
1✔
1458
            api_utils.map_to_list_response(
1459
                api_utils.map_config_out(fc, return_qualified_arn=return_qualified_arn)
1460
            )
1461
            for fc in versions
1462
        ]
1463
        versions = PaginatedList(versions)
1✔
1464
        page, token = versions.get_page(
1✔
1465
            lambda version: version["FunctionArn"],
1466
            marker,
1467
            max_items,
1468
        )
1469
        return ListFunctionsResponse(Functions=page, NextMarker=token)
1✔
1470

1471
    def get_function(
1✔
1472
        self,
1473
        context: RequestContext,
1474
        function_name: NamespacedFunctionName,
1475
        qualifier: Qualifier = None,
1476
        **kwargs,
1477
    ) -> GetFunctionResponse:
1478
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1479
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
1480
            function_name, qualifier, context
1481
        )
1482

1483
        fn = lambda_stores[account_id][region].functions.get(function_name)
1✔
1484
        if fn is None:
1✔
1485
            if qualifier is None:
1✔
1486
                raise ResourceNotFoundException(
1✔
1487
                    f"Function not found: {api_utils.unqualified_lambda_arn(function_name, account_id, region)}",
1488
                    Type="User",
1489
                )
1490
            else:
1491
                raise ResourceNotFoundException(
1✔
1492
                    f"Function not found: {api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)}",
1493
                    Type="User",
1494
                )
1495
        alias_name = None
1✔
1496
        if qualifier and api_utils.qualifier_is_alias(qualifier):
1✔
1497
            if qualifier not in fn.aliases:
1✔
1498
                alias_arn = api_utils.qualified_lambda_arn(
1✔
1499
                    function_name, qualifier, account_id, region
1500
                )
1501
                raise ResourceNotFoundException(f"Function not found: {alias_arn}", Type="User")
1✔
1502
            alias_name = qualifier
1✔
1503
            qualifier = fn.aliases[alias_name].function_version
1✔
1504

1505
        version = get_function_version(
1✔
1506
            function_name=function_name,
1507
            qualifier=qualifier,
1508
            account_id=account_id,
1509
            region=region,
1510
        )
1511
        tags = self._get_tags(api_utils.unqualified_lambda_arn(function_name, account_id, region))
1✔
1512
        additional_fields = {}
1✔
1513
        if tags:
1✔
1514
            additional_fields["Tags"] = tags
1✔
1515
        code_location = None
1✔
1516
        if code := version.config.code:
1✔
1517
            code_location = FunctionCodeLocation(
1✔
1518
                Location=code.generate_presigned_url(endpoint_url=config.external_service_url()),
1519
                RepositoryType="S3",
1520
            )
1521
        elif image := version.config.image:
1✔
1522
            code_location = FunctionCodeLocation(
1✔
1523
                ImageUri=image.image_uri,
1524
                RepositoryType=image.repository_type,
1525
                ResolvedImageUri=image.resolved_image_uri,
1526
            )
1527
        concurrency = None
1✔
1528
        if fn.reserved_concurrent_executions:
1✔
1529
            concurrency = Concurrency(
1✔
1530
                ReservedConcurrentExecutions=fn.reserved_concurrent_executions
1531
            )
1532

1533
        return GetFunctionResponse(
1✔
1534
            Configuration=api_utils.map_config_out(
1535
                version, return_qualified_arn=bool(qualifier), alias_name=alias_name
1536
            ),
1537
            Code=code_location,  # TODO
1538
            Concurrency=concurrency,
1539
            **additional_fields,
1540
        )
1541

1542
    def get_function_configuration(
1✔
1543
        self,
1544
        context: RequestContext,
1545
        function_name: NamespacedFunctionName,
1546
        qualifier: Qualifier = None,
1547
        **kwargs,
1548
    ) -> FunctionConfiguration:
1549
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1550
        # CAVE: THIS RETURN VALUE IS *NOT* THE SAME AS IN get_function (!) but seems to be only configuration part?
1551
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
1552
            function_name, qualifier, context
1553
        )
1554
        version = get_function_version(
1✔
1555
            function_name=function_name,
1556
            qualifier=qualifier,
1557
            account_id=account_id,
1558
            region=region,
1559
        )
1560
        return api_utils.map_config_out(version, return_qualified_arn=bool(qualifier))
1✔
1561

1562
    def invoke(
1✔
1563
        self,
1564
        context: RequestContext,
1565
        function_name: NamespacedFunctionName,
1566
        invocation_type: InvocationType | None = None,
1567
        log_type: LogType | None = None,
1568
        client_context: String | None = None,
1569
        payload: IO[Blob] | None = None,
1570
        qualifier: Qualifier | None = None,
1571
        tenant_id: TenantId | None = None,
1572
        **kwargs,
1573
    ) -> InvocationResponse:
1574
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1575
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
1576
            function_name, qualifier, context
1577
        )
1578

1579
        user_agent = context.request.user_agent.string
1✔
1580

1581
        time_before = time.perf_counter()
1✔
1582
        try:
1✔
1583
            invocation_result = self.lambda_service.invoke(
1✔
1584
                function_name=function_name,
1585
                qualifier=qualifier,
1586
                region=region,
1587
                account_id=account_id,
1588
                invocation_type=invocation_type,
1589
                client_context=client_context,
1590
                request_id=context.request_id,
1591
                trace_context=context.trace_context,
1592
                payload=payload.read() if payload else None,
1593
                user_agent=user_agent,
1594
            )
1595
        except ServiceException:
1✔
1596
            raise
1✔
1597
        except EnvironmentStartupTimeoutException as e:
1✔
1598
            raise LambdaServiceException(
1✔
1599
                f"[{context.request_id}] Timeout while starting up lambda environment for function {function_name}:{qualifier}"
1600
            ) from e
1601
        except Exception as e:
1✔
1602
            LOG.error(
1✔
1603
                "[%s] Error while invoking lambda %s",
1604
                context.request_id,
1605
                function_name,
1606
                exc_info=LOG.isEnabledFor(logging.DEBUG),
1607
            )
1608
            raise LambdaServiceException(
1✔
1609
                f"[{context.request_id}] Internal error while executing lambda {function_name}:{qualifier}. Caused by {type(e).__name__}: {e}"
1610
            ) from e
1611

1612
        if invocation_type == InvocationType.Event:
1✔
1613
            # This happens when invocation type is event
1614
            return InvocationResponse(StatusCode=202)
1✔
1615
        if invocation_type == InvocationType.DryRun:
1✔
1616
            # This happens when invocation type is dryrun
1617
            return InvocationResponse(StatusCode=204)
1✔
1618
        LOG.debug("Lambda invocation duration: %0.2fms", (time.perf_counter() - time_before) * 1000)
1✔
1619

1620
        response = InvocationResponse(
1✔
1621
            StatusCode=200,
1622
            Payload=invocation_result.payload,
1623
            ExecutedVersion=invocation_result.executed_version,
1624
        )
1625

1626
        if invocation_result.is_error:
1✔
1627
            response["FunctionError"] = "Unhandled"
1✔
1628

1629
        if log_type == LogType.Tail:
1✔
1630
            response["LogResult"] = to_str(
1✔
1631
                base64.b64encode(to_bytes(invocation_result.logs)[-4096:])
1632
            )
1633

1634
        return response
1✔
1635

1636
    # Version operations
1637
    def publish_version(
1✔
1638
        self,
1639
        context: RequestContext,
1640
        function_name: FunctionName,
1641
        code_sha256: String = None,
1642
        description: Description = None,
1643
        revision_id: String = None,
1644
        **kwargs,
1645
    ) -> FunctionConfiguration:
1646
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1647
        function_name = api_utils.get_function_name(function_name, context)
1✔
1648
        new_version = self._publish_version_from_existing_version(
1✔
1649
            function_name=function_name,
1650
            description=description,
1651
            account_id=account_id,
1652
            region=region,
1653
            revision_id=revision_id,
1654
            code_sha256=code_sha256,
1655
        )
1656
        return api_utils.map_config_out(new_version, return_qualified_arn=True)
1✔
1657

1658
    def list_versions_by_function(
1✔
1659
        self,
1660
        context: RequestContext,
1661
        function_name: NamespacedFunctionName,
1662
        marker: String = None,
1663
        max_items: MaxListItems = None,
1664
        **kwargs,
1665
    ) -> ListVersionsByFunctionResponse:
1666
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1667
        function_name = api_utils.get_function_name(function_name, context)
1✔
1668
        function = self._get_function(
1✔
1669
            function_name=function_name, region=region, account_id=account_id
1670
        )
1671
        versions = [
1✔
1672
            api_utils.map_to_list_response(
1673
                api_utils.map_config_out(version=version, return_qualified_arn=True)
1674
            )
1675
            for version in function.versions.values()
1676
        ]
1677
        items = PaginatedList(versions)
1✔
1678
        page, token = items.get_page(
1✔
1679
            lambda item: item,
1680
            marker,
1681
            max_items,
1682
        )
1683
        return ListVersionsByFunctionResponse(Versions=page, NextMarker=token)
1✔
1684

1685
    # Alias
1686

1687
    def _create_routing_config_model(
1✔
1688
        self, routing_config_dict: dict[str, float], function_version: FunctionVersion
1689
    ):
1690
        if len(routing_config_dict) > 1:
1✔
1691
            raise InvalidParameterValueException(
1✔
1692
                "Number of items in AdditionalVersionWeights cannot be greater than 1",
1693
                Type="User",
1694
            )
1695
        # should be exactly one item here, still iterating, might be supported in the future
1696
        for key, value in routing_config_dict.items():
1✔
1697
            if value < 0.0 or value >= 1.0:
1✔
1698
                raise ValidationException(
1✔
1699
                    f"1 validation error detected: Value '{{{key}={value}}}' at 'routingConfig.additionalVersionWeights' failed to satisfy constraint: Map value must satisfy constraint: [Member must have value less than or equal to 1.0, Member must have value greater than or equal to 0.0, Member must not be null]"
1700
                )
1701
            if key == function_version.id.qualifier:
1✔
1702
                raise InvalidParameterValueException(
1✔
1703
                    f"Invalid function version {function_version.id.qualifier}. Function version {function_version.id.qualifier} is already included in routing configuration.",
1704
                    Type="User",
1705
                )
1706
            # check if version target is latest, then no routing config is allowed
1707
            if function_version.id.qualifier == "$LATEST":
1✔
1708
                raise InvalidParameterValueException(
1✔
1709
                    "$LATEST is not supported for an alias pointing to more than 1 version"
1710
                )
1711
            if not api_utils.qualifier_is_version(key):
1✔
1712
                raise ValidationException(
1✔
1713
                    f"1 validation error detected: Value '{{{key}={value}}}' at 'routingConfig.additionalVersionWeights' failed to satisfy constraint: Map keys must satisfy constraint: [Member must have length less than or equal to 1024, Member must have length greater than or equal to 1, Member must satisfy regular expression pattern: [0-9]+, Member must not be null]"
1714
                )
1715

1716
            # checking if the version in the config exists
1717
            get_function_version(
1✔
1718
                function_name=function_version.id.function_name,
1719
                qualifier=key,
1720
                region=function_version.id.region,
1721
                account_id=function_version.id.account,
1722
            )
1723
        return AliasRoutingConfig(version_weights=routing_config_dict)
1✔
1724

1725
    def create_alias(
1✔
1726
        self,
1727
        context: RequestContext,
1728
        function_name: FunctionName,
1729
        name: Alias,
1730
        function_version: Version,
1731
        description: Description = None,
1732
        routing_config: AliasRoutingConfiguration = None,
1733
        **kwargs,
1734
    ) -> AliasConfiguration:
1735
        if not api_utils.qualifier_is_alias(name):
1✔
1736
            raise ValidationException(
1✔
1737
                f"1 validation error detected: Value '{name}' at 'name' failed to satisfy constraint: Member must satisfy regular expression pattern: (?!^[0-9]+$)([a-zA-Z0-9-_]+)"
1738
            )
1739

1740
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1741
        function_name = api_utils.get_function_name(function_name, context)
1✔
1742
        target_version = get_function_version(
1✔
1743
            function_name=function_name,
1744
            qualifier=function_version,
1745
            region=region,
1746
            account_id=account_id,
1747
        )
1748
        function = self._get_function(
1✔
1749
            function_name=function_name, region=region, account_id=account_id
1750
        )
1751
        # description is always present, if not specified it's an empty string
1752
        description = description or ""
1✔
1753
        with function.lock:
1✔
1754
            if existing_alias := function.aliases.get(name):
1✔
1755
                raise ResourceConflictException(
1✔
1756
                    f"Alias already exists: {api_utils.map_alias_out(alias=existing_alias, function=function)['AliasArn']}",
1757
                    Type="User",
1758
                )
1759
            # checking if the version exists
1760
            routing_configuration = None
1✔
1761
            if routing_config and (
1✔
1762
                routing_config_dict := routing_config.get("AdditionalVersionWeights")
1763
            ):
1764
                routing_configuration = self._create_routing_config_model(
1✔
1765
                    routing_config_dict, target_version
1766
                )
1767

1768
            alias = VersionAlias(
1✔
1769
                name=name,
1770
                function_version=function_version,
1771
                description=description,
1772
                routing_configuration=routing_configuration,
1773
            )
1774
            function.aliases[name] = alias
1✔
1775
        return api_utils.map_alias_out(alias=alias, function=function)
1✔
1776

1777
    def list_aliases(
1✔
1778
        self,
1779
        context: RequestContext,
1780
        function_name: FunctionName,
1781
        function_version: Version = None,
1782
        marker: String = None,
1783
        max_items: MaxListItems = None,
1784
        **kwargs,
1785
    ) -> ListAliasesResponse:
1786
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1787
        function_name = api_utils.get_function_name(function_name, context)
1✔
1788
        function = self._get_function(
1✔
1789
            function_name=function_name, region=region, account_id=account_id
1790
        )
1791
        aliases = [
1✔
1792
            api_utils.map_alias_out(alias, function)
1793
            for alias in function.aliases.values()
1794
            if function_version is None or alias.function_version == function_version
1795
        ]
1796

1797
        aliases = PaginatedList(aliases)
1✔
1798
        page, token = aliases.get_page(
1✔
1799
            lambda alias: alias["AliasArn"],
1800
            marker,
1801
            max_items,
1802
        )
1803

1804
        return ListAliasesResponse(Aliases=page, NextMarker=token)
1✔
1805

1806
    def delete_alias(
1✔
1807
        self, context: RequestContext, function_name: FunctionName, name: Alias, **kwargs
1808
    ) -> None:
1809
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1810
        function_name = api_utils.get_function_name(function_name, context)
1✔
1811
        function = self._get_function(
1✔
1812
            function_name=function_name, region=region, account_id=account_id
1813
        )
1814
        version_alias = function.aliases.pop(name, None)
1✔
1815

1816
        # cleanup related resources
1817
        if name in function.provisioned_concurrency_configs:
1✔
1818
            function.provisioned_concurrency_configs.pop(name)
1✔
1819

1820
        # TODO: Allow for deactivating/unregistering specific Lambda URLs
1821
        if version_alias and name in function.function_url_configs:
1✔
1822
            url_config = function.function_url_configs.pop(name)
1✔
1823
            LOG.debug(
1✔
1824
                "Stopping aliased Lambda Function URL %s for %s",
1825
                url_config.url,
1826
                url_config.function_name,
1827
            )
1828

1829
    def get_alias(
1✔
1830
        self, context: RequestContext, function_name: FunctionName, name: Alias, **kwargs
1831
    ) -> AliasConfiguration:
1832
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1833
        function_name = api_utils.get_function_name(function_name, context)
1✔
1834
        function = self._get_function(
1✔
1835
            function_name=function_name, region=region, account_id=account_id
1836
        )
1837
        if not (alias := function.aliases.get(name)):
1✔
1838
            raise ResourceNotFoundException(
1✔
1839
                f"Cannot find alias arn: {api_utils.qualified_lambda_arn(function_name=function_name, qualifier=name, region=region, account=account_id)}",
1840
                Type="User",
1841
            )
1842
        return api_utils.map_alias_out(alias=alias, function=function)
1✔
1843

1844
    def update_alias(
1✔
1845
        self,
1846
        context: RequestContext,
1847
        function_name: FunctionName,
1848
        name: Alias,
1849
        function_version: Version = None,
1850
        description: Description = None,
1851
        routing_config: AliasRoutingConfiguration = None,
1852
        revision_id: String = None,
1853
        **kwargs,
1854
    ) -> AliasConfiguration:
1855
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1856
        function_name = api_utils.get_function_name(function_name, context)
1✔
1857
        function = self._get_function(
1✔
1858
            function_name=function_name, region=region, account_id=account_id
1859
        )
1860
        if not (alias := function.aliases.get(name)):
1✔
1861
            fn_arn = api_utils.qualified_lambda_arn(function_name, name, account_id, region)
1✔
1862
            raise ResourceNotFoundException(
1✔
1863
                f"Alias not found: {fn_arn}",
1864
                Type="User",
1865
            )
1866
        if revision_id and alias.revision_id != revision_id:
1✔
1867
            raise PreconditionFailedException(
1✔
1868
                "The Revision Id provided does not match the latest Revision Id. "
1869
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
1870
                Type="User",
1871
            )
1872
        changes = {}
1✔
1873
        if function_version is not None:
1✔
1874
            changes |= {"function_version": function_version}
1✔
1875
        if description is not None:
1✔
1876
            changes |= {"description": description}
1✔
1877
        if routing_config is not None:
1✔
1878
            # if it is an empty dict or AdditionalVersionWeights is empty, set routing config to None
1879
            new_routing_config = None
1✔
1880
            if routing_config_dict := routing_config.get("AdditionalVersionWeights"):
1✔
1881
                new_routing_config = self._create_routing_config_model(routing_config_dict)
×
1882
            changes |= {"routing_configuration": new_routing_config}
1✔
1883
        # even if no changes are done, we have to update revision id for some reason
1884
        old_alias = alias
1✔
1885
        alias = dataclasses.replace(alias, **changes)
1✔
1886
        function.aliases[name] = alias
1✔
1887

1888
        # TODO: signal lambda service that pointer potentially changed
1889
        self.lambda_service.update_alias(old_alias=old_alias, new_alias=alias, function=function)
1✔
1890

1891
        return api_utils.map_alias_out(alias=alias, function=function)
1✔
1892

1893
    # =======================================
1894
    # ======= EVENT SOURCE MAPPINGS =========
1895
    # =======================================
1896
    def check_service_resource_exists(
1✔
1897
        self, service: str, resource_arn: str, function_arn: str, function_role_arn: str
1898
    ):
1899
        """
1900
        Check if the service resource exists and if the function has access to it.
1901

1902
        Raises:
1903
            InvalidParameterValueException: If the service resource does not exist or the function does not have access to it.
1904
        """
1905
        arn = parse_arn(resource_arn)
1✔
1906
        source_client = get_internal_client(
1✔
1907
            arn=resource_arn,
1908
            role_arn=function_role_arn,
1909
            service_principal=ServicePrincipal.lambda_,
1910
            source_arn=function_arn,
1911
        )
1912
        if service in ["sqs", "sqs-fifo"]:
1✔
1913
            try:
1✔
1914
                # AWS uses `GetQueueAttributes` internally to verify the queue existence, but we need the `QueueUrl`
1915
                # which is not given directly. We build out a dummy `QueueUrl` which can be parsed by SQS to return
1916
                # the right value
1917
                queue_name = arn["resource"].split("/")[-1]
1✔
1918
                queue_url = f"http://sqs.{arn['region']}.domain/{arn['account']}/{queue_name}"
1✔
1919
                source_client.get_queue_attributes(QueueUrl=queue_url)
1✔
1920
            except ClientError as e:
1✔
1921
                error_code = e.response["Error"]["Code"]
1✔
1922
                if error_code == "AWS.SimpleQueueService.NonExistentQueue":
1✔
1923
                    raise InvalidParameterValueException(
1✔
1924
                        f"Error occurred while ReceiveMessage. SQS Error Code: {error_code}. SQS Error Message: {e.response['Error']['Message']}",
1925
                        Type="User",
1926
                    )
1927
                raise e
×
1928
        elif service in ["kinesis"]:
1✔
1929
            try:
1✔
1930
                source_client.describe_stream(StreamARN=resource_arn)
1✔
1931
            except ClientError as e:
1✔
1932
                if e.response["Error"]["Code"] == "ResourceNotFoundException":
1✔
1933
                    raise InvalidParameterValueException(
1✔
1934
                        f"Stream not found: {resource_arn}",
1935
                        Type="User",
1936
                    )
1937
                raise e
×
1938
        elif service in ["dynamodb"]:
1✔
1939
            try:
1✔
1940
                source_client.describe_stream(StreamArn=resource_arn)
1✔
1941
            except ClientError as e:
1✔
1942
                if e.response["Error"]["Code"] == "ResourceNotFoundException":
1✔
1943
                    raise InvalidParameterValueException(
1✔
1944
                        f"Stream not found: {resource_arn}",
1945
                        Type="User",
1946
                    )
1947
                raise e
×
1948

1949
    @handler("CreateEventSourceMapping", expand=False)
1✔
1950
    def create_event_source_mapping(
1✔
1951
        self,
1952
        context: RequestContext,
1953
        request: CreateEventSourceMappingRequest,
1954
    ) -> EventSourceMappingConfiguration:
1955
        return self.create_event_source_mapping_v2(context, request)
1✔
1956

1957
    def create_event_source_mapping_v2(
1✔
1958
        self,
1959
        context: RequestContext,
1960
        request: CreateEventSourceMappingRequest,
1961
    ) -> EventSourceMappingConfiguration:
1962
        # Validations
1963
        function_arn, function_name, state, function_version, function_role = (
1✔
1964
            self.validate_event_source_mapping(context, request)
1965
        )
1966

1967
        esm_config = EsmConfigFactory(request, context, function_arn).get_esm_config()
1✔
1968

1969
        # Copy esm_config to avoid a race condition with potential async update in the store
1970
        state.event_source_mappings[esm_config["UUID"]] = esm_config.copy()
1✔
1971
        enabled = request.get("Enabled", True)
1✔
1972
        # TODO: check for potential async race condition update -> think about locking
1973
        esm_worker = EsmWorkerFactory(esm_config, function_role, enabled).get_esm_worker()
1✔
1974
        self.esm_workers[esm_worker.uuid] = esm_worker
1✔
1975
        # TODO: check StateTransitionReason, LastModified, LastProcessingResult (concurrent updates requires locking!)
1976
        if tags := request.get("Tags"):
1✔
1977
            self._store_tags(esm_config.get("EventSourceMappingArn"), tags)
1✔
1978
        esm_worker.create()
1✔
1979
        return esm_config
1✔
1980

1981
    def validate_event_source_mapping(self, context, request):
1✔
1982
        # TODO: test whether stream ARNs are valid sources for Pipes or ESM or whether only DynamoDB table ARNs work
1983
        # TODO: Validate MaxRecordAgeInSeconds (i.e cannot subceed 60s but can be -1) and MaxRetryAttempts parameters.
1984
        # See https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-maximumrecordageinseconds
1985
        is_create_esm_request = context.operation.name == self.create_event_source_mapping.operation
1✔
1986

1987
        if destination_config := request.get("DestinationConfig"):
1✔
1988
            if "OnSuccess" in destination_config:
1✔
1989
                raise InvalidParameterValueException(
1✔
1990
                    "Unsupported DestinationConfig parameter for given event source mapping type.",
1991
                    Type="User",
1992
                )
1993

1994
        service = None
1✔
1995
        if "SelfManagedEventSource" in request:
1✔
1996
            service = "kafka"
×
1997
            if "SourceAccessConfigurations" not in request:
×
1998
                raise InvalidParameterValueException(
×
1999
                    "Required 'sourceAccessConfigurations' parameter is missing.", Type="User"
2000
                )
2001
        if service is None and "EventSourceArn" not in request:
1✔
2002
            raise InvalidParameterValueException("Unrecognized event source.", Type="User")
1✔
2003
        if service is None:
1✔
2004
            service = extract_service_from_arn(request["EventSourceArn"])
1✔
2005

2006
        batch_size = api_utils.validate_and_set_batch_size(service, request.get("BatchSize"))
1✔
2007
        if service in ["dynamodb", "kinesis"]:
1✔
2008
            starting_position = request.get("StartingPosition")
1✔
2009
            if not starting_position:
1✔
2010
                raise InvalidParameterValueException(
1✔
2011
                    "1 validation error detected: Value null at 'startingPosition' failed to satisfy constraint: Member must not be null.",
2012
                    Type="User",
2013
                )
2014

2015
            if starting_position not in KinesisStreamStartPosition.__members__:
1✔
2016
                raise ValidationException(
1✔
2017
                    f"1 validation error detected: Value '{starting_position}' at 'startingPosition' failed to satisfy constraint: Member must satisfy enum value set: [LATEST, AT_TIMESTAMP, TRIM_HORIZON]"
2018
                )
2019
            # AT_TIMESTAMP is not allowed for DynamoDB Streams
2020
            elif (
1✔
2021
                service == "dynamodb"
2022
                and starting_position not in DynamoDBStreamStartPosition.__members__
2023
            ):
2024
                raise InvalidParameterValueException(
1✔
2025
                    f"Unsupported starting position for arn type: {request['EventSourceArn']}",
2026
                    Type="User",
2027
                )
2028

2029
        if service in ["sqs", "sqs-fifo"]:
1✔
2030
            if batch_size > 10 and request.get("MaximumBatchingWindowInSeconds", 0) == 0:
1✔
2031
                raise InvalidParameterValueException(
1✔
2032
                    "Maximum batch window in seconds must be greater than 0 if maximum batch size is greater than 10",
2033
                    Type="User",
2034
                )
2035

2036
        if (filter_criteria := request.get("FilterCriteria")) is not None:
1✔
2037
            for filter_ in filter_criteria.get("Filters", []):
1✔
2038
                pattern_str = filter_.get("Pattern")
1✔
2039
                if not pattern_str or not isinstance(pattern_str, str):
1✔
2040
                    raise InvalidParameterValueException(
×
2041
                        "Invalid filter pattern definition.", Type="User"
2042
                    )
2043

2044
                if not validate_event_pattern(pattern_str):
1✔
2045
                    raise InvalidParameterValueException(
1✔
2046
                        "Invalid filter pattern definition.", Type="User"
2047
                    )
2048

2049
        # Can either have a FunctionName (i.e CreateEventSourceMapping request) or
2050
        # an internal EventSourceMappingConfiguration representation
2051
        request_function_name = request.get("FunctionName") or request.get("FunctionArn")
1✔
2052
        # can be either a partial arn or a full arn for the version/alias
2053
        function_name, qualifier, account, region = function_locators_from_arn(
1✔
2054
            request_function_name
2055
        )
2056
        # TODO: validate `context.region` vs. `region(request["FunctionName"])` vs. `region(request["EventSourceArn"])`
2057
        account = account or context.account_id
1✔
2058
        region = region or context.region
1✔
2059
        state = lambda_stores[account][region]
1✔
2060
        fn = state.functions.get(function_name)
1✔
2061
        if not fn:
1✔
2062
            raise InvalidParameterValueException("Function does not exist", Type="User")
1✔
2063

2064
        if qualifier:
1✔
2065
            # make sure the function version/alias exists
2066
            if api_utils.qualifier_is_alias(qualifier):
1✔
2067
                fn_alias = fn.aliases.get(qualifier)
1✔
2068
                if not fn_alias:
1✔
2069
                    raise Exception("unknown alias")  # TODO: cover via test
×
2070
            elif api_utils.qualifier_is_version(qualifier):
1✔
2071
                fn_version = fn.versions.get(qualifier)
1✔
2072
                if not fn_version:
1✔
2073
                    raise Exception("unknown version")  # TODO: cover via test
×
2074
            elif qualifier == "$LATEST":
1✔
2075
                pass
1✔
2076
            else:
2077
                raise Exception("invalid functionname")  # TODO: cover via test
×
2078
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account, region)
1✔
2079

2080
        else:
2081
            fn_arn = api_utils.unqualified_lambda_arn(function_name, account, region)
1✔
2082

2083
        function_version = get_function_version_from_arn(fn_arn)
1✔
2084
        function_role = function_version.config.role
1✔
2085

2086
        if source_arn := request.get("EventSourceArn"):
1✔
2087
            self.check_service_resource_exists(service, source_arn, fn_arn, function_role)
1✔
2088
        # Check we are validating a CreateEventSourceMapping request
2089
        if is_create_esm_request:
1✔
2090

2091
            def _get_mapping_sources(mapping: dict[str, Any]) -> list[str]:
1✔
2092
                if event_source_arn := mapping.get("EventSourceArn"):
1✔
2093
                    return [event_source_arn]
1✔
2094
                return (
×
2095
                    mapping.get("SelfManagedEventSource", {})
2096
                    .get("Endpoints", {})
2097
                    .get("KAFKA_BOOTSTRAP_SERVERS", [])
2098
                )
2099

2100
            # check for event source duplicates
2101
            # TODO: currently validated for sqs, kinesis, and dynamodb
2102
            service_id = load_service(service).service_id
1✔
2103
            for uuid, mapping in state.event_source_mappings.items():
1✔
2104
                mapping_sources = _get_mapping_sources(mapping)
1✔
2105
                request_sources = _get_mapping_sources(request)
1✔
2106
                if mapping["FunctionArn"] == fn_arn and (
1✔
2107
                    set(mapping_sources).intersection(request_sources)
2108
                ):
2109
                    if service == "sqs":
1✔
2110
                        # *shakes fist at SQS*
2111
                        raise ResourceConflictException(
1✔
2112
                            f'An event source mapping with {service_id} arn (" {mapping["EventSourceArn"]} ") '
2113
                            f'and function (" {function_name} ") already exists. Please update or delete the '
2114
                            f"existing mapping with UUID {uuid}",
2115
                            Type="User",
2116
                        )
2117
                    elif service == "kafka":
1✔
2118
                        if set(mapping["Topics"]).intersection(request["Topics"]):
×
2119
                            raise ResourceConflictException(
×
2120
                                f'An event source mapping with event source ("{",".join(request_sources)}"), '
2121
                                f'function ("{fn_arn}"), '
2122
                                f'topics ("{",".join(request["Topics"])}") already exists. Please update or delete the '
2123
                                f"existing mapping with UUID {uuid}",
2124
                                Type="User",
2125
                            )
2126
                    else:
2127
                        raise ResourceConflictException(
1✔
2128
                            f'The event source arn (" {mapping["EventSourceArn"]} ") and function '
2129
                            f'(" {function_name} ") provided mapping already exists. Please update or delete the '
2130
                            f"existing mapping with UUID {uuid}",
2131
                            Type="User",
2132
                        )
2133
        return fn_arn, function_name, state, function_version, function_role
1✔
2134

2135
    @handler("UpdateEventSourceMapping", expand=False)
1✔
2136
    def update_event_source_mapping(
1✔
2137
        self,
2138
        context: RequestContext,
2139
        request: UpdateEventSourceMappingRequest,
2140
    ) -> EventSourceMappingConfiguration:
2141
        return self.update_event_source_mapping_v2(context, request)
1✔
2142

2143
    def update_event_source_mapping_v2(
1✔
2144
        self,
2145
        context: RequestContext,
2146
        request: UpdateEventSourceMappingRequest,
2147
    ) -> EventSourceMappingConfiguration:
2148
        # TODO: test and implement this properly (quite complex with many validations and limitations!)
2149
        LOG.warning(
1✔
2150
            "Updating Lambda Event Source Mapping is in experimental state and not yet fully tested."
2151
        )
2152
        state = lambda_stores[context.account_id][context.region]
1✔
2153
        request_data = {**request}
1✔
2154
        uuid = request_data.pop("UUID", None)
1✔
2155
        if not uuid:
1✔
2156
            raise ResourceNotFoundException(
×
2157
                "The resource you requested does not exist.", Type="User"
2158
            )
2159
        old_event_source_mapping = state.event_source_mappings.get(uuid)
1✔
2160
        esm_worker = self.esm_workers.get(uuid)
1✔
2161
        if old_event_source_mapping is None or esm_worker is None:
1✔
2162
            raise ResourceNotFoundException(
1✔
2163
                "The resource you requested does not exist.", Type="User"
2164
            )  # TODO: test?
2165

2166
        # normalize values to overwrite
2167
        event_source_mapping = old_event_source_mapping | request_data
1✔
2168

2169
        temp_params = {}  # values only set for the returned response, not saved internally (e.g. transient state)
1✔
2170

2171
        # Validate the newly updated ESM object. We ignore the output here since we only care whether an Exception is raised.
2172
        function_arn, _, _, function_version, function_role = self.validate_event_source_mapping(
1✔
2173
            context, event_source_mapping
2174
        )
2175

2176
        # remove the FunctionName field
2177
        event_source_mapping.pop("FunctionName", None)
1✔
2178

2179
        if function_arn:
1✔
2180
            event_source_mapping["FunctionArn"] = function_arn
1✔
2181

2182
        # Only apply update if the desired state differs
2183
        enabled = request.get("Enabled")
1✔
2184
        if enabled is not None:
1✔
2185
            if enabled and old_event_source_mapping["State"] != EsmState.ENABLED:
1✔
2186
                event_source_mapping["State"] = EsmState.ENABLING
1✔
2187
            # TODO: What happens when trying to update during an update or failed state?!
2188
            elif not enabled and old_event_source_mapping["State"] == EsmState.ENABLED:
1✔
2189
                event_source_mapping["State"] = EsmState.DISABLING
1✔
2190
        else:
2191
            event_source_mapping["State"] = EsmState.UPDATING
1✔
2192

2193
        # To ensure parity, certain responses need to be immediately returned
2194
        temp_params["State"] = event_source_mapping["State"]
1✔
2195

2196
        state.event_source_mappings[uuid] = event_source_mapping
1✔
2197

2198
        # TODO: Currently, we re-create the entire ESM worker. Look into approach with better performance.
2199
        worker_factory = EsmWorkerFactory(
1✔
2200
            event_source_mapping, function_role, request.get("Enabled", esm_worker.enabled)
2201
        )
2202

2203
        # Get a new ESM worker object but do not active it, since the factory holds all logic for creating new worker from configuration.
2204
        updated_esm_worker = worker_factory.get_esm_worker()
1✔
2205
        self.esm_workers[uuid] = updated_esm_worker
1✔
2206

2207
        # We should stop() the worker since the delete() will remove the ESM from the state mapping.
2208
        esm_worker.stop()
1✔
2209
        # This will either create an EsmWorker in the CREATING state if enabled. Otherwise, the DISABLING state is set.
2210
        updated_esm_worker.create()
1✔
2211

2212
        return {**event_source_mapping, **temp_params}
1✔
2213

2214
    def delete_event_source_mapping(
1✔
2215
        self, context: RequestContext, uuid: String, **kwargs
2216
    ) -> EventSourceMappingConfiguration:
2217
        state = lambda_stores[context.account_id][context.region]
1✔
2218
        event_source_mapping = state.event_source_mappings.get(uuid)
1✔
2219
        if not event_source_mapping:
1✔
2220
            raise ResourceNotFoundException(
1✔
2221
                "The resource you requested does not exist.", Type="User"
2222
            )
2223
        esm = state.event_source_mappings[uuid]
1✔
2224
        # TODO: add proper locking
2225
        esm_worker = self.esm_workers.pop(uuid, None)
1✔
2226
        # Asynchronous delete in v2
2227
        if not esm_worker:
1✔
2228
            raise ResourceNotFoundException(
×
2229
                "The resource you requested does not exist.", Type="User"
2230
            )
2231
        esm_worker.delete()
1✔
2232
        return {**esm, "State": EsmState.DELETING}
1✔
2233

2234
    def get_event_source_mapping(
1✔
2235
        self, context: RequestContext, uuid: String, **kwargs
2236
    ) -> EventSourceMappingConfiguration:
2237
        state = lambda_stores[context.account_id][context.region]
1✔
2238
        event_source_mapping = state.event_source_mappings.get(uuid)
1✔
2239
        if not event_source_mapping:
1✔
2240
            raise ResourceNotFoundException(
1✔
2241
                "The resource you requested does not exist.", Type="User"
2242
            )
2243
        esm_worker = self.esm_workers.get(uuid)
1✔
2244
        if not esm_worker:
1✔
2245
            raise ResourceNotFoundException(
×
2246
                "The resource you requested does not exist.", Type="User"
2247
            )
2248
        event_source_mapping["State"] = esm_worker.current_state
1✔
2249
        event_source_mapping["StateTransitionReason"] = esm_worker.state_transition_reason
1✔
2250
        return event_source_mapping
1✔
2251

2252
    def list_event_source_mappings(
1✔
2253
        self,
2254
        context: RequestContext,
2255
        event_source_arn: Arn = None,
2256
        function_name: FunctionName = None,
2257
        marker: String = None,
2258
        max_items: MaxListItems = None,
2259
        **kwargs,
2260
    ) -> ListEventSourceMappingsResponse:
2261
        state = lambda_stores[context.account_id][context.region]
1✔
2262

2263
        esms = state.event_source_mappings.values()
1✔
2264
        # TODO: update and test State and StateTransitionReason for ESM v2
2265

2266
        if event_source_arn:  # TODO: validate pattern
1✔
2267
            esms = [e for e in esms if e.get("EventSourceArn") == event_source_arn]
1✔
2268

2269
        if function_name:
1✔
2270
            esms = [e for e in esms if function_name in e["FunctionArn"]]
1✔
2271

2272
        esms = PaginatedList(esms)
1✔
2273
        page, token = esms.get_page(
1✔
2274
            lambda x: x["UUID"],
2275
            marker,
2276
            max_items,
2277
        )
2278
        return ListEventSourceMappingsResponse(EventSourceMappings=page, NextMarker=token)
1✔
2279

2280
    def get_source_type_from_request(self, request: dict[str, Any]) -> str:
1✔
2281
        if event_source_arn := request.get("EventSourceArn", ""):
×
2282
            service = extract_service_from_arn(event_source_arn)
×
2283
            if service == "sqs" and "fifo" in event_source_arn:
×
2284
                service = "sqs-fifo"
×
2285
            return service
×
2286
        elif request.get("SelfManagedEventSource"):
×
2287
            return "kafka"
×
2288

2289
    # =======================================
2290
    # ============ FUNCTION URLS ============
2291
    # =======================================
2292

2293
    @staticmethod
1✔
2294
    def _validate_qualifier(qualifier: str) -> None:
1✔
2295
        if qualifier == "$LATEST" or (qualifier and api_utils.qualifier_is_version(qualifier)):
1✔
2296
            raise ValidationException(
1✔
2297
                f"1 validation error detected: Value '{qualifier}' at 'qualifier' failed to satisfy constraint: Member must satisfy regular expression pattern: ((?!^\\d+$)^[0-9a-zA-Z-_]+$)"
2298
            )
2299

2300
    @staticmethod
1✔
2301
    def _validate_invoke_mode(invoke_mode: str) -> None:
1✔
2302
        if invoke_mode and invoke_mode not in [InvokeMode.BUFFERED, InvokeMode.RESPONSE_STREAM]:
1✔
2303
            raise ValidationException(
1✔
2304
                f"1 validation error detected: Value '{invoke_mode}' at 'invokeMode' failed to satisfy constraint: Member must satisfy enum value set: [RESPONSE_STREAM, BUFFERED]"
2305
            )
2306
        if invoke_mode == InvokeMode.RESPONSE_STREAM:
1✔
2307
            # TODO should we actually fail for setting RESPONSE_STREAM?
2308
            #  It should trigger InvokeWithResponseStream which is not implemented
2309
            LOG.warning(
1✔
2310
                "The invokeMode 'RESPONSE_STREAM' is not yet supported on LocalStack. The property is only mocked, the execution will still be 'BUFFERED'"
2311
            )
2312

2313
    # TODO: what happens if function state is not active?
2314
    def create_function_url_config(
1✔
2315
        self,
2316
        context: RequestContext,
2317
        function_name: FunctionName,
2318
        auth_type: FunctionUrlAuthType,
2319
        qualifier: FunctionUrlQualifier = None,
2320
        cors: Cors = None,
2321
        invoke_mode: InvokeMode = None,
2322
        **kwargs,
2323
    ) -> CreateFunctionUrlConfigResponse:
2324
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2325
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
2326
            function_name, qualifier, context
2327
        )
2328
        state = lambda_stores[account_id][region]
1✔
2329
        self._validate_qualifier(qualifier)
1✔
2330
        self._validate_invoke_mode(invoke_mode)
1✔
2331

2332
        fn = state.functions.get(function_name)
1✔
2333
        if fn is None:
1✔
2334
            raise ResourceNotFoundException("Function does not exist", Type="User")
1✔
2335

2336
        url_config = fn.function_url_configs.get(qualifier or "$LATEST")
1✔
2337
        if url_config:
1✔
2338
            raise ResourceConflictException(
1✔
2339
                f"Failed to create function url config for [functionArn = {url_config.function_arn}]. Error message:  FunctionUrlConfig exists for this Lambda function",
2340
                Type="User",
2341
            )
2342

2343
        if qualifier and qualifier != "$LATEST" and qualifier not in fn.aliases:
1✔
2344
            raise ResourceNotFoundException("Function does not exist", Type="User")
1✔
2345

2346
        normalized_qualifier = qualifier or "$LATEST"
1✔
2347

2348
        function_arn = (
1✔
2349
            api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
2350
            if qualifier
2351
            else api_utils.unqualified_lambda_arn(function_name, account_id, region)
2352
        )
2353

2354
        custom_id: str | None = None
1✔
2355

2356
        tags = self._get_tags(api_utils.unqualified_lambda_arn(function_name, account_id, region))
1✔
2357
        if TAG_KEY_CUSTOM_URL in tags:
1✔
2358
            # Note: I really wanted to add verification here that the
2359
            # url_id is unique, so we could surface that to the user ASAP.
2360
            # However, it seems like that information isn't available yet,
2361
            # since (as far as I can tell) we call
2362
            # self.router.register_routes() once, in a single shot, for all
2363
            # of the routes -- and we need to verify that it's unique not
2364
            # just for this particular lambda function, but for the entire
2365
            # lambda provider. Therefore... that idea proved non-trivial!
2366
            custom_id_tag_value = (
1✔
2367
                f"{tags[TAG_KEY_CUSTOM_URL]}-{qualifier}" if qualifier else tags[TAG_KEY_CUSTOM_URL]
2368
            )
2369
            if TAG_KEY_CUSTOM_URL_VALIDATOR.match(custom_id_tag_value):
1✔
2370
                custom_id = custom_id_tag_value
1✔
2371

2372
            else:
2373
                # Note: we're logging here instead of raising to prioritize
2374
                # strict parity with AWS over the localstack-only custom_id
2375
                LOG.warning(
1✔
2376
                    "Invalid custom ID tag value for lambda URL (%s=%s). "
2377
                    "Replaced with default (random id)",
2378
                    TAG_KEY_CUSTOM_URL,
2379
                    custom_id_tag_value,
2380
                )
2381

2382
        # The url_id is the subdomain used for the URL we're creating. This
2383
        # is either created randomly (as in AWS), or can be passed as a tag
2384
        # to the lambda itself (localstack-only).
2385
        url_id: str
2386
        if custom_id is None:
1✔
2387
            url_id = api_utils.generate_random_url_id()
1✔
2388
        else:
2389
            url_id = custom_id
1✔
2390

2391
        host_definition = localstack_host(custom_port=config.GATEWAY_LISTEN[0].port)
1✔
2392
        fn.function_url_configs[normalized_qualifier] = FunctionUrlConfig(
1✔
2393
            function_arn=function_arn,
2394
            function_name=function_name,
2395
            cors=cors,
2396
            url_id=url_id,
2397
            url=f"http://{url_id}.lambda-url.{context.region}.{host_definition.host_and_port()}/",  # TODO: https support
2398
            auth_type=auth_type,
2399
            creation_time=api_utils.generate_lambda_date(),
2400
            last_modified_time=api_utils.generate_lambda_date(),
2401
            invoke_mode=invoke_mode,
2402
        )
2403

2404
        # persist and start URL
2405
        # TODO: implement URL invoke
2406
        api_url_config = api_utils.map_function_url_config(
1✔
2407
            fn.function_url_configs[normalized_qualifier]
2408
        )
2409

2410
        return CreateFunctionUrlConfigResponse(
1✔
2411
            FunctionUrl=api_url_config["FunctionUrl"],
2412
            FunctionArn=api_url_config["FunctionArn"],
2413
            AuthType=api_url_config["AuthType"],
2414
            Cors=api_url_config["Cors"],
2415
            CreationTime=api_url_config["CreationTime"],
2416
            InvokeMode=api_url_config["InvokeMode"],
2417
        )
2418

2419
    def get_function_url_config(
1✔
2420
        self,
2421
        context: RequestContext,
2422
        function_name: FunctionName,
2423
        qualifier: FunctionUrlQualifier = None,
2424
        **kwargs,
2425
    ) -> GetFunctionUrlConfigResponse:
2426
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2427
        state = lambda_stores[account_id][region]
1✔
2428

2429
        fn_name, qualifier = api_utils.get_name_and_qualifier(function_name, qualifier, context)
1✔
2430

2431
        self._validate_qualifier(qualifier)
1✔
2432

2433
        resolved_fn = state.functions.get(fn_name)
1✔
2434
        if not resolved_fn:
1✔
2435
            raise ResourceNotFoundException(
1✔
2436
                "The resource you requested does not exist.", Type="User"
2437
            )
2438

2439
        qualifier = qualifier or "$LATEST"
1✔
2440
        url_config = resolved_fn.function_url_configs.get(qualifier)
1✔
2441
        if not url_config:
1✔
2442
            raise ResourceNotFoundException(
1✔
2443
                "The resource you requested does not exist.", Type="User"
2444
            )
2445

2446
        return api_utils.map_function_url_config(url_config)
1✔
2447

2448
    def update_function_url_config(
1✔
2449
        self,
2450
        context: RequestContext,
2451
        function_name: FunctionName,
2452
        qualifier: FunctionUrlQualifier = None,
2453
        auth_type: FunctionUrlAuthType = None,
2454
        cors: Cors = None,
2455
        invoke_mode: InvokeMode = None,
2456
        **kwargs,
2457
    ) -> UpdateFunctionUrlConfigResponse:
2458
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2459
        state = lambda_stores[account_id][region]
1✔
2460

2461
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
2462
            function_name, qualifier, context
2463
        )
2464
        self._validate_qualifier(qualifier)
1✔
2465
        self._validate_invoke_mode(invoke_mode)
1✔
2466

2467
        fn = state.functions.get(function_name)
1✔
2468
        if not fn:
1✔
2469
            raise ResourceNotFoundException("Function does not exist", Type="User")
1✔
2470

2471
        normalized_qualifier = qualifier or "$LATEST"
1✔
2472

2473
        if (
1✔
2474
            api_utils.qualifier_is_alias(normalized_qualifier)
2475
            and normalized_qualifier not in fn.aliases
2476
        ):
2477
            raise ResourceNotFoundException("Function does not exist", Type="User")
1✔
2478

2479
        url_config = fn.function_url_configs.get(normalized_qualifier)
1✔
2480
        if not url_config:
1✔
2481
            raise ResourceNotFoundException(
1✔
2482
                "The resource you requested does not exist.", Type="User"
2483
            )
2484

2485
        changes = {
1✔
2486
            "last_modified_time": api_utils.generate_lambda_date(),
2487
            **({"cors": cors} if cors is not None else {}),
2488
            **({"auth_type": auth_type} if auth_type is not None else {}),
2489
        }
2490

2491
        if invoke_mode:
1✔
2492
            changes["invoke_mode"] = invoke_mode
1✔
2493

2494
        new_url_config = dataclasses.replace(url_config, **changes)
1✔
2495
        fn.function_url_configs[normalized_qualifier] = new_url_config
1✔
2496

2497
        return UpdateFunctionUrlConfigResponse(
1✔
2498
            FunctionUrl=new_url_config.url,
2499
            FunctionArn=new_url_config.function_arn,
2500
            AuthType=new_url_config.auth_type,
2501
            Cors=new_url_config.cors,
2502
            CreationTime=new_url_config.creation_time,
2503
            LastModifiedTime=new_url_config.last_modified_time,
2504
            InvokeMode=new_url_config.invoke_mode,
2505
        )
2506

2507
    def delete_function_url_config(
1✔
2508
        self,
2509
        context: RequestContext,
2510
        function_name: FunctionName,
2511
        qualifier: FunctionUrlQualifier = None,
2512
        **kwargs,
2513
    ) -> None:
2514
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2515
        state = lambda_stores[account_id][region]
1✔
2516

2517
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
2518
            function_name, qualifier, context
2519
        )
2520
        self._validate_qualifier(qualifier)
1✔
2521

2522
        resolved_fn = state.functions.get(function_name)
1✔
2523
        if not resolved_fn:
1✔
2524
            raise ResourceNotFoundException(
1✔
2525
                "The resource you requested does not exist.", Type="User"
2526
            )
2527

2528
        qualifier = qualifier or "$LATEST"
1✔
2529
        url_config = resolved_fn.function_url_configs.get(qualifier)
1✔
2530
        if not url_config:
1✔
2531
            raise ResourceNotFoundException(
1✔
2532
                "The resource you requested does not exist.", Type="User"
2533
            )
2534

2535
        del resolved_fn.function_url_configs[qualifier]
1✔
2536

2537
    def list_function_url_configs(
1✔
2538
        self,
2539
        context: RequestContext,
2540
        function_name: FunctionName,
2541
        marker: String = None,
2542
        max_items: MaxItems = None,
2543
        **kwargs,
2544
    ) -> ListFunctionUrlConfigsResponse:
2545
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2546
        state = lambda_stores[account_id][region]
1✔
2547

2548
        fn_name = api_utils.get_function_name(function_name, context)
1✔
2549
        resolved_fn = state.functions.get(fn_name)
1✔
2550
        if not resolved_fn:
1✔
2551
            raise ResourceNotFoundException("Function does not exist", Type="User")
1✔
2552

2553
        url_configs = [
1✔
2554
            api_utils.map_function_url_config(fn_conf)
2555
            for fn_conf in resolved_fn.function_url_configs.values()
2556
        ]
2557
        url_configs = PaginatedList(url_configs)
1✔
2558
        page, token = url_configs.get_page(
1✔
2559
            lambda url_config: url_config["FunctionArn"],
2560
            marker,
2561
            max_items,
2562
        )
2563
        url_configs = page
1✔
2564
        return ListFunctionUrlConfigsResponse(FunctionUrlConfigs=url_configs, NextMarker=token)
1✔
2565

2566
    # =======================================
2567
    # ============  Permissions  ============
2568
    # =======================================
2569

2570
    @handler("AddPermission", expand=False)
1✔
2571
    def add_permission(
1✔
2572
        self,
2573
        context: RequestContext,
2574
        request: AddPermissionRequest,
2575
    ) -> AddPermissionResponse:
2576
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
2577
            request.get("FunctionName"), request.get("Qualifier"), context
2578
        )
2579

2580
        # validate qualifier
2581
        if qualifier is not None:
1✔
2582
            self._validate_qualifier_expression(qualifier)
1✔
2583
            if qualifier == "$LATEST":
1✔
2584
                raise InvalidParameterValueException(
1✔
2585
                    "We currently do not support adding policies for $LATEST.", Type="User"
2586
                )
2587
        account_id, region = api_utils.get_account_and_region(request.get("FunctionName"), context)
1✔
2588

2589
        resolved_fn = self._get_function(function_name, account_id, region)
1✔
2590
        resolved_qualifier, fn_arn = self._resolve_fn_qualifier(resolved_fn, qualifier)
1✔
2591

2592
        revision_id = request.get("RevisionId")
1✔
2593
        if revision_id:
1✔
2594
            fn_revision_id = self._function_revision_id(resolved_fn, resolved_qualifier)
1✔
2595
            if revision_id != fn_revision_id:
1✔
2596
                raise PreconditionFailedException(
1✔
2597
                    "The Revision Id provided does not match the latest Revision Id. "
2598
                    "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
2599
                    Type="User",
2600
                )
2601

2602
        request_sid = request["StatementId"]
1✔
2603
        if not bool(STATEMENT_ID_REGEX.match(request_sid)):
1✔
2604
            raise ValidationException(
1✔
2605
                f"1 validation error detected: Value '{request_sid}' at 'statementId' failed to satisfy constraint: Member must satisfy regular expression pattern: ([a-zA-Z0-9-_]+)"
2606
            )
2607
        # check for an already existing policy and any conflicts in existing statements
2608
        existing_policy = resolved_fn.permissions.get(resolved_qualifier)
1✔
2609
        if existing_policy:
1✔
2610
            if request_sid in [s["Sid"] for s in existing_policy.policy.Statement]:
1✔
2611
                # uniqueness scope: statement id needs to be unique per qualified function ($LATEST, version, or alias)
2612
                # Counterexample: the same sid can exist within $LATEST, version, and alias
2613
                raise ResourceConflictException(
1✔
2614
                    f"The statement id ({request_sid}) provided already exists. Please provide a new statement id, or remove the existing statement.",
2615
                    Type="User",
2616
                )
2617

2618
        permission_statement = api_utils.build_statement(
1✔
2619
            partition=context.partition,
2620
            resource_arn=fn_arn,
2621
            statement_id=request["StatementId"],
2622
            action=request["Action"],
2623
            principal=request["Principal"],
2624
            source_arn=request.get("SourceArn"),
2625
            source_account=request.get("SourceAccount"),
2626
            principal_org_id=request.get("PrincipalOrgID"),
2627
            event_source_token=request.get("EventSourceToken"),
2628
            auth_type=request.get("FunctionUrlAuthType"),
2629
        )
2630
        new_policy = existing_policy
1✔
2631
        if not existing_policy:
1✔
2632
            new_policy = FunctionResourcePolicy(
1✔
2633
                policy=ResourcePolicy(Version="2012-10-17", Id="default", Statement=[])
2634
            )
2635
        new_policy.policy.Statement.append(permission_statement)
1✔
2636
        if not existing_policy:
1✔
2637
            resolved_fn.permissions[resolved_qualifier] = new_policy
1✔
2638

2639
        # Update revision id of alias or version
2640
        # TODO: re-evaluate data model to prevent this dirty hack just for bumping the revision id
2641
        # TODO: does that need a `with function.lock` for atomic updates of the policy + revision_id?
2642
        if api_utils.qualifier_is_alias(resolved_qualifier):
1✔
2643
            resolved_alias = resolved_fn.aliases[resolved_qualifier]
1✔
2644
            resolved_fn.aliases[resolved_qualifier] = dataclasses.replace(resolved_alias)
1✔
2645
        # Assumes that a non-alias is a version
2646
        else:
2647
            resolved_version = resolved_fn.versions[resolved_qualifier]
1✔
2648
            resolved_fn.versions[resolved_qualifier] = dataclasses.replace(
1✔
2649
                resolved_version, config=dataclasses.replace(resolved_version.config)
2650
            )
2651
        return AddPermissionResponse(Statement=json.dumps(permission_statement))
1✔
2652

2653
    def remove_permission(
1✔
2654
        self,
2655
        context: RequestContext,
2656
        function_name: FunctionName,
2657
        statement_id: NamespacedStatementId,
2658
        qualifier: Qualifier = None,
2659
        revision_id: String = None,
2660
        **kwargs,
2661
    ) -> None:
2662
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2663
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
2664
            function_name, qualifier, context
2665
        )
2666
        if qualifier is not None:
1✔
2667
            self._validate_qualifier_expression(qualifier)
1✔
2668

2669
        state = lambda_stores[account_id][region]
1✔
2670
        resolved_fn = state.functions.get(function_name)
1✔
2671
        if resolved_fn is None:
1✔
2672
            fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
1✔
2673
            raise ResourceNotFoundException(f"No policy found for: {fn_arn}", Type="User")
1✔
2674

2675
        resolved_qualifier, _ = self._resolve_fn_qualifier(resolved_fn, qualifier)
1✔
2676
        function_permission = resolved_fn.permissions.get(resolved_qualifier)
1✔
2677
        if not function_permission:
1✔
2678
            raise ResourceNotFoundException(
1✔
2679
                "No policy is associated with the given resource.", Type="User"
2680
            )
2681

2682
        # try to find statement in policy and delete it
2683
        statement = None
1✔
2684
        for s in function_permission.policy.Statement:
1✔
2685
            if s["Sid"] == statement_id:
1✔
2686
                statement = s
1✔
2687
                break
1✔
2688

2689
        if not statement:
1✔
2690
            raise ResourceNotFoundException(
1✔
2691
                f"Statement {statement_id} is not found in resource policy.", Type="User"
2692
            )
2693
        fn_revision_id = self._function_revision_id(resolved_fn, resolved_qualifier)
1✔
2694
        if revision_id and revision_id != fn_revision_id:
1✔
2695
            raise PreconditionFailedException(
1✔
2696
                "The Revision Id provided does not match the latest Revision Id. "
2697
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
2698
                Type="User",
2699
            )
2700
        function_permission.policy.Statement.remove(statement)
1✔
2701

2702
        # Update revision id for alias or version
2703
        # TODO: re-evaluate data model to prevent this dirty hack just for bumping the revision id
2704
        # TODO: does that need a `with function.lock` for atomic updates of the policy + revision_id?
2705
        if api_utils.qualifier_is_alias(resolved_qualifier):
1✔
2706
            resolved_alias = resolved_fn.aliases[resolved_qualifier]
×
2707
            resolved_fn.aliases[resolved_qualifier] = dataclasses.replace(resolved_alias)
×
2708
        # Assumes that a non-alias is a version
2709
        else:
2710
            resolved_version = resolved_fn.versions[resolved_qualifier]
1✔
2711
            resolved_fn.versions[resolved_qualifier] = dataclasses.replace(
1✔
2712
                resolved_version, config=dataclasses.replace(resolved_version.config)
2713
            )
2714

2715
        # remove the policy as a whole when there's no statement left in it
2716
        if len(function_permission.policy.Statement) == 0:
1✔
2717
            del resolved_fn.permissions[resolved_qualifier]
1✔
2718

2719
    def get_policy(
1✔
2720
        self,
2721
        context: RequestContext,
2722
        function_name: NamespacedFunctionName,
2723
        qualifier: Qualifier = None,
2724
        **kwargs,
2725
    ) -> GetPolicyResponse:
2726
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2727
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
2728
            function_name, qualifier, context
2729
        )
2730

2731
        if qualifier is not None:
1✔
2732
            self._validate_qualifier_expression(qualifier)
1✔
2733

2734
        resolved_fn = self._get_function(function_name, account_id, region)
1✔
2735

2736
        resolved_qualifier = qualifier or "$LATEST"
1✔
2737
        function_permission = resolved_fn.permissions.get(resolved_qualifier)
1✔
2738
        if not function_permission:
1✔
2739
            raise ResourceNotFoundException(
1✔
2740
                "The resource you requested does not exist.", Type="User"
2741
            )
2742

2743
        fn_revision_id = None
1✔
2744
        if api_utils.qualifier_is_alias(resolved_qualifier):
1✔
2745
            resolved_alias = resolved_fn.aliases[resolved_qualifier]
1✔
2746
            fn_revision_id = resolved_alias.revision_id
1✔
2747
        # Assumes that a non-alias is a version
2748
        else:
2749
            resolved_version = resolved_fn.versions[resolved_qualifier]
1✔
2750
            fn_revision_id = resolved_version.config.revision_id
1✔
2751

2752
        return GetPolicyResponse(
1✔
2753
            Policy=json.dumps(dataclasses.asdict(function_permission.policy)),
2754
            RevisionId=fn_revision_id,
2755
        )
2756

2757
    # =======================================
2758
    # ========  Code signing config  ========
2759
    # =======================================
2760

2761
    def create_code_signing_config(
1✔
2762
        self,
2763
        context: RequestContext,
2764
        allowed_publishers: AllowedPublishers,
2765
        description: Description = None,
2766
        code_signing_policies: CodeSigningPolicies = None,
2767
        tags: Tags = None,
2768
        **kwargs,
2769
    ) -> CreateCodeSigningConfigResponse:
2770
        account = context.account_id
1✔
2771
        region = context.region
1✔
2772

2773
        state = lambda_stores[account][region]
1✔
2774
        # TODO: can there be duplicates?
2775
        csc_id = f"csc-{get_random_hex(17)}"  # e.g. 'csc-077c33b4c19e26036'
1✔
2776
        csc_arn = f"arn:{context.partition}:lambda:{region}:{account}:code-signing-config:{csc_id}"
1✔
2777
        csc = CodeSigningConfig(
1✔
2778
            csc_id=csc_id,
2779
            arn=csc_arn,
2780
            allowed_publishers=allowed_publishers,
2781
            policies=code_signing_policies,
2782
            last_modified=api_utils.generate_lambda_date(),
2783
            description=description,
2784
        )
2785
        state.code_signing_configs[csc_arn] = csc
1✔
2786
        return CreateCodeSigningConfigResponse(CodeSigningConfig=api_utils.map_csc(csc))
1✔
2787

2788
    def put_function_code_signing_config(
1✔
2789
        self,
2790
        context: RequestContext,
2791
        code_signing_config_arn: CodeSigningConfigArn,
2792
        function_name: FunctionName,
2793
        **kwargs,
2794
    ) -> PutFunctionCodeSigningConfigResponse:
2795
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2796
        state = lambda_stores[account_id][region]
1✔
2797
        function_name = api_utils.get_function_name(function_name, context)
1✔
2798

2799
        csc = state.code_signing_configs.get(code_signing_config_arn)
1✔
2800
        if not csc:
1✔
2801
            raise CodeSigningConfigNotFoundException(
1✔
2802
                f"The code signing configuration cannot be found. Check that the provided configuration is not deleted: {code_signing_config_arn}.",
2803
                Type="User",
2804
            )
2805

2806
        fn = state.functions.get(function_name)
1✔
2807
        fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
1✔
2808
        if not fn:
1✔
2809
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")
1✔
2810

2811
        fn.code_signing_config_arn = code_signing_config_arn
1✔
2812
        return PutFunctionCodeSigningConfigResponse(
1✔
2813
            CodeSigningConfigArn=code_signing_config_arn, FunctionName=function_name
2814
        )
2815

2816
    def update_code_signing_config(
1✔
2817
        self,
2818
        context: RequestContext,
2819
        code_signing_config_arn: CodeSigningConfigArn,
2820
        description: Description = None,
2821
        allowed_publishers: AllowedPublishers = None,
2822
        code_signing_policies: CodeSigningPolicies = None,
2823
        **kwargs,
2824
    ) -> UpdateCodeSigningConfigResponse:
2825
        state = lambda_stores[context.account_id][context.region]
1✔
2826
        csc = state.code_signing_configs.get(code_signing_config_arn)
1✔
2827
        if not csc:
1✔
2828
            raise ResourceNotFoundException(
1✔
2829
                f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
2830
            )
2831

2832
        changes = {
1✔
2833
            **(
2834
                {"allowed_publishers": allowed_publishers} if allowed_publishers is not None else {}
2835
            ),
2836
            **({"policies": code_signing_policies} if code_signing_policies is not None else {}),
2837
            **({"description": description} if description is not None else {}),
2838
        }
2839
        new_csc = dataclasses.replace(
1✔
2840
            csc, last_modified=api_utils.generate_lambda_date(), **changes
2841
        )
2842
        state.code_signing_configs[code_signing_config_arn] = new_csc
1✔
2843

2844
        return UpdateCodeSigningConfigResponse(CodeSigningConfig=api_utils.map_csc(new_csc))
1✔
2845

2846
    def get_code_signing_config(
1✔
2847
        self, context: RequestContext, code_signing_config_arn: CodeSigningConfigArn, **kwargs
2848
    ) -> GetCodeSigningConfigResponse:
2849
        state = lambda_stores[context.account_id][context.region]
1✔
2850
        csc = state.code_signing_configs.get(code_signing_config_arn)
1✔
2851
        if not csc:
1✔
2852
            raise ResourceNotFoundException(
1✔
2853
                f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
2854
            )
2855

2856
        return GetCodeSigningConfigResponse(CodeSigningConfig=api_utils.map_csc(csc))
1✔
2857

2858
    def get_function_code_signing_config(
1✔
2859
        self, context: RequestContext, function_name: FunctionName, **kwargs
2860
    ) -> GetFunctionCodeSigningConfigResponse:
2861
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2862
        state = lambda_stores[account_id][region]
1✔
2863
        function_name = api_utils.get_function_name(function_name, context)
1✔
2864
        fn = state.functions.get(function_name)
1✔
2865
        fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
1✔
2866
        if not fn:
1✔
2867
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")
1✔
2868

2869
        if fn.code_signing_config_arn:
1✔
2870
            return GetFunctionCodeSigningConfigResponse(
1✔
2871
                CodeSigningConfigArn=fn.code_signing_config_arn, FunctionName=function_name
2872
            )
2873

2874
        return GetFunctionCodeSigningConfigResponse()
1✔
2875

2876
    def delete_function_code_signing_config(
1✔
2877
        self, context: RequestContext, function_name: FunctionName, **kwargs
2878
    ) -> None:
2879
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2880
        state = lambda_stores[account_id][region]
1✔
2881
        function_name = api_utils.get_function_name(function_name, context)
1✔
2882
        fn = state.functions.get(function_name)
1✔
2883
        fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
1✔
2884
        if not fn:
1✔
2885
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")
1✔
2886

2887
        fn.code_signing_config_arn = None
1✔
2888

2889
    def delete_code_signing_config(
1✔
2890
        self, context: RequestContext, code_signing_config_arn: CodeSigningConfigArn, **kwargs
2891
    ) -> DeleteCodeSigningConfigResponse:
2892
        state = lambda_stores[context.account_id][context.region]
1✔
2893

2894
        csc = state.code_signing_configs.get(code_signing_config_arn)
1✔
2895
        if not csc:
1✔
2896
            raise ResourceNotFoundException(
1✔
2897
                f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
2898
            )
2899

2900
        del state.code_signing_configs[code_signing_config_arn]
1✔
2901

2902
        return DeleteCodeSigningConfigResponse()
1✔
2903

2904
    def list_code_signing_configs(
1✔
2905
        self,
2906
        context: RequestContext,
2907
        marker: String = None,
2908
        max_items: MaxListItems = None,
2909
        **kwargs,
2910
    ) -> ListCodeSigningConfigsResponse:
2911
        state = lambda_stores[context.account_id][context.region]
1✔
2912

2913
        cscs = [api_utils.map_csc(csc) for csc in state.code_signing_configs.values()]
1✔
2914
        cscs = PaginatedList(cscs)
1✔
2915
        page, token = cscs.get_page(
1✔
2916
            lambda csc: csc["CodeSigningConfigId"],
2917
            marker,
2918
            max_items,
2919
        )
2920
        return ListCodeSigningConfigsResponse(CodeSigningConfigs=page, NextMarker=token)
1✔
2921

2922
    def list_functions_by_code_signing_config(
1✔
2923
        self,
2924
        context: RequestContext,
2925
        code_signing_config_arn: CodeSigningConfigArn,
2926
        marker: String = None,
2927
        max_items: MaxListItems = None,
2928
        **kwargs,
2929
    ) -> ListFunctionsByCodeSigningConfigResponse:
2930
        account = context.account_id
1✔
2931
        region = context.region
1✔
2932

2933
        state = lambda_stores[account][region]
1✔
2934

2935
        if code_signing_config_arn not in state.code_signing_configs:
1✔
2936
            raise ResourceNotFoundException(
1✔
2937
                f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
2938
            )
2939

2940
        fn_arns = [
1✔
2941
            api_utils.unqualified_lambda_arn(fn.function_name, account, region)
2942
            for fn in state.functions.values()
2943
            if fn.code_signing_config_arn == code_signing_config_arn
2944
        ]
2945

2946
        cscs = PaginatedList(fn_arns)
1✔
2947
        page, token = cscs.get_page(
1✔
2948
            lambda x: x,
2949
            marker,
2950
            max_items,
2951
        )
2952
        return ListFunctionsByCodeSigningConfigResponse(FunctionArns=page, NextMarker=token)
1✔
2953

2954
    # =======================================
2955
    # =========  Account Settings   =========
2956
    # =======================================
2957

2958
    # CAVE: these settings & usages are *per* region!
2959
    # Lambda quotas: https://docs.aws.amazon.com/lambda/latest/dg/gettingstarted-limits.html
2960
    def get_account_settings(self, context: RequestContext, **kwargs) -> GetAccountSettingsResponse:
1✔
2961
        state = lambda_stores[context.account_id][context.region]
1✔
2962

2963
        fn_count = 0
1✔
2964
        code_size_sum = 0
1✔
2965
        reserved_concurrency_sum = 0
1✔
2966
        for fn in state.functions.values():
1✔
2967
            fn_count += 1
1✔
2968
            for fn_version in fn.versions.values():
1✔
2969
                # Image-based Lambdas do not have a code attribute and count against the ECR quotas instead
2970
                if fn_version.config.package_type == PackageType.Zip:
1✔
2971
                    code_size_sum += fn_version.config.code.code_size
1✔
2972
            if fn.reserved_concurrent_executions is not None:
1✔
2973
                reserved_concurrency_sum += fn.reserved_concurrent_executions
1✔
2974
            for c in fn.provisioned_concurrency_configs.values():
1✔
2975
                reserved_concurrency_sum += c.provisioned_concurrent_executions
1✔
2976
        for layer in state.layers.values():
1✔
2977
            for layer_version in layer.layer_versions.values():
1✔
2978
                code_size_sum += layer_version.code.code_size
1✔
2979
        return GetAccountSettingsResponse(
1✔
2980
            AccountLimit=AccountLimit(
2981
                TotalCodeSize=config.LAMBDA_LIMITS_TOTAL_CODE_SIZE,
2982
                CodeSizeZipped=config.LAMBDA_LIMITS_CODE_SIZE_ZIPPED,
2983
                CodeSizeUnzipped=config.LAMBDA_LIMITS_CODE_SIZE_UNZIPPED,
2984
                ConcurrentExecutions=config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS,
2985
                UnreservedConcurrentExecutions=config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS
2986
                - reserved_concurrency_sum,
2987
            ),
2988
            AccountUsage=AccountUsage(
2989
                TotalCodeSize=code_size_sum,
2990
                FunctionCount=fn_count,
2991
            ),
2992
        )
2993

2994
    # =======================================
2995
    # ==  Provisioned Concurrency Config   ==
2996
    # =======================================
2997

2998
    def _get_provisioned_config(
1✔
2999
        self, context: RequestContext, function_name: str, qualifier: str
3000
    ) -> ProvisionedConcurrencyConfiguration | None:
3001
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3002
        state = lambda_stores[account_id][region]
1✔
3003
        function_name = api_utils.get_function_name(function_name, context)
1✔
3004
        fn = state.functions.get(function_name)
1✔
3005
        if api_utils.qualifier_is_alias(qualifier):
1✔
3006
            fn_alias = None
1✔
3007
            if fn:
1✔
3008
                fn_alias = fn.aliases.get(qualifier)
1✔
3009
            if fn_alias is None:
1✔
3010
                raise ResourceNotFoundException(
1✔
3011
                    f"Cannot find alias arn: {api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)}",
3012
                    Type="User",
3013
                )
3014
        elif api_utils.qualifier_is_version(qualifier):
1✔
3015
            fn_version = None
1✔
3016
            if fn:
1✔
3017
                fn_version = fn.versions.get(qualifier)
1✔
3018
            if fn_version is None:
1✔
3019
                raise ResourceNotFoundException(
1✔
3020
                    f"Function not found: {api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)}",
3021
                    Type="User",
3022
                )
3023

3024
        return fn.provisioned_concurrency_configs.get(qualifier)
1✔
3025

3026
    def put_provisioned_concurrency_config(
1✔
3027
        self,
3028
        context: RequestContext,
3029
        function_name: FunctionName,
3030
        qualifier: Qualifier,
3031
        provisioned_concurrent_executions: PositiveInteger,
3032
        **kwargs,
3033
    ) -> PutProvisionedConcurrencyConfigResponse:
3034
        if provisioned_concurrent_executions <= 0:
1✔
3035
            raise ValidationException(
1✔
3036
                f"1 validation error detected: Value '{provisioned_concurrent_executions}' at 'provisionedConcurrentExecutions' failed to satisfy constraint: Member must have value greater than or equal to 1"
3037
            )
3038

3039
        if qualifier == "$LATEST":
1✔
3040
            raise InvalidParameterValueException(
1✔
3041
                "Provisioned Concurrency Configs cannot be applied to unpublished function versions.",
3042
                Type="User",
3043
            )
3044
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3045
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
3046
            function_name, qualifier, context
3047
        )
3048
        state = lambda_stores[account_id][region]
1✔
3049
        fn = state.functions.get(function_name)
1✔
3050

3051
        provisioned_config = self._get_provisioned_config(context, function_name, qualifier)
1✔
3052

3053
        if provisioned_config:  # TODO: merge?
1✔
3054
            # TODO: add a test for partial updates (if possible)
3055
            LOG.warning(
1✔
3056
                "Partial update of provisioned concurrency config is currently not supported."
3057
            )
3058

3059
        other_provisioned_sum = sum(
1✔
3060
            [
3061
                provisioned_configs.provisioned_concurrent_executions
3062
                for provisioned_qualifier, provisioned_configs in fn.provisioned_concurrency_configs.items()
3063
                if provisioned_qualifier != qualifier
3064
            ]
3065
        )
3066

3067
        if (
1✔
3068
            fn.reserved_concurrent_executions is not None
3069
            and fn.reserved_concurrent_executions
3070
            < other_provisioned_sum + provisioned_concurrent_executions
3071
        ):
3072
            raise InvalidParameterValueException(
1✔
3073
                "Requested Provisioned Concurrency should not be greater than the reservedConcurrentExecution for function",
3074
                Type="User",
3075
            )
3076

3077
        if provisioned_concurrent_executions > config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS:
1✔
3078
            raise InvalidParameterValueException(
1✔
3079
                f"Specified ConcurrentExecutions for function is greater than account's unreserved concurrency"
3080
                f" [{config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS}]."
3081
            )
3082

3083
        settings = self.get_account_settings(context)
1✔
3084
        unreserved_concurrent_executions = settings["AccountLimit"][
1✔
3085
            "UnreservedConcurrentExecutions"
3086
        ]
3087
        if (
1✔
3088
            unreserved_concurrent_executions - provisioned_concurrent_executions
3089
            < config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY
3090
        ):
3091
            raise InvalidParameterValueException(
1✔
3092
                f"Specified ConcurrentExecutions for function decreases account's UnreservedConcurrentExecution below"
3093
                f" its minimum value of [{config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY}]."
3094
            )
3095

3096
        provisioned_config = ProvisionedConcurrencyConfiguration(
1✔
3097
            provisioned_concurrent_executions, api_utils.generate_lambda_date()
3098
        )
3099
        fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
1✔
3100

3101
        if api_utils.qualifier_is_alias(qualifier):
1✔
3102
            alias = fn.aliases.get(qualifier)
1✔
3103
            resolved_version = fn.versions.get(alias.function_version)
1✔
3104

3105
            if (
1✔
3106
                resolved_version
3107
                and fn.provisioned_concurrency_configs.get(alias.function_version) is not None
3108
            ):
3109
                raise ResourceConflictException(
1✔
3110
                    "Alias can't be used for Provisioned Concurrency configuration on an already Provisioned version",
3111
                    Type="User",
3112
                )
3113
            fn_arn = resolved_version.id.qualified_arn()
1✔
3114
        elif api_utils.qualifier_is_version(qualifier):
1✔
3115
            fn_version = fn.versions.get(qualifier)
1✔
3116

3117
            # TODO: might be useful other places, utilize
3118
            pointing_aliases = []
1✔
3119
            for alias in fn.aliases.values():
1✔
3120
                if (
1✔
3121
                    alias.function_version == qualifier
3122
                    and fn.provisioned_concurrency_configs.get(alias.name) is not None
3123
                ):
3124
                    pointing_aliases.append(alias.name)
1✔
3125
            if pointing_aliases:
1✔
3126
                raise ResourceConflictException(
1✔
3127
                    "Version is pointed by a Provisioned Concurrency alias", Type="User"
3128
                )
3129

3130
            fn_arn = fn_version.id.qualified_arn()
1✔
3131

3132
        manager = self.lambda_service.get_lambda_version_manager(fn_arn)
1✔
3133

3134
        fn.provisioned_concurrency_configs[qualifier] = provisioned_config
1✔
3135

3136
        manager.update_provisioned_concurrency_config(
1✔
3137
            provisioned_config.provisioned_concurrent_executions
3138
        )
3139

3140
        return PutProvisionedConcurrencyConfigResponse(
1✔
3141
            RequestedProvisionedConcurrentExecutions=provisioned_config.provisioned_concurrent_executions,
3142
            AvailableProvisionedConcurrentExecutions=0,
3143
            AllocatedProvisionedConcurrentExecutions=0,
3144
            Status=ProvisionedConcurrencyStatusEnum.IN_PROGRESS,
3145
            # StatusReason=manager.provisioned_state.status_reason,
3146
            LastModified=provisioned_config.last_modified,  # TODO: does change with configuration or also with state changes?
3147
        )
3148

3149
    def get_provisioned_concurrency_config(
1✔
3150
        self, context: RequestContext, function_name: FunctionName, qualifier: Qualifier, **kwargs
3151
    ) -> GetProvisionedConcurrencyConfigResponse:
3152
        if qualifier == "$LATEST":
1✔
3153
            raise InvalidParameterValueException(
1✔
3154
                "The function resource provided must be an alias or a published version.",
3155
                Type="User",
3156
            )
3157
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3158
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
3159
            function_name, qualifier, context
3160
        )
3161

3162
        provisioned_config = self._get_provisioned_config(context, function_name, qualifier)
1✔
3163
        if not provisioned_config:
1✔
3164
            raise ProvisionedConcurrencyConfigNotFoundException(
1✔
3165
                "No Provisioned Concurrency Config found for this function", Type="User"
3166
            )
3167

3168
        # TODO: make this compatible with alias pointer migration on update
3169
        if api_utils.qualifier_is_alias(qualifier):
1✔
3170
            state = lambda_stores[account_id][region]
1✔
3171
            fn = state.functions.get(function_name)
1✔
3172
            alias = fn.aliases.get(qualifier)
1✔
3173
            fn_arn = api_utils.qualified_lambda_arn(
1✔
3174
                function_name, alias.function_version, account_id, region
3175
            )
3176
        else:
3177
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
1✔
3178

3179
        ver_manager = self.lambda_service.get_lambda_version_manager(fn_arn)
1✔
3180

3181
        return GetProvisionedConcurrencyConfigResponse(
1✔
3182
            RequestedProvisionedConcurrentExecutions=provisioned_config.provisioned_concurrent_executions,
3183
            LastModified=provisioned_config.last_modified,
3184
            AvailableProvisionedConcurrentExecutions=ver_manager.provisioned_state.available,
3185
            AllocatedProvisionedConcurrentExecutions=ver_manager.provisioned_state.allocated,
3186
            Status=ver_manager.provisioned_state.status,
3187
            StatusReason=ver_manager.provisioned_state.status_reason,
3188
        )
3189

3190
    def list_provisioned_concurrency_configs(
1✔
3191
        self,
3192
        context: RequestContext,
3193
        function_name: FunctionName,
3194
        marker: String = None,
3195
        max_items: MaxProvisionedConcurrencyConfigListItems = None,
3196
        **kwargs,
3197
    ) -> ListProvisionedConcurrencyConfigsResponse:
3198
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3199
        state = lambda_stores[account_id][region]
1✔
3200

3201
        function_name = api_utils.get_function_name(function_name, context)
1✔
3202
        fn = state.functions.get(function_name)
1✔
3203
        if fn is None:
1✔
3204
            raise ResourceNotFoundException(
1✔
3205
                f"Function not found: {api_utils.unqualified_lambda_arn(function_name, account_id, region)}",
3206
                Type="User",
3207
            )
3208

3209
        configs = []
1✔
3210
        for qualifier, pc_config in fn.provisioned_concurrency_configs.items():
1✔
3211
            if api_utils.qualifier_is_alias(qualifier):
×
3212
                alias = fn.aliases.get(qualifier)
×
3213
                fn_arn = api_utils.qualified_lambda_arn(
×
3214
                    function_name, alias.function_version, account_id, region
3215
                )
3216
            else:
3217
                fn_arn = api_utils.qualified_lambda_arn(
×
3218
                    function_name, qualifier, account_id, region
3219
                )
3220

3221
            manager = self.lambda_service.get_lambda_version_manager(fn_arn)
×
3222

3223
            configs.append(
×
3224
                ProvisionedConcurrencyConfigListItem(
3225
                    FunctionArn=api_utils.qualified_lambda_arn(
3226
                        function_name, qualifier, account_id, region
3227
                    ),
3228
                    RequestedProvisionedConcurrentExecutions=pc_config.provisioned_concurrent_executions,
3229
                    AvailableProvisionedConcurrentExecutions=manager.provisioned_state.available,
3230
                    AllocatedProvisionedConcurrentExecutions=manager.provisioned_state.allocated,
3231
                    Status=manager.provisioned_state.status,
3232
                    StatusReason=manager.provisioned_state.status_reason,
3233
                    LastModified=pc_config.last_modified,
3234
                )
3235
            )
3236

3237
        provisioned_concurrency_configs = configs
1✔
3238
        provisioned_concurrency_configs = PaginatedList(provisioned_concurrency_configs)
1✔
3239
        page, token = provisioned_concurrency_configs.get_page(
1✔
3240
            lambda x: x,
3241
            marker,
3242
            max_items,
3243
        )
3244
        return ListProvisionedConcurrencyConfigsResponse(
1✔
3245
            ProvisionedConcurrencyConfigs=page, NextMarker=token
3246
        )
3247

3248
    def delete_provisioned_concurrency_config(
1✔
3249
        self, context: RequestContext, function_name: FunctionName, qualifier: Qualifier, **kwargs
3250
    ) -> None:
3251
        if qualifier == "$LATEST":
1✔
3252
            raise InvalidParameterValueException(
1✔
3253
                "The function resource provided must be an alias or a published version.",
3254
                Type="User",
3255
            )
3256
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3257
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
3258
            function_name, qualifier, context
3259
        )
3260
        state = lambda_stores[account_id][region]
1✔
3261
        fn = state.functions.get(function_name)
1✔
3262

3263
        provisioned_config = self._get_provisioned_config(context, function_name, qualifier)
1✔
3264
        # delete is idempotent and doesn't actually care about the provisioned concurrency config not existing
3265
        if provisioned_config:
1✔
3266
            fn.provisioned_concurrency_configs.pop(qualifier)
1✔
3267
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
1✔
3268
            manager = self.lambda_service.get_lambda_version_manager(fn_arn)
1✔
3269
            manager.update_provisioned_concurrency_config(0)
1✔
3270

3271
    # =======================================
3272
    # =======  Event Invoke Config   ========
3273
    # =======================================
3274

3275
    # "1 validation error detected: Value 'arn:aws:_-/!lambda:<region>:111111111111:function:<function-name:1>' at 'destinationConfig.onFailure.destination' failed to satisfy constraint: Member must satisfy regular expression pattern: ^$|arn:(aws[a-zA-Z0-9-]*):([a-zA-Z0-9\\-])+:([a-z]{2}((-gov)|(-iso(b?)))?-[a-z]+-\\d{1})?:(\\d{12})?:(.*)"
3276
    # "1 validation error detected: Value 'arn:aws:_-/!lambda:<region>:111111111111:function:<function-name:1>' at 'destinationConfig.onFailure.destination' failed to satisfy constraint: Member must satisfy regular expression pattern: ^$|arn:(aws[a-zA-Z0-9-]*):([a-zA-Z0-9\\-])+:([a-z]2((-gov)|(-iso(b?)))?-[a-z]+-\\d1)?:(\\d12)?:(.*)" ... (expected → actual)
3277

3278
    def _validate_destination_config(
1✔
3279
        self, store: LambdaStore, function_name: str, destination_config: DestinationConfig
3280
    ):
3281
        def _validate_destination_arn(destination_arn) -> bool:
1✔
3282
            if not api_utils.DESTINATION_ARN_PATTERN.match(destination_arn):
1✔
3283
                # technically we shouldn't handle this in the provider
3284
                raise ValidationException(
1✔
3285
                    "1 validation error detected: Value '"
3286
                    + destination_arn
3287
                    + r"' at 'destinationConfig.onFailure.destination' failed to satisfy constraint: Member must satisfy regular expression pattern: ^$|arn:(aws[a-zA-Z0-9-]*):([a-zA-Z0-9\-])+:([a-z]{2}((-gov)|(-iso(b?)))?-[a-z]+-\d{1})?:(\d{12})?:(.*)"
3288
                )
3289

3290
            match destination_arn.split(":")[2]:
1✔
3291
                case "lambda":
1✔
3292
                    fn_parts = api_utils.FULL_FN_ARN_PATTERN.search(destination_arn).groupdict()
1✔
3293
                    if fn_parts:
1✔
3294
                        # check if it exists
3295
                        fn = store.functions.get(fn_parts["function_name"])
1✔
3296
                        if not fn:
1✔
3297
                            raise InvalidParameterValueException(
1✔
3298
                                f"The destination ARN {destination_arn} is invalid.", Type="User"
3299
                            )
3300
                        if fn_parts["function_name"] == function_name:
1✔
3301
                            raise InvalidParameterValueException(
1✔
3302
                                "You can't specify the function as a destination for itself.",
3303
                                Type="User",
3304
                            )
3305
                case "sns" | "sqs" | "events":
1✔
3306
                    pass
1✔
3307
                case _:
1✔
3308
                    return False
1✔
3309
            return True
1✔
3310

3311
        validation_err = False
1✔
3312

3313
        failure_destination = destination_config.get("OnFailure", {}).get("Destination")
1✔
3314
        if failure_destination:
1✔
3315
            validation_err = validation_err or not _validate_destination_arn(failure_destination)
1✔
3316

3317
        success_destination = destination_config.get("OnSuccess", {}).get("Destination")
1✔
3318
        if success_destination:
1✔
3319
            validation_err = validation_err or not _validate_destination_arn(success_destination)
1✔
3320

3321
        if validation_err:
1✔
3322
            on_success_part = (
1✔
3323
                f"OnSuccess(destination={success_destination})" if success_destination else "null"
3324
            )
3325
            on_failure_part = (
1✔
3326
                f"OnFailure(destination={failure_destination})" if failure_destination else "null"
3327
            )
3328
            raise InvalidParameterValueException(
1✔
3329
                f"The provided destination config DestinationConfig(onSuccess={on_success_part}, onFailure={on_failure_part}) is invalid.",
3330
                Type="User",
3331
            )
3332

3333
    def put_function_event_invoke_config(
1✔
3334
        self,
3335
        context: RequestContext,
3336
        function_name: FunctionName,
3337
        qualifier: Qualifier = None,
3338
        maximum_retry_attempts: MaximumRetryAttempts = None,
3339
        maximum_event_age_in_seconds: MaximumEventAgeInSeconds = None,
3340
        destination_config: DestinationConfig = None,
3341
        **kwargs,
3342
    ) -> FunctionEventInvokeConfig:
3343
        """
3344
        Destination ARNs can be:
3345
        * SQS arn
3346
        * SNS arn
3347
        * Lambda arn
3348
        * EventBridge arn
3349

3350
        Differences between put_ and update_:
3351
            * put overwrites any existing config
3352
            * update allows changes only single values while keeping the rest of existing ones
3353
            * update fails on non-existing configs
3354

3355
        Differences between destination and DLQ
3356
            * "However, a dead-letter queue is part of a function's version-specific configuration, so it is locked in when you publish a version."
3357
            *  "On-failure destinations also support additional targets and include details about the function's response in the invocation record."
3358

3359
        """
3360
        if (
1✔
3361
            maximum_event_age_in_seconds is None
3362
            and maximum_retry_attempts is None
3363
            and destination_config is None
3364
        ):
3365
            raise InvalidParameterValueException(
1✔
3366
                "You must specify at least one of error handling or destination setting.",
3367
                Type="User",
3368
            )
3369
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3370
        state = lambda_stores[account_id][region]
1✔
3371
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
3372
            function_name, qualifier, context
3373
        )
3374
        fn = state.functions.get(function_name)
1✔
3375
        if not fn or (qualifier and not (qualifier in fn.aliases or qualifier in fn.versions)):
1✔
3376
            raise ResourceNotFoundException("The function doesn't exist.", Type="User")
1✔
3377

3378
        qualifier = qualifier or "$LATEST"
1✔
3379

3380
        # validate and normalize destination config
3381
        if destination_config:
1✔
3382
            self._validate_destination_config(state, function_name, destination_config)
1✔
3383

3384
        destination_config = DestinationConfig(
1✔
3385
            OnSuccess=OnSuccess(
3386
                Destination=(destination_config or {}).get("OnSuccess", {}).get("Destination")
3387
            ),
3388
            OnFailure=OnFailure(
3389
                Destination=(destination_config or {}).get("OnFailure", {}).get("Destination")
3390
            ),
3391
        )
3392

3393
        config = EventInvokeConfig(
1✔
3394
            function_name=function_name,
3395
            qualifier=qualifier,
3396
            maximum_event_age_in_seconds=maximum_event_age_in_seconds,
3397
            maximum_retry_attempts=maximum_retry_attempts,
3398
            last_modified=api_utils.generate_lambda_date(),
3399
            destination_config=destination_config,
3400
        )
3401
        fn.event_invoke_configs[qualifier] = config
1✔
3402

3403
        return FunctionEventInvokeConfig(
1✔
3404
            LastModified=datetime.datetime.strptime(
3405
                config.last_modified, api_utils.LAMBDA_DATE_FORMAT
3406
            ),
3407
            FunctionArn=api_utils.qualified_lambda_arn(
3408
                function_name, qualifier or "$LATEST", account_id, region
3409
            ),
3410
            DestinationConfig=destination_config,
3411
            MaximumEventAgeInSeconds=maximum_event_age_in_seconds,
3412
            MaximumRetryAttempts=maximum_retry_attempts,
3413
        )
3414

3415
    def get_function_event_invoke_config(
1✔
3416
        self,
3417
        context: RequestContext,
3418
        function_name: FunctionName,
3419
        qualifier: Qualifier = None,
3420
        **kwargs,
3421
    ) -> FunctionEventInvokeConfig:
3422
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3423
        state = lambda_stores[account_id][region]
1✔
3424
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
3425
            function_name, qualifier, context
3426
        )
3427

3428
        qualifier = qualifier or "$LATEST"
1✔
3429
        fn = state.functions.get(function_name)
1✔
3430
        if not fn:
1✔
3431
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
1✔
3432
            raise ResourceNotFoundException(
1✔
3433
                f"The function {fn_arn} doesn't have an EventInvokeConfig", Type="User"
3434
            )
3435

3436
        config = fn.event_invoke_configs.get(qualifier)
1✔
3437
        if not config:
1✔
3438
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
1✔
3439
            raise ResourceNotFoundException(
1✔
3440
                f"The function {fn_arn} doesn't have an EventInvokeConfig", Type="User"
3441
            )
3442

3443
        return FunctionEventInvokeConfig(
1✔
3444
            LastModified=datetime.datetime.strptime(
3445
                config.last_modified, api_utils.LAMBDA_DATE_FORMAT
3446
            ),
3447
            FunctionArn=api_utils.qualified_lambda_arn(
3448
                function_name, qualifier, account_id, region
3449
            ),
3450
            DestinationConfig=config.destination_config,
3451
            MaximumEventAgeInSeconds=config.maximum_event_age_in_seconds,
3452
            MaximumRetryAttempts=config.maximum_retry_attempts,
3453
        )
3454

3455
    def list_function_event_invoke_configs(
1✔
3456
        self,
3457
        context: RequestContext,
3458
        function_name: FunctionName,
3459
        marker: String = None,
3460
        max_items: MaxFunctionEventInvokeConfigListItems = None,
3461
        **kwargs,
3462
    ) -> ListFunctionEventInvokeConfigsResponse:
3463
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3464
        state = lambda_stores[account_id][region]
1✔
3465
        fn = state.functions.get(function_name)
1✔
3466
        if not fn:
1✔
3467
            raise ResourceNotFoundException("The function doesn't exist.", Type="User")
1✔
3468

3469
        event_invoke_configs = [
1✔
3470
            FunctionEventInvokeConfig(
3471
                LastModified=c.last_modified,
3472
                FunctionArn=api_utils.qualified_lambda_arn(
3473
                    function_name, c.qualifier, account_id, region
3474
                ),
3475
                MaximumEventAgeInSeconds=c.maximum_event_age_in_seconds,
3476
                MaximumRetryAttempts=c.maximum_retry_attempts,
3477
                DestinationConfig=c.destination_config,
3478
            )
3479
            for c in fn.event_invoke_configs.values()
3480
        ]
3481

3482
        event_invoke_configs = PaginatedList(event_invoke_configs)
1✔
3483
        page, token = event_invoke_configs.get_page(
1✔
3484
            lambda x: x["FunctionArn"],
3485
            marker,
3486
            max_items,
3487
        )
3488
        return ListFunctionEventInvokeConfigsResponse(
1✔
3489
            FunctionEventInvokeConfigs=page, NextMarker=token
3490
        )
3491

3492
    def delete_function_event_invoke_config(
1✔
3493
        self,
3494
        context: RequestContext,
3495
        function_name: FunctionName,
3496
        qualifier: Qualifier = None,
3497
        **kwargs,
3498
    ) -> None:
3499
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3500
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
3501
            function_name, qualifier, context
3502
        )
3503
        state = lambda_stores[account_id][region]
1✔
3504
        fn = state.functions.get(function_name)
1✔
3505
        resolved_qualifier = qualifier or "$LATEST"
1✔
3506
        fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
1✔
3507
        if not fn:
1✔
3508
            raise ResourceNotFoundException(
1✔
3509
                f"The function {fn_arn} doesn't have an EventInvokeConfig", Type="User"
3510
            )
3511

3512
        config = fn.event_invoke_configs.get(resolved_qualifier)
1✔
3513
        if not config:
1✔
3514
            raise ResourceNotFoundException(
1✔
3515
                f"The function {fn_arn} doesn't have an EventInvokeConfig", Type="User"
3516
            )
3517

3518
        del fn.event_invoke_configs[resolved_qualifier]
1✔
3519

3520
    def update_function_event_invoke_config(
1✔
3521
        self,
3522
        context: RequestContext,
3523
        function_name: FunctionName,
3524
        qualifier: Qualifier = None,
3525
        maximum_retry_attempts: MaximumRetryAttempts = None,
3526
        maximum_event_age_in_seconds: MaximumEventAgeInSeconds = None,
3527
        destination_config: DestinationConfig = None,
3528
        **kwargs,
3529
    ) -> FunctionEventInvokeConfig:
3530
        # like put but only update single fields via replace
3531
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3532
        state = lambda_stores[account_id][region]
1✔
3533
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
3534
            function_name, qualifier, context
3535
        )
3536

3537
        if (
1✔
3538
            maximum_event_age_in_seconds is None
3539
            and maximum_retry_attempts is None
3540
            and destination_config is None
3541
        ):
3542
            raise InvalidParameterValueException(
×
3543
                "You must specify at least one of error handling or destination setting.",
3544
                Type="User",
3545
            )
3546

3547
        fn = state.functions.get(function_name)
1✔
3548
        if not fn or (qualifier and not (qualifier in fn.aliases or qualifier in fn.versions)):
1✔
3549
            raise ResourceNotFoundException("The function doesn't exist.", Type="User")
1✔
3550

3551
        qualifier = qualifier or "$LATEST"
1✔
3552

3553
        config = fn.event_invoke_configs.get(qualifier)
1✔
3554
        if not config:
1✔
3555
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
1✔
3556
            raise ResourceNotFoundException(
1✔
3557
                f"The function {fn_arn} doesn't have an EventInvokeConfig", Type="User"
3558
            )
3559

3560
        if destination_config:
1✔
3561
            self._validate_destination_config(state, function_name, destination_config)
×
3562

3563
        optional_kwargs = {
1✔
3564
            k: v
3565
            for k, v in {
3566
                "destination_config": destination_config,
3567
                "maximum_retry_attempts": maximum_retry_attempts,
3568
                "maximum_event_age_in_seconds": maximum_event_age_in_seconds,
3569
            }.items()
3570
            if v is not None
3571
        }
3572

3573
        new_config = dataclasses.replace(
1✔
3574
            config, last_modified=api_utils.generate_lambda_date(), **optional_kwargs
3575
        )
3576
        fn.event_invoke_configs[qualifier] = new_config
1✔
3577

3578
        return FunctionEventInvokeConfig(
1✔
3579
            LastModified=datetime.datetime.strptime(
3580
                new_config.last_modified, api_utils.LAMBDA_DATE_FORMAT
3581
            ),
3582
            FunctionArn=api_utils.qualified_lambda_arn(
3583
                function_name, qualifier or "$LATEST", account_id, region
3584
            ),
3585
            DestinationConfig=new_config.destination_config,
3586
            MaximumEventAgeInSeconds=new_config.maximum_event_age_in_seconds,
3587
            MaximumRetryAttempts=new_config.maximum_retry_attempts,
3588
        )
3589

3590
    # =======================================
3591
    # ======  Layer & Layer Versions  =======
3592
    # =======================================
3593

3594
    @staticmethod
1✔
3595
    def _resolve_layer(
1✔
3596
        layer_name_or_arn: str, context: RequestContext
3597
    ) -> tuple[str, str, str, str | None]:
3598
        """
3599
        Return locator attributes for a given Lambda layer.
3600

3601
        :param layer_name_or_arn: Layer name or ARN
3602
        :param context: Request context
3603
        :return: Tuple of region, account ID, layer name, layer version
3604
        """
3605
        if api_utils.is_layer_arn(layer_name_or_arn):
1✔
3606
            return api_utils.parse_layer_arn(layer_name_or_arn)
1✔
3607

3608
        return context.region, context.account_id, layer_name_or_arn, None
1✔
3609

3610
    def publish_layer_version(
1✔
3611
        self,
3612
        context: RequestContext,
3613
        layer_name: LayerName,
3614
        content: LayerVersionContentInput,
3615
        description: Description = None,
3616
        compatible_runtimes: CompatibleRuntimes = None,
3617
        license_info: LicenseInfo = None,
3618
        compatible_architectures: CompatibleArchitectures = None,
3619
        **kwargs,
3620
    ) -> PublishLayerVersionResponse:
3621
        """
3622
        On first use of a LayerName a new layer is created and for each subsequent call with the same LayerName a new version is created.
3623
        Note that there are no $LATEST versions with layers!
3624

3625
        """
3626
        account = context.account_id
1✔
3627
        region = context.region
1✔
3628

3629
        validation_errors = api_utils.validate_layer_runtimes_and_architectures(
1✔
3630
            compatible_runtimes, compatible_architectures
3631
        )
3632
        if validation_errors:
1✔
3633
            raise ValidationException(
1✔
3634
                f"{len(validation_errors)} validation error{'s' if len(validation_errors) > 1 else ''} detected: {'; '.join(validation_errors)}"
3635
            )
3636

3637
        state = lambda_stores[account][region]
1✔
3638
        with self.create_layer_lock:
1✔
3639
            if layer_name not in state.layers:
1✔
3640
                # we don't have a version so create new layer object
3641
                # lock is required to avoid creating two v1 objects for the same name
3642
                layer = Layer(
1✔
3643
                    arn=api_utils.layer_arn(layer_name=layer_name, account=account, region=region)
3644
                )
3645
                state.layers[layer_name] = layer
1✔
3646

3647
        layer = state.layers[layer_name]
1✔
3648
        with layer.next_version_lock:
1✔
3649
            next_version = LambdaLayerVersionIdentifier(
1✔
3650
                account_id=account, region=region, layer_name=layer_name
3651
            ).generate(next_version=layer.next_version)
3652
            # When creating a layer with user defined layer version, it is possible that we
3653
            # create layer versions out of order.
3654
            # ie. a user could replicate layer v2 then layer v1. It is important to always keep the maximum possible
3655
            # value for next layer to avoid overwriting existing versions
3656
            if layer.next_version <= next_version:
1✔
3657
                # We don't need to update layer.next_version if the created version is lower than the "next in line"
3658
                layer.next_version = max(next_version, layer.next_version) + 1
1✔
3659

3660
        # creating a new layer
3661
        if content.get("ZipFile"):
1✔
3662
            code = store_lambda_archive(
1✔
3663
                archive_file=content["ZipFile"],
3664
                function_name=layer_name,
3665
                region_name=region,
3666
                account_id=account,
3667
            )
3668
        else:
3669
            code = store_s3_bucket_archive(
1✔
3670
                archive_bucket=content["S3Bucket"],
3671
                archive_key=content["S3Key"],
3672
                archive_version=content.get("S3ObjectVersion"),
3673
                function_name=layer_name,
3674
                region_name=region,
3675
                account_id=account,
3676
            )
3677

3678
        new_layer_version = LayerVersion(
1✔
3679
            layer_version_arn=api_utils.layer_version_arn(
3680
                layer_name=layer_name,
3681
                account=account,
3682
                region=region,
3683
                version=str(next_version),
3684
            ),
3685
            layer_arn=layer.arn,
3686
            version=next_version,
3687
            description=description or "",
3688
            license_info=license_info,
3689
            compatible_runtimes=compatible_runtimes,
3690
            compatible_architectures=compatible_architectures,
3691
            created=api_utils.generate_lambda_date(),
3692
            code=code,
3693
        )
3694

3695
        layer.layer_versions[str(next_version)] = new_layer_version
1✔
3696

3697
        return api_utils.map_layer_out(new_layer_version)
1✔
3698

3699
    def get_layer_version(
1✔
3700
        self,
3701
        context: RequestContext,
3702
        layer_name: LayerName,
3703
        version_number: LayerVersionNumber,
3704
        **kwargs,
3705
    ) -> GetLayerVersionResponse:
3706
        # TODO: handle layer_name as an ARN
3707

3708
        region_name, account_id, layer_name, _ = LambdaProvider._resolve_layer(layer_name, context)
1✔
3709
        state = lambda_stores[account_id][region_name]
1✔
3710

3711
        layer = state.layers.get(layer_name)
1✔
3712
        if version_number < 1:
1✔
3713
            raise InvalidParameterValueException("Layer Version Cannot be less than 1", Type="User")
1✔
3714
        if layer is None:
1✔
3715
            raise ResourceNotFoundException(
1✔
3716
                "The resource you requested does not exist.", Type="User"
3717
            )
3718
        layer_version = layer.layer_versions.get(str(version_number))
1✔
3719
        if layer_version is None:
1✔
3720
            raise ResourceNotFoundException(
1✔
3721
                "The resource you requested does not exist.", Type="User"
3722
            )
3723
        return api_utils.map_layer_out(layer_version)
1✔
3724

3725
    def get_layer_version_by_arn(
1✔
3726
        self, context: RequestContext, arn: LayerVersionArn, **kwargs
3727
    ) -> GetLayerVersionResponse:
3728
        region_name, account_id, layer_name, layer_version = LambdaProvider._resolve_layer(
1✔
3729
            arn, context
3730
        )
3731

3732
        if not layer_version:
1✔
3733
            raise ValidationException(
1✔
3734
                f"1 validation error detected: Value '{arn}' at 'arn' failed to satisfy constraint: Member must satisfy regular expression pattern: "
3735
                + "(arn:(aws[a-zA-Z-]*)?:lambda:(eusc-)?[a-z]{2}((-gov)|(-iso([a-z]?)))?-[a-z]+-\\d{1}:\\d{12}:layer:[a-zA-Z0-9-_]+:[0-9]+)|(arn:[a-zA-Z0-9-]+:lambda:::awslayer:[a-zA-Z0-9-_]+)"
3736
            )
3737

3738
        store = lambda_stores[account_id][region_name]
1✔
3739
        if not (layers := store.layers.get(layer_name)):
1✔
3740
            raise ResourceNotFoundException(
×
3741
                "The resource you requested does not exist.", Type="User"
3742
            )
3743

3744
        layer_version = layers.layer_versions.get(layer_version)
1✔
3745

3746
        if not layer_version:
1✔
3747
            raise ResourceNotFoundException(
1✔
3748
                "The resource you requested does not exist.", Type="User"
3749
            )
3750

3751
        return api_utils.map_layer_out(layer_version)
1✔
3752

3753
    def list_layers(
1✔
3754
        self,
3755
        context: RequestContext,
3756
        compatible_runtime: Runtime = None,
3757
        marker: String = None,
3758
        max_items: MaxLayerListItems = None,
3759
        compatible_architecture: Architecture = None,
3760
        **kwargs,
3761
    ) -> ListLayersResponse:
3762
        validation_errors = []
1✔
3763

3764
        validation_error_arch = api_utils.validate_layer_architecture(compatible_architecture)
1✔
3765
        if validation_error_arch:
1✔
3766
            validation_errors.append(validation_error_arch)
1✔
3767

3768
        validation_error_runtime = api_utils.validate_layer_runtime(compatible_runtime)
1✔
3769
        if validation_error_runtime:
1✔
3770
            validation_errors.append(validation_error_runtime)
1✔
3771

3772
        if validation_errors:
1✔
3773
            raise ValidationException(
1✔
3774
                f"{len(validation_errors)} validation error{'s' if len(validation_errors) > 1 else ''} detected: {';'.join(validation_errors)}"
3775
            )
3776
        # TODO: handle filter: compatible_runtime
3777
        # TODO: handle filter: compatible_architecture
3778

3779
        state = lambda_stores[context.account_id][context.region]
×
3780
        layers = state.layers
×
3781

3782
        # TODO: test how filters behave together with only returning layers here? Does it return the latest "matching" layer, i.e. does it ignore later layer versions that don't match?
3783

3784
        responses: list[LayersListItem] = []
×
3785
        for layer_name, layer in layers.items():
×
3786
            # fetch latest version
3787
            layer_versions = list(layer.layer_versions.values())
×
3788
            sorted(layer_versions, key=lambda x: x.version)
×
3789
            latest_layer_version = layer_versions[-1]
×
3790
            responses.append(
×
3791
                LayersListItem(
3792
                    LayerName=layer_name,
3793
                    LayerArn=layer.arn,
3794
                    LatestMatchingVersion=api_utils.map_layer_out(latest_layer_version),
3795
                )
3796
            )
3797

3798
        responses = PaginatedList(responses)
×
3799
        page, token = responses.get_page(
×
3800
            lambda version: version,
3801
            marker,
3802
            max_items,
3803
        )
3804

3805
        return ListLayersResponse(NextMarker=token, Layers=page)
×
3806

3807
    def list_layer_versions(
1✔
3808
        self,
3809
        context: RequestContext,
3810
        layer_name: LayerName,
3811
        compatible_runtime: Runtime = None,
3812
        marker: String = None,
3813
        max_items: MaxLayerListItems = None,
3814
        compatible_architecture: Architecture = None,
3815
        **kwargs,
3816
    ) -> ListLayerVersionsResponse:
3817
        validation_errors = api_utils.validate_layer_runtimes_and_architectures(
1✔
3818
            [compatible_runtime] if compatible_runtime else [],
3819
            [compatible_architecture] if compatible_architecture else [],
3820
        )
3821
        if validation_errors:
1✔
3822
            raise ValidationException(
×
3823
                f"{len(validation_errors)} validation error{'s' if len(validation_errors) > 1 else ''} detected: {';'.join(validation_errors)}"
3824
            )
3825

3826
        region_name, account_id, layer_name, layer_version = LambdaProvider._resolve_layer(
1✔
3827
            layer_name, context
3828
        )
3829
        state = lambda_stores[account_id][region_name]
1✔
3830

3831
        # TODO: Test & handle filter: compatible_runtime
3832
        # TODO: Test & handle filter: compatible_architecture
3833
        all_layer_versions = []
1✔
3834
        layer = state.layers.get(layer_name)
1✔
3835
        if layer is not None:
1✔
3836
            for layer_version in layer.layer_versions.values():
1✔
3837
                all_layer_versions.append(api_utils.map_layer_out(layer_version))
1✔
3838

3839
        all_layer_versions.sort(key=lambda x: x["Version"], reverse=True)
1✔
3840
        all_layer_versions = PaginatedList(all_layer_versions)
1✔
3841
        page, token = all_layer_versions.get_page(
1✔
3842
            lambda version: version["LayerVersionArn"],
3843
            marker,
3844
            max_items,
3845
        )
3846
        return ListLayerVersionsResponse(NextMarker=token, LayerVersions=page)
1✔
3847

3848
    def delete_layer_version(
1✔
3849
        self,
3850
        context: RequestContext,
3851
        layer_name: LayerName,
3852
        version_number: LayerVersionNumber,
3853
        **kwargs,
3854
    ) -> None:
3855
        if version_number < 1:
1✔
3856
            raise InvalidParameterValueException("Layer Version Cannot be less than 1", Type="User")
1✔
3857

3858
        region_name, account_id, layer_name, layer_version = LambdaProvider._resolve_layer(
1✔
3859
            layer_name, context
3860
        )
3861

3862
        store = lambda_stores[account_id][region_name]
1✔
3863
        layer = store.layers.get(layer_name, {})
1✔
3864
        if layer:
1✔
3865
            layer.layer_versions.pop(str(version_number), None)
1✔
3866

3867
    # =======================================
3868
    # =====  Layer Version Permissions  =====
3869
    # =======================================
3870
    # TODO: lock updates that change revision IDs
3871

3872
    def add_layer_version_permission(
1✔
3873
        self,
3874
        context: RequestContext,
3875
        layer_name: LayerName,
3876
        version_number: LayerVersionNumber,
3877
        statement_id: StatementId,
3878
        action: LayerPermissionAllowedAction,
3879
        principal: LayerPermissionAllowedPrincipal,
3880
        organization_id: OrganizationId = None,
3881
        revision_id: String = None,
3882
        **kwargs,
3883
    ) -> AddLayerVersionPermissionResponse:
3884
        # `layer_name` can either be layer name or ARN. It is used to generate error messages.
3885
        # `layer_n` contains the layer name.
3886
        region_name, account_id, layer_n, _ = LambdaProvider._resolve_layer(layer_name, context)
1✔
3887

3888
        if action != "lambda:GetLayerVersion":
1✔
3889
            raise ValidationException(
1✔
3890
                f"1 validation error detected: Value '{action}' at 'action' failed to satisfy constraint: Member must satisfy regular expression pattern: lambda:GetLayerVersion"
3891
            )
3892

3893
        store = lambda_stores[account_id][region_name]
1✔
3894
        layer = store.layers.get(layer_n)
1✔
3895

3896
        layer_version_arn = api_utils.layer_version_arn(
1✔
3897
            layer_name, account_id, region_name, str(version_number)
3898
        )
3899

3900
        if layer is None:
1✔
3901
            raise ResourceNotFoundException(
1✔
3902
                f"Layer version {layer_version_arn} does not exist.", Type="User"
3903
            )
3904
        layer_version = layer.layer_versions.get(str(version_number))
1✔
3905
        if layer_version is None:
1✔
3906
            raise ResourceNotFoundException(
1✔
3907
                f"Layer version {layer_version_arn} does not exist.", Type="User"
3908
            )
3909
        # do we have a policy? if not set one
3910
        if layer_version.policy is None:
1✔
3911
            layer_version.policy = LayerPolicy()
1✔
3912

3913
        if statement_id in layer_version.policy.statements:
1✔
3914
            raise ResourceConflictException(
1✔
3915
                f"The statement id ({statement_id}) provided already exists. Please provide a new statement id, or remove the existing statement.",
3916
                Type="User",
3917
            )
3918

3919
        if revision_id and layer_version.policy.revision_id != revision_id:
1✔
3920
            raise PreconditionFailedException(
1✔
3921
                "The Revision Id provided does not match the latest Revision Id. "
3922
                "Call the GetLayerPolicy API to retrieve the latest Revision Id",
3923
                Type="User",
3924
            )
3925

3926
        statement = LayerPolicyStatement(
1✔
3927
            sid=statement_id, action=action, principal=principal, organization_id=organization_id
3928
        )
3929

3930
        old_statements = layer_version.policy.statements
1✔
3931
        layer_version.policy = dataclasses.replace(
1✔
3932
            layer_version.policy, statements={**old_statements, statement_id: statement}
3933
        )
3934

3935
        return AddLayerVersionPermissionResponse(
1✔
3936
            Statement=json.dumps(
3937
                {
3938
                    "Sid": statement.sid,
3939
                    "Effect": "Allow",
3940
                    "Principal": statement.principal,
3941
                    "Action": statement.action,
3942
                    "Resource": layer_version.layer_version_arn,
3943
                }
3944
            ),
3945
            RevisionId=layer_version.policy.revision_id,
3946
        )
3947

3948
    def remove_layer_version_permission(
1✔
3949
        self,
3950
        context: RequestContext,
3951
        layer_name: LayerName,
3952
        version_number: LayerVersionNumber,
3953
        statement_id: StatementId,
3954
        revision_id: String = None,
3955
        **kwargs,
3956
    ) -> None:
3957
        # `layer_name` can either be layer name or ARN. It is used to generate error messages.
3958
        # `layer_n` contains the layer name.
3959
        region_name, account_id, layer_n, layer_version = LambdaProvider._resolve_layer(
1✔
3960
            layer_name, context
3961
        )
3962

3963
        layer_version_arn = api_utils.layer_version_arn(
1✔
3964
            layer_name, account_id, region_name, str(version_number)
3965
        )
3966

3967
        state = lambda_stores[account_id][region_name]
1✔
3968
        layer = state.layers.get(layer_n)
1✔
3969
        if layer is None:
1✔
3970
            raise ResourceNotFoundException(
1✔
3971
                f"Layer version {layer_version_arn} does not exist.", Type="User"
3972
            )
3973
        layer_version = layer.layer_versions.get(str(version_number))
1✔
3974
        if layer_version is None:
1✔
3975
            raise ResourceNotFoundException(
1✔
3976
                f"Layer version {layer_version_arn} does not exist.", Type="User"
3977
            )
3978

3979
        if revision_id and layer_version.policy.revision_id != revision_id:
1✔
3980
            raise PreconditionFailedException(
1✔
3981
                "The Revision Id provided does not match the latest Revision Id. "
3982
                "Call the GetLayerPolicy API to retrieve the latest Revision Id",
3983
                Type="User",
3984
            )
3985

3986
        if statement_id not in layer_version.policy.statements:
1✔
3987
            raise ResourceNotFoundException(
1✔
3988
                f"Statement {statement_id} is not found in resource policy.", Type="User"
3989
            )
3990

3991
        old_statements = layer_version.policy.statements
1✔
3992
        layer_version.policy = dataclasses.replace(
1✔
3993
            layer_version.policy,
3994
            statements={k: v for k, v in old_statements.items() if k != statement_id},
3995
        )
3996

3997
    def get_layer_version_policy(
1✔
3998
        self,
3999
        context: RequestContext,
4000
        layer_name: LayerName,
4001
        version_number: LayerVersionNumber,
4002
        **kwargs,
4003
    ) -> GetLayerVersionPolicyResponse:
4004
        # `layer_name` can either be layer name or ARN. It is used to generate error messages.
4005
        # `layer_n` contains the layer name.
4006
        region_name, account_id, layer_n, _ = LambdaProvider._resolve_layer(layer_name, context)
1✔
4007

4008
        layer_version_arn = api_utils.layer_version_arn(
1✔
4009
            layer_name, account_id, region_name, str(version_number)
4010
        )
4011

4012
        store = lambda_stores[account_id][region_name]
1✔
4013
        layer = store.layers.get(layer_n)
1✔
4014

4015
        if layer is None:
1✔
4016
            raise ResourceNotFoundException(
1✔
4017
                f"Layer version {layer_version_arn} does not exist.", Type="User"
4018
            )
4019

4020
        layer_version = layer.layer_versions.get(str(version_number))
1✔
4021
        if layer_version is None:
1✔
4022
            raise ResourceNotFoundException(
1✔
4023
                f"Layer version {layer_version_arn} does not exist.", Type="User"
4024
            )
4025

4026
        if layer_version.policy is None:
1✔
4027
            raise ResourceNotFoundException(
1✔
4028
                "No policy is associated with the given resource.", Type="User"
4029
            )
4030

4031
        return GetLayerVersionPolicyResponse(
1✔
4032
            Policy=json.dumps(
4033
                {
4034
                    "Version": layer_version.policy.version,
4035
                    "Id": layer_version.policy.id,
4036
                    "Statement": [
4037
                        {
4038
                            "Sid": ps.sid,
4039
                            "Effect": "Allow",
4040
                            "Principal": ps.principal,
4041
                            "Action": ps.action,
4042
                            "Resource": layer_version.layer_version_arn,
4043
                        }
4044
                        for ps in layer_version.policy.statements.values()
4045
                    ],
4046
                }
4047
            ),
4048
            RevisionId=layer_version.policy.revision_id,
4049
        )
4050

4051
    # =======================================
4052
    # =======  Function Concurrency  ========
4053
    # =======================================
4054
    # (Reserved) function concurrency is scoped to the whole function
4055

4056
    def get_function_concurrency(
1✔
4057
        self, context: RequestContext, function_name: FunctionName, **kwargs
4058
    ) -> GetFunctionConcurrencyResponse:
4059
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
4060
        function_name = api_utils.get_function_name(function_name, context)
1✔
4061
        fn = self._get_function(function_name=function_name, region=region, account_id=account_id)
1✔
4062
        return GetFunctionConcurrencyResponse(
1✔
4063
            ReservedConcurrentExecutions=fn.reserved_concurrent_executions
4064
        )
4065

4066
    def put_function_concurrency(
1✔
4067
        self,
4068
        context: RequestContext,
4069
        function_name: FunctionName,
4070
        reserved_concurrent_executions: ReservedConcurrentExecutions,
4071
        **kwargs,
4072
    ) -> Concurrency:
4073
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
4074

4075
        function_name, qualifier = api_utils.get_name_and_qualifier(function_name, None, context)
1✔
4076
        if qualifier:
1✔
4077
            raise InvalidParameterValueException(
1✔
4078
                "This operation is permitted on Lambda functions only. Aliases and versions do not support this operation. Please specify either a function name or an unqualified function ARN.",
4079
                Type="User",
4080
            )
4081

4082
        store = lambda_stores[account_id][region]
1✔
4083
        fn = store.functions.get(function_name)
1✔
4084
        if not fn:
1✔
4085
            fn_arn = api_utils.qualified_lambda_arn(
1✔
4086
                function_name,
4087
                qualifier="$LATEST",
4088
                account=account_id,
4089
                region=region,
4090
            )
4091
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")
1✔
4092

4093
        settings = self.get_account_settings(context)
1✔
4094
        unreserved_concurrent_executions = settings["AccountLimit"][
1✔
4095
            "UnreservedConcurrentExecutions"
4096
        ]
4097

4098
        # The existing reserved concurrent executions for the same function are already deduced in
4099
        # unreserved_concurrent_executions but must not count because the new one will replace the existing one.
4100
        # Joel tested this behavior manually against AWS (2023-11-28).
4101
        existing_reserved_concurrent_executions = (
1✔
4102
            fn.reserved_concurrent_executions if fn.reserved_concurrent_executions else 0
4103
        )
4104
        if (
1✔
4105
            unreserved_concurrent_executions
4106
            - reserved_concurrent_executions
4107
            + existing_reserved_concurrent_executions
4108
        ) < config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY:
4109
            raise InvalidParameterValueException(
1✔
4110
                f"Specified ReservedConcurrentExecutions for function decreases account's UnreservedConcurrentExecution below its minimum value of [{config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY}]."
4111
            )
4112

4113
        total_provisioned_concurrency = sum(
1✔
4114
            [
4115
                provisioned_configs.provisioned_concurrent_executions
4116
                for provisioned_configs in fn.provisioned_concurrency_configs.values()
4117
            ]
4118
        )
4119
        if total_provisioned_concurrency > reserved_concurrent_executions:
1✔
4120
            raise InvalidParameterValueException(
1✔
4121
                f" ReservedConcurrentExecutions  {reserved_concurrent_executions} should not be lower than function's total provisioned concurrency [{total_provisioned_concurrency}]."
4122
            )
4123

4124
        fn.reserved_concurrent_executions = reserved_concurrent_executions
1✔
4125

4126
        return Concurrency(ReservedConcurrentExecutions=fn.reserved_concurrent_executions)
1✔
4127

4128
    def delete_function_concurrency(
1✔
4129
        self, context: RequestContext, function_name: FunctionName, **kwargs
4130
    ) -> None:
4131
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
4132
        function_name, qualifier = api_utils.get_name_and_qualifier(function_name, None, context)
1✔
4133
        store = lambda_stores[account_id][region]
1✔
4134
        fn = store.functions.get(function_name)
1✔
4135
        fn.reserved_concurrent_executions = None
1✔
4136

4137
    # =======================================
4138
    # ===============  TAGS   ===============
4139
    # =======================================
4140
    # only Function, Event Source Mapping, and Code Signing Config (not currently supported by LocalStack) ARNs an are available for tagging in AWS
4141

4142
    def _get_tags(self, resource: TaggableResource) -> dict[str, str]:
1✔
4143
        state = self.fetch_lambda_store_for_tagging(resource)
1✔
4144
        lambda_adapted_tags = {
1✔
4145
            tag["Key"]: tag["Value"]
4146
            for tag in state.TAGS.list_tags_for_resource(resource).get("Tags")
4147
        }
4148
        return lambda_adapted_tags
1✔
4149

4150
    def _store_tags(self, resource: TaggableResource, tags: dict[str, str]):
1✔
4151
        state = self.fetch_lambda_store_for_tagging(resource)
1✔
4152
        if len(state.TAGS.tags.get(resource, {}) | tags) > LAMBDA_TAG_LIMIT_PER_RESOURCE:
1✔
4153
            raise InvalidParameterValueException(
1✔
4154
                "Number of tags exceeds resource tag limit.", Type="User"
4155
            )
4156

4157
        tag_svc_adapted_tags = [{"Key": key, "Value": value} for key, value in tags.items()]
1✔
4158
        state.TAGS.tag_resource(resource, tag_svc_adapted_tags)
1✔
4159

4160
    def fetch_lambda_store_for_tagging(self, resource: TaggableResource) -> LambdaStore:
1✔
4161
        """
4162
        Takes a resource ARN for a TaggableResource (Lambda Function, Event Source Mapping, or Code Signing Config) and returns a corresponding
4163
        LambdaStore for its region and account.
4164

4165
        In addition, this function validates that the ARN is a valid TaggableResource type, and that the TaggableResource exists.
4166

4167
        Raises:
4168
            ValidationException: If the resource ARN is not a full ARN for a TaggableResource.
4169
            ResourceNotFoundException: If the specified resource does not exist.
4170
            InvalidParameterValueException: If the resource ARN is a qualified Lambda Function.
4171
        """
4172

4173
        def _raise_validation_exception():
1✔
4174
            raise ValidationException(
1✔
4175
                f"1 validation error detected: Value '{resource}' at 'resource' failed to satisfy constraint: Member must satisfy regular expression pattern: {api_utils.TAGGABLE_RESOURCE_ARN_PATTERN}"
4176
            )
4177

4178
        # Check whether the ARN we have been passed is correctly formatted
4179
        parsed_resource_arn: ArnData = None
1✔
4180
        try:
1✔
4181
            parsed_resource_arn = parse_arn(resource)
1✔
4182
        except Exception:
1✔
4183
            _raise_validation_exception()
1✔
4184

4185
        # TODO: Should we be checking whether this is a full ARN?
4186
        region, account_id, resource_type = map(
1✔
4187
            parsed_resource_arn.get, ("region", "account", "resource")
4188
        )
4189

4190
        if not all((region, account_id, resource_type)):
1✔
4191
            _raise_validation_exception()
×
4192

4193
        if not (parts := resource_type.split(":")):
1✔
4194
            _raise_validation_exception()
×
4195

4196
        resource_type, resource_identifier, *qualifier = parts
1✔
4197
        if resource_type not in {"event-source-mapping", "code-signing-config", "function"}:
1✔
4198
            _raise_validation_exception()
1✔
4199

4200
        if qualifier:
1✔
4201
            if resource_type == "function":
1✔
4202
                raise InvalidParameterValueException(
1✔
4203
                    "Tags on function aliases and versions are not supported. Please specify a function ARN.",
4204
                    Type="User",
4205
                )
4206
            _raise_validation_exception()
1✔
4207

4208
        match resource_type:
1✔
4209
            case "event-source-mapping":
1✔
4210
                self._get_esm(resource_identifier, account_id, region)
1✔
4211
            case "code-signing-config":
1✔
4212
                raise NotImplementedError("Resource tagging on CSC not yet implemented.")
4213
            case "function":
1✔
4214
                self._get_function(
1✔
4215
                    function_name=resource_identifier, account_id=account_id, region=region
4216
                )
4217

4218
        # If no exceptions are raised, assume ARN and referenced resource is valid for tag operations
4219
        return lambda_stores[account_id][region]
1✔
4220

4221
    def tag_resource(
1✔
4222
        self, context: RequestContext, resource: TaggableResource, tags: Tags, **kwargs
4223
    ) -> None:
4224
        if not tags:
1✔
4225
            raise InvalidParameterValueException(
1✔
4226
                "An error occurred and the request cannot be processed.", Type="User"
4227
            )
4228
        self._store_tags(resource, tags)
1✔
4229

4230
        if (resource_id := extract_resource_from_arn(resource)) and resource_id.startswith(
1✔
4231
            "function"
4232
        ):
4233
            name, _, account, region = function_locators_from_arn(resource)
1✔
4234
            function = self._get_function(name, account, region)
1✔
4235
            with function.lock:
1✔
4236
                # dirty hack for changed revision id, should reevaluate model to prevent this:
4237
                latest_version = function.versions["$LATEST"]
1✔
4238
                function.versions["$LATEST"] = dataclasses.replace(
1✔
4239
                    latest_version, config=dataclasses.replace(latest_version.config)
4240
                )
4241

4242
    def list_tags(
1✔
4243
        self, context: RequestContext, resource: TaggableResource, **kwargs
4244
    ) -> ListTagsResponse:
4245
        tags = self._get_tags(resource)
1✔
4246
        return ListTagsResponse(Tags=tags)
1✔
4247

4248
    def untag_resource(
1✔
4249
        self, context: RequestContext, resource: TaggableResource, tag_keys: TagKeyList, **kwargs
4250
    ) -> None:
4251
        if not tag_keys:
1✔
4252
            raise ValidationException(
1✔
4253
                "1 validation error detected: Value null at 'tagKeys' failed to satisfy constraint: Member must not be null"
4254
            )  # should probably be generalized a bit
4255

4256
        state = self.fetch_lambda_store_for_tagging(resource)
1✔
4257
        state.TAGS.untag_resource(resource, tag_keys)
1✔
4258

4259
        if (resource_id := extract_resource_from_arn(resource)) and resource_id.startswith(
1✔
4260
            "function"
4261
        ):
4262
            name, _, account, region = function_locators_from_arn(resource)
1✔
4263
            function = self._get_function(name, account, region)
1✔
4264
            # TODO: Potential race condition
4265
            with function.lock:
1✔
4266
                # dirty hack for changed revision id, should reevaluate model to prevent this:
4267
                latest_version = function.versions["$LATEST"]
1✔
4268
                function.versions["$LATEST"] = dataclasses.replace(
1✔
4269
                    latest_version, config=dataclasses.replace(latest_version.config)
4270
                )
4271

4272
    # =======================================
4273
    # =======  LEGACY / DEPRECATED   ========
4274
    # =======================================
4275

4276
    def invoke_async(
1✔
4277
        self,
4278
        context: RequestContext,
4279
        function_name: NamespacedFunctionName,
4280
        invoke_args: IO[BlobStream],
4281
        **kwargs,
4282
    ) -> InvokeAsyncResponse:
4283
        """LEGACY API endpoint. Even AWS heavily discourages its usage."""
4284
        raise NotImplementedError
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc