• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 48f393e1-29b1-4c63-9492-bea03b969f84

03 Mar 2025 10:41PM UTC coverage: 86.891% (+0.01%) from 86.878%
48f393e1-29b1-4c63-9492-bea03b969f84

push

circleci

web-flow
APIGW: add validation for AWS ARN in PutIntegration (#12324)

11 of 13 new or added lines in 1 file covered. (84.62%)

124 existing lines in 9 files now uncovered.

61911 of 71251 relevant lines covered (86.89%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.87
/localstack-core/localstack/services/lambda_/provider.py
1
import base64
1✔
2
import dataclasses
1✔
3
import datetime
1✔
4
import itertools
1✔
5
import json
1✔
6
import logging
1✔
7
import re
1✔
8
import threading
1✔
9
import time
1✔
10
from typing import IO, Any, Optional, Tuple
1✔
11

12
from botocore.exceptions import ClientError
1✔
13

14
from localstack import config
1✔
15
from localstack.aws.api import RequestContext, ServiceException, handler
1✔
16
from localstack.aws.api.lambda_ import (
1✔
17
    AccountLimit,
18
    AccountUsage,
19
    AddLayerVersionPermissionResponse,
20
    AddPermissionRequest,
21
    AddPermissionResponse,
22
    Alias,
23
    AliasConfiguration,
24
    AliasRoutingConfiguration,
25
    AllowedPublishers,
26
    Architecture,
27
    Arn,
28
    Blob,
29
    BlobStream,
30
    CodeSigningConfigArn,
31
    CodeSigningConfigNotFoundException,
32
    CodeSigningPolicies,
33
    CompatibleArchitectures,
34
    CompatibleRuntimes,
35
    Concurrency,
36
    Cors,
37
    CreateCodeSigningConfigResponse,
38
    CreateEventSourceMappingRequest,
39
    CreateFunctionRequest,
40
    CreateFunctionUrlConfigResponse,
41
    DeleteCodeSigningConfigResponse,
42
    Description,
43
    DestinationConfig,
44
    EventSourceMappingConfiguration,
45
    FunctionCodeLocation,
46
    FunctionConfiguration,
47
    FunctionEventInvokeConfig,
48
    FunctionName,
49
    FunctionUrlAuthType,
50
    FunctionUrlQualifier,
51
    GetAccountSettingsResponse,
52
    GetCodeSigningConfigResponse,
53
    GetFunctionCodeSigningConfigResponse,
54
    GetFunctionConcurrencyResponse,
55
    GetFunctionRecursionConfigResponse,
56
    GetFunctionResponse,
57
    GetFunctionUrlConfigResponse,
58
    GetLayerVersionPolicyResponse,
59
    GetLayerVersionResponse,
60
    GetPolicyResponse,
61
    GetProvisionedConcurrencyConfigResponse,
62
    InvalidParameterValueException,
63
    InvocationResponse,
64
    InvocationType,
65
    InvokeAsyncResponse,
66
    InvokeMode,
67
    LambdaApi,
68
    LastUpdateStatus,
69
    LayerName,
70
    LayerPermissionAllowedAction,
71
    LayerPermissionAllowedPrincipal,
72
    LayersListItem,
73
    LayerVersionArn,
74
    LayerVersionContentInput,
75
    LayerVersionNumber,
76
    LicenseInfo,
77
    ListAliasesResponse,
78
    ListCodeSigningConfigsResponse,
79
    ListEventSourceMappingsResponse,
80
    ListFunctionEventInvokeConfigsResponse,
81
    ListFunctionsByCodeSigningConfigResponse,
82
    ListFunctionsResponse,
83
    ListFunctionUrlConfigsResponse,
84
    ListLayersResponse,
85
    ListLayerVersionsResponse,
86
    ListProvisionedConcurrencyConfigsResponse,
87
    ListTagsResponse,
88
    ListVersionsByFunctionResponse,
89
    LogFormat,
90
    LoggingConfig,
91
    LogType,
92
    MasterRegion,
93
    MaxFunctionEventInvokeConfigListItems,
94
    MaximumEventAgeInSeconds,
95
    MaximumRetryAttempts,
96
    MaxItems,
97
    MaxLayerListItems,
98
    MaxListItems,
99
    MaxProvisionedConcurrencyConfigListItems,
100
    NamespacedFunctionName,
101
    NamespacedStatementId,
102
    OnFailure,
103
    OnSuccess,
104
    OrganizationId,
105
    PackageType,
106
    PositiveInteger,
107
    PreconditionFailedException,
108
    ProvisionedConcurrencyConfigListItem,
109
    ProvisionedConcurrencyConfigNotFoundException,
110
    ProvisionedConcurrencyStatusEnum,
111
    PublishLayerVersionResponse,
112
    PutFunctionCodeSigningConfigResponse,
113
    PutFunctionRecursionConfigResponse,
114
    PutProvisionedConcurrencyConfigResponse,
115
    Qualifier,
116
    RecursiveLoop,
117
    ReservedConcurrentExecutions,
118
    ResourceConflictException,
119
    ResourceNotFoundException,
120
    Runtime,
121
    RuntimeVersionConfig,
122
    SnapStart,
123
    SnapStartApplyOn,
124
    SnapStartOptimizationStatus,
125
    SnapStartResponse,
126
    State,
127
    StatementId,
128
    StateReasonCode,
129
    String,
130
    TaggableResource,
131
    TagKeyList,
132
    Tags,
133
    TracingMode,
134
    UnqualifiedFunctionName,
135
    UpdateCodeSigningConfigResponse,
136
    UpdateEventSourceMappingRequest,
137
    UpdateFunctionCodeRequest,
138
    UpdateFunctionConfigurationRequest,
139
    UpdateFunctionUrlConfigResponse,
140
    Version,
141
)
142
from localstack.aws.api.lambda_ import FunctionVersion as FunctionVersionApi
1✔
143
from localstack.aws.api.lambda_ import ServiceException as LambdaServiceException
1✔
144
from localstack.aws.connect import connect_to
1✔
145
from localstack.aws.spec import load_service
1✔
146
from localstack.services.edge import ROUTER
1✔
147
from localstack.services.lambda_ import api_utils
1✔
148
from localstack.services.lambda_ import hooks as lambda_hooks
1✔
149
from localstack.services.lambda_.api_utils import (
1✔
150
    ARCHITECTURES,
151
    STATEMENT_ID_REGEX,
152
    SUBNET_ID_REGEX,
153
    function_locators_from_arn,
154
)
155
from localstack.services.lambda_.event_source_mapping.esm_config_factory import (
1✔
156
    EsmConfigFactory,
157
)
158
from localstack.services.lambda_.event_source_mapping.esm_worker import (
1✔
159
    EsmState,
160
    EsmWorker,
161
)
162
from localstack.services.lambda_.event_source_mapping.esm_worker_factory import (
1✔
163
    EsmWorkerFactory,
164
)
165
from localstack.services.lambda_.event_source_mapping.pipe_utils import get_internal_client
1✔
166
from localstack.services.lambda_.invocation import AccessDeniedException
1✔
167
from localstack.services.lambda_.invocation.execution_environment import (
1✔
168
    EnvironmentStartupTimeoutException,
169
)
170
from localstack.services.lambda_.invocation.lambda_models import (
1✔
171
    AliasRoutingConfig,
172
    CodeSigningConfig,
173
    EventInvokeConfig,
174
    Function,
175
    FunctionResourcePolicy,
176
    FunctionUrlConfig,
177
    FunctionVersion,
178
    ImageConfig,
179
    LambdaEphemeralStorage,
180
    Layer,
181
    LayerPolicy,
182
    LayerPolicyStatement,
183
    LayerVersion,
184
    ProvisionedConcurrencyConfiguration,
185
    RequestEntityTooLargeException,
186
    ResourcePolicy,
187
    UpdateStatus,
188
    ValidationException,
189
    VersionAlias,
190
    VersionFunctionConfiguration,
191
    VersionIdentifier,
192
    VersionState,
193
    VpcConfig,
194
)
195
from localstack.services.lambda_.invocation.lambda_service import (
1✔
196
    LambdaService,
197
    create_image_code,
198
    destroy_code_if_not_used,
199
    lambda_stores,
200
    store_lambda_archive,
201
    store_s3_bucket_archive,
202
)
203
from localstack.services.lambda_.invocation.models import LambdaStore
1✔
204
from localstack.services.lambda_.invocation.runtime_executor import get_runtime_executor
1✔
205
from localstack.services.lambda_.lambda_utils import HINT_LOG
1✔
206
from localstack.services.lambda_.layerfetcher.layer_fetcher import LayerFetcher
1✔
207
from localstack.services.lambda_.provider_utils import (
1✔
208
    LambdaLayerVersionIdentifier,
209
    get_function_version,
210
    get_function_version_from_arn,
211
)
212
from localstack.services.lambda_.runtimes import (
1✔
213
    ALL_RUNTIMES,
214
    DEPRECATED_RUNTIMES,
215
    DEPRECATED_RUNTIMES_UPGRADES,
216
    RUNTIMES_AGGREGATED,
217
    VALID_RUNTIMES,
218
)
219
from localstack.services.lambda_.urlrouter import FunctionUrlRouter
1✔
220
from localstack.services.plugins import ServiceLifecycleHook
1✔
221
from localstack.state import StateVisitor
1✔
222
from localstack.utils.aws.arns import (
1✔
223
    ArnData,
224
    extract_resource_from_arn,
225
    extract_service_from_arn,
226
    get_partition,
227
    lambda_event_source_mapping_arn,
228
    parse_arn,
229
)
230
from localstack.utils.aws.client_types import ServicePrincipal
1✔
231
from localstack.utils.bootstrap import is_api_enabled
1✔
232
from localstack.utils.collections import PaginatedList
1✔
233
from localstack.utils.event_matcher import validate_event_pattern
1✔
234
from localstack.utils.lambda_debug_mode.lambda_debug_mode_session import LambdaDebugModeSession
1✔
235
from localstack.utils.strings import get_random_hex, short_uid, to_bytes, to_str
1✔
236
from localstack.utils.sync import poll_condition
1✔
237
from localstack.utils.urls import localstack_host
1✔
238

239
LOG = logging.getLogger(__name__)
1✔
240

241
LAMBDA_DEFAULT_TIMEOUT = 3
1✔
242
LAMBDA_DEFAULT_MEMORY_SIZE = 128
1✔
243

244
LAMBDA_TAG_LIMIT_PER_RESOURCE = 50
1✔
245
LAMBDA_LAYERS_LIMIT_PER_FUNCTION = 5
1✔
246

247
TAG_KEY_CUSTOM_URL = "_custom_id_"
1✔
248
# Requirements (from RFC3986 & co): not longer than 63, first char must be
249
# alpha, then alphanumeric or hyphen, except cannot start or end with hyphen
250
TAG_KEY_CUSTOM_URL_VALIDATOR = re.compile(r"^[A-Za-z]([A-Za-z0-9\-]{0,61}[A-Za-z0-9])?$")
1✔
251

252

253
class LambdaProvider(LambdaApi, ServiceLifecycleHook):
1✔
254
    lambda_service: LambdaService
1✔
255
    create_fn_lock: threading.RLock
1✔
256
    create_layer_lock: threading.RLock
1✔
257
    router: FunctionUrlRouter
1✔
258
    esm_workers: dict[str, EsmWorker]
1✔
259
    layer_fetcher: LayerFetcher | None
1✔
260

261
    def __init__(self) -> None:
1✔
262
        self.lambda_service = LambdaService()
1✔
263
        self.create_fn_lock = threading.RLock()
1✔
264
        self.create_layer_lock = threading.RLock()
1✔
265
        self.router = FunctionUrlRouter(ROUTER, self.lambda_service)
1✔
266
        self.esm_workers = {}
1✔
267
        self.layer_fetcher = None
1✔
268
        lambda_hooks.inject_layer_fetcher.run(self)
1✔
269

270
    def accept_state_visitor(self, visitor: StateVisitor):
1✔
UNCOV
271
        visitor.visit(lambda_stores)
×
272

273
    def on_before_start(self):
1✔
274
        # Attempt to start the Lambda Debug Mode session object.
275
        try:
1✔
276
            lambda_debug_mode_session = LambdaDebugModeSession.get()
1✔
277
            lambda_debug_mode_session.ensure_running()
1✔
UNCOV
278
        except Exception as ex:
×
UNCOV
279
            LOG.error(
×
280
                "Unexpected error encountered when attempting to initialise Lambda Debug Mode '%s'.",
281
                ex,
282
            )
283

284
    def on_before_state_reset(self):
1✔
UNCOV
285
        self.lambda_service.stop()
×
286

287
    def on_after_state_reset(self):
1✔
UNCOV
288
        self.router.lambda_service = self.lambda_service = LambdaService()
×
289

290
    def on_before_state_load(self):
1✔
291
        self.lambda_service.stop()
×
292

293
    def on_after_state_load(self):
1✔
294
        self.lambda_service = LambdaService()
×
295
        self.router.lambda_service = self.lambda_service
×
296

UNCOV
297
        for account_id, account_bundle in lambda_stores.items():
×
298
            for region_name, state in account_bundle.items():
×
299
                for fn in state.functions.values():
×
UNCOV
300
                    for fn_version in fn.versions.values():
×
301
                        # restore the "Pending" state for every function version and start it
UNCOV
302
                        try:
×
UNCOV
303
                            new_state = VersionState(
×
304
                                state=State.Pending,
305
                                code=StateReasonCode.Creating,
306
                                reason="The function is being created.",
307
                            )
UNCOV
308
                            new_config = dataclasses.replace(fn_version.config, state=new_state)
×
UNCOV
309
                            new_version = dataclasses.replace(fn_version, config=new_config)
×
310
                            fn.versions[fn_version.id.qualifier] = new_version
×
311
                            self.lambda_service.create_function_version(fn_version).result(
×
312
                                timeout=5
313
                            )
UNCOV
314
                        except Exception:
×
UNCOV
315
                            LOG.warning(
×
316
                                "Failed to restore function version %s",
317
                                fn_version.id.qualified_arn(),
318
                                exc_info=True,
319
                            )
320
                    # restore provisioned concurrency per function considering both versions and aliases
321
                    for (
×
322
                        provisioned_qualifier,
323
                        provisioned_config,
324
                    ) in fn.provisioned_concurrency_configs.items():
325
                        fn_arn = None
×
326
                        try:
×
327
                            if api_utils.qualifier_is_alias(provisioned_qualifier):
×
328
                                alias = fn.aliases.get(provisioned_qualifier)
×
329
                                resolved_version = fn.versions.get(alias.function_version)
×
UNCOV
330
                                fn_arn = resolved_version.id.qualified_arn()
×
331
                            elif api_utils.qualifier_is_version(provisioned_qualifier):
×
UNCOV
332
                                fn_version = fn.versions.get(provisioned_qualifier)
×
UNCOV
333
                                fn_arn = fn_version.id.qualified_arn()
×
334
                            else:
UNCOV
335
                                raise InvalidParameterValueException(
×
336
                                    "Invalid qualifier type:"
337
                                    " Qualifier can only be an alias or a version for provisioned concurrency."
338
                                )
339

340
                            manager = self.lambda_service.get_lambda_version_manager(fn_arn)
×
341
                            manager.update_provisioned_concurrency_config(
×
342
                                provisioned_config.provisioned_concurrent_executions
343
                            )
UNCOV
344
                        except Exception:
×
UNCOV
345
                            LOG.warning(
×
346
                                "Failed to restore provisioned concurrency %s for function %s",
347
                                provisioned_config,
348
                                fn_arn,
349
                                exc_info=True,
350
                            )
351

UNCOV
352
                for esm in state.event_source_mappings.values():
×
353
                    # Restores event source workers
354
                    function_arn = esm.get("FunctionArn")
×
355

356
                    # TODO: How do we know the event source is up?
357
                    # A basic poll to see if the mapped Lambda function is active/failed
UNCOV
358
                    if not poll_condition(
×
359
                        lambda: get_function_version_from_arn(function_arn).config.state.state
360
                        in [State.Active, State.Failed],
361
                        timeout=10,
362
                    ):
UNCOV
363
                        LOG.warning(
×
364
                            "Creating ESM for Lambda that is not in running state: %s",
365
                            function_arn,
366
                        )
367

UNCOV
368
                    function_version = get_function_version_from_arn(function_arn)
×
UNCOV
369
                    function_role = function_version.config.role
×
370

371
                    is_esm_enabled = esm.get("State", EsmState.DISABLED) not in (
×
372
                        EsmState.DISABLED,
373
                        EsmState.DISABLING,
374
                    )
UNCOV
375
                    esm_worker = EsmWorkerFactory(
×
376
                        esm, function_role, is_esm_enabled
377
                    ).get_esm_worker()
378

379
                    # Note: a worker is created in the DISABLED state if not enabled
UNCOV
380
                    esm_worker.create()
×
381
                    # TODO: assigning the esm_worker to the dict only works after .create(). Could it cause a race
382
                    #  condition if we get a shutdown here and have a worker thread spawned but not accounted for?
UNCOV
383
                    self.esm_workers[esm_worker.uuid] = esm_worker
×
384

385
    def on_after_init(self):
1✔
386
        self.router.register_routes()
1✔
387
        get_runtime_executor().validate_environment()
1✔
388

389
    def on_before_stop(self) -> None:
1✔
390
        for esm_worker in self.esm_workers.values():
1✔
391
            esm_worker.stop_for_shutdown()
1✔
392

393
        # TODO: should probably unregister routes?
394
        self.lambda_service.stop()
1✔
395
        # Attempt to signal to the Lambda Debug Mode session object to stop.
396
        try:
1✔
397
            lambda_debug_mode_session = LambdaDebugModeSession.get()
1✔
398
            lambda_debug_mode_session.signal_stop()
1✔
UNCOV
399
        except Exception as ex:
×
UNCOV
400
            LOG.error(
×
401
                "Unexpected error encountered when attempting to signal Lambda Debug Mode to stop '%s'.",
402
                ex,
403
            )
404

405
    @staticmethod
1✔
406
    def _get_function(function_name: str, account_id: str, region: str) -> Function:
1✔
407
        state = lambda_stores[account_id][region]
1✔
408
        function = state.functions.get(function_name)
1✔
409
        if not function:
1✔
410
            arn = api_utils.unqualified_lambda_arn(
1✔
411
                function_name=function_name,
412
                account=account_id,
413
                region=region,
414
            )
415
            raise ResourceNotFoundException(
1✔
416
                f"Function not found: {arn}",
417
                Type="User",
418
            )
419
        return function
1✔
420

421
    @staticmethod
1✔
422
    def _get_esm(uuid: str, account_id: str, region: str) -> EventSourceMappingConfiguration:
1✔
423
        state = lambda_stores[account_id][region]
1✔
424
        esm = state.event_source_mappings.get(uuid)
1✔
425
        if not esm:
1✔
426
            arn = lambda_event_source_mapping_arn(uuid, account_id, region)
1✔
427
            raise ResourceNotFoundException(
1✔
428
                f"Event source mapping not found: {arn}",
429
                Type="User",
430
            )
431
        return esm
1✔
432

433
    @staticmethod
1✔
434
    def _validate_qualifier_expression(qualifier: str) -> None:
1✔
435
        if error_messages := api_utils.validate_qualifier(qualifier):
1✔
UNCOV
436
            raise ValidationException(
×
437
                message=api_utils.construct_validation_exception_message(error_messages)
438
            )
439

440
    @staticmethod
1✔
441
    def _resolve_fn_qualifier(resolved_fn: Function, qualifier: str | None) -> tuple[str, str]:
1✔
442
        """Attempts to resolve a given qualifier and returns a qualifier that exists or
443
        raises an appropriate ResourceNotFoundException.
444

445
        :param resolved_fn: The resolved lambda function
446
        :param qualifier: The qualifier to be resolved or None
447
        :return: Tuple of (resolved qualifier, function arn either qualified or unqualified)"""
448
        function_name = resolved_fn.function_name
1✔
449
        # assuming function versions need to live in the same account and region
450
        account_id = resolved_fn.latest().id.account
1✔
451
        region = resolved_fn.latest().id.region
1✔
452
        fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
1✔
453
        if qualifier is not None:
1✔
454
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
1✔
455
            if api_utils.qualifier_is_alias(qualifier):
1✔
456
                if qualifier not in resolved_fn.aliases:
1✔
457
                    raise ResourceNotFoundException(f"Cannot find alias arn: {fn_arn}", Type="User")
1✔
458
            elif api_utils.qualifier_is_version(qualifier) or qualifier == "$LATEST":
1✔
459
                if qualifier not in resolved_fn.versions:
1✔
460
                    raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")
1✔
461
            else:
462
                # matches qualifier pattern but invalid alias or version
463
                raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")
1✔
464
        resolved_qualifier = qualifier or "$LATEST"
1✔
465
        return resolved_qualifier, fn_arn
1✔
466

467
    @staticmethod
1✔
468
    def _function_revision_id(resolved_fn: Function, resolved_qualifier: str) -> str:
1✔
469
        if api_utils.qualifier_is_alias(resolved_qualifier):
1✔
470
            return resolved_fn.aliases[resolved_qualifier].revision_id
1✔
471
        # Assumes that a non-alias is a version
472
        else:
473
            return resolved_fn.versions[resolved_qualifier].config.revision_id
1✔
474

475
    def _resolve_vpc_id(self, account_id: str, region_name: str, subnet_id: str) -> str:
1✔
476
        ec2_client = connect_to(aws_access_key_id=account_id, region_name=region_name).ec2
1✔
477
        try:
1✔
478
            return ec2_client.describe_subnets(SubnetIds=[subnet_id])["Subnets"][0]["VpcId"]
1✔
479
        except ec2_client.exceptions.ClientError as e:
1✔
480
            code = e.response["Error"]["Code"]
1✔
481
            message = e.response["Error"]["Message"]
1✔
482
            raise InvalidParameterValueException(
1✔
483
                f"Error occurred while DescribeSubnets. EC2 Error Code: {code}. EC2 Error Message: {message}",
484
                Type="User",
485
            )
486

487
    def _build_vpc_config(
1✔
488
        self,
489
        account_id: str,
490
        region_name: str,
491
        vpc_config: Optional[dict] = None,
492
    ) -> VpcConfig | None:
493
        if not vpc_config or not is_api_enabled("ec2"):
1✔
494
            return None
1✔
495

496
        subnet_ids = vpc_config.get("SubnetIds", [])
1✔
497
        if subnet_ids is not None and len(subnet_ids) == 0:
1✔
498
            return VpcConfig(vpc_id="", security_group_ids=[], subnet_ids=[])
1✔
499

500
        subnet_id = subnet_ids[0]
1✔
501
        if not bool(SUBNET_ID_REGEX.match(subnet_id)):
1✔
502
            raise ValidationException(
1✔
503
                f"1 validation error detected: Value '[{subnet_id}]' at 'vpcConfig.subnetIds' failed to satisfy constraint: Member must satisfy constraint: [Member must have length less than or equal to 1024, Member must have length greater than or equal to 0, Member must satisfy regular expression pattern: ^subnet-[0-9a-z]*$]"
504
            )
505

506
        return VpcConfig(
1✔
507
            vpc_id=self._resolve_vpc_id(account_id, region_name, subnet_id),
508
            security_group_ids=vpc_config.get("SecurityGroupIds", []),
509
            subnet_ids=subnet_ids,
510
        )
511

512
    def _create_version_model(
1✔
513
        self,
514
        function_name: str,
515
        region: str,
516
        account_id: str,
517
        description: str | None = None,
518
        revision_id: str | None = None,
519
        code_sha256: str | None = None,
520
    ) -> tuple[FunctionVersion, bool]:
521
        """
522
        Release a new version to the model if all restrictions are met.
523
        Restrictions:
524
          - CodeSha256, if provided, must equal the current latest version code hash
525
          - RevisionId, if provided, must equal the current latest version revision id
526
          - Some changes have been done to the latest version since last publish
527
        Will return a tuple of the version, and whether the version was published (True) or the latest available version was taken (False).
528
        This can happen if the latest version has not been changed since the last version publish, in this case the last version will be returned.
529

530
        :param function_name: Function name to be published
531
        :param region: Region of the function
532
        :param account_id: Account of the function
533
        :param description: new description of the version (will be the description of the function if missing)
534
        :param revision_id: Revision id, function will raise error if it does not match latest revision id
535
        :param code_sha256: Code sha256, function will raise error if it does not match latest code hash
536
        :return: Tuple of (published version, whether version was released or last released version returned, since nothing changed)
537
        """
538
        current_latest_version = get_function_version(
1✔
539
            function_name=function_name, qualifier="$LATEST", account_id=account_id, region=region
540
        )
541
        if revision_id and current_latest_version.config.revision_id != revision_id:
1✔
542
            raise PreconditionFailedException(
1✔
543
                "The Revision Id provided does not match the latest Revision Id. Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
544
                Type="User",
545
            )
546

547
        # check if code hashes match if they are specified
548
        current_hash = (
1✔
549
            current_latest_version.config.code.code_sha256
550
            if current_latest_version.config.package_type == PackageType.Zip
551
            else current_latest_version.config.image.code_sha256
552
        )
553
        # if the code is a zip package and hot reloaded (hot reloading is currently only supported for zip packagetypes)
554
        # we cannot enforce the codesha256 check
555
        is_hot_reloaded_zip_package = (
1✔
556
            current_latest_version.config.package_type == PackageType.Zip
557
            and current_latest_version.config.code.is_hot_reloading()
558
        )
559
        if code_sha256 and current_hash != code_sha256 and not is_hot_reloaded_zip_package:
1✔
560
            raise InvalidParameterValueException(
1✔
561
                f"CodeSHA256 ({code_sha256}) is different from current CodeSHA256 in $LATEST ({current_hash}). Please try again with the CodeSHA256 in $LATEST.",
562
                Type="User",
563
            )
564

565
        state = lambda_stores[account_id][region]
1✔
566
        function = state.functions.get(function_name)
1✔
567
        changes = {}
1✔
568
        if description is not None:
1✔
569
            changes["description"] = description
1✔
570
        # TODO copy environment instead of restarting one, get rid of all the "Pending"s
571

572
        with function.lock:
1✔
573
            if function.next_version > 1 and (
1✔
574
                prev_version := function.versions.get(str(function.next_version - 1))
575
            ):
576
                if (
1✔
577
                    prev_version.config.internal_revision
578
                    == current_latest_version.config.internal_revision
579
                ):
580
                    return prev_version, False
1✔
581
            # TODO check if there was a change since last version
582
            next_version = str(function.next_version)
1✔
583
            function.next_version += 1
1✔
584
            new_id = VersionIdentifier(
1✔
585
                function_name=function_name,
586
                qualifier=next_version,
587
                region=region,
588
                account=account_id,
589
            )
590
            apply_on = current_latest_version.config.snap_start["ApplyOn"]
1✔
591
            optimization_status = SnapStartOptimizationStatus.Off
1✔
592
            if apply_on == SnapStartApplyOn.PublishedVersions:
1✔
593
                optimization_status = SnapStartOptimizationStatus.On
1✔
594
            snap_start = SnapStartResponse(
1✔
595
                ApplyOn=apply_on,
596
                OptimizationStatus=optimization_status,
597
            )
598
            new_version = dataclasses.replace(
1✔
599
                current_latest_version,
600
                config=dataclasses.replace(
601
                    current_latest_version.config,
602
                    last_update=None,  # versions never have a last update status
603
                    state=VersionState(
604
                        state=State.Pending,
605
                        code=StateReasonCode.Creating,
606
                        reason="The function is being created.",
607
                    ),
608
                    snap_start=snap_start,
609
                    **changes,
610
                ),
611
                id=new_id,
612
            )
613
            function.versions[next_version] = new_version
1✔
614
        return new_version, True
1✔
615

616
    def _publish_version_from_existing_version(
1✔
617
        self,
618
        function_name: str,
619
        region: str,
620
        account_id: str,
621
        description: str | None = None,
622
        revision_id: str | None = None,
623
        code_sha256: str | None = None,
624
    ) -> FunctionVersion:
625
        """
626
        Publish version from an existing, already initialized LATEST
627

628
        :param function_name: Function name
629
        :param region: region
630
        :param account_id: account id
631
        :param description: description
632
        :param revision_id: revision id (check if current version matches)
633
        :param code_sha256: code sha (check if current code matches)
634
        :return: new version
635
        """
636
        new_version, changed = self._create_version_model(
1✔
637
            function_name=function_name,
638
            region=region,
639
            account_id=account_id,
640
            description=description,
641
            revision_id=revision_id,
642
            code_sha256=code_sha256,
643
        )
644
        if not changed:
1✔
645
            return new_version
1✔
646
        self.lambda_service.publish_version(new_version)
1✔
647
        state = lambda_stores[account_id][region]
1✔
648
        function = state.functions.get(function_name)
1✔
649
        # TODO: re-evaluate data model to prevent this dirty hack just for bumping the revision id
650
        latest_version = function.versions["$LATEST"]
1✔
651
        function.versions["$LATEST"] = dataclasses.replace(
1✔
652
            latest_version, config=dataclasses.replace(latest_version.config)
653
        )
654
        return function.versions.get(new_version.id.qualifier)
1✔
655

656
    def _publish_version_with_changes(
1✔
657
        self,
658
        function_name: str,
659
        region: str,
660
        account_id: str,
661
        description: str | None = None,
662
        revision_id: str | None = None,
663
        code_sha256: str | None = None,
664
    ) -> FunctionVersion:
665
        """
666
        Publish version together with a new latest version (publish on create / update)
667

668
        :param function_name: Function name
669
        :param region: region
670
        :param account_id: account id
671
        :param description: description
672
        :param revision_id: revision id (check if current version matches)
673
        :param code_sha256: code sha (check if current code matches)
674
        :return: new version
675
        """
676
        new_version, changed = self._create_version_model(
1✔
677
            function_name=function_name,
678
            region=region,
679
            account_id=account_id,
680
            description=description,
681
            revision_id=revision_id,
682
            code_sha256=code_sha256,
683
        )
684
        if not changed:
1✔
UNCOV
685
            return new_version
×
686
        self.lambda_service.create_function_version(new_version)
1✔
687
        return new_version
1✔
688

689
    @staticmethod
1✔
690
    def _verify_env_variables(env_vars: dict[str, str]):
1✔
691
        dumped_env_vars = json.dumps(env_vars, separators=(",", ":"))
1✔
692
        if (
1✔
693
            len(dumped_env_vars.encode("utf-8"))
694
            > config.LAMBDA_LIMITS_MAX_FUNCTION_ENVVAR_SIZE_BYTES
695
        ):
696
            raise InvalidParameterValueException(
1✔
697
                f"Lambda was unable to configure your environment variables because the environment variables you have provided exceeded the 4KB limit. String measured: {dumped_env_vars}",
698
                Type="User",
699
            )
700

701
    @staticmethod
1✔
702
    def _validate_snapstart(snap_start: SnapStart, runtime: Runtime):
1✔
703
        apply_on = snap_start.get("ApplyOn")
1✔
704
        if apply_on not in [
1✔
705
            SnapStartApplyOn.PublishedVersions,
706
            SnapStartApplyOn.None_,
707
        ]:
708
            raise ValidationException(
1✔
709
                f"1 validation error detected: Value '{apply_on}' at 'snapStart.applyOn' failed to satisfy constraint: Member must satisfy enum value set: [PublishedVersions, None]"
710
            )
711

712
    def _validate_layers(self, new_layers: list[str], region: str, account_id: str):
1✔
713
        if len(new_layers) > LAMBDA_LAYERS_LIMIT_PER_FUNCTION:
1✔
714
            raise InvalidParameterValueException(
1✔
715
                "Cannot reference more than 5 layers.", Type="User"
716
            )
717

718
        visited_layers = dict()
1✔
719
        for layer_version_arn in new_layers:
1✔
720
            (
1✔
721
                layer_region,
722
                layer_account_id,
723
                layer_name,
724
                layer_version_str,
725
            ) = api_utils.parse_layer_arn(layer_version_arn)
726
            if layer_version_str is None:
1✔
727
                raise ValidationException(
1✔
728
                    f"1 validation error detected: Value '[{layer_version_arn}]'"
729
                    + r" at 'layers' failed to satisfy constraint: Member must satisfy constraint: [Member must have length less than or equal to 140, Member must have length greater than or equal to 1, Member must satisfy regular expression pattern: (arn:[a-zA-Z0-9-]+:lambda:[a-z]{2}((-gov)|(-iso(b?)))?-[a-z]+-\d{1}:\d{12}:layer:[a-zA-Z0-9-_]+:[0-9]+)|(arn:[a-zA-Z0-9-]+:lambda:::awslayer:[a-zA-Z0-9-_]+), Member must not be null]",
730
                )
731

732
            state = lambda_stores[layer_account_id][layer_region]
1✔
733
            layer = state.layers.get(layer_name)
1✔
734
            layer_version = None
1✔
735
            if layer is not None:
1✔
736
                layer_version = layer.layer_versions.get(layer_version_str)
1✔
737
            if layer_account_id == account_id:
1✔
738
                if region and layer_region != region:
1✔
739
                    raise InvalidParameterValueException(
1✔
740
                        f"Layers are not in the same region as the function. "
741
                        f"Layers are expected to be in region {region}.",
742
                        Type="User",
743
                    )
744
                if layer is None or layer.layer_versions.get(layer_version_str) is None:
1✔
745
                    raise InvalidParameterValueException(
1✔
746
                        f"Layer version {layer_version_arn} does not exist.", Type="User"
747
                    )
748
            else:  # External layer from other account
749
                # TODO: validate IAM layer policy here, allowing access by default for now and only checking region
UNCOV
750
                if region and layer_region != region:
×
751
                    # TODO: detect user or role from context when IAM users are implemented
752
                    user = "user/localstack-testing"
×
UNCOV
753
                    raise AccessDeniedException(
×
754
                        f"User: arn:{get_partition(region)}:iam::{account_id}:{user} is not authorized to perform: lambda:GetLayerVersion on resource: {layer_version_arn} because no resource-based policy allows the lambda:GetLayerVersion action"
755
                    )
UNCOV
756
                if layer is None or layer_version is None:
×
757
                    # Limitation: cannot fetch external layers when using the same account id as the target layer
758
                    # because we do not want to trigger the layer fetcher for every non-existing layer.
UNCOV
759
                    if self.layer_fetcher is None:
×
760
                        raise NotImplementedError(
761
                            "Fetching shared layers from AWS is a pro feature."
762
                        )
763

764
                    layer = self.layer_fetcher.fetch_layer(layer_version_arn)
×
UNCOV
765
                    if layer is None:
×
766
                        # TODO: detect user or role from context when IAM users are implemented
UNCOV
767
                        user = "user/localstack-testing"
×
UNCOV
768
                        raise AccessDeniedException(
×
769
                            f"User: arn:{get_partition(region)}:iam::{account_id}:{user} is not authorized to perform: lambda:GetLayerVersion on resource: {layer_version_arn} because no resource-based policy allows the lambda:GetLayerVersion action"
770
                        )
771

772
                    # Distinguish between new layer and new layer version
UNCOV
773
                    if layer_version is None:
×
774
                        # Create whole layer from scratch
UNCOV
775
                        state.layers[layer_name] = layer
×
776
                    else:
777
                        # Create layer version if another version of the same layer already exists
UNCOV
778
                        state.layers[layer_name].layer_versions[layer_version_str] = (
×
779
                            layer.layer_versions.get(layer_version_str)
780
                        )
781

782
            # only the first two matches in the array are considered for the error message
783
            layer_arn = ":".join(layer_version_arn.split(":")[:-1])
1✔
784
            if layer_arn in visited_layers:
1✔
785
                conflict_layer_version_arn = visited_layers[layer_arn]
1✔
786
                raise InvalidParameterValueException(
1✔
787
                    f"Two different versions of the same layer are not allowed to be referenced in the same function. {conflict_layer_version_arn} and {layer_version_arn} are versions of the same layer.",
788
                    Type="User",
789
                )
790
            visited_layers[layer_arn] = layer_version_arn
1✔
791

792
    @staticmethod
1✔
793
    def map_layers(new_layers: list[str]) -> list[LayerVersion]:
1✔
794
        layers = []
1✔
795
        for layer_version_arn in new_layers:
1✔
796
            region_name, account_id, layer_name, layer_version = api_utils.parse_layer_arn(
1✔
797
                layer_version_arn
798
            )
799
            layer = lambda_stores[account_id][region_name].layers.get(layer_name)
1✔
800
            layer_version = layer.layer_versions.get(layer_version)
1✔
801
            layers.append(layer_version)
1✔
802
        return layers
1✔
803

804
    def get_function_recursion_config(
1✔
805
        self,
806
        context: RequestContext,
807
        function_name: UnqualifiedFunctionName,
808
        **kwargs,
809
    ) -> GetFunctionRecursionConfigResponse:
810
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
811
        function_name = api_utils.get_function_name(function_name, context)
1✔
812
        fn = self._get_function(function_name=function_name, region=region, account_id=account_id)
1✔
813
        return GetFunctionRecursionConfigResponse(RecursiveLoop=fn.recursive_loop)
1✔
814

815
    def put_function_recursion_config(
1✔
816
        self,
817
        context: RequestContext,
818
        function_name: UnqualifiedFunctionName,
819
        recursive_loop: RecursiveLoop,
820
        **kwargs,
821
    ) -> PutFunctionRecursionConfigResponse:
822
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
823
        function_name = api_utils.get_function_name(function_name, context)
1✔
824

825
        fn = self._get_function(function_name=function_name, region=region, account_id=account_id)
1✔
826

827
        allowed_values = list(RecursiveLoop.__members__.values())
1✔
828
        if recursive_loop not in allowed_values:
1✔
829
            raise ValidationException(
1✔
830
                f"1 validation error detected: Value '{recursive_loop}' at 'recursiveLoop' failed to satisfy constraint: "
831
                f"Member must satisfy enum value set: [Terminate, Allow]"
832
            )
833

834
        fn.recursive_loop = recursive_loop
1✔
835
        return PutFunctionRecursionConfigResponse(RecursiveLoop=fn.recursive_loop)
1✔
836

837
    @handler(operation="CreateFunction", expand=False)
1✔
838
    def create_function(
1✔
839
        self,
840
        context: RequestContext,
841
        request: CreateFunctionRequest,
842
    ) -> FunctionConfiguration:
843
        context_region = context.region
1✔
844
        context_account_id = context.account_id
1✔
845

846
        zip_file = request.get("Code", {}).get("ZipFile")
1✔
847
        if zip_file and len(zip_file) > config.LAMBDA_LIMITS_CODE_SIZE_ZIPPED:
1✔
848
            raise RequestEntityTooLargeException(
1✔
849
                f"Zipped size must be smaller than {config.LAMBDA_LIMITS_CODE_SIZE_ZIPPED} bytes"
850
            )
851

852
        if context.request.content_length > config.LAMBDA_LIMITS_CREATE_FUNCTION_REQUEST_SIZE:
1✔
853
            raise RequestEntityTooLargeException(
1✔
854
                f"Request must be smaller than {config.LAMBDA_LIMITS_CREATE_FUNCTION_REQUEST_SIZE} bytes for the CreateFunction operation"
855
            )
856

857
        if architectures := request.get("Architectures"):
1✔
858
            if len(architectures) != 1:
1✔
859
                raise ValidationException(
1✔
860
                    f"1 validation error detected: Value '[{', '.join(architectures)}]' at 'architectures' failed to "
861
                    f"satisfy constraint: Member must have length less than or equal to 1",
862
                )
863
            if architectures[0] not in ARCHITECTURES:
1✔
864
                raise ValidationException(
1✔
865
                    f"1 validation error detected: Value '[{', '.join(architectures)}]' at 'architectures' failed to "
866
                    f"satisfy constraint: Member must satisfy constraint: [Member must satisfy enum value set: "
867
                    f"[x86_64, arm64], Member must not be null]",
868
                )
869

870
        if env_vars := request.get("Environment", {}).get("Variables"):
1✔
871
            self._verify_env_variables(env_vars)
1✔
872

873
        if layers := request.get("Layers", []):
1✔
874
            self._validate_layers(layers, region=context_region, account_id=context_account_id)
1✔
875

876
        if not api_utils.is_role_arn(request.get("Role")):
1✔
877
            raise ValidationException(
1✔
878
                f"1 validation error detected: Value '{request.get('Role')}'"
879
                + " at 'role' failed to satisfy constraint: Member must satisfy regular expression pattern: arn:(aws[a-zA-Z-]*)?:iam::\\d{12}:role/?[a-zA-Z_0-9+=,.@\\-_/]+"
880
            )
881
        if not self.lambda_service.can_assume_role(request.get("Role"), context.region):
1✔
UNCOV
882
            raise InvalidParameterValueException(
×
883
                "The role defined for the function cannot be assumed by Lambda.", Type="User"
884
            )
885
        package_type = request.get("PackageType", PackageType.Zip)
1✔
886
        runtime = request.get("Runtime")
1✔
887
        self._validate_runtime(package_type, runtime)
1✔
888

889
        request_function_name = request.get("FunctionName")
1✔
890

891
        function_name, *_ = api_utils.get_name_and_qualifier(
1✔
892
            function_arn_or_name=request_function_name,
893
            qualifier=None,
894
            context=context,
895
        )
896

897
        if runtime in DEPRECATED_RUNTIMES:
1✔
898
            LOG.warning(
1✔
899
                "The Lambda runtime %s} is deprecated. "
900
                "Please upgrade the runtime for the function %s: "
901
                "https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtimes.html",
902
                runtime,
903
                function_name,
904
            )
905
        if snap_start := request.get("SnapStart"):
1✔
906
            self._validate_snapstart(snap_start, runtime)
1✔
907
        state = lambda_stores[context_account_id][context_region]
1✔
908

909
        with self.create_fn_lock:
1✔
910
            if function_name in state.functions:
1✔
UNCOV
911
                raise ResourceConflictException(f"Function already exist: {function_name}")
×
912
            fn = Function(function_name=function_name)
1✔
913
            arn = VersionIdentifier(
1✔
914
                function_name=function_name,
915
                qualifier="$LATEST",
916
                region=context_region,
917
                account=context_account_id,
918
            )
919
            # save function code to s3
920
            code = None
1✔
921
            image = None
1✔
922
            image_config = None
1✔
923
            runtime_version_config = RuntimeVersionConfig(
1✔
924
                # Limitation: the runtime id (presumably sha256 of image) is currently hardcoded
925
                # Potential implementation: provide (cached) sha256 hash of used Docker image
926
                RuntimeVersionArn=f"arn:{context.partition}:lambda:{context_region}::runtime:8eeff65f6809a3ce81507fe733fe09b835899b99481ba22fd75b5a7338290ec1"
927
            )
928
            request_code = request.get("Code")
1✔
929
            if package_type == PackageType.Zip:
1✔
930
                # TODO verify if correct combination of code is set
931
                if zip_file := request_code.get("ZipFile"):
1✔
932
                    code = store_lambda_archive(
1✔
933
                        archive_file=zip_file,
934
                        function_name=function_name,
935
                        region_name=context_region,
936
                        account_id=context_account_id,
937
                    )
938
                elif s3_bucket := request_code.get("S3Bucket"):
1✔
939
                    s3_key = request_code["S3Key"]
1✔
940
                    s3_object_version = request_code.get("S3ObjectVersion")
1✔
941
                    code = store_s3_bucket_archive(
1✔
942
                        archive_bucket=s3_bucket,
943
                        archive_key=s3_key,
944
                        archive_version=s3_object_version,
945
                        function_name=function_name,
946
                        region_name=context_region,
947
                        account_id=context_account_id,
948
                    )
949
                else:
950
                    raise LambdaServiceException("Gotta have s3 bucket or zip file")
×
951
            elif package_type == PackageType.Image:
1✔
952
                image = request_code.get("ImageUri")
1✔
953
                if not image:
1✔
UNCOV
954
                    raise LambdaServiceException("Gotta have an image when package type is image")
×
955
                image = create_image_code(image_uri=image)
1✔
956

957
                image_config_req = request.get("ImageConfig", {})
1✔
958
                image_config = ImageConfig(
1✔
959
                    command=image_config_req.get("Command"),
960
                    entrypoint=image_config_req.get("EntryPoint"),
961
                    working_directory=image_config_req.get("WorkingDirectory"),
962
                )
963
                # Runtime management controls are not available when providing a custom image
964
                runtime_version_config = None
1✔
965
            if "LoggingConfig" in request:
1✔
966
                logging_config = request["LoggingConfig"]
1✔
967
                LOG.warning(
1✔
968
                    "Advanced Lambda Logging Configuration is currently mocked "
969
                    "and will not impact the logging behavior. "
970
                    "Please create a feature request if needed."
971
                )
972

973
                # when switching to JSON, app and system level log is auto set to INFO
974
                if logging_config.get("LogFormat", None) == LogFormat.JSON:
1✔
975
                    logging_config = {
1✔
976
                        "ApplicationLogLevel": "INFO",
977
                        "SystemLogLevel": "INFO",
978
                        "LogGroup": f"/aws/lambda/{function_name}",
979
                    } | logging_config
980
                else:
UNCOV
981
                    logging_config = (
×
982
                        LoggingConfig(
983
                            LogFormat=LogFormat.Text, LogGroup=f"/aws/lambda/{function_name}"
984
                        )
985
                        | logging_config
986
                    )
987

988
            else:
989
                logging_config = LoggingConfig(
1✔
990
                    LogFormat=LogFormat.Text, LogGroup=f"/aws/lambda/{function_name}"
991
                )
992

993
            version = FunctionVersion(
1✔
994
                id=arn,
995
                config=VersionFunctionConfiguration(
996
                    last_modified=api_utils.format_lambda_date(datetime.datetime.now()),
997
                    description=request.get("Description", ""),
998
                    role=request["Role"],
999
                    timeout=request.get("Timeout", LAMBDA_DEFAULT_TIMEOUT),
1000
                    runtime=request.get("Runtime"),
1001
                    memory_size=request.get("MemorySize", LAMBDA_DEFAULT_MEMORY_SIZE),
1002
                    handler=request.get("Handler"),
1003
                    package_type=package_type,
1004
                    environment=env_vars,
1005
                    architectures=request.get("Architectures") or [Architecture.x86_64],
1006
                    tracing_config_mode=request.get("TracingConfig", {}).get(
1007
                        "Mode", TracingMode.PassThrough
1008
                    ),
1009
                    image=image,
1010
                    image_config=image_config,
1011
                    code=code,
1012
                    layers=self.map_layers(layers),
1013
                    internal_revision=short_uid(),
1014
                    ephemeral_storage=LambdaEphemeralStorage(
1015
                        size=request.get("EphemeralStorage", {}).get("Size", 512)
1016
                    ),
1017
                    snap_start=SnapStartResponse(
1018
                        ApplyOn=request.get("SnapStart", {}).get("ApplyOn", SnapStartApplyOn.None_),
1019
                        OptimizationStatus=SnapStartOptimizationStatus.Off,
1020
                    ),
1021
                    runtime_version_config=runtime_version_config,
1022
                    dead_letter_arn=request.get("DeadLetterConfig", {}).get("TargetArn"),
1023
                    vpc_config=self._build_vpc_config(
1024
                        context_account_id, context_region, request.get("VpcConfig")
1025
                    ),
1026
                    state=VersionState(
1027
                        state=State.Pending,
1028
                        code=StateReasonCode.Creating,
1029
                        reason="The function is being created.",
1030
                    ),
1031
                    logging_config=logging_config,
1032
                ),
1033
            )
1034
            fn.versions["$LATEST"] = version
1✔
1035
            state.functions[function_name] = fn
1✔
1036
        self.lambda_service.create_function_version(version)
1✔
1037

1038
        if tags := request.get("Tags"):
1✔
1039
            # This will check whether the function exists.
1040
            self._store_tags(arn.unqualified_arn(), tags)
1✔
1041

1042
        if request.get("Publish"):
1✔
1043
            version = self._publish_version_with_changes(
1✔
1044
                function_name=function_name, region=context_region, account_id=context_account_id
1045
            )
1046

1047
        if config.LAMBDA_SYNCHRONOUS_CREATE:
1✔
1048
            # block via retrying until "terminal" condition reached before returning
UNCOV
1049
            if not poll_condition(
×
1050
                lambda: get_function_version(
1051
                    function_name, version.id.qualifier, version.id.account, version.id.region
1052
                ).config.state.state
1053
                in [State.Active, State.Failed],
1054
                timeout=10,
1055
            ):
UNCOV
1056
                LOG.warning(
×
1057
                    "LAMBDA_SYNCHRONOUS_CREATE is active, but waiting for %s reached timeout.",
1058
                    function_name,
1059
                )
1060

1061
        return api_utils.map_config_out(
1✔
1062
            version, return_qualified_arn=False, return_update_status=False
1063
        )
1064

1065
    def _validate_runtime(self, package_type, runtime):
1✔
1066
        runtimes = ALL_RUNTIMES
1✔
1067
        if config.LAMBDA_RUNTIME_VALIDATION:
1✔
1068
            runtimes = list(itertools.chain(RUNTIMES_AGGREGATED.values()))
1✔
1069

1070
        if package_type == PackageType.Zip and runtime not in runtimes:
1✔
1071
            # deprecated runtimes have different error
1072
            if runtime in DEPRECATED_RUNTIMES:
1✔
1073
                HINT_LOG.info(
1✔
1074
                    "Set env variable LAMBDA_RUNTIME_VALIDATION to 0"
1075
                    " in order to allow usage of deprecated runtimes"
1076
                )
1077
                self._check_for_recomended_migration_target(runtime)
1✔
1078

1079
            raise InvalidParameterValueException(
1✔
1080
                f"Value {runtime} at 'runtime' failed to satisfy constraint: Member must satisfy enum value set: {VALID_RUNTIMES} or be a valid ARN",
1081
                Type="User",
1082
            )
1083

1084
    def _check_for_recomended_migration_target(self, deprecated_runtime):
1✔
1085
        # AWS offers recommended runtime for migration for "newly" deprecated runtimes
1086
        # in order to preserve parity with error messages we need the code bellow
1087
        latest_runtime = DEPRECATED_RUNTIMES_UPGRADES.get(deprecated_runtime)
1✔
1088

1089
        if latest_runtime is not None:
1✔
1090
            LOG.debug(
1✔
1091
                "The Lambda runtime %s is deprecated. Please upgrade to a supported Lambda runtime such as %s.",
1092
                deprecated_runtime,
1093
                latest_runtime,
1094
            )
1095
            raise InvalidParameterValueException(
1✔
1096
                f"The runtime parameter of {deprecated_runtime} is no longer supported for creating or updating AWS Lambda functions. We recommend you use a supported runtime while creating or updating functions.",
1097
                Type="User",
1098
            )
1099

1100
    @handler(operation="UpdateFunctionConfiguration", expand=False)
1✔
1101
    def update_function_configuration(
1✔
1102
        self, context: RequestContext, request: UpdateFunctionConfigurationRequest
1103
    ) -> FunctionConfiguration:
1104
        """updates the $LATEST version of the function"""
1105
        function_name = request.get("FunctionName")
1✔
1106

1107
        # in case we got ARN or partial ARN
1108
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1109
        function_name, qualifier = api_utils.get_name_and_qualifier(function_name, None, context)
1✔
1110
        state = lambda_stores[account_id][region]
1✔
1111

1112
        if function_name not in state.functions:
1✔
UNCOV
1113
            raise ResourceNotFoundException(
×
1114
                f"Function not found: {api_utils.unqualified_lambda_arn(function_name=function_name, region=region, account=account_id)}",
1115
                Type="User",
1116
            )
1117
        function = state.functions[function_name]
1✔
1118

1119
        # TODO: lock modification of latest version
1120
        # TODO: notify service for changes relevant to re-provisioning of $LATEST
1121
        latest_version = function.latest()
1✔
1122
        latest_version_config = latest_version.config
1✔
1123

1124
        revision_id = request.get("RevisionId")
1✔
1125
        if revision_id and revision_id != latest_version.config.revision_id:
1✔
1126
            raise PreconditionFailedException(
1✔
1127
                "The Revision Id provided does not match the latest Revision Id. "
1128
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
1129
                Type="User",
1130
            )
1131

1132
        replace_kwargs = {}
1✔
1133
        if "EphemeralStorage" in request:
1✔
UNCOV
1134
            replace_kwargs["ephemeral_storage"] = LambdaEphemeralStorage(
×
1135
                request.get("EphemeralStorage", {}).get("Size", 512)
1136
            )  # TODO: do defaults here apply as well?
1137

1138
        if "Role" in request:
1✔
1139
            if not api_utils.is_role_arn(request["Role"]):
1✔
1140
                raise ValidationException(
1✔
1141
                    f"1 validation error detected: Value '{request.get('Role')}'"
1142
                    + " at 'role' failed to satisfy constraint: Member must satisfy regular expression pattern: arn:(aws[a-zA-Z-]*)?:iam::\\d{12}:role/?[a-zA-Z_0-9+=,.@\\-_/]+"
1143
                )
1144
            replace_kwargs["role"] = request["Role"]
1✔
1145

1146
        if "Description" in request:
1✔
1147
            replace_kwargs["description"] = request["Description"]
1✔
1148

1149
        if "Timeout" in request:
1✔
1150
            replace_kwargs["timeout"] = request["Timeout"]
1✔
1151

1152
        if "MemorySize" in request:
1✔
1153
            replace_kwargs["memory_size"] = request["MemorySize"]
1✔
1154

1155
        if "DeadLetterConfig" in request:
1✔
1156
            replace_kwargs["dead_letter_arn"] = request.get("DeadLetterConfig", {}).get("TargetArn")
1✔
1157

1158
        if vpc_config := request.get("VpcConfig"):
1✔
1159
            replace_kwargs["vpc_config"] = self._build_vpc_config(account_id, region, vpc_config)
1✔
1160

1161
        if "Handler" in request:
1✔
1162
            replace_kwargs["handler"] = request["Handler"]
1✔
1163

1164
        if "Runtime" in request:
1✔
1165
            runtime = request["Runtime"]
1✔
1166

1167
            if runtime not in ALL_RUNTIMES:
1✔
1168
                raise InvalidParameterValueException(
1✔
1169
                    f"Value {runtime} at 'runtime' failed to satisfy constraint: Member must satisfy enum value set: {VALID_RUNTIMES} or be a valid ARN",
1170
                    Type="User",
1171
                )
1172
            if runtime in DEPRECATED_RUNTIMES:
1✔
UNCOV
1173
                LOG.warning(
×
1174
                    "The Lambda runtime %s is deprecated. "
1175
                    "Please upgrade the runtime for the function %s: "
1176
                    "https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtimes.html",
1177
                    runtime,
1178
                    function_name,
1179
                )
1180
            replace_kwargs["runtime"] = request["Runtime"]
1✔
1181

1182
        if snap_start := request.get("SnapStart"):
1✔
1183
            runtime = replace_kwargs.get("runtime") or latest_version_config.runtime
1✔
1184
            self._validate_snapstart(snap_start, runtime)
1✔
1185
            replace_kwargs["snap_start"] = SnapStartResponse(
1✔
1186
                ApplyOn=snap_start.get("ApplyOn", SnapStartApplyOn.None_),
1187
                OptimizationStatus=SnapStartOptimizationStatus.Off,
1188
            )
1189

1190
        if "Environment" in request:
1✔
1191
            if env_vars := request.get("Environment", {}).get("Variables", {}):
1✔
1192
                self._verify_env_variables(env_vars)
1✔
1193
            replace_kwargs["environment"] = env_vars
1✔
1194

1195
        if "Layers" in request:
1✔
1196
            new_layers = request["Layers"]
1✔
1197
            if new_layers:
1✔
1198
                self._validate_layers(new_layers, region=region, account_id=account_id)
1✔
1199
            replace_kwargs["layers"] = self.map_layers(new_layers)
1✔
1200

1201
        if "ImageConfig" in request:
1✔
1202
            new_image_config = request["ImageConfig"]
1✔
1203
            replace_kwargs["image_config"] = ImageConfig(
1✔
1204
                command=new_image_config.get("Command"),
1205
                entrypoint=new_image_config.get("EntryPoint"),
1206
                working_directory=new_image_config.get("WorkingDirectory"),
1207
            )
1208

1209
        if "LoggingConfig" in request:
1✔
1210
            logging_config = request["LoggingConfig"]
1✔
1211
            LOG.warning(
1✔
1212
                "Advanced Lambda Logging Configuration is currently mocked "
1213
                "and will not impact the logging behavior. "
1214
                "Please create a feature request if needed."
1215
            )
1216

1217
            # when switching to JSON, app and system level log is auto set to INFO
1218
            if logging_config.get("LogFormat", None) == LogFormat.JSON:
1✔
1219
                logging_config = {
1✔
1220
                    "ApplicationLogLevel": "INFO",
1221
                    "SystemLogLevel": "INFO",
1222
                } | logging_config
1223

1224
            last_config = latest_version_config.logging_config
1✔
1225

1226
            # add partial update
1227
            new_logging_config = last_config | logging_config
1✔
1228

1229
            # in case we switched from JSON to Text we need to remove LogLevel keys
1230
            if (
1✔
1231
                new_logging_config.get("LogFormat") == LogFormat.Text
1232
                and last_config.get("LogFormat") == LogFormat.JSON
1233
            ):
1234
                new_logging_config.pop("ApplicationLogLevel", None)
1✔
1235
                new_logging_config.pop("SystemLogLevel", None)
1✔
1236

1237
            replace_kwargs["logging_config"] = new_logging_config
1✔
1238

1239
        if "TracingConfig" in request:
1✔
UNCOV
1240
            new_mode = request.get("TracingConfig", {}).get("Mode")
×
UNCOV
1241
            if new_mode:
×
UNCOV
1242
                replace_kwargs["tracing_config_mode"] = new_mode
×
1243

1244
        new_latest_version = dataclasses.replace(
1✔
1245
            latest_version,
1246
            config=dataclasses.replace(
1247
                latest_version_config,
1248
                last_modified=api_utils.generate_lambda_date(),
1249
                internal_revision=short_uid(),
1250
                last_update=UpdateStatus(
1251
                    status=LastUpdateStatus.InProgress,
1252
                    code="Creating",
1253
                    reason="The function is being created.",
1254
                ),
1255
                **replace_kwargs,
1256
            ),
1257
        )
1258
        function.versions["$LATEST"] = new_latest_version  # TODO: notify
1✔
1259
        self.lambda_service.update_version(new_version=new_latest_version)
1✔
1260

1261
        return api_utils.map_config_out(new_latest_version)
1✔
1262

1263
    @handler(operation="UpdateFunctionCode", expand=False)
1✔
1264
    def update_function_code(
1✔
1265
        self, context: RequestContext, request: UpdateFunctionCodeRequest
1266
    ) -> FunctionConfiguration:
1267
        """updates the $LATEST version of the function"""
1268
        # only supports normal zip packaging atm
1269
        # if request.get("Publish"):
1270
        #     self.lambda_service.create_function_version()
1271

1272
        function_name = request.get("FunctionName")
1✔
1273
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1274
        function_name, qualifier = api_utils.get_name_and_qualifier(function_name, None, context)
1✔
1275

1276
        store = lambda_stores[account_id][region]
1✔
1277
        if function_name not in store.functions:
1✔
UNCOV
1278
            raise ResourceNotFoundException(
×
1279
                f"Function not found: {api_utils.unqualified_lambda_arn(function_name=function_name, region=region, account=account_id)}",
1280
                Type="User",
1281
            )
1282
        function = store.functions[function_name]
1✔
1283

1284
        revision_id = request.get("RevisionId")
1✔
1285
        if revision_id and revision_id != function.latest().config.revision_id:
1✔
1286
            raise PreconditionFailedException(
1✔
1287
                "The Revision Id provided does not match the latest Revision Id. "
1288
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
1289
                Type="User",
1290
            )
1291

1292
        # TODO verify if correct combination of code is set
1293
        image = None
1✔
1294
        if (
1✔
1295
            request.get("ZipFile") or request.get("S3Bucket")
1296
        ) and function.latest().config.package_type == PackageType.Image:
1297
            raise InvalidParameterValueException(
1✔
1298
                "Please provide ImageUri when updating a function with packageType Image.",
1299
                Type="User",
1300
            )
1301
        elif request.get("ImageUri") and function.latest().config.package_type == PackageType.Zip:
1✔
1302
            raise InvalidParameterValueException(
1✔
1303
                "Please don't provide ImageUri when updating a function with packageType Zip.",
1304
                Type="User",
1305
            )
1306

1307
        if zip_file := request.get("ZipFile"):
1✔
1308
            code = store_lambda_archive(
1✔
1309
                archive_file=zip_file,
1310
                function_name=function_name,
1311
                region_name=region,
1312
                account_id=account_id,
1313
            )
1314
        elif s3_bucket := request.get("S3Bucket"):
1✔
1315
            s3_key = request["S3Key"]
1✔
1316
            s3_object_version = request.get("S3ObjectVersion")
1✔
1317
            code = store_s3_bucket_archive(
1✔
1318
                archive_bucket=s3_bucket,
1319
                archive_key=s3_key,
1320
                archive_version=s3_object_version,
1321
                function_name=function_name,
1322
                region_name=region,
1323
                account_id=account_id,
1324
            )
1325
        elif image := request.get("ImageUri"):
1✔
1326
            code = None
1✔
1327
            image = create_image_code(image_uri=image)
1✔
1328
        else:
UNCOV
1329
            raise LambdaServiceException("Gotta have s3 bucket or zip file or image")
×
1330

1331
        old_function_version = function.versions.get("$LATEST")
1✔
1332
        replace_kwargs = {"code": code} if code else {"image": image}
1✔
1333

1334
        if architectures := request.get("Architectures"):
1✔
UNCOV
1335
            if len(architectures) != 1:
×
UNCOV
1336
                raise ValidationException(
×
1337
                    f"1 validation error detected: Value '[{', '.join(architectures)}]' at 'architectures' failed to "
1338
                    f"satisfy constraint: Member must have length less than or equal to 1",
1339
                )
1340
            # An empty list of architectures is also forbidden. Further exceptions are tested here for create_function:
1341
            # tests.aws.services.lambda_.test_lambda_api.TestLambdaFunction.test_create_lambda_exceptions
UNCOV
1342
            if architectures[0] not in ARCHITECTURES:
×
UNCOV
1343
                raise ValidationException(
×
1344
                    f"1 validation error detected: Value '[{', '.join(architectures)}]' at 'architectures' failed to "
1345
                    f"satisfy constraint: Member must satisfy constraint: [Member must satisfy enum value set: "
1346
                    f"[x86_64, arm64], Member must not be null]",
1347
                )
UNCOV
1348
            replace_kwargs["architectures"] = architectures
×
1349

1350
        config = dataclasses.replace(
1✔
1351
            old_function_version.config,
1352
            internal_revision=short_uid(),
1353
            last_modified=api_utils.generate_lambda_date(),
1354
            last_update=UpdateStatus(
1355
                status=LastUpdateStatus.InProgress,
1356
                code="Creating",
1357
                reason="The function is being created.",
1358
            ),
1359
            **replace_kwargs,
1360
        )
1361
        function_version = dataclasses.replace(old_function_version, config=config)
1✔
1362
        function.versions["$LATEST"] = function_version
1✔
1363

1364
        self.lambda_service.update_version(new_version=function_version)
1✔
1365
        if request.get("Publish"):
1✔
1366
            function_version = self._publish_version_with_changes(
1✔
1367
                function_name=function_name, region=region, account_id=account_id
1368
            )
1369
        return api_utils.map_config_out(
1✔
1370
            function_version, return_qualified_arn=bool(request.get("Publish"))
1371
        )
1372

1373
    # TODO: does deleting the latest published version affect the next versions number?
1374
    # TODO: what happens when we call this with a qualifier and a fully qualified ARN? (+ conflicts?)
1375
    # TODO: test different ARN patterns (shorthand ARN?)
1376
    # TODO: test deleting across regions?
1377
    # TODO: test mismatch between context region and region in ARN
1378
    # TODO: test qualifier $LATEST, alias-name and version
1379
    def delete_function(
1✔
1380
        self,
1381
        context: RequestContext,
1382
        function_name: FunctionName,
1383
        qualifier: Qualifier = None,
1384
        **kwargs,
1385
    ) -> None:
1386
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1387
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
1388
            function_name, qualifier, context
1389
        )
1390

1391
        if qualifier and api_utils.qualifier_is_alias(qualifier):
1✔
UNCOV
1392
            raise InvalidParameterValueException(
×
1393
                "Deletion of aliases is not currently supported.",
1394
                Type="User",
1395
            )
1396

1397
        store = lambda_stores[account_id][region]
1✔
1398
        if qualifier == "$LATEST":
1✔
1399
            raise InvalidParameterValueException(
1✔
1400
                "$LATEST version cannot be deleted without deleting the function.", Type="User"
1401
            )
1402

1403
        if function_name not in store.functions:
1✔
1404
            e = ResourceNotFoundException(
1✔
1405
                f"Function not found: {api_utils.unqualified_lambda_arn(function_name=function_name, region=region, account=account_id)}",
1406
                Type="User",
1407
            )
1408
            raise e
1✔
1409
        function = store.functions.get(function_name)
1✔
1410

1411
        if qualifier:
1✔
1412
            # delete a version of the function
1413
            version = function.versions.pop(qualifier, None)
1✔
1414
            if version:
1✔
1415
                self.lambda_service.stop_version(version.id.qualified_arn())
1✔
1416
                destroy_code_if_not_used(code=version.config.code, function=function)
1✔
1417
        else:
1418
            # delete the whole function
1419
            # TODO: introduce locking for safe deletion: We could create a new version at the API layer before
1420
            #  the old version gets cleaned up in the internal lambda service.
1421
            function = store.functions.pop(function_name)
1✔
1422
            for version in function.versions.values():
1✔
1423
                self.lambda_service.stop_version(qualified_arn=version.id.qualified_arn())
1✔
1424
                # we can safely destroy the code here
1425
                if version.config.code:
1✔
1426
                    version.config.code.destroy()
1✔
1427

1428
    def list_functions(
1✔
1429
        self,
1430
        context: RequestContext,
1431
        master_region: MasterRegion = None,  # (only relevant for lambda@edge)
1432
        function_version: FunctionVersionApi = None,
1433
        marker: String = None,
1434
        max_items: MaxListItems = None,
1435
        **kwargs,
1436
    ) -> ListFunctionsResponse:
1437
        state = lambda_stores[context.account_id][context.region]
1✔
1438

1439
        if function_version and function_version != FunctionVersionApi.ALL:
1✔
1440
            raise ValidationException(
1✔
1441
                f"1 validation error detected: Value '{function_version}'"
1442
                + " at 'functionVersion' failed to satisfy constraint: Member must satisfy enum value set: [ALL]"
1443
            )
1444

1445
        if function_version == FunctionVersionApi.ALL:
1✔
1446
            # include all versions for all function
1447
            versions = [v for f in state.functions.values() for v in f.versions.values()]
1✔
1448
            return_qualified_arn = True
1✔
1449
        else:
1450
            versions = [f.latest() for f in state.functions.values()]
1✔
1451
            return_qualified_arn = False
1✔
1452

1453
        versions = [
1✔
1454
            api_utils.map_to_list_response(
1455
                api_utils.map_config_out(fc, return_qualified_arn=return_qualified_arn)
1456
            )
1457
            for fc in versions
1458
        ]
1459
        versions = PaginatedList(versions)
1✔
1460
        page, token = versions.get_page(
1✔
1461
            lambda version: version["FunctionArn"],
1462
            marker,
1463
            max_items,
1464
        )
1465
        return ListFunctionsResponse(Functions=page, NextMarker=token)
1✔
1466

1467
    def get_function(
1✔
1468
        self,
1469
        context: RequestContext,
1470
        function_name: NamespacedFunctionName,
1471
        qualifier: Qualifier = None,
1472
        **kwargs,
1473
    ) -> GetFunctionResponse:
1474
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1475
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
1476
            function_name, qualifier, context
1477
        )
1478

1479
        fn = lambda_stores[account_id][region].functions.get(function_name)
1✔
1480
        if fn is None:
1✔
1481
            if qualifier is None:
1✔
1482
                raise ResourceNotFoundException(
1✔
1483
                    f"Function not found: {api_utils.unqualified_lambda_arn(function_name, account_id, region)}",
1484
                    Type="User",
1485
                )
1486
            else:
1487
                raise ResourceNotFoundException(
1✔
1488
                    f"Function not found: {api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)}",
1489
                    Type="User",
1490
                )
1491
        alias_name = None
1✔
1492
        if qualifier and api_utils.qualifier_is_alias(qualifier):
1✔
1493
            if qualifier not in fn.aliases:
1✔
1494
                alias_arn = api_utils.qualified_lambda_arn(
1✔
1495
                    function_name, qualifier, account_id, region
1496
                )
1497
                raise ResourceNotFoundException(f"Function not found: {alias_arn}", Type="User")
1✔
1498
            alias_name = qualifier
1✔
1499
            qualifier = fn.aliases[alias_name].function_version
1✔
1500

1501
        version = get_function_version(
1✔
1502
            function_name=function_name,
1503
            qualifier=qualifier,
1504
            account_id=account_id,
1505
            region=region,
1506
        )
1507
        tags = self._get_tags(api_utils.unqualified_lambda_arn(function_name, account_id, region))
1✔
1508
        additional_fields = {}
1✔
1509
        if tags:
1✔
1510
            additional_fields["Tags"] = tags
1✔
1511
        code_location = None
1✔
1512
        if code := version.config.code:
1✔
1513
            code_location = FunctionCodeLocation(
1✔
1514
                Location=code.generate_presigned_url(), RepositoryType="S3"
1515
            )
1516
        elif image := version.config.image:
1✔
1517
            code_location = FunctionCodeLocation(
1✔
1518
                ImageUri=image.image_uri,
1519
                RepositoryType=image.repository_type,
1520
                ResolvedImageUri=image.resolved_image_uri,
1521
            )
1522

1523
        return GetFunctionResponse(
1✔
1524
            Configuration=api_utils.map_config_out(
1525
                version, return_qualified_arn=bool(qualifier), alias_name=alias_name
1526
            ),
1527
            Code=code_location,  # TODO
1528
            **additional_fields,
1529
            # Concurrency={},  # TODO
1530
        )
1531

1532
    def get_function_configuration(
1✔
1533
        self,
1534
        context: RequestContext,
1535
        function_name: NamespacedFunctionName,
1536
        qualifier: Qualifier = None,
1537
        **kwargs,
1538
    ) -> FunctionConfiguration:
1539
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1540
        # CAVE: THIS RETURN VALUE IS *NOT* THE SAME AS IN get_function (!) but seems to be only configuration part?
1541
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
1542
            function_name, qualifier, context
1543
        )
1544
        version = get_function_version(
1✔
1545
            function_name=function_name,
1546
            qualifier=qualifier,
1547
            account_id=account_id,
1548
            region=region,
1549
        )
1550
        return api_utils.map_config_out(version, return_qualified_arn=bool(qualifier))
1✔
1551

1552
    def invoke(
1✔
1553
        self,
1554
        context: RequestContext,
1555
        function_name: NamespacedFunctionName,
1556
        invocation_type: InvocationType = None,
1557
        log_type: LogType = None,
1558
        client_context: String = None,
1559
        payload: IO[Blob] = None,
1560
        qualifier: Qualifier = None,
1561
        **kwargs,
1562
    ) -> InvocationResponse:
1563
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1564
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
1565
            function_name, qualifier, context
1566
        )
1567

1568
        time_before = time.perf_counter()
1✔
1569
        try:
1✔
1570
            invocation_result = self.lambda_service.invoke(
1✔
1571
                function_name=function_name,
1572
                qualifier=qualifier,
1573
                region=region,
1574
                account_id=account_id,
1575
                invocation_type=invocation_type,
1576
                client_context=client_context,
1577
                request_id=context.request_id,
1578
                trace_context=context.trace_context,
1579
                payload=payload.read() if payload else None,
1580
            )
1581
        except ServiceException:
1✔
1582
            raise
1✔
1583
        except EnvironmentStartupTimeoutException as e:
1✔
1584
            raise LambdaServiceException("Internal error while executing lambda") from e
1✔
1585
        except Exception as e:
1✔
1586
            LOG.error("Error while invoking lambda", exc_info=e)
1✔
1587
            raise LambdaServiceException("Internal error while executing lambda") from e
1✔
1588

1589
        if invocation_type == InvocationType.Event:
1✔
1590
            # This happens when invocation type is event
1591
            return InvocationResponse(StatusCode=202)
1✔
1592
        if invocation_type == InvocationType.DryRun:
1✔
1593
            # This happens when invocation type is dryrun
1594
            return InvocationResponse(StatusCode=204)
1✔
1595
        LOG.debug("Lambda invocation duration: %0.2fms", (time.perf_counter() - time_before) * 1000)
1✔
1596

1597
        response = InvocationResponse(
1✔
1598
            StatusCode=200,
1599
            Payload=invocation_result.payload,
1600
            ExecutedVersion=invocation_result.executed_version,
1601
        )
1602

1603
        if invocation_result.is_error:
1✔
1604
            response["FunctionError"] = "Unhandled"
1✔
1605

1606
        if log_type == LogType.Tail:
1✔
1607
            response["LogResult"] = to_str(
1✔
1608
                base64.b64encode(to_bytes(invocation_result.logs)[-4096:])
1609
            )
1610

1611
        return response
1✔
1612

1613
    # Version operations
1614
    def publish_version(
1✔
1615
        self,
1616
        context: RequestContext,
1617
        function_name: FunctionName,
1618
        code_sha256: String = None,
1619
        description: Description = None,
1620
        revision_id: String = None,
1621
        **kwargs,
1622
    ) -> FunctionConfiguration:
1623
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1624
        function_name = api_utils.get_function_name(function_name, context)
1✔
1625
        new_version = self._publish_version_from_existing_version(
1✔
1626
            function_name=function_name,
1627
            description=description,
1628
            account_id=account_id,
1629
            region=region,
1630
            revision_id=revision_id,
1631
            code_sha256=code_sha256,
1632
        )
1633
        return api_utils.map_config_out(new_version, return_qualified_arn=True)
1✔
1634

1635
    def list_versions_by_function(
1✔
1636
        self,
1637
        context: RequestContext,
1638
        function_name: NamespacedFunctionName,
1639
        marker: String = None,
1640
        max_items: MaxListItems = None,
1641
        **kwargs,
1642
    ) -> ListVersionsByFunctionResponse:
1643
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1644
        function_name = api_utils.get_function_name(function_name, context)
1✔
1645
        function = self._get_function(
1✔
1646
            function_name=function_name, region=region, account_id=account_id
1647
        )
1648
        versions = [
1✔
1649
            api_utils.map_to_list_response(
1650
                api_utils.map_config_out(version=version, return_qualified_arn=True)
1651
            )
1652
            for version in function.versions.values()
1653
        ]
1654
        items = PaginatedList(versions)
1✔
1655
        page, token = items.get_page(
1✔
1656
            lambda item: item,
1657
            marker,
1658
            max_items,
1659
        )
1660
        return ListVersionsByFunctionResponse(Versions=page, NextMarker=token)
1✔
1661

1662
    # Alias
1663

1664
    def _create_routing_config_model(
1✔
1665
        self, routing_config_dict: dict[str, float], function_version: FunctionVersion
1666
    ):
1667
        if len(routing_config_dict) > 1:
1✔
1668
            raise InvalidParameterValueException(
1✔
1669
                "Number of items in AdditionalVersionWeights cannot be greater than 1",
1670
                Type="User",
1671
            )
1672
        # should be exactly one item here, still iterating, might be supported in the future
1673
        for key, value in routing_config_dict.items():
1✔
1674
            if value < 0.0 or value >= 1.0:
1✔
1675
                raise ValidationException(
1✔
1676
                    f"1 validation error detected: Value '{{{key}={value}}}' at 'routingConfig.additionalVersionWeights' failed to satisfy constraint: Map value must satisfy constraint: [Member must have value less than or equal to 1.0, Member must have value greater than or equal to 0.0, Member must not be null]"
1677
                )
1678
            if key == function_version.id.qualifier:
1✔
1679
                raise InvalidParameterValueException(
1✔
1680
                    f"Invalid function version {function_version.id.qualifier}. Function version {function_version.id.qualifier} is already included in routing configuration.",
1681
                    Type="User",
1682
                )
1683
            # check if version target is latest, then no routing config is allowed
1684
            if function_version.id.qualifier == "$LATEST":
1✔
1685
                raise InvalidParameterValueException(
1✔
1686
                    "$LATEST is not supported for an alias pointing to more than 1 version"
1687
                )
1688
            if not api_utils.qualifier_is_version(key):
1✔
1689
                raise ValidationException(
1✔
1690
                    f"1 validation error detected: Value '{{{key}={value}}}' at 'routingConfig.additionalVersionWeights' failed to satisfy constraint: Map keys must satisfy constraint: [Member must have length less than or equal to 1024, Member must have length greater than or equal to 1, Member must satisfy regular expression pattern: [0-9]+, Member must not be null]"
1691
                )
1692

1693
            # checking if the version in the config exists
1694
            get_function_version(
1✔
1695
                function_name=function_version.id.function_name,
1696
                qualifier=key,
1697
                region=function_version.id.region,
1698
                account_id=function_version.id.account,
1699
            )
1700
        return AliasRoutingConfig(version_weights=routing_config_dict)
1✔
1701

1702
    def create_alias(
1✔
1703
        self,
1704
        context: RequestContext,
1705
        function_name: FunctionName,
1706
        name: Alias,
1707
        function_version: Version,
1708
        description: Description = None,
1709
        routing_config: AliasRoutingConfiguration = None,
1710
        **kwargs,
1711
    ) -> AliasConfiguration:
1712
        if not api_utils.qualifier_is_alias(name):
1✔
1713
            raise ValidationException(
1✔
1714
                f"1 validation error detected: Value '{name}' at 'name' failed to satisfy constraint: Member must satisfy regular expression pattern: (?!^[0-9]+$)([a-zA-Z0-9-_]+)"
1715
            )
1716

1717
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1718
        function_name = api_utils.get_function_name(function_name, context)
1✔
1719
        target_version = get_function_version(
1✔
1720
            function_name=function_name,
1721
            qualifier=function_version,
1722
            region=region,
1723
            account_id=account_id,
1724
        )
1725
        function = self._get_function(
1✔
1726
            function_name=function_name, region=region, account_id=account_id
1727
        )
1728
        # description is always present, if not specified it's an empty string
1729
        description = description or ""
1✔
1730
        with function.lock:
1✔
1731
            if existing_alias := function.aliases.get(name):
1✔
1732
                raise ResourceConflictException(
1✔
1733
                    f"Alias already exists: {api_utils.map_alias_out(alias=existing_alias, function=function)['AliasArn']}",
1734
                    Type="User",
1735
                )
1736
            # checking if the version exists
1737
            routing_configuration = None
1✔
1738
            if routing_config and (
1✔
1739
                routing_config_dict := routing_config.get("AdditionalVersionWeights")
1740
            ):
1741
                routing_configuration = self._create_routing_config_model(
1✔
1742
                    routing_config_dict, target_version
1743
                )
1744

1745
            alias = VersionAlias(
1✔
1746
                name=name,
1747
                function_version=function_version,
1748
                description=description,
1749
                routing_configuration=routing_configuration,
1750
            )
1751
            function.aliases[name] = alias
1✔
1752
        return api_utils.map_alias_out(alias=alias, function=function)
1✔
1753

1754
    def list_aliases(
1✔
1755
        self,
1756
        context: RequestContext,
1757
        function_name: FunctionName,
1758
        function_version: Version = None,
1759
        marker: String = None,
1760
        max_items: MaxListItems = None,
1761
        **kwargs,
1762
    ) -> ListAliasesResponse:
1763
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1764
        function_name = api_utils.get_function_name(function_name, context)
1✔
1765
        function = self._get_function(
1✔
1766
            function_name=function_name, region=region, account_id=account_id
1767
        )
1768
        aliases = [
1✔
1769
            api_utils.map_alias_out(alias, function)
1770
            for alias in function.aliases.values()
1771
            if function_version is None or alias.function_version == function_version
1772
        ]
1773

1774
        aliases = PaginatedList(aliases)
1✔
1775
        page, token = aliases.get_page(
1✔
1776
            lambda alias: alias["AliasArn"],
1777
            marker,
1778
            max_items,
1779
        )
1780

1781
        return ListAliasesResponse(Aliases=page, NextMarker=token)
1✔
1782

1783
    def delete_alias(
1✔
1784
        self, context: RequestContext, function_name: FunctionName, name: Alias, **kwargs
1785
    ) -> None:
1786
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1787
        function_name = api_utils.get_function_name(function_name, context)
1✔
1788
        function = self._get_function(
1✔
1789
            function_name=function_name, region=region, account_id=account_id
1790
        )
1791
        version_alias = function.aliases.pop(name, None)
1✔
1792

1793
        # cleanup related resources
1794
        if name in function.provisioned_concurrency_configs:
1✔
1795
            function.provisioned_concurrency_configs.pop(name)
1✔
1796

1797
        # TODO: Allow for deactivating/unregistering specific Lambda URLs
1798
        if version_alias and name in function.function_url_configs:
1✔
1799
            url_config = function.function_url_configs.pop(name)
1✔
1800
            LOG.debug(
1✔
1801
                "Stopping aliased Lambda Function URL %s for %s",
1802
                url_config.url,
1803
                url_config.function_name,
1804
            )
1805

1806
    def get_alias(
1✔
1807
        self, context: RequestContext, function_name: FunctionName, name: Alias, **kwargs
1808
    ) -> AliasConfiguration:
1809
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1810
        function_name = api_utils.get_function_name(function_name, context)
1✔
1811
        function = self._get_function(
1✔
1812
            function_name=function_name, region=region, account_id=account_id
1813
        )
1814
        if not (alias := function.aliases.get(name)):
1✔
1815
            raise ResourceNotFoundException(
1✔
1816
                f"Cannot find alias arn: {api_utils.qualified_lambda_arn(function_name=function_name, qualifier=name, region=region, account=account_id)}",
1817
                Type="User",
1818
            )
1819
        return api_utils.map_alias_out(alias=alias, function=function)
1✔
1820

1821
    def update_alias(
1✔
1822
        self,
1823
        context: RequestContext,
1824
        function_name: FunctionName,
1825
        name: Alias,
1826
        function_version: Version = None,
1827
        description: Description = None,
1828
        routing_config: AliasRoutingConfiguration = None,
1829
        revision_id: String = None,
1830
        **kwargs,
1831
    ) -> AliasConfiguration:
1832
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1833
        function_name = api_utils.get_function_name(function_name, context)
1✔
1834
        function = self._get_function(
1✔
1835
            function_name=function_name, region=region, account_id=account_id
1836
        )
1837
        if not (alias := function.aliases.get(name)):
1✔
1838
            fn_arn = api_utils.qualified_lambda_arn(function_name, name, account_id, region)
1✔
1839
            raise ResourceNotFoundException(
1✔
1840
                f"Alias not found: {fn_arn}",
1841
                Type="User",
1842
            )
1843
        if revision_id and alias.revision_id != revision_id:
1✔
1844
            raise PreconditionFailedException(
1✔
1845
                "The Revision Id provided does not match the latest Revision Id. "
1846
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
1847
                Type="User",
1848
            )
1849
        changes = {}
1✔
1850
        if function_version is not None:
1✔
1851
            changes |= {"function_version": function_version}
1✔
1852
        if description is not None:
1✔
1853
            changes |= {"description": description}
1✔
1854
        if routing_config is not None:
1✔
1855
            # if it is an empty dict or AdditionalVersionWeights is empty, set routing config to None
1856
            new_routing_config = None
1✔
1857
            if routing_config_dict := routing_config.get("AdditionalVersionWeights"):
1✔
UNCOV
1858
                new_routing_config = self._create_routing_config_model(routing_config_dict)
×
1859
            changes |= {"routing_configuration": new_routing_config}
1✔
1860
        # even if no changes are done, we have to update revision id for some reason
1861
        old_alias = alias
1✔
1862
        alias = dataclasses.replace(alias, **changes)
1✔
1863
        function.aliases[name] = alias
1✔
1864

1865
        # TODO: signal lambda service that pointer potentially changed
1866
        self.lambda_service.update_alias(old_alias=old_alias, new_alias=alias, function=function)
1✔
1867

1868
        return api_utils.map_alias_out(alias=alias, function=function)
1✔
1869

1870
    # =======================================
1871
    # ======= EVENT SOURCE MAPPINGS =========
1872
    # =======================================
1873
    def check_service_resource_exists(
1✔
1874
        self, service: str, resource_arn: str, function_arn: str, function_role_arn: str
1875
    ):
1876
        """
1877
        Check if the service resource exists and if the function has access to it.
1878

1879
        Raises:
1880
            InvalidParameterValueException: If the service resource does not exist or the function does not have access to it.
1881
        """
1882
        arn = parse_arn(resource_arn)
1✔
1883
        source_client = get_internal_client(
1✔
1884
            arn=resource_arn,
1885
            role_arn=function_role_arn,
1886
            service_principal=ServicePrincipal.lambda_,
1887
            source_arn=function_arn,
1888
        )
1889
        if service in ["sqs", "sqs-fifo"]:
1✔
1890
            try:
1✔
1891
                source_client.get_queue_attributes(QueueUrl=arn["resource"])
1✔
1892
            except ClientError as e:
1✔
1893
                if e.response["Error"]["Code"] == "AWS.SimpleQueueService.NonExistentQueue":
1✔
1894
                    raise InvalidParameterValueException(
1✔
1895
                        f"Error occurred while ReceiveMessage. SQS Error Code: {e.response['Error']['Code']}. SQS Error Message: {e.response['Error']['Message']}",
1896
                        Type="User",
1897
                    )
UNCOV
1898
                raise e
×
1899
        elif service in ["kinesis"]:
1✔
1900
            try:
1✔
1901
                source_client.describe_stream(StreamARN=resource_arn)
1✔
1902
            except ClientError as e:
1✔
1903
                if e.response["Error"]["Code"] == "ResourceNotFoundException":
1✔
1904
                    raise InvalidParameterValueException(
1✔
1905
                        f"Stream not found: {resource_arn}",
1906
                        Type="User",
1907
                    )
UNCOV
1908
                raise e
×
1909
        elif service in ["dynamodb"]:
1✔
1910
            try:
1✔
1911
                source_client.describe_stream(StreamArn=resource_arn)
1✔
1912
            except ClientError as e:
1✔
1913
                if e.response["Error"]["Code"] == "ResourceNotFoundException":
1✔
1914
                    raise InvalidParameterValueException(
1✔
1915
                        f"Stream not found: {resource_arn}",
1916
                        Type="User",
1917
                    )
UNCOV
1918
                raise e
×
1919

1920
    @handler("CreateEventSourceMapping", expand=False)
1✔
1921
    def create_event_source_mapping(
1✔
1922
        self,
1923
        context: RequestContext,
1924
        request: CreateEventSourceMappingRequest,
1925
    ) -> EventSourceMappingConfiguration:
1926
        return self.create_event_source_mapping_v2(context, request)
1✔
1927

1928
    def create_event_source_mapping_v2(
1✔
1929
        self,
1930
        context: RequestContext,
1931
        request: CreateEventSourceMappingRequest,
1932
    ) -> EventSourceMappingConfiguration:
1933
        # Validations
1934
        function_arn, function_name, state, function_version, function_role = (
1✔
1935
            self.validate_event_source_mapping(context, request)
1936
        )
1937

1938
        esm_config = EsmConfigFactory(request, context, function_arn).get_esm_config()
1✔
1939

1940
        # Copy esm_config to avoid a race condition with potential async update in the store
1941
        state.event_source_mappings[esm_config["UUID"]] = esm_config.copy()
1✔
1942
        enabled = request.get("Enabled", True)
1✔
1943
        # TODO: check for potential async race condition update -> think about locking
1944
        esm_worker = EsmWorkerFactory(esm_config, function_role, enabled).get_esm_worker()
1✔
1945
        self.esm_workers[esm_worker.uuid] = esm_worker
1✔
1946
        # TODO: check StateTransitionReason, LastModified, LastProcessingResult (concurrent updates requires locking!)
1947
        if tags := request.get("Tags"):
1✔
1948
            self._store_tags(esm_config.get("EventSourceMappingArn"), tags)
1✔
1949
        esm_worker.create()
1✔
1950
        return esm_config
1✔
1951

1952
    def validate_event_source_mapping(self, context, request):
1✔
1953
        # TODO: test whether stream ARNs are valid sources for Pipes or ESM or whether only DynamoDB table ARNs work
1954
        is_create_esm_request = context.operation.name == self.create_event_source_mapping.operation
1✔
1955

1956
        if destination_config := request.get("DestinationConfig"):
1✔
1957
            if "OnSuccess" in destination_config:
1✔
1958
                raise InvalidParameterValueException(
1✔
1959
                    "Unsupported DestinationConfig parameter for given event source mapping type.",
1960
                    Type="User",
1961
                )
1962

1963
        service = None
1✔
1964
        if "SelfManagedEventSource" in request:
1✔
UNCOV
1965
            service = "kafka"
×
UNCOV
1966
            if "SourceAccessConfigurations" not in request:
×
UNCOV
1967
                raise InvalidParameterValueException(
×
1968
                    "Required 'sourceAccessConfigurations' parameter is missing.", Type="User"
1969
                )
1970
        if service is None and "EventSourceArn" not in request:
1✔
1971
            raise InvalidParameterValueException("Unrecognized event source.", Type="User")
1✔
1972
        if service is None:
1✔
1973
            service = extract_service_from_arn(request["EventSourceArn"])
1✔
1974

1975
        batch_size = api_utils.validate_and_set_batch_size(service, request.get("BatchSize"))
1✔
1976
        if service in ["dynamodb", "kinesis"] and "StartingPosition" not in request:
1✔
1977
            raise InvalidParameterValueException(
1✔
1978
                "1 validation error detected: Value null at 'startingPosition' failed to satisfy constraint: Member must not be null.",
1979
                Type="User",
1980
            )
1981
        if service in ["sqs", "sqs-fifo"]:
1✔
1982
            if batch_size > 10 and request.get("MaximumBatchingWindowInSeconds", 0) == 0:
1✔
1983
                raise InvalidParameterValueException(
1✔
1984
                    "Maximum batch window in seconds must be greater than 0 if maximum batch size is greater than 10",
1985
                    Type="User",
1986
                )
1987

1988
        if (filter_criteria := request.get("FilterCriteria")) is not None:
1✔
1989
            for filter_ in filter_criteria.get("Filters", []):
1✔
1990
                pattern_str = filter_.get("Pattern")
1✔
1991
                if not pattern_str or not isinstance(pattern_str, str):
1✔
UNCOV
1992
                    raise InvalidParameterValueException(
×
1993
                        "Invalid filter pattern definition.", Type="User"
1994
                    )
1995

1996
                if not validate_event_pattern(pattern_str):
1✔
1997
                    raise InvalidParameterValueException(
1✔
1998
                        "Invalid filter pattern definition.", Type="User"
1999
                    )
2000

2001
        # Can either have a FunctionName (i.e CreateEventSourceMapping request) or
2002
        # an internal EventSourceMappingConfiguration representation
2003
        request_function_name = request.get("FunctionName") or request.get("FunctionArn")
1✔
2004
        # can be either a partial arn or a full arn for the version/alias
2005
        function_name, qualifier, account, region = function_locators_from_arn(
1✔
2006
            request_function_name
2007
        )
2008
        # TODO: validate `context.region` vs. `region(request["FunctionName"])` vs. `region(request["EventSourceArn"])`
2009
        account = account or context.account_id
1✔
2010
        region = region or context.region
1✔
2011
        state = lambda_stores[account][region]
1✔
2012
        fn = state.functions.get(function_name)
1✔
2013
        if not fn:
1✔
2014
            raise InvalidParameterValueException("Function does not exist", Type="User")
1✔
2015

2016
        if qualifier:
1✔
2017
            # make sure the function version/alias exists
2018
            if api_utils.qualifier_is_alias(qualifier):
1✔
2019
                fn_alias = fn.aliases.get(qualifier)
1✔
2020
                if not fn_alias:
1✔
UNCOV
2021
                    raise Exception("unknown alias")  # TODO: cover via test
×
2022
            elif api_utils.qualifier_is_version(qualifier):
1✔
2023
                fn_version = fn.versions.get(qualifier)
1✔
2024
                if not fn_version:
1✔
UNCOV
2025
                    raise Exception("unknown version")  # TODO: cover via test
×
2026
            elif qualifier == "$LATEST":
1✔
2027
                pass
1✔
2028
            else:
UNCOV
2029
                raise Exception("invalid functionname")  # TODO: cover via test
×
2030
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account, region)
1✔
2031

2032
        else:
2033
            fn_arn = api_utils.unqualified_lambda_arn(function_name, account, region)
1✔
2034

2035
        function_version = get_function_version_from_arn(fn_arn)
1✔
2036
        function_role = function_version.config.role
1✔
2037

2038
        if source_arn := request.get("EventSourceArn"):
1✔
2039
            self.check_service_resource_exists(service, source_arn, fn_arn, function_role)
1✔
2040
        # Check we are validating a CreateEventSourceMapping request
2041
        if is_create_esm_request:
1✔
2042

2043
            def _get_mapping_sources(mapping: dict[str, Any]) -> list[str]:
1✔
2044
                if event_source_arn := mapping.get("EventSourceArn"):
1✔
2045
                    return [event_source_arn]
1✔
UNCOV
2046
                return (
×
2047
                    mapping.get("SelfManagedEventSource", {})
2048
                    .get("Endpoints", {})
2049
                    .get("KAFKA_BOOTSTRAP_SERVERS", [])
2050
                )
2051

2052
            # check for event source duplicates
2053
            # TODO: currently validated for sqs, kinesis, and dynamodb
2054
            service_id = load_service(service).service_id
1✔
2055
            for uuid, mapping in state.event_source_mappings.items():
1✔
2056
                mapping_sources = _get_mapping_sources(mapping)
1✔
2057
                request_sources = _get_mapping_sources(request)
1✔
2058
                if mapping["FunctionArn"] == fn_arn and (
1✔
2059
                    set(mapping_sources).intersection(request_sources)
2060
                ):
2061
                    if service == "sqs":
1✔
2062
                        # *shakes fist at SQS*
2063
                        raise ResourceConflictException(
1✔
2064
                            f'An event source mapping with {service_id} arn (" {mapping["EventSourceArn"]} ") '
2065
                            f'and function (" {function_name} ") already exists. Please update or delete the '
2066
                            f"existing mapping with UUID {uuid}",
2067
                            Type="User",
2068
                        )
2069
                    elif service == "kafka":
1✔
UNCOV
2070
                        if set(mapping["Topics"]).intersection(request["Topics"]):
×
UNCOV
2071
                            raise ResourceConflictException(
×
2072
                                f'An event source mapping with event source ("{",".join(request_sources)}"), '
2073
                                f'function ("{fn_arn}"), '
2074
                                f'topics ("{",".join(request["Topics"])}") already exists. Please update or delete the '
2075
                                f"existing mapping with UUID {uuid}",
2076
                                Type="User",
2077
                            )
2078
                    else:
2079
                        raise ResourceConflictException(
1✔
2080
                            f'The event source arn (" {mapping["EventSourceArn"]} ") and function '
2081
                            f'(" {function_name} ") provided mapping already exists. Please update or delete the '
2082
                            f"existing mapping with UUID {uuid}",
2083
                            Type="User",
2084
                        )
2085
        return fn_arn, function_name, state, function_version, function_role
1✔
2086

2087
    @handler("UpdateEventSourceMapping", expand=False)
1✔
2088
    def update_event_source_mapping(
1✔
2089
        self,
2090
        context: RequestContext,
2091
        request: UpdateEventSourceMappingRequest,
2092
    ) -> EventSourceMappingConfiguration:
2093
        return self.update_event_source_mapping_v2(context, request)
1✔
2094

2095
    def update_event_source_mapping_v2(
1✔
2096
        self,
2097
        context: RequestContext,
2098
        request: UpdateEventSourceMappingRequest,
2099
    ) -> EventSourceMappingConfiguration:
2100
        # TODO: test and implement this properly (quite complex with many validations and limitations!)
2101
        LOG.warning(
1✔
2102
            "Updating Lambda Event Source Mapping is in experimental state and not yet fully tested."
2103
        )
2104
        state = lambda_stores[context.account_id][context.region]
1✔
2105
        request_data = {**request}
1✔
2106
        uuid = request_data.pop("UUID", None)
1✔
2107
        if not uuid:
1✔
UNCOV
2108
            raise ResourceNotFoundException(
×
2109
                "The resource you requested does not exist.", Type="User"
2110
            )
2111
        old_event_source_mapping = state.event_source_mappings.get(uuid)
1✔
2112
        esm_worker = self.esm_workers.get(uuid)
1✔
2113
        if old_event_source_mapping is None or esm_worker is None:
1✔
2114
            raise ResourceNotFoundException(
1✔
2115
                "The resource you requested does not exist.", Type="User"
2116
            )  # TODO: test?
2117

2118
        # normalize values to overwrite
2119
        event_source_mapping = old_event_source_mapping | request_data
1✔
2120

2121
        temp_params = {}  # values only set for the returned response, not saved internally (e.g. transient state)
1✔
2122

2123
        # Validate the newly updated ESM object. We ignore the output here since we only care whether an Exception is raised.
2124
        function_arn, _, _, function_version, function_role = self.validate_event_source_mapping(
1✔
2125
            context, event_source_mapping
2126
        )
2127

2128
        # remove the FunctionName field
2129
        event_source_mapping.pop("FunctionName", None)
1✔
2130

2131
        if function_arn:
1✔
2132
            event_source_mapping["FunctionArn"] = function_arn
1✔
2133

2134
        # Only apply update if the desired state differs
2135
        enabled = request.get("Enabled")
1✔
2136
        if enabled is not None:
1✔
2137
            if enabled and old_event_source_mapping["State"] != EsmState.ENABLED:
1✔
2138
                event_source_mapping["State"] = EsmState.ENABLING
1✔
2139
            # TODO: What happens when trying to update during an update or failed state?!
2140
            elif not enabled and old_event_source_mapping["State"] == EsmState.ENABLED:
1✔
2141
                event_source_mapping["State"] = EsmState.DISABLING
1✔
2142
        else:
2143
            event_source_mapping["State"] = EsmState.UPDATING
1✔
2144

2145
        # To ensure parity, certain responses need to be immediately returned
2146
        temp_params["State"] = event_source_mapping["State"]
1✔
2147

2148
        state.event_source_mappings[uuid] = event_source_mapping
1✔
2149

2150
        # TODO: Currently, we re-create the entire ESM worker. Look into approach with better performance.
2151
        worker_factory = EsmWorkerFactory(
1✔
2152
            event_source_mapping, function_role, request.get("Enabled", esm_worker.enabled)
2153
        )
2154

2155
        # Get a new ESM worker object but do not active it, since the factory holds all logic for creating new worker from configuration.
2156
        updated_esm_worker = worker_factory.get_esm_worker()
1✔
2157
        self.esm_workers[uuid] = updated_esm_worker
1✔
2158

2159
        # We should stop() the worker since the delete() will remove the ESM from the state mapping.
2160
        esm_worker.stop()
1✔
2161
        # This will either create an EsmWorker in the CREATING state if enabled. Otherwise, the DISABLING state is set.
2162
        updated_esm_worker.create()
1✔
2163

2164
        return {**event_source_mapping, **temp_params}
1✔
2165

2166
    def delete_event_source_mapping(
1✔
2167
        self, context: RequestContext, uuid: String, **kwargs
2168
    ) -> EventSourceMappingConfiguration:
2169
        state = lambda_stores[context.account_id][context.region]
1✔
2170
        event_source_mapping = state.event_source_mappings.get(uuid)
1✔
2171
        if not event_source_mapping:
1✔
2172
            raise ResourceNotFoundException(
1✔
2173
                "The resource you requested does not exist.", Type="User"
2174
            )
2175
        esm = state.event_source_mappings[uuid]
1✔
2176
        # TODO: add proper locking
2177
        esm_worker = self.esm_workers.pop(uuid, None)
1✔
2178
        # Asynchronous delete in v2
2179
        if not esm_worker:
1✔
2180
            raise ResourceNotFoundException(
×
2181
                "The resource you requested does not exist.", Type="User"
2182
            )
2183
        esm_worker.delete()
1✔
2184
        return {**esm, "State": EsmState.DELETING}
1✔
2185

2186
    def get_event_source_mapping(
1✔
2187
        self, context: RequestContext, uuid: String, **kwargs
2188
    ) -> EventSourceMappingConfiguration:
2189
        state = lambda_stores[context.account_id][context.region]
1✔
2190
        event_source_mapping = state.event_source_mappings.get(uuid)
1✔
2191
        if not event_source_mapping:
1✔
2192
            raise ResourceNotFoundException(
1✔
2193
                "The resource you requested does not exist.", Type="User"
2194
            )
2195
        esm_worker = self.esm_workers.get(uuid)
1✔
2196
        if not esm_worker:
1✔
UNCOV
2197
            raise ResourceNotFoundException(
×
2198
                "The resource you requested does not exist.", Type="User"
2199
            )
2200
        event_source_mapping["State"] = esm_worker.current_state
1✔
2201
        event_source_mapping["StateTransitionReason"] = esm_worker.state_transition_reason
1✔
2202
        return event_source_mapping
1✔
2203

2204
    def list_event_source_mappings(
1✔
2205
        self,
2206
        context: RequestContext,
2207
        event_source_arn: Arn = None,
2208
        function_name: FunctionName = None,
2209
        marker: String = None,
2210
        max_items: MaxListItems = None,
2211
        **kwargs,
2212
    ) -> ListEventSourceMappingsResponse:
2213
        state = lambda_stores[context.account_id][context.region]
1✔
2214

2215
        esms = state.event_source_mappings.values()
1✔
2216
        # TODO: update and test State and StateTransitionReason for ESM v2
2217

2218
        if event_source_arn:  # TODO: validate pattern
1✔
2219
            esms = [e for e in esms if e.get("EventSourceArn") == event_source_arn]
1✔
2220

2221
        if function_name:
1✔
2222
            esms = [e for e in esms if function_name in e["FunctionArn"]]
1✔
2223

2224
        esms = PaginatedList(esms)
1✔
2225
        page, token = esms.get_page(
1✔
2226
            lambda x: x["UUID"],
2227
            marker,
2228
            max_items,
2229
        )
2230
        return ListEventSourceMappingsResponse(EventSourceMappings=page, NextMarker=token)
1✔
2231

2232
    def get_source_type_from_request(self, request: dict[str, Any]) -> str:
1✔
UNCOV
2233
        if event_source_arn := request.get("EventSourceArn", ""):
×
UNCOV
2234
            service = extract_service_from_arn(event_source_arn)
×
UNCOV
2235
            if service == "sqs" and "fifo" in event_source_arn:
×
UNCOV
2236
                service = "sqs-fifo"
×
UNCOV
2237
            return service
×
UNCOV
2238
        elif request.get("SelfManagedEventSource"):
×
UNCOV
2239
            return "kafka"
×
2240

2241
    # =======================================
2242
    # ============ FUNCTION URLS ============
2243
    # =======================================
2244

2245
    @staticmethod
1✔
2246
    def _validate_qualifier(qualifier: str) -> None:
1✔
2247
        if qualifier == "$LATEST" or (qualifier and api_utils.qualifier_is_version(qualifier)):
1✔
2248
            raise ValidationException(
1✔
2249
                f"1 validation error detected: Value '{qualifier}' at 'qualifier' failed to satisfy constraint: Member must satisfy regular expression pattern: ((?!^\\d+$)^[0-9a-zA-Z-_]+$)"
2250
            )
2251

2252
    @staticmethod
1✔
2253
    def _validate_invoke_mode(invoke_mode: str) -> None:
1✔
2254
        if invoke_mode and invoke_mode not in [InvokeMode.BUFFERED, InvokeMode.RESPONSE_STREAM]:
1✔
2255
            raise ValidationException(
1✔
2256
                f"1 validation error detected: Value '{invoke_mode}' at 'invokeMode' failed to satisfy constraint: Member must satisfy enum value set: [RESPONSE_STREAM, BUFFERED]"
2257
            )
2258
        if invoke_mode == InvokeMode.RESPONSE_STREAM:
1✔
2259
            # TODO should we actually fail for setting RESPONSE_STREAM?
2260
            #  It should trigger InvokeWithResponseStream which is not implemented
2261
            LOG.warning(
1✔
2262
                "The invokeMode 'RESPONSE_STREAM' is not yet supported on LocalStack. The property is only mocked, the execution will still be 'BUFFERED'"
2263
            )
2264

2265
    # TODO: what happens if function state is not active?
2266
    def create_function_url_config(
1✔
2267
        self,
2268
        context: RequestContext,
2269
        function_name: FunctionName,
2270
        auth_type: FunctionUrlAuthType,
2271
        qualifier: FunctionUrlQualifier = None,
2272
        cors: Cors = None,
2273
        invoke_mode: InvokeMode = None,
2274
        **kwargs,
2275
    ) -> CreateFunctionUrlConfigResponse:
2276
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2277
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
2278
            function_name, qualifier, context
2279
        )
2280
        state = lambda_stores[account_id][region]
1✔
2281
        self._validate_qualifier(qualifier)
1✔
2282
        self._validate_invoke_mode(invoke_mode)
1✔
2283

2284
        fn = state.functions.get(function_name)
1✔
2285
        if fn is None:
1✔
2286
            raise ResourceNotFoundException("Function does not exist", Type="User")
1✔
2287

2288
        url_config = fn.function_url_configs.get(qualifier or "$LATEST")
1✔
2289
        if url_config:
1✔
2290
            raise ResourceConflictException(
1✔
2291
                f"Failed to create function url config for [functionArn = {url_config.function_arn}]. Error message:  FunctionUrlConfig exists for this Lambda function",
2292
                Type="User",
2293
            )
2294

2295
        if qualifier and qualifier != "$LATEST" and qualifier not in fn.aliases:
1✔
2296
            raise ResourceNotFoundException("Function does not exist", Type="User")
1✔
2297

2298
        normalized_qualifier = qualifier or "$LATEST"
1✔
2299

2300
        function_arn = (
1✔
2301
            api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
2302
            if qualifier
2303
            else api_utils.unqualified_lambda_arn(function_name, account_id, region)
2304
        )
2305

2306
        custom_id: str | None = None
1✔
2307

2308
        tags = self._get_tags(api_utils.unqualified_lambda_arn(function_name, account_id, region))
1✔
2309
        if TAG_KEY_CUSTOM_URL in tags:
1✔
2310
            # Note: I really wanted to add verification here that the
2311
            # url_id is unique, so we could surface that to the user ASAP.
2312
            # However, it seems like that information isn't available yet,
2313
            # since (as far as I can tell) we call
2314
            # self.router.register_routes() once, in a single shot, for all
2315
            # of the routes -- and we need to verify that it's unique not
2316
            # just for this particular lambda function, but for the entire
2317
            # lambda provider. Therefore... that idea proved non-trivial!
2318
            custom_id_tag_value = (
1✔
2319
                f"{tags[TAG_KEY_CUSTOM_URL]}-{qualifier}" if qualifier else tags[TAG_KEY_CUSTOM_URL]
2320
            )
2321
            if TAG_KEY_CUSTOM_URL_VALIDATOR.match(custom_id_tag_value):
1✔
2322
                custom_id = custom_id_tag_value
1✔
2323

2324
            else:
2325
                # Note: we're logging here instead of raising to prioritize
2326
                # strict parity with AWS over the localstack-only custom_id
2327
                LOG.warning(
1✔
2328
                    "Invalid custom ID tag value for lambda URL (%s=%s). "
2329
                    "Replaced with default (random id)",
2330
                    TAG_KEY_CUSTOM_URL,
2331
                    custom_id_tag_value,
2332
                )
2333

2334
        # The url_id is the subdomain used for the URL we're creating. This
2335
        # is either created randomly (as in AWS), or can be passed as a tag
2336
        # to the lambda itself (localstack-only).
2337
        url_id: str
2338
        if custom_id is None:
1✔
2339
            url_id = api_utils.generate_random_url_id()
1✔
2340
        else:
2341
            url_id = custom_id
1✔
2342

2343
        host_definition = localstack_host(custom_port=config.GATEWAY_LISTEN[0].port)
1✔
2344
        fn.function_url_configs[normalized_qualifier] = FunctionUrlConfig(
1✔
2345
            function_arn=function_arn,
2346
            function_name=function_name,
2347
            cors=cors,
2348
            url_id=url_id,
2349
            url=f"http://{url_id}.lambda-url.{context.region}.{host_definition.host_and_port()}/",  # TODO: https support
2350
            auth_type=auth_type,
2351
            creation_time=api_utils.generate_lambda_date(),
2352
            last_modified_time=api_utils.generate_lambda_date(),
2353
            invoke_mode=invoke_mode,
2354
        )
2355

2356
        # persist and start URL
2357
        # TODO: implement URL invoke
2358
        api_url_config = api_utils.map_function_url_config(
1✔
2359
            fn.function_url_configs[normalized_qualifier]
2360
        )
2361

2362
        return CreateFunctionUrlConfigResponse(
1✔
2363
            FunctionUrl=api_url_config["FunctionUrl"],
2364
            FunctionArn=api_url_config["FunctionArn"],
2365
            AuthType=api_url_config["AuthType"],
2366
            Cors=api_url_config["Cors"],
2367
            CreationTime=api_url_config["CreationTime"],
2368
            InvokeMode=api_url_config["InvokeMode"],
2369
        )
2370

2371
    def get_function_url_config(
1✔
2372
        self,
2373
        context: RequestContext,
2374
        function_name: FunctionName,
2375
        qualifier: FunctionUrlQualifier = None,
2376
        **kwargs,
2377
    ) -> GetFunctionUrlConfigResponse:
2378
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2379
        state = lambda_stores[account_id][region]
1✔
2380

2381
        fn_name, qualifier = api_utils.get_name_and_qualifier(function_name, qualifier, context)
1✔
2382

2383
        self._validate_qualifier(qualifier)
1✔
2384

2385
        resolved_fn = state.functions.get(fn_name)
1✔
2386
        if not resolved_fn:
1✔
2387
            raise ResourceNotFoundException(
1✔
2388
                "The resource you requested does not exist.", Type="User"
2389
            )
2390

2391
        qualifier = qualifier or "$LATEST"
1✔
2392
        url_config = resolved_fn.function_url_configs.get(qualifier)
1✔
2393
        if not url_config:
1✔
2394
            raise ResourceNotFoundException(
1✔
2395
                "The resource you requested does not exist.", Type="User"
2396
            )
2397

2398
        return api_utils.map_function_url_config(url_config)
1✔
2399

2400
    def update_function_url_config(
1✔
2401
        self,
2402
        context: RequestContext,
2403
        function_name: FunctionName,
2404
        qualifier: FunctionUrlQualifier = None,
2405
        auth_type: FunctionUrlAuthType = None,
2406
        cors: Cors = None,
2407
        invoke_mode: InvokeMode = None,
2408
        **kwargs,
2409
    ) -> UpdateFunctionUrlConfigResponse:
2410
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2411
        state = lambda_stores[account_id][region]
1✔
2412

2413
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
2414
            function_name, qualifier, context
2415
        )
2416
        self._validate_qualifier(qualifier)
1✔
2417
        self._validate_invoke_mode(invoke_mode)
1✔
2418

2419
        fn = state.functions.get(function_name)
1✔
2420
        if not fn:
1✔
2421
            raise ResourceNotFoundException("Function does not exist", Type="User")
1✔
2422

2423
        normalized_qualifier = qualifier or "$LATEST"
1✔
2424

2425
        if (
1✔
2426
            api_utils.qualifier_is_alias(normalized_qualifier)
2427
            and normalized_qualifier not in fn.aliases
2428
        ):
2429
            raise ResourceNotFoundException("Function does not exist", Type="User")
1✔
2430

2431
        url_config = fn.function_url_configs.get(normalized_qualifier)
1✔
2432
        if not url_config:
1✔
2433
            raise ResourceNotFoundException(
1✔
2434
                "The resource you requested does not exist.", Type="User"
2435
            )
2436

2437
        changes = {
1✔
2438
            "last_modified_time": api_utils.generate_lambda_date(),
2439
            **({"cors": cors} if cors is not None else {}),
2440
            **({"auth_type": auth_type} if auth_type is not None else {}),
2441
        }
2442

2443
        if invoke_mode:
1✔
2444
            changes["invoke_mode"] = invoke_mode
1✔
2445

2446
        new_url_config = dataclasses.replace(url_config, **changes)
1✔
2447
        fn.function_url_configs[normalized_qualifier] = new_url_config
1✔
2448

2449
        return UpdateFunctionUrlConfigResponse(
1✔
2450
            FunctionUrl=new_url_config.url,
2451
            FunctionArn=new_url_config.function_arn,
2452
            AuthType=new_url_config.auth_type,
2453
            Cors=new_url_config.cors,
2454
            CreationTime=new_url_config.creation_time,
2455
            LastModifiedTime=new_url_config.last_modified_time,
2456
            InvokeMode=new_url_config.invoke_mode,
2457
        )
2458

2459
    def delete_function_url_config(
1✔
2460
        self,
2461
        context: RequestContext,
2462
        function_name: FunctionName,
2463
        qualifier: FunctionUrlQualifier = None,
2464
        **kwargs,
2465
    ) -> None:
2466
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2467
        state = lambda_stores[account_id][region]
1✔
2468

2469
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
2470
            function_name, qualifier, context
2471
        )
2472
        self._validate_qualifier(qualifier)
1✔
2473

2474
        resolved_fn = state.functions.get(function_name)
1✔
2475
        if not resolved_fn:
1✔
2476
            raise ResourceNotFoundException(
1✔
2477
                "The resource you requested does not exist.", Type="User"
2478
            )
2479

2480
        qualifier = qualifier or "$LATEST"
1✔
2481
        url_config = resolved_fn.function_url_configs.get(qualifier)
1✔
2482
        if not url_config:
1✔
2483
            raise ResourceNotFoundException(
1✔
2484
                "The resource you requested does not exist.", Type="User"
2485
            )
2486

2487
        del resolved_fn.function_url_configs[qualifier]
1✔
2488

2489
    def list_function_url_configs(
1✔
2490
        self,
2491
        context: RequestContext,
2492
        function_name: FunctionName,
2493
        marker: String = None,
2494
        max_items: MaxItems = None,
2495
        **kwargs,
2496
    ) -> ListFunctionUrlConfigsResponse:
2497
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2498
        state = lambda_stores[account_id][region]
1✔
2499

2500
        fn_name = api_utils.get_function_name(function_name, context)
1✔
2501
        resolved_fn = state.functions.get(fn_name)
1✔
2502
        if not resolved_fn:
1✔
2503
            raise ResourceNotFoundException("Function does not exist", Type="User")
1✔
2504

2505
        url_configs = [
1✔
2506
            api_utils.map_function_url_config(fn_conf)
2507
            for fn_conf in resolved_fn.function_url_configs.values()
2508
        ]
2509
        url_configs = PaginatedList(url_configs)
1✔
2510
        page, token = url_configs.get_page(
1✔
2511
            lambda url_config: url_config["FunctionArn"],
2512
            marker,
2513
            max_items,
2514
        )
2515
        url_configs = page
1✔
2516
        return ListFunctionUrlConfigsResponse(FunctionUrlConfigs=url_configs, NextMarker=token)
1✔
2517

2518
    # =======================================
2519
    # ============  Permissions  ============
2520
    # =======================================
2521

2522
    @handler("AddPermission", expand=False)
1✔
2523
    def add_permission(
1✔
2524
        self,
2525
        context: RequestContext,
2526
        request: AddPermissionRequest,
2527
    ) -> AddPermissionResponse:
2528
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
2529
            request.get("FunctionName"), request.get("Qualifier"), context
2530
        )
2531

2532
        # validate qualifier
2533
        if qualifier is not None:
1✔
2534
            self._validate_qualifier_expression(qualifier)
1✔
2535
            if qualifier == "$LATEST":
1✔
2536
                raise InvalidParameterValueException(
1✔
2537
                    "We currently do not support adding policies for $LATEST.", Type="User"
2538
                )
2539
        account_id, region = api_utils.get_account_and_region(request.get("FunctionName"), context)
1✔
2540

2541
        resolved_fn = self._get_function(function_name, account_id, region)
1✔
2542
        resolved_qualifier, fn_arn = self._resolve_fn_qualifier(resolved_fn, qualifier)
1✔
2543

2544
        revision_id = request.get("RevisionId")
1✔
2545
        if revision_id:
1✔
2546
            fn_revision_id = self._function_revision_id(resolved_fn, resolved_qualifier)
1✔
2547
            if revision_id != fn_revision_id:
1✔
2548
                raise PreconditionFailedException(
1✔
2549
                    "The Revision Id provided does not match the latest Revision Id. "
2550
                    "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
2551
                    Type="User",
2552
                )
2553

2554
        request_sid = request["StatementId"]
1✔
2555
        if not bool(STATEMENT_ID_REGEX.match(request_sid)):
1✔
2556
            raise ValidationException(
1✔
2557
                f"1 validation error detected: Value '{request_sid}' at 'statementId' failed to satisfy constraint: Member must satisfy regular expression pattern: ([a-zA-Z0-9-_]+)"
2558
            )
2559
        # check for an already existing policy and any conflicts in existing statements
2560
        existing_policy = resolved_fn.permissions.get(resolved_qualifier)
1✔
2561
        if existing_policy:
1✔
2562
            if request_sid in [s["Sid"] for s in existing_policy.policy.Statement]:
1✔
2563
                # uniqueness scope: statement id needs to be unique per qualified function ($LATEST, version, or alias)
2564
                # Counterexample: the same sid can exist within $LATEST, version, and alias
2565
                raise ResourceConflictException(
1✔
2566
                    f"The statement id ({request_sid}) provided already exists. Please provide a new statement id, or remove the existing statement.",
2567
                    Type="User",
2568
                )
2569

2570
        permission_statement = api_utils.build_statement(
1✔
2571
            partition=context.partition,
2572
            resource_arn=fn_arn,
2573
            statement_id=request["StatementId"],
2574
            action=request["Action"],
2575
            principal=request["Principal"],
2576
            source_arn=request.get("SourceArn"),
2577
            source_account=request.get("SourceAccount"),
2578
            principal_org_id=request.get("PrincipalOrgID"),
2579
            event_source_token=request.get("EventSourceToken"),
2580
            auth_type=request.get("FunctionUrlAuthType"),
2581
        )
2582
        new_policy = existing_policy
1✔
2583
        if not existing_policy:
1✔
2584
            new_policy = FunctionResourcePolicy(
1✔
2585
                policy=ResourcePolicy(Version="2012-10-17", Id="default", Statement=[])
2586
            )
2587
        new_policy.policy.Statement.append(permission_statement)
1✔
2588
        if not existing_policy:
1✔
2589
            resolved_fn.permissions[resolved_qualifier] = new_policy
1✔
2590

2591
        # Update revision id of alias or version
2592
        # TODO: re-evaluate data model to prevent this dirty hack just for bumping the revision id
2593
        # TODO: does that need a `with function.lock` for atomic updates of the policy + revision_id?
2594
        if api_utils.qualifier_is_alias(resolved_qualifier):
1✔
2595
            resolved_alias = resolved_fn.aliases[resolved_qualifier]
1✔
2596
            resolved_fn.aliases[resolved_qualifier] = dataclasses.replace(resolved_alias)
1✔
2597
        # Assumes that a non-alias is a version
2598
        else:
2599
            resolved_version = resolved_fn.versions[resolved_qualifier]
1✔
2600
            resolved_fn.versions[resolved_qualifier] = dataclasses.replace(
1✔
2601
                resolved_version, config=dataclasses.replace(resolved_version.config)
2602
            )
2603
        return AddPermissionResponse(Statement=json.dumps(permission_statement))
1✔
2604

2605
    def remove_permission(
1✔
2606
        self,
2607
        context: RequestContext,
2608
        function_name: FunctionName,
2609
        statement_id: NamespacedStatementId,
2610
        qualifier: Qualifier = None,
2611
        revision_id: String = None,
2612
        **kwargs,
2613
    ) -> None:
2614
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2615
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
2616
            function_name, qualifier, context
2617
        )
2618
        if qualifier is not None:
1✔
2619
            self._validate_qualifier_expression(qualifier)
1✔
2620

2621
        state = lambda_stores[account_id][region]
1✔
2622
        resolved_fn = state.functions.get(function_name)
1✔
2623
        if resolved_fn is None:
1✔
2624
            fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
1✔
2625
            raise ResourceNotFoundException(f"No policy found for: {fn_arn}", Type="User")
1✔
2626

2627
        resolved_qualifier, _ = self._resolve_fn_qualifier(resolved_fn, qualifier)
1✔
2628
        function_permission = resolved_fn.permissions.get(resolved_qualifier)
1✔
2629
        if not function_permission:
1✔
2630
            raise ResourceNotFoundException(
1✔
2631
                "No policy is associated with the given resource.", Type="User"
2632
            )
2633

2634
        # try to find statement in policy and delete it
2635
        statement = None
1✔
2636
        for s in function_permission.policy.Statement:
1✔
2637
            if s["Sid"] == statement_id:
1✔
2638
                statement = s
1✔
2639
                break
1✔
2640

2641
        if not statement:
1✔
2642
            raise ResourceNotFoundException(
1✔
2643
                f"Statement {statement_id} is not found in resource policy.", Type="User"
2644
            )
2645
        fn_revision_id = self._function_revision_id(resolved_fn, resolved_qualifier)
1✔
2646
        if revision_id and revision_id != fn_revision_id:
1✔
2647
            raise PreconditionFailedException(
1✔
2648
                "The Revision Id provided does not match the latest Revision Id. "
2649
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
2650
                Type="User",
2651
            )
2652
        function_permission.policy.Statement.remove(statement)
1✔
2653

2654
        # Update revision id for alias or version
2655
        # TODO: re-evaluate data model to prevent this dirty hack just for bumping the revision id
2656
        # TODO: does that need a `with function.lock` for atomic updates of the policy + revision_id?
2657
        if api_utils.qualifier_is_alias(resolved_qualifier):
1✔
UNCOV
2658
            resolved_alias = resolved_fn.aliases[resolved_qualifier]
×
UNCOV
2659
            resolved_fn.aliases[resolved_qualifier] = dataclasses.replace(resolved_alias)
×
2660
        # Assumes that a non-alias is a version
2661
        else:
2662
            resolved_version = resolved_fn.versions[resolved_qualifier]
1✔
2663
            resolved_fn.versions[resolved_qualifier] = dataclasses.replace(
1✔
2664
                resolved_version, config=dataclasses.replace(resolved_version.config)
2665
            )
2666

2667
        # remove the policy as a whole when there's no statement left in it
2668
        if len(function_permission.policy.Statement) == 0:
1✔
2669
            del resolved_fn.permissions[resolved_qualifier]
1✔
2670

2671
    def get_policy(
1✔
2672
        self,
2673
        context: RequestContext,
2674
        function_name: NamespacedFunctionName,
2675
        qualifier: Qualifier = None,
2676
        **kwargs,
2677
    ) -> GetPolicyResponse:
2678
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2679
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
2680
            function_name, qualifier, context
2681
        )
2682

2683
        if qualifier is not None:
1✔
2684
            self._validate_qualifier_expression(qualifier)
1✔
2685

2686
        resolved_fn = self._get_function(function_name, account_id, region)
1✔
2687

2688
        resolved_qualifier = qualifier or "$LATEST"
1✔
2689
        function_permission = resolved_fn.permissions.get(resolved_qualifier)
1✔
2690
        if not function_permission:
1✔
2691
            raise ResourceNotFoundException(
1✔
2692
                "The resource you requested does not exist.", Type="User"
2693
            )
2694

2695
        fn_revision_id = None
1✔
2696
        if api_utils.qualifier_is_alias(resolved_qualifier):
1✔
2697
            resolved_alias = resolved_fn.aliases[resolved_qualifier]
1✔
2698
            fn_revision_id = resolved_alias.revision_id
1✔
2699
        # Assumes that a non-alias is a version
2700
        else:
2701
            resolved_version = resolved_fn.versions[resolved_qualifier]
1✔
2702
            fn_revision_id = resolved_version.config.revision_id
1✔
2703

2704
        return GetPolicyResponse(
1✔
2705
            Policy=json.dumps(dataclasses.asdict(function_permission.policy)),
2706
            RevisionId=fn_revision_id,
2707
        )
2708

2709
    # =======================================
2710
    # ========  Code signing config  ========
2711
    # =======================================
2712

2713
    def create_code_signing_config(
1✔
2714
        self,
2715
        context: RequestContext,
2716
        allowed_publishers: AllowedPublishers,
2717
        description: Description = None,
2718
        code_signing_policies: CodeSigningPolicies = None,
2719
        tags: Tags = None,
2720
        **kwargs,
2721
    ) -> CreateCodeSigningConfigResponse:
2722
        account = context.account_id
1✔
2723
        region = context.region
1✔
2724

2725
        state = lambda_stores[account][region]
1✔
2726
        # TODO: can there be duplicates?
2727
        csc_id = f"csc-{get_random_hex(17)}"  # e.g. 'csc-077c33b4c19e26036'
1✔
2728
        csc_arn = f"arn:{context.partition}:lambda:{region}:{account}:code-signing-config:{csc_id}"
1✔
2729
        csc = CodeSigningConfig(
1✔
2730
            csc_id=csc_id,
2731
            arn=csc_arn,
2732
            allowed_publishers=allowed_publishers,
2733
            policies=code_signing_policies,
2734
            last_modified=api_utils.generate_lambda_date(),
2735
            description=description,
2736
        )
2737
        state.code_signing_configs[csc_arn] = csc
1✔
2738
        return CreateCodeSigningConfigResponse(CodeSigningConfig=api_utils.map_csc(csc))
1✔
2739

2740
    def put_function_code_signing_config(
1✔
2741
        self,
2742
        context: RequestContext,
2743
        code_signing_config_arn: CodeSigningConfigArn,
2744
        function_name: FunctionName,
2745
        **kwargs,
2746
    ) -> PutFunctionCodeSigningConfigResponse:
2747
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2748
        state = lambda_stores[account_id][region]
1✔
2749
        function_name = api_utils.get_function_name(function_name, context)
1✔
2750

2751
        csc = state.code_signing_configs.get(code_signing_config_arn)
1✔
2752
        if not csc:
1✔
2753
            raise CodeSigningConfigNotFoundException(
1✔
2754
                f"The code signing configuration cannot be found. Check that the provided configuration is not deleted: {code_signing_config_arn}.",
2755
                Type="User",
2756
            )
2757

2758
        fn = state.functions.get(function_name)
1✔
2759
        fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
1✔
2760
        if not fn:
1✔
2761
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")
1✔
2762

2763
        fn.code_signing_config_arn = code_signing_config_arn
1✔
2764
        return PutFunctionCodeSigningConfigResponse(
1✔
2765
            CodeSigningConfigArn=code_signing_config_arn, FunctionName=function_name
2766
        )
2767

2768
    def update_code_signing_config(
1✔
2769
        self,
2770
        context: RequestContext,
2771
        code_signing_config_arn: CodeSigningConfigArn,
2772
        description: Description = None,
2773
        allowed_publishers: AllowedPublishers = None,
2774
        code_signing_policies: CodeSigningPolicies = None,
2775
        **kwargs,
2776
    ) -> UpdateCodeSigningConfigResponse:
2777
        state = lambda_stores[context.account_id][context.region]
1✔
2778
        csc = state.code_signing_configs.get(code_signing_config_arn)
1✔
2779
        if not csc:
1✔
2780
            raise ResourceNotFoundException(
1✔
2781
                f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
2782
            )
2783

2784
        changes = {
1✔
2785
            **(
2786
                {"allowed_publishers": allowed_publishers} if allowed_publishers is not None else {}
2787
            ),
2788
            **({"policies": code_signing_policies} if code_signing_policies is not None else {}),
2789
            **({"description": description} if description is not None else {}),
2790
        }
2791
        new_csc = dataclasses.replace(
1✔
2792
            csc, last_modified=api_utils.generate_lambda_date(), **changes
2793
        )
2794
        state.code_signing_configs[code_signing_config_arn] = new_csc
1✔
2795

2796
        return UpdateCodeSigningConfigResponse(CodeSigningConfig=api_utils.map_csc(new_csc))
1✔
2797

2798
    def get_code_signing_config(
1✔
2799
        self, context: RequestContext, code_signing_config_arn: CodeSigningConfigArn, **kwargs
2800
    ) -> GetCodeSigningConfigResponse:
2801
        state = lambda_stores[context.account_id][context.region]
1✔
2802
        csc = state.code_signing_configs.get(code_signing_config_arn)
1✔
2803
        if not csc:
1✔
2804
            raise ResourceNotFoundException(
1✔
2805
                f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
2806
            )
2807

2808
        return GetCodeSigningConfigResponse(CodeSigningConfig=api_utils.map_csc(csc))
1✔
2809

2810
    def get_function_code_signing_config(
1✔
2811
        self, context: RequestContext, function_name: FunctionName, **kwargs
2812
    ) -> GetFunctionCodeSigningConfigResponse:
2813
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2814
        state = lambda_stores[account_id][region]
1✔
2815
        function_name = api_utils.get_function_name(function_name, context)
1✔
2816
        fn = state.functions.get(function_name)
1✔
2817
        fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
1✔
2818
        if not fn:
1✔
2819
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")
1✔
2820

2821
        if fn.code_signing_config_arn:
1✔
2822
            return GetFunctionCodeSigningConfigResponse(
1✔
2823
                CodeSigningConfigArn=fn.code_signing_config_arn, FunctionName=function_name
2824
            )
2825

2826
        return GetFunctionCodeSigningConfigResponse()
1✔
2827

2828
    def delete_function_code_signing_config(
1✔
2829
        self, context: RequestContext, function_name: FunctionName, **kwargs
2830
    ) -> None:
2831
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2832
        state = lambda_stores[account_id][region]
1✔
2833
        function_name = api_utils.get_function_name(function_name, context)
1✔
2834
        fn = state.functions.get(function_name)
1✔
2835
        fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
1✔
2836
        if not fn:
1✔
2837
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")
1✔
2838

2839
        fn.code_signing_config_arn = None
1✔
2840

2841
    def delete_code_signing_config(
1✔
2842
        self, context: RequestContext, code_signing_config_arn: CodeSigningConfigArn, **kwargs
2843
    ) -> DeleteCodeSigningConfigResponse:
2844
        state = lambda_stores[context.account_id][context.region]
1✔
2845

2846
        csc = state.code_signing_configs.get(code_signing_config_arn)
1✔
2847
        if not csc:
1✔
2848
            raise ResourceNotFoundException(
1✔
2849
                f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
2850
            )
2851

2852
        del state.code_signing_configs[code_signing_config_arn]
1✔
2853

2854
        return DeleteCodeSigningConfigResponse()
1✔
2855

2856
    def list_code_signing_configs(
1✔
2857
        self,
2858
        context: RequestContext,
2859
        marker: String = None,
2860
        max_items: MaxListItems = None,
2861
        **kwargs,
2862
    ) -> ListCodeSigningConfigsResponse:
2863
        state = lambda_stores[context.account_id][context.region]
1✔
2864

2865
        cscs = [api_utils.map_csc(csc) for csc in state.code_signing_configs.values()]
1✔
2866
        cscs = PaginatedList(cscs)
1✔
2867
        page, token = cscs.get_page(
1✔
2868
            lambda csc: csc["CodeSigningConfigId"],
2869
            marker,
2870
            max_items,
2871
        )
2872
        return ListCodeSigningConfigsResponse(CodeSigningConfigs=page, NextMarker=token)
1✔
2873

2874
    def list_functions_by_code_signing_config(
1✔
2875
        self,
2876
        context: RequestContext,
2877
        code_signing_config_arn: CodeSigningConfigArn,
2878
        marker: String = None,
2879
        max_items: MaxListItems = None,
2880
        **kwargs,
2881
    ) -> ListFunctionsByCodeSigningConfigResponse:
2882
        account = context.account_id
1✔
2883
        region = context.region
1✔
2884

2885
        state = lambda_stores[account][region]
1✔
2886

2887
        if code_signing_config_arn not in state.code_signing_configs:
1✔
2888
            raise ResourceNotFoundException(
1✔
2889
                f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
2890
            )
2891

2892
        fn_arns = [
1✔
2893
            api_utils.unqualified_lambda_arn(fn.function_name, account, region)
2894
            for fn in state.functions.values()
2895
            if fn.code_signing_config_arn == code_signing_config_arn
2896
        ]
2897

2898
        cscs = PaginatedList(fn_arns)
1✔
2899
        page, token = cscs.get_page(
1✔
2900
            lambda x: x,
2901
            marker,
2902
            max_items,
2903
        )
2904
        return ListFunctionsByCodeSigningConfigResponse(FunctionArns=page, NextMarker=token)
1✔
2905

2906
    # =======================================
2907
    # =========  Account Settings   =========
2908
    # =======================================
2909

2910
    # CAVE: these settings & usages are *per* region!
2911
    # Lambda quotas: https://docs.aws.amazon.com/lambda/latest/dg/gettingstarted-limits.html
2912
    def get_account_settings(self, context: RequestContext, **kwargs) -> GetAccountSettingsResponse:
1✔
2913
        state = lambda_stores[context.account_id][context.region]
1✔
2914

2915
        fn_count = 0
1✔
2916
        code_size_sum = 0
1✔
2917
        reserved_concurrency_sum = 0
1✔
2918
        for fn in state.functions.values():
1✔
2919
            fn_count += 1
1✔
2920
            for fn_version in fn.versions.values():
1✔
2921
                # Image-based Lambdas do not have a code attribute and count against the ECR quotas instead
2922
                if fn_version.config.package_type == PackageType.Zip:
1✔
2923
                    code_size_sum += fn_version.config.code.code_size
1✔
2924
            if fn.reserved_concurrent_executions is not None:
1✔
2925
                reserved_concurrency_sum += fn.reserved_concurrent_executions
1✔
2926
            for c in fn.provisioned_concurrency_configs.values():
1✔
2927
                reserved_concurrency_sum += c.provisioned_concurrent_executions
1✔
2928
        for layer in state.layers.values():
1✔
2929
            for layer_version in layer.layer_versions.values():
1✔
2930
                code_size_sum += layer_version.code.code_size
1✔
2931
        return GetAccountSettingsResponse(
1✔
2932
            AccountLimit=AccountLimit(
2933
                TotalCodeSize=config.LAMBDA_LIMITS_TOTAL_CODE_SIZE,
2934
                CodeSizeZipped=config.LAMBDA_LIMITS_CODE_SIZE_ZIPPED,
2935
                CodeSizeUnzipped=config.LAMBDA_LIMITS_CODE_SIZE_UNZIPPED,
2936
                ConcurrentExecutions=config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS,
2937
                UnreservedConcurrentExecutions=config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS
2938
                - reserved_concurrency_sum,
2939
            ),
2940
            AccountUsage=AccountUsage(
2941
                TotalCodeSize=code_size_sum,
2942
                FunctionCount=fn_count,
2943
            ),
2944
        )
2945

2946
    # =======================================
2947
    # ==  Provisioned Concurrency Config   ==
2948
    # =======================================
2949

2950
    def _get_provisioned_config(
1✔
2951
        self, context: RequestContext, function_name: str, qualifier: str
2952
    ) -> ProvisionedConcurrencyConfiguration | None:
2953
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2954
        state = lambda_stores[account_id][region]
1✔
2955
        function_name = api_utils.get_function_name(function_name, context)
1✔
2956
        fn = state.functions.get(function_name)
1✔
2957
        if api_utils.qualifier_is_alias(qualifier):
1✔
2958
            fn_alias = None
1✔
2959
            if fn:
1✔
2960
                fn_alias = fn.aliases.get(qualifier)
1✔
2961
            if fn_alias is None:
1✔
2962
                raise ResourceNotFoundException(
1✔
2963
                    f"Cannot find alias arn: {api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)}",
2964
                    Type="User",
2965
                )
2966
        elif api_utils.qualifier_is_version(qualifier):
1✔
2967
            fn_version = None
1✔
2968
            if fn:
1✔
2969
                fn_version = fn.versions.get(qualifier)
1✔
2970
            if fn_version is None:
1✔
2971
                raise ResourceNotFoundException(
1✔
2972
                    f"Function not found: {api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)}",
2973
                    Type="User",
2974
                )
2975

2976
        return fn.provisioned_concurrency_configs.get(qualifier)
1✔
2977

2978
    def put_provisioned_concurrency_config(
1✔
2979
        self,
2980
        context: RequestContext,
2981
        function_name: FunctionName,
2982
        qualifier: Qualifier,
2983
        provisioned_concurrent_executions: PositiveInteger,
2984
        **kwargs,
2985
    ) -> PutProvisionedConcurrencyConfigResponse:
2986
        if provisioned_concurrent_executions <= 0:
1✔
2987
            raise ValidationException(
1✔
2988
                f"1 validation error detected: Value '{provisioned_concurrent_executions}' at 'provisionedConcurrentExecutions' failed to satisfy constraint: Member must have value greater than or equal to 1"
2989
            )
2990

2991
        if qualifier == "$LATEST":
1✔
2992
            raise InvalidParameterValueException(
1✔
2993
                "Provisioned Concurrency Configs cannot be applied to unpublished function versions.",
2994
                Type="User",
2995
            )
2996
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2997
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
2998
            function_name, qualifier, context
2999
        )
3000
        state = lambda_stores[account_id][region]
1✔
3001
        fn = state.functions.get(function_name)
1✔
3002

3003
        provisioned_config = self._get_provisioned_config(context, function_name, qualifier)
1✔
3004

3005
        if provisioned_config:  # TODO: merge?
1✔
3006
            # TODO: add a test for partial updates (if possible)
3007
            LOG.warning(
1✔
3008
                "Partial update of provisioned concurrency config is currently not supported."
3009
            )
3010

3011
        other_provisioned_sum = sum(
1✔
3012
            [
3013
                provisioned_configs.provisioned_concurrent_executions
3014
                for provisioned_qualifier, provisioned_configs in fn.provisioned_concurrency_configs.items()
3015
                if provisioned_qualifier != qualifier
3016
            ]
3017
        )
3018

3019
        if (
1✔
3020
            fn.reserved_concurrent_executions is not None
3021
            and fn.reserved_concurrent_executions
3022
            < other_provisioned_sum + provisioned_concurrent_executions
3023
        ):
3024
            raise InvalidParameterValueException(
1✔
3025
                "Requested Provisioned Concurrency should not be greater than the reservedConcurrentExecution for function",
3026
                Type="User",
3027
            )
3028

3029
        if provisioned_concurrent_executions > config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS:
1✔
3030
            raise InvalidParameterValueException(
1✔
3031
                f"Specified ConcurrentExecutions for function is greater than account's unreserved concurrency"
3032
                f" [{config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS}]."
3033
            )
3034

3035
        settings = self.get_account_settings(context)
1✔
3036
        unreserved_concurrent_executions = settings["AccountLimit"][
1✔
3037
            "UnreservedConcurrentExecutions"
3038
        ]
3039
        if (
1✔
3040
            unreserved_concurrent_executions - provisioned_concurrent_executions
3041
            < config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY
3042
        ):
3043
            raise InvalidParameterValueException(
1✔
3044
                f"Specified ConcurrentExecutions for function decreases account's UnreservedConcurrentExecution below"
3045
                f" its minimum value of [{config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY}]."
3046
            )
3047

3048
        provisioned_config = ProvisionedConcurrencyConfiguration(
1✔
3049
            provisioned_concurrent_executions, api_utils.generate_lambda_date()
3050
        )
3051
        fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
1✔
3052

3053
        if api_utils.qualifier_is_alias(qualifier):
1✔
3054
            alias = fn.aliases.get(qualifier)
1✔
3055
            resolved_version = fn.versions.get(alias.function_version)
1✔
3056

3057
            if (
1✔
3058
                resolved_version
3059
                and fn.provisioned_concurrency_configs.get(alias.function_version) is not None
3060
            ):
3061
                raise ResourceConflictException(
1✔
3062
                    "Alias can't be used for Provisioned Concurrency configuration on an already Provisioned version",
3063
                    Type="User",
3064
                )
3065
            fn_arn = resolved_version.id.qualified_arn()
1✔
3066
        elif api_utils.qualifier_is_version(qualifier):
1✔
3067
            fn_version = fn.versions.get(qualifier)
1✔
3068

3069
            # TODO: might be useful other places, utilize
3070
            pointing_aliases = []
1✔
3071
            for alias in fn.aliases.values():
1✔
3072
                if (
1✔
3073
                    alias.function_version == qualifier
3074
                    and fn.provisioned_concurrency_configs.get(alias.name) is not None
3075
                ):
3076
                    pointing_aliases.append(alias.name)
1✔
3077
            if pointing_aliases:
1✔
3078
                raise ResourceConflictException(
1✔
3079
                    "Version is pointed by a Provisioned Concurrency alias", Type="User"
3080
                )
3081

3082
            fn_arn = fn_version.id.qualified_arn()
1✔
3083

3084
        manager = self.lambda_service.get_lambda_version_manager(fn_arn)
1✔
3085

3086
        fn.provisioned_concurrency_configs[qualifier] = provisioned_config
1✔
3087

3088
        manager.update_provisioned_concurrency_config(
1✔
3089
            provisioned_config.provisioned_concurrent_executions
3090
        )
3091

3092
        return PutProvisionedConcurrencyConfigResponse(
1✔
3093
            RequestedProvisionedConcurrentExecutions=provisioned_config.provisioned_concurrent_executions,
3094
            AvailableProvisionedConcurrentExecutions=0,
3095
            AllocatedProvisionedConcurrentExecutions=0,
3096
            Status=ProvisionedConcurrencyStatusEnum.IN_PROGRESS,
3097
            # StatusReason=manager.provisioned_state.status_reason,
3098
            LastModified=provisioned_config.last_modified,  # TODO: does change with configuration or also with state changes?
3099
        )
3100

3101
    def get_provisioned_concurrency_config(
1✔
3102
        self, context: RequestContext, function_name: FunctionName, qualifier: Qualifier, **kwargs
3103
    ) -> GetProvisionedConcurrencyConfigResponse:
3104
        if qualifier == "$LATEST":
1✔
3105
            raise InvalidParameterValueException(
1✔
3106
                "The function resource provided must be an alias or a published version.",
3107
                Type="User",
3108
            )
3109
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3110
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
3111
            function_name, qualifier, context
3112
        )
3113

3114
        provisioned_config = self._get_provisioned_config(context, function_name, qualifier)
1✔
3115
        if not provisioned_config:
1✔
3116
            raise ProvisionedConcurrencyConfigNotFoundException(
1✔
3117
                "No Provisioned Concurrency Config found for this function", Type="User"
3118
            )
3119

3120
        # TODO: make this compatible with alias pointer migration on update
3121
        if api_utils.qualifier_is_alias(qualifier):
1✔
3122
            state = lambda_stores[account_id][region]
1✔
3123
            fn = state.functions.get(function_name)
1✔
3124
            alias = fn.aliases.get(qualifier)
1✔
3125
            fn_arn = api_utils.qualified_lambda_arn(
1✔
3126
                function_name, alias.function_version, account_id, region
3127
            )
3128
        else:
3129
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
1✔
3130

3131
        ver_manager = self.lambda_service.get_lambda_version_manager(fn_arn)
1✔
3132

3133
        return GetProvisionedConcurrencyConfigResponse(
1✔
3134
            RequestedProvisionedConcurrentExecutions=provisioned_config.provisioned_concurrent_executions,
3135
            LastModified=provisioned_config.last_modified,
3136
            AvailableProvisionedConcurrentExecutions=ver_manager.provisioned_state.available,
3137
            AllocatedProvisionedConcurrentExecutions=ver_manager.provisioned_state.allocated,
3138
            Status=ver_manager.provisioned_state.status,
3139
            StatusReason=ver_manager.provisioned_state.status_reason,
3140
        )
3141

3142
    def list_provisioned_concurrency_configs(
1✔
3143
        self,
3144
        context: RequestContext,
3145
        function_name: FunctionName,
3146
        marker: String = None,
3147
        max_items: MaxProvisionedConcurrencyConfigListItems = None,
3148
        **kwargs,
3149
    ) -> ListProvisionedConcurrencyConfigsResponse:
3150
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3151
        state = lambda_stores[account_id][region]
1✔
3152

3153
        function_name = api_utils.get_function_name(function_name, context)
1✔
3154
        fn = state.functions.get(function_name)
1✔
3155
        if fn is None:
1✔
3156
            raise ResourceNotFoundException(
1✔
3157
                f"Function not found: {api_utils.unqualified_lambda_arn(function_name, account_id, region)}",
3158
                Type="User",
3159
            )
3160

3161
        configs = []
1✔
3162
        for qualifier, pc_config in fn.provisioned_concurrency_configs.items():
1✔
UNCOV
3163
            if api_utils.qualifier_is_alias(qualifier):
×
UNCOV
3164
                alias = fn.aliases.get(qualifier)
×
UNCOV
3165
                fn_arn = api_utils.qualified_lambda_arn(
×
3166
                    function_name, alias.function_version, account_id, region
3167
                )
3168
            else:
UNCOV
3169
                fn_arn = api_utils.qualified_lambda_arn(
×
3170
                    function_name, qualifier, account_id, region
3171
                )
3172

UNCOV
3173
            manager = self.lambda_service.get_lambda_version_manager(fn_arn)
×
3174

UNCOV
3175
            configs.append(
×
3176
                ProvisionedConcurrencyConfigListItem(
3177
                    FunctionArn=api_utils.qualified_lambda_arn(
3178
                        function_name, qualifier, account_id, region
3179
                    ),
3180
                    RequestedProvisionedConcurrentExecutions=pc_config.provisioned_concurrent_executions,
3181
                    AvailableProvisionedConcurrentExecutions=manager.provisioned_state.available,
3182
                    AllocatedProvisionedConcurrentExecutions=manager.provisioned_state.allocated,
3183
                    Status=manager.provisioned_state.status,
3184
                    StatusReason=manager.provisioned_state.status_reason,
3185
                    LastModified=pc_config.last_modified,
3186
                )
3187
            )
3188

3189
        provisioned_concurrency_configs = configs
1✔
3190
        provisioned_concurrency_configs = PaginatedList(provisioned_concurrency_configs)
1✔
3191
        page, token = provisioned_concurrency_configs.get_page(
1✔
3192
            lambda x: x,
3193
            marker,
3194
            max_items,
3195
        )
3196
        return ListProvisionedConcurrencyConfigsResponse(
1✔
3197
            ProvisionedConcurrencyConfigs=page, NextMarker=token
3198
        )
3199

3200
    def delete_provisioned_concurrency_config(
1✔
3201
        self, context: RequestContext, function_name: FunctionName, qualifier: Qualifier, **kwargs
3202
    ) -> None:
3203
        if qualifier == "$LATEST":
1✔
3204
            raise InvalidParameterValueException(
1✔
3205
                "The function resource provided must be an alias or a published version.",
3206
                Type="User",
3207
            )
3208
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3209
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
3210
            function_name, qualifier, context
3211
        )
3212
        state = lambda_stores[account_id][region]
1✔
3213
        fn = state.functions.get(function_name)
1✔
3214

3215
        provisioned_config = self._get_provisioned_config(context, function_name, qualifier)
1✔
3216
        # delete is idempotent and doesn't actually care about the provisioned concurrency config not existing
3217
        if provisioned_config:
1✔
3218
            fn.provisioned_concurrency_configs.pop(qualifier)
1✔
3219
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
1✔
3220
            manager = self.lambda_service.get_lambda_version_manager(fn_arn)
1✔
3221
            manager.update_provisioned_concurrency_config(0)
1✔
3222

3223
    # =======================================
3224
    # =======  Event Invoke Config   ========
3225
    # =======================================
3226

3227
    # "1 validation error detected: Value 'arn:aws:_-/!lambda:<region>:111111111111:function:<function-name:1>' at 'destinationConfig.onFailure.destination' failed to satisfy constraint: Member must satisfy regular expression pattern: ^$|arn:(aws[a-zA-Z0-9-]*):([a-zA-Z0-9\\-])+:([a-z]{2}((-gov)|(-iso(b?)))?-[a-z]+-\\d{1})?:(\\d{12})?:(.*)"
3228
    # "1 validation error detected: Value 'arn:aws:_-/!lambda:<region>:111111111111:function:<function-name:1>' at 'destinationConfig.onFailure.destination' failed to satisfy constraint: Member must satisfy regular expression pattern: ^$|arn:(aws[a-zA-Z0-9-]*):([a-zA-Z0-9\\-])+:([a-z]2((-gov)|(-iso(b?)))?-[a-z]+-\\d1)?:(\\d12)?:(.*)" ... (expected → actual)
3229

3230
    def _validate_destination_config(
1✔
3231
        self, store: LambdaStore, function_name: str, destination_config: DestinationConfig
3232
    ):
3233
        def _validate_destination_arn(destination_arn) -> bool:
1✔
3234
            if not api_utils.DESTINATION_ARN_PATTERN.match(destination_arn):
1✔
3235
                # technically we shouldn't handle this in the provider
3236
                raise ValidationException(
1✔
3237
                    "1 validation error detected: Value '"
3238
                    + destination_arn
3239
                    + r"' at 'destinationConfig.onFailure.destination' failed to satisfy constraint: Member must satisfy regular expression pattern: ^$|arn:(aws[a-zA-Z0-9-]*):([a-zA-Z0-9\-])+:([a-z]{2}((-gov)|(-iso(b?)))?-[a-z]+-\d{1})?:(\d{12})?:(.*)"
3240
                )
3241

3242
            match destination_arn.split(":")[2]:
1✔
3243
                case "lambda":
1✔
3244
                    fn_parts = api_utils.FULL_FN_ARN_PATTERN.search(destination_arn).groupdict()
1✔
3245
                    if fn_parts:
1✔
3246
                        # check if it exists
3247
                        fn = store.functions.get(fn_parts["function_name"])
1✔
3248
                        if not fn:
1✔
3249
                            raise InvalidParameterValueException(
1✔
3250
                                f"The destination ARN {destination_arn} is invalid.", Type="User"
3251
                            )
3252
                        if fn_parts["function_name"] == function_name:
1✔
3253
                            raise InvalidParameterValueException(
1✔
3254
                                "You can't specify the function as a destination for itself.",
3255
                                Type="User",
3256
                            )
3257
                case "sns" | "sqs" | "events":
1✔
3258
                    pass
1✔
3259
                case _:
1✔
3260
                    return False
1✔
3261
            return True
1✔
3262

3263
        validation_err = False
1✔
3264

3265
        failure_destination = destination_config.get("OnFailure", {}).get("Destination")
1✔
3266
        if failure_destination:
1✔
3267
            validation_err = validation_err or not _validate_destination_arn(failure_destination)
1✔
3268

3269
        success_destination = destination_config.get("OnSuccess", {}).get("Destination")
1✔
3270
        if success_destination:
1✔
3271
            validation_err = validation_err or not _validate_destination_arn(success_destination)
1✔
3272

3273
        if validation_err:
1✔
3274
            on_success_part = (
1✔
3275
                f"OnSuccess(destination={success_destination})" if success_destination else "null"
3276
            )
3277
            on_failure_part = (
1✔
3278
                f"OnFailure(destination={failure_destination})" if failure_destination else "null"
3279
            )
3280
            raise InvalidParameterValueException(
1✔
3281
                f"The provided destination config DestinationConfig(onSuccess={on_success_part}, onFailure={on_failure_part}) is invalid.",
3282
                Type="User",
3283
            )
3284

3285
    def put_function_event_invoke_config(
1✔
3286
        self,
3287
        context: RequestContext,
3288
        function_name: FunctionName,
3289
        qualifier: Qualifier = None,
3290
        maximum_retry_attempts: MaximumRetryAttempts = None,
3291
        maximum_event_age_in_seconds: MaximumEventAgeInSeconds = None,
3292
        destination_config: DestinationConfig = None,
3293
        **kwargs,
3294
    ) -> FunctionEventInvokeConfig:
3295
        """
3296
        Destination ARNs can be:
3297
        * SQS arn
3298
        * SNS arn
3299
        * Lambda arn
3300
        * EventBridge arn
3301

3302
        Differences between put_ and update_:
3303
            * put overwrites any existing config
3304
            * update allows changes only single values while keeping the rest of existing ones
3305
            * update fails on non-existing configs
3306

3307
        Differences between destination and DLQ
3308
            * "However, a dead-letter queue is part of a function's version-specific configuration, so it is locked in when you publish a version."
3309
            *  "On-failure destinations also support additional targets and include details about the function's response in the invocation record."
3310

3311
        """
3312
        if (
1✔
3313
            maximum_event_age_in_seconds is None
3314
            and maximum_retry_attempts is None
3315
            and destination_config is None
3316
        ):
3317
            raise InvalidParameterValueException(
1✔
3318
                "You must specify at least one of error handling or destination setting.",
3319
                Type="User",
3320
            )
3321
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3322
        state = lambda_stores[account_id][region]
1✔
3323
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
3324
            function_name, qualifier, context
3325
        )
3326
        fn = state.functions.get(function_name)
1✔
3327
        if not fn or (qualifier and not (qualifier in fn.aliases or qualifier in fn.versions)):
1✔
3328
            raise ResourceNotFoundException("The function doesn't exist.", Type="User")
1✔
3329

3330
        qualifier = qualifier or "$LATEST"
1✔
3331

3332
        # validate and normalize destination config
3333
        if destination_config:
1✔
3334
            self._validate_destination_config(state, function_name, destination_config)
1✔
3335

3336
        destination_config = DestinationConfig(
1✔
3337
            OnSuccess=OnSuccess(
3338
                Destination=(destination_config or {}).get("OnSuccess", {}).get("Destination")
3339
            ),
3340
            OnFailure=OnFailure(
3341
                Destination=(destination_config or {}).get("OnFailure", {}).get("Destination")
3342
            ),
3343
        )
3344

3345
        config = EventInvokeConfig(
1✔
3346
            function_name=function_name,
3347
            qualifier=qualifier,
3348
            maximum_event_age_in_seconds=maximum_event_age_in_seconds,
3349
            maximum_retry_attempts=maximum_retry_attempts,
3350
            last_modified=api_utils.generate_lambda_date(),
3351
            destination_config=destination_config,
3352
        )
3353
        fn.event_invoke_configs[qualifier] = config
1✔
3354

3355
        return FunctionEventInvokeConfig(
1✔
3356
            LastModified=datetime.datetime.strptime(
3357
                config.last_modified, api_utils.LAMBDA_DATE_FORMAT
3358
            ),
3359
            FunctionArn=api_utils.qualified_lambda_arn(
3360
                function_name, qualifier or "$LATEST", account_id, region
3361
            ),
3362
            DestinationConfig=destination_config,
3363
            MaximumEventAgeInSeconds=maximum_event_age_in_seconds,
3364
            MaximumRetryAttempts=maximum_retry_attempts,
3365
        )
3366

3367
    def get_function_event_invoke_config(
1✔
3368
        self,
3369
        context: RequestContext,
3370
        function_name: FunctionName,
3371
        qualifier: Qualifier = None,
3372
        **kwargs,
3373
    ) -> FunctionEventInvokeConfig:
3374
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3375
        state = lambda_stores[account_id][region]
1✔
3376
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
3377
            function_name, qualifier, context
3378
        )
3379

3380
        qualifier = qualifier or "$LATEST"
1✔
3381
        fn = state.functions.get(function_name)
1✔
3382
        if not fn:
1✔
3383
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
1✔
3384
            raise ResourceNotFoundException(
1✔
3385
                f"The function {fn_arn} doesn't have an EventInvokeConfig", Type="User"
3386
            )
3387

3388
        config = fn.event_invoke_configs.get(qualifier)
1✔
3389
        if not config:
1✔
3390
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
1✔
3391
            raise ResourceNotFoundException(
1✔
3392
                f"The function {fn_arn} doesn't have an EventInvokeConfig", Type="User"
3393
            )
3394

3395
        return FunctionEventInvokeConfig(
1✔
3396
            LastModified=datetime.datetime.strptime(
3397
                config.last_modified, api_utils.LAMBDA_DATE_FORMAT
3398
            ),
3399
            FunctionArn=api_utils.qualified_lambda_arn(
3400
                function_name, qualifier, account_id, region
3401
            ),
3402
            DestinationConfig=config.destination_config,
3403
            MaximumEventAgeInSeconds=config.maximum_event_age_in_seconds,
3404
            MaximumRetryAttempts=config.maximum_retry_attempts,
3405
        )
3406

3407
    def list_function_event_invoke_configs(
1✔
3408
        self,
3409
        context: RequestContext,
3410
        function_name: FunctionName,
3411
        marker: String = None,
3412
        max_items: MaxFunctionEventInvokeConfigListItems = None,
3413
        **kwargs,
3414
    ) -> ListFunctionEventInvokeConfigsResponse:
3415
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3416
        state = lambda_stores[account_id][region]
1✔
3417
        fn = state.functions.get(function_name)
1✔
3418
        if not fn:
1✔
3419
            raise ResourceNotFoundException("The function doesn't exist.", Type="User")
1✔
3420

3421
        event_invoke_configs = [
1✔
3422
            FunctionEventInvokeConfig(
3423
                LastModified=c.last_modified,
3424
                FunctionArn=api_utils.qualified_lambda_arn(
3425
                    function_name, c.qualifier, account_id, region
3426
                ),
3427
                MaximumEventAgeInSeconds=c.maximum_event_age_in_seconds,
3428
                MaximumRetryAttempts=c.maximum_retry_attempts,
3429
                DestinationConfig=c.destination_config,
3430
            )
3431
            for c in fn.event_invoke_configs.values()
3432
        ]
3433

3434
        event_invoke_configs = PaginatedList(event_invoke_configs)
1✔
3435
        page, token = event_invoke_configs.get_page(
1✔
3436
            lambda x: x["FunctionArn"],
3437
            marker,
3438
            max_items,
3439
        )
3440
        return ListFunctionEventInvokeConfigsResponse(
1✔
3441
            FunctionEventInvokeConfigs=page, NextMarker=token
3442
        )
3443

3444
    def delete_function_event_invoke_config(
1✔
3445
        self,
3446
        context: RequestContext,
3447
        function_name: FunctionName,
3448
        qualifier: Qualifier = None,
3449
        **kwargs,
3450
    ) -> None:
3451
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3452
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
3453
            function_name, qualifier, context
3454
        )
3455
        state = lambda_stores[account_id][region]
1✔
3456
        fn = state.functions.get(function_name)
1✔
3457
        resolved_qualifier = qualifier or "$LATEST"
1✔
3458
        fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
1✔
3459
        if not fn:
1✔
3460
            raise ResourceNotFoundException(
1✔
3461
                f"The function {fn_arn} doesn't have an EventInvokeConfig", Type="User"
3462
            )
3463

3464
        config = fn.event_invoke_configs.get(resolved_qualifier)
1✔
3465
        if not config:
1✔
3466
            raise ResourceNotFoundException(
1✔
3467
                f"The function {fn_arn} doesn't have an EventInvokeConfig", Type="User"
3468
            )
3469

3470
        del fn.event_invoke_configs[resolved_qualifier]
1✔
3471

3472
    def update_function_event_invoke_config(
1✔
3473
        self,
3474
        context: RequestContext,
3475
        function_name: FunctionName,
3476
        qualifier: Qualifier = None,
3477
        maximum_retry_attempts: MaximumRetryAttempts = None,
3478
        maximum_event_age_in_seconds: MaximumEventAgeInSeconds = None,
3479
        destination_config: DestinationConfig = None,
3480
        **kwargs,
3481
    ) -> FunctionEventInvokeConfig:
3482
        # like put but only update single fields via replace
3483
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3484
        state = lambda_stores[account_id][region]
1✔
3485
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
3486
            function_name, qualifier, context
3487
        )
3488

3489
        if (
1✔
3490
            maximum_event_age_in_seconds is None
3491
            and maximum_retry_attempts is None
3492
            and destination_config is None
3493
        ):
UNCOV
3494
            raise InvalidParameterValueException(
×
3495
                "You must specify at least one of error handling or destination setting.",
3496
                Type="User",
3497
            )
3498

3499
        fn = state.functions.get(function_name)
1✔
3500
        if not fn or (qualifier and not (qualifier in fn.aliases or qualifier in fn.versions)):
1✔
3501
            raise ResourceNotFoundException("The function doesn't exist.", Type="User")
1✔
3502

3503
        qualifier = qualifier or "$LATEST"
1✔
3504

3505
        config = fn.event_invoke_configs.get(qualifier)
1✔
3506
        if not config:
1✔
3507
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
1✔
3508
            raise ResourceNotFoundException(
1✔
3509
                f"The function {fn_arn} doesn't have an EventInvokeConfig", Type="User"
3510
            )
3511

3512
        if destination_config:
1✔
UNCOV
3513
            self._validate_destination_config(state, function_name, destination_config)
×
3514

3515
        optional_kwargs = {
1✔
3516
            k: v
3517
            for k, v in {
3518
                "destination_config": destination_config,
3519
                "maximum_retry_attempts": maximum_retry_attempts,
3520
                "maximum_event_age_in_seconds": maximum_event_age_in_seconds,
3521
            }.items()
3522
            if v is not None
3523
        }
3524

3525
        new_config = dataclasses.replace(
1✔
3526
            config, last_modified=api_utils.generate_lambda_date(), **optional_kwargs
3527
        )
3528
        fn.event_invoke_configs[qualifier] = new_config
1✔
3529

3530
        return FunctionEventInvokeConfig(
1✔
3531
            LastModified=datetime.datetime.strptime(
3532
                new_config.last_modified, api_utils.LAMBDA_DATE_FORMAT
3533
            ),
3534
            FunctionArn=api_utils.qualified_lambda_arn(
3535
                function_name, qualifier or "$LATEST", account_id, region
3536
            ),
3537
            DestinationConfig=new_config.destination_config,
3538
            MaximumEventAgeInSeconds=new_config.maximum_event_age_in_seconds,
3539
            MaximumRetryAttempts=new_config.maximum_retry_attempts,
3540
        )
3541

3542
    # =======================================
3543
    # ======  Layer & Layer Versions  =======
3544
    # =======================================
3545

3546
    @staticmethod
1✔
3547
    def _resolve_layer(
1✔
3548
        layer_name_or_arn: str, context: RequestContext
3549
    ) -> Tuple[str, str, str, Optional[str]]:
3550
        """
3551
        Return locator attributes for a given Lambda layer.
3552

3553
        :param layer_name_or_arn: Layer name or ARN
3554
        :param context: Request context
3555
        :return: Tuple of region, account ID, layer name, layer version
3556
        """
3557
        if api_utils.is_layer_arn(layer_name_or_arn):
1✔
3558
            return api_utils.parse_layer_arn(layer_name_or_arn)
1✔
3559

3560
        return context.region, context.account_id, layer_name_or_arn, None
1✔
3561

3562
    def publish_layer_version(
1✔
3563
        self,
3564
        context: RequestContext,
3565
        layer_name: LayerName,
3566
        content: LayerVersionContentInput,
3567
        description: Description = None,
3568
        compatible_runtimes: CompatibleRuntimes = None,
3569
        license_info: LicenseInfo = None,
3570
        compatible_architectures: CompatibleArchitectures = None,
3571
        **kwargs,
3572
    ) -> PublishLayerVersionResponse:
3573
        """
3574
        On first use of a LayerName a new layer is created and for each subsequent call with the same LayerName a new version is created.
3575
        Note that there are no $LATEST versions with layers!
3576

3577
        """
3578
        account = context.account_id
1✔
3579
        region = context.region
1✔
3580

3581
        validation_errors = api_utils.validate_layer_runtimes_and_architectures(
1✔
3582
            compatible_runtimes, compatible_architectures
3583
        )
3584
        if validation_errors:
1✔
3585
            raise ValidationException(
1✔
3586
                f"{len(validation_errors)} validation error{'s' if len(validation_errors) > 1 else ''} detected: {'; '.join(validation_errors)}"
3587
            )
3588

3589
        state = lambda_stores[account][region]
1✔
3590
        with self.create_layer_lock:
1✔
3591
            if layer_name not in state.layers:
1✔
3592
                # we don't have a version so create new layer object
3593
                # lock is required to avoid creating two v1 objects for the same name
3594
                layer = Layer(
1✔
3595
                    arn=api_utils.layer_arn(layer_name=layer_name, account=account, region=region)
3596
                )
3597
                state.layers[layer_name] = layer
1✔
3598

3599
        layer = state.layers[layer_name]
1✔
3600
        with layer.next_version_lock:
1✔
3601
            next_version = LambdaLayerVersionIdentifier(
1✔
3602
                account_id=account, region=region, layer_name=layer_name
3603
            ).generate(next_version=layer.next_version)
3604
            # When creating a layer with user defined layer version, it is possible that we
3605
            # create layer versions out of order.
3606
            # ie. a user could replicate layer v2 then layer v1. It is important to always keep the maximum possible
3607
            # value for next layer to avoid overwriting existing versions
3608
            if layer.next_version <= next_version:
1✔
3609
                # We don't need to update layer.next_version if the created version is lower than the "next in line"
3610
                layer.next_version = max(next_version, layer.next_version) + 1
1✔
3611

3612
        # creating a new layer
3613
        if content.get("ZipFile"):
1✔
3614
            code = store_lambda_archive(
1✔
3615
                archive_file=content["ZipFile"],
3616
                function_name=layer_name,
3617
                region_name=region,
3618
                account_id=account,
3619
            )
3620
        else:
3621
            code = store_s3_bucket_archive(
1✔
3622
                archive_bucket=content["S3Bucket"],
3623
                archive_key=content["S3Key"],
3624
                archive_version=content.get("S3ObjectVersion"),
3625
                function_name=layer_name,
3626
                region_name=region,
3627
                account_id=account,
3628
            )
3629

3630
        new_layer_version = LayerVersion(
1✔
3631
            layer_version_arn=api_utils.layer_version_arn(
3632
                layer_name=layer_name,
3633
                account=account,
3634
                region=region,
3635
                version=str(next_version),
3636
            ),
3637
            layer_arn=layer.arn,
3638
            version=next_version,
3639
            description=description or "",
3640
            license_info=license_info,
3641
            compatible_runtimes=compatible_runtimes,
3642
            compatible_architectures=compatible_architectures,
3643
            created=api_utils.generate_lambda_date(),
3644
            code=code,
3645
        )
3646

3647
        layer.layer_versions[str(next_version)] = new_layer_version
1✔
3648

3649
        return api_utils.map_layer_out(new_layer_version)
1✔
3650

3651
    def get_layer_version(
1✔
3652
        self,
3653
        context: RequestContext,
3654
        layer_name: LayerName,
3655
        version_number: LayerVersionNumber,
3656
        **kwargs,
3657
    ) -> GetLayerVersionResponse:
3658
        # TODO: handle layer_name as an ARN
3659

3660
        region_name, account_id, layer_name, _ = LambdaProvider._resolve_layer(layer_name, context)
1✔
3661
        state = lambda_stores[account_id][region_name]
1✔
3662

3663
        layer = state.layers.get(layer_name)
1✔
3664
        if version_number < 1:
1✔
3665
            raise InvalidParameterValueException("Layer Version Cannot be less than 1", Type="User")
1✔
3666
        if layer is None:
1✔
3667
            raise ResourceNotFoundException(
1✔
3668
                "The resource you requested does not exist.", Type="User"
3669
            )
3670
        layer_version = layer.layer_versions.get(str(version_number))
1✔
3671
        if layer_version is None:
1✔
3672
            raise ResourceNotFoundException(
1✔
3673
                "The resource you requested does not exist.", Type="User"
3674
            )
3675
        return api_utils.map_layer_out(layer_version)
1✔
3676

3677
    def get_layer_version_by_arn(
1✔
3678
        self, context: RequestContext, arn: LayerVersionArn, **kwargs
3679
    ) -> GetLayerVersionResponse:
3680
        region_name, account_id, layer_name, layer_version = LambdaProvider._resolve_layer(
1✔
3681
            arn, context
3682
        )
3683

3684
        if not layer_version:
1✔
3685
            raise ValidationException(
1✔
3686
                f"1 validation error detected: Value '{arn}' at 'arn' failed to satisfy constraint: Member must satisfy regular expression pattern: "
3687
                + "(arn:(aws[a-zA-Z-]*)?:lambda:[a-z]{2}((-gov)|(-iso([a-z]?)))?-[a-z]+-\\d{1}:\\d{12}:layer:[a-zA-Z0-9-_]+:[0-9]+)|(arn:[a-zA-Z0-9-]+:lambda:::awslayer:[a-zA-Z0-9-_]+)"
3688
            )
3689

3690
        store = lambda_stores[account_id][region_name]
1✔
3691
        if not (layers := store.layers.get(layer_name)):
1✔
UNCOV
3692
            raise ResourceNotFoundException(
×
3693
                "The resource you requested does not exist.", Type="User"
3694
            )
3695

3696
        layer_version = layers.layer_versions.get(layer_version)
1✔
3697

3698
        if not layer_version:
1✔
3699
            raise ResourceNotFoundException(
1✔
3700
                "The resource you requested does not exist.", Type="User"
3701
            )
3702

3703
        return api_utils.map_layer_out(layer_version)
1✔
3704

3705
    def list_layers(
1✔
3706
        self,
3707
        context: RequestContext,
3708
        compatible_runtime: Runtime = None,
3709
        marker: String = None,
3710
        max_items: MaxLayerListItems = None,
3711
        compatible_architecture: Architecture = None,
3712
        **kwargs,
3713
    ) -> ListLayersResponse:
3714
        validation_errors = []
1✔
3715

3716
        validation_error_arch = api_utils.validate_layer_architecture(compatible_architecture)
1✔
3717
        if validation_error_arch:
1✔
3718
            validation_errors.append(validation_error_arch)
1✔
3719

3720
        validation_error_runtime = api_utils.validate_layer_runtime(compatible_runtime)
1✔
3721
        if validation_error_runtime:
1✔
3722
            validation_errors.append(validation_error_runtime)
1✔
3723

3724
        if validation_errors:
1✔
3725
            raise ValidationException(
1✔
3726
                f"{len(validation_errors)} validation error{'s' if len(validation_errors) > 1 else ''} detected: {';'.join(validation_errors)}"
3727
            )
3728
        # TODO: handle filter: compatible_runtime
3729
        # TODO: handle filter: compatible_architecture
3730

UNCOV
3731
        state = lambda_stores[context.account_id][context.region]
×
UNCOV
3732
        layers = state.layers
×
3733

3734
        # TODO: test how filters behave together with only returning layers here? Does it return the latest "matching" layer, i.e. does it ignore later layer versions that don't match?
3735

UNCOV
3736
        responses: list[LayersListItem] = []
×
UNCOV
3737
        for layer_name, layer in layers.items():
×
3738
            # fetch latest version
UNCOV
3739
            layer_versions = list(layer.layer_versions.values())
×
UNCOV
3740
            sorted(layer_versions, key=lambda x: x.version)
×
UNCOV
3741
            latest_layer_version = layer_versions[-1]
×
UNCOV
3742
            responses.append(
×
3743
                LayersListItem(
3744
                    LayerName=layer_name,
3745
                    LayerArn=layer.arn,
3746
                    LatestMatchingVersion=api_utils.map_layer_out(latest_layer_version),
3747
                )
3748
            )
3749

UNCOV
3750
        responses = PaginatedList(responses)
×
UNCOV
3751
        page, token = responses.get_page(
×
3752
            lambda version: version,
3753
            marker,
3754
            max_items,
3755
        )
3756

UNCOV
3757
        return ListLayersResponse(NextMarker=token, Layers=page)
×
3758

3759
    def list_layer_versions(
1✔
3760
        self,
3761
        context: RequestContext,
3762
        layer_name: LayerName,
3763
        compatible_runtime: Runtime = None,
3764
        marker: String = None,
3765
        max_items: MaxLayerListItems = None,
3766
        compatible_architecture: Architecture = None,
3767
        **kwargs,
3768
    ) -> ListLayerVersionsResponse:
3769
        validation_errors = api_utils.validate_layer_runtimes_and_architectures(
1✔
3770
            [compatible_runtime] if compatible_runtime else [],
3771
            [compatible_architecture] if compatible_architecture else [],
3772
        )
3773
        if validation_errors:
1✔
UNCOV
3774
            raise ValidationException(
×
3775
                f"{len(validation_errors)} validation error{'s' if len(validation_errors) > 1 else ''} detected: {';'.join(validation_errors)}"
3776
            )
3777

3778
        region_name, account_id, layer_name, layer_version = LambdaProvider._resolve_layer(
1✔
3779
            layer_name, context
3780
        )
3781
        state = lambda_stores[account_id][region_name]
1✔
3782

3783
        # TODO: Test & handle filter: compatible_runtime
3784
        # TODO: Test & handle filter: compatible_architecture
3785
        all_layer_versions = []
1✔
3786
        layer = state.layers.get(layer_name)
1✔
3787
        if layer is not None:
1✔
3788
            for layer_version in layer.layer_versions.values():
1✔
3789
                all_layer_versions.append(api_utils.map_layer_out(layer_version))
1✔
3790

3791
        all_layer_versions.sort(key=lambda x: x["Version"], reverse=True)
1✔
3792
        all_layer_versions = PaginatedList(all_layer_versions)
1✔
3793
        page, token = all_layer_versions.get_page(
1✔
3794
            lambda version: version["LayerVersionArn"],
3795
            marker,
3796
            max_items,
3797
        )
3798
        return ListLayerVersionsResponse(NextMarker=token, LayerVersions=page)
1✔
3799

3800
    def delete_layer_version(
1✔
3801
        self,
3802
        context: RequestContext,
3803
        layer_name: LayerName,
3804
        version_number: LayerVersionNumber,
3805
        **kwargs,
3806
    ) -> None:
3807
        if version_number < 1:
1✔
3808
            raise InvalidParameterValueException("Layer Version Cannot be less than 1", Type="User")
1✔
3809

3810
        region_name, account_id, layer_name, layer_version = LambdaProvider._resolve_layer(
1✔
3811
            layer_name, context
3812
        )
3813

3814
        store = lambda_stores[account_id][region_name]
1✔
3815
        layer = store.layers.get(layer_name, {})
1✔
3816
        if layer:
1✔
3817
            layer.layer_versions.pop(str(version_number), None)
1✔
3818

3819
    # =======================================
3820
    # =====  Layer Version Permissions  =====
3821
    # =======================================
3822
    # TODO: lock updates that change revision IDs
3823

3824
    def add_layer_version_permission(
1✔
3825
        self,
3826
        context: RequestContext,
3827
        layer_name: LayerName,
3828
        version_number: LayerVersionNumber,
3829
        statement_id: StatementId,
3830
        action: LayerPermissionAllowedAction,
3831
        principal: LayerPermissionAllowedPrincipal,
3832
        organization_id: OrganizationId = None,
3833
        revision_id: String = None,
3834
        **kwargs,
3835
    ) -> AddLayerVersionPermissionResponse:
3836
        # `layer_name` can either be layer name or ARN. It is used to generate error messages.
3837
        # `layer_n` contains the layer name.
3838
        region_name, account_id, layer_n, _ = LambdaProvider._resolve_layer(layer_name, context)
1✔
3839

3840
        if action != "lambda:GetLayerVersion":
1✔
3841
            raise ValidationException(
1✔
3842
                f"1 validation error detected: Value '{action}' at 'action' failed to satisfy constraint: Member must satisfy regular expression pattern: lambda:GetLayerVersion"
3843
            )
3844

3845
        store = lambda_stores[account_id][region_name]
1✔
3846
        layer = store.layers.get(layer_n)
1✔
3847

3848
        layer_version_arn = api_utils.layer_version_arn(
1✔
3849
            layer_name, account_id, region_name, str(version_number)
3850
        )
3851

3852
        if layer is None:
1✔
3853
            raise ResourceNotFoundException(
1✔
3854
                f"Layer version {layer_version_arn} does not exist.", Type="User"
3855
            )
3856
        layer_version = layer.layer_versions.get(str(version_number))
1✔
3857
        if layer_version is None:
1✔
3858
            raise ResourceNotFoundException(
1✔
3859
                f"Layer version {layer_version_arn} does not exist.", Type="User"
3860
            )
3861
        # do we have a policy? if not set one
3862
        if layer_version.policy is None:
1✔
3863
            layer_version.policy = LayerPolicy()
1✔
3864

3865
        if statement_id in layer_version.policy.statements:
1✔
3866
            raise ResourceConflictException(
1✔
3867
                f"The statement id ({statement_id}) provided already exists. Please provide a new statement id, or remove the existing statement.",
3868
                Type="User",
3869
            )
3870

3871
        if revision_id and layer_version.policy.revision_id != revision_id:
1✔
3872
            raise PreconditionFailedException(
1✔
3873
                "The Revision Id provided does not match the latest Revision Id. "
3874
                "Call the GetLayerPolicy API to retrieve the latest Revision Id",
3875
                Type="User",
3876
            )
3877

3878
        statement = LayerPolicyStatement(
1✔
3879
            sid=statement_id, action=action, principal=principal, organization_id=organization_id
3880
        )
3881

3882
        old_statements = layer_version.policy.statements
1✔
3883
        layer_version.policy = dataclasses.replace(
1✔
3884
            layer_version.policy, statements={**old_statements, statement_id: statement}
3885
        )
3886

3887
        return AddLayerVersionPermissionResponse(
1✔
3888
            Statement=json.dumps(
3889
                {
3890
                    "Sid": statement.sid,
3891
                    "Effect": "Allow",
3892
                    "Principal": statement.principal,
3893
                    "Action": statement.action,
3894
                    "Resource": layer_version.layer_version_arn,
3895
                }
3896
            ),
3897
            RevisionId=layer_version.policy.revision_id,
3898
        )
3899

3900
    def remove_layer_version_permission(
1✔
3901
        self,
3902
        context: RequestContext,
3903
        layer_name: LayerName,
3904
        version_number: LayerVersionNumber,
3905
        statement_id: StatementId,
3906
        revision_id: String = None,
3907
        **kwargs,
3908
    ) -> None:
3909
        # `layer_name` can either be layer name or ARN. It is used to generate error messages.
3910
        # `layer_n` contains the layer name.
3911
        region_name, account_id, layer_n, layer_version = LambdaProvider._resolve_layer(
1✔
3912
            layer_name, context
3913
        )
3914

3915
        layer_version_arn = api_utils.layer_version_arn(
1✔
3916
            layer_name, account_id, region_name, str(version_number)
3917
        )
3918

3919
        state = lambda_stores[account_id][region_name]
1✔
3920
        layer = state.layers.get(layer_n)
1✔
3921
        if layer is None:
1✔
3922
            raise ResourceNotFoundException(
1✔
3923
                f"Layer version {layer_version_arn} does not exist.", Type="User"
3924
            )
3925
        layer_version = layer.layer_versions.get(str(version_number))
1✔
3926
        if layer_version is None:
1✔
3927
            raise ResourceNotFoundException(
1✔
3928
                f"Layer version {layer_version_arn} does not exist.", Type="User"
3929
            )
3930

3931
        if revision_id and layer_version.policy.revision_id != revision_id:
1✔
3932
            raise PreconditionFailedException(
1✔
3933
                "The Revision Id provided does not match the latest Revision Id. "
3934
                "Call the GetLayerPolicy API to retrieve the latest Revision Id",
3935
                Type="User",
3936
            )
3937

3938
        if statement_id not in layer_version.policy.statements:
1✔
3939
            raise ResourceNotFoundException(
1✔
3940
                f"Statement {statement_id} is not found in resource policy.", Type="User"
3941
            )
3942

3943
        old_statements = layer_version.policy.statements
1✔
3944
        layer_version.policy = dataclasses.replace(
1✔
3945
            layer_version.policy,
3946
            statements={k: v for k, v in old_statements.items() if k != statement_id},
3947
        )
3948

3949
    def get_layer_version_policy(
1✔
3950
        self,
3951
        context: RequestContext,
3952
        layer_name: LayerName,
3953
        version_number: LayerVersionNumber,
3954
        **kwargs,
3955
    ) -> GetLayerVersionPolicyResponse:
3956
        # `layer_name` can either be layer name or ARN. It is used to generate error messages.
3957
        # `layer_n` contains the layer name.
3958
        region_name, account_id, layer_n, _ = LambdaProvider._resolve_layer(layer_name, context)
1✔
3959

3960
        layer_version_arn = api_utils.layer_version_arn(
1✔
3961
            layer_name, account_id, region_name, str(version_number)
3962
        )
3963

3964
        store = lambda_stores[account_id][region_name]
1✔
3965
        layer = store.layers.get(layer_n)
1✔
3966

3967
        if layer is None:
1✔
3968
            raise ResourceNotFoundException(
1✔
3969
                f"Layer version {layer_version_arn} does not exist.", Type="User"
3970
            )
3971

3972
        layer_version = layer.layer_versions.get(str(version_number))
1✔
3973
        if layer_version is None:
1✔
3974
            raise ResourceNotFoundException(
1✔
3975
                f"Layer version {layer_version_arn} does not exist.", Type="User"
3976
            )
3977

3978
        if layer_version.policy is None:
1✔
3979
            raise ResourceNotFoundException(
1✔
3980
                "No policy is associated with the given resource.", Type="User"
3981
            )
3982

3983
        return GetLayerVersionPolicyResponse(
1✔
3984
            Policy=json.dumps(
3985
                {
3986
                    "Version": layer_version.policy.version,
3987
                    "Id": layer_version.policy.id,
3988
                    "Statement": [
3989
                        {
3990
                            "Sid": ps.sid,
3991
                            "Effect": "Allow",
3992
                            "Principal": ps.principal,
3993
                            "Action": ps.action,
3994
                            "Resource": layer_version.layer_version_arn,
3995
                        }
3996
                        for ps in layer_version.policy.statements.values()
3997
                    ],
3998
                }
3999
            ),
4000
            RevisionId=layer_version.policy.revision_id,
4001
        )
4002

4003
    # =======================================
4004
    # =======  Function Concurrency  ========
4005
    # =======================================
4006
    # (Reserved) function concurrency is scoped to the whole function
4007

4008
    def get_function_concurrency(
1✔
4009
        self, context: RequestContext, function_name: FunctionName, **kwargs
4010
    ) -> GetFunctionConcurrencyResponse:
4011
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
4012
        function_name = api_utils.get_function_name(function_name, context)
1✔
4013
        fn = self._get_function(function_name=function_name, region=region, account_id=account_id)
1✔
4014
        return GetFunctionConcurrencyResponse(
1✔
4015
            ReservedConcurrentExecutions=fn.reserved_concurrent_executions
4016
        )
4017

4018
    def put_function_concurrency(
1✔
4019
        self,
4020
        context: RequestContext,
4021
        function_name: FunctionName,
4022
        reserved_concurrent_executions: ReservedConcurrentExecutions,
4023
        **kwargs,
4024
    ) -> Concurrency:
4025
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
4026

4027
        function_name, qualifier = api_utils.get_name_and_qualifier(function_name, None, context)
1✔
4028
        if qualifier:
1✔
4029
            raise InvalidParameterValueException(
1✔
4030
                "This operation is permitted on Lambda functions only. Aliases and versions do not support this operation. Please specify either a function name or an unqualified function ARN.",
4031
                Type="User",
4032
            )
4033

4034
        store = lambda_stores[account_id][region]
1✔
4035
        fn = store.functions.get(function_name)
1✔
4036
        if not fn:
1✔
4037
            fn_arn = api_utils.qualified_lambda_arn(
1✔
4038
                function_name,
4039
                qualifier="$LATEST",
4040
                account=account_id,
4041
                region=region,
4042
            )
4043
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")
1✔
4044

4045
        settings = self.get_account_settings(context)
1✔
4046
        unreserved_concurrent_executions = settings["AccountLimit"][
1✔
4047
            "UnreservedConcurrentExecutions"
4048
        ]
4049

4050
        # The existing reserved concurrent executions for the same function are already deduced in
4051
        # unreserved_concurrent_executions but must not count because the new one will replace the existing one.
4052
        # Joel tested this behavior manually against AWS (2023-11-28).
4053
        existing_reserved_concurrent_executions = (
1✔
4054
            fn.reserved_concurrent_executions if fn.reserved_concurrent_executions else 0
4055
        )
4056
        if (
1✔
4057
            unreserved_concurrent_executions
4058
            - reserved_concurrent_executions
4059
            + existing_reserved_concurrent_executions
4060
        ) < config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY:
4061
            raise InvalidParameterValueException(
1✔
4062
                f"Specified ReservedConcurrentExecutions for function decreases account's UnreservedConcurrentExecution below its minimum value of [{config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY}]."
4063
            )
4064

4065
        total_provisioned_concurrency = sum(
1✔
4066
            [
4067
                provisioned_configs.provisioned_concurrent_executions
4068
                for provisioned_configs in fn.provisioned_concurrency_configs.values()
4069
            ]
4070
        )
4071
        if total_provisioned_concurrency > reserved_concurrent_executions:
1✔
4072
            raise InvalidParameterValueException(
1✔
4073
                f" ReservedConcurrentExecutions  {reserved_concurrent_executions} should not be lower than function's total provisioned concurrency [{total_provisioned_concurrency}]."
4074
            )
4075

4076
        fn.reserved_concurrent_executions = reserved_concurrent_executions
1✔
4077

4078
        return Concurrency(ReservedConcurrentExecutions=fn.reserved_concurrent_executions)
1✔
4079

4080
    def delete_function_concurrency(
1✔
4081
        self, context: RequestContext, function_name: FunctionName, **kwargs
4082
    ) -> None:
4083
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
4084
        function_name, qualifier = api_utils.get_name_and_qualifier(function_name, None, context)
1✔
4085
        store = lambda_stores[account_id][region]
1✔
4086
        fn = store.functions.get(function_name)
1✔
4087
        fn.reserved_concurrent_executions = None
1✔
4088

4089
    # =======================================
4090
    # ===============  TAGS   ===============
4091
    # =======================================
4092
    # only Function, Event Source Mapping, and Code Signing Config (not currently supported by LocalStack) ARNs an are available for tagging in AWS
4093

4094
    def _get_tags(self, resource: TaggableResource) -> dict[str, str]:
1✔
4095
        state = self.fetch_lambda_store_for_tagging(resource)
1✔
4096
        lambda_adapted_tags = {
1✔
4097
            tag["Key"]: tag["Value"]
4098
            for tag in state.TAGS.list_tags_for_resource(resource).get("Tags")
4099
        }
4100
        return lambda_adapted_tags
1✔
4101

4102
    def _store_tags(self, resource: TaggableResource, tags: dict[str, str]):
1✔
4103
        state = self.fetch_lambda_store_for_tagging(resource)
1✔
4104
        if len(state.TAGS.tags.get(resource, {}) | tags) > LAMBDA_TAG_LIMIT_PER_RESOURCE:
1✔
4105
            raise InvalidParameterValueException(
1✔
4106
                "Number of tags exceeds resource tag limit.", Type="User"
4107
            )
4108

4109
        tag_svc_adapted_tags = [{"Key": key, "Value": value} for key, value in tags.items()]
1✔
4110
        state.TAGS.tag_resource(resource, tag_svc_adapted_tags)
1✔
4111

4112
    def fetch_lambda_store_for_tagging(self, resource: TaggableResource) -> LambdaStore:
1✔
4113
        """
4114
        Takes a resource ARN for a TaggableResource (Lambda Function, Event Source Mapping, or Code Signing Config) and returns a corresponding
4115
        LambdaStore for its region and account.
4116

4117
        In addition, this function validates that the ARN is a valid TaggableResource type, and that the TaggableResource exists.
4118

4119
        Raises:
4120
            ValidationException: If the resource ARN is not a full ARN for a TaggableResource.
4121
            ResourceNotFoundException: If the specified resource does not exist.
4122
            InvalidParameterValueException: If the resource ARN is a qualified Lambda Function.
4123
        """
4124

4125
        def _raise_validation_exception():
1✔
4126
            raise ValidationException(
1✔
4127
                f"1 validation error detected: Value '{resource}' at 'resource' failed to satisfy constraint: Member must satisfy regular expression pattern: {api_utils.TAGGABLE_RESOURCE_ARN_PATTERN}"
4128
            )
4129

4130
        # Check whether the ARN we have been passed is correctly formatted
4131
        parsed_resource_arn: ArnData = None
1✔
4132
        try:
1✔
4133
            parsed_resource_arn = parse_arn(resource)
1✔
4134
        except Exception:
1✔
4135
            _raise_validation_exception()
1✔
4136

4137
        # TODO: Should we be checking whether this is a full ARN?
4138
        region, account_id, resource_type = map(
1✔
4139
            parsed_resource_arn.get, ("region", "account", "resource")
4140
        )
4141

4142
        if not all((region, account_id, resource_type)):
1✔
UNCOV
4143
            _raise_validation_exception()
×
4144

4145
        if not (parts := resource_type.split(":")):
1✔
UNCOV
4146
            _raise_validation_exception()
×
4147

4148
        resource_type, resource_identifier, *qualifier = parts
1✔
4149
        if resource_type not in {"event-source-mapping", "code-signing-config", "function"}:
1✔
4150
            _raise_validation_exception()
1✔
4151

4152
        if qualifier:
1✔
4153
            if resource_type == "function":
1✔
4154
                raise InvalidParameterValueException(
1✔
4155
                    "Tags on function aliases and versions are not supported. Please specify a function ARN.",
4156
                    Type="User",
4157
                )
4158
            _raise_validation_exception()
1✔
4159

4160
        match resource_type:
1✔
4161
            case "event-source-mapping":
1✔
4162
                self._get_esm(resource_identifier, account_id, region)
1✔
4163
            case "code-signing-config":
1✔
4164
                raise NotImplementedError("Resource tagging on CSC not yet implemented.")
4165
            case "function":
1✔
4166
                self._get_function(
1✔
4167
                    function_name=resource_identifier, account_id=account_id, region=region
4168
                )
4169

4170
        # If no exceptions are raised, assume ARN and referenced resource is valid for tag operations
4171
        return lambda_stores[account_id][region]
1✔
4172

4173
    def tag_resource(
1✔
4174
        self, context: RequestContext, resource: TaggableResource, tags: Tags, **kwargs
4175
    ) -> None:
4176
        if not tags:
1✔
4177
            raise InvalidParameterValueException(
1✔
4178
                "An error occurred and the request cannot be processed.", Type="User"
4179
            )
4180
        self._store_tags(resource, tags)
1✔
4181

4182
        if (resource_id := extract_resource_from_arn(resource)) and resource_id.startswith(
1✔
4183
            "function"
4184
        ):
4185
            name, _, account, region = function_locators_from_arn(resource)
1✔
4186
            function = self._get_function(name, account, region)
1✔
4187
            with function.lock:
1✔
4188
                # dirty hack for changed revision id, should reevaluate model to prevent this:
4189
                latest_version = function.versions["$LATEST"]
1✔
4190
                function.versions["$LATEST"] = dataclasses.replace(
1✔
4191
                    latest_version, config=dataclasses.replace(latest_version.config)
4192
                )
4193

4194
    def list_tags(
1✔
4195
        self, context: RequestContext, resource: TaggableResource, **kwargs
4196
    ) -> ListTagsResponse:
4197
        tags = self._get_tags(resource)
1✔
4198
        return ListTagsResponse(Tags=tags)
1✔
4199

4200
    def untag_resource(
1✔
4201
        self, context: RequestContext, resource: TaggableResource, tag_keys: TagKeyList, **kwargs
4202
    ) -> None:
4203
        if not tag_keys:
1✔
4204
            raise ValidationException(
1✔
4205
                "1 validation error detected: Value null at 'tagKeys' failed to satisfy constraint: Member must not be null"
4206
            )  # should probably be generalized a bit
4207

4208
        state = self.fetch_lambda_store_for_tagging(resource)
1✔
4209
        state.TAGS.untag_resource(resource, tag_keys)
1✔
4210

4211
        if (resource_id := extract_resource_from_arn(resource)) and resource_id.startswith(
1✔
4212
            "function"
4213
        ):
4214
            name, _, account, region = function_locators_from_arn(resource)
1✔
4215
            function = self._get_function(name, account, region)
1✔
4216
            # TODO: Potential race condition
4217
            with function.lock:
1✔
4218
                # dirty hack for changed revision id, should reevaluate model to prevent this:
4219
                latest_version = function.versions["$LATEST"]
1✔
4220
                function.versions["$LATEST"] = dataclasses.replace(
1✔
4221
                    latest_version, config=dataclasses.replace(latest_version.config)
4222
                )
4223

4224
    # =======================================
4225
    # =======  LEGACY / DEPRECATED   ========
4226
    # =======================================
4227

4228
    def invoke_async(
1✔
4229
        self,
4230
        context: RequestContext,
4231
        function_name: NamespacedFunctionName,
4232
        invoke_args: IO[BlobStream],
4233
        **kwargs,
4234
    ) -> InvokeAsyncResponse:
4235
        """LEGACY API endpoint. Even AWS heavily discourages its usage."""
4236
        raise NotImplementedError
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc