• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 19656300538

24 Nov 2025 06:52PM UTC coverage: 86.867% (-0.01%) from 86.879%
19656300538

push

github

web-flow
CFn: validate conditions exist in Fn::If (#13243)

3 of 4 new or added lines in 1 file covered. (75.0%)

236 existing lines in 7 files now uncovered.

68861 of 79272 relevant lines covered (86.87%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.02
/localstack-core/localstack/services/lambda_/provider.py
1
import base64
1✔
2
import dataclasses
1✔
3
import datetime
1✔
4
import itertools
1✔
5
import json
1✔
6
import logging
1✔
7
import re
1✔
8
import threading
1✔
9
import time
1✔
10
from typing import IO, Any
1✔
11

12
from botocore.exceptions import ClientError
1✔
13

14
from localstack import config
1✔
15
from localstack.aws.api import RequestContext, ServiceException, handler
1✔
16
from localstack.aws.api.lambda_ import (
1✔
17
    AccountLimit,
18
    AccountUsage,
19
    AddLayerVersionPermissionResponse,
20
    AddPermissionRequest,
21
    AddPermissionResponse,
22
    Alias,
23
    AliasConfiguration,
24
    AliasRoutingConfiguration,
25
    AllowedPublishers,
26
    Architecture,
27
    Arn,
28
    Blob,
29
    BlobStream,
30
    CodeSigningConfigArn,
31
    CodeSigningConfigNotFoundException,
32
    CodeSigningPolicies,
33
    CompatibleArchitectures,
34
    CompatibleRuntimes,
35
    Concurrency,
36
    Cors,
37
    CreateCodeSigningConfigResponse,
38
    CreateEventSourceMappingRequest,
39
    CreateFunctionRequest,
40
    CreateFunctionUrlConfigResponse,
41
    DeleteCodeSigningConfigResponse,
42
    Description,
43
    DestinationConfig,
44
    EventSourceMappingConfiguration,
45
    FunctionCodeLocation,
46
    FunctionConfiguration,
47
    FunctionEventInvokeConfig,
48
    FunctionName,
49
    FunctionUrlAuthType,
50
    FunctionUrlQualifier,
51
    GetAccountSettingsResponse,
52
    GetCodeSigningConfigResponse,
53
    GetFunctionCodeSigningConfigResponse,
54
    GetFunctionConcurrencyResponse,
55
    GetFunctionRecursionConfigResponse,
56
    GetFunctionResponse,
57
    GetFunctionUrlConfigResponse,
58
    GetLayerVersionPolicyResponse,
59
    GetLayerVersionResponse,
60
    GetPolicyResponse,
61
    GetProvisionedConcurrencyConfigResponse,
62
    InvalidParameterValueException,
63
    InvocationResponse,
64
    InvocationType,
65
    InvokeAsyncResponse,
66
    InvokeMode,
67
    LambdaApi,
68
    LastUpdateStatus,
69
    LayerName,
70
    LayerPermissionAllowedAction,
71
    LayerPermissionAllowedPrincipal,
72
    LayersListItem,
73
    LayerVersionArn,
74
    LayerVersionContentInput,
75
    LayerVersionNumber,
76
    LicenseInfo,
77
    ListAliasesResponse,
78
    ListCodeSigningConfigsResponse,
79
    ListEventSourceMappingsResponse,
80
    ListFunctionEventInvokeConfigsResponse,
81
    ListFunctionsByCodeSigningConfigResponse,
82
    ListFunctionsResponse,
83
    ListFunctionUrlConfigsResponse,
84
    ListLayersResponse,
85
    ListLayerVersionsResponse,
86
    ListProvisionedConcurrencyConfigsResponse,
87
    ListTagsResponse,
88
    ListVersionsByFunctionResponse,
89
    LogFormat,
90
    LoggingConfig,
91
    LogType,
92
    MasterRegion,
93
    MaxFunctionEventInvokeConfigListItems,
94
    MaximumEventAgeInSeconds,
95
    MaximumRetryAttempts,
96
    MaxItems,
97
    MaxLayerListItems,
98
    MaxListItems,
99
    MaxProvisionedConcurrencyConfigListItems,
100
    NamespacedFunctionName,
101
    NamespacedStatementId,
102
    OnFailure,
103
    OnSuccess,
104
    OrganizationId,
105
    PackageType,
106
    PositiveInteger,
107
    PreconditionFailedException,
108
    ProvisionedConcurrencyConfigListItem,
109
    ProvisionedConcurrencyConfigNotFoundException,
110
    ProvisionedConcurrencyStatusEnum,
111
    PublishLayerVersionResponse,
112
    PutFunctionCodeSigningConfigResponse,
113
    PutFunctionRecursionConfigResponse,
114
    PutProvisionedConcurrencyConfigResponse,
115
    Qualifier,
116
    RecursiveLoop,
117
    ReservedConcurrentExecutions,
118
    ResourceConflictException,
119
    ResourceNotFoundException,
120
    Runtime,
121
    RuntimeVersionConfig,
122
    SnapStart,
123
    SnapStartApplyOn,
124
    SnapStartOptimizationStatus,
125
    SnapStartResponse,
126
    State,
127
    StatementId,
128
    StateReasonCode,
129
    String,
130
    TaggableResource,
131
    TagKeyList,
132
    Tags,
133
    TenantId,
134
    TracingMode,
135
    UnqualifiedFunctionName,
136
    UpdateCodeSigningConfigResponse,
137
    UpdateEventSourceMappingRequest,
138
    UpdateFunctionCodeRequest,
139
    UpdateFunctionConfigurationRequest,
140
    UpdateFunctionUrlConfigResponse,
141
    Version,
142
)
143
from localstack.aws.api.lambda_ import FunctionVersion as FunctionVersionApi
1✔
144
from localstack.aws.api.lambda_ import ServiceException as LambdaServiceException
1✔
145
from localstack.aws.api.pipes import (
1✔
146
    DynamoDBStreamStartPosition,
147
    KinesisStreamStartPosition,
148
)
149
from localstack.aws.connect import connect_to
1✔
150
from localstack.aws.spec import load_service
1✔
151
from localstack.services.edge import ROUTER
1✔
152
from localstack.services.lambda_ import api_utils
1✔
153
from localstack.services.lambda_ import hooks as lambda_hooks
1✔
154
from localstack.services.lambda_.analytics import (
1✔
155
    FunctionOperation,
156
    FunctionStatus,
157
    function_counter,
158
)
159
from localstack.services.lambda_.api_utils import (
1✔
160
    ARCHITECTURES,
161
    STATEMENT_ID_REGEX,
162
    SUBNET_ID_REGEX,
163
    function_locators_from_arn,
164
)
165
from localstack.services.lambda_.event_source_mapping.esm_config_factory import (
1✔
166
    EsmConfigFactory,
167
)
168
from localstack.services.lambda_.event_source_mapping.esm_worker import (
1✔
169
    EsmState,
170
    EsmWorker,
171
)
172
from localstack.services.lambda_.event_source_mapping.esm_worker_factory import (
1✔
173
    EsmWorkerFactory,
174
)
175
from localstack.services.lambda_.event_source_mapping.pipe_utils import get_internal_client
1✔
176
from localstack.services.lambda_.invocation import AccessDeniedException
1✔
177
from localstack.services.lambda_.invocation.execution_environment import (
1✔
178
    EnvironmentStartupTimeoutException,
179
)
180
from localstack.services.lambda_.invocation.lambda_models import (
1✔
181
    AliasRoutingConfig,
182
    CodeSigningConfig,
183
    EventInvokeConfig,
184
    Function,
185
    FunctionResourcePolicy,
186
    FunctionUrlConfig,
187
    FunctionVersion,
188
    ImageConfig,
189
    LambdaEphemeralStorage,
190
    Layer,
191
    LayerPolicy,
192
    LayerPolicyStatement,
193
    LayerVersion,
194
    ProvisionedConcurrencyConfiguration,
195
    RequestEntityTooLargeException,
196
    ResourcePolicy,
197
    UpdateStatus,
198
    ValidationException,
199
    VersionAlias,
200
    VersionFunctionConfiguration,
201
    VersionIdentifier,
202
    VersionState,
203
    VpcConfig,
204
)
205
from localstack.services.lambda_.invocation.lambda_service import (
1✔
206
    LambdaService,
207
    create_image_code,
208
    destroy_code_if_not_used,
209
    lambda_stores,
210
    store_lambda_archive,
211
    store_s3_bucket_archive,
212
)
213
from localstack.services.lambda_.invocation.models import LambdaStore
1✔
214
from localstack.services.lambda_.invocation.runtime_executor import get_runtime_executor
1✔
215
from localstack.services.lambda_.lambda_utils import HINT_LOG
1✔
216
from localstack.services.lambda_.layerfetcher.layer_fetcher import LayerFetcher
1✔
217
from localstack.services.lambda_.provider_utils import (
1✔
218
    LambdaLayerVersionIdentifier,
219
    get_function_version,
220
    get_function_version_from_arn,
221
)
222
from localstack.services.lambda_.runtimes import (
1✔
223
    ALL_RUNTIMES,
224
    DEPRECATED_RUNTIMES,
225
    DEPRECATED_RUNTIMES_UPGRADES,
226
    RUNTIMES_AGGREGATED,
227
    SNAP_START_SUPPORTED_RUNTIMES,
228
    VALID_RUNTIMES,
229
)
230
from localstack.services.lambda_.urlrouter import FunctionUrlRouter
1✔
231
from localstack.services.plugins import ServiceLifecycleHook
1✔
232
from localstack.state import StateVisitor
1✔
233
from localstack.utils.aws.arns import (
1✔
234
    ArnData,
235
    extract_resource_from_arn,
236
    extract_service_from_arn,
237
    get_partition,
238
    lambda_event_source_mapping_arn,
239
    parse_arn,
240
)
241
from localstack.utils.aws.client_types import ServicePrincipal
1✔
242
from localstack.utils.bootstrap import is_api_enabled
1✔
243
from localstack.utils.collections import PaginatedList
1✔
244
from localstack.utils.event_matcher import validate_event_pattern
1✔
245
from localstack.utils.strings import get_random_hex, short_uid, to_bytes, to_str
1✔
246
from localstack.utils.sync import poll_condition
1✔
247
from localstack.utils.urls import localstack_host
1✔
248

249
# Module-level logger for the Lambda provider
LOG = logging.getLogger(__name__)

# Defaults for function configuration
# NOTE(review): presumably seconds and MB respectively -- confirm against the callers
LAMBDA_DEFAULT_TIMEOUT = 3
LAMBDA_DEFAULT_MEMORY_SIZE = 128

# Service limits enforced by this provider
LAMBDA_TAG_LIMIT_PER_RESOURCE = 50
LAMBDA_LAYERS_LIMIT_PER_FUNCTION = 5

# Tag key carrying a user-chosen custom id
TAG_KEY_CUSTOM_URL = "_custom_id_"
# Requirements (from RFC3986 & co): at most 63 characters, the first character must be
# a letter, followed by letters, digits or hyphens; the value must not end with a hyphen.
TAG_KEY_CUSTOM_URL_VALIDATOR = re.compile(r"^[A-Za-z]([A-Za-z0-9\-]{0,61}[A-Za-z0-9])?$")
261

262

263
class LambdaProvider(LambdaApi, ServiceLifecycleHook):
1✔
264
    lambda_service: LambdaService
1✔
265
    create_fn_lock: threading.RLock
1✔
266
    create_layer_lock: threading.RLock
1✔
267
    router: FunctionUrlRouter
1✔
268
    esm_workers: dict[str, EsmWorker]
1✔
269
    layer_fetcher: LayerFetcher | None
1✔
270

271
    def __init__(self) -> None:
        """Create the Lambda service, the creation locks, the function URL router and
        an empty ESM worker registry, then run the layer-fetcher injection hook."""
        service = LambdaService()
        self.lambda_service = service
        # Reentrant locks for function and layer creation
        self.create_fn_lock = threading.RLock()
        self.create_layer_lock = threading.RLock()
        # The router is bound to the same service instance stored on the provider
        self.router = FunctionUrlRouter(ROUTER, service)
        self.esm_workers = {}
        # No layer fetcher by default; the hook below receives the provider instance
        self.layer_fetcher = None
        lambda_hooks.inject_layer_fetcher.run(self)
279

280
    def accept_state_visitor(self, visitor: StateVisitor):
        """Hand the Lambda stores to the given state visitor.

        :param visitor: state visitor that is asked to visit ``lambda_stores``
        """
        visitor.visit(lambda_stores)
282

283
    def on_before_state_reset(self):
        """Stop the current Lambda service before the state is reset."""
        self.lambda_service.stop()
285

286
    def on_after_state_reset(self):
        """Replace the Lambda service with a fresh instance after a state reset.

        The URL router and the provider end up referencing the same new service.
        """
        fresh_service = LambdaService()
        self.router.lambda_service = fresh_service
        self.lambda_service = fresh_service
288

289
    def on_before_state_load(self):
        """Stop the current Lambda service before persisted state is loaded."""
        self.lambda_service.stop()
291

292
    def on_after_state_load(self):
        """Rebuild the runtime objects after persisted state has been loaded.

        A fresh ``LambdaService`` is created and shared with the URL router. Then, for
        every account/region store:

        * each function version is stored again in the ``Pending`` state and started,
        * every provisioned concurrency config (keyed by alias or version) is re-applied,
        * a worker is created and registered for every event source mapping.

        Errors while restoring a function version or a provisioned concurrency config
        are logged as warnings (with traceback only at DEBUG level) and do not stop
        the restore. Errors while restoring an event source mapping are not caught here.
        """
        self.lambda_service = LambdaService()
        self.router.lambda_service = self.lambda_service

        for _account_id, regions in lambda_stores.items():
            for _region_name, store in regions.items():
                for function in store.functions.values():
                    for stored_version in function.versions.values():
                        # restore the "Pending" state for every function version and start it
                        try:
                            pending_state = VersionState(
                                state=State.Pending,
                                code=StateReasonCode.Creating,
                                reason="The function is being created.",
                            )
                            pending_config = dataclasses.replace(
                                stored_version.config, state=pending_state
                            )
                            pending_version = dataclasses.replace(
                                stored_version, config=pending_config
                            )
                            function.versions[stored_version.id.qualifier] = pending_version
                            # the service is started with the version object as it was loaded
                            startup = self.lambda_service.create_function_version(stored_version)
                            startup.result(timeout=5)
                        except Exception:
                            LOG.warning(
                                "Failed to restore function version %s",
                                stored_version.id.qualified_arn(),
                                exc_info=LOG.isEnabledFor(logging.DEBUG),
                            )

                    # restore provisioned concurrency per function considering both versions and aliases
                    concurrency_configs = function.provisioned_concurrency_configs
                    for qualifier, concurrency_config in concurrency_configs.items():
                        # stays None in the warning below if resolving the qualifier fails
                        target_arn = None
                        try:
                            if api_utils.qualifier_is_alias(qualifier):
                                alias = function.aliases.get(qualifier)
                                aliased_version = function.versions.get(alias.function_version)
                                target_arn = aliased_version.id.qualified_arn()
                            elif api_utils.qualifier_is_version(qualifier):
                                numbered_version = function.versions.get(qualifier)
                                target_arn = numbered_version.id.qualified_arn()
                            else:
                                raise InvalidParameterValueException(
                                    "Invalid qualifier type:"
                                    " Qualifier can only be an alias or a version for provisioned concurrency."
                                )

                            version_manager = self.lambda_service.get_lambda_version_manager(
                                target_arn
                            )
                            version_manager.update_provisioned_concurrency_config(
                                concurrency_config.provisioned_concurrent_executions
                            )
                        except Exception:
                            LOG.warning(
                                "Failed to restore provisioned concurrency %s for function %s",
                                concurrency_config,
                                target_arn,
                                exc_info=LOG.isEnabledFor(logging.DEBUG),
                            )

                for esm in store.event_source_mappings.values():
                    # Restores event source workers
                    function_arn = esm.get("FunctionArn")

                    def _function_is_settled():
                        # True once the mapped function version is either active or failed
                        current_state = get_function_version_from_arn(
                            function_arn
                        ).config.state.state
                        return current_state in [State.Active, State.Failed]

                    # TODO: How do we know the event source is up?
                    # A basic poll to see if the mapped Lambda function is active/failed
                    if not poll_condition(_function_is_settled, timeout=10):
                        LOG.warning(
                            "Creating ESM for Lambda that is not in running state: %s",
                            function_arn,
                        )

                    mapped_version = get_function_version_from_arn(function_arn)
                    execution_role = mapped_version.config.role

                    # a mapping without a stored state is treated as disabled
                    esm_state = esm.get("State", EsmState.DISABLED)
                    enabled = esm_state not in (EsmState.DISABLED, EsmState.DISABLING)
                    worker = EsmWorkerFactory(esm, execution_role, enabled).get_esm_worker()

                    # Note: a worker is created in the DISABLED state if not enabled
                    worker.create()
                    # TODO: assigning the esm_worker to the dict only works after .create(). Could it cause a race
                    #  condition if we get a shutdown here and have a worker thread spawned but not accounted for?
                    self.esm_workers[worker.uuid] = worker
383

384
    def on_after_init(self):
        """Register the function URL routes and validate the runtime executor environment."""
        self.router.register_routes()
        runtime_executor = get_runtime_executor()
        runtime_executor.validate_environment()
387

388
    def on_before_stop(self) -> None:
        """Shut down all registered ESM workers, then stop the Lambda service."""
        for worker in self.esm_workers.values():
            worker.stop_for_shutdown()

        # TODO: should probably unregister routes?
        self.lambda_service.stop()
394

395
    @staticmethod
    def _get_function(function_name: str, account_id: str, region: str) -> Function:
        """Return the function stored under ``function_name`` for the account and region.

        :param function_name: key of the function in the store
        :param account_id: account whose store is searched
        :param region: region whose store is searched
        :return: the stored function
        :raises ResourceNotFoundException: if the store has no (truthy) entry for the name;
            the message contains the unqualified function ARN
        """
        store = lambda_stores[account_id][region]
        if found := store.functions.get(function_name):
            return found

        missing_arn = api_utils.unqualified_lambda_arn(
            function_name=function_name,
            account=account_id,
            region=region,
        )
        raise ResourceNotFoundException(
            f"Function not found: {missing_arn}",
            Type="User",
        )
410

411
    @staticmethod
    def _get_esm(uuid: str, account_id: str, region: str) -> EventSourceMappingConfiguration:
        """Return the event source mapping stored under ``uuid`` for the account and region.

        :param uuid: key of the event source mapping in the store
        :param account_id: account whose store is searched
        :param region: region whose store is searched
        :return: the stored event source mapping configuration
        :raises ResourceNotFoundException: if the store has no (truthy) entry for the uuid;
            the message contains the event source mapping ARN
        """
        store = lambda_stores[account_id][region]
        if mapping := store.event_source_mappings.get(uuid):
            return mapping

        missing_arn = lambda_event_source_mapping_arn(uuid, account_id, region)
        raise ResourceNotFoundException(
            f"Event source mapping not found: {missing_arn}",
            Type="User",
        )
422

423
    @staticmethod
    def _validate_qualifier_expression(qualifier: str) -> None:
        """Raise if ``api_utils.validate_qualifier`` reports errors for the qualifier.

        :param qualifier: qualifier expression to validate
        :raises ValidationException: with a message built from the reported errors
        """
        error_messages = api_utils.validate_qualifier(qualifier)
        if not error_messages:
            return
        raise ValidationException(
            message=api_utils.construct_validation_exception_message(error_messages)
        )
429

430
    @staticmethod
    def _resolve_fn_qualifier(resolved_fn: Function, qualifier: str | None) -> tuple[str, str]:
        """Resolve a qualifier against an already looked-up function.

        Without a qualifier (``None``) the result is ``"$LATEST"`` together with the
        unqualified function ARN. Otherwise the qualifier must name an existing alias
        or an existing version (including ``"$LATEST"``) of the function.

        :param resolved_fn: The resolved lambda function
        :param qualifier: The qualifier to be resolved or None
        :return: Tuple of (resolved qualifier, function arn either qualified or unqualified)
        :raises ResourceNotFoundException: if the qualifier is neither a known alias nor a
            known version of the function
        """
        name = resolved_fn.function_name
        # assuming function versions need to live in the same account and region
        account_id = resolved_fn.latest().id.account
        region = resolved_fn.latest().id.region

        fn_arn = api_utils.unqualified_lambda_arn(name, account_id, region)
        if qualifier is None:
            return "$LATEST", fn_arn

        fn_arn = api_utils.qualified_lambda_arn(name, qualifier, account_id, region)
        if api_utils.qualifier_is_alias(qualifier):
            exists = qualifier in resolved_fn.aliases
            not_found_message = f"Cannot find alias arn: {fn_arn}"
        elif api_utils.qualifier_is_version(qualifier) or qualifier == "$LATEST":
            exists = qualifier in resolved_fn.versions
            not_found_message = f"Function not found: {fn_arn}"
        else:
            # matches qualifier pattern but invalid alias or version
            exists = False
            not_found_message = f"Function not found: {fn_arn}"

        if not exists:
            raise ResourceNotFoundException(not_found_message, Type="User")

        # a falsy (e.g. empty) qualifier still maps to "$LATEST"
        return qualifier or "$LATEST", fn_arn
456

457
    @staticmethod
    def _function_revision_id(resolved_fn: Function, resolved_qualifier: str) -> str:
        """Return the revision id of the alias or version named by the qualifier.

        :param resolved_fn: function holding the aliases and versions
        :param resolved_qualifier: qualifier that must exist on the function
        :return: the alias revision id, or the version config's revision id
        """
        if not api_utils.qualifier_is_alias(resolved_qualifier):
            # Assumes that a non-alias is a version
            return resolved_fn.versions[resolved_qualifier].config.revision_id
        return resolved_fn.aliases[resolved_qualifier].revision_id
464

465
    def _resolve_vpc_id(self, account_id: str, region_name: str, subnet_id: str) -> str:
        """Look up the VPC id of a subnet through the EC2 ``DescribeSubnets`` API.

        :param account_id: account used as access key for the EC2 client
        :param region_name: region of the EC2 client
        :param subnet_id: subnet whose VPC id is requested
        :return: the ``VpcId`` of the first subnet returned by EC2
        :raises InvalidParameterValueException: if EC2 answers with a client error; the
            message carries the EC2 error code and message, and the EC2 error is chained
            as the cause
        """
        ec2_client = connect_to(aws_access_key_id=account_id, region_name=region_name).ec2
        try:
            return ec2_client.describe_subnets(SubnetIds=[subnet_id])["Subnets"][0]["VpcId"]
        except ec2_client.exceptions.ClientError as e:
            code = e.response["Error"]["Code"]
            message = e.response["Error"]["Message"]
            # chain explicitly so the original EC2 failure is recorded as the cause
            raise InvalidParameterValueException(
                f"Error occurred while DescribeSubnets. EC2 Error Code: {code}. EC2 Error Message: {message}",
                Type="User",
            ) from e
476

477
    def _build_vpc_config(
        self,
        account_id: str,
        region_name: str,
        vpc_config: dict | None = None,
    ) -> VpcConfig | None:
        """Build the internal VPC configuration from a request's ``VpcConfig`` dict.

        :param account_id: account used to resolve the VPC id of the subnet
        :param region_name: region used to resolve the VPC id of the subnet
        :param vpc_config: request dict with optional ``SubnetIds`` and ``SecurityGroupIds``
        :return: ``None`` if no config was given or the EC2 API is not enabled; an empty
            ``VpcConfig`` if no subnet ids were given; otherwise a ``VpcConfig`` whose VPC id
            is resolved from the first subnet id
        :raises ValidationException: if the first subnet id does not match ``SUBNET_ID_REGEX``
        :raises InvalidParameterValueException: if the subnet cannot be described via EC2
        """
        if not vpc_config or not is_api_enabled("ec2"):
            return None

        # An explicit ``None`` is treated like a missing/empty list; indexing it below
        # would otherwise fail with a TypeError.
        subnet_ids = vpc_config.get("SubnetIds") or []
        if not subnet_ids:
            return VpcConfig(vpc_id="", security_group_ids=[], subnet_ids=[])

        # Only the first subnet id is validated and used to resolve the VPC
        subnet_id = subnet_ids[0]
        if not SUBNET_ID_REGEX.match(subnet_id):
            raise ValidationException(
                f"1 validation error detected: Value '[{subnet_id}]' at 'vpcConfig.subnetIds' failed to satisfy constraint: Member must satisfy constraint: [Member must have length less than or equal to 1024, Member must have length greater than or equal to 0, Member must satisfy regular expression pattern: ^subnet-[0-9a-z]*$]"
            )

        return VpcConfig(
            vpc_id=self._resolve_vpc_id(account_id, region_name, subnet_id),
            security_group_ids=vpc_config.get("SecurityGroupIds", []),
            subnet_ids=subnet_ids,
        )
501

502
    def _create_version_model(
        self,
        function_name: str,
        region: str,
        account_id: str,
        description: str | None = None,
        revision_id: str | None = None,
        code_sha256: str | None = None,
    ) -> tuple[FunctionVersion, bool]:
        """
        Add a new numbered version to the function model, derived from ``$LATEST``.

        Preconditions checked before anything is changed:
          - RevisionId, if provided, must equal the revision id of ``$LATEST``
          - CodeSha256, if provided, must equal the code hash of ``$LATEST``
            (not enforced for hot-reloaded zip packages)

        If the most recently published version has the same internal revision as
        ``$LATEST``, no new version is created and that version is returned instead.

        :param function_name: Function name to be published
        :param region: Region of the function
        :param account_id: Account of the function
        :param description: new description of the version (``$LATEST``'s description is kept if None)
        :param revision_id: Revision id, raises if it does not match the latest revision id
        :param code_sha256: Code sha256, raises if it does not match the latest code hash
        :return: Tuple of (version, True if a new version was created / False if the last
            published version was returned because nothing changed)
        :raises PreconditionFailedException: on a revision id mismatch
        :raises InvalidParameterValueException: on a code hash mismatch
        """
        latest = get_function_version(
            function_name=function_name, qualifier="$LATEST", account_id=account_id, region=region
        )
        if revision_id and latest.config.revision_id != revision_id:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
                Type="User",
            )

        # check if code hashes match if they are specified
        if latest.config.package_type == PackageType.Zip:
            current_hash = latest.config.code.code_sha256
            # if the code is a zip package and hot reloaded (hot reloading is currently only
            # supported for zip packagetypes) we cannot enforce the codesha256 check
            skip_hash_check = latest.config.code.is_hot_reloading()
        else:
            current_hash = latest.config.image.code_sha256
            skip_hash_check = False
        if code_sha256 and current_hash != code_sha256 and not skip_hash_check:
            raise InvalidParameterValueException(
                f"CodeSHA256 ({code_sha256}) is different from current CodeSHA256 in $LATEST ({current_hash}). Please try again with the CodeSHA256 in $LATEST.",
                Type="User",
            )

        function = lambda_stores[account_id][region].functions.get(function_name)
        # only an explicitly given description overrides the one copied from $LATEST
        overrides = {} if description is None else {"description": description}
        # TODO copy environment instead of restarting one, get rid of all the "Pending"s

        with function.lock:
            last_published = None
            if function.next_version > 1:
                last_published = function.versions.get(str(function.next_version - 1))
            if (
                last_published
                and last_published.config.internal_revision == latest.config.internal_revision
            ):
                return last_published, False
            # TODO check if there was a change since last version
            version_number = str(function.next_version)
            function.next_version += 1
            version_id = VersionIdentifier(
                function_name=function_name,
                qualifier=version_number,
                region=region,
                account=account_id,
            )
            apply_on = latest.config.snap_start["ApplyOn"]
            if apply_on == SnapStartApplyOn.PublishedVersions:
                optimization_status = SnapStartOptimizationStatus.On
            else:
                optimization_status = SnapStartOptimizationStatus.Off
            snap_start = SnapStartResponse(
                ApplyOn=apply_on,
                OptimizationStatus=optimization_status,
            )
            published_config = dataclasses.replace(
                latest.config,
                last_update=None,  # versions never have a last update status
                state=VersionState(
                    state=State.Pending,
                    code=StateReasonCode.Creating,
                    reason="The function is being created.",
                ),
                snap_start=snap_start,
                **overrides,
            )
            new_version = dataclasses.replace(latest, config=published_config, id=version_id)
            function.versions[version_number] = new_version
        return new_version, True
605

606
    def _publish_version_from_existing_version(
        self,
        function_name: str,
        region: str,
        account_id: str,
        description: str | None = None,
        revision_id: str | None = None,
        code_sha256: str | None = None,
    ) -> FunctionVersion:
        """
        Publish a version from an existing, already initialized ``$LATEST``.

        If nothing changed since the last publish, the previously published version is
        returned and nothing else happens.

        :param function_name: Function name
        :param region: region
        :param account_id: account id
        :param description: description
        :param revision_id: revision id (check if current version matches)
        :param code_sha256: code sha (check if current code matches)
        :return: the published version as currently stored on the function
        """
        version, was_created = self._create_version_model(
            function_name=function_name,
            region=region,
            account_id=account_id,
            description=description,
            revision_id=revision_id,
            code_sha256=code_sha256,
        )
        if not was_created:
            return version

        self.lambda_service.publish_version(version)
        function = lambda_stores[account_id][region].functions.get(function_name)
        # TODO: re-evaluate data model to prevent this dirty hack just for bumping the revision id
        latest = function.versions["$LATEST"]
        recreated_config = dataclasses.replace(latest.config)
        function.versions["$LATEST"] = dataclasses.replace(latest, config=recreated_config)
        # read the version back from the store instead of returning the local object
        return function.versions.get(version.id.qualifier)
645

646
    def _publish_version_with_changes(
        self,
        function_name: str,
        region: str,
        account_id: str,
        description: str | None = None,
        revision_id: str | None = None,
        code_sha256: str | None = None,
    ) -> FunctionVersion:
        """
        Publish a version together with a new latest version (publish on create / update).

        The version is only started through the Lambda service when a new version was
        actually created in the model.

        :param function_name: Function name
        :param region: region
        :param account_id: account id
        :param description: description
        :param revision_id: revision id (check if current version matches)
        :param code_sha256: code sha (check if current code matches)
        :return: the new version, or the last published one if nothing changed
        """
        version, was_created = self._create_version_model(
            function_name=function_name,
            region=region,
            account_id=account_id,
            description=description,
            revision_id=revision_id,
            code_sha256=code_sha256,
        )
        if was_created:
            self.lambda_service.create_function_version(version)
        return version
678

679
    @staticmethod
    def _verify_env_variables(env_vars: dict[str, str]):
        """Reject environment variables whose compact JSON form is too large.

        The variables are serialized as JSON without extra whitespace and measured in
        UTF-8 bytes against ``config.LAMBDA_LIMITS_MAX_FUNCTION_ENVVAR_SIZE_BYTES``.

        :param env_vars: environment variables of the function
        :raises InvalidParameterValueException: if the serialized size exceeds the limit
        """
        serialized = json.dumps(env_vars, separators=(",", ":"))
        size_in_bytes = len(serialized.encode("utf-8"))
        if size_in_bytes <= config.LAMBDA_LIMITS_MAX_FUNCTION_ENVVAR_SIZE_BYTES:
            return
        raise InvalidParameterValueException(
            f"Lambda was unable to configure your environment variables because the environment variables you have provided exceeded the 4KB limit. String measured: {serialized}",
            Type="User",
        )
690

691
    @staticmethod
    def _validate_snapstart(snap_start: SnapStart, runtime: Runtime):
        """Validate a SnapStart setting against the allowed values and the runtime.

        :param snap_start: SnapStart request dict; its ``ApplyOn`` value is checked
        :param runtime: runtime of the function
        :raises ValidationException: if ``ApplyOn`` is neither ``PublishedVersions`` nor ``None``
        :raises InvalidParameterValueException: if the runtime is not in
            ``SNAP_START_SUPPORTED_RUNTIMES``
        """
        apply_on = snap_start.get("ApplyOn")
        allowed_values = (SnapStartApplyOn.PublishedVersions, SnapStartApplyOn.None_)
        if apply_on not in allowed_values:
            raise ValidationException(
                f"1 validation error detected: Value '{apply_on}' at 'snapStart.applyOn' failed to satisfy constraint: Member must satisfy enum value set: [PublishedVersions, None]"
            )

        if runtime in SNAP_START_SUPPORTED_RUNTIMES:
            return
        raise InvalidParameterValueException(
            f"{runtime} is not supported for SnapStart enabled functions.", Type="User"
        )
706

707
    def _validate_layers(self, new_layers: list[str], region: str, account_id: str):
        """
        Validate the layer version ARNs referenced by a function.

        Layers owned by ``account_id`` must already exist in the local store. Layers of other
        accounts that are missing locally are fetched through ``self.layer_fetcher`` and stored in
        the store of the layer's own account and region.

        :param new_layers: layer version ARNs to validate
        :param region: region of the function; the region checks are skipped when it is falsy
        :param account_id: account id of the function
        :raises InvalidParameterValueException: too many layers, a same-account layer in another
            region, a non-existing same-account layer version, or two versions of the same layer
        :raises ValidationException: if an ARN carries no layer version
        :raises AccessDeniedException: if an external layer is in another region or cannot be fetched
        :raises NotImplementedError: if an external layer is missing and no layer fetcher is set
        """
        if len(new_layers) > LAMBDA_LAYERS_LIMIT_PER_FUNCTION:
            raise InvalidParameterValueException(
                "Cannot reference more than 5 layers.", Type="User"
            )

        # Maps the unversioned layer ARN to the first versioned ARN seen for it.
        visited_layers = {}
        for layer_version_arn in new_layers:
            (
                layer_region,
                layer_account_id,
                layer_name,
                layer_version_str,
            ) = api_utils.parse_layer_arn(layer_version_arn)
            if layer_version_str is None:
                raise ValidationException(
                    f"1 validation error detected: Value '[{layer_version_arn}]'"
                    + r" at 'layers' failed to satisfy constraint: Member must satisfy constraint: [Member must have length less than or equal to 140, Member must have length greater than or equal to 1, Member must satisfy regular expression pattern: (arn:[a-zA-Z0-9-]+:lambda:[a-z]{2}((-gov)|(-iso(b?)))?-[a-z]+-\d{1}:\d{12}:layer:[a-zA-Z0-9-_]+:[0-9]+)|(arn:[a-zA-Z0-9-]+:lambda:::awslayer:[a-zA-Z0-9-_]+), Member must not be null]",
                )

            state = lambda_stores[layer_account_id][layer_region]
            layer = state.layers.get(layer_name)
            layer_version = None
            if layer is not None:
                layer_version = layer.layer_versions.get(layer_version_str)
            if layer_account_id == account_id:
                if region and layer_region != region:
                    raise InvalidParameterValueException(
                        f"Layers are not in the same region as the function. "
                        f"Layers are expected to be in region {region}.",
                        Type="User",
                    )
                # layer_version is None whenever the layer itself is missing.
                if layer_version is None:
                    raise InvalidParameterValueException(
                        f"Layer version {layer_version_arn} does not exist.", Type="User"
                    )
            else:  # External layer from other account
                # TODO: validate IAM layer policy here, allowing access by default for now and only checking region
                if region and layer_region != region:
                    # TODO: detect user or role from context when IAM users are implemented
                    user = "user/localstack-testing"
                    raise AccessDeniedException(
                        f"User: arn:{get_partition(region)}:iam::{account_id}:{user} is not authorized to perform: lambda:GetLayerVersion on resource: {layer_version_arn} because no resource-based policy allows the lambda:GetLayerVersion action"
                    )
                if layer_version is None:
                    # Limitation: cannot fetch external layers when using the same account id as the target layer
                    # because we do not want to trigger the layer fetcher for every non-existing layer.
                    if self.layer_fetcher is None:
                        raise NotImplementedError(
                            "Fetching shared layers from AWS is a pro feature."
                        )

                    fetched_layer = self.layer_fetcher.fetch_layer(layer_version_arn)
                    if fetched_layer is None:
                        # TODO: detect user or role from context when IAM users are implemented
                        user = "user/localstack-testing"
                        raise AccessDeniedException(
                            f"User: arn:{get_partition(region)}:iam::{account_id}:{user} is not authorized to perform: lambda:GetLayerVersion on resource: {layer_version_arn} because no resource-based policy allows the lambda:GetLayerVersion action"
                        )

                    # Distinguish between new layer and new layer version. The decision must be
                    # based on whether the layer was already stored: the requested version is
                    # always missing in this branch, so testing the version would replace an
                    # existing layer and drop its previously stored versions.
                    if layer is None:
                        # Create whole layer from scratch
                        state.layers[layer_name] = fetched_layer
                    else:
                        # Create layer version if another version of the same layer already exists
                        layer.layer_versions[layer_version_str] = (
                            fetched_layer.layer_versions.get(layer_version_str)
                        )

            # only the first two matches in the array are considered for the error message
            layer_arn = ":".join(layer_version_arn.split(":")[:-1])
            if layer_arn in visited_layers:
                conflict_layer_version_arn = visited_layers[layer_arn]
                raise InvalidParameterValueException(
                    f"Two different versions of the same layer are not allowed to be referenced in the same function. {conflict_layer_version_arn} and {layer_version_arn} are versions of the same layer.",
                    Type="User",
                )
            visited_layers[layer_arn] = layer_version_arn
1✔
786

787
    @staticmethod
    def map_layers(new_layers: list[str]) -> list[LayerVersion]:
        """
        Resolve layer version ARNs to the layer version objects held in the local stores.

        No existence check is done here: the layer named by each ARN is expected to be present
        in the store of the ARN's account and region.

        :param new_layers: layer version ARNs
        :return: stored layer versions, in the order of ``new_layers``
        """
        resolved_versions = []
        for arn in new_layers:
            layer_region, layer_account, name, version_key = api_utils.parse_layer_arn(arn)
            stored_layer = lambda_stores[layer_account][layer_region].layers.get(name)
            resolved_versions.append(stored_layer.layer_versions.get(version_key))
        return resolved_versions
1✔
798

799
    def get_function_recursion_config(
        self,
        context: RequestContext,
        function_name: UnqualifiedFunctionName,
        **kwargs,
    ) -> GetFunctionRecursionConfigResponse:
        """
        Return the recursive loop setting stored on a function.

        :param context: request context, used to resolve account, region and function name
        :param function_name: function name as given in the request
        :return: response carrying the function's ``recursive_loop`` value
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        resolved_name = api_utils.get_function_name(function_name, context)
        function = self._get_function(
            function_name=resolved_name, region=region, account_id=account_id
        )
        return GetFunctionRecursionConfigResponse(RecursiveLoop=function.recursive_loop)
1✔
809

810
    def put_function_recursion_config(
        self,
        context: RequestContext,
        function_name: UnqualifiedFunctionName,
        recursive_loop: RecursiveLoop,
        **kwargs,
    ) -> PutFunctionRecursionConfigResponse:
        """
        Store a new recursive loop setting on a function.

        The function is looked up before the value is validated.

        :param context: request context, used to resolve account, region and function name
        :param function_name: function name as given in the request
        :param recursive_loop: new setting; must be a member of ``RecursiveLoop``
        :return: response carrying the stored ``recursive_loop`` value
        :raises ValidationException: if ``recursive_loop`` is not a ``RecursiveLoop`` member
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        resolved_name = api_utils.get_function_name(function_name, context)

        function = self._get_function(
            function_name=resolved_name, region=region, account_id=account_id
        )

        valid_settings = tuple(RecursiveLoop.__members__.values())
        if recursive_loop not in valid_settings:
            raise ValidationException(
                f"1 validation error detected: Value '{recursive_loop}' at 'recursiveLoop' failed to satisfy constraint: "
                f"Member must satisfy enum value set: [Terminate, Allow]"
            )

        function.recursive_loop = recursive_loop
        return PutFunctionRecursionConfigResponse(RecursiveLoop=function.recursive_loop)
1✔
831

832
    @handler(operation="CreateFunction", expand=False)
    def create_function(
        self,
        context: RequestContext,
        request: CreateFunctionRequest,
    ) -> FunctionConfiguration:
        """
        Create a function together with its ``$LATEST`` version.

        The request is validated first (size limits, architectures, environment variables, layers,
        role, runtime, SnapStart). The function and its ``$LATEST`` version are then registered in
        the store under ``self.create_fn_lock`` and the version is handed to the lambda service.
        Tags are stored if given, and a version is published when ``Publish`` is set.

        :param context: request context providing account, region, partition and the raw request
        :param request: CreateFunction request
        :return: configuration of the created (or, with ``Publish``, the published) version
        :raises RequestEntityTooLargeException: if the zip file or the whole request is too large
        :raises ValidationException: for invalid architectures or an invalid role ARN
        :raises InvalidParameterValueException: if the role cannot be assumed by Lambda
        :raises ResourceConflictException: if a function with that name already exists
        :raises LambdaServiceException: if the code location required by the package type is missing
        """
        context_region = context.region
        context_account_id = context.account_id

        zip_file = request.get("Code", {}).get("ZipFile")
        if zip_file and len(zip_file) > config.LAMBDA_LIMITS_CODE_SIZE_ZIPPED:
            raise RequestEntityTooLargeException(
                f"Zipped size must be smaller than {config.LAMBDA_LIMITS_CODE_SIZE_ZIPPED} bytes"
            )

        if context.request.content_length > config.LAMBDA_LIMITS_CREATE_FUNCTION_REQUEST_SIZE:
            raise RequestEntityTooLargeException(
                f"Request must be smaller than {config.LAMBDA_LIMITS_CREATE_FUNCTION_REQUEST_SIZE} bytes for the CreateFunction operation"
            )

        if architectures := request.get("Architectures"):
            if len(architectures) != 1:
                raise ValidationException(
                    f"1 validation error detected: Value '[{', '.join(architectures)}]' at 'architectures' failed to "
                    f"satisfy constraint: Member must have length less than or equal to 1",
                )
            if architectures[0] not in ARCHITECTURES:
                raise ValidationException(
                    f"1 validation error detected: Value '[{', '.join(architectures)}]' at 'architectures' failed to "
                    f"satisfy constraint: Member must satisfy constraint: [Member must satisfy enum value set: "
                    f"[x86_64, arm64], Member must not be null]",
                )

        # env_vars and layers stay bound after these checks and are reused for the version config.
        if env_vars := request.get("Environment", {}).get("Variables"):
            self._verify_env_variables(env_vars)

        if layers := request.get("Layers", []):
            self._validate_layers(layers, region=context_region, account_id=context_account_id)

        if not api_utils.is_role_arn(request.get("Role")):
            raise ValidationException(
                f"1 validation error detected: Value '{request.get('Role')}'"
                + " at 'role' failed to satisfy constraint: Member must satisfy regular expression pattern: arn:(aws[a-zA-Z-]*)?:iam::\\d{12}:role/?[a-zA-Z_0-9+=,.@\\-_/]+"
            )
        if not self.lambda_service.can_assume_role(request.get("Role"), context.region):
            raise InvalidParameterValueException(
                "The role defined for the function cannot be assumed by Lambda.", Type="User"
            )
        package_type = request.get("PackageType", PackageType.Zip)
        runtime = request.get("Runtime")
        self._validate_runtime(package_type, runtime)

        request_function_name = request.get("FunctionName")

        function_name, *_ = api_utils.get_name_and_qualifier(
            function_arn_or_name=request_function_name,
            qualifier=None,
            context=context,
        )

        if runtime in DEPRECATED_RUNTIMES:
            LOG.warning(
                "The Lambda runtime %s is deprecated. "
                "Please upgrade the runtime for the function %s: "
                "https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtimes.html",
                runtime,
                function_name,
            )
        if snap_start := request.get("SnapStart"):
            self._validate_snapstart(snap_start, runtime)
        state = lambda_stores[context_account_id][context_region]

        # The existence check and the registration of the function happen under the same lock.
        with self.create_fn_lock:
            if function_name in state.functions:
                raise ResourceConflictException(f"Function already exist: {function_name}")
            fn = Function(function_name=function_name)
            arn = VersionIdentifier(
                function_name=function_name,
                qualifier="$LATEST",
                region=context_region,
                account=context_account_id,
            )
            # save function code to s3
            code = None
            image = None
            image_config = None
            runtime_version_config = RuntimeVersionConfig(
                # Limitation: the runtime id (presumably sha256 of image) is currently hardcoded
                # Potential implementation: provide (cached) sha256 hash of used Docker image
                RuntimeVersionArn=f"arn:{context.partition}:lambda:{context_region}::runtime:8eeff65f6809a3ce81507fe733fe09b835899b99481ba22fd75b5a7338290ec1"
            )
            request_code = request.get("Code")
            if package_type == PackageType.Zip:
                # TODO verify if correct combination of code is set
                if zip_file := request_code.get("ZipFile"):
                    code = store_lambda_archive(
                        archive_file=zip_file,
                        function_name=function_name,
                        region_name=context_region,
                        account_id=context_account_id,
                    )
                elif s3_bucket := request_code.get("S3Bucket"):
                    s3_key = request_code["S3Key"]
                    s3_object_version = request_code.get("S3ObjectVersion")
                    code = store_s3_bucket_archive(
                        archive_bucket=s3_bucket,
                        archive_key=s3_key,
                        archive_version=s3_object_version,
                        function_name=function_name,
                        region_name=context_region,
                        account_id=context_account_id,
                    )
                else:
                    raise LambdaServiceException("Gotta have s3 bucket or zip file")
            elif package_type == PackageType.Image:
                image = request_code.get("ImageUri")
                if not image:
                    raise LambdaServiceException("Gotta have an image when package type is image")
                image = create_image_code(image_uri=image)

                image_config_req = request.get("ImageConfig", {})
                image_config = ImageConfig(
                    command=image_config_req.get("Command"),
                    entrypoint=image_config_req.get("EntryPoint"),
                    working_directory=image_config_req.get("WorkingDirectory"),
                )
                # Runtime management controls are not available when providing a custom image
                runtime_version_config = None
            if "LoggingConfig" in request:
                logging_config = request["LoggingConfig"]
                LOG.warning(
                    "Advanced Lambda Logging Configuration is currently mocked "
                    "and will not impact the logging behavior. "
                    "Please create a feature request if needed."
                )

                # when switching to JSON, app and system level log is auto set to INFO
                # (values given in the request take precedence over these defaults)
                if logging_config.get("LogFormat", None) == LogFormat.JSON:
                    logging_config = {
                        "ApplicationLogLevel": "INFO",
                        "SystemLogLevel": "INFO",
                        "LogGroup": f"/aws/lambda/{function_name}",
                    } | logging_config
                else:
                    logging_config = (
                        LoggingConfig(
                            LogFormat=LogFormat.Text, LogGroup=f"/aws/lambda/{function_name}"
                        )
                        | logging_config
                    )

            else:
                logging_config = LoggingConfig(
                    LogFormat=LogFormat.Text, LogGroup=f"/aws/lambda/{function_name}"
                )

            version = FunctionVersion(
                id=arn,
                config=VersionFunctionConfiguration(
                    last_modified=api_utils.format_lambda_date(datetime.datetime.now()),
                    description=request.get("Description", ""),
                    role=request["Role"],
                    timeout=request.get("Timeout", LAMBDA_DEFAULT_TIMEOUT),
                    runtime=request.get("Runtime"),
                    memory_size=request.get("MemorySize", LAMBDA_DEFAULT_MEMORY_SIZE),
                    handler=request.get("Handler"),
                    package_type=package_type,
                    environment=env_vars,
                    architectures=request.get("Architectures") or [Architecture.x86_64],
                    tracing_config_mode=request.get("TracingConfig", {}).get(
                        "Mode", TracingMode.PassThrough
                    ),
                    image=image,
                    image_config=image_config,
                    code=code,
                    layers=self.map_layers(layers),
                    internal_revision=short_uid(),
                    ephemeral_storage=LambdaEphemeralStorage(
                        size=request.get("EphemeralStorage", {}).get("Size", 512)
                    ),
                    snap_start=SnapStartResponse(
                        ApplyOn=request.get("SnapStart", {}).get("ApplyOn", SnapStartApplyOn.None_),
                        OptimizationStatus=SnapStartOptimizationStatus.Off,
                    ),
                    runtime_version_config=runtime_version_config,
                    dead_letter_arn=request.get("DeadLetterConfig", {}).get("TargetArn"),
                    vpc_config=self._build_vpc_config(
                        context_account_id, context_region, request.get("VpcConfig")
                    ),
                    state=VersionState(
                        state=State.Pending,
                        code=StateReasonCode.Creating,
                        reason="The function is being created.",
                    ),
                    logging_config=logging_config,
                ),
            )
            fn.versions["$LATEST"] = version
            state.functions[function_name] = fn
        # NOTE(review): the object returned by labels(...) is discarded here; presumably an
        # increment call is intended - verify against the counter API.
        function_counter.labels(
            operation=FunctionOperation.create,
            runtime=runtime or "n/a",
            status=FunctionStatus.success,
            invocation_type="n/a",
            package_type=package_type,
        )
        self.lambda_service.create_function_version(version)

        if tags := request.get("Tags"):
            # This will check whether the function exists.
            self._store_tags(arn.unqualified_arn(), tags)

        if request.get("Publish"):
            version = self._publish_version_with_changes(
                function_name=function_name, region=context_region, account_id=context_account_id
            )

        if config.LAMBDA_SYNCHRONOUS_CREATE:
            # block via retrying until "terminal" condition reached before returning
            if not poll_condition(
                lambda: get_function_version(
                    function_name, version.id.qualifier, version.id.account, version.id.region
                ).config.state.state
                in [State.Active, State.Failed],
                timeout=10,
            ):
                LOG.warning(
                    "LAMBDA_SYNCHRONOUS_CREATE is active, but waiting for %s reached timeout.",
                    function_name,
                )

        return api_utils.map_config_out(
            version, return_qualified_arn=False, return_update_status=False
        )
1066

1067
    def _validate_runtime(self, package_type, runtime):
        """
        Ensure the runtime of a Zip-packaged function is an accepted one.

        Without ``config.LAMBDA_RUNTIME_VALIDATION`` every runtime in ``ALL_RUNTIMES`` is accepted.
        With it, only the runtimes listed in ``RUNTIMES_AGGREGATED`` are accepted. Other package
        types are not checked.

        :param package_type: package type of the function
        :param runtime: runtime from the request
        :raises InvalidParameterValueException: if the runtime is not accepted; deprecated runtimes
            with a recommended upgrade get the dedicated message raised by
            ``_check_for_recomended_migration_target``
        """
        runtimes = ALL_RUNTIMES
        if config.LAMBDA_RUNTIME_VALIDATION:
            # chain() with a single argument would only iterate that argument, i.e. yield the
            # per-group collections themselves; from_iterable flattens them into one list.
            # Assumes each value of RUNTIMES_AGGREGATED is a collection of runtimes - TODO confirm.
            runtimes = list(itertools.chain.from_iterable(RUNTIMES_AGGREGATED.values()))

        if package_type == PackageType.Zip and runtime not in runtimes:
            # deprecated runtimes have different error
            if runtime in DEPRECATED_RUNTIMES:
                HINT_LOG.info(
                    "Set env variable LAMBDA_RUNTIME_VALIDATION to 0"
                    " in order to allow usage of deprecated runtimes"
                )
                self._check_for_recomended_migration_target(runtime)

            raise InvalidParameterValueException(
                f"Value {runtime} at 'runtime' failed to satisfy constraint: Member must satisfy enum value set: {VALID_RUNTIMES} or be a valid ARN",
                Type="User",
            )
1085

1086
    def _check_for_recomended_migration_target(self, deprecated_runtime):
        """
        Raise the dedicated error for deprecated runtimes that have a recommended upgrade.

        AWS offers a recommended migration runtime for "newly" deprecated runtimes; this keeps
        the error message in parity. Returns silently when ``DEPRECATED_RUNTIMES_UPGRADES`` has
        no entry for the runtime.

        :param deprecated_runtime: the deprecated runtime from the request
        :raises InvalidParameterValueException: if an upgrade target is known for the runtime
        """
        upgrade_target = DEPRECATED_RUNTIMES_UPGRADES.get(deprecated_runtime)
        if upgrade_target is None:
            return

        LOG.debug(
            "The Lambda runtime %s is deprecated. Please upgrade to a supported Lambda runtime such as %s.",
            deprecated_runtime,
            upgrade_target,
        )
        raise InvalidParameterValueException(
            f"The runtime parameter of {deprecated_runtime} is no longer supported for creating or updating AWS Lambda functions. We recommend you use a supported runtime while creating or updating functions.",
            Type="User",
        )
1101

1102
    @handler(operation="UpdateFunctionConfiguration", expand=False)
    def update_function_configuration(
        self, context: RequestContext, request: UpdateFunctionConfigurationRequest
    ) -> FunctionConfiguration:
        """
        Update the configuration of the ``$LATEST`` version of a function.

        Only the settings present in the request are changed. The changes are collected first and
        then applied as one replacement of the ``$LATEST`` version, which is stored and passed to
        the lambda service.

        :param context: request context, used to resolve account, region and function name
        :param request: UpdateFunctionConfiguration request
        :return: configuration of the new ``$LATEST`` version
        :raises ResourceNotFoundException: if the function does not exist
        :raises PreconditionFailedException: if a given ``RevisionId`` does not match the latest one
        :raises ValidationException: if the given role is not a valid role ARN
        :raises InvalidParameterValueException: if the given runtime is not in ``ALL_RUNTIMES``
        """
        requested_name = request.get("FunctionName")

        # the name may be a full or partial ARN, which then determines account and region
        account_id, region = api_utils.get_account_and_region(requested_name, context)
        function_name, _ = api_utils.get_name_and_qualifier(requested_name, None, context)
        store = lambda_stores[account_id][region]

        if function_name not in store.functions:
            raise ResourceNotFoundException(
                f"Function not found: {api_utils.unqualified_lambda_arn(function_name=function_name, region=region, account=account_id)}",
                Type="User",
            )
        function = store.functions[function_name]

        # TODO: lock modification of latest version
        # TODO: notify service for changes relevant to re-provisioning of $LATEST
        current_version = function.latest()
        current_config = current_version.config

        expected_revision = request.get("RevisionId")
        if expected_revision and expected_revision != current_version.config.revision_id:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
                Type="User",
            )

        # config fields to replace on the latest version, keyed by dataclass field name
        updates = {}
        if "EphemeralStorage" in request:
            updates["ephemeral_storage"] = LambdaEphemeralStorage(
                request["EphemeralStorage"].get("Size", 512)
            )  # TODO: do defaults here apply as well?

        if "Role" in request:
            if not api_utils.is_role_arn(request["Role"]):
                raise ValidationException(
                    f"1 validation error detected: Value '{request.get('Role')}'"
                    + " at 'role' failed to satisfy constraint: Member must satisfy regular expression pattern: arn:(aws[a-zA-Z-]*)?:iam::\\d{12}:role/?[a-zA-Z_0-9+=,.@\\-_/]+"
                )
            updates["role"] = request["Role"]

        # settings that are copied over as-is, without validation
        passthrough_fields = (
            ("Description", "description"),
            ("Timeout", "timeout"),
            ("MemorySize", "memory_size"),
            ("Handler", "handler"),
        )
        for request_key, config_field in passthrough_fields:
            if request_key in request:
                updates[config_field] = request[request_key]

        if "DeadLetterConfig" in request:
            updates["dead_letter_arn"] = request["DeadLetterConfig"].get("TargetArn")

        vpc_config = request.get("VpcConfig")
        if vpc_config:
            updates["vpc_config"] = self._build_vpc_config(account_id, region, vpc_config)

        if "Runtime" in request:
            requested_runtime = request["Runtime"]

            if requested_runtime not in ALL_RUNTIMES:
                raise InvalidParameterValueException(
                    f"Value {requested_runtime} at 'runtime' failed to satisfy constraint: Member must satisfy enum value set: {VALID_RUNTIMES} or be a valid ARN",
                    Type="User",
                )
            if requested_runtime in DEPRECATED_RUNTIMES:
                LOG.warning(
                    "The Lambda runtime %s is deprecated. "
                    "Please upgrade the runtime for the function %s: "
                    "https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtimes.html",
                    requested_runtime,
                    function_name,
                )
            updates["runtime"] = request["Runtime"]

        snap_start = request.get("SnapStart")
        if snap_start:
            # validate against the runtime from this request if given, else the current one
            effective_runtime = updates.get("runtime") or current_config.runtime
            self._validate_snapstart(snap_start, effective_runtime)
            updates["snap_start"] = SnapStartResponse(
                ApplyOn=snap_start.get("ApplyOn", SnapStartApplyOn.None_),
                OptimizationStatus=SnapStartOptimizationStatus.Off,
            )

        if "Environment" in request:
            env_vars = request["Environment"].get("Variables", {})
            if env_vars:
                self._verify_env_variables(env_vars)
            updates["environment"] = env_vars

        if "Layers" in request:
            requested_layers = request["Layers"]
            if requested_layers:
                self._validate_layers(requested_layers, region=region, account_id=account_id)
            updates["layers"] = self.map_layers(requested_layers)

        if "ImageConfig" in request:
            requested_image_config = request["ImageConfig"]
            updates["image_config"] = ImageConfig(
                command=requested_image_config.get("Command"),
                entrypoint=requested_image_config.get("EntryPoint"),
                working_directory=requested_image_config.get("WorkingDirectory"),
            )

        if "LoggingConfig" in request:
            requested_logging = request["LoggingConfig"]
            LOG.warning(
                "Advanced Lambda Logging Configuration is currently mocked "
                "and will not impact the logging behavior. "
                "Please create a feature request if needed."
            )

            # when switching to JSON, app and system level log is auto set to INFO
            if requested_logging.get("LogFormat", None) == LogFormat.JSON:
                json_level_defaults = {
                    "ApplicationLogLevel": "INFO",
                    "SystemLogLevel": "INFO",
                }
                requested_logging = json_level_defaults | requested_logging

            previous_logging = current_config.logging_config

            # partial update: requested keys override the previous configuration
            merged_logging = previous_logging | requested_logging

            # in case we switched from JSON to Text we need to remove LogLevel keys
            switched_to_text = (
                merged_logging.get("LogFormat") == LogFormat.Text
                and previous_logging.get("LogFormat") == LogFormat.JSON
            )
            if switched_to_text:
                merged_logging.pop("ApplicationLogLevel", None)
                merged_logging.pop("SystemLogLevel", None)

            updates["logging_config"] = merged_logging

        if "TracingConfig" in request:
            tracing_mode = request["TracingConfig"].get("Mode")
            if tracing_mode:
                updates["tracing_config_mode"] = tracing_mode

        updated_config = dataclasses.replace(
            current_config,
            last_modified=api_utils.generate_lambda_date(),
            internal_revision=short_uid(),
            last_update=UpdateStatus(
                status=LastUpdateStatus.InProgress,
                code="Creating",
                reason="The function is being created.",
            ),
            **updates,
        )
        updated_version = dataclasses.replace(current_version, config=updated_config)
        function.versions["$LATEST"] = updated_version  # TODO: notify
        self.lambda_service.update_version(new_version=updated_version)

        return api_utils.map_config_out(updated_version)
1✔
1264

1265
    @handler(operation="UpdateFunctionCode", expand=False)
    def update_function_code(
        self, context: RequestContext, request: UpdateFunctionCodeRequest
    ) -> FunctionConfiguration:
        """
        Swap the code (zip upload, S3 archive or container image) of the function's $LATEST version.

        The new $LATEST version gets a fresh internal revision and an ``InProgress`` update status
        before it is handed to the lambda service. If ``Publish`` is set in the request, a version is
        published afterwards and that version is returned with a qualified ARN.

        Raises:
            ResourceNotFoundException: If the function is not present in the store.
            PreconditionFailedException: If ``RevisionId`` is given and differs from the latest one.
            InvalidParameterValueException: If the code source does not fit the function's package type.
            ValidationException: If ``Architectures`` is not exactly one supported architecture.
            LambdaServiceException: If neither ``ZipFile``, ``S3Bucket`` nor ``ImageUri`` is given.
        """
        requested_name = request.get("FunctionName")
        account_id, region = api_utils.get_account_and_region(requested_name, context)
        # the qualifier part of the name is not needed, code updates always target $LATEST
        function_name, _ = api_utils.get_name_and_qualifier(requested_name, None, context)

        functions = lambda_stores[account_id][region].functions
        if function_name not in functions:
            raise ResourceNotFoundException(
                f"Function not found: {api_utils.unqualified_lambda_arn(function_name=function_name, region=region, account=account_id)}",
                Type="User",
            )
        function = functions[function_name]

        expected_revision = request.get("RevisionId")
        if expected_revision and expected_revision != function.latest().config.revision_id:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
                Type="User",
            )

        # TODO verify if correct combination of code is set
        wants_archive = request.get("ZipFile") or request.get("S3Bucket")
        if wants_archive and function.latest().config.package_type == PackageType.Image:
            raise InvalidParameterValueException(
                "Please provide ImageUri when updating a function with packageType Image.",
                Type="User",
            )
        if request.get("ImageUri") and function.latest().config.package_type == PackageType.Zip:
            raise InvalidParameterValueException(
                "Please don't provide ImageUri when updating a function with packageType Zip.",
                Type="User",
            )

        # exactly one of the two ends up set, depending on the code source in the request
        code = None
        image = None
        if zip_file := request.get("ZipFile"):
            code = store_lambda_archive(
                archive_file=zip_file,
                function_name=function_name,
                region_name=region,
                account_id=account_id,
            )
        elif s3_bucket := request.get("S3Bucket"):
            code = store_s3_bucket_archive(
                archive_bucket=s3_bucket,
                archive_key=request["S3Key"],
                archive_version=request.get("S3ObjectVersion"),
                function_name=function_name,
                region_name=region,
                account_id=account_id,
            )
        elif image_uri := request.get("ImageUri"):
            image = create_image_code(image_uri=image_uri)
        else:
            raise LambdaServiceException("Gotta have s3 bucket or zip file or image")

        previous_latest = function.versions.get("$LATEST")
        if code:
            replace_kwargs = {"code": code}
        else:
            replace_kwargs = {"image": image}

        if architectures := request.get("Architectures"):
            if len(architectures) != 1:
                raise ValidationException(
                    f"1 validation error detected: Value '[{', '.join(architectures)}]' at 'architectures' failed to "
                    f"satisfy constraint: Member must have length less than or equal to 1",
                )
            # An empty list of architectures is also forbidden. Further exceptions are tested here for create_function:
            # tests.aws.services.lambda_.test_lambda_api.TestLambdaFunction.test_create_lambda_exceptions
            if architectures[0] not in ARCHITECTURES:
                raise ValidationException(
                    f"1 validation error detected: Value '[{', '.join(architectures)}]' at 'architectures' failed to "
                    f"satisfy constraint: Member must satisfy constraint: [Member must satisfy enum value set: "
                    f"[x86_64, arm64], Member must not be null]",
                )
            replace_kwargs["architectures"] = architectures

        in_progress = UpdateStatus(
            status=LastUpdateStatus.InProgress,
            code="Creating",
            reason="The function is being created.",
        )
        updated_config = dataclasses.replace(
            previous_latest.config,
            internal_revision=short_uid(),
            last_modified=api_utils.generate_lambda_date(),
            last_update=in_progress,
            **replace_kwargs,
        )
        function_version = dataclasses.replace(previous_latest, config=updated_config)
        function.versions["$LATEST"] = function_version

        self.lambda_service.update_version(new_version=function_version)

        publish = request.get("Publish")
        if publish:
            function_version = self._publish_version_with_changes(
                function_name=function_name, region=region, account_id=account_id
            )
        return api_utils.map_config_out(function_version, return_qualified_arn=bool(publish))
1374

1375
    # TODO: does deleting the latest published version affect the next versions number?
1376
    # TODO: what happens when we call this with a qualifier and a fully qualified ARN? (+ conflicts?)
1377
    # TODO: test different ARN patterns (shorthand ARN?)
1378
    # TODO: test deleting across regions?
1379
    # TODO: test mismatch between context region and region in ARN
1380
    # TODO: test qualifier $LATEST, alias-name and version
1381
    def delete_function(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: Qualifier = None,
        **kwargs,
    ) -> None:
        """
        Delete a single published version of a function or, without a qualifier, the whole function.

        Raises:
            InvalidParameterValueException: If the qualifier is an alias or ``$LATEST``.
            ResourceNotFoundException: If the function is not present in the store.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )

        if qualifier and api_utils.qualifier_is_alias(qualifier):
            raise InvalidParameterValueException(
                "Deletion of aliases is not currently supported.",
                Type="User",
            )

        store = lambda_stores[account_id][region]
        if qualifier == "$LATEST":
            raise InvalidParameterValueException(
                "$LATEST version cannot be deleted without deleting the function.", Type="User"
            )

        if function_name not in store.functions:
            raise ResourceNotFoundException(
                f"Function not found: {api_utils.unqualified_lambda_arn(function_name=function_name, region=region, account=account_id)}",
                Type="User",
            )
        function = store.functions.get(function_name)

        if qualifier:
            # only one version goes away; an unknown version is silently ignored
            removed_version = function.versions.pop(qualifier, None)
            if removed_version:
                self.lambda_service.stop_version(removed_version.id.qualified_arn())
                destroy_code_if_not_used(code=removed_version.config.code, function=function)
            return

        # no qualifier: the whole function is removed
        # TODO: introduce locking for safe deletion: We could create a new version at the API layer before
        #  the old version gets cleaned up in the internal lambda service.
        removed_function = store.functions.pop(function_name)
        for version in removed_function.versions.values():
            self.lambda_service.stop_version(qualified_arn=version.id.qualified_arn())
            # the function is gone from the store, so the code can be destroyed directly
            version_code = version.config.code
            if version_code:
                version_code.destroy()
1✔
1429

1430
    def list_functions(
        self,
        context: RequestContext,
        master_region: MasterRegion = None,  # (only relevant for lambda@edge)
        function_version: FunctionVersionApi = None,
        marker: String = None,
        max_items: MaxListItems = None,
        **kwargs,
    ) -> ListFunctionsResponse:
        """
        Return one page of the functions stored for the request's account and region.

        With ``function_version`` set to ``ALL`` every version of every function is listed with its
        qualified ARN; otherwise only the latest version of each function is listed, unqualified.

        Raises:
            ValidationException: If ``function_version`` is set to anything other than ``ALL``.
        """
        state = lambda_stores[context.account_id][context.region]

        if function_version and function_version != FunctionVersionApi.ALL:
            raise ValidationException(
                f"1 validation error detected: Value '{function_version}'"
                " at 'functionVersion' failed to satisfy constraint: Member must satisfy enum value set: [ALL]"
            )

        include_all_versions = function_version == FunctionVersionApi.ALL
        if include_all_versions:
            selected = [
                version for fn in state.functions.values() for version in fn.versions.values()
            ]
        else:
            selected = [fn.latest() for fn in state.functions.values()]

        entries = PaginatedList(
            api_utils.map_to_list_response(
                api_utils.map_config_out(version, return_qualified_arn=include_all_versions)
            )
            for version in selected
        )
        # the function ARN of an entry doubles as its pagination token
        page, token = entries.get_page(
            lambda entry: entry["FunctionArn"],
            marker,
            max_items,
        )
        return ListFunctionsResponse(Functions=page, NextMarker=token)
1✔
1468

1469
    def get_function(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        qualifier: Qualifier = None,
        **kwargs,
    ) -> GetFunctionResponse:
        """
        Return configuration, code location, concurrency and tags of a function version.

        An alias qualifier is resolved to the version it points to before the version lookup.

        Raises:
            ResourceNotFoundException: If the function or the requested alias does not exist.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )

        fn = lambda_stores[account_id][region].functions.get(function_name)
        if fn is None:
            # the ARN in the message is qualified only if the caller supplied a qualifier
            if qualifier is None:
                missing_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
            else:
                missing_arn = api_utils.qualified_lambda_arn(
                    function_name, qualifier, account_id, region
                )
            raise ResourceNotFoundException(f"Function not found: {missing_arn}", Type="User")

        alias_name = None
        if qualifier and api_utils.qualifier_is_alias(qualifier):
            if qualifier not in fn.aliases:
                alias_arn = api_utils.qualified_lambda_arn(
                    function_name, qualifier, account_id, region
                )
                raise ResourceNotFoundException(f"Function not found: {alias_arn}", Type="User")
            # from here on the qualifier is the version the alias points to
            alias_name = qualifier
            qualifier = fn.aliases[alias_name].function_version

        version = get_function_version(
            function_name=function_name,
            qualifier=qualifier,
            account_id=account_id,
            region=region,
        )

        tags = self._get_tags(api_utils.unqualified_lambda_arn(function_name, account_id, region))
        # "Tags" is left out of the response entirely when there are none
        additional_fields = {"Tags": tags} if tags else {}

        code_location = None
        if archive := version.config.code:
            code_location = FunctionCodeLocation(
                Location=archive.generate_presigned_url(endpoint_url=config.external_service_url()),
                RepositoryType="S3",
            )
        elif container_image := version.config.image:
            code_location = FunctionCodeLocation(
                ImageUri=container_image.image_uri,
                RepositoryType=container_image.repository_type,
                ResolvedImageUri=container_image.resolved_image_uri,
            )

        reserved = fn.reserved_concurrent_executions
        concurrency = Concurrency(ReservedConcurrentExecutions=reserved) if reserved else None

        configuration = api_utils.map_config_out(
            version, return_qualified_arn=bool(qualifier), alias_name=alias_name
        )
        return GetFunctionResponse(
            Configuration=configuration,
            Code=code_location,  # TODO
            Concurrency=concurrency,
            **additional_fields,
        )
1539

1540
    def get_function_configuration(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        qualifier: Qualifier = None,
        **kwargs,
    ) -> FunctionConfiguration:
        """
        Return the configuration of the requested function version.

        The ARN in the result is qualified only if a qualifier was resolved from the request.
        """
        # CAVE: THIS RETURN VALUE IS *NOT* THE SAME AS IN get_function (!) but seems to be only configuration part?
        account_id, region = api_utils.get_account_and_region(function_name, context)
        resolved_name, resolved_qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        version = get_function_version(
            function_name=resolved_name,
            qualifier=resolved_qualifier,
            account_id=account_id,
            region=region,
        )
        return api_utils.map_config_out(version, return_qualified_arn=bool(resolved_qualifier))
1✔
1559

1560
    def invoke(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        invocation_type: InvocationType | None = None,
        log_type: LogType | None = None,
        client_context: String | None = None,
        payload: IO[Blob] | None = None,
        qualifier: Qualifier | None = None,
        tenant_id: TenantId | None = None,
        **kwargs,
    ) -> InvocationResponse:
        """
        Invoke a function through the lambda service and turn the result into an API response.

        ``Event`` invocations answer with status 202 and ``DryRun`` invocations with 204, both without
        a payload. Every other invocation answers with status 200, the payload and the executed
        version. ``tenant_id`` is accepted but not used in this method.

        Raises:
            ServiceException: Passed through unchanged when raised by the lambda service.
            LambdaServiceException: On an environment startup timeout or any other invocation error.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )

        user_agent = context.request.user_agent.string

        started_at = time.perf_counter()
        try:
            result = self.lambda_service.invoke(
                function_name=function_name,
                qualifier=qualifier,
                region=region,
                account_id=account_id,
                invocation_type=invocation_type,
                client_context=client_context,
                request_id=context.request_id,
                trace_context=context.trace_context,
                payload=payload.read() if payload else None,
                user_agent=user_agent,
            )
        except ServiceException:
            # already an API-level error, nothing to wrap
            raise
        except EnvironmentStartupTimeoutException as timeout_error:
            raise LambdaServiceException(
                f"[{context.request_id}] Timeout while starting up lambda environment for function {function_name}:{qualifier}"
            ) from timeout_error
        except Exception as error:
            # the traceback is only attached when debug logging is enabled
            LOG.error(
                "[%s] Error while invoking lambda %s",
                context.request_id,
                function_name,
                exc_info=LOG.isEnabledFor(logging.DEBUG),
            )
            raise LambdaServiceException(
                f"[{context.request_id}] Internal error while executing lambda {function_name}:{qualifier}. Caused by {type(error).__name__}: {error}"
            ) from error

        # Event and DryRun invocations carry no payload and return before the duration is logged
        if invocation_type == InvocationType.Event:
            return InvocationResponse(StatusCode=202)
        if invocation_type == InvocationType.DryRun:
            return InvocationResponse(StatusCode=204)

        elapsed_ms = (time.perf_counter() - started_at) * 1000
        LOG.debug("Lambda invocation duration: %0.2fms", elapsed_ms)

        response = InvocationResponse(
            StatusCode=200,
            Payload=result.payload,
            ExecutedVersion=result.executed_version,
        )
        if result.is_error:
            response["FunctionError"] = "Unhandled"
        if log_type == LogType.Tail:
            # only the last 4096 bytes of the logs are returned, base64 encoded
            log_tail = to_bytes(result.logs)[-4096:]
            response["LogResult"] = to_str(base64.b64encode(log_tail))

        return response
1✔
1633

1634
    # Version operations
1635
    def publish_version(
        self,
        context: RequestContext,
        function_name: FunctionName,
        code_sha256: String = None,
        description: Description = None,
        revision_id: String = None,
        **kwargs,
    ) -> FunctionConfiguration:
        """
        Publish a new version from the function's existing state.

        The result is the configuration of the published version with a qualified ARN.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        resolved_name = api_utils.get_function_name(function_name, context)
        published = self._publish_version_from_existing_version(
            function_name=resolved_name,
            description=description,
            account_id=account_id,
            region=region,
            revision_id=revision_id,
            code_sha256=code_sha256,
        )
        return api_utils.map_config_out(published, return_qualified_arn=True)
1✔
1655

1656
    def list_versions_by_function(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        marker: String = None,
        max_items: MaxListItems = None,
        **kwargs,
    ) -> ListVersionsByFunctionResponse:
        """
        Return one page of the versions of a function, each with its qualified ARN.

        ``marker`` is the ``NextMarker`` of a previous page and ``max_items`` is the page size, both
        passed on to the paginated list.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name = api_utils.get_function_name(function_name, context)
        function = self._get_function(
            function_name=function_name, region=region, account_id=account_id
        )
        versions = [
            api_utils.map_to_list_response(
                api_utils.map_config_out(version=version, return_qualified_arn=True)
            )
            for version in function.versions.values()
        ]
        items = PaginatedList(versions)
        # The pagination token is compared against the string `marker` sent by the client and is
        # returned as `NextMarker`, so it must be a string that identifies one entry. Returning the
        # whole entry (a dict) could never match a marker. The qualified function ARN is used
        # instead, consistent with list_functions.
        page, token = items.get_page(
            lambda item: item["FunctionArn"],
            marker,
            max_items,
        )
        return ListVersionsByFunctionResponse(Versions=page, NextMarker=token)
1✔
1682

1683
    # Alias
1684

1685
    def _create_routing_config_model(
        self, routing_config_dict: dict[str, float], function_version: FunctionVersion
    ):
        """
        Validate the additional version weights of an alias and wrap them in an AliasRoutingConfig.

        ``routing_config_dict`` maps a version qualifier to its weight; ``function_version`` is the
        version the alias itself points to.

        Raises:
            InvalidParameterValueException: If more than one weight is given, the weighted version is
                the alias' own version, or the alias points to ``$LATEST``.
            ValidationException: If a weight is below 0.0 or not below 1.0, or a key is not a version.
        """
        if len(routing_config_dict) > 1:
            raise InvalidParameterValueException(
                "Number of items in AdditionalVersionWeights cannot be greater than 1",
                Type="User",
            )

        primary_qualifier = function_version.id.qualifier
        # should be exactly one item here, still iterating, might be supported in the future
        for key, value in routing_config_dict.items():
            if value < 0.0 or value >= 1.0:
                raise ValidationException(
                    f"1 validation error detected: Value '{{{key}={value}}}' at 'routingConfig.additionalVersionWeights' failed to satisfy constraint: Map value must satisfy constraint: [Member must have value less than or equal to 1.0, Member must have value greater than or equal to 0.0, Member must not be null]"
                )
            if key == primary_qualifier:
                raise InvalidParameterValueException(
                    f"Invalid function version {primary_qualifier}. Function version {primary_qualifier} is already included in routing configuration.",
                    Type="User",
                )
            # an alias that targets $LATEST may not carry any routing config
            if primary_qualifier == "$LATEST":
                raise InvalidParameterValueException(
                    "$LATEST is not supported for an alias pointing to more than 1 version"
                )
            if not api_utils.qualifier_is_version(key):
                raise ValidationException(
                    f"1 validation error detected: Value '{{{key}={value}}}' at 'routingConfig.additionalVersionWeights' failed to satisfy constraint: Map keys must satisfy constraint: [Member must have length less than or equal to 1024, Member must have length greater than or equal to 1, Member must satisfy regular expression pattern: [0-9]+, Member must not be null]"
                )

            # called only to check that the weighted version exists; the result is not needed
            get_function_version(
                function_name=function_version.id.function_name,
                qualifier=key,
                region=function_version.id.region,
                account_id=function_version.id.account,
            )

        return AliasRoutingConfig(version_weights=routing_config_dict)
1✔
1722

1723
    def create_alias(
        self,
        context: RequestContext,
        function_name: FunctionName,
        name: Alias,
        function_version: Version,
        description: Description = None,
        routing_config: AliasRoutingConfiguration = None,
        **kwargs,
    ) -> AliasConfiguration:
        """
        Create an alias that points to an existing version of the function.

        Raises:
            ValidationException: If ``name`` is not a valid alias name.
            ResourceConflictException: If an alias with this name already exists on the function.
        """
        if not api_utils.qualifier_is_alias(name):
            raise ValidationException(
                f"1 validation error detected: Value '{name}' at 'name' failed to satisfy constraint: Member must satisfy regular expression pattern: (?!^[0-9]+$)([a-zA-Z0-9-_]+)"
            )

        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name = api_utils.get_function_name(function_name, context)
        # resolving the target version is done before the function itself is fetched
        target_version = get_function_version(
            function_name=function_name,
            qualifier=function_version,
            region=region,
            account_id=account_id,
        )
        function = self._get_function(
            function_name=function_name, region=region, account_id=account_id
        )

        with function.lock:
            existing_alias = function.aliases.get(name)
            if existing_alias:
                raise ResourceConflictException(
                    f"Alias already exists: {api_utils.map_alias_out(alias=existing_alias, function=function)['AliasArn']}",
                    Type="User",
                )

            # a routing configuration is only built when weights were actually supplied
            weights = routing_config.get("AdditionalVersionWeights") if routing_config else None
            routing_configuration = None
            if weights:
                routing_configuration = self._create_routing_config_model(weights, target_version)

            alias = VersionAlias(
                name=name,
                function_version=function_version,
                # description is always present, if not specified it's an empty string
                description=description or "",
                routing_configuration=routing_configuration,
            )
            function.aliases[name] = alias
        return api_utils.map_alias_out(alias=alias, function=function)
1✔
1774

1775
    def list_aliases(
        self,
        context: RequestContext,
        function_name: FunctionName,
        function_version: Version = None,
        marker: String = None,
        max_items: MaxListItems = None,
        **kwargs,
    ) -> ListAliasesResponse:
        """
        Return one page of the aliases of a function.

        If ``function_version`` is given, only aliases pointing to that version are listed.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name = api_utils.get_function_name(function_name, context)
        function = self._get_function(
            function_name=function_name, region=region, account_id=account_id
        )

        def _matches(alias) -> bool:
            # without a version filter every alias is listed
            return function_version is None or alias.function_version == function_version

        mapped_aliases = PaginatedList(
            api_utils.map_alias_out(alias, function)
            for alias in function.aliases.values()
            if _matches(alias)
        )
        # the alias ARN of an entry doubles as its pagination token
        page, token = mapped_aliases.get_page(
            lambda entry: entry["AliasArn"],
            marker,
            max_items,
        )

        return ListAliasesResponse(Aliases=page, NextMarker=token)
1✔
1803

1804
    def delete_alias(
        self, context: RequestContext, function_name: FunctionName, name: Alias, **kwargs
    ) -> None:
        """
        Remove an alias and the resources registered under its name.

        A missing alias is not an error. The provisioned concurrency config stored under the name
        is dropped in any case; the function URL config only if the alias actually existed.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name = api_utils.get_function_name(function_name, context)
        function = self._get_function(
            function_name=function_name, region=region, account_id=account_id
        )
        removed_alias = function.aliases.pop(name, None)

        # cleanup related resources
        provisioned_configs = function.provisioned_concurrency_configs
        if name in provisioned_configs:
            provisioned_configs.pop(name)

        # TODO: Allow for deactivating/unregistering specific Lambda URLs
        url_configs = function.function_url_configs
        if removed_alias and name in url_configs:
            removed_url_config = url_configs.pop(name)
            LOG.debug(
                "Stopping aliased Lambda Function URL %s for %s",
                removed_url_config.url,
                removed_url_config.function_name,
            )
1826

1827
    def get_alias(
        self, context: RequestContext, function_name: FunctionName, name: Alias, **kwargs
    ) -> AliasConfiguration:
        """
        Return the configuration of one alias of a function.

        Raises:
            ResourceNotFoundException: If the function has no alias with this name.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name = api_utils.get_function_name(function_name, context)
        function = self._get_function(
            function_name=function_name, region=region, account_id=account_id
        )
        alias = function.aliases.get(name)
        if not alias:
            raise ResourceNotFoundException(
                f"Cannot find alias arn: {api_utils.qualified_lambda_arn(function_name=function_name, qualifier=name, region=region, account=account_id)}",
                Type="User",
            )
        return api_utils.map_alias_out(alias=alias, function=function)
1✔
1841

1842
    def update_alias(
        self,
        context: RequestContext,
        function_name: FunctionName,
        name: Alias,
        function_version: Version = None,
        description: Description = None,
        routing_config: AliasRoutingConfiguration = None,
        revision_id: String = None,
        **kwargs,
    ) -> AliasConfiguration:
        """
        Update the target version, description and/or routing configuration of an alias.

        Only arguments that are not ``None`` are applied. A ``routing_config`` that is empty or has
        no ``AdditionalVersionWeights`` clears the routing configuration of the alias.

        Raises:
            ResourceNotFoundException: If the function has no alias with this name.
            PreconditionFailedException: If ``revision_id`` is given and differs from the alias' one.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name = api_utils.get_function_name(function_name, context)
        function = self._get_function(
            function_name=function_name, region=region, account_id=account_id
        )
        if not (alias := function.aliases.get(name)):
            fn_arn = api_utils.qualified_lambda_arn(function_name, name, account_id, region)
            raise ResourceNotFoundException(
                f"Alias not found: {fn_arn}",
                Type="User",
            )
        if revision_id and alias.revision_id != revision_id:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
                Type="User",
            )
        changes = {}
        if function_version is not None:
            changes |= {"function_version": function_version}
        if description is not None:
            changes |= {"description": description}
        if routing_config is not None:
            # if it is an empty dict or AdditionalVersionWeights is empty, set routing config to None
            new_routing_config = None
            if routing_config_dict := routing_config.get("AdditionalVersionWeights"):
                # The weights are validated against the version the alias points to once this
                # update is applied: the new version if one is given, the current one otherwise.
                # The validation helper needs that version; calling it without one raised TypeError.
                target_qualifier = (
                    function_version if function_version is not None else alias.function_version
                )
                target_version = get_function_version(
                    function_name=function_name,
                    qualifier=target_qualifier,
                    region=region,
                    account_id=account_id,
                )
                new_routing_config = self._create_routing_config_model(
                    routing_config_dict, target_version
                )
            changes |= {"routing_configuration": new_routing_config}
        # even if no changes are done, we have to update revision id for some reason
        old_alias = alias
        alias = dataclasses.replace(alias, **changes)
        function.aliases[name] = alias

        # TODO: signal lambda service that pointer potentially changed
        self.lambda_service.update_alias(old_alias=old_alias, new_alias=alias, function=function)

        return api_utils.map_alias_out(alias=alias, function=function)
1✔
1890

1891
    # =======================================
1892
    # ======= EVENT SOURCE MAPPINGS =========
1893
    # =======================================
1894
    def check_service_resource_exists(
        self, service: str, resource_arn: str, function_arn: str, function_role_arn: str
    ):
        """
        Verify that the event source behind ``resource_arn`` can be looked up with the function's role.

        Only ``sqs``, ``sqs-fifo``, ``kinesis`` and ``dynamodb`` sources are checked; for any other
        service nothing is looked up.

        Raises:
            InvalidParameterValueException: If the queue or stream is reported as not existing.
            ClientError: Any other client error from the lookup is re-raised.
        """
        arn = parse_arn(resource_arn)
        source_client = get_internal_client(
            arn=resource_arn,
            role_arn=function_role_arn,
            service_principal=ServicePrincipal.lambda_,
            source_arn=function_arn,
        )

        if service in ("sqs", "sqs-fifo"):
            try:
                # AWS uses `GetQueueAttributes` internally to verify the queue existence, but we need the `QueueUrl`
                # which is not given directly. We build out a dummy `QueueUrl` which can be parsed by SQS to return
                # the right value
                queue_name = arn["resource"].split("/")[-1]
                queue_url = f"http://sqs.{arn['region']}.domain/{arn['account']}/{queue_name}"
                source_client.get_queue_attributes(QueueUrl=queue_url)
            except ClientError as e:
                error_code = e.response["Error"]["Code"]
                if error_code != "AWS.SimpleQueueService.NonExistentQueue":
                    raise
                raise InvalidParameterValueException(
                    f"Error occurred while ReceiveMessage. SQS Error Code: {error_code}. SQS Error Message: {e.response['Error']['Message']}",
                    Type="User",
                )
        elif service == "kinesis":
            try:
                source_client.describe_stream(StreamARN=resource_arn)
            except ClientError as e:
                if e.response["Error"]["Code"] != "ResourceNotFoundException":
                    raise
                raise InvalidParameterValueException(
                    f"Stream not found: {resource_arn}",
                    Type="User",
                )
        elif service == "dynamodb":
            try:
                # note the different casing of the parameter compared to the kinesis call
                source_client.describe_stream(StreamArn=resource_arn)
            except ClientError as e:
                if e.response["Error"]["Code"] != "ResourceNotFoundException":
                    raise
                raise InvalidParameterValueException(
                    f"Stream not found: {resource_arn}",
                    Type="User",
                )
×
1946

1947
    @handler("CreateEventSourceMapping", expand=False)
    def create_event_source_mapping(
        self,
        context: RequestContext,
        request: CreateEventSourceMappingRequest,
    ) -> EventSourceMappingConfiguration:
        """Handle ``CreateEventSourceMapping`` by delegating to ``create_event_source_mapping_v2``."""
        return self.create_event_source_mapping_v2(context, request)
1✔
1954

1955
    def create_event_source_mapping_v2(
        self,
        context: RequestContext,
        request: CreateEventSourceMappingRequest,
    ) -> EventSourceMappingConfiguration:
        """
        Validate the request, persist the new event source mapping and start its worker.

        Returns the configuration produced by ``EsmConfigFactory``; a copy of it is what gets
        stored in the account/region store.
        """
        # Validation also resolves the store and the role the worker will use.
        function_arn, _, state, _, function_role = self.validate_event_source_mapping(
            context, request
        )

        esm_config = EsmConfigFactory(request, context, function_arn).get_esm_config()
        mapping_uuid = esm_config["UUID"]

        # Store a copy to avoid a race condition with a potential async update in the store
        state.event_source_mappings[mapping_uuid] = esm_config.copy()

        # TODO: check for potential async race condition update -> think about locking
        worker_factory = EsmWorkerFactory(esm_config, function_role, request.get("Enabled", True))
        esm_worker = worker_factory.get_esm_worker()
        self.esm_workers[esm_worker.uuid] = esm_worker

        # TODO: check StateTransitionReason, LastModified, LastProcessingResult (concurrent updates requires locking!)
        tags = request.get("Tags")
        if tags:
            self._store_tags(esm_config.get("EventSourceMappingArn"), tags)

        esm_worker.create()
        return esm_config
1✔
1978

1979
    def validate_event_source_mapping(self, context, request):
        """
        Validate an event source mapping definition and resolve the function it targets.

        ``request`` is either a CreateEventSourceMapping request (carrying ``FunctionName``) or an
        internal event source mapping configuration (carrying ``FunctionArn``), which is what the
        update path passes in.

        The checks run in a fixed order and the first failing one raises:
        destination config, event source type, batch size, starting position (streams only),
        SQS batching window, filter patterns, target function and qualifier, existence of the
        source resource, and finally - only when the current operation is the create operation -
        a duplicate-mapping check against the mappings already in the store.

        Returns:
            A tuple ``(fn_arn, function_name, state, function_version, function_role)`` where
            ``state`` is the store selected for the function's account and region.

        Raises:
            InvalidParameterValueException: For unsupported or missing parameters, an unknown
                function, or a missing source resource.
            ValidationException: If ``StartingPosition`` is not a known enum member.
            ResourceConflictException: If an equivalent mapping already exists (create only).
            Exception: For an unknown alias/version or an unrecognised qualifier (see TODOs below).
        """
        # TODO: test whether stream ARNs are valid sources for Pipes or ESM or whether only DynamoDB table ARNs work
        # TODO: Validate MaxRecordAgeInSeconds (i.e cannot subceed 60s but can be -1) and MaxRetryAttempts parameters.
        # See https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-maximumrecordageinseconds
        # The duplicate check at the end must only run for create requests, not for updates.
        is_create_esm_request = context.operation.name == self.create_event_source_mapping.operation

        # An OnSuccess destination is rejected for every source type handled here.
        if destination_config := request.get("DestinationConfig"):
            if "OnSuccess" in destination_config:
                raise InvalidParameterValueException(
                    "Unsupported DestinationConfig parameter for given event source mapping type.",
                    Type="User",
                )

        # Determine the source service: self-managed sources are treated as Kafka, everything else
        # is derived from the event source ARN.
        service = None
        if "SelfManagedEventSource" in request:
            service = "kafka"
            if "SourceAccessConfigurations" not in request:
                raise InvalidParameterValueException(
                    "Required 'sourceAccessConfigurations' parameter is missing.", Type="User"
                )
        if service is None and "EventSourceArn" not in request:
            raise InvalidParameterValueException("Unrecognized event source.", Type="User")
        if service is None:
            service = extract_service_from_arn(request["EventSourceArn"])

        batch_size = api_utils.validate_and_set_batch_size(service, request.get("BatchSize"))
        # Stream sources require a valid StartingPosition.
        if service in ["dynamodb", "kinesis"]:
            starting_position = request.get("StartingPosition")
            if not starting_position:
                raise InvalidParameterValueException(
                    "1 validation error detected: Value null at 'startingPosition' failed to satisfy constraint: Member must not be null.",
                    Type="User",
                )

            if starting_position not in KinesisStreamStartPosition.__members__:
                raise ValidationException(
                    f"1 validation error detected: Value '{starting_position}' at 'startingPosition' failed to satisfy constraint: Member must satisfy enum value set: [LATEST, AT_TIMESTAMP, TRIM_HORIZON]"
                )
            # AT_TIMESTAMP is not allowed for DynamoDB Streams
            elif (
                service == "dynamodb"
                and starting_position not in DynamoDBStreamStartPosition.__members__
            ):
                raise InvalidParameterValueException(
                    f"Unsupported starting position for arn type: {request['EventSourceArn']}",
                    Type="User",
                )

        # SQS batches larger than 10 are only accepted together with a non-zero batching window.
        if service in ["sqs", "sqs-fifo"]:
            if batch_size > 10 and request.get("MaximumBatchingWindowInSeconds", 0) == 0:
                raise InvalidParameterValueException(
                    "Maximum batch window in seconds must be greater than 0 if maximum batch size is greater than 10",
                    Type="User",
                )

        # Every filter must carry a non-empty string pattern that passes event pattern validation.
        if (filter_criteria := request.get("FilterCriteria")) is not None:
            for filter_ in filter_criteria.get("Filters", []):
                pattern_str = filter_.get("Pattern")
                if not pattern_str or not isinstance(pattern_str, str):
                    raise InvalidParameterValueException(
                        "Invalid filter pattern definition.", Type="User"
                    )

                if not validate_event_pattern(pattern_str):
                    raise InvalidParameterValueException(
                        "Invalid filter pattern definition.", Type="User"
                    )

        # Can either have a FunctionName (i.e CreateEventSourceMapping request) or
        # an internal EventSourceMappingConfiguration representation
        request_function_name = request.get("FunctionName") or request.get("FunctionArn")
        # can be either a partial arn or a full arn for the version/alias
        function_name, qualifier, account, region = function_locators_from_arn(
            request_function_name
        )
        # TODO: validate `context.region` vs. `region(request["FunctionName"])` vs. `region(request["EventSourceArn"])`
        # Fall back to the request context when the function reference carries no account/region.
        account = account or context.account_id
        region = region or context.region
        state = lambda_stores[account][region]
        fn = state.functions.get(function_name)
        if not fn:
            raise InvalidParameterValueException("Function does not exist", Type="User")

        if qualifier:
            # make sure the function version/alias exists
            if api_utils.qualifier_is_alias(qualifier):
                fn_alias = fn.aliases.get(qualifier)
                if not fn_alias:
                    raise Exception("unknown alias")  # TODO: cover via test
            elif api_utils.qualifier_is_version(qualifier):
                fn_version = fn.versions.get(qualifier)
                if not fn_version:
                    raise Exception("unknown version")  # TODO: cover via test
            elif qualifier == "$LATEST":
                pass
            else:
                raise Exception("invalid functionname")  # TODO: cover via test
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account, region)

        else:
            fn_arn = api_utils.unqualified_lambda_arn(function_name, account, region)

        function_version = get_function_version_from_arn(fn_arn)
        function_role = function_version.config.role

        # Only ARN-based sources can be probed; self-managed sources carry no EventSourceArn.
        if source_arn := request.get("EventSourceArn"):
            self.check_service_resource_exists(service, source_arn, fn_arn, function_role)
        # Check we are validating a CreateEventSourceMapping request
        if is_create_esm_request:

            def _get_mapping_sources(mapping: dict[str, Any]) -> list[str]:
                # A mapping's sources are its event source ARN, or otherwise the Kafka bootstrap
                # servers of its self-managed event source.
                if event_source_arn := mapping.get("EventSourceArn"):
                    return [event_source_arn]
                return (
                    mapping.get("SelfManagedEventSource", {})
                    .get("Endpoints", {})
                    .get("KAFKA_BOOTSTRAP_SERVERS", [])
                )

            # check for event source duplicates
            # TODO: currently validated for sqs, kinesis, and dynamodb
            service_id = load_service(service).service_id
            for uuid, mapping in state.event_source_mappings.items():
                mapping_sources = _get_mapping_sources(mapping)
                request_sources = _get_mapping_sources(request)
                # A duplicate targets the same function ARN and shares at least one source.
                if mapping["FunctionArn"] == fn_arn and (
                    set(mapping_sources).intersection(request_sources)
                ):
                    if service == "sqs":
                        # *shakes fist at SQS*
                        raise ResourceConflictException(
                            f'An event source mapping with {service_id} arn (" {mapping["EventSourceArn"]} ") '
                            f'and function (" {function_name} ") already exists. Please update or delete the '
                            f"existing mapping with UUID {uuid}",
                            Type="User",
                        )
                    elif service == "kafka":
                        # Kafka mappings only conflict when they also share a topic.
                        if set(mapping["Topics"]).intersection(request["Topics"]):
                            raise ResourceConflictException(
                                f'An event source mapping with event source ("{",".join(request_sources)}"), '
                                f'function ("{fn_arn}"), '
                                f'topics ("{",".join(request["Topics"])}") already exists. Please update or delete the '
                                f"existing mapping with UUID {uuid}",
                                Type="User",
                            )
                    else:
                        raise ResourceConflictException(
                            f'The event source arn (" {mapping["EventSourceArn"]} ") and function '
                            f'(" {function_name} ") provided mapping already exists. Please update or delete the '
                            f"existing mapping with UUID {uuid}",
                            Type="User",
                        )
        return fn_arn, function_name, state, function_version, function_role
1✔
2132

2133
    @handler("UpdateEventSourceMapping", expand=False)
    def update_event_source_mapping(
        self,
        context: RequestContext,
        request: UpdateEventSourceMappingRequest,
    ) -> EventSourceMappingConfiguration:
        """Handle ``UpdateEventSourceMapping`` by delegating to ``update_event_source_mapping_v2``."""
        return self.update_event_source_mapping_v2(context, request)
1✔
2140

2141
    def update_event_source_mapping_v2(
        self,
        context: RequestContext,
        request: UpdateEventSourceMappingRequest,
    ) -> EventSourceMappingConfiguration:
        """
        Merge the request into the stored mapping, re-validate it and replace its worker.

        The stored mapping is overlaid with the request fields (minus ``UUID``), validated,
        written back to the store, and a fresh worker is built from it. The previous worker is
        stopped and the new one created.

        Raises:
            ResourceNotFoundException: If no ``UUID`` is given, or the mapping or its worker is unknown.
        """
        # TODO: test and implement this properly (quite complex with many validations and limitations!)
        LOG.warning(
            "Updating Lambda Event Source Mapping is in experimental state and not yet fully tested."
        )
        state = lambda_stores[context.account_id][context.region]

        overrides = {**request}
        uuid = overrides.pop("UUID", None)
        if not uuid:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        previous_mapping = state.event_source_mappings.get(uuid)
        previous_worker = self.esm_workers.get(uuid)
        if previous_mapping is None or previous_worker is None:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )  # TODO: test?

        # Request values overwrite the stored ones.
        merged_mapping = previous_mapping | overrides

        # Validate the merged ESM object; only the resolved ARN and role are needed afterwards.
        function_arn, _, _, _, function_role = self.validate_event_source_mapping(
            context, merged_mapping
        )

        # The stored representation carries FunctionArn, never FunctionName.
        merged_mapping.pop("FunctionName", None)
        if function_arn:
            merged_mapping["FunctionArn"] = function_arn

        # Only apply a state transition if the desired state differs
        requested_enabled = request.get("Enabled")
        if requested_enabled is None:
            merged_mapping["State"] = EsmState.UPDATING
        elif requested_enabled and previous_mapping["State"] != EsmState.ENABLED:
            merged_mapping["State"] = EsmState.ENABLING
        # TODO: What happens when trying to update during an update or failed state?!
        elif not requested_enabled and previous_mapping["State"] == EsmState.ENABLED:
            merged_mapping["State"] = EsmState.DISABLING

        # To ensure parity, the transitional state is echoed in the immediate response.
        # Captured here, before the worker is rebuilt.
        response_state = merged_mapping["State"]

        state.event_source_mappings[uuid] = merged_mapping

        # TODO: Currently, we re-create the entire ESM worker. Look into approach with better performance.
        # The factory holds all logic for building a worker from configuration; the new worker
        # is obtained but not activated yet.
        replacement_worker = EsmWorkerFactory(
            merged_mapping, function_role, request.get("Enabled", previous_worker.enabled)
        ).get_esm_worker()
        self.esm_workers[uuid] = replacement_worker

        # We stop() the old worker rather than delete() it, since delete() would remove the ESM
        # from the state mapping.
        previous_worker.stop()
        # This will either create an EsmWorker in the CREATING state if enabled. Otherwise, the DISABLING state is set.
        replacement_worker.create()

        return {**merged_mapping, "State": response_state}
1✔
2211

2212
    def delete_event_source_mapping(
        self, context: RequestContext, uuid: String, **kwargs
    ) -> EventSourceMappingConfiguration:
        """
        Delete the event source mapping identified by ``uuid``.

        The worker is removed from ``self.esm_workers`` and asked to delete itself; the stored
        mapping is returned with ``State`` overridden to ``EsmState.DELETING``.

        Raises:
            ResourceNotFoundException: If the mapping or its worker is unknown.
        """
        store = lambda_stores[context.account_id][context.region]
        esm = store.event_source_mappings.get(uuid)
        if not esm:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        # TODO: add proper locking
        worker = self.esm_workers.pop(uuid, None)
        if not worker:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        # Asynchronous delete in v2
        worker.delete()
        return {**esm, "State": EsmState.DELETING}
1✔
2231

2232
    def get_event_source_mapping(
        self, context: RequestContext, uuid: String, **kwargs
    ) -> EventSourceMappingConfiguration:
        """
        Return the stored event source mapping for ``uuid`` with its live worker state.

        ``State`` and ``StateTransitionReason`` are written onto the stored mapping from the
        worker before it is returned.

        Raises:
            ResourceNotFoundException: If the mapping or its worker is unknown.
        """
        store = lambda_stores[context.account_id][context.region]

        mapping = store.event_source_mappings.get(uuid)
        if not mapping:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        worker = self.esm_workers.get(uuid)
        if not worker:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        # Refresh the stored entry from the worker, which owns the current state.
        mapping["State"] = worker.current_state
        mapping["StateTransitionReason"] = worker.state_transition_reason
        return mapping
1✔
2249

2250
    def list_event_source_mappings(
        self,
        context: RequestContext,
        event_source_arn: Arn = None,
        function_name: FunctionName = None,
        marker: String = None,
        max_items: MaxListItems = None,
        **kwargs,
    ) -> ListEventSourceMappingsResponse:
        """
        List the event source mappings of the caller's account and region, one page at a time.

        ``event_source_arn`` filters on an exact ``EventSourceArn`` match; ``function_name``
        filters on the value being contained in the mapping's ``FunctionArn``. Pages are keyed
        by mapping ``UUID``.
        """
        state = lambda_stores[context.account_id][context.region]

        # TODO: update and test State and StateTransitionReason for ESM v2
        # TODO: validate the event_source_arn pattern
        matching = [
            esm
            for esm in state.event_source_mappings.values()
            if (not event_source_arn or esm.get("EventSourceArn") == event_source_arn)
            and (not function_name or function_name in esm["FunctionArn"])
        ]

        page, next_marker = PaginatedList(matching).get_page(
            lambda esm: esm["UUID"],
            marker,
            max_items,
        )
        return ListEventSourceMappingsResponse(EventSourceMappings=page, NextMarker=next_marker)
1✔
2277

2278
    def get_source_type_from_request(self, request: dict[str, Any]) -> str:
1✔
2279
        if event_source_arn := request.get("EventSourceArn", ""):
×
2280
            service = extract_service_from_arn(event_source_arn)
×
2281
            if service == "sqs" and "fifo" in event_source_arn:
×
2282
                service = "sqs-fifo"
×
2283
            return service
×
UNCOV
2284
        elif request.get("SelfManagedEventSource"):
×
UNCOV
2285
            return "kafka"
×
2286

2287
    # =======================================
2288
    # ============ FUNCTION URLS ============
2289
    # =======================================
2290

2291
    @staticmethod
1✔
2292
    def _validate_qualifier(qualifier: str) -> None:
1✔
2293
        if qualifier == "$LATEST" or (qualifier and api_utils.qualifier_is_version(qualifier)):
1✔
2294
            raise ValidationException(
1✔
2295
                f"1 validation error detected: Value '{qualifier}' at 'qualifier' failed to satisfy constraint: Member must satisfy regular expression pattern: ((?!^\\d+$)^[0-9a-zA-Z-_]+$)"
2296
            )
2297

2298
    @staticmethod
    def _validate_invoke_mode(invoke_mode: str) -> None:
        """
        Validate the invoke mode of a function URL config.

        A falsy value is accepted. Any other value outside ``BUFFERED``/``RESPONSE_STREAM``
        raises a ``ValidationException``; ``RESPONSE_STREAM`` is accepted but logs a warning.
        """
        accepted_modes = [InvokeMode.BUFFERED, InvokeMode.RESPONSE_STREAM]
        is_unknown_mode = bool(invoke_mode) and invoke_mode not in accepted_modes
        if is_unknown_mode:
            raise ValidationException(
                f"1 validation error detected: Value '{invoke_mode}' at 'invokeMode' failed to satisfy constraint: Member must satisfy enum value set: [RESPONSE_STREAM, BUFFERED]"
            )

        if invoke_mode != InvokeMode.RESPONSE_STREAM:
            return
        # TODO should we actually fail for setting RESPONSE_STREAM?
        #  It should trigger InvokeWithResponseStream which is not implemented
        LOG.warning(
            "The invokeMode 'RESPONSE_STREAM' is not yet supported on LocalStack. The property is only mocked, the execution will still be 'BUFFERED'"
        )
2310

2311
    # TODO: what happens if function state is not active?
2312
    def create_function_url_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        auth_type: FunctionUrlAuthType,
        qualifier: FunctionUrlQualifier = None,
        cors: Cors = None,
        invoke_mode: InvokeMode = None,
        **kwargs,
    ) -> CreateFunctionUrlConfigResponse:
        """
        Create a function URL config for a function or one of its aliases.

        The config is stored on the function under the qualifier (``$LATEST`` when none is
        given). The URL subdomain is random unless the function carries a valid
        ``TAG_KEY_CUSTOM_URL`` tag.

        Raises:
            ValidationException: From qualifier / invoke mode validation.
            ResourceNotFoundException: If the function or the referenced alias does not exist.
            ResourceConflictException: If a URL config already exists for the qualifier.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        state = lambda_stores[account_id][region]
        self._validate_qualifier(qualifier)
        self._validate_invoke_mode(invoke_mode)

        fn = state.functions.get(function_name)
        if fn is None:
            raise ResourceNotFoundException("Function does not exist", Type="User")

        normalized_qualifier = qualifier or "$LATEST"

        existing_config = fn.function_url_configs.get(normalized_qualifier)
        if existing_config:
            raise ResourceConflictException(
                f"Failed to create function url config for [functionArn = {existing_config.function_arn}]. Error message:  FunctionUrlConfig exists for this Lambda function",
                Type="User",
            )

        if qualifier and qualifier != "$LATEST" and qualifier not in fn.aliases:
            raise ResourceNotFoundException("Function does not exist", Type="User")

        if qualifier:
            function_arn = api_utils.qualified_lambda_arn(
                function_name, qualifier, account_id, region
            )
        else:
            function_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)

        # Tags are always looked up on the unqualified function ARN.
        tags = self._get_tags(api_utils.unqualified_lambda_arn(function_name, account_id, region))

        custom_id: str | None = None
        if TAG_KEY_CUSTOM_URL in tags:
            # Note: I really wanted to add verification here that the
            # url_id is unique, so we could surface that to the user ASAP.
            # However, it seems like that information isn't available yet,
            # since (as far as I can tell) we call
            # self.router.register_routes() once, in a single shot, for all
            # of the routes -- and we need to verify that it's unique not
            # just for this particular lambda function, but for the entire
            # lambda provider. Therefore... that idea proved non-trivial!
            tag_value = tags[TAG_KEY_CUSTOM_URL]
            custom_id_tag_value = f"{tag_value}-{qualifier}" if qualifier else tag_value
            if TAG_KEY_CUSTOM_URL_VALIDATOR.match(custom_id_tag_value):
                custom_id = custom_id_tag_value
            else:
                # Note: we're logging here instead of raising to prioritize
                # strict parity with AWS over the localstack-only custom_id
                LOG.warning(
                    "Invalid custom ID tag value for lambda URL (%s=%s). "
                    "Replaced with default (random id)",
                    TAG_KEY_CUSTOM_URL,
                    custom_id_tag_value,
                )

        # The url_id is the subdomain used for the URL we're creating. This
        # is either created randomly (as in AWS), or can be passed as a tag
        # to the lambda itself (localstack-only).
        url_id: str = api_utils.generate_random_url_id() if custom_id is None else custom_id

        host_definition = localstack_host(custom_port=config.GATEWAY_LISTEN[0].port)
        new_url_config = FunctionUrlConfig(
            function_arn=function_arn,
            function_name=function_name,
            cors=cors,
            url_id=url_id,
            url=f"http://{url_id}.lambda-url.{context.region}.{host_definition.host_and_port()}/",  # TODO: https support
            auth_type=auth_type,
            creation_time=api_utils.generate_lambda_date(),
            last_modified_time=api_utils.generate_lambda_date(),
            invoke_mode=invoke_mode,
        )

        # persist and start URL
        # TODO: implement URL invoke
        fn.function_url_configs[normalized_qualifier] = new_url_config
        api_url_config = api_utils.map_function_url_config(new_url_config)

        response_fields = (
            "FunctionUrl",
            "FunctionArn",
            "AuthType",
            "Cors",
            "CreationTime",
            "InvokeMode",
        )
        return CreateFunctionUrlConfigResponse(
            **{field: api_url_config[field] for field in response_fields}
        )
2416

2417
    def get_function_url_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: FunctionUrlQualifier = None,
        **kwargs,
    ) -> GetFunctionUrlConfigResponse:
        """
        Return the function URL config stored for a function qualifier.

        A missing qualifier is looked up as ``$LATEST``.

        Raises:
            ValidationException: From qualifier validation.
            ResourceNotFoundException: If the function or its URL config does not exist.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        state = lambda_stores[account_id][region]

        fn_name, qualifier = api_utils.get_name_and_qualifier(function_name, qualifier, context)
        self._validate_qualifier(qualifier)

        resolved_fn = state.functions.get(fn_name)
        if not resolved_fn:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        url_config = resolved_fn.function_url_configs.get(qualifier or "$LATEST")
        if not url_config:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        return api_utils.map_function_url_config(url_config)
1✔
2445

2446
    def update_function_url_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: FunctionUrlQualifier = None,
        auth_type: FunctionUrlAuthType = None,
        cors: Cors = None,
        invoke_mode: InvokeMode = None,
        **kwargs,
    ) -> UpdateFunctionUrlConfigResponse:
        """
        Update the CORS settings, auth type and/or invoke mode of an existing function URL config.

        ``cors`` and ``auth_type`` are applied when not ``None``; ``invoke_mode`` is applied
        when truthy. ``last_modified_time`` is always refreshed.

        Raises:
            ValidationException: From qualifier / invoke mode validation.
            ResourceNotFoundException: If the function, the alias or the URL config does not exist.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        state = lambda_stores[account_id][region]

        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        self._validate_qualifier(qualifier)
        self._validate_invoke_mode(invoke_mode)

        fn = state.functions.get(function_name)
        if not fn:
            raise ResourceNotFoundException("Function does not exist", Type="User")

        normalized_qualifier = qualifier or "$LATEST"

        targets_alias = api_utils.qualifier_is_alias(normalized_qualifier)
        if targets_alias and normalized_qualifier not in fn.aliases:
            raise ResourceNotFoundException("Function does not exist", Type="User")

        url_config = fn.function_url_configs.get(normalized_qualifier)
        if not url_config:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        # Collect only the fields the caller actually supplied.
        changes = {"last_modified_time": api_utils.generate_lambda_date()}
        if cors is not None:
            changes["cors"] = cors
        if auth_type is not None:
            changes["auth_type"] = auth_type
        if invoke_mode:
            changes["invoke_mode"] = invoke_mode

        updated_config = dataclasses.replace(url_config, **changes)
        fn.function_url_configs[normalized_qualifier] = updated_config

        return UpdateFunctionUrlConfigResponse(
            FunctionUrl=updated_config.url,
            FunctionArn=updated_config.function_arn,
            AuthType=updated_config.auth_type,
            Cors=updated_config.cors,
            CreationTime=updated_config.creation_time,
            LastModifiedTime=updated_config.last_modified_time,
            InvokeMode=updated_config.invoke_mode,
        )
2504

2505
    def delete_function_url_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: FunctionUrlQualifier = None,
        **kwargs,
    ) -> None:
        """
        Remove the function URL config stored for a function qualifier.

        A missing qualifier addresses the ``$LATEST`` config.

        Raises:
            ValidationException: From qualifier validation.
            ResourceNotFoundException: If the function or its URL config does not exist.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        state = lambda_stores[account_id][region]

        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        self._validate_qualifier(qualifier)

        resolved_fn = state.functions.get(function_name)
        if not resolved_fn:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        config_key = qualifier or "$LATEST"
        if not resolved_fn.function_url_configs.get(config_key):
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        del resolved_fn.function_url_configs[config_key]
1✔
2534

2535
    def list_function_url_configs(
        self,
        context: RequestContext,
        function_name: FunctionName,
        marker: String = None,
        max_items: MaxItems = None,
        **kwargs,
    ) -> ListFunctionUrlConfigsResponse:
        """
        Return one page of the URL configurations attached to a function.

        :param context: request context used to resolve account, region and function name
        :param function_name: function name or ARN as passed by the caller
        :param marker: pagination marker forwarded to ``PaginatedList.get_page``
        :param max_items: page size forwarded to ``PaginatedList.get_page``
        :return: the mapped URL configs of the page plus the next marker
        :raises ResourceNotFoundException: if the function does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]

        fn = store.functions.get(api_utils.get_function_name(function_name, context))
        if not fn:
            raise ResourceNotFoundException("Function does not exist", Type="User")

        def _page_key(url_config):
            # Pagination tokens are derived from the function ARN of each entry
            return url_config["FunctionArn"]

        all_url_configs = PaginatedList(
            [
                api_utils.map_function_url_config(stored_config)
                for stored_config in fn.function_url_configs.values()
            ]
        )
        current_page, next_marker = all_url_configs.get_page(_page_key, marker, max_items)
        return ListFunctionUrlConfigsResponse(
            FunctionUrlConfigs=current_page, NextMarker=next_marker
        )
2563

2564
    # =======================================
2565
    # ============  Permissions  ============
2566
    # =======================================
2567

2568
    @handler("AddPermission", expand=False)
    def add_permission(
        self,
        context: RequestContext,
        request: AddPermissionRequest,
    ) -> AddPermissionResponse:
        """
        Append a statement to the resource policy of a qualified function.

        The statement id has to be unique within the policy of the resolved qualifier.
        After the policy is updated, the alias or version entry is replaced by a copy of
        itself (see the TODOs below regarding the revision id).

        :param context: request context used to resolve account, region and partition
        :param request: the raw AddPermission request
        :return: response carrying the JSON-serialized statement
        :raises InvalidParameterValueException: if the qualifier is ``$LATEST``
        :raises PreconditionFailedException: if a given RevisionId does not match
        :raises ValidationException: if the statement id does not match STATEMENT_ID_REGEX
        :raises ResourceConflictException: if the statement id already exists in the policy
        """
        raw_function_name = request.get("FunctionName")
        function_name, qualifier = api_utils.get_name_and_qualifier(
            raw_function_name, request.get("Qualifier"), context
        )

        # validate qualifier
        if qualifier is not None:
            self._validate_qualifier_expression(qualifier)
            if qualifier == "$LATEST":
                raise InvalidParameterValueException(
                    "We currently do not support adding policies for $LATEST.", Type="User"
                )
        account_id, region = api_utils.get_account_and_region(raw_function_name, context)

        resolved_fn = self._get_function(function_name, account_id, region)
        resolved_qualifier, fn_arn = self._resolve_fn_qualifier(resolved_fn, qualifier)

        # The revision id is only looked up when the caller supplied one
        expected_revision = request.get("RevisionId")
        if expected_revision and expected_revision != self._function_revision_id(
            resolved_fn, resolved_qualifier
        ):
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
                Type="User",
            )

        sid = request["StatementId"]
        if STATEMENT_ID_REGEX.match(sid) is None:
            raise ValidationException(
                f"1 validation error detected: Value '{sid}' at 'statementId' failed to satisfy constraint: Member must satisfy regular expression pattern: ([a-zA-Z0-9-_]+)"
            )

        # check for an already existing policy and any conflicts in existing statements
        existing_policy = resolved_fn.permissions.get(resolved_qualifier)
        if existing_policy:
            known_sids = {stmt["Sid"] for stmt in existing_policy.policy.Statement}
            if sid in known_sids:
                # uniqueness scope: statement id needs to be unique per qualified function ($LATEST, version, or alias)
                # Counterexample: the same sid can exist within $LATEST, version, and alias
                raise ResourceConflictException(
                    f"The statement id ({sid}) provided already exists. Please provide a new statement id, or remove the existing statement.",
                    Type="User",
                )

        permission_statement = api_utils.build_statement(
            partition=context.partition,
            resource_arn=fn_arn,
            statement_id=sid,
            action=request["Action"],
            principal=request["Principal"],
            source_arn=request.get("SourceArn"),
            source_account=request.get("SourceAccount"),
            principal_org_id=request.get("PrincipalOrgID"),
            event_source_token=request.get("EventSourceToken"),
            auth_type=request.get("FunctionUrlAuthType"),
        )

        # Reuse the stored policy when there is one, otherwise start a fresh default policy
        target_policy = existing_policy or FunctionResourcePolicy(
            policy=ResourcePolicy(Version="2012-10-17", Id="default", Statement=[])
        )
        target_policy.policy.Statement.append(permission_statement)
        if not existing_policy:
            resolved_fn.permissions[resolved_qualifier] = target_policy

        # Update revision id of alias or version
        # TODO: re-evaluate data model to prevent this dirty hack just for bumping the revision id
        # TODO: does that need a `with function.lock` for atomic updates of the policy + revision_id?
        if api_utils.qualifier_is_alias(resolved_qualifier):
            current_alias = resolved_fn.aliases[resolved_qualifier]
            resolved_fn.aliases[resolved_qualifier] = dataclasses.replace(current_alias)
        else:
            # Assumes that a non-alias is a version
            current_version = resolved_fn.versions[resolved_qualifier]
            refreshed_config = dataclasses.replace(current_version.config)
            resolved_fn.versions[resolved_qualifier] = dataclasses.replace(
                current_version, config=refreshed_config
            )
        return AddPermissionResponse(Statement=json.dumps(permission_statement))
2650

2651
    def remove_permission(
        self,
        context: RequestContext,
        function_name: FunctionName,
        statement_id: NamespacedStatementId,
        qualifier: Qualifier = None,
        revision_id: String = None,
        **kwargs,
    ) -> None:
        """
        Remove a single statement from the resource policy of a qualified function.

        The policy entry itself is dropped once its last statement has been removed.

        :param context: request context used to resolve account, region and function name
        :param function_name: function name or ARN as passed by the caller
        :param statement_id: ``Sid`` of the statement to remove
        :param qualifier: optional qualifier selecting the policy
        :param revision_id: optional revision id that must match the current one
        :raises ResourceNotFoundException: if the function, policy or statement is missing
        :raises PreconditionFailedException: if ``revision_id`` is given and does not match
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        if qualifier is not None:
            self._validate_qualifier_expression(qualifier)

        store = lambda_stores[account_id][region]
        resolved_fn = store.functions.get(function_name)
        if resolved_fn is None:
            fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
            raise ResourceNotFoundException(f"No policy found for: {fn_arn}", Type="User")

        resolved_qualifier, _ = self._resolve_fn_qualifier(resolved_fn, qualifier)
        function_permission = resolved_fn.permissions.get(resolved_qualifier)
        if not function_permission:
            raise ResourceNotFoundException(
                "No policy is associated with the given resource.", Type="User"
            )

        # try to find statement in policy and delete it
        statements = function_permission.policy.Statement
        statement = next(
            (candidate for candidate in statements if candidate["Sid"] == statement_id), None
        )
        if not statement:
            raise ResourceNotFoundException(
                f"Statement {statement_id} is not found in resource policy.", Type="User"
            )

        # The current revision id is looked up even when the caller did not send one
        current_revision = self._function_revision_id(resolved_fn, resolved_qualifier)
        if revision_id and revision_id != current_revision:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
                Type="User",
            )
        statements.remove(statement)

        # Update revision id for alias or version
        # TODO: re-evaluate data model to prevent this dirty hack just for bumping the revision id
        # TODO: does that need a `with function.lock` for atomic updates of the policy + revision_id?
        if api_utils.qualifier_is_alias(resolved_qualifier):
            current_alias = resolved_fn.aliases[resolved_qualifier]
            resolved_fn.aliases[resolved_qualifier] = dataclasses.replace(current_alias)
        else:
            # Assumes that a non-alias is a version
            current_version = resolved_fn.versions[resolved_qualifier]
            refreshed_config = dataclasses.replace(current_version.config)
            resolved_fn.versions[resolved_qualifier] = dataclasses.replace(
                current_version, config=refreshed_config
            )

        # remove the policy as a whole when there's no statement left in it
        if len(function_permission.policy.Statement) == 0:
            del resolved_fn.permissions[resolved_qualifier]
2716

2717
    def get_policy(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        qualifier: Qualifier = None,
        **kwargs,
    ) -> GetPolicyResponse:
        """
        Return the resource policy stored for a qualified function.

        :param context: request context used to resolve account, region and function name
        :param function_name: function name or ARN as passed by the caller
        :param qualifier: optional qualifier; ``$LATEST`` is used when none is resolved
        :return: the JSON-serialized policy and the revision id of the alias or version
        :raises ResourceNotFoundException: if no policy is stored for the qualifier
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )

        if qualifier is not None:
            self._validate_qualifier_expression(qualifier)

        resolved_fn = self._get_function(function_name, account_id, region)

        policy_qualifier = qualifier or "$LATEST"
        stored_permission = resolved_fn.permissions.get(policy_qualifier)
        if not stored_permission:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        if api_utils.qualifier_is_alias(policy_qualifier):
            revision = resolved_fn.aliases[policy_qualifier].revision_id
        else:
            # Assumes that a non-alias is a version
            revision = resolved_fn.versions[policy_qualifier].config.revision_id

        serialized_policy = json.dumps(dataclasses.asdict(stored_permission.policy))
        return GetPolicyResponse(Policy=serialized_policy, RevisionId=revision)
2754

2755
    # =======================================
2756
    # ========  Code signing config  ========
2757
    # =======================================
2758

2759
    def create_code_signing_config(
        self,
        context: RequestContext,
        allowed_publishers: AllowedPublishers,
        description: Description = None,
        code_signing_policies: CodeSigningPolicies = None,
        tags: Tags = None,
        **kwargs,
    ) -> CreateCodeSigningConfigResponse:
        """
        Create a code signing config in the store of the caller's account and region.

        :param context: request context providing account, region and partition
        :param allowed_publishers: publishers stored on the new config
        :param description: optional description stored on the new config
        :param code_signing_policies: optional policies stored on the new config
        :param tags: accepted but not read by this method
        :return: response wrapping the mapped code signing config
        """
        # NOTE(review): `tags` is never stored here; confirm whether tagging is handled elsewhere
        account = context.account_id
        region = context.region
        store = lambda_stores[account][region]

        # TODO: can there be duplicates?
        csc_id = f"csc-{get_random_hex(17)}"  # e.g. 'csc-077c33b4c19e26036'
        csc_arn = f"arn:{context.partition}:lambda:{region}:{account}:code-signing-config:{csc_id}"

        new_config = CodeSigningConfig(
            csc_id=csc_id,
            arn=csc_arn,
            allowed_publishers=allowed_publishers,
            policies=code_signing_policies,
            last_modified=api_utils.generate_lambda_date(),
            description=description,
        )
        store.code_signing_configs[csc_arn] = new_config

        mapped_config = api_utils.map_csc(new_config)
        return CreateCodeSigningConfigResponse(CodeSigningConfig=mapped_config)
2785

2786
    def put_function_code_signing_config(
        self,
        context: RequestContext,
        code_signing_config_arn: CodeSigningConfigArn,
        function_name: FunctionName,
        **kwargs,
    ) -> PutFunctionCodeSigningConfigResponse:
        """
        Attach an existing code signing config to a function.

        The code signing config is looked up before the function, so a missing config is
        reported first when both are absent.

        :param context: request context used to resolve account, region and function name
        :param code_signing_config_arn: ARN of the code signing config to attach
        :param function_name: function name or ARN as passed by the caller
        :return: the attached config ARN together with the resolved function name
        :raises CodeSigningConfigNotFoundException: if the config ARN is unknown
        :raises ResourceNotFoundException: if the function does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]
        function_name = api_utils.get_function_name(function_name, context)

        if not store.code_signing_configs.get(code_signing_config_arn):
            raise CodeSigningConfigNotFoundException(
                f"The code signing configuration cannot be found. Check that the provided configuration is not deleted: {code_signing_config_arn}.",
                Type="User",
            )

        target_fn = store.functions.get(function_name)
        fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
        if not target_fn:
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")

        target_fn.code_signing_config_arn = code_signing_config_arn
        return PutFunctionCodeSigningConfigResponse(
            CodeSigningConfigArn=code_signing_config_arn,
            FunctionName=function_name,
        )
2813

2814
    def update_code_signing_config(
        self,
        context: RequestContext,
        code_signing_config_arn: CodeSigningConfigArn,
        description: Description = None,
        allowed_publishers: AllowedPublishers = None,
        code_signing_policies: CodeSigningPolicies = None,
        **kwargs,
    ) -> UpdateCodeSigningConfigResponse:
        """
        Replace a stored code signing config with an updated copy.

        Only arguments that are not ``None`` overwrite the stored values; the
        last-modified date is always refreshed.

        :param context: request context providing account and region
        :param code_signing_config_arn: ARN of the config to update
        :param description: new description, if any
        :param allowed_publishers: new allowed publishers, if any
        :param code_signing_policies: new policies, if any
        :return: response wrapping the mapped, updated config
        :raises ResourceNotFoundException: if the config ARN is unknown
        """
        store = lambda_stores[context.account_id][context.region]
        current = store.code_signing_configs.get(code_signing_config_arn)
        if not current:
            raise ResourceNotFoundException(
                f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
            )

        # Map dataclass field names to the requested values and drop the unset ones
        requested = {
            "allowed_publishers": allowed_publishers,
            "policies": code_signing_policies,
            "description": description,
        }
        overrides = {field: value for field, value in requested.items() if value is not None}

        updated = dataclasses.replace(
            current, last_modified=api_utils.generate_lambda_date(), **overrides
        )
        store.code_signing_configs[code_signing_config_arn] = updated

        return UpdateCodeSigningConfigResponse(CodeSigningConfig=api_utils.map_csc(updated))
2843

2844
    def get_code_signing_config(
        self, context: RequestContext, code_signing_config_arn: CodeSigningConfigArn, **kwargs
    ) -> GetCodeSigningConfigResponse:
        """
        Look up a code signing config by ARN in the caller's account and region.

        :param context: request context providing account and region
        :param code_signing_config_arn: ARN of the config to return
        :return: response wrapping the mapped config
        :raises ResourceNotFoundException: if the config ARN is unknown
        """
        configs = lambda_stores[context.account_id][context.region].code_signing_configs
        found = configs.get(code_signing_config_arn)
        if not found:
            raise ResourceNotFoundException(
                f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
            )

        mapped_config = api_utils.map_csc(found)
        return GetCodeSigningConfigResponse(CodeSigningConfig=mapped_config)
2855

2856
    def get_function_code_signing_config(
        self, context: RequestContext, function_name: FunctionName, **kwargs
    ) -> GetFunctionCodeSigningConfigResponse:
        """
        Return the code signing config ARN attached to a function, if any.

        :param context: request context used to resolve account, region and function name
        :param function_name: function name or ARN as passed by the caller
        :return: an empty response when no config is attached, otherwise the config ARN
            together with the resolved function name
        :raises ResourceNotFoundException: if the function does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]
        function_name = api_utils.get_function_name(function_name, context)

        target_fn = store.functions.get(function_name)
        fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
        if not target_fn:
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")

        if not target_fn.code_signing_config_arn:
            return GetFunctionCodeSigningConfigResponse()

        return GetFunctionCodeSigningConfigResponse(
            CodeSigningConfigArn=target_fn.code_signing_config_arn,
            FunctionName=function_name,
        )
2873

2874
    def delete_function_code_signing_config(
        self, context: RequestContext, function_name: FunctionName, **kwargs
    ) -> None:
        """
        Detach the code signing config from a function by resetting its ARN to ``None``.

        :param context: request context used to resolve account, region and function name
        :param function_name: function name or ARN as passed by the caller
        :raises ResourceNotFoundException: if the function does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]
        function_name = api_utils.get_function_name(function_name, context)

        target_fn = store.functions.get(function_name)
        fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
        if not target_fn:
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")

        target_fn.code_signing_config_arn = None
2886

2887
    def delete_code_signing_config(
        self, context: RequestContext, code_signing_config_arn: CodeSigningConfigArn, **kwargs
    ) -> DeleteCodeSigningConfigResponse:
        """
        Delete a code signing config from the store of the caller's account and region.

        :param context: request context providing account and region
        :param code_signing_config_arn: ARN of the config to delete
        :return: an empty response
        :raises ResourceNotFoundException: if the config ARN is unknown
        """
        configs = lambda_stores[context.account_id][context.region].code_signing_configs

        if not configs.get(code_signing_config_arn):
            raise ResourceNotFoundException(
                f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
            )

        del configs[code_signing_config_arn]

        return DeleteCodeSigningConfigResponse()
2901

2902
    def list_code_signing_configs(
        self,
        context: RequestContext,
        marker: String = None,
        max_items: MaxListItems = None,
        **kwargs,
    ) -> ListCodeSigningConfigsResponse:
        """
        Return one page of the code signing configs of the caller's account and region.

        :param context: request context providing account and region
        :param marker: pagination marker forwarded to ``PaginatedList.get_page``
        :param max_items: page size forwarded to ``PaginatedList.get_page``
        :return: the mapped configs of the page plus the next marker
        """
        store = lambda_stores[context.account_id][context.region]

        def _page_key(mapped_csc):
            # Pagination tokens are derived from the config id of each entry
            return mapped_csc["CodeSigningConfigId"]

        mapped_configs = PaginatedList(
            [api_utils.map_csc(stored) for stored in store.code_signing_configs.values()]
        )
        current_page, next_marker = mapped_configs.get_page(_page_key, marker, max_items)
        return ListCodeSigningConfigsResponse(
            CodeSigningConfigs=current_page, NextMarker=next_marker
        )
2919

2920
    def list_functions_by_code_signing_config(
        self,
        context: RequestContext,
        code_signing_config_arn: CodeSigningConfigArn,
        marker: String = None,
        max_items: MaxListItems = None,
        **kwargs,
    ) -> ListFunctionsByCodeSigningConfigResponse:
        """
        Return one page of the ARNs of functions that use a given code signing config.

        :param context: request context providing account and region
        :param code_signing_config_arn: ARN of the config to filter by
        :param marker: pagination marker forwarded to ``PaginatedList.get_page``
        :param max_items: page size forwarded to ``PaginatedList.get_page``
        :return: the unqualified function ARNs of the page plus the next marker
        :raises ResourceNotFoundException: if the config ARN is unknown
        """
        account = context.account_id
        region = context.region
        store = lambda_stores[account][region]

        if code_signing_config_arn not in store.code_signing_configs:
            raise ResourceNotFoundException(
                f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
            )

        matching_arns = PaginatedList(
            [
                api_utils.unqualified_lambda_arn(candidate.function_name, account, region)
                for candidate in store.functions.values()
                if candidate.code_signing_config_arn == code_signing_config_arn
            ]
        )
        # Each ARN serves as its own pagination token
        current_page, next_marker = matching_arns.get_page(lambda arn: arn, marker, max_items)
        return ListFunctionsByCodeSigningConfigResponse(
            FunctionArns=current_page, NextMarker=next_marker
        )
2951

2952
    # =======================================
2953
    # =========  Account Settings   =========
2954
    # =======================================
2955

2956
    # CAVE: these settings & usages are *per* region!
2957
    # Lambda quotas: https://docs.aws.amazon.com/lambda/latest/dg/gettingstarted-limits.html
2958
    def get_account_settings(self, context: RequestContext, **kwargs) -> GetAccountSettingsResponse:
        """
        Compute the Lambda limits and current usage for the caller's account and region.

        Code size is summed over all Zip-packaged function versions and all layer
        versions. Unreserved concurrency is the configured concurrency limit minus all
        reserved and provisioned concurrency of the stored functions.

        :param context: request context providing account and region
        :return: response with ``AccountLimit`` and ``AccountUsage``
        """
        store = lambda_stores[context.account_id][context.region]
        functions = list(store.functions.values())

        # Image-based Lambdas do not have a code attribute and count against the ECR quotas instead
        function_code_size = sum(
            version.config.code.code_size
            for fn in functions
            for version in fn.versions.values()
            if version.config.package_type == PackageType.Zip
        )
        layer_code_size = sum(
            layer_version.code.code_size
            for layer in store.layers.values()
            for layer_version in layer.layer_versions.values()
        )

        reserved_total = sum(
            fn.reserved_concurrent_executions
            for fn in functions
            if fn.reserved_concurrent_executions is not None
        )
        provisioned_total = sum(
            provisioned.provisioned_concurrent_executions
            for fn in functions
            for provisioned in fn.provisioned_concurrency_configs.values()
        )

        concurrency_limit = config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS
        limits = AccountLimit(
            TotalCodeSize=config.LAMBDA_LIMITS_TOTAL_CODE_SIZE,
            CodeSizeZipped=config.LAMBDA_LIMITS_CODE_SIZE_ZIPPED,
            CodeSizeUnzipped=config.LAMBDA_LIMITS_CODE_SIZE_UNZIPPED,
            ConcurrentExecutions=concurrency_limit,
            UnreservedConcurrentExecutions=concurrency_limit
            - (reserved_total + provisioned_total),
        )
        usage = AccountUsage(
            TotalCodeSize=function_code_size + layer_code_size,
            FunctionCount=len(functions),
        )
        return GetAccountSettingsResponse(AccountLimit=limits, AccountUsage=usage)
2991

2992
    # =======================================
2993
    # ==  Provisioned Concurrency Config   ==
2994
    # =======================================
2995

2996
    def _get_provisioned_config(
        self, context: RequestContext, function_name: str, qualifier: str
    ) -> ProvisionedConcurrencyConfiguration | None:
        """
        Look up the provisioned concurrency config stored for a function qualifier.

        :param context: request context used to resolve account, region and function name
        :param function_name: function name or ARN
        :param qualifier: alias name or version the config is stored under
        :return: the stored config, or ``None`` if the function has none for the qualifier
        :raises ResourceNotFoundException: if the function does not exist, or if the alias
            or version named by the qualifier does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        state = lambda_stores[account_id][region]
        function_name = api_utils.get_function_name(function_name, context)
        fn = state.functions.get(function_name)

        if api_utils.qualifier_is_alias(qualifier):
            fn_alias = fn.aliases.get(qualifier) if fn else None
            if fn_alias is None:
                raise ResourceNotFoundException(
                    f"Cannot find alias arn: {api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)}",
                    Type="User",
                )
        elif api_utils.qualifier_is_version(qualifier):
            fn_version = fn.versions.get(qualifier) if fn else None
            if fn_version is None:
                raise ResourceNotFoundException(
                    f"Function not found: {api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)}",
                    Type="User",
                )
        elif not fn:
            # A qualifier that is neither an alias nor a version on a missing function used
            # to fall through to the attribute access below and fail with AttributeError.
            # NOTE(review): message mirrors the version branch; confirm against AWS behavior.
            raise ResourceNotFoundException(
                f"Function not found: {api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)}",
                Type="User",
            )

        return fn.provisioned_concurrency_configs.get(qualifier)
3023

3024
    def put_provisioned_concurrency_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: Qualifier,
        provisioned_concurrent_executions: PositiveInteger,
        **kwargs,
    ) -> PutProvisionedConcurrencyConfigResponse:
        """
        Store a provisioned concurrency config for an alias or published version.

        The request is validated against the function's reserved concurrency, the
        configured concurrency limit and the minimum unreserved concurrency. The version
        manager for the resolved ARN is fetched before the config is stored, and is then
        asked to apply the new provisioned concurrency.

        :param context: request context used to resolve account, region and function name
        :param function_name: function name or ARN as passed by the caller
        :param qualifier: alias name or version number; ``$LATEST`` is rejected
        :param provisioned_concurrent_executions: requested concurrency, must be >= 1
        :return: response reporting the requested amount with status ``IN_PROGRESS``
        :raises ValidationException: if the requested amount is not positive
        :raises InvalidParameterValueException: for ``$LATEST`` or when a concurrency
            limit would be violated
        :raises ResourceConflictException: if an alias and the version it points to
            would both carry a provisioned concurrency config
        """
        if provisioned_concurrent_executions <= 0:
            raise ValidationException(
                f"1 validation error detected: Value '{provisioned_concurrent_executions}' at 'provisionedConcurrentExecutions' failed to satisfy constraint: Member must have value greater than or equal to 1"
            )

        if qualifier == "$LATEST":
            raise InvalidParameterValueException(
                "Provisioned Concurrency Configs cannot be applied to unpublished function versions.",
                Type="User",
            )

        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        store = lambda_stores[account_id][region]
        fn = store.functions.get(function_name)

        # Raises if the function, alias or version cannot be found
        previous_config = self._get_provisioned_config(context, function_name, qualifier)
        if previous_config:  # TODO: merge?
            # TODO: add a test for partial updates (if possible)
            LOG.warning(
                "Partial update of provisioned concurrency config is currently not supported."
            )

        # Provisioned concurrency already configured on the function's other qualifiers
        provisioned_elsewhere = sum(
            other_config.provisioned_concurrent_executions
            for other_qualifier, other_config in fn.provisioned_concurrency_configs.items()
            if other_qualifier != qualifier
        )

        reserved_limit = fn.reserved_concurrent_executions
        if (
            reserved_limit is not None
            and reserved_limit < provisioned_elsewhere + provisioned_concurrent_executions
        ):
            raise InvalidParameterValueException(
                "Requested Provisioned Concurrency should not be greater than the reservedConcurrentExecution for function",
                Type="User",
            )

        if provisioned_concurrent_executions > config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS:
            raise InvalidParameterValueException(
                f"Specified ConcurrentExecutions for function is greater than account's unreserved concurrency"
                f" [{config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS}]."
            )

        account_limits = self.get_account_settings(context)["AccountLimit"]
        remaining_unreserved = (
            account_limits["UnreservedConcurrentExecutions"] - provisioned_concurrent_executions
        )
        if remaining_unreserved < config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY:
            raise InvalidParameterValueException(
                f"Specified ConcurrentExecutions for function decreases account's UnreservedConcurrentExecution below"
                f" its minimum value of [{config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY}]."
            )

        new_config = ProvisionedConcurrencyConfiguration(
            provisioned_concurrent_executions, api_utils.generate_lambda_date()
        )
        # Default ARN; replaced below by the qualified ARN of the resolved version
        fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)

        if api_utils.qualifier_is_alias(qualifier):
            alias = fn.aliases.get(qualifier)
            target_version = fn.versions.get(alias.function_version)

            if (
                target_version
                and fn.provisioned_concurrency_configs.get(alias.function_version) is not None
            ):
                raise ResourceConflictException(
                    "Alias can't be used for Provisioned Concurrency configuration on an already Provisioned version",
                    Type="User",
                )
            fn_arn = target_version.id.qualified_arn()
        elif api_utils.qualifier_is_version(qualifier):
            target_version = fn.versions.get(qualifier)

            # TODO: might be useful other places, utilize
            has_provisioned_alias = any(
                candidate.function_version == qualifier
                and fn.provisioned_concurrency_configs.get(candidate.name) is not None
                for candidate in fn.aliases.values()
            )
            if has_provisioned_alias:
                raise ResourceConflictException(
                    "Version is pointed by a Provisioned Concurrency alias", Type="User"
                )

            fn_arn = target_version.id.qualified_arn()

        # Fetch the manager first so that a failure here leaves the stored configs untouched
        manager = self.lambda_service.get_lambda_version_manager(fn_arn)

        fn.provisioned_concurrency_configs[qualifier] = new_config

        manager.update_provisioned_concurrency_config(
            new_config.provisioned_concurrent_executions
        )

        return PutProvisionedConcurrencyConfigResponse(
            RequestedProvisionedConcurrentExecutions=new_config.provisioned_concurrent_executions,
            AvailableProvisionedConcurrentExecutions=0,
            AllocatedProvisionedConcurrentExecutions=0,
            Status=ProvisionedConcurrencyStatusEnum.IN_PROGRESS,
            # StatusReason=manager.provisioned_state.status_reason,
            LastModified=new_config.last_modified,  # TODO: does change with configuration or also with state changes?
        )
3146

3147
    def get_provisioned_concurrency_config(
        self, context: RequestContext, function_name: FunctionName, qualifier: Qualifier, **kwargs
    ) -> GetProvisionedConcurrencyConfigResponse:
        """
        Return the stored provisioned concurrency config for a qualifier, combined with the
        current provisioning state reported by the matching version manager.

        :param context: Request context used to resolve account, region and function name
        :param function_name: Function identifier as received in the request
        :param qualifier: Version or alias the config is attached to; ``$LATEST`` is rejected
        :return: Requested, available and allocated executions plus status and last-modified date
        :raises InvalidParameterValueException: if ``qualifier`` is ``$LATEST``
        :raises ProvisionedConcurrencyConfigNotFoundException: if no config is stored
        """
        if qualifier == "$LATEST":
            raise InvalidParameterValueException(
                "The function resource provided must be an alias or a published version.",
                Type="User",
            )
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )

        provisioned_config = self._get_provisioned_config(context, function_name, qualifier)
        if not provisioned_config:
            raise ProvisionedConcurrencyConfigNotFoundException(
                "No Provisioned Concurrency Config found for this function", Type="User"
            )

        # The version manager is looked up by version ARN, so an alias is first
        # translated into the version it points to.
        # TODO: make this compatible with alias pointer migration on update
        version_qualifier = qualifier
        if api_utils.qualifier_is_alias(qualifier):
            function = lambda_stores[account_id][region].functions.get(function_name)
            version_qualifier = function.aliases.get(qualifier).function_version
        version_arn = api_utils.qualified_lambda_arn(
            function_name, version_qualifier, account_id, region
        )

        manager = self.lambda_service.get_lambda_version_manager(version_arn)

        return GetProvisionedConcurrencyConfigResponse(
            RequestedProvisionedConcurrentExecutions=provisioned_config.provisioned_concurrent_executions,
            LastModified=provisioned_config.last_modified,
            AvailableProvisionedConcurrentExecutions=manager.provisioned_state.available,
            AllocatedProvisionedConcurrentExecutions=manager.provisioned_state.allocated,
            Status=manager.provisioned_state.status,
            StatusReason=manager.provisioned_state.status_reason,
        )
3187

3188
    def list_provisioned_concurrency_configs(
        self,
        context: RequestContext,
        function_name: FunctionName,
        marker: String = None,
        max_items: MaxProvisionedConcurrencyConfigListItems = None,
        **kwargs,
    ) -> ListProvisionedConcurrencyConfigsResponse:
        """
        List all provisioned concurrency configs of a function, one entry per qualifier.

        :param context: Request context used to resolve account, region and function name
        :param function_name: Function identifier as received in the request
        :param marker: Pagination marker returned by a previous call
        :param max_items: Maximum number of entries per page
        :return: One page of config entries and the marker for the next page, if any
        :raises ResourceNotFoundException: if the function does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        state = lambda_stores[account_id][region]

        function_name = api_utils.get_function_name(function_name, context)
        fn = state.functions.get(function_name)
        if fn is None:
            raise ResourceNotFoundException(
                f"Function not found: {api_utils.unqualified_lambda_arn(function_name, account_id, region)}",
                Type="User",
            )

        configs = []
        for qualifier, pc_config in fn.provisioned_concurrency_configs.items():
            # The provisioning state lives on the version manager, which is looked up by
            # version ARN; aliases are therefore resolved to the version they point to.
            version_qualifier = qualifier
            if api_utils.qualifier_is_alias(qualifier):
                version_qualifier = fn.aliases.get(qualifier).function_version
            version_arn = api_utils.qualified_lambda_arn(
                function_name, version_qualifier, account_id, region
            )
            manager = self.lambda_service.get_lambda_version_manager(version_arn)

            configs.append(
                ProvisionedConcurrencyConfigListItem(
                    # the reported ARN keeps the original qualifier (alias or version)
                    FunctionArn=api_utils.qualified_lambda_arn(
                        function_name, qualifier, account_id, region
                    ),
                    RequestedProvisionedConcurrentExecutions=pc_config.provisioned_concurrent_executions,
                    AvailableProvisionedConcurrentExecutions=manager.provisioned_state.available,
                    AllocatedProvisionedConcurrentExecutions=manager.provisioned_state.allocated,
                    Status=manager.provisioned_state.status,
                    StatusReason=manager.provisioned_state.status_reason,
                    LastModified=pc_config.last_modified,
                )
            )

        # Use the (unique) function ARN as page token. The previous identity function made the
        # token the whole list item, which cannot be returned as a marker string nor matched
        # against an incoming marker.
        page, token = PaginatedList(configs).get_page(
            lambda item: item["FunctionArn"],
            marker,
            max_items,
        )
        return ListProvisionedConcurrencyConfigsResponse(
            ProvisionedConcurrencyConfigs=page, NextMarker=token
        )
3245

3246
    def delete_provisioned_concurrency_config(
        self, context: RequestContext, function_name: FunctionName, qualifier: Qualifier, **kwargs
    ) -> None:
        """
        Remove the provisioned concurrency config of a version or alias and scale the
        provisioned executions of the backing version down to zero.

        :param context: Request context used to resolve account, region and function name
        :param function_name: Function identifier as received in the request
        :param qualifier: Version or alias the config is attached to; ``$LATEST`` is rejected
        :raises InvalidParameterValueException: if ``qualifier`` is ``$LATEST``
        """
        if qualifier == "$LATEST":
            raise InvalidParameterValueException(
                "The function resource provided must be an alias or a published version.",
                Type="User",
            )
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        state = lambda_stores[account_id][region]
        fn = state.functions.get(function_name)

        provisioned_config = self._get_provisioned_config(context, function_name, qualifier)
        # delete is idempotent and doesn't actually care about the provisioned concurrency config not existing
        if provisioned_config:
            # Version managers are looked up by version ARN (see the put/get/list handlers),
            # so a config attached to an alias must be resolved to the aliased version.
            version_qualifier = qualifier
            if api_utils.qualifier_is_alias(qualifier):
                version_qualifier = fn.aliases.get(qualifier).function_version
            fn_arn = api_utils.qualified_lambda_arn(
                function_name, version_qualifier, account_id, region
            )
            # resolve the manager before mutating state, so a failed lookup leaves the config intact
            manager = self.lambda_service.get_lambda_version_manager(fn_arn)
            fn.provisioned_concurrency_configs.pop(qualifier)
            manager.update_provisioned_concurrency_config(0)
3268

3269
    # =======================================
3270
    # =======  Event Invoke Config   ========
3271
    # =======================================
3272

3273
    # "1 validation error detected: Value 'arn:aws:_-/!lambda:<region>:111111111111:function:<function-name:1>' at 'destinationConfig.onFailure.destination' failed to satisfy constraint: Member must satisfy regular expression pattern: ^$|arn:(aws[a-zA-Z0-9-]*):([a-zA-Z0-9\\-])+:([a-z]{2}((-gov)|(-iso(b?)))?-[a-z]+-\\d{1})?:(\\d{12})?:(.*)"
3274
    # "1 validation error detected: Value 'arn:aws:_-/!lambda:<region>:111111111111:function:<function-name:1>' at 'destinationConfig.onFailure.destination' failed to satisfy constraint: Member must satisfy regular expression pattern: ^$|arn:(aws[a-zA-Z0-9-]*):([a-zA-Z0-9\\-])+:([a-z]2((-gov)|(-iso(b?)))?-[a-z]+-\\d1)?:(\\d12)?:(.*)" ... (expected → actual)
3275

3276
    def _validate_destination_config(
1✔
3277
        self, store: LambdaStore, function_name: str, destination_config: DestinationConfig
3278
    ):
3279
        def _validate_destination_arn(destination_arn) -> bool:
1✔
3280
            if not api_utils.DESTINATION_ARN_PATTERN.match(destination_arn):
1✔
3281
                # technically we shouldn't handle this in the provider
3282
                raise ValidationException(
1✔
3283
                    "1 validation error detected: Value '"
3284
                    + destination_arn
3285
                    + r"' at 'destinationConfig.onFailure.destination' failed to satisfy constraint: Member must satisfy regular expression pattern: ^$|arn:(aws[a-zA-Z0-9-]*):([a-zA-Z0-9\-])+:([a-z]{2}((-gov)|(-iso(b?)))?-[a-z]+-\d{1})?:(\d{12})?:(.*)"
3286
                )
3287

3288
            match destination_arn.split(":")[2]:
1✔
3289
                case "lambda":
1✔
3290
                    fn_parts = api_utils.FULL_FN_ARN_PATTERN.search(destination_arn).groupdict()
1✔
3291
                    if fn_parts:
1✔
3292
                        # check if it exists
3293
                        fn = store.functions.get(fn_parts["function_name"])
1✔
3294
                        if not fn:
1✔
3295
                            raise InvalidParameterValueException(
1✔
3296
                                f"The destination ARN {destination_arn} is invalid.", Type="User"
3297
                            )
3298
                        if fn_parts["function_name"] == function_name:
1✔
3299
                            raise InvalidParameterValueException(
1✔
3300
                                "You can't specify the function as a destination for itself.",
3301
                                Type="User",
3302
                            )
3303
                case "sns" | "sqs" | "events":
1✔
3304
                    pass
1✔
3305
                case _:
1✔
3306
                    return False
1✔
3307
            return True
1✔
3308

3309
        validation_err = False
1✔
3310

3311
        failure_destination = destination_config.get("OnFailure", {}).get("Destination")
1✔
3312
        if failure_destination:
1✔
3313
            validation_err = validation_err or not _validate_destination_arn(failure_destination)
1✔
3314

3315
        success_destination = destination_config.get("OnSuccess", {}).get("Destination")
1✔
3316
        if success_destination:
1✔
3317
            validation_err = validation_err or not _validate_destination_arn(success_destination)
1✔
3318

3319
        if validation_err:
1✔
3320
            on_success_part = (
1✔
3321
                f"OnSuccess(destination={success_destination})" if success_destination else "null"
3322
            )
3323
            on_failure_part = (
1✔
3324
                f"OnFailure(destination={failure_destination})" if failure_destination else "null"
3325
            )
3326
            raise InvalidParameterValueException(
1✔
3327
                f"The provided destination config DestinationConfig(onSuccess={on_success_part}, onFailure={on_failure_part}) is invalid.",
3328
                Type="User",
3329
            )
3330

3331
    def put_function_event_invoke_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: Qualifier = None,
        maximum_retry_attempts: MaximumRetryAttempts = None,
        maximum_event_age_in_seconds: MaximumEventAgeInSeconds = None,
        destination_config: DestinationConfig = None,
        **kwargs,
    ) -> FunctionEventInvokeConfig:
        """
        Create or overwrite the event invoke config of a function version or alias.

        Destination ARNs can be:
        * SQS arn
        * SNS arn
        * Lambda arn
        * EventBridge arn

        Differences between put_ and update_:
            * put overwrites any existing config
            * update changes only the supplied values and keeps the remaining existing ones
            * update fails on non-existing configs

        Differences between destination and DLQ
            * "However, a dead-letter queue is part of a function's version-specific configuration, so it is locked in when you publish a version."
            *  "On-failure destinations also support additional targets and include details about the function's response in the invocation record."

        :raises InvalidParameterValueException: if no setting at all is supplied
        :raises ResourceNotFoundException: if the function or the given qualifier does not exist
        """
        settings = (maximum_event_age_in_seconds, maximum_retry_attempts, destination_config)
        if all(setting is None for setting in settings):
            raise InvalidParameterValueException(
                "You must specify at least one of error handling or destination setting.",
                Type="User",
            )
        account_id, region = api_utils.get_account_and_region(function_name, context)
        state = lambda_stores[account_id][region]
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        fn = state.functions.get(function_name)
        if not fn:
            raise ResourceNotFoundException("The function doesn't exist.", Type="User")
        if qualifier and qualifier not in fn.aliases and qualifier not in fn.versions:
            raise ResourceNotFoundException("The function doesn't exist.", Type="User")

        # an absent qualifier addresses the unpublished version
        qualifier = qualifier or "$LATEST"

        # validate and normalize destination config
        if destination_config:
            self._validate_destination_config(state, function_name, destination_config)

        requested_destinations = destination_config or {}
        destination_config = DestinationConfig(
            OnSuccess=OnSuccess(
                Destination=requested_destinations.get("OnSuccess", {}).get("Destination")
            ),
            OnFailure=OnFailure(
                Destination=requested_destinations.get("OnFailure", {}).get("Destination")
            ),
        )

        invoke_config = EventInvokeConfig(
            function_name=function_name,
            qualifier=qualifier,
            maximum_event_age_in_seconds=maximum_event_age_in_seconds,
            maximum_retry_attempts=maximum_retry_attempts,
            last_modified=api_utils.generate_lambda_date(),
            destination_config=destination_config,
        )
        fn.event_invoke_configs[qualifier] = invoke_config

        last_modified = datetime.datetime.strptime(
            invoke_config.last_modified, api_utils.LAMBDA_DATE_FORMAT
        )
        return FunctionEventInvokeConfig(
            LastModified=last_modified,
            FunctionArn=api_utils.qualified_lambda_arn(
                function_name, qualifier, account_id, region
            ),
            DestinationConfig=destination_config,
            MaximumEventAgeInSeconds=maximum_event_age_in_seconds,
            MaximumRetryAttempts=maximum_retry_attempts,
        )
3412

3413
    def get_function_event_invoke_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: Qualifier = None,
        **kwargs,
    ) -> FunctionEventInvokeConfig:
        """
        Return the event invoke config stored for a function qualifier.

        :param context: Request context used to resolve account, region and function name
        :param function_name: Function identifier as received in the request
        :param qualifier: Version or alias; defaults to ``$LATEST`` when absent
        :return: The stored config with its last-modified date parsed into a datetime
        :raises ResourceNotFoundException: if the function or its config does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        state = lambda_stores[account_id][region]
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        qualifier = qualifier or "$LATEST"

        # A missing function and a missing config are reported with the same error.
        fn = state.functions.get(function_name)
        stored_config = fn.event_invoke_configs.get(qualifier) if fn else None
        if not stored_config:
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
            raise ResourceNotFoundException(
                f"The function {fn_arn} doesn't have an EventInvokeConfig", Type="User"
            )

        last_modified = datetime.datetime.strptime(
            stored_config.last_modified, api_utils.LAMBDA_DATE_FORMAT
        )
        return FunctionEventInvokeConfig(
            LastModified=last_modified,
            FunctionArn=api_utils.qualified_lambda_arn(
                function_name, qualifier, account_id, region
            ),
            DestinationConfig=stored_config.destination_config,
            MaximumEventAgeInSeconds=stored_config.maximum_event_age_in_seconds,
            MaximumRetryAttempts=stored_config.maximum_retry_attempts,
        )
3452

3453
    def list_function_event_invoke_configs(
        self,
        context: RequestContext,
        function_name: FunctionName,
        marker: String = None,
        max_items: MaxFunctionEventInvokeConfigListItems = None,
        **kwargs,
    ) -> ListFunctionEventInvokeConfigsResponse:
        """
        List the event invoke configs of all qualifiers of a function.

        :param context: Request context used to resolve account, region and function name
        :param function_name: Function identifier as received in the request
        :param marker: Pagination marker returned by a previous call
        :param max_items: Maximum number of entries per page
        :return: One page of configs and the marker for the next page, if any
        :raises ResourceNotFoundException: if the function does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        state = lambda_stores[account_id][region]
        # Normalize the identifier before the store lookup, as list_provisioned_concurrency_configs
        # does; the raw request value was previously used as the store key.
        function_name = api_utils.get_function_name(function_name, context)
        fn = state.functions.get(function_name)
        if not fn:
            raise ResourceNotFoundException("The function doesn't exist.", Type="User")

        # NOTE(review): the get/put/update handlers parse last_modified into a datetime, while the
        # stored string is passed through here - confirm the serializer treats both alike.
        event_invoke_configs = [
            FunctionEventInvokeConfig(
                LastModified=c.last_modified,
                FunctionArn=api_utils.qualified_lambda_arn(
                    function_name, c.qualifier, account_id, region
                ),
                MaximumEventAgeInSeconds=c.maximum_event_age_in_seconds,
                MaximumRetryAttempts=c.maximum_retry_attempts,
                DestinationConfig=c.destination_config,
            )
            for c in fn.event_invoke_configs.values()
        ]

        event_invoke_configs = PaginatedList(event_invoke_configs)
        page, token = event_invoke_configs.get_page(
            lambda x: x["FunctionArn"],
            marker,
            max_items,
        )
        return ListFunctionEventInvokeConfigsResponse(
            FunctionEventInvokeConfigs=page, NextMarker=token
        )
3489

3490
    def delete_function_event_invoke_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: Qualifier = None,
        **kwargs,
    ) -> None:
        """
        Delete the event invoke config stored for a function qualifier.

        :param context: Request context used to resolve account, region and function name
        :param function_name: Function identifier as received in the request
        :param qualifier: Version or alias; the ``$LATEST`` entry is addressed when absent
        :raises ResourceNotFoundException: if the function or its config does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        functions = lambda_stores[account_id][region].functions
        fn = functions.get(function_name)
        config_key = qualifier or "$LATEST"
        fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)

        # A missing function and a missing config are reported with the same error.
        if not fn or not fn.event_invoke_configs.get(config_key):
            raise ResourceNotFoundException(
                f"The function {fn_arn} doesn't have an EventInvokeConfig", Type="User"
            )

        del fn.event_invoke_configs[config_key]
3517

3518
    def update_function_event_invoke_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: Qualifier = None,
        maximum_retry_attempts: MaximumRetryAttempts = None,
        maximum_event_age_in_seconds: MaximumEventAgeInSeconds = None,
        destination_config: DestinationConfig = None,
        **kwargs,
    ) -> FunctionEventInvokeConfig:
        """
        Update single fields of an existing event invoke config, keeping all others.

        Works like the put operation, but only the supplied (non-None) settings replace the
        stored values, and the config must already exist.

        :raises InvalidParameterValueException: if no setting at all is supplied
        :raises ResourceNotFoundException: if the function, the qualifier, or the config to
            update does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        state = lambda_stores[account_id][region]
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )

        settings = (maximum_event_age_in_seconds, maximum_retry_attempts, destination_config)
        if all(setting is None for setting in settings):
            raise InvalidParameterValueException(
                "You must specify at least one of error handling or destination setting.",
                Type="User",
            )

        fn = state.functions.get(function_name)
        if not fn:
            raise ResourceNotFoundException("The function doesn't exist.", Type="User")
        if qualifier and qualifier not in fn.aliases and qualifier not in fn.versions:
            raise ResourceNotFoundException("The function doesn't exist.", Type="User")

        qualifier = qualifier or "$LATEST"

        existing_config = fn.event_invoke_configs.get(qualifier)
        if not existing_config:
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
            raise ResourceNotFoundException(
                f"The function {fn_arn} doesn't have an EventInvokeConfig", Type="User"
            )

        if destination_config:
            self._validate_destination_config(state, function_name, destination_config)

        # only settings that were actually supplied override the stored values
        changes = {}
        if destination_config is not None:
            changes["destination_config"] = destination_config
        if maximum_retry_attempts is not None:
            changes["maximum_retry_attempts"] = maximum_retry_attempts
        if maximum_event_age_in_seconds is not None:
            changes["maximum_event_age_in_seconds"] = maximum_event_age_in_seconds

        new_config = dataclasses.replace(
            existing_config, last_modified=api_utils.generate_lambda_date(), **changes
        )
        fn.event_invoke_configs[qualifier] = new_config

        last_modified = datetime.datetime.strptime(
            new_config.last_modified, api_utils.LAMBDA_DATE_FORMAT
        )
        return FunctionEventInvokeConfig(
            LastModified=last_modified,
            FunctionArn=api_utils.qualified_lambda_arn(
                function_name, qualifier, account_id, region
            ),
            DestinationConfig=new_config.destination_config,
            MaximumEventAgeInSeconds=new_config.maximum_event_age_in_seconds,
            MaximumRetryAttempts=new_config.maximum_retry_attempts,
        )
3587

3588
    # =======================================
3589
    # ======  Layer & Layer Versions  =======
3590
    # =======================================
3591

3592
    @staticmethod
    def _resolve_layer(
        layer_name_or_arn: str, context: RequestContext
    ) -> tuple[str, str, str, str | None]:
        """
        Work out where a Lambda layer lives from either its name or its ARN.

        A plain layer name is located in the region and account of the request and carries
        no version; an ARN is handed to ``api_utils.parse_layer_arn``.

        :param layer_name_or_arn: Layer name or ARN
        :param context: Request context
        :return: Tuple of region, account ID, layer name, layer version
        """
        if not api_utils.is_layer_arn(layer_name_or_arn):
            return context.region, context.account_id, layer_name_or_arn, None

        return api_utils.parse_layer_arn(layer_name_or_arn)
3607

3608
    def publish_layer_version(
        self,
        context: RequestContext,
        layer_name: LayerName,
        content: LayerVersionContentInput,
        description: Description = None,
        compatible_runtimes: CompatibleRuntimes = None,
        license_info: LicenseInfo = None,
        compatible_architectures: CompatibleArchitectures = None,
        **kwargs,
    ) -> PublishLayerVersionResponse:
        """
        Publish a new version of a layer.

        The first call for a LayerName creates the layer; every further call with the same
        LayerName adds another version. Note that there are no $LATEST versions with layers!

        :raises ValidationException: if the given runtimes or architectures fail validation
        """
        account = context.account_id
        region = context.region

        validation_errors = api_utils.validate_layer_runtimes_and_architectures(
            compatible_runtimes, compatible_architectures
        )
        if validation_errors:
            error_count = len(validation_errors)
            plural = "s" if error_count > 1 else ""
            raise ValidationException(
                f"{error_count} validation error{plural} detected: {'; '.join(validation_errors)}"
            )

        state = lambda_stores[account][region]
        with self.create_layer_lock:
            # lock is required to avoid creating two v1 objects for the same name
            if layer_name not in state.layers:
                # no version published yet, so the layer object itself has to be created
                state.layers[layer_name] = Layer(
                    arn=api_utils.layer_arn(layer_name=layer_name, account=account, region=region)
                )

        layer = state.layers[layer_name]
        with layer.next_version_lock:
            version_identifier = LambdaLayerVersionIdentifier(
                account_id=account, region=region, layer_name=layer_name
            )
            next_version = version_identifier.generate(next_version=layer.next_version)
            # With user defined layer versions, versions may be created out of order,
            # e.g. a user could replicate layer v2 and then layer v1. The counter must always
            # stay above the highest version created so far to avoid overwriting versions;
            # it is left untouched when the created version is below the "next in line".
            if next_version >= layer.next_version:
                layer.next_version = next_version + 1

        # store the archive of the new layer version
        if content.get("ZipFile"):
            code = store_lambda_archive(
                archive_file=content["ZipFile"],
                function_name=layer_name,
                region_name=region,
                account_id=account,
            )
        else:
            code = store_s3_bucket_archive(
                archive_bucket=content["S3Bucket"],
                archive_key=content["S3Key"],
                archive_version=content.get("S3ObjectVersion"),
                function_name=layer_name,
                region_name=region,
                account_id=account,
            )

        version_arn = api_utils.layer_version_arn(
            layer_name=layer_name,
            account=account,
            region=region,
            version=str(next_version),
        )
        new_layer_version = LayerVersion(
            layer_version_arn=version_arn,
            layer_arn=layer.arn,
            version=next_version,
            description=description or "",
            license_info=license_info,
            compatible_runtimes=compatible_runtimes,
            compatible_architectures=compatible_architectures,
            created=api_utils.generate_lambda_date(),
            code=code,
        )

        layer.layer_versions[str(next_version)] = new_layer_version

        return api_utils.map_layer_out(new_layer_version)
3696

3697
    def get_layer_version(
        self,
        context: RequestContext,
        layer_name: LayerName,
        version_number: LayerVersionNumber,
        **kwargs,
    ) -> GetLayerVersionResponse:
        """
        Return a single version of a layer.

        :param context: Request context
        :param layer_name: Layer name or ARN
        :param version_number: Version to fetch; must be at least 1
        :return: The mapped layer version
        :raises InvalidParameterValueException: if ``version_number`` is below 1
        :raises ResourceNotFoundException: if the layer or the version does not exist
        """
        # TODO: handle layer_name as an ARN

        region_name, account_id, layer_name, _ = LambdaProvider._resolve_layer(layer_name, context)
        layer = lambda_stores[account_id][region_name].layers.get(layer_name)

        if version_number < 1:
            raise InvalidParameterValueException("Layer Version Cannot be less than 1", Type="User")

        # An unknown layer and an unknown version are reported with the same error.
        layer_version = None
        if layer is not None:
            layer_version = layer.layer_versions.get(str(version_number))
        if layer_version is None:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )
        return api_utils.map_layer_out(layer_version)
3722

3723
    def get_layer_version_by_arn(
        self, context: RequestContext, arn: LayerVersionArn, **kwargs
    ) -> GetLayerVersionResponse:
        """
        Return the layer version addressed by a layer version ARN.

        :param context: Request context
        :param arn: ARN that must include a layer version
        :return: The mapped layer version
        :raises ValidationException: if no layer version could be resolved from ``arn``
        :raises ResourceNotFoundException: if the layer or the version does not exist
        """
        region_name, account_id, layer_name, version_key = LambdaProvider._resolve_layer(
            arn, context
        )

        if not version_key:
            raise ValidationException(
                f"1 validation error detected: Value '{arn}' at 'arn' failed to satisfy constraint: Member must satisfy regular expression pattern: "
                + "(arn:(aws[a-zA-Z-]*)?:lambda:(eusc-)?[a-z]{2}((-gov)|(-iso([a-z]?)))?-[a-z]+-\\d{1}:\\d{12}:layer:[a-zA-Z0-9-_]+:[0-9]+)|(arn:[a-zA-Z0-9-]+:lambda:::awslayer:[a-zA-Z0-9-_]+)"
            )

        store = lambda_stores[account_id][region_name]
        layer = store.layers.get(layer_name)

        # An unknown layer and an unknown version are reported with the same error.
        layer_version = layer.layer_versions.get(version_key) if layer else None
        if not layer_version:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        return api_utils.map_layer_out(layer_version)
3750

3751
    def list_layers(
        self,
        context: RequestContext,
        compatible_runtime: Runtime = None,
        marker: String = None,
        max_items: MaxLayerListItems = None,
        compatible_architecture: Architecture = None,
        **kwargs,
    ) -> ListLayersResponse:
        """
        List the layers of the request's account and region, each with its latest version.

        :param context: Request context
        :param compatible_runtime: Runtime filter; validated but not yet applied
        :param marker: Pagination marker returned by a previous call
        :param max_items: Maximum number of entries per page
        :param compatible_architecture: Architecture filter; validated but not yet applied
        :return: One page of layers and the marker for the next page, if any
        :raises ValidationException: if the runtime or architecture filter fails validation
        """
        validation_errors = []

        validation_error_arch = api_utils.validate_layer_architecture(compatible_architecture)
        if validation_error_arch:
            validation_errors.append(validation_error_arch)

        validation_error_runtime = api_utils.validate_layer_runtime(compatible_runtime)
        if validation_error_runtime:
            validation_errors.append(validation_error_runtime)

        if validation_errors:
            raise ValidationException(
                f"{len(validation_errors)} validation error{'s' if len(validation_errors) > 1 else ''} detected: {';'.join(validation_errors)}"
            )
        # TODO: handle filter: compatible_runtime
        # TODO: handle filter: compatible_architecture

        state = lambda_stores[context.account_id][context.region]
        layers = state.layers

        # TODO: test how filters behave together with only returning layers here? Does it return the latest "matching" layer, i.e. does it ignore later layer versions that don't match?

        responses: list[LayersListItem] = []
        for layer_name, layer in layers.items():
            if not layer.layer_versions:
                # nothing to report as latest version; previously this raised an IndexError
                continue
            # Pick the highest version number. The previous code called sorted() and discarded
            # its result, so the last *inserted* version was returned, which is not the highest
            # one when versions are created out of order.
            latest_layer_version = max(
                layer.layer_versions.values(), key=lambda layer_version: layer_version.version
            )
            responses.append(
                LayersListItem(
                    LayerName=layer_name,
                    LayerArn=layer.arn,
                    LatestMatchingVersion=api_utils.map_layer_out(latest_layer_version),
                )
            )

        # Use the layer ARN as page token; the whole list item is not usable as a marker string.
        page, token = PaginatedList(responses).get_page(
            lambda item: item["LayerArn"],
            marker,
            max_items,
        )

        return ListLayersResponse(NextMarker=token, Layers=page)
3804

3805
    def list_layer_versions(
        self,
        context: RequestContext,
        layer_name: LayerName,
        compatible_runtime: Runtime = None,
        marker: String = None,
        max_items: MaxLayerListItems = None,
        compatible_architecture: Architecture = None,
        **kwargs,
    ) -> ListLayerVersionsResponse:
        """
        Return one page of the versions of a layer, highest version number first.

        An unknown layer yields an empty result rather than an error.

        Raises:
            ValidationException: If the given runtime and/or architecture fail validation.
        """
        runtimes = [compatible_runtime] if compatible_runtime else []
        architectures = [compatible_architecture] if compatible_architecture else []
        validation_errors = api_utils.validate_layer_runtimes_and_architectures(
            runtimes, architectures
        )
        if validation_errors:
            raise ValidationException(
                f"{len(validation_errors)} validation error{'s' if len(validation_errors) > 1 else ''} detected: {';'.join(validation_errors)}"
            )

        # `layer_name` may be rebound here: _resolve_layer returns the layer name to look up.
        region_name, account_id, layer_name, _ = LambdaProvider._resolve_layer(layer_name, context)
        layer = lambda_stores[account_id][region_name].layers.get(layer_name)

        # TODO: Test & handle filter: compatible_runtime
        # TODO: Test & handle filter: compatible_architecture
        if layer is None:
            mapped_versions = []
        else:
            mapped_versions = [
                api_utils.map_layer_out(version) for version in layer.layer_versions.values()
            ]
        mapped_versions.sort(key=lambda item: item["Version"], reverse=True)

        page, token = PaginatedList(mapped_versions).get_page(
            lambda item: item["LayerVersionArn"],
            marker,
            max_items,
        )
        return ListLayerVersionsResponse(NextMarker=token, LayerVersions=page)
1✔
3845

3846
    def delete_layer_version(
        self,
        context: RequestContext,
        layer_name: LayerName,
        version_number: LayerVersionNumber,
        **kwargs,
    ) -> None:
        """
        Remove a single version from a layer.

        Deleting from an unknown layer, or deleting an unknown version, is a silent no-op.

        Raises:
            InvalidParameterValueException: If `version_number` is lower than 1.
        """
        if version_number < 1:
            raise InvalidParameterValueException("Layer Version Cannot be less than 1", Type="User")

        region_name, account_id, resolved_name, _ = LambdaProvider._resolve_layer(
            layer_name, context
        )

        layer = lambda_stores[account_id][region_name].layers.get(resolved_name)
        if not layer:
            return
        # versions are keyed by their stringified version number
        layer.layer_versions.pop(str(version_number), None)
1✔
3864

3865
    # =======================================
3866
    # =====  Layer Version Permissions  =====
3867
    # =======================================
3868
    # TODO: lock updates that change revision IDs
3869

3870
    def add_layer_version_permission(
        self,
        context: RequestContext,
        layer_name: LayerName,
        version_number: LayerVersionNumber,
        statement_id: StatementId,
        action: LayerPermissionAllowedAction,
        principal: LayerPermissionAllowedPrincipal,
        organization_id: OrganizationId = None,
        revision_id: String = None,
        **kwargs,
    ) -> AddLayerVersionPermissionResponse:
        """
        Add a statement to the policy of a layer version, creating the policy if none exists yet.

        Raises:
            ValidationException: If `action` is not "lambda:GetLayerVersion".
            ResourceNotFoundException: If the layer or the layer version does not exist.
            ResourceConflictException: If a statement with `statement_id` already exists.
            PreconditionFailedException: If `revision_id` is given and differs from the policy's.
        """
        # `layer_name` (name or ARN, as passed by the caller) is only used for the error messages;
        # the resolved name is what the store is keyed by.
        region_name, account_id, resolved_name, _ = LambdaProvider._resolve_layer(
            layer_name, context
        )

        if action != "lambda:GetLayerVersion":
            raise ValidationException(
                f"1 validation error detected: Value '{action}' at 'action' failed to satisfy constraint: Member must satisfy regular expression pattern: lambda:GetLayerVersion"
            )

        layer = lambda_stores[account_id][region_name].layers.get(resolved_name)
        layer_version_arn = api_utils.layer_version_arn(
            layer_name, account_id, region_name, str(version_number)
        )

        # A missing layer and a missing version are reported identically.
        layer_version = None if layer is None else layer.layer_versions.get(str(version_number))
        if layer_version is None:
            raise ResourceNotFoundException(
                f"Layer version {layer_version_arn} does not exist.", Type="User"
            )

        # first permission on this version: start from an empty policy
        if layer_version.policy is None:
            layer_version.policy = LayerPolicy()
        policy = layer_version.policy

        if statement_id in policy.statements:
            raise ResourceConflictException(
                f"The statement id ({statement_id}) provided already exists. Please provide a new statement id, or remove the existing statement.",
                Type="User",
            )

        if revision_id and policy.revision_id != revision_id:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetLayerPolicy API to retrieve the latest Revision Id",
                Type="User",
            )

        statement = LayerPolicyStatement(
            sid=statement_id, action=action, principal=principal, organization_id=organization_id
        )

        # Build a new policy object instead of mutating the statements in place.
        updated_statements = dict(policy.statements)
        updated_statements[statement_id] = statement
        layer_version.policy = dataclasses.replace(policy, statements=updated_statements)

        statement_document = {
            "Sid": statement.sid,
            "Effect": "Allow",
            "Principal": statement.principal,
            "Action": statement.action,
            "Resource": layer_version.layer_version_arn,
        }
        return AddLayerVersionPermissionResponse(
            Statement=json.dumps(statement_document),
            RevisionId=layer_version.policy.revision_id,
        )
3945

3946
    def remove_layer_version_permission(
        self,
        context: RequestContext,
        layer_name: LayerName,
        version_number: LayerVersionNumber,
        statement_id: StatementId,
        revision_id: String = None,
        **kwargs,
    ) -> None:
        """
        Remove a statement from the policy of a layer version.

        Raises:
            ResourceNotFoundException: If the layer, the layer version, or the statement does not
                exist (a layer version without any policy has no statements).
            PreconditionFailedException: If `revision_id` is given and does not match the current
                policy revision.
        """
        # `layer_name` can either be layer name or ARN. It is used to generate error messages.
        # `layer_n` contains the layer name.
        region_name, account_id, layer_n, _ = LambdaProvider._resolve_layer(layer_name, context)

        layer_version_arn = api_utils.layer_version_arn(
            layer_name, account_id, region_name, str(version_number)
        )

        state = lambda_stores[account_id][region_name]
        layer = state.layers.get(layer_n)
        if layer is None:
            raise ResourceNotFoundException(
                f"Layer version {layer_version_arn} does not exist.", Type="User"
            )
        layer_version = layer.layer_versions.get(str(version_number))
        if layer_version is None:
            raise ResourceNotFoundException(
                f"Layer version {layer_version_arn} does not exist.", Type="User"
            )

        # The policy is only created by the first add_layer_version_permission call, so it may
        # still be None here. Treat that as "no revision, no statements" instead of crashing
        # with an AttributeError.
        policy = layer_version.policy
        current_revision_id = policy.revision_id if policy is not None else None
        old_statements = policy.statements if policy is not None else {}

        if revision_id and current_revision_id != revision_id:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetLayerPolicy API to retrieve the latest Revision Id",
                Type="User",
            )

        if statement_id not in old_statements:
            raise ResourceNotFoundException(
                f"Statement {statement_id} is not found in resource policy.", Type="User"
            )

        # Build a new policy object without the removed statement instead of mutating in place.
        layer_version.policy = dataclasses.replace(
            policy,
            statements={k: v for k, v in old_statements.items() if k != statement_id},
        )
3994

3995
    def get_layer_version_policy(
        self,
        context: RequestContext,
        layer_name: LayerName,
        version_number: LayerVersionNumber,
        **kwargs,
    ) -> GetLayerVersionPolicyResponse:
        """
        Return the policy of a layer version as a JSON document together with its revision id.

        Raises:
            ResourceNotFoundException: If the layer or the layer version does not exist, or if
                the layer version has no policy.
        """
        # `layer_name` (name or ARN, as passed by the caller) is only used for the error messages;
        # the resolved name is what the store is keyed by.
        region_name, account_id, resolved_name, _ = LambdaProvider._resolve_layer(
            layer_name, context
        )

        layer_version_arn = api_utils.layer_version_arn(
            layer_name, account_id, region_name, str(version_number)
        )

        layer = lambda_stores[account_id][region_name].layers.get(resolved_name)

        # A missing layer and a missing version are reported identically.
        layer_version = None if layer is None else layer.layer_versions.get(str(version_number))
        if layer_version is None:
            raise ResourceNotFoundException(
                f"Layer version {layer_version_arn} does not exist.", Type="User"
            )

        policy = layer_version.policy
        if policy is None:
            raise ResourceNotFoundException(
                "No policy is associated with the given resource.", Type="User"
            )

        statement_documents = []
        for policy_statement in policy.statements.values():
            statement_documents.append(
                {
                    "Sid": policy_statement.sid,
                    "Effect": "Allow",
                    "Principal": policy_statement.principal,
                    "Action": policy_statement.action,
                    "Resource": layer_version.layer_version_arn,
                }
            )

        policy_document = {
            "Version": policy.version,
            "Id": policy.id,
            "Statement": statement_documents,
        }
        return GetLayerVersionPolicyResponse(
            Policy=json.dumps(policy_document),
            RevisionId=policy.revision_id,
        )
4048

4049
    # =======================================
4050
    # =======  Function Concurrency  ========
4051
    # =======================================
4052
    # (Reserved) function concurrency is scoped to the whole function
4053

4054
    def get_function_concurrency(
        self, context: RequestContext, function_name: FunctionName, **kwargs
    ) -> GetFunctionConcurrencyResponse:
        """Return the reserved concurrent executions configured on the given function."""
        account_id, region = api_utils.get_account_and_region(function_name, context)
        resolved_name = api_utils.get_function_name(function_name, context)
        function = self._get_function(
            function_name=resolved_name, region=region, account_id=account_id
        )
        reserved = function.reserved_concurrent_executions
        return GetFunctionConcurrencyResponse(ReservedConcurrentExecutions=reserved)
4063

4064
    def put_function_concurrency(
        self,
        context: RequestContext,
        function_name: FunctionName,
        reserved_concurrent_executions: ReservedConcurrentExecutions,
        **kwargs,
    ) -> Concurrency:
        """
        Set the reserved concurrent executions of a function.

        Raises:
            InvalidParameterValueException: If `function_name` carries a qualifier, if the new
                reservation would push the account's unreserved concurrency below the configured
                minimum, or if it is lower than the function's total provisioned concurrency.
            ResourceNotFoundException: If the function does not exist.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)

        function_name, qualifier = api_utils.get_name_and_qualifier(function_name, None, context)
        if qualifier:
            raise InvalidParameterValueException(
                "This operation is permitted on Lambda functions only. Aliases and versions do not support this operation. Please specify either a function name or an unqualified function ARN.",
                Type="User",
            )

        fn = lambda_stores[account_id][region].functions.get(function_name)
        if not fn:
            fn_arn = api_utils.qualified_lambda_arn(
                function_name,
                qualifier="$LATEST",
                account=account_id,
                region=region,
            )
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")

        account_limit = self.get_account_settings(context)["AccountLimit"]
        unreserved_concurrent_executions = account_limit["UnreservedConcurrentExecutions"]

        # The function's current reservation is already subtracted from the unreserved pool, but
        # it is about to be replaced, so it is added back before checking the minimum.
        # Joel tested this behavior manually against AWS (2023-11-28).
        currently_reserved = fn.reserved_concurrent_executions or 0
        unreserved_after_update = (
            unreserved_concurrent_executions - reserved_concurrent_executions + currently_reserved
        )
        if unreserved_after_update < config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY:
            raise InvalidParameterValueException(
                f"Specified ReservedConcurrentExecutions for function decreases account's UnreservedConcurrentExecution below its minimum value of [{config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY}]."
            )

        total_provisioned_concurrency = sum(
            provisioned_config.provisioned_concurrent_executions
            for provisioned_config in fn.provisioned_concurrency_configs.values()
        )
        if total_provisioned_concurrency > reserved_concurrent_executions:
            raise InvalidParameterValueException(
                f" ReservedConcurrentExecutions  {reserved_concurrent_executions} should not be lower than function's total provisioned concurrency [{total_provisioned_concurrency}]."
            )

        fn.reserved_concurrent_executions = reserved_concurrent_executions

        return Concurrency(ReservedConcurrentExecutions=fn.reserved_concurrent_executions)
1✔
4125

4126
    def delete_function_concurrency(
        self, context: RequestContext, function_name: FunctionName, **kwargs
    ) -> None:
        """
        Clear the reserved concurrent executions of a function.

        Raises:
            ResourceNotFoundException: If the function does not exist.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        # The qualifier is parsed but not used by this operation.
        function_name, _qualifier = api_utils.get_name_and_qualifier(function_name, None, context)
        store = lambda_stores[account_id][region]
        fn = store.functions.get(function_name)
        if not fn:
            # Mirror put_function_concurrency instead of failing with an AttributeError on None.
            fn_arn = api_utils.qualified_lambda_arn(
                function_name,
                qualifier="$LATEST",
                account=account_id,
                region=region,
            )
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")
        fn.reserved_concurrent_executions = None
1✔
4134

4135
    # =======================================
4136
    # ===============  TAGS   ===============
4137
    # =======================================
4138
    # Only Function, Event Source Mapping, and Code Signing Config (not currently supported by LocalStack) ARNs are available for tagging in AWS
4139

4140
    def _get_tags(self, resource: TaggableResource) -> dict[str, str]:
        """Return the tags of `resource` as a flat `{key: value}` dict."""
        store = self.fetch_lambda_store_for_tagging(resource)
        # the tag service hands back a list of {"Key": ..., "Value": ...} entries
        tag_entries = store.TAGS.list_tags_for_resource(resource).get("Tags")
        return {entry["Key"]: entry["Value"] for entry in tag_entries}
1✔
4147

4148
    def _store_tags(self, resource: TaggableResource, tags: dict[str, str]):
        """
        Add `tags` to `resource`.

        Raises:
            InvalidParameterValueException: If the existing tags merged with `tags` would exceed
                LAMBDA_TAG_LIMIT_PER_RESOURCE.
        """
        store = self.fetch_lambda_store_for_tagging(resource)

        # Count the merged result so that overwriting an existing key is not counted twice.
        merged_tags = store.TAGS.tags.get(resource, {}) | tags
        if len(merged_tags) > LAMBDA_TAG_LIMIT_PER_RESOURCE:
            raise InvalidParameterValueException(
                "Number of tags exceeds resource tag limit.", Type="User"
            )

        # the tag service expects a list of {"Key": ..., "Value": ...} entries
        tag_entries = [{"Key": name, "Value": content} for name, content in tags.items()]
        store.TAGS.tag_resource(resource, tag_entries)
1✔
4157

4158
    def fetch_lambda_store_for_tagging(self, resource: TaggableResource) -> LambdaStore:
1✔
4159
        """
4160
        Takes a resource ARN for a TaggableResource (Lambda Function, Event Source Mapping, or Code Signing Config) and returns a corresponding
4161
        LambdaStore for its region and account.
4162

4163
        In addition, this function validates that the ARN is a valid TaggableResource type, and that the TaggableResource exists.
4164

4165
        Raises:
4166
            ValidationException: If the resource ARN is not a full ARN for a TaggableResource.
4167
            ResourceNotFoundException: If the specified resource does not exist.
4168
            InvalidParameterValueException: If the resource ARN is a qualified Lambda Function.
4169
        """
4170

4171
        def _raise_validation_exception():
1✔
4172
            raise ValidationException(
1✔
4173
                f"1 validation error detected: Value '{resource}' at 'resource' failed to satisfy constraint: Member must satisfy regular expression pattern: {api_utils.TAGGABLE_RESOURCE_ARN_PATTERN}"
4174
            )
4175

4176
        # Check whether the ARN we have been passed is correctly formatted
4177
        parsed_resource_arn: ArnData = None
1✔
4178
        try:
1✔
4179
            parsed_resource_arn = parse_arn(resource)
1✔
4180
        except Exception:
1✔
4181
            _raise_validation_exception()
1✔
4182

4183
        # TODO: Should we be checking whether this is a full ARN?
4184
        region, account_id, resource_type = map(
1✔
4185
            parsed_resource_arn.get, ("region", "account", "resource")
4186
        )
4187

4188
        if not all((region, account_id, resource_type)):
1✔
UNCOV
4189
            _raise_validation_exception()
×
4190

4191
        if not (parts := resource_type.split(":")):
1✔
UNCOV
4192
            _raise_validation_exception()
×
4193

4194
        resource_type, resource_identifier, *qualifier = parts
1✔
4195
        if resource_type not in {"event-source-mapping", "code-signing-config", "function"}:
1✔
4196
            _raise_validation_exception()
1✔
4197

4198
        if qualifier:
1✔
4199
            if resource_type == "function":
1✔
4200
                raise InvalidParameterValueException(
1✔
4201
                    "Tags on function aliases and versions are not supported. Please specify a function ARN.",
4202
                    Type="User",
4203
                )
4204
            _raise_validation_exception()
1✔
4205

4206
        match resource_type:
1✔
4207
            case "event-source-mapping":
1✔
4208
                self._get_esm(resource_identifier, account_id, region)
1✔
4209
            case "code-signing-config":
1✔
4210
                raise NotImplementedError("Resource tagging on CSC not yet implemented.")
4211
            case "function":
1✔
4212
                self._get_function(
1✔
4213
                    function_name=resource_identifier, account_id=account_id, region=region
4214
                )
4215

4216
        # If no exceptions are raised, assume ARN and referenced resource is valid for tag operations
4217
        return lambda_stores[account_id][region]
1✔
4218

4219
    def tag_resource(
        self, context: RequestContext, resource: TaggableResource, tags: Tags, **kwargs
    ) -> None:
        """
        Attach `tags` to a taggable resource.

        For function resources the `$LATEST` version object is additionally re-created after
        storing the tags.

        Raises:
            InvalidParameterValueException: If `tags` is empty.
        """
        if not tags:
            raise InvalidParameterValueException(
                "An error occurred and the request cannot be processed.", Type="User"
            )
        self._store_tags(resource, tags)

        resource_id = extract_resource_from_arn(resource)
        if not resource_id or not resource_id.startswith("function"):
            return

        name, _, account, region = function_locators_from_arn(resource)
        function = self._get_function(name, account, region)
        with function.lock:
            # dirty hack for changed revision id, should reevaluate model to prevent this:
            current_latest = function.versions["$LATEST"]
            copied_config = dataclasses.replace(current_latest.config)
            function.versions["$LATEST"] = dataclasses.replace(
                current_latest, config=copied_config
            )
4239

4240
    def list_tags(
        self, context: RequestContext, resource: TaggableResource, **kwargs
    ) -> ListTagsResponse:
        """Return the tags of the given taggable resource."""
        return ListTagsResponse(Tags=self._get_tags(resource))
1✔
4245

4246
    def untag_resource(
        self, context: RequestContext, resource: TaggableResource, tag_keys: TagKeyList, **kwargs
    ) -> None:
        """
        Remove the tags named in `tag_keys` from a taggable resource.

        For function resources the `$LATEST` version object is additionally re-created after
        removing the tags.

        Raises:
            ValidationException: If `tag_keys` is empty.
        """
        if not tag_keys:
            raise ValidationException(
                "1 validation error detected: Value null at 'tagKeys' failed to satisfy constraint: Member must not be null"
            )  # should probably be generalized a bit

        store = self.fetch_lambda_store_for_tagging(resource)
        store.TAGS.untag_resource(resource, tag_keys)

        resource_id = extract_resource_from_arn(resource)
        if not resource_id or not resource_id.startswith("function"):
            return

        name, _, account, region = function_locators_from_arn(resource)
        function = self._get_function(name, account, region)
        # TODO: Potential race condition
        with function.lock:
            # dirty hack for changed revision id, should reevaluate model to prevent this:
            current_latest = function.versions["$LATEST"]
            copied_config = dataclasses.replace(current_latest.config)
            function.versions["$LATEST"] = dataclasses.replace(
                current_latest, config=copied_config
            )
4269

4270
    # =======================================
4271
    # =======  LEGACY / DEPRECATED   ========
4272
    # =======================================
4273

4274
    def invoke_async(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        invoke_args: IO[BlobStream],
        **kwargs,
    ) -> InvokeAsyncResponse:
        """
        Legacy API endpoint whose usage is heavily discouraged even by AWS.

        It is not implemented here and always raises NotImplementedError.
        """
        raise NotImplementedError()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc