• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 19451970107

17 Nov 2025 08:17PM UTC coverage: 86.905% (+0.003%) from 86.902%
19451970107

push

github

web-flow
Update CODEOWNERS (#13383)

Co-authored-by: LocalStack Bot <localstack-bot@users.noreply.github.com>

68622 of 78962 relevant lines covered (86.91%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.02
/localstack-core/localstack/services/lambda_/provider.py
1
import base64
1✔
2
import dataclasses
1✔
3
import datetime
1✔
4
import itertools
1✔
5
import json
1✔
6
import logging
1✔
7
import re
1✔
8
import threading
1✔
9
import time
1✔
10
from typing import IO, Any
1✔
11

12
from botocore.exceptions import ClientError
1✔
13

14
from localstack import config
1✔
15
from localstack.aws.api import RequestContext, ServiceException, handler
1✔
16
from localstack.aws.api.lambda_ import (
1✔
17
    AccountLimit,
18
    AccountUsage,
19
    AddLayerVersionPermissionResponse,
20
    AddPermissionRequest,
21
    AddPermissionResponse,
22
    Alias,
23
    AliasConfiguration,
24
    AliasRoutingConfiguration,
25
    AllowedPublishers,
26
    Architecture,
27
    Arn,
28
    Blob,
29
    BlobStream,
30
    CodeSigningConfigArn,
31
    CodeSigningConfigNotFoundException,
32
    CodeSigningPolicies,
33
    CompatibleArchitectures,
34
    CompatibleRuntimes,
35
    Concurrency,
36
    Cors,
37
    CreateCodeSigningConfigResponse,
38
    CreateEventSourceMappingRequest,
39
    CreateFunctionRequest,
40
    CreateFunctionUrlConfigResponse,
41
    DeleteCodeSigningConfigResponse,
42
    Description,
43
    DestinationConfig,
44
    EventSourceMappingConfiguration,
45
    FunctionCodeLocation,
46
    FunctionConfiguration,
47
    FunctionEventInvokeConfig,
48
    FunctionName,
49
    FunctionUrlAuthType,
50
    FunctionUrlQualifier,
51
    GetAccountSettingsResponse,
52
    GetCodeSigningConfigResponse,
53
    GetFunctionCodeSigningConfigResponse,
54
    GetFunctionConcurrencyResponse,
55
    GetFunctionRecursionConfigResponse,
56
    GetFunctionResponse,
57
    GetFunctionUrlConfigResponse,
58
    GetLayerVersionPolicyResponse,
59
    GetLayerVersionResponse,
60
    GetPolicyResponse,
61
    GetProvisionedConcurrencyConfigResponse,
62
    InvalidParameterValueException,
63
    InvocationResponse,
64
    InvocationType,
65
    InvokeAsyncResponse,
66
    InvokeMode,
67
    LambdaApi,
68
    LastUpdateStatus,
69
    LayerName,
70
    LayerPermissionAllowedAction,
71
    LayerPermissionAllowedPrincipal,
72
    LayersListItem,
73
    LayerVersionArn,
74
    LayerVersionContentInput,
75
    LayerVersionNumber,
76
    LicenseInfo,
77
    ListAliasesResponse,
78
    ListCodeSigningConfigsResponse,
79
    ListEventSourceMappingsResponse,
80
    ListFunctionEventInvokeConfigsResponse,
81
    ListFunctionsByCodeSigningConfigResponse,
82
    ListFunctionsResponse,
83
    ListFunctionUrlConfigsResponse,
84
    ListLayersResponse,
85
    ListLayerVersionsResponse,
86
    ListProvisionedConcurrencyConfigsResponse,
87
    ListTagsResponse,
88
    ListVersionsByFunctionResponse,
89
    LogFormat,
90
    LoggingConfig,
91
    LogType,
92
    MasterRegion,
93
    MaxFunctionEventInvokeConfigListItems,
94
    MaximumEventAgeInSeconds,
95
    MaximumRetryAttempts,
96
    MaxItems,
97
    MaxLayerListItems,
98
    MaxListItems,
99
    MaxProvisionedConcurrencyConfigListItems,
100
    NamespacedFunctionName,
101
    NamespacedStatementId,
102
    OnFailure,
103
    OnSuccess,
104
    OrganizationId,
105
    PackageType,
106
    PositiveInteger,
107
    PreconditionFailedException,
108
    ProvisionedConcurrencyConfigListItem,
109
    ProvisionedConcurrencyConfigNotFoundException,
110
    ProvisionedConcurrencyStatusEnum,
111
    PublishLayerVersionResponse,
112
    PutFunctionCodeSigningConfigResponse,
113
    PutFunctionRecursionConfigResponse,
114
    PutProvisionedConcurrencyConfigResponse,
115
    Qualifier,
116
    RecursiveLoop,
117
    ReservedConcurrentExecutions,
118
    ResourceConflictException,
119
    ResourceNotFoundException,
120
    Runtime,
121
    RuntimeVersionConfig,
122
    SnapStart,
123
    SnapStartApplyOn,
124
    SnapStartOptimizationStatus,
125
    SnapStartResponse,
126
    State,
127
    StatementId,
128
    StateReasonCode,
129
    String,
130
    TaggableResource,
131
    TagKeyList,
132
    Tags,
133
    TracingMode,
134
    UnqualifiedFunctionName,
135
    UpdateCodeSigningConfigResponse,
136
    UpdateEventSourceMappingRequest,
137
    UpdateFunctionCodeRequest,
138
    UpdateFunctionConfigurationRequest,
139
    UpdateFunctionUrlConfigResponse,
140
    Version,
141
)
142
from localstack.aws.api.lambda_ import FunctionVersion as FunctionVersionApi
1✔
143
from localstack.aws.api.lambda_ import ServiceException as LambdaServiceException
1✔
144
from localstack.aws.api.pipes import (
1✔
145
    DynamoDBStreamStartPosition,
146
    KinesisStreamStartPosition,
147
)
148
from localstack.aws.connect import connect_to
1✔
149
from localstack.aws.spec import load_service
1✔
150
from localstack.services.edge import ROUTER
1✔
151
from localstack.services.lambda_ import api_utils
1✔
152
from localstack.services.lambda_ import hooks as lambda_hooks
1✔
153
from localstack.services.lambda_.analytics import (
1✔
154
    FunctionOperation,
155
    FunctionStatus,
156
    function_counter,
157
)
158
from localstack.services.lambda_.api_utils import (
1✔
159
    ARCHITECTURES,
160
    STATEMENT_ID_REGEX,
161
    SUBNET_ID_REGEX,
162
    function_locators_from_arn,
163
)
164
from localstack.services.lambda_.event_source_mapping.esm_config_factory import (
1✔
165
    EsmConfigFactory,
166
)
167
from localstack.services.lambda_.event_source_mapping.esm_worker import (
1✔
168
    EsmState,
169
    EsmWorker,
170
)
171
from localstack.services.lambda_.event_source_mapping.esm_worker_factory import (
1✔
172
    EsmWorkerFactory,
173
)
174
from localstack.services.lambda_.event_source_mapping.pipe_utils import get_internal_client
1✔
175
from localstack.services.lambda_.invocation import AccessDeniedException
1✔
176
from localstack.services.lambda_.invocation.execution_environment import (
1✔
177
    EnvironmentStartupTimeoutException,
178
)
179
from localstack.services.lambda_.invocation.lambda_models import (
1✔
180
    AliasRoutingConfig,
181
    CodeSigningConfig,
182
    EventInvokeConfig,
183
    Function,
184
    FunctionResourcePolicy,
185
    FunctionUrlConfig,
186
    FunctionVersion,
187
    ImageConfig,
188
    LambdaEphemeralStorage,
189
    Layer,
190
    LayerPolicy,
191
    LayerPolicyStatement,
192
    LayerVersion,
193
    ProvisionedConcurrencyConfiguration,
194
    RequestEntityTooLargeException,
195
    ResourcePolicy,
196
    UpdateStatus,
197
    ValidationException,
198
    VersionAlias,
199
    VersionFunctionConfiguration,
200
    VersionIdentifier,
201
    VersionState,
202
    VpcConfig,
203
)
204
from localstack.services.lambda_.invocation.lambda_service import (
1✔
205
    LambdaService,
206
    create_image_code,
207
    destroy_code_if_not_used,
208
    lambda_stores,
209
    store_lambda_archive,
210
    store_s3_bucket_archive,
211
)
212
from localstack.services.lambda_.invocation.models import LambdaStore
1✔
213
from localstack.services.lambda_.invocation.runtime_executor import get_runtime_executor
1✔
214
from localstack.services.lambda_.lambda_utils import HINT_LOG
1✔
215
from localstack.services.lambda_.layerfetcher.layer_fetcher import LayerFetcher
1✔
216
from localstack.services.lambda_.provider_utils import (
1✔
217
    LambdaLayerVersionIdentifier,
218
    get_function_version,
219
    get_function_version_from_arn,
220
)
221
from localstack.services.lambda_.runtimes import (
1✔
222
    ALL_RUNTIMES,
223
    DEPRECATED_RUNTIMES,
224
    DEPRECATED_RUNTIMES_UPGRADES,
225
    RUNTIMES_AGGREGATED,
226
    SNAP_START_SUPPORTED_RUNTIMES,
227
    VALID_RUNTIMES,
228
)
229
from localstack.services.lambda_.urlrouter import FunctionUrlRouter
1✔
230
from localstack.services.plugins import ServiceLifecycleHook
1✔
231
from localstack.state import StateVisitor
1✔
232
from localstack.utils.aws.arns import (
1✔
233
    ArnData,
234
    extract_resource_from_arn,
235
    extract_service_from_arn,
236
    get_partition,
237
    lambda_event_source_mapping_arn,
238
    parse_arn,
239
)
240
from localstack.utils.aws.client_types import ServicePrincipal
1✔
241
from localstack.utils.bootstrap import is_api_enabled
1✔
242
from localstack.utils.collections import PaginatedList
1✔
243
from localstack.utils.event_matcher import validate_event_pattern
1✔
244
from localstack.utils.strings import get_random_hex, short_uid, to_bytes, to_str
1✔
245
from localstack.utils.sync import poll_condition
1✔
246
from localstack.utils.urls import localstack_host
1✔
247

248
LOG = logging.getLogger(__name__)
1✔
249

250
LAMBDA_DEFAULT_TIMEOUT = 3
1✔
251
LAMBDA_DEFAULT_MEMORY_SIZE = 128
1✔
252

253
LAMBDA_TAG_LIMIT_PER_RESOURCE = 50
1✔
254
LAMBDA_LAYERS_LIMIT_PER_FUNCTION = 5
1✔
255

256
TAG_KEY_CUSTOM_URL = "_custom_id_"
1✔
257
# Requirements (from RFC3986 & co): not longer than 63, first char must be
258
# alpha, then alphanumeric or hyphen, except cannot start or end with hyphen
259
TAG_KEY_CUSTOM_URL_VALIDATOR = re.compile(r"^[A-Za-z]([A-Za-z0-9\-]{0,61}[A-Za-z0-9])?$")
1✔
260

261

262
class LambdaProvider(LambdaApi, ServiceLifecycleHook):
1✔
263
    lambda_service: LambdaService
1✔
264
    create_fn_lock: threading.RLock
1✔
265
    create_layer_lock: threading.RLock
1✔
266
    router: FunctionUrlRouter
1✔
267
    esm_workers: dict[str, EsmWorker]
1✔
268
    layer_fetcher: LayerFetcher | None
1✔
269

270
    def __init__(self) -> None:
1✔
271
        self.lambda_service = LambdaService()
1✔
272
        self.create_fn_lock = threading.RLock()
1✔
273
        self.create_layer_lock = threading.RLock()
1✔
274
        self.router = FunctionUrlRouter(ROUTER, self.lambda_service)
1✔
275
        self.esm_workers = {}
1✔
276
        self.layer_fetcher = None
1✔
277
        lambda_hooks.inject_layer_fetcher.run(self)
1✔
278

279
    def accept_state_visitor(self, visitor: StateVisitor):
1✔
280
        visitor.visit(lambda_stores)
×
281

282
    def on_before_state_reset(self):
1✔
283
        self.lambda_service.stop()
×
284

285
    def on_after_state_reset(self):
1✔
286
        self.router.lambda_service = self.lambda_service = LambdaService()
×
287

288
    def on_before_state_load(self):
1✔
289
        self.lambda_service.stop()
×
290

291
    def on_after_state_load(self):
1✔
292
        self.lambda_service = LambdaService()
×
293
        self.router.lambda_service = self.lambda_service
×
294

295
        for account_id, account_bundle in lambda_stores.items():
×
296
            for region_name, state in account_bundle.items():
×
297
                for fn in state.functions.values():
×
298
                    for fn_version in fn.versions.values():
×
299
                        # restore the "Pending" state for every function version and start it
300
                        try:
×
301
                            new_state = VersionState(
×
302
                                state=State.Pending,
303
                                code=StateReasonCode.Creating,
304
                                reason="The function is being created.",
305
                            )
306
                            new_config = dataclasses.replace(fn_version.config, state=new_state)
×
307
                            new_version = dataclasses.replace(fn_version, config=new_config)
×
308
                            fn.versions[fn_version.id.qualifier] = new_version
×
309
                            self.lambda_service.create_function_version(fn_version).result(
×
310
                                timeout=5
311
                            )
312
                        except Exception:
×
313
                            LOG.warning(
×
314
                                "Failed to restore function version %s",
315
                                fn_version.id.qualified_arn(),
316
                                exc_info=LOG.isEnabledFor(logging.DEBUG),
317
                            )
318
                    # restore provisioned concurrency per function considering both versions and aliases
319
                    for (
×
320
                        provisioned_qualifier,
321
                        provisioned_config,
322
                    ) in fn.provisioned_concurrency_configs.items():
323
                        fn_arn = None
×
324
                        try:
×
325
                            if api_utils.qualifier_is_alias(provisioned_qualifier):
×
326
                                alias = fn.aliases.get(provisioned_qualifier)
×
327
                                resolved_version = fn.versions.get(alias.function_version)
×
328
                                fn_arn = resolved_version.id.qualified_arn()
×
329
                            elif api_utils.qualifier_is_version(provisioned_qualifier):
×
330
                                fn_version = fn.versions.get(provisioned_qualifier)
×
331
                                fn_arn = fn_version.id.qualified_arn()
×
332
                            else:
333
                                raise InvalidParameterValueException(
×
334
                                    "Invalid qualifier type:"
335
                                    " Qualifier can only be an alias or a version for provisioned concurrency."
336
                                )
337

338
                            manager = self.lambda_service.get_lambda_version_manager(fn_arn)
×
339
                            manager.update_provisioned_concurrency_config(
×
340
                                provisioned_config.provisioned_concurrent_executions
341
                            )
342
                        except Exception:
×
343
                            LOG.warning(
×
344
                                "Failed to restore provisioned concurrency %s for function %s",
345
                                provisioned_config,
346
                                fn_arn,
347
                                exc_info=LOG.isEnabledFor(logging.DEBUG),
348
                            )
349

350
                for esm in state.event_source_mappings.values():
×
351
                    # Restores event source workers
352
                    function_arn = esm.get("FunctionArn")
×
353

354
                    # TODO: How do we know the event source is up?
355
                    # A basic poll to see if the mapped Lambda function is active/failed
356
                    if not poll_condition(
×
357
                        lambda: get_function_version_from_arn(function_arn).config.state.state
358
                        in [State.Active, State.Failed],
359
                        timeout=10,
360
                    ):
361
                        LOG.warning(
×
362
                            "Creating ESM for Lambda that is not in running state: %s",
363
                            function_arn,
364
                        )
365

366
                    function_version = get_function_version_from_arn(function_arn)
×
367
                    function_role = function_version.config.role
×
368

369
                    is_esm_enabled = esm.get("State", EsmState.DISABLED) not in (
×
370
                        EsmState.DISABLED,
371
                        EsmState.DISABLING,
372
                    )
373
                    esm_worker = EsmWorkerFactory(
×
374
                        esm, function_role, is_esm_enabled
375
                    ).get_esm_worker()
376

377
                    # Note: a worker is created in the DISABLED state if not enabled
378
                    esm_worker.create()
×
379
                    # TODO: assigning the esm_worker to the dict only works after .create(). Could it cause a race
380
                    #  condition if we get a shutdown here and have a worker thread spawned but not accounted for?
381
                    self.esm_workers[esm_worker.uuid] = esm_worker
×
382

383
    def on_after_init(self):
1✔
384
        self.router.register_routes()
1✔
385
        get_runtime_executor().validate_environment()
1✔
386

387
    def on_before_stop(self) -> None:
1✔
388
        for esm_worker in self.esm_workers.values():
1✔
389
            esm_worker.stop_for_shutdown()
1✔
390

391
        # TODO: should probably unregister routes?
392
        self.lambda_service.stop()
1✔
393

394
    @staticmethod
1✔
395
    def _get_function(function_name: str, account_id: str, region: str) -> Function:
1✔
396
        state = lambda_stores[account_id][region]
1✔
397
        function = state.functions.get(function_name)
1✔
398
        if not function:
1✔
399
            arn = api_utils.unqualified_lambda_arn(
1✔
400
                function_name=function_name,
401
                account=account_id,
402
                region=region,
403
            )
404
            raise ResourceNotFoundException(
1✔
405
                f"Function not found: {arn}",
406
                Type="User",
407
            )
408
        return function
1✔
409

410
    @staticmethod
1✔
411
    def _get_esm(uuid: str, account_id: str, region: str) -> EventSourceMappingConfiguration:
1✔
412
        state = lambda_stores[account_id][region]
1✔
413
        esm = state.event_source_mappings.get(uuid)
1✔
414
        if not esm:
1✔
415
            arn = lambda_event_source_mapping_arn(uuid, account_id, region)
1✔
416
            raise ResourceNotFoundException(
1✔
417
                f"Event source mapping not found: {arn}",
418
                Type="User",
419
            )
420
        return esm
1✔
421

422
    @staticmethod
1✔
423
    def _validate_qualifier_expression(qualifier: str) -> None:
1✔
424
        if error_messages := api_utils.validate_qualifier(qualifier):
1✔
425
            raise ValidationException(
×
426
                message=api_utils.construct_validation_exception_message(error_messages)
427
            )
428

429
    @staticmethod
1✔
430
    def _resolve_fn_qualifier(resolved_fn: Function, qualifier: str | None) -> tuple[str, str]:
1✔
431
        """Attempts to resolve a given qualifier and returns a qualifier that exists or
432
        raises an appropriate ResourceNotFoundException.
433

434
        :param resolved_fn: The resolved lambda function
435
        :param qualifier: The qualifier to be resolved or None
436
        :return: Tuple of (resolved qualifier, function arn either qualified or unqualified)"""
437
        function_name = resolved_fn.function_name
1✔
438
        # assuming function versions need to live in the same account and region
439
        account_id = resolved_fn.latest().id.account
1✔
440
        region = resolved_fn.latest().id.region
1✔
441
        fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
1✔
442
        if qualifier is not None:
1✔
443
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
1✔
444
            if api_utils.qualifier_is_alias(qualifier):
1✔
445
                if qualifier not in resolved_fn.aliases:
1✔
446
                    raise ResourceNotFoundException(f"Cannot find alias arn: {fn_arn}", Type="User")
1✔
447
            elif api_utils.qualifier_is_version(qualifier) or qualifier == "$LATEST":
1✔
448
                if qualifier not in resolved_fn.versions:
1✔
449
                    raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")
1✔
450
            else:
451
                # matches qualifier pattern but invalid alias or version
452
                raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")
1✔
453
        resolved_qualifier = qualifier or "$LATEST"
1✔
454
        return resolved_qualifier, fn_arn
1✔
455

456
    @staticmethod
1✔
457
    def _function_revision_id(resolved_fn: Function, resolved_qualifier: str) -> str:
1✔
458
        if api_utils.qualifier_is_alias(resolved_qualifier):
1✔
459
            return resolved_fn.aliases[resolved_qualifier].revision_id
1✔
460
        # Assumes that a non-alias is a version
461
        else:
462
            return resolved_fn.versions[resolved_qualifier].config.revision_id
1✔
463

464
    def _resolve_vpc_id(self, account_id: str, region_name: str, subnet_id: str) -> str:
1✔
465
        ec2_client = connect_to(aws_access_key_id=account_id, region_name=region_name).ec2
1✔
466
        try:
1✔
467
            return ec2_client.describe_subnets(SubnetIds=[subnet_id])["Subnets"][0]["VpcId"]
1✔
468
        except ec2_client.exceptions.ClientError as e:
1✔
469
            code = e.response["Error"]["Code"]
1✔
470
            message = e.response["Error"]["Message"]
1✔
471
            raise InvalidParameterValueException(
1✔
472
                f"Error occurred while DescribeSubnets. EC2 Error Code: {code}. EC2 Error Message: {message}",
473
                Type="User",
474
            )
475

476
    def _build_vpc_config(
1✔
477
        self,
478
        account_id: str,
479
        region_name: str,
480
        vpc_config: dict | None = None,
481
    ) -> VpcConfig | None:
482
        if not vpc_config or not is_api_enabled("ec2"):
1✔
483
            return None
1✔
484

485
        subnet_ids = vpc_config.get("SubnetIds", [])
1✔
486
        if subnet_ids is not None and len(subnet_ids) == 0:
1✔
487
            return VpcConfig(vpc_id="", security_group_ids=[], subnet_ids=[])
1✔
488

489
        subnet_id = subnet_ids[0]
1✔
490
        if not bool(SUBNET_ID_REGEX.match(subnet_id)):
1✔
491
            raise ValidationException(
1✔
492
                f"1 validation error detected: Value '[{subnet_id}]' at 'vpcConfig.subnetIds' failed to satisfy constraint: Member must satisfy constraint: [Member must have length less than or equal to 1024, Member must have length greater than or equal to 0, Member must satisfy regular expression pattern: ^subnet-[0-9a-z]*$]"
493
            )
494

495
        return VpcConfig(
1✔
496
            vpc_id=self._resolve_vpc_id(account_id, region_name, subnet_id),
497
            security_group_ids=vpc_config.get("SecurityGroupIds", []),
498
            subnet_ids=subnet_ids,
499
        )
500

501
    def _create_version_model(
1✔
502
        self,
503
        function_name: str,
504
        region: str,
505
        account_id: str,
506
        description: str | None = None,
507
        revision_id: str | None = None,
508
        code_sha256: str | None = None,
509
    ) -> tuple[FunctionVersion, bool]:
510
        """
511
        Release a new version to the model if all restrictions are met.
512
        Restrictions:
513
          - CodeSha256, if provided, must equal the current latest version code hash
514
          - RevisionId, if provided, must equal the current latest version revision id
515
          - Some changes have been done to the latest version since last publish
516
        Will return a tuple of the version, and whether the version was published (True) or the latest available version was taken (False).
517
        This can happen if the latest version has not been changed since the last version publish, in this case the last version will be returned.
518

519
        :param function_name: Function name to be published
520
        :param region: Region of the function
521
        :param account_id: Account of the function
522
        :param description: new description of the version (will be the description of the function if missing)
523
        :param revision_id: Revision id, function will raise error if it does not match latest revision id
524
        :param code_sha256: Code sha256, function will raise error if it does not match latest code hash
525
        :return: Tuple of (published version, whether version was released or last released version returned, since nothing changed)
526
        """
527
        current_latest_version = get_function_version(
1✔
528
            function_name=function_name, qualifier="$LATEST", account_id=account_id, region=region
529
        )
530
        if revision_id and current_latest_version.config.revision_id != revision_id:
1✔
531
            raise PreconditionFailedException(
1✔
532
                "The Revision Id provided does not match the latest Revision Id. Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
533
                Type="User",
534
            )
535

536
        # check if code hashes match if they are specified
537
        current_hash = (
1✔
538
            current_latest_version.config.code.code_sha256
539
            if current_latest_version.config.package_type == PackageType.Zip
540
            else current_latest_version.config.image.code_sha256
541
        )
542
        # if the code is a zip package and hot reloaded (hot reloading is currently only supported for zip packagetypes)
543
        # we cannot enforce the codesha256 check
544
        is_hot_reloaded_zip_package = (
1✔
545
            current_latest_version.config.package_type == PackageType.Zip
546
            and current_latest_version.config.code.is_hot_reloading()
547
        )
548
        if code_sha256 and current_hash != code_sha256 and not is_hot_reloaded_zip_package:
1✔
549
            raise InvalidParameterValueException(
1✔
550
                f"CodeSHA256 ({code_sha256}) is different from current CodeSHA256 in $LATEST ({current_hash}). Please try again with the CodeSHA256 in $LATEST.",
551
                Type="User",
552
            )
553

554
        state = lambda_stores[account_id][region]
1✔
555
        function = state.functions.get(function_name)
1✔
556
        changes = {}
1✔
557
        if description is not None:
1✔
558
            changes["description"] = description
1✔
559
        # TODO copy environment instead of restarting one, get rid of all the "Pending"s
560

561
        with function.lock:
1✔
562
            if function.next_version > 1 and (
1✔
563
                prev_version := function.versions.get(str(function.next_version - 1))
564
            ):
565
                if (
1✔
566
                    prev_version.config.internal_revision
567
                    == current_latest_version.config.internal_revision
568
                ):
569
                    return prev_version, False
1✔
570
            # TODO check if there was a change since last version
571
            next_version = str(function.next_version)
1✔
572
            function.next_version += 1
1✔
573
            new_id = VersionIdentifier(
1✔
574
                function_name=function_name,
575
                qualifier=next_version,
576
                region=region,
577
                account=account_id,
578
            )
579
            apply_on = current_latest_version.config.snap_start["ApplyOn"]
1✔
580
            optimization_status = SnapStartOptimizationStatus.Off
1✔
581
            if apply_on == SnapStartApplyOn.PublishedVersions:
1✔
582
                optimization_status = SnapStartOptimizationStatus.On
×
583
            snap_start = SnapStartResponse(
1✔
584
                ApplyOn=apply_on,
585
                OptimizationStatus=optimization_status,
586
            )
587
            new_version = dataclasses.replace(
1✔
588
                current_latest_version,
589
                config=dataclasses.replace(
590
                    current_latest_version.config,
591
                    last_update=None,  # versions never have a last update status
592
                    state=VersionState(
593
                        state=State.Pending,
594
                        code=StateReasonCode.Creating,
595
                        reason="The function is being created.",
596
                    ),
597
                    snap_start=snap_start,
598
                    **changes,
599
                ),
600
                id=new_id,
601
            )
602
            function.versions[next_version] = new_version
1✔
603
        return new_version, True
1✔
604

605
    def _publish_version_from_existing_version(
1✔
606
        self,
607
        function_name: str,
608
        region: str,
609
        account_id: str,
610
        description: str | None = None,
611
        revision_id: str | None = None,
612
        code_sha256: str | None = None,
613
    ) -> FunctionVersion:
614
        """
615
        Publish version from an existing, already initialized LATEST
616

617
        :param function_name: Function name
618
        :param region: region
619
        :param account_id: account id
620
        :param description: description
621
        :param revision_id: revision id (check if current version matches)
622
        :param code_sha256: code sha (check if current code matches)
623
        :return: new version
624
        """
625
        new_version, changed = self._create_version_model(
1✔
626
            function_name=function_name,
627
            region=region,
628
            account_id=account_id,
629
            description=description,
630
            revision_id=revision_id,
631
            code_sha256=code_sha256,
632
        )
633
        if not changed:
1✔
634
            return new_version
1✔
635
        self.lambda_service.publish_version(new_version)
1✔
636
        state = lambda_stores[account_id][region]
1✔
637
        function = state.functions.get(function_name)
1✔
638
        # TODO: re-evaluate data model to prevent this dirty hack just for bumping the revision id
639
        latest_version = function.versions["$LATEST"]
1✔
640
        function.versions["$LATEST"] = dataclasses.replace(
1✔
641
            latest_version, config=dataclasses.replace(latest_version.config)
642
        )
643
        return function.versions.get(new_version.id.qualifier)
1✔
644

645
    def _publish_version_with_changes(
1✔
646
        self,
647
        function_name: str,
648
        region: str,
649
        account_id: str,
650
        description: str | None = None,
651
        revision_id: str | None = None,
652
        code_sha256: str | None = None,
653
    ) -> FunctionVersion:
654
        """
655
        Publish version together with a new latest version (publish on create / update)
656

657
        :param function_name: Function name
658
        :param region: region
659
        :param account_id: account id
660
        :param description: description
661
        :param revision_id: revision id (check if current version matches)
662
        :param code_sha256: code sha (check if current code matches)
663
        :return: new version
664
        """
665
        new_version, changed = self._create_version_model(
1✔
666
            function_name=function_name,
667
            region=region,
668
            account_id=account_id,
669
            description=description,
670
            revision_id=revision_id,
671
            code_sha256=code_sha256,
672
        )
673
        if not changed:
1✔
674
            return new_version
×
675
        self.lambda_service.create_function_version(new_version)
1✔
676
        return new_version
1✔
677

678
    @staticmethod
1✔
679
    def _verify_env_variables(env_vars: dict[str, str]):
1✔
680
        dumped_env_vars = json.dumps(env_vars, separators=(",", ":"))
1✔
681
        if (
1✔
682
            len(dumped_env_vars.encode("utf-8"))
683
            > config.LAMBDA_LIMITS_MAX_FUNCTION_ENVVAR_SIZE_BYTES
684
        ):
685
            raise InvalidParameterValueException(
1✔
686
                f"Lambda was unable to configure your environment variables because the environment variables you have provided exceeded the 4KB limit. String measured: {dumped_env_vars}",
687
                Type="User",
688
            )
689

690
    @staticmethod
1✔
691
    def _validate_snapstart(snap_start: SnapStart, runtime: Runtime):
1✔
692
        apply_on = snap_start.get("ApplyOn")
1✔
693
        if apply_on not in [
1✔
694
            SnapStartApplyOn.PublishedVersions,
695
            SnapStartApplyOn.None_,
696
        ]:
697
            raise ValidationException(
1✔
698
                f"1 validation error detected: Value '{apply_on}' at 'snapStart.applyOn' failed to satisfy constraint: Member must satisfy enum value set: [PublishedVersions, None]"
699
            )
700

701
        if runtime not in SNAP_START_SUPPORTED_RUNTIMES:
1✔
702
            raise InvalidParameterValueException(
×
703
                f"{runtime} is not supported for SnapStart enabled functions.", Type="User"
704
            )
705

706
    def _validate_layers(self, new_layers: list[str], region: str, account_id: str):
1✔
707
        if len(new_layers) > LAMBDA_LAYERS_LIMIT_PER_FUNCTION:
1✔
708
            raise InvalidParameterValueException(
1✔
709
                "Cannot reference more than 5 layers.", Type="User"
710
            )
711

712
        visited_layers = {}
1✔
713
        for layer_version_arn in new_layers:
1✔
714
            (
1✔
715
                layer_region,
716
                layer_account_id,
717
                layer_name,
718
                layer_version_str,
719
            ) = api_utils.parse_layer_arn(layer_version_arn)
720
            if layer_version_str is None:
1✔
721
                raise ValidationException(
1✔
722
                    f"1 validation error detected: Value '[{layer_version_arn}]'"
723
                    + r" at 'layers' failed to satisfy constraint: Member must satisfy constraint: [Member must have length less than or equal to 140, Member must have length greater than or equal to 1, Member must satisfy regular expression pattern: (arn:[a-zA-Z0-9-]+:lambda:[a-z]{2}((-gov)|(-iso(b?)))?-[a-z]+-\d{1}:\d{12}:layer:[a-zA-Z0-9-_]+:[0-9]+)|(arn:[a-zA-Z0-9-]+:lambda:::awslayer:[a-zA-Z0-9-_]+), Member must not be null]",
724
                )
725

726
            state = lambda_stores[layer_account_id][layer_region]
1✔
727
            layer = state.layers.get(layer_name)
1✔
728
            layer_version = None
1✔
729
            if layer is not None:
1✔
730
                layer_version = layer.layer_versions.get(layer_version_str)
1✔
731
            if layer_account_id == account_id:
1✔
732
                if region and layer_region != region:
1✔
733
                    raise InvalidParameterValueException(
1✔
734
                        f"Layers are not in the same region as the function. "
735
                        f"Layers are expected to be in region {region}.",
736
                        Type="User",
737
                    )
738
                if layer is None or layer.layer_versions.get(layer_version_str) is None:
1✔
739
                    raise InvalidParameterValueException(
1✔
740
                        f"Layer version {layer_version_arn} does not exist.", Type="User"
741
                    )
742
            else:  # External layer from other account
743
                # TODO: validate IAM layer policy here, allowing access by default for now and only checking region
744
                if region and layer_region != region:
×
745
                    # TODO: detect user or role from context when IAM users are implemented
746
                    user = "user/localstack-testing"
×
747
                    raise AccessDeniedException(
×
748
                        f"User: arn:{get_partition(region)}:iam::{account_id}:{user} is not authorized to perform: lambda:GetLayerVersion on resource: {layer_version_arn} because no resource-based policy allows the lambda:GetLayerVersion action"
749
                    )
750
                if layer is None or layer_version is None:
×
751
                    # Limitation: cannot fetch external layers when using the same account id as the target layer
752
                    # because we do not want to trigger the layer fetcher for every non-existing layer.
753
                    if self.layer_fetcher is None:
×
754
                        raise NotImplementedError(
755
                            "Fetching shared layers from AWS is a pro feature."
756
                        )
757

758
                    layer = self.layer_fetcher.fetch_layer(layer_version_arn)
×
759
                    if layer is None:
×
760
                        # TODO: detect user or role from context when IAM users are implemented
761
                        user = "user/localstack-testing"
×
762
                        raise AccessDeniedException(
×
763
                            f"User: arn:{get_partition(region)}:iam::{account_id}:{user} is not authorized to perform: lambda:GetLayerVersion on resource: {layer_version_arn} because no resource-based policy allows the lambda:GetLayerVersion action"
764
                        )
765

766
                    # Distinguish between new layer and new layer version
767
                    if layer_version is None:
×
768
                        # Create whole layer from scratch
769
                        state.layers[layer_name] = layer
×
770
                    else:
771
                        # Create layer version if another version of the same layer already exists
772
                        state.layers[layer_name].layer_versions[layer_version_str] = (
×
773
                            layer.layer_versions.get(layer_version_str)
774
                        )
775

776
            # only the first two matches in the array are considered for the error message
777
            layer_arn = ":".join(layer_version_arn.split(":")[:-1])
1✔
778
            if layer_arn in visited_layers:
1✔
779
                conflict_layer_version_arn = visited_layers[layer_arn]
1✔
780
                raise InvalidParameterValueException(
1✔
781
                    f"Two different versions of the same layer are not allowed to be referenced in the same function. {conflict_layer_version_arn} and {layer_version_arn} are versions of the same layer.",
782
                    Type="User",
783
                )
784
            visited_layers[layer_arn] = layer_version_arn
1✔
785

786
    @staticmethod
1✔
787
    def map_layers(new_layers: list[str]) -> list[LayerVersion]:
1✔
788
        layers = []
1✔
789
        for layer_version_arn in new_layers:
1✔
790
            region_name, account_id, layer_name, layer_version = api_utils.parse_layer_arn(
1✔
791
                layer_version_arn
792
            )
793
            layer = lambda_stores[account_id][region_name].layers.get(layer_name)
1✔
794
            layer_version = layer.layer_versions.get(layer_version)
1✔
795
            layers.append(layer_version)
1✔
796
        return layers
1✔
797

798
    def get_function_recursion_config(
1✔
799
        self,
800
        context: RequestContext,
801
        function_name: UnqualifiedFunctionName,
802
        **kwargs,
803
    ) -> GetFunctionRecursionConfigResponse:
804
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
805
        function_name = api_utils.get_function_name(function_name, context)
1✔
806
        fn = self._get_function(function_name=function_name, region=region, account_id=account_id)
1✔
807
        return GetFunctionRecursionConfigResponse(RecursiveLoop=fn.recursive_loop)
1✔
808

809
    def put_function_recursion_config(
1✔
810
        self,
811
        context: RequestContext,
812
        function_name: UnqualifiedFunctionName,
813
        recursive_loop: RecursiveLoop,
814
        **kwargs,
815
    ) -> PutFunctionRecursionConfigResponse:
816
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
817
        function_name = api_utils.get_function_name(function_name, context)
1✔
818

819
        fn = self._get_function(function_name=function_name, region=region, account_id=account_id)
1✔
820

821
        allowed_values = list(RecursiveLoop.__members__.values())
1✔
822
        if recursive_loop not in allowed_values:
1✔
823
            raise ValidationException(
1✔
824
                f"1 validation error detected: Value '{recursive_loop}' at 'recursiveLoop' failed to satisfy constraint: "
825
                f"Member must satisfy enum value set: [Terminate, Allow]"
826
            )
827

828
        fn.recursive_loop = recursive_loop
1✔
829
        return PutFunctionRecursionConfigResponse(RecursiveLoop=fn.recursive_loop)
1✔
830

831
    @handler(operation="CreateFunction", expand=False)
1✔
832
    def create_function(
1✔
833
        self,
834
        context: RequestContext,
835
        request: CreateFunctionRequest,
836
    ) -> FunctionConfiguration:
837
        context_region = context.region
1✔
838
        context_account_id = context.account_id
1✔
839

840
        zip_file = request.get("Code", {}).get("ZipFile")
1✔
841
        if zip_file and len(zip_file) > config.LAMBDA_LIMITS_CODE_SIZE_ZIPPED:
1✔
842
            raise RequestEntityTooLargeException(
1✔
843
                f"Zipped size must be smaller than {config.LAMBDA_LIMITS_CODE_SIZE_ZIPPED} bytes"
844
            )
845

846
        if context.request.content_length > config.LAMBDA_LIMITS_CREATE_FUNCTION_REQUEST_SIZE:
1✔
847
            raise RequestEntityTooLargeException(
1✔
848
                f"Request must be smaller than {config.LAMBDA_LIMITS_CREATE_FUNCTION_REQUEST_SIZE} bytes for the CreateFunction operation"
849
            )
850

851
        if architectures := request.get("Architectures"):
1✔
852
            if len(architectures) != 1:
1✔
853
                raise ValidationException(
1✔
854
                    f"1 validation error detected: Value '[{', '.join(architectures)}]' at 'architectures' failed to "
855
                    f"satisfy constraint: Member must have length less than or equal to 1",
856
                )
857
            if architectures[0] not in ARCHITECTURES:
1✔
858
                raise ValidationException(
1✔
859
                    f"1 validation error detected: Value '[{', '.join(architectures)}]' at 'architectures' failed to "
860
                    f"satisfy constraint: Member must satisfy constraint: [Member must satisfy enum value set: "
861
                    f"[x86_64, arm64], Member must not be null]",
862
                )
863

864
        if env_vars := request.get("Environment", {}).get("Variables"):
1✔
865
            self._verify_env_variables(env_vars)
1✔
866

867
        if layers := request.get("Layers", []):
1✔
868
            self._validate_layers(layers, region=context_region, account_id=context_account_id)
1✔
869

870
        if not api_utils.is_role_arn(request.get("Role")):
1✔
871
            raise ValidationException(
1✔
872
                f"1 validation error detected: Value '{request.get('Role')}'"
873
                + " at 'role' failed to satisfy constraint: Member must satisfy regular expression pattern: arn:(aws[a-zA-Z-]*)?:iam::\\d{12}:role/?[a-zA-Z_0-9+=,.@\\-_/]+"
874
            )
875
        if not self.lambda_service.can_assume_role(request.get("Role"), context.region):
1✔
876
            raise InvalidParameterValueException(
×
877
                "The role defined for the function cannot be assumed by Lambda.", Type="User"
878
            )
879
        package_type = request.get("PackageType", PackageType.Zip)
1✔
880
        runtime = request.get("Runtime")
1✔
881
        self._validate_runtime(package_type, runtime)
1✔
882

883
        request_function_name = request.get("FunctionName")
1✔
884

885
        function_name, *_ = api_utils.get_name_and_qualifier(
1✔
886
            function_arn_or_name=request_function_name,
887
            qualifier=None,
888
            context=context,
889
        )
890

891
        if runtime in DEPRECATED_RUNTIMES:
1✔
892
            LOG.warning(
1✔
893
                "The Lambda runtime %s} is deprecated. "
894
                "Please upgrade the runtime for the function %s: "
895
                "https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtimes.html",
896
                runtime,
897
                function_name,
898
            )
899
        if snap_start := request.get("SnapStart"):
1✔
900
            self._validate_snapstart(snap_start, runtime)
1✔
901
        state = lambda_stores[context_account_id][context_region]
1✔
902

903
        with self.create_fn_lock:
1✔
904
            if function_name in state.functions:
1✔
905
                raise ResourceConflictException(f"Function already exist: {function_name}")
×
906
            fn = Function(function_name=function_name)
1✔
907
            arn = VersionIdentifier(
1✔
908
                function_name=function_name,
909
                qualifier="$LATEST",
910
                region=context_region,
911
                account=context_account_id,
912
            )
913
            # save function code to s3
914
            code = None
1✔
915
            image = None
1✔
916
            image_config = None
1✔
917
            runtime_version_config = RuntimeVersionConfig(
1✔
918
                # Limitation: the runtime id (presumably sha256 of image) is currently hardcoded
919
                # Potential implementation: provide (cached) sha256 hash of used Docker image
920
                RuntimeVersionArn=f"arn:{context.partition}:lambda:{context_region}::runtime:8eeff65f6809a3ce81507fe733fe09b835899b99481ba22fd75b5a7338290ec1"
921
            )
922
            request_code = request.get("Code")
1✔
923
            if package_type == PackageType.Zip:
1✔
924
                # TODO verify if correct combination of code is set
925
                if zip_file := request_code.get("ZipFile"):
1✔
926
                    code = store_lambda_archive(
1✔
927
                        archive_file=zip_file,
928
                        function_name=function_name,
929
                        region_name=context_region,
930
                        account_id=context_account_id,
931
                    )
932
                elif s3_bucket := request_code.get("S3Bucket"):
1✔
933
                    s3_key = request_code["S3Key"]
1✔
934
                    s3_object_version = request_code.get("S3ObjectVersion")
1✔
935
                    code = store_s3_bucket_archive(
1✔
936
                        archive_bucket=s3_bucket,
937
                        archive_key=s3_key,
938
                        archive_version=s3_object_version,
939
                        function_name=function_name,
940
                        region_name=context_region,
941
                        account_id=context_account_id,
942
                    )
943
                else:
944
                    raise LambdaServiceException("Gotta have s3 bucket or zip file")
×
945
            elif package_type == PackageType.Image:
1✔
946
                image = request_code.get("ImageUri")
1✔
947
                if not image:
1✔
948
                    raise LambdaServiceException("Gotta have an image when package type is image")
×
949
                image = create_image_code(image_uri=image)
1✔
950

951
                image_config_req = request.get("ImageConfig", {})
1✔
952
                image_config = ImageConfig(
1✔
953
                    command=image_config_req.get("Command"),
954
                    entrypoint=image_config_req.get("EntryPoint"),
955
                    working_directory=image_config_req.get("WorkingDirectory"),
956
                )
957
                # Runtime management controls are not available when providing a custom image
958
                runtime_version_config = None
1✔
959
            if "LoggingConfig" in request:
1✔
960
                logging_config = request["LoggingConfig"]
1✔
961
                LOG.warning(
1✔
962
                    "Advanced Lambda Logging Configuration is currently mocked "
963
                    "and will not impact the logging behavior. "
964
                    "Please create a feature request if needed."
965
                )
966

967
                # when switching to JSON, app and system level log is auto set to INFO
968
                if logging_config.get("LogFormat", None) == LogFormat.JSON:
1✔
969
                    logging_config = {
1✔
970
                        "ApplicationLogLevel": "INFO",
971
                        "SystemLogLevel": "INFO",
972
                        "LogGroup": f"/aws/lambda/{function_name}",
973
                    } | logging_config
974
                else:
975
                    logging_config = (
×
976
                        LoggingConfig(
977
                            LogFormat=LogFormat.Text, LogGroup=f"/aws/lambda/{function_name}"
978
                        )
979
                        | logging_config
980
                    )
981

982
            else:
983
                logging_config = LoggingConfig(
1✔
984
                    LogFormat=LogFormat.Text, LogGroup=f"/aws/lambda/{function_name}"
985
                )
986

987
            version = FunctionVersion(
1✔
988
                id=arn,
989
                config=VersionFunctionConfiguration(
990
                    last_modified=api_utils.format_lambda_date(datetime.datetime.now()),
991
                    description=request.get("Description", ""),
992
                    role=request["Role"],
993
                    timeout=request.get("Timeout", LAMBDA_DEFAULT_TIMEOUT),
994
                    runtime=request.get("Runtime"),
995
                    memory_size=request.get("MemorySize", LAMBDA_DEFAULT_MEMORY_SIZE),
996
                    handler=request.get("Handler"),
997
                    package_type=package_type,
998
                    environment=env_vars,
999
                    architectures=request.get("Architectures") or [Architecture.x86_64],
1000
                    tracing_config_mode=request.get("TracingConfig", {}).get(
1001
                        "Mode", TracingMode.PassThrough
1002
                    ),
1003
                    image=image,
1004
                    image_config=image_config,
1005
                    code=code,
1006
                    layers=self.map_layers(layers),
1007
                    internal_revision=short_uid(),
1008
                    ephemeral_storage=LambdaEphemeralStorage(
1009
                        size=request.get("EphemeralStorage", {}).get("Size", 512)
1010
                    ),
1011
                    snap_start=SnapStartResponse(
1012
                        ApplyOn=request.get("SnapStart", {}).get("ApplyOn", SnapStartApplyOn.None_),
1013
                        OptimizationStatus=SnapStartOptimizationStatus.Off,
1014
                    ),
1015
                    runtime_version_config=runtime_version_config,
1016
                    dead_letter_arn=request.get("DeadLetterConfig", {}).get("TargetArn"),
1017
                    vpc_config=self._build_vpc_config(
1018
                        context_account_id, context_region, request.get("VpcConfig")
1019
                    ),
1020
                    state=VersionState(
1021
                        state=State.Pending,
1022
                        code=StateReasonCode.Creating,
1023
                        reason="The function is being created.",
1024
                    ),
1025
                    logging_config=logging_config,
1026
                ),
1027
            )
1028
            fn.versions["$LATEST"] = version
1✔
1029
            state.functions[function_name] = fn
1✔
1030
        function_counter.labels(
1✔
1031
            operation=FunctionOperation.create,
1032
            runtime=runtime or "n/a",
1033
            status=FunctionStatus.success,
1034
            invocation_type="n/a",
1035
            package_type=package_type,
1036
        )
1037
        self.lambda_service.create_function_version(version)
1✔
1038

1039
        if tags := request.get("Tags"):
1✔
1040
            # This will check whether the function exists.
1041
            self._store_tags(arn.unqualified_arn(), tags)
1✔
1042

1043
        if request.get("Publish"):
1✔
1044
            version = self._publish_version_with_changes(
1✔
1045
                function_name=function_name, region=context_region, account_id=context_account_id
1046
            )
1047

1048
        if config.LAMBDA_SYNCHRONOUS_CREATE:
1✔
1049
            # block via retrying until "terminal" condition reached before returning
1050
            if not poll_condition(
×
1051
                lambda: get_function_version(
1052
                    function_name, version.id.qualifier, version.id.account, version.id.region
1053
                ).config.state.state
1054
                in [State.Active, State.Failed],
1055
                timeout=10,
1056
            ):
1057
                LOG.warning(
×
1058
                    "LAMBDA_SYNCHRONOUS_CREATE is active, but waiting for %s reached timeout.",
1059
                    function_name,
1060
                )
1061

1062
        return api_utils.map_config_out(
1✔
1063
            version, return_qualified_arn=False, return_update_status=False
1064
        )
1065

1066
    def _validate_runtime(self, package_type, runtime):
1✔
1067
        runtimes = ALL_RUNTIMES
1✔
1068
        if config.LAMBDA_RUNTIME_VALIDATION:
1✔
1069
            runtimes = list(itertools.chain(RUNTIMES_AGGREGATED.values()))
1✔
1070

1071
        if package_type == PackageType.Zip and runtime not in runtimes:
1✔
1072
            # deprecated runtimes have different error
1073
            if runtime in DEPRECATED_RUNTIMES:
1✔
1074
                HINT_LOG.info(
1✔
1075
                    "Set env variable LAMBDA_RUNTIME_VALIDATION to 0"
1076
                    " in order to allow usage of deprecated runtimes"
1077
                )
1078
                self._check_for_recomended_migration_target(runtime)
1✔
1079

1080
            raise InvalidParameterValueException(
1✔
1081
                f"Value {runtime} at 'runtime' failed to satisfy constraint: Member must satisfy enum value set: {VALID_RUNTIMES} or be a valid ARN",
1082
                Type="User",
1083
            )
1084

1085
    def _check_for_recomended_migration_target(self, deprecated_runtime):
1✔
1086
        # AWS offers recommended runtime for migration for "newly" deprecated runtimes
1087
        # in order to preserve parity with error messages we need the code bellow
1088
        latest_runtime = DEPRECATED_RUNTIMES_UPGRADES.get(deprecated_runtime)
1✔
1089

1090
        if latest_runtime is not None:
1✔
1091
            LOG.debug(
1✔
1092
                "The Lambda runtime %s is deprecated. Please upgrade to a supported Lambda runtime such as %s.",
1093
                deprecated_runtime,
1094
                latest_runtime,
1095
            )
1096
            raise InvalidParameterValueException(
1✔
1097
                f"The runtime parameter of {deprecated_runtime} is no longer supported for creating or updating AWS Lambda functions. We recommend you use a supported runtime while creating or updating functions.",
1098
                Type="User",
1099
            )
1100

1101
    @handler(operation="UpdateFunctionConfiguration", expand=False)
1✔
1102
    def update_function_configuration(
1✔
1103
        self, context: RequestContext, request: UpdateFunctionConfigurationRequest
1104
    ) -> FunctionConfiguration:
1105
        """updates the $LATEST version of the function"""
1106
        function_name = request.get("FunctionName")
1✔
1107

1108
        # in case we got ARN or partial ARN
1109
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1110
        function_name, qualifier = api_utils.get_name_and_qualifier(function_name, None, context)
1✔
1111
        state = lambda_stores[account_id][region]
1✔
1112

1113
        if function_name not in state.functions:
1✔
1114
            raise ResourceNotFoundException(
×
1115
                f"Function not found: {api_utils.unqualified_lambda_arn(function_name=function_name, region=region, account=account_id)}",
1116
                Type="User",
1117
            )
1118
        function = state.functions[function_name]
1✔
1119

1120
        # TODO: lock modification of latest version
1121
        # TODO: notify service for changes relevant to re-provisioning of $LATEST
1122
        latest_version = function.latest()
1✔
1123
        latest_version_config = latest_version.config
1✔
1124

1125
        revision_id = request.get("RevisionId")
1✔
1126
        if revision_id and revision_id != latest_version.config.revision_id:
1✔
1127
            raise PreconditionFailedException(
1✔
1128
                "The Revision Id provided does not match the latest Revision Id. "
1129
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
1130
                Type="User",
1131
            )
1132

1133
        replace_kwargs = {}
1✔
1134
        if "EphemeralStorage" in request:
1✔
1135
            replace_kwargs["ephemeral_storage"] = LambdaEphemeralStorage(
×
1136
                request.get("EphemeralStorage", {}).get("Size", 512)
1137
            )  # TODO: do defaults here apply as well?
1138

1139
        if "Role" in request:
1✔
1140
            if not api_utils.is_role_arn(request["Role"]):
1✔
1141
                raise ValidationException(
1✔
1142
                    f"1 validation error detected: Value '{request.get('Role')}'"
1143
                    + " at 'role' failed to satisfy constraint: Member must satisfy regular expression pattern: arn:(aws[a-zA-Z-]*)?:iam::\\d{12}:role/?[a-zA-Z_0-9+=,.@\\-_/]+"
1144
                )
1145
            replace_kwargs["role"] = request["Role"]
1✔
1146

1147
        if "Description" in request:
1✔
1148
            replace_kwargs["description"] = request["Description"]
1✔
1149

1150
        if "Timeout" in request:
1✔
1151
            replace_kwargs["timeout"] = request["Timeout"]
1✔
1152

1153
        if "MemorySize" in request:
1✔
1154
            replace_kwargs["memory_size"] = request["MemorySize"]
1✔
1155

1156
        if "DeadLetterConfig" in request:
1✔
1157
            replace_kwargs["dead_letter_arn"] = request.get("DeadLetterConfig", {}).get("TargetArn")
1✔
1158

1159
        if vpc_config := request.get("VpcConfig"):
1✔
1160
            replace_kwargs["vpc_config"] = self._build_vpc_config(account_id, region, vpc_config)
1✔
1161

1162
        if "Handler" in request:
1✔
1163
            replace_kwargs["handler"] = request["Handler"]
1✔
1164

1165
        if "Runtime" in request:
1✔
1166
            runtime = request["Runtime"]
1✔
1167

1168
            if runtime not in ALL_RUNTIMES:
1✔
1169
                raise InvalidParameterValueException(
1✔
1170
                    f"Value {runtime} at 'runtime' failed to satisfy constraint: Member must satisfy enum value set: {VALID_RUNTIMES} or be a valid ARN",
1171
                    Type="User",
1172
                )
1173
            if runtime in DEPRECATED_RUNTIMES:
1✔
1174
                LOG.warning(
×
1175
                    "The Lambda runtime %s is deprecated. "
1176
                    "Please upgrade the runtime for the function %s: "
1177
                    "https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtimes.html",
1178
                    runtime,
1179
                    function_name,
1180
                )
1181
            replace_kwargs["runtime"] = request["Runtime"]
1✔
1182

1183
        if snap_start := request.get("SnapStart"):
1✔
1184
            runtime = replace_kwargs.get("runtime") or latest_version_config.runtime
1✔
1185
            self._validate_snapstart(snap_start, runtime)
1✔
1186
            replace_kwargs["snap_start"] = SnapStartResponse(
1✔
1187
                ApplyOn=snap_start.get("ApplyOn", SnapStartApplyOn.None_),
1188
                OptimizationStatus=SnapStartOptimizationStatus.Off,
1189
            )
1190

1191
        if "Environment" in request:
1✔
1192
            if env_vars := request.get("Environment", {}).get("Variables", {}):
1✔
1193
                self._verify_env_variables(env_vars)
1✔
1194
            replace_kwargs["environment"] = env_vars
1✔
1195

1196
        if "Layers" in request:
1✔
1197
            new_layers = request["Layers"]
1✔
1198
            if new_layers:
1✔
1199
                self._validate_layers(new_layers, region=region, account_id=account_id)
1✔
1200
            replace_kwargs["layers"] = self.map_layers(new_layers)
1✔
1201

1202
        if "ImageConfig" in request:
1✔
1203
            new_image_config = request["ImageConfig"]
1✔
1204
            replace_kwargs["image_config"] = ImageConfig(
1✔
1205
                command=new_image_config.get("Command"),
1206
                entrypoint=new_image_config.get("EntryPoint"),
1207
                working_directory=new_image_config.get("WorkingDirectory"),
1208
            )
1209

1210
        if "LoggingConfig" in request:
1✔
1211
            logging_config = request["LoggingConfig"]
1✔
1212
            LOG.warning(
1✔
1213
                "Advanced Lambda Logging Configuration is currently mocked "
1214
                "and will not impact the logging behavior. "
1215
                "Please create a feature request if needed."
1216
            )
1217

1218
            # when switching to JSON, app and system level log is auto set to INFO
1219
            if logging_config.get("LogFormat", None) == LogFormat.JSON:
1✔
1220
                logging_config = {
1✔
1221
                    "ApplicationLogLevel": "INFO",
1222
                    "SystemLogLevel": "INFO",
1223
                } | logging_config
1224

1225
            last_config = latest_version_config.logging_config
1✔
1226

1227
            # add partial update
1228
            new_logging_config = last_config | logging_config
1✔
1229

1230
            # in case we switched from JSON to Text we need to remove LogLevel keys
1231
            if (
1✔
1232
                new_logging_config.get("LogFormat") == LogFormat.Text
1233
                and last_config.get("LogFormat") == LogFormat.JSON
1234
            ):
1235
                new_logging_config.pop("ApplicationLogLevel", None)
1✔
1236
                new_logging_config.pop("SystemLogLevel", None)
1✔
1237

1238
            replace_kwargs["logging_config"] = new_logging_config
1✔
1239

1240
        if "TracingConfig" in request:
1✔
1241
            new_mode = request.get("TracingConfig", {}).get("Mode")
×
1242
            if new_mode:
×
1243
                replace_kwargs["tracing_config_mode"] = new_mode
×
1244

1245
        new_latest_version = dataclasses.replace(
1✔
1246
            latest_version,
1247
            config=dataclasses.replace(
1248
                latest_version_config,
1249
                last_modified=api_utils.generate_lambda_date(),
1250
                internal_revision=short_uid(),
1251
                last_update=UpdateStatus(
1252
                    status=LastUpdateStatus.InProgress,
1253
                    code="Creating",
1254
                    reason="The function is being created.",
1255
                ),
1256
                **replace_kwargs,
1257
            ),
1258
        )
1259
        function.versions["$LATEST"] = new_latest_version  # TODO: notify
1✔
1260
        self.lambda_service.update_version(new_version=new_latest_version)
1✔
1261

1262
        return api_utils.map_config_out(new_latest_version)
1✔
1263

1264
    @handler(operation="UpdateFunctionCode", expand=False)
1✔
1265
    def update_function_code(
1✔
1266
        self, context: RequestContext, request: UpdateFunctionCodeRequest
1267
    ) -> FunctionConfiguration:
1268
        """updates the $LATEST version of the function"""
1269
        # only supports normal zip packaging atm
1270
        # if request.get("Publish"):
1271
        #     self.lambda_service.create_function_version()
1272

1273
        function_name = request.get("FunctionName")
1✔
1274
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1275
        function_name, qualifier = api_utils.get_name_and_qualifier(function_name, None, context)
1✔
1276

1277
        store = lambda_stores[account_id][region]
1✔
1278
        if function_name not in store.functions:
1✔
1279
            raise ResourceNotFoundException(
×
1280
                f"Function not found: {api_utils.unqualified_lambda_arn(function_name=function_name, region=region, account=account_id)}",
1281
                Type="User",
1282
            )
1283
        function = store.functions[function_name]
1✔
1284

1285
        revision_id = request.get("RevisionId")
1✔
1286
        if revision_id and revision_id != function.latest().config.revision_id:
1✔
1287
            raise PreconditionFailedException(
1✔
1288
                "The Revision Id provided does not match the latest Revision Id. "
1289
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
1290
                Type="User",
1291
            )
1292

1293
        # TODO verify if correct combination of code is set
1294
        image = None
1✔
1295
        if (
1✔
1296
            request.get("ZipFile") or request.get("S3Bucket")
1297
        ) and function.latest().config.package_type == PackageType.Image:
1298
            raise InvalidParameterValueException(
1✔
1299
                "Please provide ImageUri when updating a function with packageType Image.",
1300
                Type="User",
1301
            )
1302
        elif request.get("ImageUri") and function.latest().config.package_type == PackageType.Zip:
1✔
1303
            raise InvalidParameterValueException(
1✔
1304
                "Please don't provide ImageUri when updating a function with packageType Zip.",
1305
                Type="User",
1306
            )
1307

1308
        if zip_file := request.get("ZipFile"):
1✔
1309
            code = store_lambda_archive(
1✔
1310
                archive_file=zip_file,
1311
                function_name=function_name,
1312
                region_name=region,
1313
                account_id=account_id,
1314
            )
1315
        elif s3_bucket := request.get("S3Bucket"):
1✔
1316
            s3_key = request["S3Key"]
1✔
1317
            s3_object_version = request.get("S3ObjectVersion")
1✔
1318
            code = store_s3_bucket_archive(
1✔
1319
                archive_bucket=s3_bucket,
1320
                archive_key=s3_key,
1321
                archive_version=s3_object_version,
1322
                function_name=function_name,
1323
                region_name=region,
1324
                account_id=account_id,
1325
            )
1326
        elif image := request.get("ImageUri"):
1✔
1327
            code = None
1✔
1328
            image = create_image_code(image_uri=image)
1✔
1329
        else:
1330
            raise LambdaServiceException("Gotta have s3 bucket or zip file or image")
×
1331

1332
        old_function_version = function.versions.get("$LATEST")
1✔
1333
        replace_kwargs = {"code": code} if code else {"image": image}
1✔
1334

1335
        if architectures := request.get("Architectures"):
1✔
1336
            if len(architectures) != 1:
×
1337
                raise ValidationException(
×
1338
                    f"1 validation error detected: Value '[{', '.join(architectures)}]' at 'architectures' failed to "
1339
                    f"satisfy constraint: Member must have length less than or equal to 1",
1340
                )
1341
            # An empty list of architectures is also forbidden. Further exceptions are tested here for create_function:
1342
            # tests.aws.services.lambda_.test_lambda_api.TestLambdaFunction.test_create_lambda_exceptions
1343
            if architectures[0] not in ARCHITECTURES:
×
1344
                raise ValidationException(
×
1345
                    f"1 validation error detected: Value '[{', '.join(architectures)}]' at 'architectures' failed to "
1346
                    f"satisfy constraint: Member must satisfy constraint: [Member must satisfy enum value set: "
1347
                    f"[x86_64, arm64], Member must not be null]",
1348
                )
1349
            replace_kwargs["architectures"] = architectures
×
1350

1351
        config = dataclasses.replace(
1✔
1352
            old_function_version.config,
1353
            internal_revision=short_uid(),
1354
            last_modified=api_utils.generate_lambda_date(),
1355
            last_update=UpdateStatus(
1356
                status=LastUpdateStatus.InProgress,
1357
                code="Creating",
1358
                reason="The function is being created.",
1359
            ),
1360
            **replace_kwargs,
1361
        )
1362
        function_version = dataclasses.replace(old_function_version, config=config)
1✔
1363
        function.versions["$LATEST"] = function_version
1✔
1364

1365
        self.lambda_service.update_version(new_version=function_version)
1✔
1366
        if request.get("Publish"):
1✔
1367
            function_version = self._publish_version_with_changes(
1✔
1368
                function_name=function_name, region=region, account_id=account_id
1369
            )
1370
        return api_utils.map_config_out(
1✔
1371
            function_version, return_qualified_arn=bool(request.get("Publish"))
1372
        )
1373

1374
    # TODO: does deleting the latest published version affect the next versions number?
1375
    # TODO: what happens when we call this with a qualifier and a fully qualified ARN? (+ conflicts?)
1376
    # TODO: test different ARN patterns (shorthand ARN?)
1377
    # TODO: test deleting across regions?
1378
    # TODO: test mismatch between context region and region in ARN
1379
    # TODO: test qualifier $LATEST, alias-name and version
1380
    def delete_function(
1✔
1381
        self,
1382
        context: RequestContext,
1383
        function_name: FunctionName,
1384
        qualifier: Qualifier = None,
1385
        **kwargs,
1386
    ) -> None:
1387
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1388
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
1389
            function_name, qualifier, context
1390
        )
1391

1392
        if qualifier and api_utils.qualifier_is_alias(qualifier):
1✔
1393
            raise InvalidParameterValueException(
×
1394
                "Deletion of aliases is not currently supported.",
1395
                Type="User",
1396
            )
1397

1398
        store = lambda_stores[account_id][region]
1✔
1399
        if qualifier == "$LATEST":
1✔
1400
            raise InvalidParameterValueException(
1✔
1401
                "$LATEST version cannot be deleted without deleting the function.", Type="User"
1402
            )
1403

1404
        if function_name not in store.functions:
1✔
1405
            e = ResourceNotFoundException(
1✔
1406
                f"Function not found: {api_utils.unqualified_lambda_arn(function_name=function_name, region=region, account=account_id)}",
1407
                Type="User",
1408
            )
1409
            raise e
1✔
1410
        function = store.functions.get(function_name)
1✔
1411

1412
        if qualifier:
1✔
1413
            # delete a version of the function
1414
            version = function.versions.pop(qualifier, None)
1✔
1415
            if version:
1✔
1416
                self.lambda_service.stop_version(version.id.qualified_arn())
1✔
1417
                destroy_code_if_not_used(code=version.config.code, function=function)
1✔
1418
        else:
1419
            # delete the whole function
1420
            # TODO: introduce locking for safe deletion: We could create a new version at the API layer before
1421
            #  the old version gets cleaned up in the internal lambda service.
1422
            function = store.functions.pop(function_name)
1✔
1423
            for version in function.versions.values():
1✔
1424
                self.lambda_service.stop_version(qualified_arn=version.id.qualified_arn())
1✔
1425
                # we can safely destroy the code here
1426
                if version.config.code:
1✔
1427
                    version.config.code.destroy()
1✔
1428

1429
    def list_functions(
1✔
1430
        self,
1431
        context: RequestContext,
1432
        master_region: MasterRegion = None,  # (only relevant for lambda@edge)
1433
        function_version: FunctionVersionApi = None,
1434
        marker: String = None,
1435
        max_items: MaxListItems = None,
1436
        **kwargs,
1437
    ) -> ListFunctionsResponse:
1438
        state = lambda_stores[context.account_id][context.region]
1✔
1439

1440
        if function_version and function_version != FunctionVersionApi.ALL:
1✔
1441
            raise ValidationException(
1✔
1442
                f"1 validation error detected: Value '{function_version}'"
1443
                + " at 'functionVersion' failed to satisfy constraint: Member must satisfy enum value set: [ALL]"
1444
            )
1445

1446
        if function_version == FunctionVersionApi.ALL:
1✔
1447
            # include all versions for all function
1448
            versions = [v for f in state.functions.values() for v in f.versions.values()]
1✔
1449
            return_qualified_arn = True
1✔
1450
        else:
1451
            versions = [f.latest() for f in state.functions.values()]
1✔
1452
            return_qualified_arn = False
1✔
1453

1454
        versions = [
1✔
1455
            api_utils.map_to_list_response(
1456
                api_utils.map_config_out(fc, return_qualified_arn=return_qualified_arn)
1457
            )
1458
            for fc in versions
1459
        ]
1460
        versions = PaginatedList(versions)
1✔
1461
        page, token = versions.get_page(
1✔
1462
            lambda version: version["FunctionArn"],
1463
            marker,
1464
            max_items,
1465
        )
1466
        return ListFunctionsResponse(Functions=page, NextMarker=token)
1✔
1467

1468
    def get_function(
1✔
1469
        self,
1470
        context: RequestContext,
1471
        function_name: NamespacedFunctionName,
1472
        qualifier: Qualifier = None,
1473
        **kwargs,
1474
    ) -> GetFunctionResponse:
1475
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1476
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
1477
            function_name, qualifier, context
1478
        )
1479

1480
        fn = lambda_stores[account_id][region].functions.get(function_name)
1✔
1481
        if fn is None:
1✔
1482
            if qualifier is None:
1✔
1483
                raise ResourceNotFoundException(
1✔
1484
                    f"Function not found: {api_utils.unqualified_lambda_arn(function_name, account_id, region)}",
1485
                    Type="User",
1486
                )
1487
            else:
1488
                raise ResourceNotFoundException(
1✔
1489
                    f"Function not found: {api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)}",
1490
                    Type="User",
1491
                )
1492
        alias_name = None
1✔
1493
        if qualifier and api_utils.qualifier_is_alias(qualifier):
1✔
1494
            if qualifier not in fn.aliases:
1✔
1495
                alias_arn = api_utils.qualified_lambda_arn(
1✔
1496
                    function_name, qualifier, account_id, region
1497
                )
1498
                raise ResourceNotFoundException(f"Function not found: {alias_arn}", Type="User")
1✔
1499
            alias_name = qualifier
1✔
1500
            qualifier = fn.aliases[alias_name].function_version
1✔
1501

1502
        version = get_function_version(
1✔
1503
            function_name=function_name,
1504
            qualifier=qualifier,
1505
            account_id=account_id,
1506
            region=region,
1507
        )
1508
        tags = self._get_tags(api_utils.unqualified_lambda_arn(function_name, account_id, region))
1✔
1509
        additional_fields = {}
1✔
1510
        if tags:
1✔
1511
            additional_fields["Tags"] = tags
1✔
1512
        code_location = None
1✔
1513
        if code := version.config.code:
1✔
1514
            code_location = FunctionCodeLocation(
1✔
1515
                Location=code.generate_presigned_url(endpoint_url=config.external_service_url()),
1516
                RepositoryType="S3",
1517
            )
1518
        elif image := version.config.image:
1✔
1519
            code_location = FunctionCodeLocation(
1✔
1520
                ImageUri=image.image_uri,
1521
                RepositoryType=image.repository_type,
1522
                ResolvedImageUri=image.resolved_image_uri,
1523
            )
1524
        concurrency = None
1✔
1525
        if fn.reserved_concurrent_executions:
1✔
1526
            concurrency = Concurrency(
1✔
1527
                ReservedConcurrentExecutions=fn.reserved_concurrent_executions
1528
            )
1529

1530
        return GetFunctionResponse(
1✔
1531
            Configuration=api_utils.map_config_out(
1532
                version, return_qualified_arn=bool(qualifier), alias_name=alias_name
1533
            ),
1534
            Code=code_location,  # TODO
1535
            Concurrency=concurrency,
1536
            **additional_fields,
1537
        )
1538

1539
    def get_function_configuration(
1✔
1540
        self,
1541
        context: RequestContext,
1542
        function_name: NamespacedFunctionName,
1543
        qualifier: Qualifier = None,
1544
        **kwargs,
1545
    ) -> FunctionConfiguration:
1546
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1547
        # CAVE: THIS RETURN VALUE IS *NOT* THE SAME AS IN get_function (!) but seems to be only configuration part?
1548
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
1549
            function_name, qualifier, context
1550
        )
1551
        version = get_function_version(
1✔
1552
            function_name=function_name,
1553
            qualifier=qualifier,
1554
            account_id=account_id,
1555
            region=region,
1556
        )
1557
        return api_utils.map_config_out(version, return_qualified_arn=bool(qualifier))
1✔
1558

1559
    def invoke(
1✔
1560
        self,
1561
        context: RequestContext,
1562
        function_name: NamespacedFunctionName,
1563
        invocation_type: InvocationType = None,
1564
        log_type: LogType = None,
1565
        client_context: String = None,
1566
        payload: IO[Blob] = None,
1567
        qualifier: Qualifier = None,
1568
        **kwargs,
1569
    ) -> InvocationResponse:
1570
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1571
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
1572
            function_name, qualifier, context
1573
        )
1574

1575
        user_agent = context.request.user_agent.string
1✔
1576

1577
        time_before = time.perf_counter()
1✔
1578
        try:
1✔
1579
            invocation_result = self.lambda_service.invoke(
1✔
1580
                function_name=function_name,
1581
                qualifier=qualifier,
1582
                region=region,
1583
                account_id=account_id,
1584
                invocation_type=invocation_type,
1585
                client_context=client_context,
1586
                request_id=context.request_id,
1587
                trace_context=context.trace_context,
1588
                payload=payload.read() if payload else None,
1589
                user_agent=user_agent,
1590
            )
1591
        except ServiceException:
1✔
1592
            raise
1✔
1593
        except EnvironmentStartupTimeoutException as e:
1✔
1594
            raise LambdaServiceException(
1✔
1595
                f"[{context.request_id}] Timeout while starting up lambda environment for function {function_name}:{qualifier}"
1596
            ) from e
1597
        except Exception as e:
1✔
1598
            LOG.error(
1✔
1599
                "[%s] Error while invoking lambda %s",
1600
                context.request_id,
1601
                function_name,
1602
                exc_info=LOG.isEnabledFor(logging.DEBUG),
1603
            )
1604
            raise LambdaServiceException(
1✔
1605
                f"[{context.request_id}] Internal error while executing lambda {function_name}:{qualifier}. Caused by {type(e).__name__}: {e}"
1606
            ) from e
1607

1608
        if invocation_type == InvocationType.Event:
1✔
1609
            # This happens when invocation type is event
1610
            return InvocationResponse(StatusCode=202)
1✔
1611
        if invocation_type == InvocationType.DryRun:
1✔
1612
            # This happens when invocation type is dryrun
1613
            return InvocationResponse(StatusCode=204)
1✔
1614
        LOG.debug("Lambda invocation duration: %0.2fms", (time.perf_counter() - time_before) * 1000)
1✔
1615

1616
        response = InvocationResponse(
1✔
1617
            StatusCode=200,
1618
            Payload=invocation_result.payload,
1619
            ExecutedVersion=invocation_result.executed_version,
1620
        )
1621

1622
        if invocation_result.is_error:
1✔
1623
            response["FunctionError"] = "Unhandled"
1✔
1624

1625
        if log_type == LogType.Tail:
1✔
1626
            response["LogResult"] = to_str(
1✔
1627
                base64.b64encode(to_bytes(invocation_result.logs)[-4096:])
1628
            )
1629

1630
        return response
1✔
1631

1632
    # Version operations
1633
    def publish_version(
1✔
1634
        self,
1635
        context: RequestContext,
1636
        function_name: FunctionName,
1637
        code_sha256: String = None,
1638
        description: Description = None,
1639
        revision_id: String = None,
1640
        **kwargs,
1641
    ) -> FunctionConfiguration:
1642
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1643
        function_name = api_utils.get_function_name(function_name, context)
1✔
1644
        new_version = self._publish_version_from_existing_version(
1✔
1645
            function_name=function_name,
1646
            description=description,
1647
            account_id=account_id,
1648
            region=region,
1649
            revision_id=revision_id,
1650
            code_sha256=code_sha256,
1651
        )
1652
        return api_utils.map_config_out(new_version, return_qualified_arn=True)
1✔
1653

1654
    def list_versions_by_function(
1✔
1655
        self,
1656
        context: RequestContext,
1657
        function_name: NamespacedFunctionName,
1658
        marker: String = None,
1659
        max_items: MaxListItems = None,
1660
        **kwargs,
1661
    ) -> ListVersionsByFunctionResponse:
1662
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1663
        function_name = api_utils.get_function_name(function_name, context)
1✔
1664
        function = self._get_function(
1✔
1665
            function_name=function_name, region=region, account_id=account_id
1666
        )
1667
        versions = [
1✔
1668
            api_utils.map_to_list_response(
1669
                api_utils.map_config_out(version=version, return_qualified_arn=True)
1670
            )
1671
            for version in function.versions.values()
1672
        ]
1673
        items = PaginatedList(versions)
1✔
1674
        page, token = items.get_page(
1✔
1675
            lambda item: item,
1676
            marker,
1677
            max_items,
1678
        )
1679
        return ListVersionsByFunctionResponse(Versions=page, NextMarker=token)
1✔
1680

1681
    # Alias
1682

1683
    def _create_routing_config_model(
1✔
1684
        self, routing_config_dict: dict[str, float], function_version: FunctionVersion
1685
    ):
1686
        if len(routing_config_dict) > 1:
1✔
1687
            raise InvalidParameterValueException(
1✔
1688
                "Number of items in AdditionalVersionWeights cannot be greater than 1",
1689
                Type="User",
1690
            )
1691
        # should be exactly one item here, still iterating, might be supported in the future
1692
        for key, value in routing_config_dict.items():
1✔
1693
            if value < 0.0 or value >= 1.0:
1✔
1694
                raise ValidationException(
1✔
1695
                    f"1 validation error detected: Value '{{{key}={value}}}' at 'routingConfig.additionalVersionWeights' failed to satisfy constraint: Map value must satisfy constraint: [Member must have value less than or equal to 1.0, Member must have value greater than or equal to 0.0, Member must not be null]"
1696
                )
1697
            if key == function_version.id.qualifier:
1✔
1698
                raise InvalidParameterValueException(
1✔
1699
                    f"Invalid function version {function_version.id.qualifier}. Function version {function_version.id.qualifier} is already included in routing configuration.",
1700
                    Type="User",
1701
                )
1702
            # check if version target is latest, then no routing config is allowed
1703
            if function_version.id.qualifier == "$LATEST":
1✔
1704
                raise InvalidParameterValueException(
1✔
1705
                    "$LATEST is not supported for an alias pointing to more than 1 version"
1706
                )
1707
            if not api_utils.qualifier_is_version(key):
1✔
1708
                raise ValidationException(
1✔
1709
                    f"1 validation error detected: Value '{{{key}={value}}}' at 'routingConfig.additionalVersionWeights' failed to satisfy constraint: Map keys must satisfy constraint: [Member must have length less than or equal to 1024, Member must have length greater than or equal to 1, Member must satisfy regular expression pattern: [0-9]+, Member must not be null]"
1710
                )
1711

1712
            # checking if the version in the config exists
1713
            get_function_version(
1✔
1714
                function_name=function_version.id.function_name,
1715
                qualifier=key,
1716
                region=function_version.id.region,
1717
                account_id=function_version.id.account,
1718
            )
1719
        return AliasRoutingConfig(version_weights=routing_config_dict)
1✔
1720

1721
    def create_alias(
1✔
1722
        self,
1723
        context: RequestContext,
1724
        function_name: FunctionName,
1725
        name: Alias,
1726
        function_version: Version,
1727
        description: Description = None,
1728
        routing_config: AliasRoutingConfiguration = None,
1729
        **kwargs,
1730
    ) -> AliasConfiguration:
1731
        if not api_utils.qualifier_is_alias(name):
1✔
1732
            raise ValidationException(
1✔
1733
                f"1 validation error detected: Value '{name}' at 'name' failed to satisfy constraint: Member must satisfy regular expression pattern: (?!^[0-9]+$)([a-zA-Z0-9-_]+)"
1734
            )
1735

1736
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1737
        function_name = api_utils.get_function_name(function_name, context)
1✔
1738
        target_version = get_function_version(
1✔
1739
            function_name=function_name,
1740
            qualifier=function_version,
1741
            region=region,
1742
            account_id=account_id,
1743
        )
1744
        function = self._get_function(
1✔
1745
            function_name=function_name, region=region, account_id=account_id
1746
        )
1747
        # description is always present, if not specified it's an empty string
1748
        description = description or ""
1✔
1749
        with function.lock:
1✔
1750
            if existing_alias := function.aliases.get(name):
1✔
1751
                raise ResourceConflictException(
1✔
1752
                    f"Alias already exists: {api_utils.map_alias_out(alias=existing_alias, function=function)['AliasArn']}",
1753
                    Type="User",
1754
                )
1755
            # checking if the version exists
1756
            routing_configuration = None
1✔
1757
            if routing_config and (
1✔
1758
                routing_config_dict := routing_config.get("AdditionalVersionWeights")
1759
            ):
1760
                routing_configuration = self._create_routing_config_model(
1✔
1761
                    routing_config_dict, target_version
1762
                )
1763

1764
            alias = VersionAlias(
1✔
1765
                name=name,
1766
                function_version=function_version,
1767
                description=description,
1768
                routing_configuration=routing_configuration,
1769
            )
1770
            function.aliases[name] = alias
1✔
1771
        return api_utils.map_alias_out(alias=alias, function=function)
1✔
1772

1773
    def list_aliases(
1✔
1774
        self,
1775
        context: RequestContext,
1776
        function_name: FunctionName,
1777
        function_version: Version = None,
1778
        marker: String = None,
1779
        max_items: MaxListItems = None,
1780
        **kwargs,
1781
    ) -> ListAliasesResponse:
1782
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1783
        function_name = api_utils.get_function_name(function_name, context)
1✔
1784
        function = self._get_function(
1✔
1785
            function_name=function_name, region=region, account_id=account_id
1786
        )
1787
        aliases = [
1✔
1788
            api_utils.map_alias_out(alias, function)
1789
            for alias in function.aliases.values()
1790
            if function_version is None or alias.function_version == function_version
1791
        ]
1792

1793
        aliases = PaginatedList(aliases)
1✔
1794
        page, token = aliases.get_page(
1✔
1795
            lambda alias: alias["AliasArn"],
1796
            marker,
1797
            max_items,
1798
        )
1799

1800
        return ListAliasesResponse(Aliases=page, NextMarker=token)
1✔
1801

1802
    def delete_alias(
1✔
1803
        self, context: RequestContext, function_name: FunctionName, name: Alias, **kwargs
1804
    ) -> None:
1805
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1806
        function_name = api_utils.get_function_name(function_name, context)
1✔
1807
        function = self._get_function(
1✔
1808
            function_name=function_name, region=region, account_id=account_id
1809
        )
1810
        version_alias = function.aliases.pop(name, None)
1✔
1811

1812
        # cleanup related resources
1813
        if name in function.provisioned_concurrency_configs:
1✔
1814
            function.provisioned_concurrency_configs.pop(name)
1✔
1815

1816
        # TODO: Allow for deactivating/unregistering specific Lambda URLs
1817
        if version_alias and name in function.function_url_configs:
1✔
1818
            url_config = function.function_url_configs.pop(name)
1✔
1819
            LOG.debug(
1✔
1820
                "Stopping aliased Lambda Function URL %s for %s",
1821
                url_config.url,
1822
                url_config.function_name,
1823
            )
1824

1825
    def get_alias(
1✔
1826
        self, context: RequestContext, function_name: FunctionName, name: Alias, **kwargs
1827
    ) -> AliasConfiguration:
1828
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1829
        function_name = api_utils.get_function_name(function_name, context)
1✔
1830
        function = self._get_function(
1✔
1831
            function_name=function_name, region=region, account_id=account_id
1832
        )
1833
        if not (alias := function.aliases.get(name)):
1✔
1834
            raise ResourceNotFoundException(
1✔
1835
                f"Cannot find alias arn: {api_utils.qualified_lambda_arn(function_name=function_name, qualifier=name, region=region, account=account_id)}",
1836
                Type="User",
1837
            )
1838
        return api_utils.map_alias_out(alias=alias, function=function)
1✔
1839

1840
    def update_alias(
1✔
1841
        self,
1842
        context: RequestContext,
1843
        function_name: FunctionName,
1844
        name: Alias,
1845
        function_version: Version = None,
1846
        description: Description = None,
1847
        routing_config: AliasRoutingConfiguration = None,
1848
        revision_id: String = None,
1849
        **kwargs,
1850
    ) -> AliasConfiguration:
1851
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1852
        function_name = api_utils.get_function_name(function_name, context)
1✔
1853
        function = self._get_function(
1✔
1854
            function_name=function_name, region=region, account_id=account_id
1855
        )
1856
        if not (alias := function.aliases.get(name)):
1✔
1857
            fn_arn = api_utils.qualified_lambda_arn(function_name, name, account_id, region)
1✔
1858
            raise ResourceNotFoundException(
1✔
1859
                f"Alias not found: {fn_arn}",
1860
                Type="User",
1861
            )
1862
        if revision_id and alias.revision_id != revision_id:
1✔
1863
            raise PreconditionFailedException(
1✔
1864
                "The Revision Id provided does not match the latest Revision Id. "
1865
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
1866
                Type="User",
1867
            )
1868
        changes = {}
1✔
1869
        if function_version is not None:
1✔
1870
            changes |= {"function_version": function_version}
1✔
1871
        if description is not None:
1✔
1872
            changes |= {"description": description}
1✔
1873
        if routing_config is not None:
1✔
1874
            # if it is an empty dict or AdditionalVersionWeights is empty, set routing config to None
1875
            new_routing_config = None
1✔
1876
            if routing_config_dict := routing_config.get("AdditionalVersionWeights"):
1✔
1877
                new_routing_config = self._create_routing_config_model(routing_config_dict)
×
1878
            changes |= {"routing_configuration": new_routing_config}
1✔
1879
        # even if no changes are done, we have to update revision id for some reason
1880
        old_alias = alias
1✔
1881
        alias = dataclasses.replace(alias, **changes)
1✔
1882
        function.aliases[name] = alias
1✔
1883

1884
        # TODO: signal lambda service that pointer potentially changed
1885
        self.lambda_service.update_alias(old_alias=old_alias, new_alias=alias, function=function)
1✔
1886

1887
        return api_utils.map_alias_out(alias=alias, function=function)
1✔
1888

1889
    # =======================================
1890
    # ======= EVENT SOURCE MAPPINGS =========
1891
    # =======================================
1892
    def check_service_resource_exists(
1✔
1893
        self, service: str, resource_arn: str, function_arn: str, function_role_arn: str
1894
    ):
1895
        """
1896
        Check if the service resource exists and if the function has access to it.
1897

1898
        Raises:
1899
            InvalidParameterValueException: If the service resource does not exist or the function does not have access to it.
1900
        """
1901
        arn = parse_arn(resource_arn)
1✔
1902
        source_client = get_internal_client(
1✔
1903
            arn=resource_arn,
1904
            role_arn=function_role_arn,
1905
            service_principal=ServicePrincipal.lambda_,
1906
            source_arn=function_arn,
1907
        )
1908
        if service in ["sqs", "sqs-fifo"]:
1✔
1909
            try:
1✔
1910
                # AWS uses `GetQueueAttributes` internally to verify the queue existence, but we need the `QueueUrl`
1911
                # which is not given directly. We build out a dummy `QueueUrl` which can be parsed by SQS to return
1912
                # the right value
1913
                queue_name = arn["resource"].split("/")[-1]
1✔
1914
                queue_url = f"http://sqs.{arn['region']}.domain/{arn['account']}/{queue_name}"
1✔
1915
                source_client.get_queue_attributes(QueueUrl=queue_url)
1✔
1916
            except ClientError as e:
1✔
1917
                error_code = e.response["Error"]["Code"]
1✔
1918
                if error_code == "AWS.SimpleQueueService.NonExistentQueue":
1✔
1919
                    raise InvalidParameterValueException(
1✔
1920
                        f"Error occurred while ReceiveMessage. SQS Error Code: {error_code}. SQS Error Message: {e.response['Error']['Message']}",
1921
                        Type="User",
1922
                    )
1923
                raise e
×
1924
        elif service in ["kinesis"]:
1✔
1925
            try:
1✔
1926
                source_client.describe_stream(StreamARN=resource_arn)
1✔
1927
            except ClientError as e:
1✔
1928
                if e.response["Error"]["Code"] == "ResourceNotFoundException":
1✔
1929
                    raise InvalidParameterValueException(
1✔
1930
                        f"Stream not found: {resource_arn}",
1931
                        Type="User",
1932
                    )
1933
                raise e
×
1934
        elif service in ["dynamodb"]:
1✔
1935
            try:
1✔
1936
                source_client.describe_stream(StreamArn=resource_arn)
1✔
1937
            except ClientError as e:
1✔
1938
                if e.response["Error"]["Code"] == "ResourceNotFoundException":
1✔
1939
                    raise InvalidParameterValueException(
1✔
1940
                        f"Stream not found: {resource_arn}",
1941
                        Type="User",
1942
                    )
1943
                raise e
×
1944

1945
    @handler("CreateEventSourceMapping", expand=False)
1✔
1946
    def create_event_source_mapping(
1✔
1947
        self,
1948
        context: RequestContext,
1949
        request: CreateEventSourceMappingRequest,
1950
    ) -> EventSourceMappingConfiguration:
1951
        return self.create_event_source_mapping_v2(context, request)
1✔
1952

1953
    def create_event_source_mapping_v2(
1✔
1954
        self,
1955
        context: RequestContext,
1956
        request: CreateEventSourceMappingRequest,
1957
    ) -> EventSourceMappingConfiguration:
1958
        # Validations
1959
        function_arn, function_name, state, function_version, function_role = (
1✔
1960
            self.validate_event_source_mapping(context, request)
1961
        )
1962

1963
        esm_config = EsmConfigFactory(request, context, function_arn).get_esm_config()
1✔
1964

1965
        # Copy esm_config to avoid a race condition with potential async update in the store
1966
        state.event_source_mappings[esm_config["UUID"]] = esm_config.copy()
1✔
1967
        enabled = request.get("Enabled", True)
1✔
1968
        # TODO: check for potential async race condition update -> think about locking
1969
        esm_worker = EsmWorkerFactory(esm_config, function_role, enabled).get_esm_worker()
1✔
1970
        self.esm_workers[esm_worker.uuid] = esm_worker
1✔
1971
        # TODO: check StateTransitionReason, LastModified, LastProcessingResult (concurrent updates requires locking!)
1972
        if tags := request.get("Tags"):
1✔
1973
            self._store_tags(esm_config.get("EventSourceMappingArn"), tags)
1✔
1974
        esm_worker.create()
1✔
1975
        return esm_config
1✔
1976

1977
    def validate_event_source_mapping(self, context, request):
1✔
1978
        # TODO: test whether stream ARNs are valid sources for Pipes or ESM or whether only DynamoDB table ARNs work
1979
        # TODO: Validate MaxRecordAgeInSeconds (i.e cannot subceed 60s but can be -1) and MaxRetryAttempts parameters.
1980
        # See https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-maximumrecordageinseconds
1981
        is_create_esm_request = context.operation.name == self.create_event_source_mapping.operation
1✔
1982

1983
        if destination_config := request.get("DestinationConfig"):
1✔
1984
            if "OnSuccess" in destination_config:
1✔
1985
                raise InvalidParameterValueException(
1✔
1986
                    "Unsupported DestinationConfig parameter for given event source mapping type.",
1987
                    Type="User",
1988
                )
1989

1990
        service = None
1✔
1991
        if "SelfManagedEventSource" in request:
1✔
1992
            service = "kafka"
×
1993
            if "SourceAccessConfigurations" not in request:
×
1994
                raise InvalidParameterValueException(
×
1995
                    "Required 'sourceAccessConfigurations' parameter is missing.", Type="User"
1996
                )
1997
        if service is None and "EventSourceArn" not in request:
1✔
1998
            raise InvalidParameterValueException("Unrecognized event source.", Type="User")
1✔
1999
        if service is None:
1✔
2000
            service = extract_service_from_arn(request["EventSourceArn"])
1✔
2001

2002
        batch_size = api_utils.validate_and_set_batch_size(service, request.get("BatchSize"))
1✔
2003
        if service in ["dynamodb", "kinesis"]:
1✔
2004
            starting_position = request.get("StartingPosition")
1✔
2005
            if not starting_position:
1✔
2006
                raise InvalidParameterValueException(
1✔
2007
                    "1 validation error detected: Value null at 'startingPosition' failed to satisfy constraint: Member must not be null.",
2008
                    Type="User",
2009
                )
2010

2011
            if starting_position not in KinesisStreamStartPosition.__members__:
1✔
2012
                raise ValidationException(
1✔
2013
                    f"1 validation error detected: Value '{starting_position}' at 'startingPosition' failed to satisfy constraint: Member must satisfy enum value set: [LATEST, AT_TIMESTAMP, TRIM_HORIZON]"
2014
                )
2015
            # AT_TIMESTAMP is not allowed for DynamoDB Streams
2016
            elif (
1✔
2017
                service == "dynamodb"
2018
                and starting_position not in DynamoDBStreamStartPosition.__members__
2019
            ):
2020
                raise InvalidParameterValueException(
1✔
2021
                    f"Unsupported starting position for arn type: {request['EventSourceArn']}",
2022
                    Type="User",
2023
                )
2024

2025
        if service in ["sqs", "sqs-fifo"]:
1✔
2026
            if batch_size > 10 and request.get("MaximumBatchingWindowInSeconds", 0) == 0:
1✔
2027
                raise InvalidParameterValueException(
1✔
2028
                    "Maximum batch window in seconds must be greater than 0 if maximum batch size is greater than 10",
2029
                    Type="User",
2030
                )
2031

2032
        if (filter_criteria := request.get("FilterCriteria")) is not None:
1✔
2033
            for filter_ in filter_criteria.get("Filters", []):
1✔
2034
                pattern_str = filter_.get("Pattern")
1✔
2035
                if not pattern_str or not isinstance(pattern_str, str):
1✔
2036
                    raise InvalidParameterValueException(
×
2037
                        "Invalid filter pattern definition.", Type="User"
2038
                    )
2039

2040
                if not validate_event_pattern(pattern_str):
1✔
2041
                    raise InvalidParameterValueException(
1✔
2042
                        "Invalid filter pattern definition.", Type="User"
2043
                    )
2044

2045
        # Can either have a FunctionName (i.e CreateEventSourceMapping request) or
2046
        # an internal EventSourceMappingConfiguration representation
2047
        request_function_name = request.get("FunctionName") or request.get("FunctionArn")
1✔
2048
        # can be either a partial arn or a full arn for the version/alias
2049
        function_name, qualifier, account, region = function_locators_from_arn(
1✔
2050
            request_function_name
2051
        )
2052
        # TODO: validate `context.region` vs. `region(request["FunctionName"])` vs. `region(request["EventSourceArn"])`
2053
        account = account or context.account_id
1✔
2054
        region = region or context.region
1✔
2055
        state = lambda_stores[account][region]
1✔
2056
        fn = state.functions.get(function_name)
1✔
2057
        if not fn:
1✔
2058
            raise InvalidParameterValueException("Function does not exist", Type="User")
1✔
2059

2060
        if qualifier:
1✔
2061
            # make sure the function version/alias exists
2062
            if api_utils.qualifier_is_alias(qualifier):
1✔
2063
                fn_alias = fn.aliases.get(qualifier)
1✔
2064
                if not fn_alias:
1✔
2065
                    raise Exception("unknown alias")  # TODO: cover via test
×
2066
            elif api_utils.qualifier_is_version(qualifier):
1✔
2067
                fn_version = fn.versions.get(qualifier)
1✔
2068
                if not fn_version:
1✔
2069
                    raise Exception("unknown version")  # TODO: cover via test
×
2070
            elif qualifier == "$LATEST":
1✔
2071
                pass
1✔
2072
            else:
2073
                raise Exception("invalid functionname")  # TODO: cover via test
×
2074
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account, region)
1✔
2075

2076
        else:
2077
            fn_arn = api_utils.unqualified_lambda_arn(function_name, account, region)
1✔
2078

2079
        function_version = get_function_version_from_arn(fn_arn)
1✔
2080
        function_role = function_version.config.role
1✔
2081

2082
        if source_arn := request.get("EventSourceArn"):
1✔
2083
            self.check_service_resource_exists(service, source_arn, fn_arn, function_role)
1✔
2084
        # Check we are validating a CreateEventSourceMapping request
2085
        if is_create_esm_request:
1✔
2086

2087
            def _get_mapping_sources(mapping: dict[str, Any]) -> list[str]:
1✔
2088
                if event_source_arn := mapping.get("EventSourceArn"):
1✔
2089
                    return [event_source_arn]
1✔
2090
                return (
×
2091
                    mapping.get("SelfManagedEventSource", {})
2092
                    .get("Endpoints", {})
2093
                    .get("KAFKA_BOOTSTRAP_SERVERS", [])
2094
                )
2095

2096
            # check for event source duplicates
2097
            # TODO: currently validated for sqs, kinesis, and dynamodb
2098
            service_id = load_service(service).service_id
1✔
2099
            for uuid, mapping in state.event_source_mappings.items():
1✔
2100
                mapping_sources = _get_mapping_sources(mapping)
1✔
2101
                request_sources = _get_mapping_sources(request)
1✔
2102
                if mapping["FunctionArn"] == fn_arn and (
1✔
2103
                    set(mapping_sources).intersection(request_sources)
2104
                ):
2105
                    if service == "sqs":
1✔
2106
                        # *shakes fist at SQS*
2107
                        raise ResourceConflictException(
1✔
2108
                            f'An event source mapping with {service_id} arn (" {mapping["EventSourceArn"]} ") '
2109
                            f'and function (" {function_name} ") already exists. Please update or delete the '
2110
                            f"existing mapping with UUID {uuid}",
2111
                            Type="User",
2112
                        )
2113
                    elif service == "kafka":
1✔
2114
                        if set(mapping["Topics"]).intersection(request["Topics"]):
×
2115
                            raise ResourceConflictException(
×
2116
                                f'An event source mapping with event source ("{",".join(request_sources)}"), '
2117
                                f'function ("{fn_arn}"), '
2118
                                f'topics ("{",".join(request["Topics"])}") already exists. Please update or delete the '
2119
                                f"existing mapping with UUID {uuid}",
2120
                                Type="User",
2121
                            )
2122
                    else:
2123
                        raise ResourceConflictException(
1✔
2124
                            f'The event source arn (" {mapping["EventSourceArn"]} ") and function '
2125
                            f'(" {function_name} ") provided mapping already exists. Please update or delete the '
2126
                            f"existing mapping with UUID {uuid}",
2127
                            Type="User",
2128
                        )
2129
        return fn_arn, function_name, state, function_version, function_role
1✔
2130

2131
    @handler("UpdateEventSourceMapping", expand=False)
1✔
2132
    def update_event_source_mapping(
1✔
2133
        self,
2134
        context: RequestContext,
2135
        request: UpdateEventSourceMappingRequest,
2136
    ) -> EventSourceMappingConfiguration:
2137
        return self.update_event_source_mapping_v2(context, request)
1✔
2138

2139
    def update_event_source_mapping_v2(
1✔
2140
        self,
2141
        context: RequestContext,
2142
        request: UpdateEventSourceMappingRequest,
2143
    ) -> EventSourceMappingConfiguration:
2144
        # TODO: test and implement this properly (quite complex with many validations and limitations!)
2145
        LOG.warning(
1✔
2146
            "Updating Lambda Event Source Mapping is in experimental state and not yet fully tested."
2147
        )
2148
        state = lambda_stores[context.account_id][context.region]
1✔
2149
        request_data = {**request}
1✔
2150
        uuid = request_data.pop("UUID", None)
1✔
2151
        if not uuid:
1✔
2152
            raise ResourceNotFoundException(
×
2153
                "The resource you requested does not exist.", Type="User"
2154
            )
2155
        old_event_source_mapping = state.event_source_mappings.get(uuid)
1✔
2156
        esm_worker = self.esm_workers.get(uuid)
1✔
2157
        if old_event_source_mapping is None or esm_worker is None:
1✔
2158
            raise ResourceNotFoundException(
1✔
2159
                "The resource you requested does not exist.", Type="User"
2160
            )  # TODO: test?
2161

2162
        # normalize values to overwrite
2163
        event_source_mapping = old_event_source_mapping | request_data
1✔
2164

2165
        temp_params = {}  # values only set for the returned response, not saved internally (e.g. transient state)
1✔
2166

2167
        # Validate the newly updated ESM object. We ignore the output here since we only care whether an Exception is raised.
2168
        function_arn, _, _, function_version, function_role = self.validate_event_source_mapping(
1✔
2169
            context, event_source_mapping
2170
        )
2171

2172
        # remove the FunctionName field
2173
        event_source_mapping.pop("FunctionName", None)
1✔
2174

2175
        if function_arn:
1✔
2176
            event_source_mapping["FunctionArn"] = function_arn
1✔
2177

2178
        # Only apply update if the desired state differs
2179
        enabled = request.get("Enabled")
1✔
2180
        if enabled is not None:
1✔
2181
            if enabled and old_event_source_mapping["State"] != EsmState.ENABLED:
1✔
2182
                event_source_mapping["State"] = EsmState.ENABLING
1✔
2183
            # TODO: What happens when trying to update during an update or failed state?!
2184
            elif not enabled and old_event_source_mapping["State"] == EsmState.ENABLED:
1✔
2185
                event_source_mapping["State"] = EsmState.DISABLING
1✔
2186
        else:
2187
            event_source_mapping["State"] = EsmState.UPDATING
1✔
2188

2189
        # To ensure parity, certain responses need to be immediately returned
2190
        temp_params["State"] = event_source_mapping["State"]
1✔
2191

2192
        state.event_source_mappings[uuid] = event_source_mapping
1✔
2193

2194
        # TODO: Currently, we re-create the entire ESM worker. Look into approach with better performance.
2195
        worker_factory = EsmWorkerFactory(
1✔
2196
            event_source_mapping, function_role, request.get("Enabled", esm_worker.enabled)
2197
        )
2198

2199
        # Get a new ESM worker object but do not active it, since the factory holds all logic for creating new worker from configuration.
2200
        updated_esm_worker = worker_factory.get_esm_worker()
1✔
2201
        self.esm_workers[uuid] = updated_esm_worker
1✔
2202

2203
        # We should stop() the worker since the delete() will remove the ESM from the state mapping.
2204
        esm_worker.stop()
1✔
2205
        # This will either create an EsmWorker in the CREATING state if enabled. Otherwise, the DISABLING state is set.
2206
        updated_esm_worker.create()
1✔
2207

2208
        return {**event_source_mapping, **temp_params}
1✔
2209

2210
    def delete_event_source_mapping(
1✔
2211
        self, context: RequestContext, uuid: String, **kwargs
2212
    ) -> EventSourceMappingConfiguration:
2213
        state = lambda_stores[context.account_id][context.region]
1✔
2214
        event_source_mapping = state.event_source_mappings.get(uuid)
1✔
2215
        if not event_source_mapping:
1✔
2216
            raise ResourceNotFoundException(
1✔
2217
                "The resource you requested does not exist.", Type="User"
2218
            )
2219
        esm = state.event_source_mappings[uuid]
1✔
2220
        # TODO: add proper locking
2221
        esm_worker = self.esm_workers.pop(uuid, None)
1✔
2222
        # Asynchronous delete in v2
2223
        if not esm_worker:
1✔
2224
            raise ResourceNotFoundException(
×
2225
                "The resource you requested does not exist.", Type="User"
2226
            )
2227
        esm_worker.delete()
1✔
2228
        return {**esm, "State": EsmState.DELETING}
1✔
2229

2230
    def get_event_source_mapping(
1✔
2231
        self, context: RequestContext, uuid: String, **kwargs
2232
    ) -> EventSourceMappingConfiguration:
2233
        state = lambda_stores[context.account_id][context.region]
1✔
2234
        event_source_mapping = state.event_source_mappings.get(uuid)
1✔
2235
        if not event_source_mapping:
1✔
2236
            raise ResourceNotFoundException(
1✔
2237
                "The resource you requested does not exist.", Type="User"
2238
            )
2239
        esm_worker = self.esm_workers.get(uuid)
1✔
2240
        if not esm_worker:
1✔
2241
            raise ResourceNotFoundException(
×
2242
                "The resource you requested does not exist.", Type="User"
2243
            )
2244
        event_source_mapping["State"] = esm_worker.current_state
1✔
2245
        event_source_mapping["StateTransitionReason"] = esm_worker.state_transition_reason
1✔
2246
        return event_source_mapping
1✔
2247

2248
    def list_event_source_mappings(
1✔
2249
        self,
2250
        context: RequestContext,
2251
        event_source_arn: Arn = None,
2252
        function_name: FunctionName = None,
2253
        marker: String = None,
2254
        max_items: MaxListItems = None,
2255
        **kwargs,
2256
    ) -> ListEventSourceMappingsResponse:
2257
        state = lambda_stores[context.account_id][context.region]
1✔
2258

2259
        esms = state.event_source_mappings.values()
1✔
2260
        # TODO: update and test State and StateTransitionReason for ESM v2
2261

2262
        if event_source_arn:  # TODO: validate pattern
1✔
2263
            esms = [e for e in esms if e.get("EventSourceArn") == event_source_arn]
1✔
2264

2265
        if function_name:
1✔
2266
            esms = [e for e in esms if function_name in e["FunctionArn"]]
1✔
2267

2268
        esms = PaginatedList(esms)
1✔
2269
        page, token = esms.get_page(
1✔
2270
            lambda x: x["UUID"],
2271
            marker,
2272
            max_items,
2273
        )
2274
        return ListEventSourceMappingsResponse(EventSourceMappings=page, NextMarker=token)
1✔
2275

2276
    def get_source_type_from_request(self, request: dict[str, Any]) -> str:
1✔
2277
        if event_source_arn := request.get("EventSourceArn", ""):
×
2278
            service = extract_service_from_arn(event_source_arn)
×
2279
            if service == "sqs" and "fifo" in event_source_arn:
×
2280
                service = "sqs-fifo"
×
2281
            return service
×
2282
        elif request.get("SelfManagedEventSource"):
×
2283
            return "kafka"
×
2284

2285
    # =======================================
2286
    # ============ FUNCTION URLS ============
2287
    # =======================================
2288

2289
    @staticmethod
1✔
2290
    def _validate_qualifier(qualifier: str) -> None:
1✔
2291
        if qualifier == "$LATEST" or (qualifier and api_utils.qualifier_is_version(qualifier)):
1✔
2292
            raise ValidationException(
1✔
2293
                f"1 validation error detected: Value '{qualifier}' at 'qualifier' failed to satisfy constraint: Member must satisfy regular expression pattern: ((?!^\\d+$)^[0-9a-zA-Z-_]+$)"
2294
            )
2295

2296
    @staticmethod
1✔
2297
    def _validate_invoke_mode(invoke_mode: str) -> None:
1✔
2298
        if invoke_mode and invoke_mode not in [InvokeMode.BUFFERED, InvokeMode.RESPONSE_STREAM]:
1✔
2299
            raise ValidationException(
1✔
2300
                f"1 validation error detected: Value '{invoke_mode}' at 'invokeMode' failed to satisfy constraint: Member must satisfy enum value set: [RESPONSE_STREAM, BUFFERED]"
2301
            )
2302
        if invoke_mode == InvokeMode.RESPONSE_STREAM:
1✔
2303
            # TODO should we actually fail for setting RESPONSE_STREAM?
2304
            #  It should trigger InvokeWithResponseStream which is not implemented
2305
            LOG.warning(
1✔
2306
                "The invokeMode 'RESPONSE_STREAM' is not yet supported on LocalStack. The property is only mocked, the execution will still be 'BUFFERED'"
2307
            )
2308

2309
    # TODO: what happens if function state is not active?
2310
    def create_function_url_config(
1✔
2311
        self,
2312
        context: RequestContext,
2313
        function_name: FunctionName,
2314
        auth_type: FunctionUrlAuthType,
2315
        qualifier: FunctionUrlQualifier = None,
2316
        cors: Cors = None,
2317
        invoke_mode: InvokeMode = None,
2318
        **kwargs,
2319
    ) -> CreateFunctionUrlConfigResponse:
2320
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2321
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
2322
            function_name, qualifier, context
2323
        )
2324
        state = lambda_stores[account_id][region]
1✔
2325
        self._validate_qualifier(qualifier)
1✔
2326
        self._validate_invoke_mode(invoke_mode)
1✔
2327

2328
        fn = state.functions.get(function_name)
1✔
2329
        if fn is None:
1✔
2330
            raise ResourceNotFoundException("Function does not exist", Type="User")
1✔
2331

2332
        url_config = fn.function_url_configs.get(qualifier or "$LATEST")
1✔
2333
        if url_config:
1✔
2334
            raise ResourceConflictException(
1✔
2335
                f"Failed to create function url config for [functionArn = {url_config.function_arn}]. Error message:  FunctionUrlConfig exists for this Lambda function",
2336
                Type="User",
2337
            )
2338

2339
        if qualifier and qualifier != "$LATEST" and qualifier not in fn.aliases:
1✔
2340
            raise ResourceNotFoundException("Function does not exist", Type="User")
1✔
2341

2342
        normalized_qualifier = qualifier or "$LATEST"
1✔
2343

2344
        function_arn = (
1✔
2345
            api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
2346
            if qualifier
2347
            else api_utils.unqualified_lambda_arn(function_name, account_id, region)
2348
        )
2349

2350
        custom_id: str | None = None
1✔
2351

2352
        tags = self._get_tags(api_utils.unqualified_lambda_arn(function_name, account_id, region))
1✔
2353
        if TAG_KEY_CUSTOM_URL in tags:
1✔
2354
            # Note: I really wanted to add verification here that the
2355
            # url_id is unique, so we could surface that to the user ASAP.
2356
            # However, it seems like that information isn't available yet,
2357
            # since (as far as I can tell) we call
2358
            # self.router.register_routes() once, in a single shot, for all
2359
            # of the routes -- and we need to verify that it's unique not
2360
            # just for this particular lambda function, but for the entire
2361
            # lambda provider. Therefore... that idea proved non-trivial!
2362
            custom_id_tag_value = (
1✔
2363
                f"{tags[TAG_KEY_CUSTOM_URL]}-{qualifier}" if qualifier else tags[TAG_KEY_CUSTOM_URL]
2364
            )
2365
            if TAG_KEY_CUSTOM_URL_VALIDATOR.match(custom_id_tag_value):
1✔
2366
                custom_id = custom_id_tag_value
1✔
2367

2368
            else:
2369
                # Note: we're logging here instead of raising to prioritize
2370
                # strict parity with AWS over the localstack-only custom_id
2371
                LOG.warning(
1✔
2372
                    "Invalid custom ID tag value for lambda URL (%s=%s). "
2373
                    "Replaced with default (random id)",
2374
                    TAG_KEY_CUSTOM_URL,
2375
                    custom_id_tag_value,
2376
                )
2377

2378
        # The url_id is the subdomain used for the URL we're creating. This
2379
        # is either created randomly (as in AWS), or can be passed as a tag
2380
        # to the lambda itself (localstack-only).
2381
        url_id: str
2382
        if custom_id is None:
1✔
2383
            url_id = api_utils.generate_random_url_id()
1✔
2384
        else:
2385
            url_id = custom_id
1✔
2386

2387
        host_definition = localstack_host(custom_port=config.GATEWAY_LISTEN[0].port)
1✔
2388
        fn.function_url_configs[normalized_qualifier] = FunctionUrlConfig(
1✔
2389
            function_arn=function_arn,
2390
            function_name=function_name,
2391
            cors=cors,
2392
            url_id=url_id,
2393
            url=f"http://{url_id}.lambda-url.{context.region}.{host_definition.host_and_port()}/",  # TODO: https support
2394
            auth_type=auth_type,
2395
            creation_time=api_utils.generate_lambda_date(),
2396
            last_modified_time=api_utils.generate_lambda_date(),
2397
            invoke_mode=invoke_mode,
2398
        )
2399

2400
        # persist and start URL
2401
        # TODO: implement URL invoke
2402
        api_url_config = api_utils.map_function_url_config(
1✔
2403
            fn.function_url_configs[normalized_qualifier]
2404
        )
2405

2406
        return CreateFunctionUrlConfigResponse(
1✔
2407
            FunctionUrl=api_url_config["FunctionUrl"],
2408
            FunctionArn=api_url_config["FunctionArn"],
2409
            AuthType=api_url_config["AuthType"],
2410
            Cors=api_url_config["Cors"],
2411
            CreationTime=api_url_config["CreationTime"],
2412
            InvokeMode=api_url_config["InvokeMode"],
2413
        )
2414

2415
    def get_function_url_config(
1✔
2416
        self,
2417
        context: RequestContext,
2418
        function_name: FunctionName,
2419
        qualifier: FunctionUrlQualifier = None,
2420
        **kwargs,
2421
    ) -> GetFunctionUrlConfigResponse:
2422
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2423
        state = lambda_stores[account_id][region]
1✔
2424

2425
        fn_name, qualifier = api_utils.get_name_and_qualifier(function_name, qualifier, context)
1✔
2426

2427
        self._validate_qualifier(qualifier)
1✔
2428

2429
        resolved_fn = state.functions.get(fn_name)
1✔
2430
        if not resolved_fn:
1✔
2431
            raise ResourceNotFoundException(
1✔
2432
                "The resource you requested does not exist.", Type="User"
2433
            )
2434

2435
        qualifier = qualifier or "$LATEST"
1✔
2436
        url_config = resolved_fn.function_url_configs.get(qualifier)
1✔
2437
        if not url_config:
1✔
2438
            raise ResourceNotFoundException(
1✔
2439
                "The resource you requested does not exist.", Type="User"
2440
            )
2441

2442
        return api_utils.map_function_url_config(url_config)
1✔
2443

2444
    def update_function_url_config(
1✔
2445
        self,
2446
        context: RequestContext,
2447
        function_name: FunctionName,
2448
        qualifier: FunctionUrlQualifier = None,
2449
        auth_type: FunctionUrlAuthType = None,
2450
        cors: Cors = None,
2451
        invoke_mode: InvokeMode = None,
2452
        **kwargs,
2453
    ) -> UpdateFunctionUrlConfigResponse:
2454
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2455
        state = lambda_stores[account_id][region]
1✔
2456

2457
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
2458
            function_name, qualifier, context
2459
        )
2460
        self._validate_qualifier(qualifier)
1✔
2461
        self._validate_invoke_mode(invoke_mode)
1✔
2462

2463
        fn = state.functions.get(function_name)
1✔
2464
        if not fn:
1✔
2465
            raise ResourceNotFoundException("Function does not exist", Type="User")
1✔
2466

2467
        normalized_qualifier = qualifier or "$LATEST"
1✔
2468

2469
        if (
1✔
2470
            api_utils.qualifier_is_alias(normalized_qualifier)
2471
            and normalized_qualifier not in fn.aliases
2472
        ):
2473
            raise ResourceNotFoundException("Function does not exist", Type="User")
1✔
2474

2475
        url_config = fn.function_url_configs.get(normalized_qualifier)
1✔
2476
        if not url_config:
1✔
2477
            raise ResourceNotFoundException(
1✔
2478
                "The resource you requested does not exist.", Type="User"
2479
            )
2480

2481
        changes = {
1✔
2482
            "last_modified_time": api_utils.generate_lambda_date(),
2483
            **({"cors": cors} if cors is not None else {}),
2484
            **({"auth_type": auth_type} if auth_type is not None else {}),
2485
        }
2486

2487
        if invoke_mode:
1✔
2488
            changes["invoke_mode"] = invoke_mode
1✔
2489

2490
        new_url_config = dataclasses.replace(url_config, **changes)
1✔
2491
        fn.function_url_configs[normalized_qualifier] = new_url_config
1✔
2492

2493
        return UpdateFunctionUrlConfigResponse(
1✔
2494
            FunctionUrl=new_url_config.url,
2495
            FunctionArn=new_url_config.function_arn,
2496
            AuthType=new_url_config.auth_type,
2497
            Cors=new_url_config.cors,
2498
            CreationTime=new_url_config.creation_time,
2499
            LastModifiedTime=new_url_config.last_modified_time,
2500
            InvokeMode=new_url_config.invoke_mode,
2501
        )
2502

2503
    def delete_function_url_config(
1✔
2504
        self,
2505
        context: RequestContext,
2506
        function_name: FunctionName,
2507
        qualifier: FunctionUrlQualifier = None,
2508
        **kwargs,
2509
    ) -> None:
2510
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2511
        state = lambda_stores[account_id][region]
1✔
2512

2513
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
2514
            function_name, qualifier, context
2515
        )
2516
        self._validate_qualifier(qualifier)
1✔
2517

2518
        resolved_fn = state.functions.get(function_name)
1✔
2519
        if not resolved_fn:
1✔
2520
            raise ResourceNotFoundException(
1✔
2521
                "The resource you requested does not exist.", Type="User"
2522
            )
2523

2524
        qualifier = qualifier or "$LATEST"
1✔
2525
        url_config = resolved_fn.function_url_configs.get(qualifier)
1✔
2526
        if not url_config:
1✔
2527
            raise ResourceNotFoundException(
1✔
2528
                "The resource you requested does not exist.", Type="User"
2529
            )
2530

2531
        del resolved_fn.function_url_configs[qualifier]
1✔
2532

2533
    def list_function_url_configs(
1✔
2534
        self,
2535
        context: RequestContext,
2536
        function_name: FunctionName,
2537
        marker: String = None,
2538
        max_items: MaxItems = None,
2539
        **kwargs,
2540
    ) -> ListFunctionUrlConfigsResponse:
2541
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2542
        state = lambda_stores[account_id][region]
1✔
2543

2544
        fn_name = api_utils.get_function_name(function_name, context)
1✔
2545
        resolved_fn = state.functions.get(fn_name)
1✔
2546
        if not resolved_fn:
1✔
2547
            raise ResourceNotFoundException("Function does not exist", Type="User")
1✔
2548

2549
        url_configs = [
1✔
2550
            api_utils.map_function_url_config(fn_conf)
2551
            for fn_conf in resolved_fn.function_url_configs.values()
2552
        ]
2553
        url_configs = PaginatedList(url_configs)
1✔
2554
        page, token = url_configs.get_page(
1✔
2555
            lambda url_config: url_config["FunctionArn"],
2556
            marker,
2557
            max_items,
2558
        )
2559
        url_configs = page
1✔
2560
        return ListFunctionUrlConfigsResponse(FunctionUrlConfigs=url_configs, NextMarker=token)
1✔
2561

2562
    # =======================================
2563
    # ============  Permissions  ============
2564
    # =======================================
2565

2566
    @handler("AddPermission", expand=False)
1✔
2567
    def add_permission(
1✔
2568
        self,
2569
        context: RequestContext,
2570
        request: AddPermissionRequest,
2571
    ) -> AddPermissionResponse:
2572
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
2573
            request.get("FunctionName"), request.get("Qualifier"), context
2574
        )
2575

2576
        # validate qualifier
2577
        if qualifier is not None:
1✔
2578
            self._validate_qualifier_expression(qualifier)
1✔
2579
            if qualifier == "$LATEST":
1✔
2580
                raise InvalidParameterValueException(
1✔
2581
                    "We currently do not support adding policies for $LATEST.", Type="User"
2582
                )
2583
        account_id, region = api_utils.get_account_and_region(request.get("FunctionName"), context)
1✔
2584

2585
        resolved_fn = self._get_function(function_name, account_id, region)
1✔
2586
        resolved_qualifier, fn_arn = self._resolve_fn_qualifier(resolved_fn, qualifier)
1✔
2587

2588
        revision_id = request.get("RevisionId")
1✔
2589
        if revision_id:
1✔
2590
            fn_revision_id = self._function_revision_id(resolved_fn, resolved_qualifier)
1✔
2591
            if revision_id != fn_revision_id:
1✔
2592
                raise PreconditionFailedException(
1✔
2593
                    "The Revision Id provided does not match the latest Revision Id. "
2594
                    "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
2595
                    Type="User",
2596
                )
2597

2598
        request_sid = request["StatementId"]
1✔
2599
        if not bool(STATEMENT_ID_REGEX.match(request_sid)):
1✔
2600
            raise ValidationException(
1✔
2601
                f"1 validation error detected: Value '{request_sid}' at 'statementId' failed to satisfy constraint: Member must satisfy regular expression pattern: ([a-zA-Z0-9-_]+)"
2602
            )
2603
        # check for an already existing policy and any conflicts in existing statements
2604
        existing_policy = resolved_fn.permissions.get(resolved_qualifier)
1✔
2605
        if existing_policy:
1✔
2606
            if request_sid in [s["Sid"] for s in existing_policy.policy.Statement]:
1✔
2607
                # uniqueness scope: statement id needs to be unique per qualified function ($LATEST, version, or alias)
2608
                # Counterexample: the same sid can exist within $LATEST, version, and alias
2609
                raise ResourceConflictException(
1✔
2610
                    f"The statement id ({request_sid}) provided already exists. Please provide a new statement id, or remove the existing statement.",
2611
                    Type="User",
2612
                )
2613

2614
        permission_statement = api_utils.build_statement(
1✔
2615
            partition=context.partition,
2616
            resource_arn=fn_arn,
2617
            statement_id=request["StatementId"],
2618
            action=request["Action"],
2619
            principal=request["Principal"],
2620
            source_arn=request.get("SourceArn"),
2621
            source_account=request.get("SourceAccount"),
2622
            principal_org_id=request.get("PrincipalOrgID"),
2623
            event_source_token=request.get("EventSourceToken"),
2624
            auth_type=request.get("FunctionUrlAuthType"),
2625
        )
2626
        new_policy = existing_policy
1✔
2627
        if not existing_policy:
1✔
2628
            new_policy = FunctionResourcePolicy(
1✔
2629
                policy=ResourcePolicy(Version="2012-10-17", Id="default", Statement=[])
2630
            )
2631
        new_policy.policy.Statement.append(permission_statement)
1✔
2632
        if not existing_policy:
1✔
2633
            resolved_fn.permissions[resolved_qualifier] = new_policy
1✔
2634

2635
        # Update revision id of alias or version
2636
        # TODO: re-evaluate data model to prevent this dirty hack just for bumping the revision id
2637
        # TODO: does that need a `with function.lock` for atomic updates of the policy + revision_id?
2638
        if api_utils.qualifier_is_alias(resolved_qualifier):
1✔
2639
            resolved_alias = resolved_fn.aliases[resolved_qualifier]
1✔
2640
            resolved_fn.aliases[resolved_qualifier] = dataclasses.replace(resolved_alias)
1✔
2641
        # Assumes that a non-alias is a version
2642
        else:
2643
            resolved_version = resolved_fn.versions[resolved_qualifier]
1✔
2644
            resolved_fn.versions[resolved_qualifier] = dataclasses.replace(
1✔
2645
                resolved_version, config=dataclasses.replace(resolved_version.config)
2646
            )
2647
        return AddPermissionResponse(Statement=json.dumps(permission_statement))
1✔
2648

2649
    def remove_permission(
1✔
2650
        self,
2651
        context: RequestContext,
2652
        function_name: FunctionName,
2653
        statement_id: NamespacedStatementId,
2654
        qualifier: Qualifier = None,
2655
        revision_id: String = None,
2656
        **kwargs,
2657
    ) -> None:
2658
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2659
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
2660
            function_name, qualifier, context
2661
        )
2662
        if qualifier is not None:
1✔
2663
            self._validate_qualifier_expression(qualifier)
1✔
2664

2665
        state = lambda_stores[account_id][region]
1✔
2666
        resolved_fn = state.functions.get(function_name)
1✔
2667
        if resolved_fn is None:
1✔
2668
            fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
1✔
2669
            raise ResourceNotFoundException(f"No policy found for: {fn_arn}", Type="User")
1✔
2670

2671
        resolved_qualifier, _ = self._resolve_fn_qualifier(resolved_fn, qualifier)
1✔
2672
        function_permission = resolved_fn.permissions.get(resolved_qualifier)
1✔
2673
        if not function_permission:
1✔
2674
            raise ResourceNotFoundException(
1✔
2675
                "No policy is associated with the given resource.", Type="User"
2676
            )
2677

2678
        # try to find statement in policy and delete it
2679
        statement = None
1✔
2680
        for s in function_permission.policy.Statement:
1✔
2681
            if s["Sid"] == statement_id:
1✔
2682
                statement = s
1✔
2683
                break
1✔
2684

2685
        if not statement:
1✔
2686
            raise ResourceNotFoundException(
1✔
2687
                f"Statement {statement_id} is not found in resource policy.", Type="User"
2688
            )
2689
        fn_revision_id = self._function_revision_id(resolved_fn, resolved_qualifier)
1✔
2690
        if revision_id and revision_id != fn_revision_id:
1✔
2691
            raise PreconditionFailedException(
1✔
2692
                "The Revision Id provided does not match the latest Revision Id. "
2693
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
2694
                Type="User",
2695
            )
2696
        function_permission.policy.Statement.remove(statement)
1✔
2697

2698
        # Update revision id for alias or version
2699
        # TODO: re-evaluate data model to prevent this dirty hack just for bumping the revision id
2700
        # TODO: does that need a `with function.lock` for atomic updates of the policy + revision_id?
2701
        if api_utils.qualifier_is_alias(resolved_qualifier):
1✔
2702
            resolved_alias = resolved_fn.aliases[resolved_qualifier]
×
2703
            resolved_fn.aliases[resolved_qualifier] = dataclasses.replace(resolved_alias)
×
2704
        # Assumes that a non-alias is a version
2705
        else:
2706
            resolved_version = resolved_fn.versions[resolved_qualifier]
1✔
2707
            resolved_fn.versions[resolved_qualifier] = dataclasses.replace(
1✔
2708
                resolved_version, config=dataclasses.replace(resolved_version.config)
2709
            )
2710

2711
        # remove the policy as a whole when there's no statement left in it
2712
        if len(function_permission.policy.Statement) == 0:
1✔
2713
            del resolved_fn.permissions[resolved_qualifier]
1✔
2714

2715
    def get_policy(
1✔
2716
        self,
2717
        context: RequestContext,
2718
        function_name: NamespacedFunctionName,
2719
        qualifier: Qualifier = None,
2720
        **kwargs,
2721
    ) -> GetPolicyResponse:
2722
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2723
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
2724
            function_name, qualifier, context
2725
        )
2726

2727
        if qualifier is not None:
1✔
2728
            self._validate_qualifier_expression(qualifier)
1✔
2729

2730
        resolved_fn = self._get_function(function_name, account_id, region)
1✔
2731

2732
        resolved_qualifier = qualifier or "$LATEST"
1✔
2733
        function_permission = resolved_fn.permissions.get(resolved_qualifier)
1✔
2734
        if not function_permission:
1✔
2735
            raise ResourceNotFoundException(
1✔
2736
                "The resource you requested does not exist.", Type="User"
2737
            )
2738

2739
        fn_revision_id = None
1✔
2740
        if api_utils.qualifier_is_alias(resolved_qualifier):
1✔
2741
            resolved_alias = resolved_fn.aliases[resolved_qualifier]
1✔
2742
            fn_revision_id = resolved_alias.revision_id
1✔
2743
        # Assumes that a non-alias is a version
2744
        else:
2745
            resolved_version = resolved_fn.versions[resolved_qualifier]
1✔
2746
            fn_revision_id = resolved_version.config.revision_id
1✔
2747

2748
        return GetPolicyResponse(
1✔
2749
            Policy=json.dumps(dataclasses.asdict(function_permission.policy)),
2750
            RevisionId=fn_revision_id,
2751
        )
2752

2753
    # =======================================
2754
    # ========  Code signing config  ========
2755
    # =======================================
2756

2757
    def create_code_signing_config(
1✔
2758
        self,
2759
        context: RequestContext,
2760
        allowed_publishers: AllowedPublishers,
2761
        description: Description = None,
2762
        code_signing_policies: CodeSigningPolicies = None,
2763
        tags: Tags = None,
2764
        **kwargs,
2765
    ) -> CreateCodeSigningConfigResponse:
2766
        account = context.account_id
1✔
2767
        region = context.region
1✔
2768

2769
        state = lambda_stores[account][region]
1✔
2770
        # TODO: can there be duplicates?
2771
        csc_id = f"csc-{get_random_hex(17)}"  # e.g. 'csc-077c33b4c19e26036'
1✔
2772
        csc_arn = f"arn:{context.partition}:lambda:{region}:{account}:code-signing-config:{csc_id}"
1✔
2773
        csc = CodeSigningConfig(
1✔
2774
            csc_id=csc_id,
2775
            arn=csc_arn,
2776
            allowed_publishers=allowed_publishers,
2777
            policies=code_signing_policies,
2778
            last_modified=api_utils.generate_lambda_date(),
2779
            description=description,
2780
        )
2781
        state.code_signing_configs[csc_arn] = csc
1✔
2782
        return CreateCodeSigningConfigResponse(CodeSigningConfig=api_utils.map_csc(csc))
1✔
2783

2784
    def put_function_code_signing_config(
1✔
2785
        self,
2786
        context: RequestContext,
2787
        code_signing_config_arn: CodeSigningConfigArn,
2788
        function_name: FunctionName,
2789
        **kwargs,
2790
    ) -> PutFunctionCodeSigningConfigResponse:
2791
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2792
        state = lambda_stores[account_id][region]
1✔
2793
        function_name = api_utils.get_function_name(function_name, context)
1✔
2794

2795
        csc = state.code_signing_configs.get(code_signing_config_arn)
1✔
2796
        if not csc:
1✔
2797
            raise CodeSigningConfigNotFoundException(
1✔
2798
                f"The code signing configuration cannot be found. Check that the provided configuration is not deleted: {code_signing_config_arn}.",
2799
                Type="User",
2800
            )
2801

2802
        fn = state.functions.get(function_name)
1✔
2803
        fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
1✔
2804
        if not fn:
1✔
2805
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")
1✔
2806

2807
        fn.code_signing_config_arn = code_signing_config_arn
1✔
2808
        return PutFunctionCodeSigningConfigResponse(
1✔
2809
            CodeSigningConfigArn=code_signing_config_arn, FunctionName=function_name
2810
        )
2811

2812
    def update_code_signing_config(
1✔
2813
        self,
2814
        context: RequestContext,
2815
        code_signing_config_arn: CodeSigningConfigArn,
2816
        description: Description = None,
2817
        allowed_publishers: AllowedPublishers = None,
2818
        code_signing_policies: CodeSigningPolicies = None,
2819
        **kwargs,
2820
    ) -> UpdateCodeSigningConfigResponse:
2821
        state = lambda_stores[context.account_id][context.region]
1✔
2822
        csc = state.code_signing_configs.get(code_signing_config_arn)
1✔
2823
        if not csc:
1✔
2824
            raise ResourceNotFoundException(
1✔
2825
                f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
2826
            )
2827

2828
        changes = {
1✔
2829
            **(
2830
                {"allowed_publishers": allowed_publishers} if allowed_publishers is not None else {}
2831
            ),
2832
            **({"policies": code_signing_policies} if code_signing_policies is not None else {}),
2833
            **({"description": description} if description is not None else {}),
2834
        }
2835
        new_csc = dataclasses.replace(
1✔
2836
            csc, last_modified=api_utils.generate_lambda_date(), **changes
2837
        )
2838
        state.code_signing_configs[code_signing_config_arn] = new_csc
1✔
2839

2840
        return UpdateCodeSigningConfigResponse(CodeSigningConfig=api_utils.map_csc(new_csc))
1✔
2841

2842
    def get_code_signing_config(
1✔
2843
        self, context: RequestContext, code_signing_config_arn: CodeSigningConfigArn, **kwargs
2844
    ) -> GetCodeSigningConfigResponse:
2845
        state = lambda_stores[context.account_id][context.region]
1✔
2846
        csc = state.code_signing_configs.get(code_signing_config_arn)
1✔
2847
        if not csc:
1✔
2848
            raise ResourceNotFoundException(
1✔
2849
                f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
2850
            )
2851

2852
        return GetCodeSigningConfigResponse(CodeSigningConfig=api_utils.map_csc(csc))
1✔
2853

2854
    def get_function_code_signing_config(
1✔
2855
        self, context: RequestContext, function_name: FunctionName, **kwargs
2856
    ) -> GetFunctionCodeSigningConfigResponse:
2857
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2858
        state = lambda_stores[account_id][region]
1✔
2859
        function_name = api_utils.get_function_name(function_name, context)
1✔
2860
        fn = state.functions.get(function_name)
1✔
2861
        fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
1✔
2862
        if not fn:
1✔
2863
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")
1✔
2864

2865
        if fn.code_signing_config_arn:
1✔
2866
            return GetFunctionCodeSigningConfigResponse(
1✔
2867
                CodeSigningConfigArn=fn.code_signing_config_arn, FunctionName=function_name
2868
            )
2869

2870
        return GetFunctionCodeSigningConfigResponse()
1✔
2871

2872
    def delete_function_code_signing_config(
1✔
2873
        self, context: RequestContext, function_name: FunctionName, **kwargs
2874
    ) -> None:
2875
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2876
        state = lambda_stores[account_id][region]
1✔
2877
        function_name = api_utils.get_function_name(function_name, context)
1✔
2878
        fn = state.functions.get(function_name)
1✔
2879
        fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
1✔
2880
        if not fn:
1✔
2881
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")
1✔
2882

2883
        fn.code_signing_config_arn = None
1✔
2884

2885
    def delete_code_signing_config(
1✔
2886
        self, context: RequestContext, code_signing_config_arn: CodeSigningConfigArn, **kwargs
2887
    ) -> DeleteCodeSigningConfigResponse:
2888
        state = lambda_stores[context.account_id][context.region]
1✔
2889

2890
        csc = state.code_signing_configs.get(code_signing_config_arn)
1✔
2891
        if not csc:
1✔
2892
            raise ResourceNotFoundException(
1✔
2893
                f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
2894
            )
2895

2896
        del state.code_signing_configs[code_signing_config_arn]
1✔
2897

2898
        return DeleteCodeSigningConfigResponse()
1✔
2899

2900
    def list_code_signing_configs(
1✔
2901
        self,
2902
        context: RequestContext,
2903
        marker: String = None,
2904
        max_items: MaxListItems = None,
2905
        **kwargs,
2906
    ) -> ListCodeSigningConfigsResponse:
2907
        state = lambda_stores[context.account_id][context.region]
1✔
2908

2909
        cscs = [api_utils.map_csc(csc) for csc in state.code_signing_configs.values()]
1✔
2910
        cscs = PaginatedList(cscs)
1✔
2911
        page, token = cscs.get_page(
1✔
2912
            lambda csc: csc["CodeSigningConfigId"],
2913
            marker,
2914
            max_items,
2915
        )
2916
        return ListCodeSigningConfigsResponse(CodeSigningConfigs=page, NextMarker=token)
1✔
2917

2918
    def list_functions_by_code_signing_config(
1✔
2919
        self,
2920
        context: RequestContext,
2921
        code_signing_config_arn: CodeSigningConfigArn,
2922
        marker: String = None,
2923
        max_items: MaxListItems = None,
2924
        **kwargs,
2925
    ) -> ListFunctionsByCodeSigningConfigResponse:
2926
        account = context.account_id
1✔
2927
        region = context.region
1✔
2928

2929
        state = lambda_stores[account][region]
1✔
2930

2931
        if code_signing_config_arn not in state.code_signing_configs:
1✔
2932
            raise ResourceNotFoundException(
1✔
2933
                f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
2934
            )
2935

2936
        fn_arns = [
1✔
2937
            api_utils.unqualified_lambda_arn(fn.function_name, account, region)
2938
            for fn in state.functions.values()
2939
            if fn.code_signing_config_arn == code_signing_config_arn
2940
        ]
2941

2942
        cscs = PaginatedList(fn_arns)
1✔
2943
        page, token = cscs.get_page(
1✔
2944
            lambda x: x,
2945
            marker,
2946
            max_items,
2947
        )
2948
        return ListFunctionsByCodeSigningConfigResponse(FunctionArns=page, NextMarker=token)
1✔
2949

2950
    # =======================================
2951
    # =========  Account Settings   =========
2952
    # =======================================
2953

2954
    # CAVE: these settings & usages are *per* region!
2955
    # Lambda quotas: https://docs.aws.amazon.com/lambda/latest/dg/gettingstarted-limits.html
2956
    def get_account_settings(self, context: RequestContext, **kwargs) -> GetAccountSettingsResponse:
1✔
2957
        state = lambda_stores[context.account_id][context.region]
1✔
2958

2959
        fn_count = 0
1✔
2960
        code_size_sum = 0
1✔
2961
        reserved_concurrency_sum = 0
1✔
2962
        for fn in state.functions.values():
1✔
2963
            fn_count += 1
1✔
2964
            for fn_version in fn.versions.values():
1✔
2965
                # Image-based Lambdas do not have a code attribute and count against the ECR quotas instead
2966
                if fn_version.config.package_type == PackageType.Zip:
1✔
2967
                    code_size_sum += fn_version.config.code.code_size
1✔
2968
            if fn.reserved_concurrent_executions is not None:
1✔
2969
                reserved_concurrency_sum += fn.reserved_concurrent_executions
1✔
2970
            for c in fn.provisioned_concurrency_configs.values():
1✔
2971
                reserved_concurrency_sum += c.provisioned_concurrent_executions
1✔
2972
        for layer in state.layers.values():
1✔
2973
            for layer_version in layer.layer_versions.values():
1✔
2974
                code_size_sum += layer_version.code.code_size
1✔
2975
        return GetAccountSettingsResponse(
1✔
2976
            AccountLimit=AccountLimit(
2977
                TotalCodeSize=config.LAMBDA_LIMITS_TOTAL_CODE_SIZE,
2978
                CodeSizeZipped=config.LAMBDA_LIMITS_CODE_SIZE_ZIPPED,
2979
                CodeSizeUnzipped=config.LAMBDA_LIMITS_CODE_SIZE_UNZIPPED,
2980
                ConcurrentExecutions=config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS,
2981
                UnreservedConcurrentExecutions=config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS
2982
                - reserved_concurrency_sum,
2983
            ),
2984
            AccountUsage=AccountUsage(
2985
                TotalCodeSize=code_size_sum,
2986
                FunctionCount=fn_count,
2987
            ),
2988
        )
2989

2990
    # =======================================
2991
    # ==  Provisioned Concurrency Config   ==
2992
    # =======================================
2993

2994
    def _get_provisioned_config(
1✔
2995
        self, context: RequestContext, function_name: str, qualifier: str
2996
    ) -> ProvisionedConcurrencyConfiguration | None:
2997
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2998
        state = lambda_stores[account_id][region]
1✔
2999
        function_name = api_utils.get_function_name(function_name, context)
1✔
3000
        fn = state.functions.get(function_name)
1✔
3001
        if api_utils.qualifier_is_alias(qualifier):
1✔
3002
            fn_alias = None
1✔
3003
            if fn:
1✔
3004
                fn_alias = fn.aliases.get(qualifier)
1✔
3005
            if fn_alias is None:
1✔
3006
                raise ResourceNotFoundException(
1✔
3007
                    f"Cannot find alias arn: {api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)}",
3008
                    Type="User",
3009
                )
3010
        elif api_utils.qualifier_is_version(qualifier):
1✔
3011
            fn_version = None
1✔
3012
            if fn:
1✔
3013
                fn_version = fn.versions.get(qualifier)
1✔
3014
            if fn_version is None:
1✔
3015
                raise ResourceNotFoundException(
1✔
3016
                    f"Function not found: {api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)}",
3017
                    Type="User",
3018
                )
3019

3020
        return fn.provisioned_concurrency_configs.get(qualifier)
1✔
3021

3022
    def put_provisioned_concurrency_config(
1✔
3023
        self,
3024
        context: RequestContext,
3025
        function_name: FunctionName,
3026
        qualifier: Qualifier,
3027
        provisioned_concurrent_executions: PositiveInteger,
3028
        **kwargs,
3029
    ) -> PutProvisionedConcurrencyConfigResponse:
3030
        if provisioned_concurrent_executions <= 0:
1✔
3031
            raise ValidationException(
1✔
3032
                f"1 validation error detected: Value '{provisioned_concurrent_executions}' at 'provisionedConcurrentExecutions' failed to satisfy constraint: Member must have value greater than or equal to 1"
3033
            )
3034

3035
        if qualifier == "$LATEST":
1✔
3036
            raise InvalidParameterValueException(
1✔
3037
                "Provisioned Concurrency Configs cannot be applied to unpublished function versions.",
3038
                Type="User",
3039
            )
3040
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3041
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
3042
            function_name, qualifier, context
3043
        )
3044
        state = lambda_stores[account_id][region]
1✔
3045
        fn = state.functions.get(function_name)
1✔
3046

3047
        provisioned_config = self._get_provisioned_config(context, function_name, qualifier)
1✔
3048

3049
        if provisioned_config:  # TODO: merge?
1✔
3050
            # TODO: add a test for partial updates (if possible)
3051
            LOG.warning(
1✔
3052
                "Partial update of provisioned concurrency config is currently not supported."
3053
            )
3054

3055
        other_provisioned_sum = sum(
1✔
3056
            [
3057
                provisioned_configs.provisioned_concurrent_executions
3058
                for provisioned_qualifier, provisioned_configs in fn.provisioned_concurrency_configs.items()
3059
                if provisioned_qualifier != qualifier
3060
            ]
3061
        )
3062

3063
        if (
1✔
3064
            fn.reserved_concurrent_executions is not None
3065
            and fn.reserved_concurrent_executions
3066
            < other_provisioned_sum + provisioned_concurrent_executions
3067
        ):
3068
            raise InvalidParameterValueException(
1✔
3069
                "Requested Provisioned Concurrency should not be greater than the reservedConcurrentExecution for function",
3070
                Type="User",
3071
            )
3072

3073
        if provisioned_concurrent_executions > config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS:
1✔
3074
            raise InvalidParameterValueException(
1✔
3075
                f"Specified ConcurrentExecutions for function is greater than account's unreserved concurrency"
3076
                f" [{config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS}]."
3077
            )
3078

3079
        settings = self.get_account_settings(context)
1✔
3080
        unreserved_concurrent_executions = settings["AccountLimit"][
1✔
3081
            "UnreservedConcurrentExecutions"
3082
        ]
3083
        if (
1✔
3084
            unreserved_concurrent_executions - provisioned_concurrent_executions
3085
            < config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY
3086
        ):
3087
            raise InvalidParameterValueException(
1✔
3088
                f"Specified ConcurrentExecutions for function decreases account's UnreservedConcurrentExecution below"
3089
                f" its minimum value of [{config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY}]."
3090
            )
3091

3092
        provisioned_config = ProvisionedConcurrencyConfiguration(
1✔
3093
            provisioned_concurrent_executions, api_utils.generate_lambda_date()
3094
        )
3095
        fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
1✔
3096

3097
        if api_utils.qualifier_is_alias(qualifier):
1✔
3098
            alias = fn.aliases.get(qualifier)
1✔
3099
            resolved_version = fn.versions.get(alias.function_version)
1✔
3100

3101
            if (
1✔
3102
                resolved_version
3103
                and fn.provisioned_concurrency_configs.get(alias.function_version) is not None
3104
            ):
3105
                raise ResourceConflictException(
1✔
3106
                    "Alias can't be used for Provisioned Concurrency configuration on an already Provisioned version",
3107
                    Type="User",
3108
                )
3109
            fn_arn = resolved_version.id.qualified_arn()
1✔
3110
        elif api_utils.qualifier_is_version(qualifier):
1✔
3111
            fn_version = fn.versions.get(qualifier)
1✔
3112

3113
            # TODO: might be useful other places, utilize
3114
            pointing_aliases = []
1✔
3115
            for alias in fn.aliases.values():
1✔
3116
                if (
1✔
3117
                    alias.function_version == qualifier
3118
                    and fn.provisioned_concurrency_configs.get(alias.name) is not None
3119
                ):
3120
                    pointing_aliases.append(alias.name)
1✔
3121
            if pointing_aliases:
1✔
3122
                raise ResourceConflictException(
1✔
3123
                    "Version is pointed by a Provisioned Concurrency alias", Type="User"
3124
                )
3125

3126
            fn_arn = fn_version.id.qualified_arn()
1✔
3127

3128
        manager = self.lambda_service.get_lambda_version_manager(fn_arn)
1✔
3129

3130
        fn.provisioned_concurrency_configs[qualifier] = provisioned_config
1✔
3131

3132
        manager.update_provisioned_concurrency_config(
1✔
3133
            provisioned_config.provisioned_concurrent_executions
3134
        )
3135

3136
        return PutProvisionedConcurrencyConfigResponse(
1✔
3137
            RequestedProvisionedConcurrentExecutions=provisioned_config.provisioned_concurrent_executions,
3138
            AvailableProvisionedConcurrentExecutions=0,
3139
            AllocatedProvisionedConcurrentExecutions=0,
3140
            Status=ProvisionedConcurrencyStatusEnum.IN_PROGRESS,
3141
            # StatusReason=manager.provisioned_state.status_reason,
3142
            LastModified=provisioned_config.last_modified,  # TODO: does change with configuration or also with state changes?
3143
        )
3144

3145
    def get_provisioned_concurrency_config(
1✔
3146
        self, context: RequestContext, function_name: FunctionName, qualifier: Qualifier, **kwargs
3147
    ) -> GetProvisionedConcurrencyConfigResponse:
3148
        if qualifier == "$LATEST":
1✔
3149
            raise InvalidParameterValueException(
1✔
3150
                "The function resource provided must be an alias or a published version.",
3151
                Type="User",
3152
            )
3153
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3154
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
3155
            function_name, qualifier, context
3156
        )
3157

3158
        provisioned_config = self._get_provisioned_config(context, function_name, qualifier)
1✔
3159
        if not provisioned_config:
1✔
3160
            raise ProvisionedConcurrencyConfigNotFoundException(
1✔
3161
                "No Provisioned Concurrency Config found for this function", Type="User"
3162
            )
3163

3164
        # TODO: make this compatible with alias pointer migration on update
3165
        if api_utils.qualifier_is_alias(qualifier):
1✔
3166
            state = lambda_stores[account_id][region]
1✔
3167
            fn = state.functions.get(function_name)
1✔
3168
            alias = fn.aliases.get(qualifier)
1✔
3169
            fn_arn = api_utils.qualified_lambda_arn(
1✔
3170
                function_name, alias.function_version, account_id, region
3171
            )
3172
        else:
3173
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
1✔
3174

3175
        ver_manager = self.lambda_service.get_lambda_version_manager(fn_arn)
1✔
3176

3177
        return GetProvisionedConcurrencyConfigResponse(
1✔
3178
            RequestedProvisionedConcurrentExecutions=provisioned_config.provisioned_concurrent_executions,
3179
            LastModified=provisioned_config.last_modified,
3180
            AvailableProvisionedConcurrentExecutions=ver_manager.provisioned_state.available,
3181
            AllocatedProvisionedConcurrentExecutions=ver_manager.provisioned_state.allocated,
3182
            Status=ver_manager.provisioned_state.status,
3183
            StatusReason=ver_manager.provisioned_state.status_reason,
3184
        )
3185

3186
    def list_provisioned_concurrency_configs(
1✔
3187
        self,
3188
        context: RequestContext,
3189
        function_name: FunctionName,
3190
        marker: String = None,
3191
        max_items: MaxProvisionedConcurrencyConfigListItems = None,
3192
        **kwargs,
3193
    ) -> ListProvisionedConcurrencyConfigsResponse:
3194
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3195
        state = lambda_stores[account_id][region]
1✔
3196

3197
        function_name = api_utils.get_function_name(function_name, context)
1✔
3198
        fn = state.functions.get(function_name)
1✔
3199
        if fn is None:
1✔
3200
            raise ResourceNotFoundException(
1✔
3201
                f"Function not found: {api_utils.unqualified_lambda_arn(function_name, account_id, region)}",
3202
                Type="User",
3203
            )
3204

3205
        configs = []
1✔
3206
        for qualifier, pc_config in fn.provisioned_concurrency_configs.items():
1✔
3207
            if api_utils.qualifier_is_alias(qualifier):
×
3208
                alias = fn.aliases.get(qualifier)
×
3209
                fn_arn = api_utils.qualified_lambda_arn(
×
3210
                    function_name, alias.function_version, account_id, region
3211
                )
3212
            else:
3213
                fn_arn = api_utils.qualified_lambda_arn(
×
3214
                    function_name, qualifier, account_id, region
3215
                )
3216

3217
            manager = self.lambda_service.get_lambda_version_manager(fn_arn)
×
3218

3219
            configs.append(
×
3220
                ProvisionedConcurrencyConfigListItem(
3221
                    FunctionArn=api_utils.qualified_lambda_arn(
3222
                        function_name, qualifier, account_id, region
3223
                    ),
3224
                    RequestedProvisionedConcurrentExecutions=pc_config.provisioned_concurrent_executions,
3225
                    AvailableProvisionedConcurrentExecutions=manager.provisioned_state.available,
3226
                    AllocatedProvisionedConcurrentExecutions=manager.provisioned_state.allocated,
3227
                    Status=manager.provisioned_state.status,
3228
                    StatusReason=manager.provisioned_state.status_reason,
3229
                    LastModified=pc_config.last_modified,
3230
                )
3231
            )
3232

3233
        provisioned_concurrency_configs = configs
1✔
3234
        provisioned_concurrency_configs = PaginatedList(provisioned_concurrency_configs)
1✔
3235
        page, token = provisioned_concurrency_configs.get_page(
1✔
3236
            lambda x: x,
3237
            marker,
3238
            max_items,
3239
        )
3240
        return ListProvisionedConcurrencyConfigsResponse(
1✔
3241
            ProvisionedConcurrencyConfigs=page, NextMarker=token
3242
        )
3243

3244
    def delete_provisioned_concurrency_config(
1✔
3245
        self, context: RequestContext, function_name: FunctionName, qualifier: Qualifier, **kwargs
3246
    ) -> None:
3247
        if qualifier == "$LATEST":
1✔
3248
            raise InvalidParameterValueException(
1✔
3249
                "The function resource provided must be an alias or a published version.",
3250
                Type="User",
3251
            )
3252
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3253
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
3254
            function_name, qualifier, context
3255
        )
3256
        state = lambda_stores[account_id][region]
1✔
3257
        fn = state.functions.get(function_name)
1✔
3258

3259
        provisioned_config = self._get_provisioned_config(context, function_name, qualifier)
1✔
3260
        # delete is idempotent and doesn't actually care about the provisioned concurrency config not existing
3261
        if provisioned_config:
1✔
3262
            fn.provisioned_concurrency_configs.pop(qualifier)
1✔
3263
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
1✔
3264
            manager = self.lambda_service.get_lambda_version_manager(fn_arn)
1✔
3265
            manager.update_provisioned_concurrency_config(0)
1✔
3266

3267
    # =======================================
3268
    # =======  Event Invoke Config   ========
3269
    # =======================================
3270

3271
    # "1 validation error detected: Value 'arn:aws:_-/!lambda:<region>:111111111111:function:<function-name:1>' at 'destinationConfig.onFailure.destination' failed to satisfy constraint: Member must satisfy regular expression pattern: ^$|arn:(aws[a-zA-Z0-9-]*):([a-zA-Z0-9\\-])+:([a-z]{2}((-gov)|(-iso(b?)))?-[a-z]+-\\d{1})?:(\\d{12})?:(.*)"
3272
    # "1 validation error detected: Value 'arn:aws:_-/!lambda:<region>:111111111111:function:<function-name:1>' at 'destinationConfig.onFailure.destination' failed to satisfy constraint: Member must satisfy regular expression pattern: ^$|arn:(aws[a-zA-Z0-9-]*):([a-zA-Z0-9\\-])+:([a-z]2((-gov)|(-iso(b?)))?-[a-z]+-\\d1)?:(\\d12)?:(.*)" ... (expected → actual)
3273

3274
    def _validate_destination_config(
1✔
3275
        self, store: LambdaStore, function_name: str, destination_config: DestinationConfig
3276
    ):
3277
        def _validate_destination_arn(destination_arn) -> bool:
1✔
3278
            if not api_utils.DESTINATION_ARN_PATTERN.match(destination_arn):
1✔
3279
                # technically we shouldn't handle this in the provider
3280
                raise ValidationException(
1✔
3281
                    "1 validation error detected: Value '"
3282
                    + destination_arn
3283
                    + r"' at 'destinationConfig.onFailure.destination' failed to satisfy constraint: Member must satisfy regular expression pattern: ^$|arn:(aws[a-zA-Z0-9-]*):([a-zA-Z0-9\-])+:([a-z]{2}((-gov)|(-iso(b?)))?-[a-z]+-\d{1})?:(\d{12})?:(.*)"
3284
                )
3285

3286
            match destination_arn.split(":")[2]:
1✔
3287
                case "lambda":
1✔
3288
                    fn_parts = api_utils.FULL_FN_ARN_PATTERN.search(destination_arn).groupdict()
1✔
3289
                    if fn_parts:
1✔
3290
                        # check if it exists
3291
                        fn = store.functions.get(fn_parts["function_name"])
1✔
3292
                        if not fn:
1✔
3293
                            raise InvalidParameterValueException(
1✔
3294
                                f"The destination ARN {destination_arn} is invalid.", Type="User"
3295
                            )
3296
                        if fn_parts["function_name"] == function_name:
1✔
3297
                            raise InvalidParameterValueException(
1✔
3298
                                "You can't specify the function as a destination for itself.",
3299
                                Type="User",
3300
                            )
3301
                case "sns" | "sqs" | "events":
1✔
3302
                    pass
1✔
3303
                case _:
1✔
3304
                    return False
1✔
3305
            return True
1✔
3306

3307
        validation_err = False
1✔
3308

3309
        failure_destination = destination_config.get("OnFailure", {}).get("Destination")
1✔
3310
        if failure_destination:
1✔
3311
            validation_err = validation_err or not _validate_destination_arn(failure_destination)
1✔
3312

3313
        success_destination = destination_config.get("OnSuccess", {}).get("Destination")
1✔
3314
        if success_destination:
1✔
3315
            validation_err = validation_err or not _validate_destination_arn(success_destination)
1✔
3316

3317
        if validation_err:
1✔
3318
            on_success_part = (
1✔
3319
                f"OnSuccess(destination={success_destination})" if success_destination else "null"
3320
            )
3321
            on_failure_part = (
1✔
3322
                f"OnFailure(destination={failure_destination})" if failure_destination else "null"
3323
            )
3324
            raise InvalidParameterValueException(
1✔
3325
                f"The provided destination config DestinationConfig(onSuccess={on_success_part}, onFailure={on_failure_part}) is invalid.",
3326
                Type="User",
3327
            )
3328

3329
    def put_function_event_invoke_config(
1✔
3330
        self,
3331
        context: RequestContext,
3332
        function_name: FunctionName,
3333
        qualifier: Qualifier = None,
3334
        maximum_retry_attempts: MaximumRetryAttempts = None,
3335
        maximum_event_age_in_seconds: MaximumEventAgeInSeconds = None,
3336
        destination_config: DestinationConfig = None,
3337
        **kwargs,
3338
    ) -> FunctionEventInvokeConfig:
3339
        """
3340
        Destination ARNs can be:
3341
        * SQS arn
3342
        * SNS arn
3343
        * Lambda arn
3344
        * EventBridge arn
3345

3346
        Differences between put_ and update_:
3347
            * put overwrites any existing config
3348
            * update allows changes only single values while keeping the rest of existing ones
3349
            * update fails on non-existing configs
3350

3351
        Differences between destination and DLQ
3352
            * "However, a dead-letter queue is part of a function's version-specific configuration, so it is locked in when you publish a version."
3353
            *  "On-failure destinations also support additional targets and include details about the function's response in the invocation record."
3354

3355
        """
3356
        if (
1✔
3357
            maximum_event_age_in_seconds is None
3358
            and maximum_retry_attempts is None
3359
            and destination_config is None
3360
        ):
3361
            raise InvalidParameterValueException(
1✔
3362
                "You must specify at least one of error handling or destination setting.",
3363
                Type="User",
3364
            )
3365
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3366
        state = lambda_stores[account_id][region]
1✔
3367
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
3368
            function_name, qualifier, context
3369
        )
3370
        fn = state.functions.get(function_name)
1✔
3371
        if not fn or (qualifier and not (qualifier in fn.aliases or qualifier in fn.versions)):
1✔
3372
            raise ResourceNotFoundException("The function doesn't exist.", Type="User")
1✔
3373

3374
        qualifier = qualifier or "$LATEST"
1✔
3375

3376
        # validate and normalize destination config
3377
        if destination_config:
1✔
3378
            self._validate_destination_config(state, function_name, destination_config)
1✔
3379

3380
        destination_config = DestinationConfig(
1✔
3381
            OnSuccess=OnSuccess(
3382
                Destination=(destination_config or {}).get("OnSuccess", {}).get("Destination")
3383
            ),
3384
            OnFailure=OnFailure(
3385
                Destination=(destination_config or {}).get("OnFailure", {}).get("Destination")
3386
            ),
3387
        )
3388

3389
        config = EventInvokeConfig(
1✔
3390
            function_name=function_name,
3391
            qualifier=qualifier,
3392
            maximum_event_age_in_seconds=maximum_event_age_in_seconds,
3393
            maximum_retry_attempts=maximum_retry_attempts,
3394
            last_modified=api_utils.generate_lambda_date(),
3395
            destination_config=destination_config,
3396
        )
3397
        fn.event_invoke_configs[qualifier] = config
1✔
3398

3399
        return FunctionEventInvokeConfig(
1✔
3400
            LastModified=datetime.datetime.strptime(
3401
                config.last_modified, api_utils.LAMBDA_DATE_FORMAT
3402
            ),
3403
            FunctionArn=api_utils.qualified_lambda_arn(
3404
                function_name, qualifier or "$LATEST", account_id, region
3405
            ),
3406
            DestinationConfig=destination_config,
3407
            MaximumEventAgeInSeconds=maximum_event_age_in_seconds,
3408
            MaximumRetryAttempts=maximum_retry_attempts,
3409
        )
3410

3411
    def get_function_event_invoke_config(
1✔
3412
        self,
3413
        context: RequestContext,
3414
        function_name: FunctionName,
3415
        qualifier: Qualifier = None,
3416
        **kwargs,
3417
    ) -> FunctionEventInvokeConfig:
3418
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3419
        state = lambda_stores[account_id][region]
1✔
3420
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
3421
            function_name, qualifier, context
3422
        )
3423

3424
        qualifier = qualifier or "$LATEST"
1✔
3425
        fn = state.functions.get(function_name)
1✔
3426
        if not fn:
1✔
3427
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
1✔
3428
            raise ResourceNotFoundException(
1✔
3429
                f"The function {fn_arn} doesn't have an EventInvokeConfig", Type="User"
3430
            )
3431

3432
        config = fn.event_invoke_configs.get(qualifier)
1✔
3433
        if not config:
1✔
3434
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
1✔
3435
            raise ResourceNotFoundException(
1✔
3436
                f"The function {fn_arn} doesn't have an EventInvokeConfig", Type="User"
3437
            )
3438

3439
        return FunctionEventInvokeConfig(
1✔
3440
            LastModified=datetime.datetime.strptime(
3441
                config.last_modified, api_utils.LAMBDA_DATE_FORMAT
3442
            ),
3443
            FunctionArn=api_utils.qualified_lambda_arn(
3444
                function_name, qualifier, account_id, region
3445
            ),
3446
            DestinationConfig=config.destination_config,
3447
            MaximumEventAgeInSeconds=config.maximum_event_age_in_seconds,
3448
            MaximumRetryAttempts=config.maximum_retry_attempts,
3449
        )
3450

3451
    def list_function_event_invoke_configs(
1✔
3452
        self,
3453
        context: RequestContext,
3454
        function_name: FunctionName,
3455
        marker: String = None,
3456
        max_items: MaxFunctionEventInvokeConfigListItems = None,
3457
        **kwargs,
3458
    ) -> ListFunctionEventInvokeConfigsResponse:
3459
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3460
        state = lambda_stores[account_id][region]
1✔
3461
        fn = state.functions.get(function_name)
1✔
3462
        if not fn:
1✔
3463
            raise ResourceNotFoundException("The function doesn't exist.", Type="User")
1✔
3464

3465
        event_invoke_configs = [
1✔
3466
            FunctionEventInvokeConfig(
3467
                LastModified=c.last_modified,
3468
                FunctionArn=api_utils.qualified_lambda_arn(
3469
                    function_name, c.qualifier, account_id, region
3470
                ),
3471
                MaximumEventAgeInSeconds=c.maximum_event_age_in_seconds,
3472
                MaximumRetryAttempts=c.maximum_retry_attempts,
3473
                DestinationConfig=c.destination_config,
3474
            )
3475
            for c in fn.event_invoke_configs.values()
3476
        ]
3477

3478
        event_invoke_configs = PaginatedList(event_invoke_configs)
1✔
3479
        page, token = event_invoke_configs.get_page(
1✔
3480
            lambda x: x["FunctionArn"],
3481
            marker,
3482
            max_items,
3483
        )
3484
        return ListFunctionEventInvokeConfigsResponse(
1✔
3485
            FunctionEventInvokeConfigs=page, NextMarker=token
3486
        )
3487

3488
    def delete_function_event_invoke_config(
1✔
3489
        self,
3490
        context: RequestContext,
3491
        function_name: FunctionName,
3492
        qualifier: Qualifier = None,
3493
        **kwargs,
3494
    ) -> None:
3495
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3496
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
3497
            function_name, qualifier, context
3498
        )
3499
        state = lambda_stores[account_id][region]
1✔
3500
        fn = state.functions.get(function_name)
1✔
3501
        resolved_qualifier = qualifier or "$LATEST"
1✔
3502
        fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
1✔
3503
        if not fn:
1✔
3504
            raise ResourceNotFoundException(
1✔
3505
                f"The function {fn_arn} doesn't have an EventInvokeConfig", Type="User"
3506
            )
3507

3508
        config = fn.event_invoke_configs.get(resolved_qualifier)
1✔
3509
        if not config:
1✔
3510
            raise ResourceNotFoundException(
1✔
3511
                f"The function {fn_arn} doesn't have an EventInvokeConfig", Type="User"
3512
            )
3513

3514
        del fn.event_invoke_configs[resolved_qualifier]
1✔
3515

3516
    def update_function_event_invoke_config(
1✔
3517
        self,
3518
        context: RequestContext,
3519
        function_name: FunctionName,
3520
        qualifier: Qualifier = None,
3521
        maximum_retry_attempts: MaximumRetryAttempts = None,
3522
        maximum_event_age_in_seconds: MaximumEventAgeInSeconds = None,
3523
        destination_config: DestinationConfig = None,
3524
        **kwargs,
3525
    ) -> FunctionEventInvokeConfig:
3526
        # like put but only update single fields via replace
3527
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3528
        state = lambda_stores[account_id][region]
1✔
3529
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
3530
            function_name, qualifier, context
3531
        )
3532

3533
        if (
1✔
3534
            maximum_event_age_in_seconds is None
3535
            and maximum_retry_attempts is None
3536
            and destination_config is None
3537
        ):
3538
            raise InvalidParameterValueException(
×
3539
                "You must specify at least one of error handling or destination setting.",
3540
                Type="User",
3541
            )
3542

3543
        fn = state.functions.get(function_name)
1✔
3544
        if not fn or (qualifier and not (qualifier in fn.aliases or qualifier in fn.versions)):
1✔
3545
            raise ResourceNotFoundException("The function doesn't exist.", Type="User")
1✔
3546

3547
        qualifier = qualifier or "$LATEST"
1✔
3548

3549
        config = fn.event_invoke_configs.get(qualifier)
1✔
3550
        if not config:
1✔
3551
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
1✔
3552
            raise ResourceNotFoundException(
1✔
3553
                f"The function {fn_arn} doesn't have an EventInvokeConfig", Type="User"
3554
            )
3555

3556
        if destination_config:
1✔
3557
            self._validate_destination_config(state, function_name, destination_config)
×
3558

3559
        optional_kwargs = {
1✔
3560
            k: v
3561
            for k, v in {
3562
                "destination_config": destination_config,
3563
                "maximum_retry_attempts": maximum_retry_attempts,
3564
                "maximum_event_age_in_seconds": maximum_event_age_in_seconds,
3565
            }.items()
3566
            if v is not None
3567
        }
3568

3569
        new_config = dataclasses.replace(
1✔
3570
            config, last_modified=api_utils.generate_lambda_date(), **optional_kwargs
3571
        )
3572
        fn.event_invoke_configs[qualifier] = new_config
1✔
3573

3574
        return FunctionEventInvokeConfig(
1✔
3575
            LastModified=datetime.datetime.strptime(
3576
                new_config.last_modified, api_utils.LAMBDA_DATE_FORMAT
3577
            ),
3578
            FunctionArn=api_utils.qualified_lambda_arn(
3579
                function_name, qualifier or "$LATEST", account_id, region
3580
            ),
3581
            DestinationConfig=new_config.destination_config,
3582
            MaximumEventAgeInSeconds=new_config.maximum_event_age_in_seconds,
3583
            MaximumRetryAttempts=new_config.maximum_retry_attempts,
3584
        )
3585

3586
    # =======================================
3587
    # ======  Layer & Layer Versions  =======
3588
    # =======================================
3589

3590
    @staticmethod
1✔
3591
    def _resolve_layer(
1✔
3592
        layer_name_or_arn: str, context: RequestContext
3593
    ) -> tuple[str, str, str, str | None]:
3594
        """
3595
        Return locator attributes for a given Lambda layer.
3596

3597
        :param layer_name_or_arn: Layer name or ARN
3598
        :param context: Request context
3599
        :return: Tuple of region, account ID, layer name, layer version
3600
        """
3601
        if api_utils.is_layer_arn(layer_name_or_arn):
1✔
3602
            return api_utils.parse_layer_arn(layer_name_or_arn)
1✔
3603

3604
        return context.region, context.account_id, layer_name_or_arn, None
1✔
3605

3606
    def publish_layer_version(
1✔
3607
        self,
3608
        context: RequestContext,
3609
        layer_name: LayerName,
3610
        content: LayerVersionContentInput,
3611
        description: Description = None,
3612
        compatible_runtimes: CompatibleRuntimes = None,
3613
        license_info: LicenseInfo = None,
3614
        compatible_architectures: CompatibleArchitectures = None,
3615
        **kwargs,
3616
    ) -> PublishLayerVersionResponse:
3617
        """
3618
        On first use of a LayerName a new layer is created and for each subsequent call with the same LayerName a new version is created.
3619
        Note that there are no $LATEST versions with layers!
3620

3621
        """
3622
        account = context.account_id
1✔
3623
        region = context.region
1✔
3624

3625
        validation_errors = api_utils.validate_layer_runtimes_and_architectures(
1✔
3626
            compatible_runtimes, compatible_architectures
3627
        )
3628
        if validation_errors:
1✔
3629
            raise ValidationException(
1✔
3630
                f"{len(validation_errors)} validation error{'s' if len(validation_errors) > 1 else ''} detected: {'; '.join(validation_errors)}"
3631
            )
3632

3633
        state = lambda_stores[account][region]
1✔
3634
        with self.create_layer_lock:
1✔
3635
            if layer_name not in state.layers:
1✔
3636
                # we don't have a version so create new layer object
3637
                # lock is required to avoid creating two v1 objects for the same name
3638
                layer = Layer(
1✔
3639
                    arn=api_utils.layer_arn(layer_name=layer_name, account=account, region=region)
3640
                )
3641
                state.layers[layer_name] = layer
1✔
3642

3643
        layer = state.layers[layer_name]
1✔
3644
        with layer.next_version_lock:
1✔
3645
            next_version = LambdaLayerVersionIdentifier(
1✔
3646
                account_id=account, region=region, layer_name=layer_name
3647
            ).generate(next_version=layer.next_version)
3648
            # When creating a layer with user defined layer version, it is possible that we
3649
            # create layer versions out of order.
3650
            # ie. a user could replicate layer v2 then layer v1. It is important to always keep the maximum possible
3651
            # value for next layer to avoid overwriting existing versions
3652
            if layer.next_version <= next_version:
1✔
3653
                # We don't need to update layer.next_version if the created version is lower than the "next in line"
3654
                layer.next_version = max(next_version, layer.next_version) + 1
1✔
3655

3656
        # creating a new layer
3657
        if content.get("ZipFile"):
1✔
3658
            code = store_lambda_archive(
1✔
3659
                archive_file=content["ZipFile"],
3660
                function_name=layer_name,
3661
                region_name=region,
3662
                account_id=account,
3663
            )
3664
        else:
3665
            code = store_s3_bucket_archive(
1✔
3666
                archive_bucket=content["S3Bucket"],
3667
                archive_key=content["S3Key"],
3668
                archive_version=content.get("S3ObjectVersion"),
3669
                function_name=layer_name,
3670
                region_name=region,
3671
                account_id=account,
3672
            )
3673

3674
        new_layer_version = LayerVersion(
1✔
3675
            layer_version_arn=api_utils.layer_version_arn(
3676
                layer_name=layer_name,
3677
                account=account,
3678
                region=region,
3679
                version=str(next_version),
3680
            ),
3681
            layer_arn=layer.arn,
3682
            version=next_version,
3683
            description=description or "",
3684
            license_info=license_info,
3685
            compatible_runtimes=compatible_runtimes,
3686
            compatible_architectures=compatible_architectures,
3687
            created=api_utils.generate_lambda_date(),
3688
            code=code,
3689
        )
3690

3691
        layer.layer_versions[str(next_version)] = new_layer_version
1✔
3692

3693
        return api_utils.map_layer_out(new_layer_version)
1✔
3694

3695
    def get_layer_version(
1✔
3696
        self,
3697
        context: RequestContext,
3698
        layer_name: LayerName,
3699
        version_number: LayerVersionNumber,
3700
        **kwargs,
3701
    ) -> GetLayerVersionResponse:
3702
        # TODO: handle layer_name as an ARN
3703

3704
        region_name, account_id, layer_name, _ = LambdaProvider._resolve_layer(layer_name, context)
1✔
3705
        state = lambda_stores[account_id][region_name]
1✔
3706

3707
        layer = state.layers.get(layer_name)
1✔
3708
        if version_number < 1:
1✔
3709
            raise InvalidParameterValueException("Layer Version Cannot be less than 1", Type="User")
1✔
3710
        if layer is None:
1✔
3711
            raise ResourceNotFoundException(
1✔
3712
                "The resource you requested does not exist.", Type="User"
3713
            )
3714
        layer_version = layer.layer_versions.get(str(version_number))
1✔
3715
        if layer_version is None:
1✔
3716
            raise ResourceNotFoundException(
1✔
3717
                "The resource you requested does not exist.", Type="User"
3718
            )
3719
        return api_utils.map_layer_out(layer_version)
1✔
3720

3721
    def get_layer_version_by_arn(
1✔
3722
        self, context: RequestContext, arn: LayerVersionArn, **kwargs
3723
    ) -> GetLayerVersionResponse:
3724
        region_name, account_id, layer_name, layer_version = LambdaProvider._resolve_layer(
1✔
3725
            arn, context
3726
        )
3727

3728
        if not layer_version:
1✔
3729
            raise ValidationException(
1✔
3730
                f"1 validation error detected: Value '{arn}' at 'arn' failed to satisfy constraint: Member must satisfy regular expression pattern: "
3731
                + "(arn:(aws[a-zA-Z-]*)?:lambda:(eusc-)?[a-z]{2}((-gov)|(-iso([a-z]?)))?-[a-z]+-\\d{1}:\\d{12}:layer:[a-zA-Z0-9-_]+:[0-9]+)|(arn:[a-zA-Z0-9-]+:lambda:::awslayer:[a-zA-Z0-9-_]+)"
3732
            )
3733

3734
        store = lambda_stores[account_id][region_name]
1✔
3735
        if not (layers := store.layers.get(layer_name)):
1✔
3736
            raise ResourceNotFoundException(
×
3737
                "The resource you requested does not exist.", Type="User"
3738
            )
3739

3740
        layer_version = layers.layer_versions.get(layer_version)
1✔
3741

3742
        if not layer_version:
1✔
3743
            raise ResourceNotFoundException(
1✔
3744
                "The resource you requested does not exist.", Type="User"
3745
            )
3746

3747
        return api_utils.map_layer_out(layer_version)
1✔
3748

3749
    def list_layers(
1✔
3750
        self,
3751
        context: RequestContext,
3752
        compatible_runtime: Runtime = None,
3753
        marker: String = None,
3754
        max_items: MaxLayerListItems = None,
3755
        compatible_architecture: Architecture = None,
3756
        **kwargs,
3757
    ) -> ListLayersResponse:
3758
        validation_errors = []
1✔
3759

3760
        validation_error_arch = api_utils.validate_layer_architecture(compatible_architecture)
1✔
3761
        if validation_error_arch:
1✔
3762
            validation_errors.append(validation_error_arch)
1✔
3763

3764
        validation_error_runtime = api_utils.validate_layer_runtime(compatible_runtime)
1✔
3765
        if validation_error_runtime:
1✔
3766
            validation_errors.append(validation_error_runtime)
1✔
3767

3768
        if validation_errors:
1✔
3769
            raise ValidationException(
1✔
3770
                f"{len(validation_errors)} validation error{'s' if len(validation_errors) > 1 else ''} detected: {';'.join(validation_errors)}"
3771
            )
3772
        # TODO: handle filter: compatible_runtime
3773
        # TODO: handle filter: compatible_architecture
3774

3775
        state = lambda_stores[context.account_id][context.region]
×
3776
        layers = state.layers
×
3777

3778
        # TODO: test how filters behave together with only returning layers here? Does it return the latest "matching" layer, i.e. does it ignore later layer versions that don't match?
3779

3780
        responses: list[LayersListItem] = []
×
3781
        for layer_name, layer in layers.items():
×
3782
            # fetch latest version
3783
            layer_versions = list(layer.layer_versions.values())
×
3784
            sorted(layer_versions, key=lambda x: x.version)
×
3785
            latest_layer_version = layer_versions[-1]
×
3786
            responses.append(
×
3787
                LayersListItem(
3788
                    LayerName=layer_name,
3789
                    LayerArn=layer.arn,
3790
                    LatestMatchingVersion=api_utils.map_layer_out(latest_layer_version),
3791
                )
3792
            )
3793

3794
        responses = PaginatedList(responses)
×
3795
        page, token = responses.get_page(
×
3796
            lambda version: version,
3797
            marker,
3798
            max_items,
3799
        )
3800

3801
        return ListLayersResponse(NextMarker=token, Layers=page)
×
3802

3803
    def list_layer_versions(
1✔
3804
        self,
3805
        context: RequestContext,
3806
        layer_name: LayerName,
3807
        compatible_runtime: Runtime = None,
3808
        marker: String = None,
3809
        max_items: MaxLayerListItems = None,
3810
        compatible_architecture: Architecture = None,
3811
        **kwargs,
3812
    ) -> ListLayerVersionsResponse:
3813
        validation_errors = api_utils.validate_layer_runtimes_and_architectures(
1✔
3814
            [compatible_runtime] if compatible_runtime else [],
3815
            [compatible_architecture] if compatible_architecture else [],
3816
        )
3817
        if validation_errors:
1✔
3818
            raise ValidationException(
×
3819
                f"{len(validation_errors)} validation error{'s' if len(validation_errors) > 1 else ''} detected: {';'.join(validation_errors)}"
3820
            )
3821

3822
        region_name, account_id, layer_name, layer_version = LambdaProvider._resolve_layer(
1✔
3823
            layer_name, context
3824
        )
3825
        state = lambda_stores[account_id][region_name]
1✔
3826

3827
        # TODO: Test & handle filter: compatible_runtime
3828
        # TODO: Test & handle filter: compatible_architecture
3829
        all_layer_versions = []
1✔
3830
        layer = state.layers.get(layer_name)
1✔
3831
        if layer is not None:
1✔
3832
            for layer_version in layer.layer_versions.values():
1✔
3833
                all_layer_versions.append(api_utils.map_layer_out(layer_version))
1✔
3834

3835
        all_layer_versions.sort(key=lambda x: x["Version"], reverse=True)
1✔
3836
        all_layer_versions = PaginatedList(all_layer_versions)
1✔
3837
        page, token = all_layer_versions.get_page(
1✔
3838
            lambda version: version["LayerVersionArn"],
3839
            marker,
3840
            max_items,
3841
        )
3842
        return ListLayerVersionsResponse(NextMarker=token, LayerVersions=page)
1✔
3843

3844
    def delete_layer_version(
1✔
3845
        self,
3846
        context: RequestContext,
3847
        layer_name: LayerName,
3848
        version_number: LayerVersionNumber,
3849
        **kwargs,
3850
    ) -> None:
3851
        if version_number < 1:
1✔
3852
            raise InvalidParameterValueException("Layer Version Cannot be less than 1", Type="User")
1✔
3853

3854
        region_name, account_id, layer_name, layer_version = LambdaProvider._resolve_layer(
1✔
3855
            layer_name, context
3856
        )
3857

3858
        store = lambda_stores[account_id][region_name]
1✔
3859
        layer = store.layers.get(layer_name, {})
1✔
3860
        if layer:
1✔
3861
            layer.layer_versions.pop(str(version_number), None)
1✔
3862

3863
    # =======================================
3864
    # =====  Layer Version Permissions  =====
3865
    # =======================================
3866
    # TODO: lock updates that change revision IDs
3867

3868
    def add_layer_version_permission(
1✔
3869
        self,
3870
        context: RequestContext,
3871
        layer_name: LayerName,
3872
        version_number: LayerVersionNumber,
3873
        statement_id: StatementId,
3874
        action: LayerPermissionAllowedAction,
3875
        principal: LayerPermissionAllowedPrincipal,
3876
        organization_id: OrganizationId = None,
3877
        revision_id: String = None,
3878
        **kwargs,
3879
    ) -> AddLayerVersionPermissionResponse:
3880
        # `layer_name` can either be layer name or ARN. It is used to generate error messages.
3881
        # `layer_n` contains the layer name.
3882
        region_name, account_id, layer_n, _ = LambdaProvider._resolve_layer(layer_name, context)
1✔
3883

3884
        if action != "lambda:GetLayerVersion":
1✔
3885
            raise ValidationException(
1✔
3886
                f"1 validation error detected: Value '{action}' at 'action' failed to satisfy constraint: Member must satisfy regular expression pattern: lambda:GetLayerVersion"
3887
            )
3888

3889
        store = lambda_stores[account_id][region_name]
1✔
3890
        layer = store.layers.get(layer_n)
1✔
3891

3892
        layer_version_arn = api_utils.layer_version_arn(
1✔
3893
            layer_name, account_id, region_name, str(version_number)
3894
        )
3895

3896
        if layer is None:
1✔
3897
            raise ResourceNotFoundException(
1✔
3898
                f"Layer version {layer_version_arn} does not exist.", Type="User"
3899
            )
3900
        layer_version = layer.layer_versions.get(str(version_number))
1✔
3901
        if layer_version is None:
1✔
3902
            raise ResourceNotFoundException(
1✔
3903
                f"Layer version {layer_version_arn} does not exist.", Type="User"
3904
            )
3905
        # do we have a policy? if not set one
3906
        if layer_version.policy is None:
1✔
3907
            layer_version.policy = LayerPolicy()
1✔
3908

3909
        if statement_id in layer_version.policy.statements:
1✔
3910
            raise ResourceConflictException(
1✔
3911
                f"The statement id ({statement_id}) provided already exists. Please provide a new statement id, or remove the existing statement.",
3912
                Type="User",
3913
            )
3914

3915
        if revision_id and layer_version.policy.revision_id != revision_id:
1✔
3916
            raise PreconditionFailedException(
1✔
3917
                "The Revision Id provided does not match the latest Revision Id. "
3918
                "Call the GetLayerPolicy API to retrieve the latest Revision Id",
3919
                Type="User",
3920
            )
3921

3922
        statement = LayerPolicyStatement(
1✔
3923
            sid=statement_id, action=action, principal=principal, organization_id=organization_id
3924
        )
3925

3926
        old_statements = layer_version.policy.statements
1✔
3927
        layer_version.policy = dataclasses.replace(
1✔
3928
            layer_version.policy, statements={**old_statements, statement_id: statement}
3929
        )
3930

3931
        return AddLayerVersionPermissionResponse(
1✔
3932
            Statement=json.dumps(
3933
                {
3934
                    "Sid": statement.sid,
3935
                    "Effect": "Allow",
3936
                    "Principal": statement.principal,
3937
                    "Action": statement.action,
3938
                    "Resource": layer_version.layer_version_arn,
3939
                }
3940
            ),
3941
            RevisionId=layer_version.policy.revision_id,
3942
        )
3943

3944
    def remove_layer_version_permission(
1✔
3945
        self,
3946
        context: RequestContext,
3947
        layer_name: LayerName,
3948
        version_number: LayerVersionNumber,
3949
        statement_id: StatementId,
3950
        revision_id: String = None,
3951
        **kwargs,
3952
    ) -> None:
3953
        # `layer_name` can either be layer name or ARN. It is used to generate error messages.
3954
        # `layer_n` contains the layer name.
3955
        region_name, account_id, layer_n, layer_version = LambdaProvider._resolve_layer(
1✔
3956
            layer_name, context
3957
        )
3958

3959
        layer_version_arn = api_utils.layer_version_arn(
1✔
3960
            layer_name, account_id, region_name, str(version_number)
3961
        )
3962

3963
        state = lambda_stores[account_id][region_name]
1✔
3964
        layer = state.layers.get(layer_n)
1✔
3965
        if layer is None:
1✔
3966
            raise ResourceNotFoundException(
1✔
3967
                f"Layer version {layer_version_arn} does not exist.", Type="User"
3968
            )
3969
        layer_version = layer.layer_versions.get(str(version_number))
1✔
3970
        if layer_version is None:
1✔
3971
            raise ResourceNotFoundException(
1✔
3972
                f"Layer version {layer_version_arn} does not exist.", Type="User"
3973
            )
3974

3975
        if revision_id and layer_version.policy.revision_id != revision_id:
1✔
3976
            raise PreconditionFailedException(
1✔
3977
                "The Revision Id provided does not match the latest Revision Id. "
3978
                "Call the GetLayerPolicy API to retrieve the latest Revision Id",
3979
                Type="User",
3980
            )
3981

3982
        if statement_id not in layer_version.policy.statements:
1✔
3983
            raise ResourceNotFoundException(
1✔
3984
                f"Statement {statement_id} is not found in resource policy.", Type="User"
3985
            )
3986

3987
        old_statements = layer_version.policy.statements
1✔
3988
        layer_version.policy = dataclasses.replace(
1✔
3989
            layer_version.policy,
3990
            statements={k: v for k, v in old_statements.items() if k != statement_id},
3991
        )
3992

3993
    def get_layer_version_policy(
1✔
3994
        self,
3995
        context: RequestContext,
3996
        layer_name: LayerName,
3997
        version_number: LayerVersionNumber,
3998
        **kwargs,
3999
    ) -> GetLayerVersionPolicyResponse:
4000
        # `layer_name` can either be layer name or ARN. It is used to generate error messages.
4001
        # `layer_n` contains the layer name.
4002
        region_name, account_id, layer_n, _ = LambdaProvider._resolve_layer(layer_name, context)
1✔
4003

4004
        layer_version_arn = api_utils.layer_version_arn(
1✔
4005
            layer_name, account_id, region_name, str(version_number)
4006
        )
4007

4008
        store = lambda_stores[account_id][region_name]
1✔
4009
        layer = store.layers.get(layer_n)
1✔
4010

4011
        if layer is None:
1✔
4012
            raise ResourceNotFoundException(
1✔
4013
                f"Layer version {layer_version_arn} does not exist.", Type="User"
4014
            )
4015

4016
        layer_version = layer.layer_versions.get(str(version_number))
1✔
4017
        if layer_version is None:
1✔
4018
            raise ResourceNotFoundException(
1✔
4019
                f"Layer version {layer_version_arn} does not exist.", Type="User"
4020
            )
4021

4022
        if layer_version.policy is None:
1✔
4023
            raise ResourceNotFoundException(
1✔
4024
                "No policy is associated with the given resource.", Type="User"
4025
            )
4026

4027
        return GetLayerVersionPolicyResponse(
1✔
4028
            Policy=json.dumps(
4029
                {
4030
                    "Version": layer_version.policy.version,
4031
                    "Id": layer_version.policy.id,
4032
                    "Statement": [
4033
                        {
4034
                            "Sid": ps.sid,
4035
                            "Effect": "Allow",
4036
                            "Principal": ps.principal,
4037
                            "Action": ps.action,
4038
                            "Resource": layer_version.layer_version_arn,
4039
                        }
4040
                        for ps in layer_version.policy.statements.values()
4041
                    ],
4042
                }
4043
            ),
4044
            RevisionId=layer_version.policy.revision_id,
4045
        )
4046

4047
    # =======================================
4048
    # =======  Function Concurrency  ========
4049
    # =======================================
4050
    # (Reserved) function concurrency is scoped to the whole function
4051

4052
    def get_function_concurrency(
1✔
4053
        self, context: RequestContext, function_name: FunctionName, **kwargs
4054
    ) -> GetFunctionConcurrencyResponse:
4055
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
4056
        function_name = api_utils.get_function_name(function_name, context)
1✔
4057
        fn = self._get_function(function_name=function_name, region=region, account_id=account_id)
1✔
4058
        return GetFunctionConcurrencyResponse(
1✔
4059
            ReservedConcurrentExecutions=fn.reserved_concurrent_executions
4060
        )
4061

4062
    def put_function_concurrency(
1✔
4063
        self,
4064
        context: RequestContext,
4065
        function_name: FunctionName,
4066
        reserved_concurrent_executions: ReservedConcurrentExecutions,
4067
        **kwargs,
4068
    ) -> Concurrency:
4069
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
4070

4071
        function_name, qualifier = api_utils.get_name_and_qualifier(function_name, None, context)
1✔
4072
        if qualifier:
1✔
4073
            raise InvalidParameterValueException(
1✔
4074
                "This operation is permitted on Lambda functions only. Aliases and versions do not support this operation. Please specify either a function name or an unqualified function ARN.",
4075
                Type="User",
4076
            )
4077

4078
        store = lambda_stores[account_id][region]
1✔
4079
        fn = store.functions.get(function_name)
1✔
4080
        if not fn:
1✔
4081
            fn_arn = api_utils.qualified_lambda_arn(
1✔
4082
                function_name,
4083
                qualifier="$LATEST",
4084
                account=account_id,
4085
                region=region,
4086
            )
4087
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")
1✔
4088

4089
        settings = self.get_account_settings(context)
1✔
4090
        unreserved_concurrent_executions = settings["AccountLimit"][
1✔
4091
            "UnreservedConcurrentExecutions"
4092
        ]
4093

4094
        # The existing reserved concurrent executions for the same function are already deduced in
4095
        # unreserved_concurrent_executions but must not count because the new one will replace the existing one.
4096
        # Joel tested this behavior manually against AWS (2023-11-28).
4097
        existing_reserved_concurrent_executions = (
1✔
4098
            fn.reserved_concurrent_executions if fn.reserved_concurrent_executions else 0
4099
        )
4100
        if (
1✔
4101
            unreserved_concurrent_executions
4102
            - reserved_concurrent_executions
4103
            + existing_reserved_concurrent_executions
4104
        ) < config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY:
4105
            raise InvalidParameterValueException(
1✔
4106
                f"Specified ReservedConcurrentExecutions for function decreases account's UnreservedConcurrentExecution below its minimum value of [{config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY}]."
4107
            )
4108

4109
        total_provisioned_concurrency = sum(
1✔
4110
            [
4111
                provisioned_configs.provisioned_concurrent_executions
4112
                for provisioned_configs in fn.provisioned_concurrency_configs.values()
4113
            ]
4114
        )
4115
        if total_provisioned_concurrency > reserved_concurrent_executions:
1✔
4116
            raise InvalidParameterValueException(
1✔
4117
                f" ReservedConcurrentExecutions  {reserved_concurrent_executions} should not be lower than function's total provisioned concurrency [{total_provisioned_concurrency}]."
4118
            )
4119

4120
        fn.reserved_concurrent_executions = reserved_concurrent_executions
1✔
4121

4122
        return Concurrency(ReservedConcurrentExecutions=fn.reserved_concurrent_executions)
1✔
4123

4124
    def delete_function_concurrency(
1✔
4125
        self, context: RequestContext, function_name: FunctionName, **kwargs
4126
    ) -> None:
4127
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
4128
        function_name, qualifier = api_utils.get_name_and_qualifier(function_name, None, context)
1✔
4129
        store = lambda_stores[account_id][region]
1✔
4130
        fn = store.functions.get(function_name)
1✔
4131
        fn.reserved_concurrent_executions = None
1✔
4132

4133
    # =======================================
4134
    # ===============  TAGS   ===============
4135
    # =======================================
4136
    # only Function, Event Source Mapping, and Code Signing Config (not currently supported by LocalStack) ARNs an are available for tagging in AWS
4137

4138
    def _get_tags(self, resource: TaggableResource) -> dict[str, str]:
1✔
4139
        state = self.fetch_lambda_store_for_tagging(resource)
1✔
4140
        lambda_adapted_tags = {
1✔
4141
            tag["Key"]: tag["Value"]
4142
            for tag in state.TAGS.list_tags_for_resource(resource).get("Tags")
4143
        }
4144
        return lambda_adapted_tags
1✔
4145

4146
    def _store_tags(self, resource: TaggableResource, tags: dict[str, str]):
1✔
4147
        state = self.fetch_lambda_store_for_tagging(resource)
1✔
4148
        if len(state.TAGS.tags.get(resource, {}) | tags) > LAMBDA_TAG_LIMIT_PER_RESOURCE:
1✔
4149
            raise InvalidParameterValueException(
1✔
4150
                "Number of tags exceeds resource tag limit.", Type="User"
4151
            )
4152

4153
        tag_svc_adapted_tags = [{"Key": key, "Value": value} for key, value in tags.items()]
1✔
4154
        state.TAGS.tag_resource(resource, tag_svc_adapted_tags)
1✔
4155

4156
    def fetch_lambda_store_for_tagging(self, resource: TaggableResource) -> LambdaStore:
1✔
4157
        """
4158
        Takes a resource ARN for a TaggableResource (Lambda Function, Event Source Mapping, or Code Signing Config) and returns a corresponding
4159
        LambdaStore for its region and account.
4160

4161
        In addition, this function validates that the ARN is a valid TaggableResource type, and that the TaggableResource exists.
4162

4163
        Raises:
4164
            ValidationException: If the resource ARN is not a full ARN for a TaggableResource.
4165
            ResourceNotFoundException: If the specified resource does not exist.
4166
            InvalidParameterValueException: If the resource ARN is a qualified Lambda Function.
4167
        """
4168

4169
        def _raise_validation_exception():
1✔
4170
            raise ValidationException(
1✔
4171
                f"1 validation error detected: Value '{resource}' at 'resource' failed to satisfy constraint: Member must satisfy regular expression pattern: {api_utils.TAGGABLE_RESOURCE_ARN_PATTERN}"
4172
            )
4173

4174
        # Check whether the ARN we have been passed is correctly formatted
4175
        parsed_resource_arn: ArnData = None
1✔
4176
        try:
1✔
4177
            parsed_resource_arn = parse_arn(resource)
1✔
4178
        except Exception:
1✔
4179
            _raise_validation_exception()
1✔
4180

4181
        # TODO: Should we be checking whether this is a full ARN?
4182
        region, account_id, resource_type = map(
1✔
4183
            parsed_resource_arn.get, ("region", "account", "resource")
4184
        )
4185

4186
        if not all((region, account_id, resource_type)):
1✔
4187
            _raise_validation_exception()
×
4188

4189
        if not (parts := resource_type.split(":")):
1✔
4190
            _raise_validation_exception()
×
4191

4192
        resource_type, resource_identifier, *qualifier = parts
1✔
4193
        if resource_type not in {"event-source-mapping", "code-signing-config", "function"}:
1✔
4194
            _raise_validation_exception()
1✔
4195

4196
        if qualifier:
1✔
4197
            if resource_type == "function":
1✔
4198
                raise InvalidParameterValueException(
1✔
4199
                    "Tags on function aliases and versions are not supported. Please specify a function ARN.",
4200
                    Type="User",
4201
                )
4202
            _raise_validation_exception()
1✔
4203

4204
        match resource_type:
1✔
4205
            case "event-source-mapping":
1✔
4206
                self._get_esm(resource_identifier, account_id, region)
1✔
4207
            case "code-signing-config":
1✔
4208
                raise NotImplementedError("Resource tagging on CSC not yet implemented.")
4209
            case "function":
1✔
4210
                self._get_function(
1✔
4211
                    function_name=resource_identifier, account_id=account_id, region=region
4212
                )
4213

4214
        # If no exceptions are raised, assume ARN and referenced resource is valid for tag operations
4215
        return lambda_stores[account_id][region]
1✔
4216

4217
    def tag_resource(
1✔
4218
        self, context: RequestContext, resource: TaggableResource, tags: Tags, **kwargs
4219
    ) -> None:
4220
        if not tags:
1✔
4221
            raise InvalidParameterValueException(
1✔
4222
                "An error occurred and the request cannot be processed.", Type="User"
4223
            )
4224
        self._store_tags(resource, tags)
1✔
4225

4226
        if (resource_id := extract_resource_from_arn(resource)) and resource_id.startswith(
1✔
4227
            "function"
4228
        ):
4229
            name, _, account, region = function_locators_from_arn(resource)
1✔
4230
            function = self._get_function(name, account, region)
1✔
4231
            with function.lock:
1✔
4232
                # dirty hack for changed revision id, should reevaluate model to prevent this:
4233
                latest_version = function.versions["$LATEST"]
1✔
4234
                function.versions["$LATEST"] = dataclasses.replace(
1✔
4235
                    latest_version, config=dataclasses.replace(latest_version.config)
4236
                )
4237

4238
    def list_tags(
1✔
4239
        self, context: RequestContext, resource: TaggableResource, **kwargs
4240
    ) -> ListTagsResponse:
4241
        tags = self._get_tags(resource)
1✔
4242
        return ListTagsResponse(Tags=tags)
1✔
4243

4244
    def untag_resource(
1✔
4245
        self, context: RequestContext, resource: TaggableResource, tag_keys: TagKeyList, **kwargs
4246
    ) -> None:
4247
        if not tag_keys:
1✔
4248
            raise ValidationException(
1✔
4249
                "1 validation error detected: Value null at 'tagKeys' failed to satisfy constraint: Member must not be null"
4250
            )  # should probably be generalized a bit
4251

4252
        state = self.fetch_lambda_store_for_tagging(resource)
1✔
4253
        state.TAGS.untag_resource(resource, tag_keys)
1✔
4254

4255
        if (resource_id := extract_resource_from_arn(resource)) and resource_id.startswith(
1✔
4256
            "function"
4257
        ):
4258
            name, _, account, region = function_locators_from_arn(resource)
1✔
4259
            function = self._get_function(name, account, region)
1✔
4260
            # TODO: Potential race condition
4261
            with function.lock:
1✔
4262
                # dirty hack for changed revision id, should reevaluate model to prevent this:
4263
                latest_version = function.versions["$LATEST"]
1✔
4264
                function.versions["$LATEST"] = dataclasses.replace(
1✔
4265
                    latest_version, config=dataclasses.replace(latest_version.config)
4266
                )
4267

4268
    # =======================================
4269
    # =======  LEGACY / DEPRECATED   ========
4270
    # =======================================
4271

4272
    def invoke_async(
1✔
4273
        self,
4274
        context: RequestContext,
4275
        function_name: NamespacedFunctionName,
4276
        invoke_args: IO[BlobStream],
4277
        **kwargs,
4278
    ) -> InvokeAsyncResponse:
4279
        """LEGACY API endpoint. Even AWS heavily discourages its usage."""
4280
        raise NotImplementedError
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc