• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / a63921d2-72eb-4883-bbb8-968104512d0e

02 Apr 2025 01:40PM UTC coverage: 86.807% (-0.08%) from 86.888%
a63921d2-72eb-4883-bbb8-968104512d0e

push

circleci

web-flow
CFn: WIP POC v2 executor (#12396)

Co-authored-by: Marco Edoardo Palma <64580864+MEPalma@users.noreply.github.com>

36 of 112 new or added lines in 6 files covered. (32.14%)

72 existing lines in 7 files now uncovered.

63519 of 73173 relevant lines covered (86.81%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.88
/localstack-core/localstack/services/lambda_/provider.py
1
import base64
1✔
2
import dataclasses
1✔
3
import datetime
1✔
4
import itertools
1✔
5
import json
1✔
6
import logging
1✔
7
import re
1✔
8
import threading
1✔
9
import time
1✔
10
from typing import IO, Any, Optional, Tuple
1✔
11

12
from botocore.exceptions import ClientError
1✔
13

14
from localstack import config
1✔
15
from localstack.aws.api import RequestContext, ServiceException, handler
1✔
16
from localstack.aws.api.lambda_ import (
1✔
17
    AccountLimit,
18
    AccountUsage,
19
    AddLayerVersionPermissionResponse,
20
    AddPermissionRequest,
21
    AddPermissionResponse,
22
    Alias,
23
    AliasConfiguration,
24
    AliasRoutingConfiguration,
25
    AllowedPublishers,
26
    Architecture,
27
    Arn,
28
    Blob,
29
    BlobStream,
30
    CodeSigningConfigArn,
31
    CodeSigningConfigNotFoundException,
32
    CodeSigningPolicies,
33
    CompatibleArchitectures,
34
    CompatibleRuntimes,
35
    Concurrency,
36
    Cors,
37
    CreateCodeSigningConfigResponse,
38
    CreateEventSourceMappingRequest,
39
    CreateFunctionRequest,
40
    CreateFunctionUrlConfigResponse,
41
    DeleteCodeSigningConfigResponse,
42
    Description,
43
    DestinationConfig,
44
    EventSourceMappingConfiguration,
45
    FunctionCodeLocation,
46
    FunctionConfiguration,
47
    FunctionEventInvokeConfig,
48
    FunctionName,
49
    FunctionUrlAuthType,
50
    FunctionUrlQualifier,
51
    GetAccountSettingsResponse,
52
    GetCodeSigningConfigResponse,
53
    GetFunctionCodeSigningConfigResponse,
54
    GetFunctionConcurrencyResponse,
55
    GetFunctionRecursionConfigResponse,
56
    GetFunctionResponse,
57
    GetFunctionUrlConfigResponse,
58
    GetLayerVersionPolicyResponse,
59
    GetLayerVersionResponse,
60
    GetPolicyResponse,
61
    GetProvisionedConcurrencyConfigResponse,
62
    InvalidParameterValueException,
63
    InvocationResponse,
64
    InvocationType,
65
    InvokeAsyncResponse,
66
    InvokeMode,
67
    LambdaApi,
68
    LastUpdateStatus,
69
    LayerName,
70
    LayerPermissionAllowedAction,
71
    LayerPermissionAllowedPrincipal,
72
    LayersListItem,
73
    LayerVersionArn,
74
    LayerVersionContentInput,
75
    LayerVersionNumber,
76
    LicenseInfo,
77
    ListAliasesResponse,
78
    ListCodeSigningConfigsResponse,
79
    ListEventSourceMappingsResponse,
80
    ListFunctionEventInvokeConfigsResponse,
81
    ListFunctionsByCodeSigningConfigResponse,
82
    ListFunctionsResponse,
83
    ListFunctionUrlConfigsResponse,
84
    ListLayersResponse,
85
    ListLayerVersionsResponse,
86
    ListProvisionedConcurrencyConfigsResponse,
87
    ListTagsResponse,
88
    ListVersionsByFunctionResponse,
89
    LogFormat,
90
    LoggingConfig,
91
    LogType,
92
    MasterRegion,
93
    MaxFunctionEventInvokeConfigListItems,
94
    MaximumEventAgeInSeconds,
95
    MaximumRetryAttempts,
96
    MaxItems,
97
    MaxLayerListItems,
98
    MaxListItems,
99
    MaxProvisionedConcurrencyConfigListItems,
100
    NamespacedFunctionName,
101
    NamespacedStatementId,
102
    OnFailure,
103
    OnSuccess,
104
    OrganizationId,
105
    PackageType,
106
    PositiveInteger,
107
    PreconditionFailedException,
108
    ProvisionedConcurrencyConfigListItem,
109
    ProvisionedConcurrencyConfigNotFoundException,
110
    ProvisionedConcurrencyStatusEnum,
111
    PublishLayerVersionResponse,
112
    PutFunctionCodeSigningConfigResponse,
113
    PutFunctionRecursionConfigResponse,
114
    PutProvisionedConcurrencyConfigResponse,
115
    Qualifier,
116
    RecursiveLoop,
117
    ReservedConcurrentExecutions,
118
    ResourceConflictException,
119
    ResourceNotFoundException,
120
    Runtime,
121
    RuntimeVersionConfig,
122
    SnapStart,
123
    SnapStartApplyOn,
124
    SnapStartOptimizationStatus,
125
    SnapStartResponse,
126
    State,
127
    StatementId,
128
    StateReasonCode,
129
    String,
130
    TaggableResource,
131
    TagKeyList,
132
    Tags,
133
    TracingMode,
134
    UnqualifiedFunctionName,
135
    UpdateCodeSigningConfigResponse,
136
    UpdateEventSourceMappingRequest,
137
    UpdateFunctionCodeRequest,
138
    UpdateFunctionConfigurationRequest,
139
    UpdateFunctionUrlConfigResponse,
140
    Version,
141
)
142
from localstack.aws.api.lambda_ import FunctionVersion as FunctionVersionApi
1✔
143
from localstack.aws.api.lambda_ import ServiceException as LambdaServiceException
1✔
144
from localstack.aws.api.pipes import (
1✔
145
    DynamoDBStreamStartPosition,
146
    KinesisStreamStartPosition,
147
)
148
from localstack.aws.connect import connect_to
1✔
149
from localstack.aws.spec import load_service
1✔
150
from localstack.services.edge import ROUTER
1✔
151
from localstack.services.lambda_ import api_utils
1✔
152
from localstack.services.lambda_ import hooks as lambda_hooks
1✔
153
from localstack.services.lambda_.analytics import (
1✔
154
    FunctionOperation,
155
    FunctionStatus,
156
    function_counter,
157
)
158
from localstack.services.lambda_.api_utils import (
1✔
159
    ARCHITECTURES,
160
    STATEMENT_ID_REGEX,
161
    SUBNET_ID_REGEX,
162
    function_locators_from_arn,
163
)
164
from localstack.services.lambda_.event_source_mapping.esm_config_factory import (
1✔
165
    EsmConfigFactory,
166
)
167
from localstack.services.lambda_.event_source_mapping.esm_worker import (
1✔
168
    EsmState,
169
    EsmWorker,
170
)
171
from localstack.services.lambda_.event_source_mapping.esm_worker_factory import (
1✔
172
    EsmWorkerFactory,
173
)
174
from localstack.services.lambda_.event_source_mapping.pipe_utils import get_internal_client
1✔
175
from localstack.services.lambda_.invocation import AccessDeniedException
1✔
176
from localstack.services.lambda_.invocation.execution_environment import (
1✔
177
    EnvironmentStartupTimeoutException,
178
)
179
from localstack.services.lambda_.invocation.lambda_models import (
1✔
180
    AliasRoutingConfig,
181
    CodeSigningConfig,
182
    EventInvokeConfig,
183
    Function,
184
    FunctionResourcePolicy,
185
    FunctionUrlConfig,
186
    FunctionVersion,
187
    ImageConfig,
188
    LambdaEphemeralStorage,
189
    Layer,
190
    LayerPolicy,
191
    LayerPolicyStatement,
192
    LayerVersion,
193
    ProvisionedConcurrencyConfiguration,
194
    RequestEntityTooLargeException,
195
    ResourcePolicy,
196
    UpdateStatus,
197
    ValidationException,
198
    VersionAlias,
199
    VersionFunctionConfiguration,
200
    VersionIdentifier,
201
    VersionState,
202
    VpcConfig,
203
)
204
from localstack.services.lambda_.invocation.lambda_service import (
1✔
205
    LambdaService,
206
    create_image_code,
207
    destroy_code_if_not_used,
208
    lambda_stores,
209
    store_lambda_archive,
210
    store_s3_bucket_archive,
211
)
212
from localstack.services.lambda_.invocation.models import LambdaStore
1✔
213
from localstack.services.lambda_.invocation.runtime_executor import get_runtime_executor
1✔
214
from localstack.services.lambda_.lambda_utils import HINT_LOG
1✔
215
from localstack.services.lambda_.layerfetcher.layer_fetcher import LayerFetcher
1✔
216
from localstack.services.lambda_.provider_utils import (
1✔
217
    LambdaLayerVersionIdentifier,
218
    get_function_version,
219
    get_function_version_from_arn,
220
)
221
from localstack.services.lambda_.runtimes import (
1✔
222
    ALL_RUNTIMES,
223
    DEPRECATED_RUNTIMES,
224
    DEPRECATED_RUNTIMES_UPGRADES,
225
    RUNTIMES_AGGREGATED,
226
    SNAP_START_SUPPORTED_RUNTIMES,
227
    VALID_RUNTIMES,
228
)
229
from localstack.services.lambda_.urlrouter import FunctionUrlRouter
1✔
230
from localstack.services.plugins import ServiceLifecycleHook
1✔
231
from localstack.state import StateVisitor
1✔
232
from localstack.utils.aws.arns import (
1✔
233
    ArnData,
234
    extract_resource_from_arn,
235
    extract_service_from_arn,
236
    get_partition,
237
    lambda_event_source_mapping_arn,
238
    parse_arn,
239
)
240
from localstack.utils.aws.client_types import ServicePrincipal
1✔
241
from localstack.utils.bootstrap import is_api_enabled
1✔
242
from localstack.utils.collections import PaginatedList
1✔
243
from localstack.utils.event_matcher import validate_event_pattern
1✔
244
from localstack.utils.lambda_debug_mode.lambda_debug_mode_session import LambdaDebugModeSession
1✔
245
from localstack.utils.strings import get_random_hex, short_uid, to_bytes, to_str
1✔
246
from localstack.utils.sync import poll_condition
1✔
247
from localstack.utils.urls import localstack_host
1✔
248

249
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
1✔
250

251
# Default function timeout and memory size.
# NOTE(review): units are presumably seconds and MB respectively — confirm against the usages.
LAMBDA_DEFAULT_TIMEOUT = 3
1✔
252
LAMBDA_DEFAULT_MEMORY_SIZE = 128
1✔
253

254
# Limits: presumably the maximum number of tags per resource (TODO confirm against usages), and the
# maximum number of layers per function (enforced by LambdaProvider._validate_layers).
LAMBDA_TAG_LIMIT_PER_RESOURCE = 50
1✔
255
LAMBDA_LAYERS_LIMIT_PER_FUNCTION = 5
1✔
256

257
# Tag key constant; judging by the name it carries a custom URL id — verify against usages.
TAG_KEY_CUSTOM_URL = "_custom_id_"
1✔
258
# Requirements (from RFC3986 & co): not longer than 63, first char must be
259
# alpha, then alphanumeric or hyphen, except cannot start or end with hyphen
260
TAG_KEY_CUSTOM_URL_VALIDATOR = re.compile(r"^[A-Za-z]([A-Za-z0-9\-]{0,61}[A-Za-z0-9])?$")
1✔
261

262

263
class LambdaProvider(LambdaApi, ServiceLifecycleHook):
1✔
264
    lambda_service: LambdaService
1✔
265
    create_fn_lock: threading.RLock
1✔
266
    create_layer_lock: threading.RLock
1✔
267
    router: FunctionUrlRouter
1✔
268
    esm_workers: dict[str, EsmWorker]
1✔
269
    layer_fetcher: LayerFetcher | None
1✔
270

271
    def __init__(self) -> None:
        """Create the Lambda service, the creation locks, the function URL router and ESM bookkeeping.

        The ``inject_layer_fetcher`` hook is run last and receives this provider instance.
        """
        service = LambdaService()
        self.lambda_service = service
        self.create_fn_lock = threading.RLock()
        self.create_layer_lock = threading.RLock()
        # the router is handed the very same service instance the provider uses
        self.router = FunctionUrlRouter(ROUTER, service)
        self.esm_workers = {}
        # no layer fetcher by default; NOTE(review): presumably the hook below may set one — confirm
        self.layer_fetcher = None
        lambda_hooks.inject_layer_fetcher.run(self)
1✔
279

280
    def accept_state_visitor(self, visitor: StateVisitor):
        """Hand the Lambda stores (``lambda_stores``) to the given state visitor."""
        stores = lambda_stores
        visitor.visit(stores)
×
282

283
    def on_before_start(self):
        """Try to bring up the Lambda Debug Mode session; any failure is logged, never raised."""
        try:
            LambdaDebugModeSession.get().ensure_running()
        except Exception as ex:
            LOG.error(
                "Unexpected error encountered when attempting to initialise Lambda Debug Mode '%s'.",
                ex,
            )
293

294
    def on_before_state_reset(self):
        """Stop the current Lambda service before the state is reset."""
        service = self.lambda_service
        service.stop()
×
296

297
    def on_after_state_reset(self):
        """Install a fresh ``LambdaService`` on both the router and the provider after a reset."""
        fresh_service = LambdaService()
        self.router.lambda_service = fresh_service
        self.lambda_service = fresh_service
×
299

300
    def on_before_state_load(self):
        """Stop the current Lambda service before state is loaded."""
        service = self.lambda_service
        service.stop()
×
302

303
    def on_after_state_load(self):
        """Recreate the runtime objects for the Lambda state that was just loaded.

        A fresh ``LambdaService`` replaces the one stopped in ``on_before_state_load`` and is
        shared with the function URL router. Then, for every account/region store in
        ``lambda_stores``, function versions, provisioned concurrency and event source mapping
        workers are restored. A failure while restoring a single function version or a single
        provisioned concurrency config is logged and does not abort the remaining restores.
        """
        self.lambda_service = LambdaService()
        self.router.lambda_service = self.lambda_service

        for _account_id, account_bundle in lambda_stores.items():
            for _region_name, state in account_bundle.items():
                for fn in state.functions.values():
                    self._restore_function_versions_after_load(fn)
                    self._restore_provisioned_concurrency_after_load(fn)

                for esm in state.event_source_mappings.values():
                    self._restore_esm_worker_after_load(esm)

    def _restore_function_versions_after_load(self, fn: Function) -> None:
        """Put every version of ``fn`` back into the "Pending" state in the store and start it."""
        for fn_version in fn.versions.values():
            # restore the "Pending" state for every function version and start it
            try:
                new_state = VersionState(
                    state=State.Pending,
                    code=StateReasonCode.Creating,
                    reason="The function is being created.",
                )
                new_config = dataclasses.replace(fn_version.config, state=new_state)
                new_version = dataclasses.replace(fn_version, config=new_config)
                # replaces the value of an existing key only, so iterating the dict stays valid
                fn.versions[fn_version.id.qualifier] = new_version
                # NOTE(review): the version as loaded (not the "Pending" copy stored above) is
                # handed to the service — confirm this is intended.
                self.lambda_service.create_function_version(fn_version).result(timeout=5)
            except Exception:
                LOG.warning(
                    "Failed to restore function version %s",
                    fn_version.id.qualified_arn(),
                    exc_info=True,
                )

    def _restore_provisioned_concurrency_after_load(self, fn: Function) -> None:
        """Re-apply the provisioned concurrency configs of ``fn``, for both versions and aliases."""
        for (
            provisioned_qualifier,
            provisioned_config,
        ) in fn.provisioned_concurrency_configs.items():
            # stays None in the log message below if the qualifier could not be resolved
            fn_arn = None
            try:
                if api_utils.qualifier_is_alias(provisioned_qualifier):
                    alias = fn.aliases.get(provisioned_qualifier)
                    resolved_version = fn.versions.get(alias.function_version)
                    fn_arn = resolved_version.id.qualified_arn()
                elif api_utils.qualifier_is_version(provisioned_qualifier):
                    fn_version = fn.versions.get(provisioned_qualifier)
                    fn_arn = fn_version.id.qualified_arn()
                else:
                    raise InvalidParameterValueException(
                        "Invalid qualifier type:"
                        " Qualifier can only be an alias or a version for provisioned concurrency."
                    )

                manager = self.lambda_service.get_lambda_version_manager(fn_arn)
                manager.update_provisioned_concurrency_config(
                    provisioned_config.provisioned_concurrent_executions
                )
            except Exception:
                # also covers a dangling alias/version (``.get`` returned None above)
                LOG.warning(
                    "Failed to restore provisioned concurrency %s for function %s",
                    provisioned_config,
                    fn_arn,
                    exc_info=True,
                )

    def _restore_esm_worker_after_load(self, esm: EventSourceMappingConfiguration) -> None:
        """Create the worker for one loaded event source mapping and register it in ``esm_workers``."""
        # Restores event source workers
        function_arn = esm.get("FunctionArn")

        # TODO: How do we know the event source is up?
        # A basic poll to see if the mapped Lambda function is active/failed
        if not poll_condition(
            lambda: get_function_version_from_arn(function_arn).config.state.state
            in [State.Active, State.Failed],
            timeout=10,
        ):
            LOG.warning(
                "Creating ESM for Lambda that is not in running state: %s",
                function_arn,
            )

        function_version = get_function_version_from_arn(function_arn)
        function_role = function_version.config.role

        # a mapping without a "State" entry is treated as disabled
        is_esm_enabled = esm.get("State", EsmState.DISABLED) not in (
            EsmState.DISABLED,
            EsmState.DISABLING,
        )
        esm_worker = EsmWorkerFactory(esm, function_role, is_esm_enabled).get_esm_worker()

        # Note: a worker is created in the DISABLED state if not enabled
        esm_worker.create()
        # TODO: assigning the esm_worker to the dict only works after .create(). Could it cause a race
        #  condition if we get a shutdown here and have a worker thread spawned but not accounted for?
        self.esm_workers[esm_worker.uuid] = esm_worker
×
394

395
    def on_after_init(self):
        """Register the function URL routes, then let the runtime executor validate its environment."""
        self.router.register_routes()
        runtime_executor = get_runtime_executor()
        runtime_executor.validate_environment()
1✔
398

399
    def on_before_stop(self) -> None:
        """Stop all ESM workers and the Lambda service, then signal Lambda Debug Mode to stop.

        A failure while signalling the debug mode session is logged and not raised.
        """
        for worker in self.esm_workers.values():
            worker.stop_for_shutdown()

        # TODO: should probably unregister routes?
        self.lambda_service.stop()

        try:
            LambdaDebugModeSession.get().signal_stop()
        except Exception as ex:
            LOG.error(
                "Unexpected error encountered when attempting to signal Lambda Debug Mode to stop '%s'.",
                ex,
            )
414

415
    @staticmethod
    def _get_function(function_name: str, account_id: str, region: str) -> Function:
        """Look up a function in the store of the given account and region.

        :param function_name: name the function is stored under
        :param account_id: account owning the function
        :param region: region of the function
        :return: the stored function
        :raises ResourceNotFoundException: if the store has no (truthy) entry for the name
        """
        found = lambda_stores[account_id][region].functions.get(function_name)
        if found:
            return found

        missing_arn = api_utils.unqualified_lambda_arn(
            function_name=function_name,
            account=account_id,
            region=region,
        )
        raise ResourceNotFoundException(
            f"Function not found: {missing_arn}",
            Type="User",
        )
1✔
430

431
    @staticmethod
    def _get_esm(uuid: str, account_id: str, region: str) -> EventSourceMappingConfiguration:
        """Look up an event source mapping by UUID in the store of the given account and region.

        :param uuid: UUID the mapping is stored under
        :param account_id: account owning the mapping
        :param region: region of the mapping
        :return: the stored event source mapping
        :raises ResourceNotFoundException: if the store has no (truthy) entry for the UUID
        """
        mapping = lambda_stores[account_id][region].event_source_mappings.get(uuid)
        if mapping:
            return mapping

        missing_arn = lambda_event_source_mapping_arn(uuid, account_id, region)
        raise ResourceNotFoundException(
            f"Event source mapping not found: {missing_arn}",
            Type="User",
        )
1✔
442

443
    @staticmethod
    def _validate_qualifier_expression(qualifier: str) -> None:
        """Raise a ``ValidationException`` if ``api_utils.validate_qualifier`` reports any errors.

        :param qualifier: qualifier expression to check
        :raises ValidationException: carrying the message constructed from the reported errors
        """
        problems = api_utils.validate_qualifier(qualifier)
        if not problems:
            return
        raise ValidationException(
            message=api_utils.construct_validation_exception_message(problems)
        )
449

450
    @staticmethod
    def _resolve_fn_qualifier(resolved_fn: Function, qualifier: str | None) -> tuple[str, str]:
        """Resolve a qualifier against an already resolved function.

        Without a qualifier, "$LATEST" and the unqualified function ARN are returned. With a
        qualifier, it must name an existing alias or an existing version (including "$LATEST")
        of the function.

        :param resolved_fn: The resolved lambda function
        :param qualifier: The qualifier to be resolved or None
        :return: Tuple of (resolved qualifier, function arn either qualified or unqualified)
        :raises ResourceNotFoundException: if the qualifier is neither an alias nor a version
            pattern, or does not exist on the function
        """
        function_name = resolved_fn.function_name
        # assuming function versions need to live in the same account and region
        account_id = resolved_fn.latest().id.account
        region = resolved_fn.latest().id.region
        fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
        if qualifier is None:
            return "$LATEST", fn_arn

        fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
        if api_utils.qualifier_is_alias(qualifier):
            known_qualifiers = resolved_fn.aliases
            not_found_message = f"Cannot find alias arn: {fn_arn}"
        elif api_utils.qualifier_is_version(qualifier) or qualifier == "$LATEST":
            known_qualifiers = resolved_fn.versions
            not_found_message = f"Function not found: {fn_arn}"
        else:
            # matches qualifier pattern but invalid alias or version
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")

        if qualifier not in known_qualifiers:
            raise ResourceNotFoundException(not_found_message, Type="User")
        return qualifier or "$LATEST", fn_arn
1✔
476

477
    @staticmethod
    def _function_revision_id(resolved_fn: Function, resolved_qualifier: str) -> str:
        """Return the revision id of the alias or version that the qualifier names.

        :param resolved_fn: function owning the alias/version
        :param resolved_qualifier: qualifier that is expected to exist on the function
        :return: the alias' revision id, or the version config's revision id
        """
        if not api_utils.qualifier_is_alias(resolved_qualifier):
            # Assumes that a non-alias is a version
            return resolved_fn.versions[resolved_qualifier].config.revision_id
        return resolved_fn.aliases[resolved_qualifier].revision_id
1✔
484

485
    def _resolve_vpc_id(self, account_id: str, region_name: str, subnet_id: str) -> str:
        """Return the id of the VPC that the given subnet belongs to, as reported by EC2.

        :param account_id: account id, used as the access key id of the EC2 client
        :param region_name: region of the EC2 client
        :param subnet_id: id of the subnet to describe
        :return: the ``VpcId`` of the first subnet returned by ``DescribeSubnets``
        :raises InvalidParameterValueException: if EC2 answers with a client error; the EC2 error
            code and message are embedded and the original error is kept as the cause
        """
        ec2_client = connect_to(aws_access_key_id=account_id, region_name=region_name).ec2
        try:
            return ec2_client.describe_subnets(SubnetIds=[subnet_id])["Subnets"][0]["VpcId"]
        except ec2_client.exceptions.ClientError as e:
            code = e.response["Error"]["Code"]
            message = e.response["Error"]["Message"]
            # chain explicitly so the EC2 error is recorded as the cause of the translated error
            raise InvalidParameterValueException(
                f"Error occurred while DescribeSubnets. EC2 Error Code: {code}. EC2 Error Message: {message}",
                Type="User",
            ) from e
496

497
    def _build_vpc_config(
        self,
        account_id: str,
        region_name: str,
        vpc_config: Optional[dict] = None,
    ) -> VpcConfig | None:
        """Build the internal ``VpcConfig`` from a request's VPC configuration.

        :param account_id: account used to resolve the VPC id
        :param region_name: region used to resolve the VPC id
        :param vpc_config: request dict; the keys read are "SubnetIds" and "SecurityGroupIds"
        :return: None if no VPC config was given or the "ec2" API is not enabled; an empty
            ``VpcConfig`` if there are no subnet ids; otherwise a ``VpcConfig`` whose VPC id is
            resolved from the first subnet id
        :raises ValidationException: if the first subnet id does not match ``SUBNET_ID_REGEX``
        """
        if not vpc_config or not is_api_enabled("ec2"):
            return None

        subnet_ids = vpc_config.get("SubnetIds", [])
        # Missing, empty, and explicit None all mean "no subnets"; None previously fell through
        # to the subscript below and raised a TypeError.
        if not subnet_ids:
            return VpcConfig(vpc_id="", security_group_ids=[], subnet_ids=[])

        # only the first subnet id is validated and used to resolve the VPC
        subnet_id = subnet_ids[0]
        if not SUBNET_ID_REGEX.match(subnet_id):
            raise ValidationException(
                f"1 validation error detected: Value '[{subnet_id}]' at 'vpcConfig.subnetIds' failed to satisfy constraint: Member must satisfy constraint: [Member must have length less than or equal to 1024, Member must have length greater than or equal to 0, Member must satisfy regular expression pattern: ^subnet-[0-9a-z]*$]"
            )

        return VpcConfig(
            vpc_id=self._resolve_vpc_id(account_id, region_name, subnet_id),
            security_group_ids=vpc_config.get("SecurityGroupIds", []),
            subnet_ids=subnet_ids,
        )
521

522
    def _create_version_model(
        self,
        function_name: str,
        region: str,
        account_id: str,
        description: str | None = None,
        revision_id: str | None = None,
        code_sha256: str | None = None,
    ) -> tuple[FunctionVersion, bool]:
        """
        Add a new numbered version, derived from $LATEST, to the function model.

        Preconditions checked before anything is changed:
          - RevisionId, if provided, must equal the revision id of $LATEST
          - CodeSha256, if provided, must equal the code hash of $LATEST (not enforced for hot reloaded zip packages)

        If the most recently published version has the same internal revision as $LATEST, nothing is
        published and that version is returned together with False. Otherwise a copy of $LATEST in
        "Pending" state is stored under the next version number and returned together with True.

        :param function_name: Function name to be published
        :param region: Region of the function
        :param account_id: Account of the function
        :param description: new description of the version (the one of $LATEST is kept if missing)
        :param revision_id: Revision id, an error is raised if it does not match the latest revision id
        :param code_sha256: Code sha256, an error is raised if it does not match the latest code hash
        :return: Tuple of (version, True if it was newly created / False if the last published version was reused)
        :raises PreconditionFailedException: on a revision id mismatch
        :raises InvalidParameterValueException: on a code hash mismatch
        """
        latest = get_function_version(
            function_name=function_name, qualifier="$LATEST", account_id=account_id, region=region
        )
        if revision_id and latest.config.revision_id != revision_id:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
                Type="User",
            )

        # check if code hashes match if they are specified
        is_zip_package = latest.config.package_type == PackageType.Zip
        if is_zip_package:
            current_hash = latest.config.code.code_sha256
        else:
            current_hash = latest.config.image.code_sha256
        # if the code is a zip package and hot reloaded (hot reloading is currently only supported for zip packagetypes)
        # we cannot enforce the codesha256 check
        is_hot_reloaded_zip_package = is_zip_package and latest.config.code.is_hot_reloading()
        hash_mismatch = code_sha256 and current_hash != code_sha256
        if hash_mismatch and not is_hot_reloaded_zip_package:
            raise InvalidParameterValueException(
                f"CodeSHA256 ({code_sha256}) is different from current CodeSHA256 in $LATEST ({current_hash}). Please try again with the CodeSHA256 in $LATEST.",
                Type="User",
            )

        function = lambda_stores[account_id][region].functions.get(function_name)
        # TODO copy environment instead of restarting one, get rid of all the "Pending"s

        with function.lock:
            last_published = None
            if function.next_version > 1:
                last_published = function.versions.get(str(function.next_version - 1))
            if (
                last_published
                and last_published.config.internal_revision == latest.config.internal_revision
            ):
                return last_published, False
            # TODO check if there was a change since last version
            version_number = str(function.next_version)
            function.next_version += 1
            new_id = VersionIdentifier(
                function_name=function_name,
                qualifier=version_number,
                region=region,
                account=account_id,
            )
            apply_on = latest.config.snap_start["ApplyOn"]
            snap_start = SnapStartResponse(
                ApplyOn=apply_on,
                OptimizationStatus=(
                    SnapStartOptimizationStatus.On
                    if apply_on == SnapStartApplyOn.PublishedVersions
                    else SnapStartOptimizationStatus.Off
                ),
            )
            config_overrides = {
                "last_update": None,  # versions never have a last update status
                "state": VersionState(
                    state=State.Pending,
                    code=StateReasonCode.Creating,
                    reason="The function is being created.",
                ),
                "snap_start": snap_start,
            }
            if description is not None:
                config_overrides["description"] = description
            new_version = dataclasses.replace(
                latest,
                config=dataclasses.replace(latest.config, **config_overrides),
                id=new_id,
            )
            function.versions[version_number] = new_version
        return new_version, True
1✔
625

626
    def _publish_version_from_existing_version(
        self,
        function_name: str,
        region: str,
        account_id: str,
        description: str | None = None,
        revision_id: str | None = None,
        code_sha256: str | None = None,
    ) -> FunctionVersion:
        """
        Publish a version based on an existing, already initialized $LATEST.

        If no new version had to be created, the previously published version is returned as is.
        Otherwise the new version is passed to ``LambdaService.publish_version`` and $LATEST is
        replaced by a copy of itself in the store.

        :param function_name: Function name
        :param region: region
        :param account_id: account id
        :param description: description
        :param revision_id: revision id (check if current version matches)
        :param code_sha256: code sha (check if current code matches)
        :return: the published version as currently stored on the function
        """
        published, was_created = self._create_version_model(
            function_name=function_name,
            region=region,
            account_id=account_id,
            description=description,
            revision_id=revision_id,
            code_sha256=code_sha256,
        )
        if not was_created:
            return published

        self.lambda_service.publish_version(published)
        function = lambda_stores[account_id][region].functions.get(function_name)
        # TODO: re-evaluate data model to prevent this dirty hack just for bumping the revision id
        latest = function.versions["$LATEST"]
        copied_config = dataclasses.replace(latest.config)
        function.versions["$LATEST"] = dataclasses.replace(latest, config=copied_config)
        return function.versions.get(published.id.qualifier)
1✔
665

666
    def _publish_version_with_changes(
        self,
        function_name: str,
        region: str,
        account_id: str,
        description: str | None = None,
        revision_id: str | None = None,
        code_sha256: str | None = None,
    ) -> FunctionVersion:
        """
        Publish a version together with a new $LATEST (publish on create / update).

        A newly created version is passed to ``LambdaService.create_function_version``; if no new
        version had to be created, the previously published one is returned untouched.

        :param function_name: Function name
        :param region: region
        :param account_id: account id
        :param description: description
        :param revision_id: revision id (check if current version matches)
        :param code_sha256: code sha (check if current code matches)
        :return: the new (or reused) version
        """
        version, was_created = self._create_version_model(
            function_name=function_name,
            region=region,
            account_id=account_id,
            description=description,
            revision_id=revision_id,
            code_sha256=code_sha256,
        )
        if was_created:
            self.lambda_service.create_function_version(version)
        return version
1✔
698

699
    @staticmethod
    def _verify_env_variables(env_vars: dict[str, str]):
        """
        Reject environment variables whose serialized form is too large.

        The variables are serialized as compact JSON (no whitespace after separators) and the
        UTF-8 byte length of that string is compared against
        ``config.LAMBDA_LIMITS_MAX_FUNCTION_ENVVAR_SIZE_BYTES``.

        :param env_vars: environment variables of the function
        :raises InvalidParameterValueException: if the serialized variables exceed the limit
        """
        serialized = json.dumps(env_vars, separators=(",", ":"))
        size_in_bytes = len(serialized.encode("utf-8"))
        if size_in_bytes <= config.LAMBDA_LIMITS_MAX_FUNCTION_ENVVAR_SIZE_BYTES:
            return
        raise InvalidParameterValueException(
            f"Lambda was unable to configure your environment variables because the environment variables you have provided exceeded the 4KB limit. String measured: {serialized}",
            Type="User",
        )
710

711
    @staticmethod
    def _validate_snapstart(snap_start: SnapStart, runtime: Runtime):
        """
        Validate a SnapStart request against the allowed ``ApplyOn`` values and the runtime.

        :param snap_start: SnapStart settings from the request
        :param runtime: runtime the settings would be applied to
        :raises ValidationException: if ``ApplyOn`` is neither ``PublishedVersions`` nor ``None``
        :raises InvalidParameterValueException: if the runtime is not in
            ``SNAP_START_SUPPORTED_RUNTIMES``
        """
        requested_apply_on = snap_start.get("ApplyOn")
        accepted_apply_on = (SnapStartApplyOn.PublishedVersions, SnapStartApplyOn.None_)
        if requested_apply_on not in accepted_apply_on:
            raise ValidationException(
                f"1 validation error detected: Value '{requested_apply_on}' at 'snapStart.applyOn' failed to satisfy constraint: Member must satisfy enum value set: [PublishedVersions, None]"
            )

        if runtime in SNAP_START_SUPPORTED_RUNTIMES:
            return
        raise InvalidParameterValueException(
            f"{runtime} is not supported for SnapStart enabled functions.", Type="User"
        )
726

727
    def _validate_layers(self, new_layers: list[str], region: str, account_id: str):
        """
        Validate the layer version ARNs referenced by a function.

        Side effect: a layer version owned by another account that is not present in the store
        is fetched through ``self.layer_fetcher`` and saved in the store of the layer's account
        and region.

        :param new_layers: layer version ARNs to validate
        :param region: region of the function; the region checks are skipped if it is falsy
        :param account_id: account id of the function
        :raises InvalidParameterValueException: if too many layers are referenced, a layer of the
            same account lives in another region or does not exist, or two versions of the same
            layer are referenced
        :raises ValidationException: if an ARN carries no layer version
        :raises AccessDeniedException: if a layer of another account lives in another region or
            the layer fetcher returns nothing for it
        :raises NotImplementedError: if a layer of another account has to be fetched but no
            layer fetcher is set
        """
        if len(new_layers) > LAMBDA_LAYERS_LIMIT_PER_FUNCTION:
            raise InvalidParameterValueException(
                "Cannot reference more than 5 layers.", Type="User"
            )

        visited_layers = dict()
        for layer_version_arn in new_layers:
            (
                layer_region,
                layer_account_id,
                layer_name,
                layer_version_str,
            ) = api_utils.parse_layer_arn(layer_version_arn)
            if layer_version_str is None:
                raise ValidationException(
                    f"1 validation error detected: Value '[{layer_version_arn}]'"
                    + r" at 'layers' failed to satisfy constraint: Member must satisfy constraint: [Member must have length less than or equal to 140, Member must have length greater than or equal to 1, Member must satisfy regular expression pattern: (arn:[a-zA-Z0-9-]+:lambda:[a-z]{2}((-gov)|(-iso(b?)))?-[a-z]+-\d{1}:\d{12}:layer:[a-zA-Z0-9-_]+:[0-9]+)|(arn:[a-zA-Z0-9-]+:lambda:::awslayer:[a-zA-Z0-9-_]+), Member must not be null]",
                )

            state = lambda_stores[layer_account_id][layer_region]
            # layer (and version) as currently stored; both stay None if unknown
            layer = state.layers.get(layer_name)
            layer_version = None
            if layer is not None:
                layer_version = layer.layer_versions.get(layer_version_str)
            if layer_account_id == account_id:
                if region and layer_region != region:
                    raise InvalidParameterValueException(
                        f"Layers are not in the same region as the function. "
                        f"Layers are expected to be in region {region}.",
                        Type="User",
                    )
                # layer_version is None both for an unknown layer and an unknown version
                if layer_version is None:
                    raise InvalidParameterValueException(
                        f"Layer version {layer_version_arn} does not exist.", Type="User"
                    )
            else:  # External layer from other account
                # TODO: validate IAM layer policy here, allowing access by default for now and only checking region
                if region and layer_region != region:
                    # TODO: detect user or role from context when IAM users are implemented
                    user = "user/localstack-testing"
                    raise AccessDeniedException(
                        f"User: arn:{get_partition(region)}:iam::{account_id}:{user} is not authorized to perform: lambda:GetLayerVersion on resource: {layer_version_arn} because no resource-based policy allows the lambda:GetLayerVersion action"
                    )
                if layer_version is None:
                    # Limitation: cannot fetch external layers when using the same account id as the target layer
                    # because we do not want to trigger the layer fetcher for every non-existing layer.
                    if self.layer_fetcher is None:
                        raise NotImplementedError(
                            "Fetching shared layers from AWS is a pro feature."
                        )

                    fetched_layer = self.layer_fetcher.fetch_layer(layer_version_arn)
                    if fetched_layer is None:
                        # TODO: detect user or role from context when IAM users are implemented
                        user = "user/localstack-testing"
                        raise AccessDeniedException(
                            f"User: arn:{get_partition(region)}:iam::{account_id}:{user} is not authorized to perform: lambda:GetLayerVersion on resource: {layer_version_arn} because no resource-based policy allows the lambda:GetLayerVersion action"
                        )

                    # Distinguish between new layer and new layer version. The decision has to be
                    # based on the stored layer: the version is always missing in this branch, so
                    # checking the version would replace an existing layer and drop its other versions.
                    if layer is None:
                        # Create whole layer from scratch
                        state.layers[layer_name] = fetched_layer
                    else:
                        # Create layer version if another version of the same layer already exists
                        layer.layer_versions[layer_version_str] = (
                            fetched_layer.layer_versions.get(layer_version_str)
                        )

            # only the first two matches in the array are considered for the error message
            layer_arn = ":".join(layer_version_arn.split(":")[:-1])
            if layer_arn in visited_layers:
                conflict_layer_version_arn = visited_layers[layer_arn]
                raise InvalidParameterValueException(
                    f"Two different versions of the same layer are not allowed to be referenced in the same function. {conflict_layer_version_arn} and {layer_version_arn} are versions of the same layer.",
                    Type="User",
                )
            visited_layers[layer_arn] = layer_version_arn
806

807
    @staticmethod
    def map_layers(new_layers: list[str]) -> list[LayerVersion]:
        """
        Look up the stored layer version for every given layer version ARN.

        :param new_layers: layer version ARNs; every referenced layer must already be stored
        :return: the stored layer versions, in the order of the given ARNs
        """
        resolved_versions = []
        for arn in new_layers:
            region_name, account_id, layer_name, version_key = api_utils.parse_layer_arn(arn)
            stored_layer = lambda_stores[account_id][region_name].layers.get(layer_name)
            resolved_versions.append(stored_layer.layer_versions.get(version_key))
        return resolved_versions
818

819
    def get_function_recursion_config(
        self,
        context: RequestContext,
        function_name: UnqualifiedFunctionName,
        **kwargs,
    ) -> GetFunctionRecursionConfigResponse:
        """
        Return the recursive loop setting stored for a function.

        :param context: request context
        :param function_name: function name or ARN as given in the request
        :return: response carrying the function's ``recursive_loop`` value
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        resolved_name = api_utils.get_function_name(function_name, context)
        stored_function = self._get_function(
            function_name=resolved_name, region=region, account_id=account_id
        )
        return GetFunctionRecursionConfigResponse(RecursiveLoop=stored_function.recursive_loop)
829

830
    def put_function_recursion_config(
        self,
        context: RequestContext,
        function_name: UnqualifiedFunctionName,
        recursive_loop: RecursiveLoop,
        **kwargs,
    ) -> PutFunctionRecursionConfigResponse:
        """
        Store the recursive loop setting of a function.

        The function is looked up before the value is validated.

        :param context: request context
        :param function_name: function name or ARN as given in the request
        :param recursive_loop: new value, must be a member of ``RecursiveLoop``
        :return: response carrying the stored ``recursive_loop`` value
        :raises ValidationException: if ``recursive_loop`` is not a member of ``RecursiveLoop``
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        resolved_name = api_utils.get_function_name(function_name, context)

        stored_function = self._get_function(
            function_name=resolved_name, region=region, account_id=account_id
        )

        accepted_values = tuple(RecursiveLoop.__members__.values())
        if recursive_loop not in accepted_values:
            raise ValidationException(
                f"1 validation error detected: Value '{recursive_loop}' at 'recursiveLoop' failed to satisfy constraint: "
                f"Member must satisfy enum value set: [Terminate, Allow]"
            )

        stored_function.recursive_loop = recursive_loop
        return PutFunctionRecursionConfigResponse(RecursiveLoop=stored_function.recursive_loop)
851

852
    @handler(operation="CreateFunction", expand=False)
    def create_function(
        self,
        context: RequestContext,
        request: CreateFunctionRequest,
    ) -> FunctionConfiguration:
        """
        Create a function together with its ``$LATEST`` version.

        The request is validated first (code and request size, architectures, environment
        variables, layers, role, runtime, SnapStart). The function is then registered in the
        store of the request's account and region while holding ``self.create_fn_lock``, and the
        version is handed to the lambda service. Tags are stored and a version is published if
        the request asks for it.

        :param context: request context (provides region, account id and the raw request)
        :param request: CreateFunction request
        :return: configuration of the created version (of the published version if ``Publish``
            is set)
        :raises RequestEntityTooLargeException: if the zip file or the request is too large
        :raises ValidationException: if the architectures or the role are malformed
        :raises InvalidParameterValueException: if the role cannot be assumed by Lambda
        :raises ResourceConflictException: if a function with this name already exists
        :raises LambdaServiceException: if the code required for the package type is missing
        """
        context_region = context.region
        context_account_id = context.account_id

        zip_file = request.get("Code", {}).get("ZipFile")
        if zip_file and len(zip_file) > config.LAMBDA_LIMITS_CODE_SIZE_ZIPPED:
            raise RequestEntityTooLargeException(
                f"Zipped size must be smaller than {config.LAMBDA_LIMITS_CODE_SIZE_ZIPPED} bytes"
            )

        if context.request.content_length > config.LAMBDA_LIMITS_CREATE_FUNCTION_REQUEST_SIZE:
            raise RequestEntityTooLargeException(
                f"Request must be smaller than {config.LAMBDA_LIMITS_CREATE_FUNCTION_REQUEST_SIZE} bytes for the CreateFunction operation"
            )

        if architectures := request.get("Architectures"):
            if len(architectures) != 1:
                raise ValidationException(
                    f"1 validation error detected: Value '[{', '.join(architectures)}]' at 'architectures' failed to "
                    f"satisfy constraint: Member must have length less than or equal to 1",
                )
            if architectures[0] not in ARCHITECTURES:
                raise ValidationException(
                    f"1 validation error detected: Value '[{', '.join(architectures)}]' at 'architectures' failed to "
                    f"satisfy constraint: Member must satisfy constraint: [Member must satisfy enum value set: "
                    f"[x86_64, arm64], Member must not be null]",
                )

        if env_vars := request.get("Environment", {}).get("Variables"):
            self._verify_env_variables(env_vars)

        if layers := request.get("Layers", []):
            self._validate_layers(layers, region=context_region, account_id=context_account_id)

        if not api_utils.is_role_arn(request.get("Role")):
            raise ValidationException(
                f"1 validation error detected: Value '{request.get('Role')}'"
                + " at 'role' failed to satisfy constraint: Member must satisfy regular expression pattern: arn:(aws[a-zA-Z-]*)?:iam::\\d{12}:role/?[a-zA-Z_0-9+=,.@\\-_/]+"
            )
        if not self.lambda_service.can_assume_role(request.get("Role"), context.region):
            raise InvalidParameterValueException(
                "The role defined for the function cannot be assumed by Lambda.", Type="User"
            )
        package_type = request.get("PackageType", PackageType.Zip)
        runtime = request.get("Runtime")
        self._validate_runtime(package_type, runtime)

        request_function_name = request.get("FunctionName")

        function_name, *_ = api_utils.get_name_and_qualifier(
            function_arn_or_name=request_function_name,
            qualifier=None,
            context=context,
        )

        if runtime in DEPRECATED_RUNTIMES:
            LOG.warning(
                "The Lambda runtime %s is deprecated. "
                "Please upgrade the runtime for the function %s: "
                "https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtimes.html",
                runtime,
                function_name,
            )
        if snap_start := request.get("SnapStart"):
            self._validate_snapstart(snap_start, runtime)
        state = lambda_stores[context_account_id][context_region]

        # the existence check and the registration of the function happen under the same lock
        with self.create_fn_lock:
            if function_name in state.functions:
                raise ResourceConflictException(f"Function already exist: {function_name}")
            fn = Function(function_name=function_name)
            arn = VersionIdentifier(
                function_name=function_name,
                qualifier="$LATEST",
                region=context_region,
                account=context_account_id,
            )
            # save function code to s3
            code = None
            image = None
            image_config = None
            runtime_version_config = RuntimeVersionConfig(
                # Limitation: the runtime id (presumably sha256 of image) is currently hardcoded
                # Potential implementation: provide (cached) sha256 hash of used Docker image
                RuntimeVersionArn=f"arn:{context.partition}:lambda:{context_region}::runtime:8eeff65f6809a3ce81507fe733fe09b835899b99481ba22fd75b5a7338290ec1"
            )
            request_code = request.get("Code")
            if package_type == PackageType.Zip:
                # TODO verify if correct combination of code is set
                if zip_file := request_code.get("ZipFile"):
                    code = store_lambda_archive(
                        archive_file=zip_file,
                        function_name=function_name,
                        region_name=context_region,
                        account_id=context_account_id,
                    )
                elif s3_bucket := request_code.get("S3Bucket"):
                    s3_key = request_code["S3Key"]
                    s3_object_version = request_code.get("S3ObjectVersion")
                    code = store_s3_bucket_archive(
                        archive_bucket=s3_bucket,
                        archive_key=s3_key,
                        archive_version=s3_object_version,
                        function_name=function_name,
                        region_name=context_region,
                        account_id=context_account_id,
                    )
                else:
                    raise LambdaServiceException("Gotta have s3 bucket or zip file")
            elif package_type == PackageType.Image:
                image = request_code.get("ImageUri")
                if not image:
                    raise LambdaServiceException("Gotta have an image when package type is image")
                image = create_image_code(image_uri=image)

                image_config_req = request.get("ImageConfig", {})
                image_config = ImageConfig(
                    command=image_config_req.get("Command"),
                    entrypoint=image_config_req.get("EntryPoint"),
                    working_directory=image_config_req.get("WorkingDirectory"),
                )
                # Runtime management controls are not available when providing a custom image
                runtime_version_config = None
            if "LoggingConfig" in request:
                logging_config = request["LoggingConfig"]
                LOG.warning(
                    "Advanced Lambda Logging Configuration is currently mocked "
                    "and will not impact the logging behavior. "
                    "Please create a feature request if needed."
                )

                # when switching to JSON, app and system level log is auto set to INFO
                # (values given in the request win over these defaults)
                if logging_config.get("LogFormat", None) == LogFormat.JSON:
                    logging_config = {
                        "ApplicationLogLevel": "INFO",
                        "SystemLogLevel": "INFO",
                        "LogGroup": f"/aws/lambda/{function_name}",
                    } | logging_config
                else:
                    logging_config = (
                        LoggingConfig(
                            LogFormat=LogFormat.Text, LogGroup=f"/aws/lambda/{function_name}"
                        )
                        | logging_config
                    )

            else:
                logging_config = LoggingConfig(
                    LogFormat=LogFormat.Text, LogGroup=f"/aws/lambda/{function_name}"
                )

            version = FunctionVersion(
                id=arn,
                config=VersionFunctionConfiguration(
                    last_modified=api_utils.format_lambda_date(datetime.datetime.now()),
                    description=request.get("Description", ""),
                    role=request["Role"],
                    timeout=request.get("Timeout", LAMBDA_DEFAULT_TIMEOUT),
                    runtime=request.get("Runtime"),
                    memory_size=request.get("MemorySize", LAMBDA_DEFAULT_MEMORY_SIZE),
                    handler=request.get("Handler"),
                    package_type=package_type,
                    environment=env_vars,
                    architectures=request.get("Architectures") or [Architecture.x86_64],
                    tracing_config_mode=request.get("TracingConfig", {}).get(
                        "Mode", TracingMode.PassThrough
                    ),
                    image=image,
                    image_config=image_config,
                    code=code,
                    layers=self.map_layers(layers),
                    internal_revision=short_uid(),
                    ephemeral_storage=LambdaEphemeralStorage(
                        size=request.get("EphemeralStorage", {}).get("Size", 512)
                    ),
                    snap_start=SnapStartResponse(
                        ApplyOn=request.get("SnapStart", {}).get("ApplyOn", SnapStartApplyOn.None_),
                        OptimizationStatus=SnapStartOptimizationStatus.Off,
                    ),
                    runtime_version_config=runtime_version_config,
                    dead_letter_arn=request.get("DeadLetterConfig", {}).get("TargetArn"),
                    vpc_config=self._build_vpc_config(
                        context_account_id, context_region, request.get("VpcConfig")
                    ),
                    state=VersionState(
                        state=State.Pending,
                        code=StateReasonCode.Creating,
                        reason="The function is being created.",
                    ),
                    logging_config=logging_config,
                ),
            )
            fn.versions["$LATEST"] = version
            state.functions[function_name] = fn
        # NOTE(review): the object returned by labels() is discarded and nothing is called on it;
        # confirm whether the counter is expected to be incremented here
        function_counter.labels(
            operation=FunctionOperation.create,
            runtime=runtime or "n/a",
            status=FunctionStatus.success,
            invocation_type="n/a",
            package_type=package_type,
        )
        self.lambda_service.create_function_version(version)

        if tags := request.get("Tags"):
            # This will check whether the function exists.
            self._store_tags(arn.unqualified_arn(), tags)

        if request.get("Publish"):
            # from here on `version` refers to the published version, not to $LATEST
            version = self._publish_version_with_changes(
                function_name=function_name, region=context_region, account_id=context_account_id
            )

        if config.LAMBDA_SYNCHRONOUS_CREATE:
            # block via retrying until "terminal" condition reached before returning
            if not poll_condition(
                lambda: get_function_version(
                    function_name, version.id.qualifier, version.id.account, version.id.region
                ).config.state.state
                in [State.Active, State.Failed],
                timeout=10,
            ):
                LOG.warning(
                    "LAMBDA_SYNCHRONOUS_CREATE is active, but waiting for %s reached timeout.",
                    function_name,
                )

        return api_utils.map_config_out(
            version, return_qualified_arn=False, return_update_status=False
        )
1086

1087
    def _validate_runtime(self, package_type, runtime):
        """
        Validate the runtime of a function with Zip packaging.

        Without ``config.LAMBDA_RUNTIME_VALIDATION`` every runtime in ``ALL_RUNTIMES`` is
        accepted; with it only the runtimes listed in ``RUNTIMES_AGGREGATED`` are. Other package
        types are not checked.

        :param package_type: package type of the function
        :param runtime: runtime from the request
        :raises InvalidParameterValueException: if the runtime is not accepted; deprecated
            runtimes with a known upgrade target get a dedicated message
        """
        runtimes = ALL_RUNTIMES
        if config.LAMBDA_RUNTIME_VALIDATION:
            # chain(values) with a single argument yields the values themselves, so the
            # membership test below could never match a runtime; from_iterable flattens them.
            # assumes every value of RUNTIMES_AGGREGATED is a collection of runtimes - TODO confirm
            runtimes = list(itertools.chain.from_iterable(RUNTIMES_AGGREGATED.values()))

        if package_type == PackageType.Zip and runtime not in runtimes:
            # deprecated runtimes have different error
            if runtime in DEPRECATED_RUNTIMES:
                HINT_LOG.info(
                    "Set env variable LAMBDA_RUNTIME_VALIDATION to 0"
                    " in order to allow usage of deprecated runtimes"
                )
                # raises on its own if an upgrade target is known for the runtime
                self._check_for_recomended_migration_target(runtime)

            raise InvalidParameterValueException(
                f"Value {runtime} at 'runtime' failed to satisfy constraint: Member must satisfy enum value set: {VALID_RUNTIMES} or be a valid ARN",
                Type="User",
            )
1105

1106
    def _check_for_recomended_migration_target(self, deprecated_runtime):
        """
        Raise the dedicated error for deprecated runtimes that have a recommended upgrade.

        AWS offers a recommended runtime for migration for "newly" deprecated runtimes; this
        check exists to preserve parity with those error messages. Nothing happens if
        ``DEPRECATED_RUNTIMES_UPGRADES`` has no entry for the runtime.

        :param deprecated_runtime: the deprecated runtime from the request
        :raises InvalidParameterValueException: if an upgrade target is known for the runtime
        """
        upgrade_target = DEPRECATED_RUNTIMES_UPGRADES.get(deprecated_runtime)
        if upgrade_target is None:
            return

        LOG.debug(
            "The Lambda runtime %s is deprecated. Please upgrade to a supported Lambda runtime such as %s.",
            deprecated_runtime,
            upgrade_target,
        )
        raise InvalidParameterValueException(
            f"The runtime parameter of {deprecated_runtime} is no longer supported for creating or updating AWS Lambda functions. We recommend you use a supported runtime while creating or updating functions.",
            Type="User",
        )
1121

1122
    @handler(operation="UpdateFunctionConfiguration", expand=False)
    def update_function_configuration(
        self, context: RequestContext, request: UpdateFunctionConfigurationRequest
    ) -> FunctionConfiguration:
        """
        Update the configuration of the $LATEST version of a function.

        Only the settings present in the request are changed. The new $LATEST version replaces
        the stored one and is passed to the lambda service.

        :param context: request context
        :param request: UpdateFunctionConfiguration request
        :return: configuration of the new $LATEST version
        :raises ResourceNotFoundException: if the function does not exist
        :raises PreconditionFailedException: if a revision id is given that does not match
        :raises ValidationException: if the role is malformed
        :raises InvalidParameterValueException: if the runtime is unknown
        """
        name_or_arn = request.get("FunctionName")

        # the request may carry a name, a partial ARN or a full ARN
        account_id, region = api_utils.get_account_and_region(name_or_arn, context)
        function_name, _ = api_utils.get_name_and_qualifier(name_or_arn, None, context)
        store = lambda_stores[account_id][region]

        if function_name not in store.functions:
            raise ResourceNotFoundException(
                f"Function not found: {api_utils.unqualified_lambda_arn(function_name=function_name, region=region, account=account_id)}",
                Type="User",
            )
        function = store.functions[function_name]

        # TODO: lock modification of latest version
        # TODO: notify service for changes relevant to re-provisioning of $LATEST
        current_version = function.latest()
        current_config = current_version.config

        expected_revision = request.get("RevisionId")
        if expected_revision and expected_revision != current_version.config.revision_id:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
                Type="User",
            )

        # config fields to override, collected while walking through the request
        changes = {}
        if "EphemeralStorage" in request:
            storage_request = request.get("EphemeralStorage", {})
            changes["ephemeral_storage"] = LambdaEphemeralStorage(
                storage_request.get("Size", 512)
            )  # TODO: do defaults here apply as well?

        if "Role" in request:
            role = request["Role"]
            if not api_utils.is_role_arn(role):
                raise ValidationException(
                    f"1 validation error detected: Value '{role}'"
                    + " at 'role' failed to satisfy constraint: Member must satisfy regular expression pattern: arn:(aws[a-zA-Z-]*)?:iam::\\d{12}:role/?[a-zA-Z_0-9+=,.@\\-_/]+"
                )
            changes["role"] = role

        # settings that are copied over without further checks
        plain_settings = (
            ("Description", "description"),
            ("Timeout", "timeout"),
            ("MemorySize", "memory_size"),
            ("Handler", "handler"),
        )
        for request_key, config_field in plain_settings:
            if request_key in request:
                changes[config_field] = request[request_key]

        if "DeadLetterConfig" in request:
            dead_letter_request = request.get("DeadLetterConfig", {})
            changes["dead_letter_arn"] = dead_letter_request.get("TargetArn")

        requested_vpc_config = request.get("VpcConfig")
        if requested_vpc_config:
            changes["vpc_config"] = self._build_vpc_config(
                account_id, region, requested_vpc_config
            )

        if "Runtime" in request:
            requested_runtime = request["Runtime"]

            if requested_runtime not in ALL_RUNTIMES:
                raise InvalidParameterValueException(
                    f"Value {requested_runtime} at 'runtime' failed to satisfy constraint: Member must satisfy enum value set: {VALID_RUNTIMES} or be a valid ARN",
                    Type="User",
                )
            if requested_runtime in DEPRECATED_RUNTIMES:
                LOG.warning(
                    "The Lambda runtime %s is deprecated. "
                    "Please upgrade the runtime for the function %s: "
                    "https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtimes.html",
                    requested_runtime,
                    function_name,
                )
            changes["runtime"] = requested_runtime

        requested_snap_start = request.get("SnapStart")
        if requested_snap_start:
            # validate against the runtime from this request, falling back to the stored one
            effective_runtime = changes.get("runtime") or current_config.runtime
            self._validate_snapstart(requested_snap_start, effective_runtime)
            changes["snap_start"] = SnapStartResponse(
                ApplyOn=requested_snap_start.get("ApplyOn", SnapStartApplyOn.None_),
                OptimizationStatus=SnapStartOptimizationStatus.Off,
            )

        if "Environment" in request:
            variables = request.get("Environment", {}).get("Variables", {})
            if variables:
                self._verify_env_variables(variables)
            changes["environment"] = variables

        if "Layers" in request:
            requested_layers = request["Layers"]
            if requested_layers:
                self._validate_layers(requested_layers, region=region, account_id=account_id)
            changes["layers"] = self.map_layers(requested_layers)

        if "ImageConfig" in request:
            requested_image_config = request["ImageConfig"]
            changes["image_config"] = ImageConfig(
                command=requested_image_config.get("Command"),
                entrypoint=requested_image_config.get("EntryPoint"),
                working_directory=requested_image_config.get("WorkingDirectory"),
            )

        if "LoggingConfig" in request:
            requested_logging = request["LoggingConfig"]
            LOG.warning(
                "Advanced Lambda Logging Configuration is currently mocked "
                "and will not impact the logging behavior. "
                "Please create a feature request if needed."
            )

            # when switching to JSON, app and system level log is auto set to INFO
            if requested_logging.get("LogFormat", None) == LogFormat.JSON:
                requested_logging = {
                    "ApplicationLogLevel": "INFO",
                    "SystemLogLevel": "INFO",
                    **requested_logging,
                }

            previous_logging = current_config.logging_config

            # partial update: requested values win over the stored ones
            merged_logging = previous_logging | requested_logging

            # in case we switched from JSON to Text we need to remove LogLevel keys
            switched_to_text = (
                merged_logging.get("LogFormat") == LogFormat.Text
                and previous_logging.get("LogFormat") == LogFormat.JSON
            )
            if switched_to_text:
                for level_key in ("ApplicationLogLevel", "SystemLogLevel"):
                    merged_logging.pop(level_key, None)

            changes["logging_config"] = merged_logging

        if "TracingConfig" in request:
            if tracing_mode := request.get("TracingConfig", {}).get("Mode"):
                changes["tracing_config_mode"] = tracing_mode

        updated_config = dataclasses.replace(
            current_config,
            last_modified=api_utils.generate_lambda_date(),
            internal_revision=short_uid(),
            last_update=UpdateStatus(
                status=LastUpdateStatus.InProgress,
                code="Creating",
                reason="The function is being created.",
            ),
            **changes,
        )
        updated_version = dataclasses.replace(current_version, config=updated_config)
        function.versions["$LATEST"] = updated_version  # TODO: notify
        self.lambda_service.update_version(new_version=updated_version)

        return api_utils.map_config_out(updated_version)
1284

1285
    @handler(operation="UpdateFunctionCode", expand=False)
    def update_function_code(
        self, context: RequestContext, request: UpdateFunctionCodeRequest
    ) -> FunctionConfiguration:
        """
        Replace the code (zip archive, S3 object or container image) of the function's $LATEST
        version and optionally publish a new version from it.

        Args:
            context: request context used to resolve account and region.
            request: the raw UpdateFunctionCode request.

        Returns:
            The configuration of the updated $LATEST version, or of the freshly published version
            (with a qualified ARN) when ``Publish`` is set in the request.

        Raises:
            ResourceNotFoundException: the function does not exist in the resolved store.
            PreconditionFailedException: ``RevisionId`` is given but does not match $LATEST.
            InvalidParameterValueException: the code source does not fit the package type.
            ValidationException: ``Architectures`` is not exactly one supported architecture.
            LambdaServiceException: none of ZipFile, S3Bucket or ImageUri is provided.
        """
        requested_name = request.get("FunctionName")
        account_id, region = api_utils.get_account_and_region(requested_name, context)
        function_name, _ = api_utils.get_name_and_qualifier(requested_name, None, context)

        store = lambda_stores[account_id][region]
        if function_name not in store.functions:
            function_arn = api_utils.unqualified_lambda_arn(
                function_name=function_name, region=region, account=account_id
            )
            raise ResourceNotFoundException(f"Function not found: {function_arn}", Type="User")
        function = store.functions[function_name]

        # optimistic locking: only checked when the caller supplied a revision id
        revision_id = request.get("RevisionId")
        if revision_id and revision_id != function.latest().config.revision_id:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
                Type="User",
            )

        # TODO verify if correct combination of code is set
        archive_given = request.get("ZipFile") or request.get("S3Bucket")
        if archive_given and function.latest().config.package_type == PackageType.Image:
            raise InvalidParameterValueException(
                "Please provide ImageUri when updating a function with packageType Image.",
                Type="User",
            )
        if request.get("ImageUri") and function.latest().config.package_type == PackageType.Zip:
            raise InvalidParameterValueException(
                "Please don't provide ImageUri when updating a function with packageType Zip.",
                Type="User",
            )

        # exactly one of code/image ends up set, depending on the first source found
        code = None
        image = None
        zip_file = request.get("ZipFile")
        s3_bucket = request.get("S3Bucket")
        image_uri = request.get("ImageUri")
        if zip_file:
            code = store_lambda_archive(
                archive_file=zip_file,
                function_name=function_name,
                region_name=region,
                account_id=account_id,
            )
        elif s3_bucket:
            code = store_s3_bucket_archive(
                archive_bucket=s3_bucket,
                archive_key=request["S3Key"],
                archive_version=request.get("S3ObjectVersion"),
                function_name=function_name,
                region_name=region,
                account_id=account_id,
            )
        elif image_uri:
            image = create_image_code(image_uri=image_uri)
        else:
            raise LambdaServiceException("Gotta have s3 bucket or zip file or image")

        previous_latest = function.versions.get("$LATEST")
        replace_kwargs = {"code": code} if code else {"image": image}

        architectures = request.get("Architectures")
        if architectures:
            if len(architectures) != 1:
                raise ValidationException(
                    f"1 validation error detected: Value '[{', '.join(architectures)}]' at 'architectures' failed to "
                    f"satisfy constraint: Member must have length less than or equal to 1",
                )
            # An empty list of architectures is also forbidden. Further exceptions are tested here for create_function:
            # tests.aws.services.lambda_.test_lambda_api.TestLambdaFunction.test_create_lambda_exceptions
            if architectures[0] not in ARCHITECTURES:
                raise ValidationException(
                    f"1 validation error detected: Value '[{', '.join(architectures)}]' at 'architectures' failed to "
                    f"satisfy constraint: Member must satisfy constraint: [Member must satisfy enum value set: "
                    f"[x86_64, arm64], Member must not be null]",
                )
            replace_kwargs["architectures"] = architectures

        # the new $LATEST starts in "InProgress" until the lambda service has processed it
        updated_config = dataclasses.replace(
            previous_latest.config,
            internal_revision=short_uid(),
            last_modified=api_utils.generate_lambda_date(),
            last_update=UpdateStatus(
                status=LastUpdateStatus.InProgress,
                code="Creating",
                reason="The function is being created.",
            ),
            **replace_kwargs,
        )
        function_version = dataclasses.replace(previous_latest, config=updated_config)
        function.versions["$LATEST"] = function_version
        self.lambda_service.update_version(new_version=function_version)

        publish = request.get("Publish")
        if publish:
            function_version = self._publish_version_with_changes(
                function_name=function_name, region=region, account_id=account_id
            )
        return api_utils.map_config_out(function_version, return_qualified_arn=bool(publish))
1394

1395
    # TODO: does deleting the latest published version affect the next versions number?
1396
    # TODO: what happens when we call this with a qualifier and a fully qualified ARN? (+ conflicts?)
1397
    # TODO: test different ARN patterns (shorthand ARN?)
1398
    # TODO: test deleting across regions?
1399
    # TODO: test mismatch between context region and region in ARN
1400
    # TODO: test qualifier $LATEST, alias-name and version
1401
    def delete_function(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: Qualifier = None,
        **kwargs,
    ) -> None:
        """
        Delete a whole function, or a single published version when a qualifier is given.

        Args:
            context: request context used to resolve account and region.
            function_name: name or ARN of the function.
            qualifier: optional version to delete; aliases and ``$LATEST`` are rejected.

        Raises:
            InvalidParameterValueException: the qualifier is an alias or ``$LATEST``.
            ResourceNotFoundException: the function does not exist in the resolved store.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )

        if qualifier and api_utils.qualifier_is_alias(qualifier):
            raise InvalidParameterValueException(
                "Deletion of aliases is not currently supported.",
                Type="User",
            )

        store = lambda_stores[account_id][region]
        if qualifier == "$LATEST":
            raise InvalidParameterValueException(
                "$LATEST version cannot be deleted without deleting the function.", Type="User"
            )

        if function_name not in store.functions:
            function_arn = api_utils.unqualified_lambda_arn(
                function_name=function_name, region=region, account=account_id
            )
            raise ResourceNotFoundException(f"Function not found: {function_arn}", Type="User")
        function = store.functions[function_name]

        if not qualifier:
            # delete the whole function
            # TODO: introduce locking for safe deletion: We could create a new version at the API layer before
            #  the old version gets cleaned up in the internal lambda service.
            removed_function = store.functions.pop(function_name)
            for removed_version in removed_function.versions.values():
                self.lambda_service.stop_version(
                    qualified_arn=removed_version.id.qualified_arn()
                )
                # we can safely destroy the code here
                if code := removed_version.config.code:
                    code.destroy()
            return

        # delete a single version of the function; unknown versions are silently ignored
        removed_version = function.versions.pop(qualifier, None)
        if removed_version:
            self.lambda_service.stop_version(removed_version.id.qualified_arn())
            destroy_code_if_not_used(code=removed_version.config.code, function=function)
1✔
1449

1450
    def list_functions(
        self,
        context: RequestContext,
        master_region: MasterRegion = None,  # (only relevant for lambda@edge)
        function_version: FunctionVersionApi = None,
        marker: String = None,
        max_items: MaxListItems = None,
        **kwargs,
    ) -> ListFunctionsResponse:
        """
        List the functions of the caller's account and region, one page at a time.

        Args:
            context: request context providing account and region.
            master_region: accepted but not used by this implementation.
            function_version: ``ALL`` to list every version of every function; otherwise only
                the $LATEST version of each function is listed.
            marker: pagination token returned by a previous call.
            max_items: maximum page size.

        Returns:
            One page of function configurations plus the marker of the next page (if any).

        Raises:
            ValidationException: ``function_version`` is set to anything other than ``ALL``.
        """
        state = lambda_stores[context.account_id][context.region]

        include_all_versions = function_version == FunctionVersionApi.ALL
        if function_version and not include_all_versions:
            raise ValidationException(
                f"1 validation error detected: Value '{function_version}' at 'functionVersion' failed to satisfy constraint: Member must satisfy enum value set: [ALL]"
            )

        functions = state.functions.values()
        if include_all_versions:
            # include all versions for all function
            selected = list(itertools.chain.from_iterable(f.versions.values() for f in functions))
        else:
            selected = [f.latest() for f in functions]

        # qualified ARNs are only returned when every version is listed
        entries = []
        for function_version_model in selected:
            config_out = api_utils.map_config_out(
                function_version_model, return_qualified_arn=include_all_versions
            )
            entries.append(api_utils.map_to_list_response(config_out))

        page, token = PaginatedList(entries).get_page(
            lambda entry: entry["FunctionArn"],
            marker,
            max_items,
        )
        return ListFunctionsResponse(Functions=page, NextMarker=token)
1✔
1488

1489
    def get_function(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        qualifier: Qualifier = None,
        **kwargs,
    ) -> GetFunctionResponse:
        """
        Return configuration, code location and tags of a function version.

        Args:
            context: request context used to resolve account and region.
            function_name: name or ARN of the function.
            qualifier: optional version or alias; an alias is resolved to the version it points to.

        Returns:
            The function configuration together with its code location and, if present, its tags.

        Raises:
            ResourceNotFoundException: the function or the requested alias does not exist.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )

        fn = lambda_stores[account_id][region].functions.get(function_name)
        if fn is None:
            # the reported ARN is qualified only if the caller passed a qualifier
            if qualifier is None:
                missing_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
            else:
                missing_arn = api_utils.qualified_lambda_arn(
                    function_name, qualifier, account_id, region
                )
            raise ResourceNotFoundException(f"Function not found: {missing_arn}", Type="User")

        alias_name = None
        if qualifier and api_utils.qualifier_is_alias(qualifier):
            if qualifier not in fn.aliases:
                alias_arn = api_utils.qualified_lambda_arn(
                    function_name, qualifier, account_id, region
                )
                raise ResourceNotFoundException(f"Function not found: {alias_arn}", Type="User")
            # from here on the qualifier is the version the alias points to
            alias_name = qualifier
            qualifier = fn.aliases[alias_name].function_version

        version = get_function_version(
            function_name=function_name,
            qualifier=qualifier,
            account_id=account_id,
            region=region,
        )

        # tags are stored against the unqualified function ARN
        tags = self._get_tags(api_utils.unqualified_lambda_arn(function_name, account_id, region))
        optional_fields = {"Tags": tags} if tags else {}

        code_location = None
        if code := version.config.code:
            code_location = FunctionCodeLocation(
                Location=code.generate_presigned_url(), RepositoryType="S3"
            )
        elif image := version.config.image:
            code_location = FunctionCodeLocation(
                ImageUri=image.image_uri,
                RepositoryType=image.repository_type,
                ResolvedImageUri=image.resolved_image_uri,
            )

        return GetFunctionResponse(
            Configuration=api_utils.map_config_out(
                version, return_qualified_arn=bool(qualifier), alias_name=alias_name
            ),
            Code=code_location,  # TODO
            **optional_fields,
            # Concurrency={},  # TODO
        )
1553

1554
    def get_function_configuration(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        qualifier: Qualifier = None,
        **kwargs,
    ) -> FunctionConfiguration:
        """
        Return only the configuration of a function version.

        CAVE: the return value is *not* the same as in ``get_function``; it seems to be only the
        configuration part.

        Args:
            context: request context used to resolve account and region.
            function_name: name or ARN of the function.
            qualifier: optional qualifier of the version to look up.

        Returns:
            The configuration of the resolved version; the ARN is qualified if a qualifier is set.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        resolved_name, resolved_qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        version = get_function_version(
            function_name=resolved_name,
            qualifier=resolved_qualifier,
            account_id=account_id,
            region=region,
        )
        use_qualified_arn = bool(resolved_qualifier)
        return api_utils.map_config_out(version, return_qualified_arn=use_qualified_arn)
1✔
1573

1574
    def invoke(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        invocation_type: InvocationType = None,
        log_type: LogType = None,
        client_context: String = None,
        payload: IO[Blob] = None,
        qualifier: Qualifier = None,
        **kwargs,
    ) -> InvocationResponse:
        """
        Invoke a function through the lambda service and map the result to an API response.

        Args:
            context: request context; also provides the request id and trace context.
            function_name: name or ARN of the function.
            invocation_type: ``Event`` and ``DryRun`` return early with status 202 / 204.
            log_type: ``Tail`` adds the base64-encoded last 4096 bytes of the logs.
            client_context: passed through to the lambda service.
            payload: stream whose content is read and used as invocation payload.
            qualifier: optional version or alias to invoke.

        Returns:
            The invocation response (status code, payload, executed version and, where
            applicable, ``FunctionError`` and ``LogResult``).

        Raises:
            ServiceException: re-raised unchanged when raised by the lambda service.
            LambdaServiceException: wraps environment startup timeouts and any other error.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )

        started_at = time.perf_counter()
        try:
            # the payload is read inside the try block so that read errors get wrapped as well
            invocation_result = self.lambda_service.invoke(
                function_name=function_name,
                qualifier=qualifier,
                region=region,
                account_id=account_id,
                invocation_type=invocation_type,
                client_context=client_context,
                request_id=context.request_id,
                trace_context=context.trace_context,
                payload=payload.read() if payload else None,
            )
        except ServiceException:
            # API-level errors are passed on to the caller as they are
            raise
        except EnvironmentStartupTimeoutException as e:
            raise LambdaServiceException(
                f"[{context.request_id}] Timeout while starting up lambda environment for function {function_name}:{qualifier}"
            ) from e
        except Exception as e:
            # the stack trace is only logged when debug logging is enabled
            LOG.error(
                "[%s] Error while invoking lambda %s",
                context.request_id,
                function_name,
                exc_info=LOG.isEnabledFor(logging.DEBUG),
            )
            raise LambdaServiceException(
                f"[{context.request_id}] Internal error while executing lambda {function_name}:{qualifier}. Caused by {type(e).__name__}: {e}"
            ) from e

        # asynchronous and dry-run invocations carry no payload in the response
        if invocation_type == InvocationType.Event:
            return InvocationResponse(StatusCode=202)
        if invocation_type == InvocationType.DryRun:
            return InvocationResponse(StatusCode=204)

        elapsed_ms = (time.perf_counter() - started_at) * 1000
        LOG.debug("Lambda invocation duration: %0.2fms", elapsed_ms)

        response = InvocationResponse(
            StatusCode=200,
            Payload=invocation_result.payload,
            ExecutedVersion=invocation_result.executed_version,
        )
        if invocation_result.is_error:
            response["FunctionError"] = "Unhandled"
        if log_type == LogType.Tail:
            # only the trailing 4096 bytes of the logs are returned
            log_tail = to_bytes(invocation_result.logs)[-4096:]
            response["LogResult"] = to_str(base64.b64encode(log_tail))
        return response
1✔
1643

1644
    # Version operations
1645
    def publish_version(
        self,
        context: RequestContext,
        function_name: FunctionName,
        code_sha256: String = None,
        description: Description = None,
        revision_id: String = None,
        **kwargs,
    ) -> FunctionConfiguration:
        """
        Publish a new version of a function from an existing version.

        Args:
            context: request context used to resolve account and region.
            function_name: name or ARN of the function.
            code_sha256: forwarded to the internal publish helper.
            description: description for the new version.
            revision_id: forwarded to the internal publish helper.

        Returns:
            The configuration of the published version, carrying a qualified ARN.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        resolved_name = api_utils.get_function_name(function_name, context)
        published = self._publish_version_from_existing_version(
            function_name=resolved_name,
            description=description,
            account_id=account_id,
            region=region,
            revision_id=revision_id,
            code_sha256=code_sha256,
        )
        return api_utils.map_config_out(published, return_qualified_arn=True)
1✔
1665

1666
    def list_versions_by_function(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        marker: String = None,
        max_items: MaxListItems = None,
        **kwargs,
    ) -> ListVersionsByFunctionResponse:
        """
        List all versions of a single function, one page at a time.

        Args:
            context: request context used to resolve account and region.
            function_name: name or ARN of the function.
            marker: pagination token returned by a previous call.
            max_items: maximum page size.

        Returns:
            One page of version configurations (with qualified ARNs) plus the marker of the next
            page, if any.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name = api_utils.get_function_name(function_name, context)
        function = self._get_function(
            function_name=function_name, region=region, account_id=account_id
        )
        versions = [
            api_utils.map_to_list_response(
                api_utils.map_config_out(version=version, return_qualified_arn=True)
            )
            for version in function.versions.values()
        ]
        items = PaginatedList(versions)
        # The page token must be a string the caller can send back as `Marker`. The qualified ARN
        # is unique per version, matching how list_functions derives its tokens. Using the whole
        # item as token would return a dict as NextMarker that can never match a marker string.
        page, token = items.get_page(
            lambda item: item["FunctionArn"],
            marker,
            max_items,
        )
        return ListVersionsByFunctionResponse(Versions=page, NextMarker=token)
1✔
1692

1693
    # Alias
1694

1695
    def _create_routing_config_model(
        self, routing_config_dict: dict[str, float], function_version: FunctionVersion
    ):
        """
        Validate the additional version weights of an alias and build the routing config model.

        Args:
            routing_config_dict: mapping of version qualifier to routing weight.
            function_version: the version the alias points to.

        Returns:
            An ``AliasRoutingConfig`` holding the given weights.

        Raises:
            InvalidParameterValueException: more than one entry is given, the entry targets the
                alias' own version, or the alias points to ``$LATEST``.
            ValidationException: a weight is outside of [0.0, 1.0) or a key is not a version.
        """
        if len(routing_config_dict) > 1:
            raise InvalidParameterValueException(
                "Number of items in AdditionalVersionWeights cannot be greater than 1",
                Type="User",
            )

        # should be exactly one item here, still iterating, might be supported in the future
        for version_key, weight in routing_config_dict.items():
            if weight < 0.0 or weight >= 1.0:
                raise ValidationException(
                    f"1 validation error detected: Value '{{{version_key}={weight}}}' at 'routingConfig.additionalVersionWeights' failed to satisfy constraint: Map value must satisfy constraint: [Member must have value less than or equal to 1.0, Member must have value greater than or equal to 0.0, Member must not be null]"
                )

            primary_qualifier = function_version.id.qualifier
            if version_key == primary_qualifier:
                raise InvalidParameterValueException(
                    f"Invalid function version {primary_qualifier}. Function version {primary_qualifier} is already included in routing configuration.",
                    Type="User",
                )
            # check if version target is latest, then no routing config is allowed
            if primary_qualifier == "$LATEST":
                raise InvalidParameterValueException(
                    "$LATEST is not supported for an alias pointing to more than 1 version"
                )
            if not api_utils.qualifier_is_version(version_key):
                raise ValidationException(
                    f"1 validation error detected: Value '{{{version_key}={weight}}}' at 'routingConfig.additionalVersionWeights' failed to satisfy constraint: Map keys must satisfy constraint: [Member must have length less than or equal to 1024, Member must have length greater than or equal to 1, Member must satisfy regular expression pattern: [0-9]+, Member must not be null]"
                )

            # checking if the version in the config exists; the result itself is not needed
            version_id = function_version.id
            get_function_version(
                function_name=version_id.function_name,
                qualifier=version_key,
                region=version_id.region,
                account_id=version_id.account,
            )

        return AliasRoutingConfig(version_weights=routing_config_dict)
1✔
1732

1733
    def create_alias(
        self,
        context: RequestContext,
        function_name: FunctionName,
        name: Alias,
        function_version: Version,
        description: Description = None,
        routing_config: AliasRoutingConfiguration = None,
        **kwargs,
    ) -> AliasConfiguration:
        """
        Create a new alias pointing to an existing version of a function.

        Args:
            context: request context used to resolve account and region.
            function_name: name or ARN of the function.
            name: name of the alias to create.
            function_version: version the alias should point to.
            description: optional description; stored as empty string when omitted.
            routing_config: optional routing configuration with additional version weights.

        Returns:
            The configuration of the created alias.

        Raises:
            ValidationException: ``name`` is not a valid alias name.
            ResourceConflictException: an alias with this name already exists.
        """
        if not api_utils.qualifier_is_alias(name):
            raise ValidationException(
                f"1 validation error detected: Value '{name}' at 'name' failed to satisfy constraint: Member must satisfy regular expression pattern: (?!^[0-9]+$)([a-zA-Z0-9-_]+)"
            )

        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name = api_utils.get_function_name(function_name, context)
        # resolving the target version also verifies that it exists
        target_version = get_function_version(
            function_name=function_name,
            qualifier=function_version,
            region=region,
            account_id=account_id,
        )
        function = self._get_function(
            function_name=function_name, region=region, account_id=account_id
        )
        # description is always present, if not specified it's an empty string
        description = description or ""

        with function.lock:
            existing_alias = function.aliases.get(name)
            if existing_alias:
                existing_arn = api_utils.map_alias_out(alias=existing_alias, function=function)[
                    "AliasArn"
                ]
                raise ResourceConflictException(
                    f"Alias already exists: {existing_arn}",
                    Type="User",
                )

            # a routing config is only built when additional version weights are actually given
            routing_configuration = None
            additional_weights = (
                routing_config.get("AdditionalVersionWeights") if routing_config else None
            )
            if additional_weights:
                routing_configuration = self._create_routing_config_model(
                    additional_weights, target_version
                )

            alias = VersionAlias(
                name=name,
                function_version=function_version,
                description=description,
                routing_configuration=routing_configuration,
            )
            function.aliases[name] = alias
        return api_utils.map_alias_out(alias=alias, function=function)
1✔
1784

1785
    def list_aliases(
        self,
        context: RequestContext,
        function_name: FunctionName,
        function_version: Version = None,
        marker: String = None,
        max_items: MaxListItems = None,
        **kwargs,
    ) -> ListAliasesResponse:
        """
        List the aliases of a function, optionally restricted to one target version.

        Args:
            context: request context used to resolve account and region.
            function_name: name or ARN of the function.
            function_version: if given, only aliases pointing to this version are returned.
            marker: pagination token returned by a previous call.
            max_items: maximum page size.

        Returns:
            One page of alias configurations plus the marker of the next page, if any.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name = api_utils.get_function_name(function_name, context)
        function = self._get_function(
            function_name=function_name, region=region, account_id=account_id
        )

        alias_configs = []
        for alias in function.aliases.values():
            # without a version filter every alias matches
            if function_version is None or alias.function_version == function_version:
                alias_configs.append(api_utils.map_alias_out(alias, function))

        page, token = PaginatedList(alias_configs).get_page(
            lambda alias_config: alias_config["AliasArn"],
            marker,
            max_items,
        )
        return ListAliasesResponse(Aliases=page, NextMarker=token)
1✔
1813

1814
    def delete_alias(
        self, context: RequestContext, function_name: FunctionName, name: Alias, **kwargs
    ) -> None:
        """
        Delete an alias of a function together with the resources registered under its name.

        Deleting an alias that does not exist is not an error.

        Args:
            context: request context used to resolve account and region.
            function_name: name or ARN of the function.
            name: name of the alias to delete.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name = api_utils.get_function_name(function_name, context)
        function = self._get_function(
            function_name=function_name, region=region, account_id=account_id
        )
        removed_alias = function.aliases.pop(name, None)

        # cleanup related resources
        function.provisioned_concurrency_configs.pop(name, None)

        # TODO: Allow for deactivating/unregistering specific Lambda URLs
        # the URL config is only dropped if the alias actually existed
        if not removed_alias or name not in function.function_url_configs:
            return
        url_config = function.function_url_configs.pop(name)
        LOG.debug(
            "Stopping aliased Lambda Function URL %s for %s",
            url_config.url,
            url_config.function_name,
        )
1836

1837
    def get_alias(
        self, context: RequestContext, function_name: FunctionName, name: Alias, **kwargs
    ) -> AliasConfiguration:
        """
        Return the configuration of a single alias of a function.

        Args:
            context: request context used to resolve account and region.
            function_name: name or ARN of the function.
            name: name of the alias.

        Returns:
            The configuration of the alias.

        Raises:
            ResourceNotFoundException: the function has no alias with this name.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name = api_utils.get_function_name(function_name, context)
        function = self._get_function(
            function_name=function_name, region=region, account_id=account_id
        )

        alias = function.aliases.get(name)
        if alias:
            return api_utils.map_alias_out(alias=alias, function=function)

        raise ResourceNotFoundException(
            f"Cannot find alias arn: {api_utils.qualified_lambda_arn(function_name=function_name, qualifier=name, region=region, account=account_id)}",
            Type="User",
        )
1✔
1851

1852
    def update_alias(
        self,
        context: RequestContext,
        function_name: FunctionName,
        name: Alias,
        function_version: Version = None,
        description: Description = None,
        routing_config: AliasRoutingConfiguration = None,
        revision_id: String = None,
        **kwargs,
    ) -> AliasConfiguration:
        """
        Update target version, description and/or routing configuration of an existing alias.

        Only the arguments that are not ``None`` are applied. A routing config without
        ``AdditionalVersionWeights`` (or with an empty mapping) removes the routing configuration.

        Args:
            context: request context used to resolve account and region.
            function_name: name or ARN of the function.
            name: name of the alias to update.
            function_version: new version the alias should point to.
            description: new description.
            routing_config: new routing configuration.
            revision_id: if given, the update is only applied when it matches the alias' revision.

        Returns:
            The configuration of the updated alias.

        Raises:
            ResourceNotFoundException: the function has no alias with this name.
            PreconditionFailedException: ``revision_id`` does not match the alias' revision id.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name = api_utils.get_function_name(function_name, context)
        function = self._get_function(
            function_name=function_name, region=region, account_id=account_id
        )
        if not (alias := function.aliases.get(name)):
            fn_arn = api_utils.qualified_lambda_arn(function_name, name, account_id, region)
            raise ResourceNotFoundException(
                f"Alias not found: {fn_arn}",
                Type="User",
            )
        if revision_id and alias.revision_id != revision_id:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
                Type="User",
            )
        changes = {}
        if function_version is not None:
            changes |= {"function_version": function_version}
        if description is not None:
            changes |= {"description": description}
        if routing_config is not None:
            # if it is an empty dict or AdditionalVersionWeights is empty, set routing config to None
            new_routing_config = None
            if routing_config_dict := routing_config.get("AdditionalVersionWeights"):
                # The routing config is validated against the version the alias will point to
                # after this update: the new version if one is given, else the current one.
                # _create_routing_config_model requires that version as second argument.
                target_qualifier = (
                    function_version if function_version is not None else alias.function_version
                )
                target_version = get_function_version(
                    function_name=function_name,
                    qualifier=target_qualifier,
                    region=region,
                    account_id=account_id,
                )
                new_routing_config = self._create_routing_config_model(
                    routing_config_dict, target_version
                )
            changes |= {"routing_configuration": new_routing_config}
        # even if no changes are done, we have to update revision id for some reason
        old_alias = alias
        alias = dataclasses.replace(alias, **changes)
        function.aliases[name] = alias

        # TODO: signal lambda service that pointer potentially changed
        self.lambda_service.update_alias(old_alias=old_alias, new_alias=alias, function=function)

        return api_utils.map_alias_out(alias=alias, function=function)
1✔
1900

1901
    # =======================================
1902
    # ======= EVENT SOURCE MAPPINGS =========
1903
    # =======================================
1904
    def check_service_resource_exists(
        self, service: str, resource_arn: str, function_arn: str, function_role_arn: str
    ):
        """
        Verify that the event source identified by ``resource_arn`` can be found.

        The lookup uses an internal client created for ``resource_arn`` with the function's role
        (``function_role_arn``), the Lambda service principal and ``function_arn`` as source ARN.
        Only ``sqs``/``sqs-fifo``, ``kinesis`` and ``dynamodb`` are checked; any other service
        passes through without a lookup.

        Raises:
            InvalidParameterValueException: If the queue or stream is reported as non-existent.
            ClientError: Any other client error is re-raised unchanged.
        """
        parsed_arn = parse_arn(resource_arn)
        client = get_internal_client(
            arn=resource_arn,
            role_arn=function_role_arn,
            service_principal=ServicePrincipal.lambda_,
            source_arn=function_arn,
        )

        if service in ("sqs", "sqs-fifo"):
            # AWS uses `GetQueueAttributes` internally to verify the queue existence, but we need the
            # `QueueUrl` which is not given directly. We build out a dummy `QueueUrl` which can be
            # parsed by SQS to return the right value
            queue_name = parsed_arn["resource"].split("/")[-1]
            queue_url = (
                f"http://sqs.{parsed_arn['region']}.domain/{parsed_arn['account']}/{queue_name}"
            )
            try:
                client.get_queue_attributes(QueueUrl=queue_url)
            except ClientError as error:
                code = error.response["Error"]["Code"]
                if code != "AWS.SimpleQueueService.NonExistentQueue":
                    raise
                raise InvalidParameterValueException(
                    f"Error occurred while ReceiveMessage. SQS Error Code: {code}. SQS Error Message: {error.response['Error']['Message']}",
                    Type="User",
                )
        elif service == "kinesis":
            try:
                client.describe_stream(StreamARN=resource_arn)
            except ClientError as error:
                if error.response["Error"]["Code"] != "ResourceNotFoundException":
                    raise
                raise InvalidParameterValueException(
                    f"Stream not found: {resource_arn}",
                    Type="User",
                )
        elif service == "dynamodb":
            try:
                # Note the different parameter casing compared to Kinesis (StreamArn vs. StreamARN)
                client.describe_stream(StreamArn=resource_arn)
            except ClientError as error:
                if error.response["Error"]["Code"] != "ResourceNotFoundException":
                    raise
                raise InvalidParameterValueException(
                    f"Stream not found: {resource_arn}",
                    Type="User",
                )
×
1956

1957
    @handler("CreateEventSourceMapping", expand=False)
    def create_event_source_mapping(
        self,
        context: RequestContext,
        request: CreateEventSourceMappingRequest,
    ) -> EventSourceMappingConfiguration:
        """Handle ``CreateEventSourceMapping`` by delegating to the v2 implementation."""
        return self.create_event_source_mapping_v2(context=context, request=request)
1✔
1964

1965
    def create_event_source_mapping_v2(
        self,
        context: RequestContext,
        request: CreateEventSourceMappingRequest,
    ) -> EventSourceMappingConfiguration:
        """
        Validate the request, persist the new event source mapping and create its worker.

        Returns the ESM configuration produced by ``EsmConfigFactory``. A copy of it is written to
        the store, the worker is registered in ``self.esm_workers`` under its UUID, and request
        tags (if any) are stored for the mapping ARN before the worker is created.
        """
        # Validations (only the ARN, the store and the role are needed afterwards)
        function_arn, _, state, _, function_role = self.validate_event_source_mapping(
            context, request
        )

        esm_config = EsmConfigFactory(request, context, function_arn).get_esm_config()

        # Copy esm_config to avoid a race condition with potential async update in the store
        state.event_source_mappings[esm_config["UUID"]] = esm_config.copy()

        # TODO: check for potential async race condition update -> think about locking
        worker_factory = EsmWorkerFactory(esm_config, function_role, request.get("Enabled", True))
        esm_worker = worker_factory.get_esm_worker()
        self.esm_workers[esm_worker.uuid] = esm_worker

        # TODO: check StateTransitionReason, LastModified, LastProcessingResult (concurrent updates requires locking!)
        tags = request.get("Tags")
        if tags:
            self._store_tags(esm_config.get("EventSourceMappingArn"), tags)

        esm_worker.create()
        return esm_config
1✔
1988

1989
    def validate_event_source_mapping(self, context, request):
        """
        Validate an event source mapping (ESM) request or an internal ESM configuration.

        Checks run in this order: destination config, event source detection, batch size, starting
        position (Kinesis/DynamoDB), batching window (SQS), filter patterns, target function and
        qualifier, existence of the event source, and - for CreateEventSourceMapping requests
        only - duplicate mappings in the store.

        Returns:
            The tuple ``(function_arn, function_name, state, function_version, function_role)``.

        Raises:
            InvalidParameterValueException: For unsupported or missing parameters, an unknown
                function, or a missing event source.
            ValidationException: If ``StartingPosition`` is not a known enum member.
            ResourceConflictException: If a matching mapping already exists (create requests only).
            Exception: For an unknown alias/version or an unsupported qualifier.
        """
        # TODO: test whether stream ARNs are valid sources for Pipes or ESM or whether only DynamoDB table ARNs work
        is_create_esm_request = context.operation.name == self.create_event_source_mapping.operation

        destination_config = request.get("DestinationConfig")
        if destination_config and "OnSuccess" in destination_config:
            raise InvalidParameterValueException(
                "Unsupported DestinationConfig parameter for given event source mapping type.",
                Type="User",
            )

        # Determine the event source service: self-managed sources are Kafka, otherwise the
        # service is derived from the event source ARN.
        if "SelfManagedEventSource" in request:
            if "SourceAccessConfigurations" not in request:
                raise InvalidParameterValueException(
                    "Required 'sourceAccessConfigurations' parameter is missing.", Type="User"
                )
            service = "kafka"
        elif "EventSourceArn" in request:
            service = extract_service_from_arn(request["EventSourceArn"])
        else:
            raise InvalidParameterValueException("Unrecognized event source.", Type="User")

        batch_size = api_utils.validate_and_set_batch_size(service, request.get("BatchSize"))

        if service in ("dynamodb", "kinesis"):
            starting_position = request.get("StartingPosition")
            if not starting_position:
                raise InvalidParameterValueException(
                    "1 validation error detected: Value null at 'startingPosition' failed to satisfy constraint: Member must not be null.",
                    Type="User",
                )
            if starting_position not in KinesisStreamStartPosition.__members__:
                raise ValidationException(
                    f"1 validation error detected: Value '{starting_position}' at 'startingPosition' failed to satisfy constraint: Member must satisfy enum value set: [LATEST, AT_TIMESTAMP, TRIM_HORIZON]"
                )
            # AT_TIMESTAMP is not allowed for DynamoDB Streams
            if (
                service == "dynamodb"
                and starting_position not in DynamoDBStreamStartPosition.__members__
            ):
                raise InvalidParameterValueException(
                    f"Unsupported starting position for arn type: {request['EventSourceArn']}",
                    Type="User",
                )

        if (
            service in ("sqs", "sqs-fifo")
            and batch_size > 10
            and request.get("MaximumBatchingWindowInSeconds", 0) == 0
        ):
            raise InvalidParameterValueException(
                "Maximum batch window in seconds must be greater than 0 if maximum batch size is greater than 10",
                Type="User",
            )

        filter_criteria = request.get("FilterCriteria")
        if filter_criteria is not None:
            for filter_ in filter_criteria.get("Filters", []):
                pattern_str = filter_.get("Pattern")
                # A pattern must be a non-empty string that passes event pattern validation
                if (
                    not pattern_str
                    or not isinstance(pattern_str, str)
                    or not validate_event_pattern(pattern_str)
                ):
                    raise InvalidParameterValueException(
                        "Invalid filter pattern definition.", Type="User"
                    )

        # Can either have a FunctionName (i.e CreateEventSourceMapping request) or
        # an internal EventSourceMappingConfiguration representation
        function_ref = request.get("FunctionName") or request.get("FunctionArn")
        # can be either a partial arn or a full arn for the version/alias
        function_name, qualifier, account, region = function_locators_from_arn(function_ref)
        # TODO: validate `context.region` vs. `region(request["FunctionName"])` vs. `region(request["EventSourceArn"])`
        account = account or context.account_id
        region = region or context.region
        state = lambda_stores[account][region]
        fn = state.functions.get(function_name)
        if not fn:
            raise InvalidParameterValueException("Function does not exist", Type="User")

        if qualifier:
            # make sure the function version/alias exists
            if api_utils.qualifier_is_alias(qualifier):
                if not fn.aliases.get(qualifier):
                    raise Exception("unknown alias")  # TODO: cover via test
            elif api_utils.qualifier_is_version(qualifier):
                if not fn.versions.get(qualifier):
                    raise Exception("unknown version")  # TODO: cover via test
            elif qualifier != "$LATEST":
                raise Exception("invalid functionname")  # TODO: cover via test
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account, region)
        else:
            fn_arn = api_utils.unqualified_lambda_arn(function_name, account, region)

        function_version = get_function_version_from_arn(fn_arn)
        function_role = function_version.config.role

        source_arn = request.get("EventSourceArn")
        if source_arn:
            self.check_service_resource_exists(service, source_arn, fn_arn, function_role)

        # Duplicate detection only applies when validating a CreateEventSourceMapping request
        if is_create_esm_request:

            def _get_mapping_sources(mapping: dict[str, Any]) -> list[str]:
                # Either the single event source ARN or the Kafka bootstrap servers
                event_source_arn = mapping.get("EventSourceArn")
                if event_source_arn:
                    return [event_source_arn]
                endpoints = mapping.get("SelfManagedEventSource", {}).get("Endpoints", {})
                return endpoints.get("KAFKA_BOOTSTRAP_SERVERS", [])

            # check for event source duplicates
            # TODO: currently validated for sqs, kinesis, and dynamodb
            service_id = load_service(service).service_id
            for uuid, mapping in state.event_source_mappings.items():
                mapping_sources = _get_mapping_sources(mapping)
                request_sources = _get_mapping_sources(request)
                same_function = mapping["FunctionArn"] == fn_arn
                if not same_function or not set(mapping_sources).intersection(request_sources):
                    continue

                if service == "sqs":
                    # *shakes fist at SQS*
                    raise ResourceConflictException(
                        f'An event source mapping with {service_id} arn (" {mapping["EventSourceArn"]} ") '
                        f'and function (" {function_name} ") already exists. Please update or delete the '
                        f"existing mapping with UUID {uuid}",
                        Type="User",
                    )
                if service == "kafka":
                    # Kafka mappings only conflict when they also share at least one topic
                    if set(mapping["Topics"]).intersection(request["Topics"]):
                        raise ResourceConflictException(
                            f'An event source mapping with event source ("{",".join(request_sources)}"), '
                            f'function ("{fn_arn}"), '
                            f'topics ("{",".join(request["Topics"])}") already exists. Please update or delete the '
                            f"existing mapping with UUID {uuid}",
                            Type="User",
                        )
                else:
                    raise ResourceConflictException(
                        f'The event source arn (" {mapping["EventSourceArn"]} ") and function '
                        f'(" {function_name} ") provided mapping already exists. Please update or delete the '
                        f"existing mapping with UUID {uuid}",
                        Type="User",
                    )
        return fn_arn, function_name, state, function_version, function_role
1✔
2140

2141
    @handler("UpdateEventSourceMapping", expand=False)
    def update_event_source_mapping(
        self,
        context: RequestContext,
        request: UpdateEventSourceMappingRequest,
    ) -> EventSourceMappingConfiguration:
        """Handle ``UpdateEventSourceMapping`` by delegating to the v2 implementation."""
        return self.update_event_source_mapping_v2(context=context, request=request)
1✔
2148

2149
    def update_event_source_mapping_v2(
        self,
        context: RequestContext,
        request: UpdateEventSourceMappingRequest,
    ) -> EventSourceMappingConfiguration:
        """
        Apply an update to an existing event source mapping and replace its worker.

        The stored mapping is merged with the request (minus ``UUID``), re-validated, written back
        to the store, and a new worker built from the merged configuration replaces the old one.
        The returned mapping carries the ``State`` chosen at update time.

        Raises:
            ResourceNotFoundException: If no UUID is given or no mapping/worker exists for it.
        """
        # TODO: test and implement this properly (quite complex with many validations and limitations!)
        LOG.warning(
            "Updating Lambda Event Source Mapping is in experimental state and not yet fully tested."
        )
        state = lambda_stores[context.account_id][context.region]
        request_data = dict(request)
        uuid = request_data.pop("UUID", None)
        if not uuid:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        old_event_source_mapping = state.event_source_mappings.get(uuid)
        esm_worker = self.esm_workers.get(uuid)
        if old_event_source_mapping is None or esm_worker is None:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )  # TODO: test?

        # normalize values to overwrite
        event_source_mapping = old_event_source_mapping | request_data

        # Validate the newly updated ESM object; only the resolved ARN and role are used afterwards.
        function_arn, _, _, _, function_role = self.validate_event_source_mapping(
            context, event_source_mapping
        )

        # remove the FunctionName field
        event_source_mapping.pop("FunctionName", None)
        if function_arn:
            event_source_mapping["FunctionArn"] = function_arn

        # Only apply update if the desired state differs
        previous_state = old_event_source_mapping["State"]
        enabled = request.get("Enabled")
        if enabled is None:
            event_source_mapping["State"] = EsmState.UPDATING
        elif enabled and previous_state != EsmState.ENABLED:
            event_source_mapping["State"] = EsmState.ENABLING
        # TODO: What happens when trying to update during an update or failed state?!
        elif not enabled and previous_state == EsmState.ENABLED:
            event_source_mapping["State"] = EsmState.DISABLING

        # To ensure parity, certain responses need to be immediately returned. These values are
        # only set for the returned response, not saved internally (e.g. transient state).
        response_overrides = {"State": event_source_mapping["State"]}

        state.event_source_mappings[uuid] = event_source_mapping

        # TODO: Currently, we re-create the entire ESM worker. Look into approach with better performance.
        worker_factory = EsmWorkerFactory(
            event_source_mapping, function_role, request.get("Enabled", esm_worker.enabled)
        )

        # Get a new ESM worker object but do not activate it, since the factory holds all logic for
        # creating new worker from configuration.
        updated_esm_worker = worker_factory.get_esm_worker()
        self.esm_workers[uuid] = updated_esm_worker

        # We should stop() the worker since the delete() will remove the ESM from the state mapping.
        esm_worker.stop()
        # This will either create an EsmWorker in the CREATING state if enabled. Otherwise, the DISABLING state is set.
        updated_esm_worker.create()

        return {**event_source_mapping, **response_overrides}
1✔
2219

2220
    def delete_event_source_mapping(
        self, context: RequestContext, uuid: String, **kwargs
    ) -> EventSourceMappingConfiguration:
        """
        Start deletion of the event source mapping with the given UUID.

        The worker is removed from ``self.esm_workers`` and asked to delete itself; the stored
        mapping is returned with its ``State`` overridden to ``DELETING``.

        Raises:
            ResourceNotFoundException: If no mapping or no worker exists for ``uuid``.
        """
        store = lambda_stores[context.account_id][context.region]
        esm = store.event_source_mappings.get(uuid)
        if not esm:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        # TODO: add proper locking
        worker = self.esm_workers.pop(uuid, None)
        # Asynchronous delete in v2
        if not worker:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )
        worker.delete()
        return {**esm, "State": EsmState.DELETING}
1✔
2239

2240
    def get_event_source_mapping(
        self, context: RequestContext, uuid: String, **kwargs
    ) -> EventSourceMappingConfiguration:
        """
        Return the stored event source mapping for ``uuid`` with its current worker state.

        ``State`` and ``StateTransitionReason`` are refreshed from the worker and written into the
        stored mapping itself before it is returned.

        Raises:
            ResourceNotFoundException: If no mapping or no worker exists for ``uuid``.
        """
        store = lambda_stores[context.account_id][context.region]
        # The worker is only looked up once the mapping itself has been found
        if not (esm := store.event_source_mappings.get(uuid)) or not (
            worker := self.esm_workers.get(uuid)
        ):
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        esm["State"] = worker.current_state
        esm["StateTransitionReason"] = worker.state_transition_reason
        return esm
1✔
2257

2258
    def list_event_source_mappings(
        self,
        context: RequestContext,
        event_source_arn: Arn = None,
        function_name: FunctionName = None,
        marker: String = None,
        max_items: MaxListItems = None,
        **kwargs,
    ) -> ListEventSourceMappingsResponse:
        """
        List the event source mappings of the request's account and region, paginated by UUID.

        ``event_source_arn`` filters on an exact ``EventSourceArn`` match; ``function_name``
        filters on mappings whose ``FunctionArn`` contains the given value.
        """
        store = lambda_stores[context.account_id][context.region]

        mappings = store.event_source_mappings.values()
        # TODO: update and test State and StateTransitionReason for ESM v2

        if event_source_arn:  # TODO: validate pattern
            mappings = [
                mapping
                for mapping in mappings
                if mapping.get("EventSourceArn") == event_source_arn
            ]

        if function_name:
            # NOTE(review): substring match - a name that is a prefix of another function's name
            # also matches; confirm whether this is intended.
            mappings = [mapping for mapping in mappings if function_name in mapping["FunctionArn"]]

        page, next_marker = PaginatedList(mappings).get_page(
            lambda mapping: mapping["UUID"],
            marker,
            max_items,
        )
        return ListEventSourceMappingsResponse(EventSourceMappings=page, NextMarker=next_marker)
1✔
2285

2286
    def get_source_type_from_request(self, request: dict[str, Any]) -> Optional[str]:
        """
        Derive the event source type from an event source mapping request.

        Returns:
            The service extracted from ``EventSourceArn`` (``"sqs-fifo"`` for SQS FIFO queues),
            ``"kafka"`` when a ``SelfManagedEventSource`` is given, or ``None`` if the request
            carries neither.
        """
        if event_source_arn := request.get("EventSourceArn", ""):
            service = extract_service_from_arn(event_source_arn)
            # FIFO queues are identified by the mandatory ".fifo" name suffix. A plain substring
            # check would also match standard queues that merely contain "fifo" in their name.
            if service == "sqs" and event_source_arn.endswith(".fifo"):
                return "sqs-fifo"
            return service
        if request.get("SelfManagedEventSource"):
            return "kafka"
        return None
×
2294

2295
    # =======================================
2296
    # ============ FUNCTION URLS ============
2297
    # =======================================
2298

2299
    @staticmethod
    def _validate_qualifier(qualifier: str) -> None:
        """
        Reject qualifiers that are ``$LATEST`` or a version qualifier.

        Empty/missing qualifiers and all other values pass.

        Raises:
            ValidationException: If ``qualifier`` is ``$LATEST`` or a version qualifier.
        """
        is_latest = qualifier == "$LATEST"
        if not is_latest and not (qualifier and api_utils.qualifier_is_version(qualifier)):
            return
        raise ValidationException(
            f"1 validation error detected: Value '{qualifier}' at 'qualifier' failed to satisfy constraint: Member must satisfy regular expression pattern: ((?!^\\d+$)^[0-9a-zA-Z-_]+$)"
        )
2305

2306
    @staticmethod
    def _validate_invoke_mode(invoke_mode: str) -> None:
        """
        Validate the invoke mode of a function URL config.

        An empty/missing value is accepted. ``RESPONSE_STREAM`` is accepted but only logged as
        unsupported.

        Raises:
            ValidationException: If ``invoke_mode`` is set to anything other than ``BUFFERED`` or
                ``RESPONSE_STREAM``.
        """
        if not invoke_mode:
            return

        if invoke_mode not in (InvokeMode.BUFFERED, InvokeMode.RESPONSE_STREAM):
            raise ValidationException(
                f"1 validation error detected: Value '{invoke_mode}' at 'invokeMode' failed to satisfy constraint: Member must satisfy enum value set: [RESPONSE_STREAM, BUFFERED]"
            )

        if invoke_mode == InvokeMode.RESPONSE_STREAM:
            # TODO should we actually fail for setting RESPONSE_STREAM?
            #  It should trigger InvokeWithResponseStream which is not implemented
            LOG.warning(
                "The invokeMode 'RESPONSE_STREAM' is not yet supported on LocalStack. The property is only mocked, the execution will still be 'BUFFERED'"
            )
2318

2319
    # TODO: what happens if function state is not active?
2320
    def create_function_url_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        auth_type: FunctionUrlAuthType,
        qualifier: FunctionUrlQualifier = None,
        cors: Cors = None,
        invoke_mode: InvokeMode = None,
        **kwargs,
    ) -> CreateFunctionUrlConfigResponse:
        """
        Create a function URL config for a function (``$LATEST``) or one of its aliases.

        The URL subdomain is random unless the function carries a valid ``TAG_KEY_CUSTOM_URL``
        tag, in which case the tag value (suffixed with ``-<qualifier>`` when a qualifier is
        given) is used.

        Raises:
            ValidationException: If the qualifier or invoke mode is invalid.
            ResourceNotFoundException: If the function or the referenced alias does not exist.
            ResourceConflictException: If a URL config already exists for the qualifier.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        state = lambda_stores[account_id][region]
        self._validate_qualifier(qualifier)
        self._validate_invoke_mode(invoke_mode)

        fn = state.functions.get(function_name)
        if fn is None:
            raise ResourceNotFoundException("Function does not exist", Type="User")

        # URL configs are keyed by qualifier, with "$LATEST" standing in for "no qualifier"
        normalized_qualifier = qualifier or "$LATEST"

        existing_config = fn.function_url_configs.get(normalized_qualifier)
        if existing_config:
            raise ResourceConflictException(
                f"Failed to create function url config for [functionArn = {existing_config.function_arn}]. Error message:  FunctionUrlConfig exists for this Lambda function",
                Type="User",
            )

        if qualifier and qualifier != "$LATEST" and qualifier not in fn.aliases:
            raise ResourceNotFoundException("Function does not exist", Type="User")

        unqualified_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
        if qualifier:
            function_arn = api_utils.qualified_lambda_arn(
                function_name, qualifier, account_id, region
            )
        else:
            function_arn = unqualified_arn

        custom_id: str | None = None

        tags = self._get_tags(unqualified_arn)
        if TAG_KEY_CUSTOM_URL in tags:
            # Note: verifying here that the url_id is unique would surface clashes to the user
            # ASAP. However, that information does not seem to be available yet, since (as far as
            # the original author could tell) self.router.register_routes() is called once, in a
            # single shot, for all of the routes -- and uniqueness would have to hold for the
            # entire lambda provider, not just this function. That idea proved non-trivial.
            custom_id_tag_value = tags[TAG_KEY_CUSTOM_URL]
            if qualifier:
                custom_id_tag_value = f"{custom_id_tag_value}-{qualifier}"

            if TAG_KEY_CUSTOM_URL_VALIDATOR.match(custom_id_tag_value):
                custom_id = custom_id_tag_value
            else:
                # Note: we're logging here instead of raising to prioritize
                # strict parity with AWS over the localstack-only custom_id
                LOG.warning(
                    "Invalid custom ID tag value for lambda URL (%s=%s). "
                    "Replaced with default (random id)",
                    TAG_KEY_CUSTOM_URL,
                    custom_id_tag_value,
                )

        # The url_id is the subdomain used for the URL we're creating. This
        # is either created randomly (as in AWS), or can be passed as a tag
        # to the lambda itself (localstack-only).
        url_id: str = custom_id if custom_id is not None else api_utils.generate_random_url_id()

        host_definition = localstack_host(custom_port=config.GATEWAY_LISTEN[0].port)
        new_url_config = FunctionUrlConfig(
            function_arn=function_arn,
            function_name=function_name,
            cors=cors,
            url_id=url_id,
            url=f"http://{url_id}.lambda-url.{context.region}.{host_definition.host_and_port()}/",  # TODO: https support
            auth_type=auth_type,
            creation_time=api_utils.generate_lambda_date(),
            last_modified_time=api_utils.generate_lambda_date(),
            invoke_mode=invoke_mode,
        )
        fn.function_url_configs[normalized_qualifier] = new_url_config

        # persist and start URL
        # TODO: implement URL invoke
        api_url_config = api_utils.map_function_url_config(new_url_config)

        return CreateFunctionUrlConfigResponse(
            FunctionUrl=api_url_config["FunctionUrl"],
            FunctionArn=api_url_config["FunctionArn"],
            AuthType=api_url_config["AuthType"],
            Cors=api_url_config["Cors"],
            CreationTime=api_url_config["CreationTime"],
            InvokeMode=api_url_config["InvokeMode"],
        )
2424

2425
    def get_function_url_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: FunctionUrlQualifier = None,
        **kwargs,
    ) -> GetFunctionUrlConfigResponse:
        """
        Return the function URL config stored for the function and qualifier.

        A missing qualifier is treated as ``$LATEST``.

        Raises:
            ValidationException: If the qualifier is invalid.
            ResourceNotFoundException: If the function or its URL config does not exist.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        state = lambda_stores[account_id][region]

        fn_name, qualifier = api_utils.get_name_and_qualifier(function_name, qualifier, context)
        self._validate_qualifier(qualifier)

        # A missing function and a missing URL config are reported with the same error
        resolved_fn = state.functions.get(fn_name)
        url_config = (
            resolved_fn.function_url_configs.get(qualifier or "$LATEST") if resolved_fn else None
        )
        if not url_config:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        return api_utils.map_function_url_config(url_config)
1✔
2453

2454
    def update_function_url_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: FunctionUrlQualifier = None,
        auth_type: FunctionUrlAuthType = None,
        cors: Cors = None,
        invoke_mode: InvokeMode = None,
        **kwargs,
    ) -> UpdateFunctionUrlConfigResponse:
        """
        Update an existing function URL config and return the new values.

        ``cors`` and ``auth_type`` are applied when they are not ``None``; ``invoke_mode`` only
        when it is truthy. ``last_modified_time`` is always refreshed.

        Raises:
            ValidationException: If the qualifier or invoke mode is invalid.
            ResourceNotFoundException: If the function, the alias or the URL config is missing.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        state = lambda_stores[account_id][region]

        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        self._validate_qualifier(qualifier)
        self._validate_invoke_mode(invoke_mode)

        fn = state.functions.get(function_name)
        if not fn:
            raise ResourceNotFoundException("Function does not exist", Type="User")

        normalized_qualifier = qualifier or "$LATEST"

        references_alias = api_utils.qualifier_is_alias(normalized_qualifier)
        if references_alias and normalized_qualifier not in fn.aliases:
            raise ResourceNotFoundException("Function does not exist", Type="User")

        url_config = fn.function_url_configs.get(normalized_qualifier)
        if not url_config:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        # Collect only the fields that were actually provided
        changes = {"last_modified_time": api_utils.generate_lambda_date()}
        if cors is not None:
            changes["cors"] = cors
        if auth_type is not None:
            changes["auth_type"] = auth_type
        if invoke_mode:
            changes["invoke_mode"] = invoke_mode

        updated_config = dataclasses.replace(url_config, **changes)
        fn.function_url_configs[normalized_qualifier] = updated_config

        return UpdateFunctionUrlConfigResponse(
            FunctionUrl=updated_config.url,
            FunctionArn=updated_config.function_arn,
            AuthType=updated_config.auth_type,
            Cors=updated_config.cors,
            CreationTime=updated_config.creation_time,
            LastModifiedTime=updated_config.last_modified_time,
            InvokeMode=updated_config.invoke_mode,
        )
2512

2513
    def delete_function_url_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: FunctionUrlQualifier = None,
        **kwargs,
    ) -> None:
        """
        Delete the function URL config stored for the function and qualifier.

        A missing qualifier is treated as ``$LATEST``.

        Raises:
            ValidationException: If the qualifier is invalid.
            ResourceNotFoundException: If the function or its URL config does not exist.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        state = lambda_stores[account_id][region]

        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        self._validate_qualifier(qualifier)

        config_key = qualifier or "$LATEST"
        # A missing function and a missing URL config are reported with the same error
        resolved_fn = state.functions.get(function_name)
        url_config = resolved_fn.function_url_configs.get(config_key) if resolved_fn else None
        if not url_config:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        del resolved_fn.function_url_configs[config_key]
1✔
2542

2543
    def list_function_url_configs(
        self,
        context: RequestContext,
        function_name: FunctionName,
        marker: String = None,
        max_items: MaxItems = None,
        **kwargs,
    ) -> ListFunctionUrlConfigsResponse:
        """
        Return one page of the URL configs stored for a function.

        :param context: request context used to resolve account, region and function name
        :param function_name: function name (or ARN) as passed by the caller
        :param marker: pagination token forwarded to ``PaginatedList.get_page``
        :param max_items: page size forwarded to ``PaginatedList.get_page``
        :raises ResourceNotFoundException: if the function is not in the store
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]

        resolved_name = api_utils.get_function_name(function_name, context)
        fn = store.functions.get(resolved_name)
        if not fn:
            raise ResourceNotFoundException("Function does not exist", Type="User")

        def _arn_of(mapped_config):
            # key used by the paginator to derive page tokens
            return mapped_config["FunctionArn"]

        mapped_configs = PaginatedList(
            list(map(api_utils.map_function_url_config, fn.function_url_configs.values()))
        )
        current_page, next_marker = mapped_configs.get_page(_arn_of, marker, max_items)
        return ListFunctionUrlConfigsResponse(
            FunctionUrlConfigs=current_page, NextMarker=next_marker
        )
1✔
2571

2572
    # =======================================
2573
    # ============  Permissions  ============
2574
    # =======================================
2575

2576
    @handler("AddPermission", expand=False)
    def add_permission(
        self,
        context: RequestContext,
        request: AddPermissionRequest,
    ) -> AddPermissionResponse:
        """
        Append a statement to the resource policy of a function version or alias.

        A policy is created on first use for the resolved qualifier. After the statement is
        added, the alias or version entry is replaced with a copy of itself (see the TODOs
        below).

        :param context: request context used to resolve account, region, name and qualifier
        :param request: the raw AddPermission request
        :return: response carrying the JSON-serialized statement that was added
        :raises InvalidParameterValueException: if the qualifier is ``$LATEST``
        :raises PreconditionFailedException: if a given RevisionId does not match the current one
        :raises ValidationException: if the statement id does not match ``STATEMENT_ID_REGEX``
        :raises ResourceConflictException: if the statement id already exists in the policy
        """
        raw_function_name = request.get("FunctionName")
        function_name, qualifier = api_utils.get_name_and_qualifier(
            raw_function_name, request.get("Qualifier"), context
        )

        # validate qualifier
        if qualifier is not None:
            self._validate_qualifier_expression(qualifier)
            if qualifier == "$LATEST":
                raise InvalidParameterValueException(
                    "We currently do not support adding policies for $LATEST.", Type="User"
                )
        account_id, region = api_utils.get_account_and_region(raw_function_name, context)

        fn = self._get_function(function_name, account_id, region)
        target_qualifier, fn_arn = self._resolve_fn_qualifier(fn, qualifier)

        # optimistic locking: only enforced when the caller supplies a revision id
        expected_revision = request.get("RevisionId")
        if expected_revision and expected_revision != self._function_revision_id(
            fn, target_qualifier
        ):
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
                Type="User",
            )

        request_sid = request["StatementId"]
        if STATEMENT_ID_REGEX.match(request_sid) is None:
            raise ValidationException(
                f"1 validation error detected: Value '{request_sid}' at 'statementId' failed to satisfy constraint: Member must satisfy regular expression pattern: ([a-zA-Z0-9-_]+)"
            )

        # check for an already existing policy and any conflicts in existing statements
        # uniqueness scope: statement id needs to be unique per qualified function
        # ($LATEST, version, or alias)
        # Counterexample: the same sid can exist within $LATEST, version, and alias
        fn_policy = fn.permissions.get(target_qualifier)
        if fn_policy and request_sid in {stmt["Sid"] for stmt in fn_policy.policy.Statement}:
            raise ResourceConflictException(
                f"The statement id ({request_sid}) provided already exists. Please provide a new statement id, or remove the existing statement.",
                Type="User",
            )

        permission_statement = api_utils.build_statement(
            partition=context.partition,
            resource_arn=fn_arn,
            statement_id=request_sid,
            action=request["Action"],
            principal=request["Principal"],
            source_arn=request.get("SourceArn"),
            source_account=request.get("SourceAccount"),
            principal_org_id=request.get("PrincipalOrgID"),
            event_source_token=request.get("EventSourceToken"),
            auth_type=request.get("FunctionUrlAuthType"),
        )

        # a fresh policy is only registered on the function after the statement was appended
        needs_new_policy = not fn_policy
        if needs_new_policy:
            fn_policy = FunctionResourcePolicy(
                policy=ResourcePolicy(Version="2012-10-17", Id="default", Statement=[])
            )
        fn_policy.policy.Statement.append(permission_statement)
        if needs_new_policy:
            fn.permissions[target_qualifier] = fn_policy

        # Update revision id of alias or version
        # TODO: re-evaluate data model to prevent this dirty hack just for bumping the revision id
        # TODO: does that need a `with function.lock` for atomic updates of the policy + revision_id?
        if api_utils.qualifier_is_alias(target_qualifier):
            fn.aliases[target_qualifier] = dataclasses.replace(fn.aliases[target_qualifier])
        else:
            # Assumes that a non-alias is a version
            version = fn.versions[target_qualifier]
            fn.versions[target_qualifier] = dataclasses.replace(
                version, config=dataclasses.replace(version.config)
            )
        return AddPermissionResponse(Statement=json.dumps(permission_statement))
1✔
2658

2659
    def remove_permission(
        self,
        context: RequestContext,
        function_name: FunctionName,
        statement_id: NamespacedStatementId,
        qualifier: Qualifier = None,
        revision_id: String = None,
        **kwargs,
    ) -> None:
        """
        Remove a statement from the resource policy of a function version or alias.

        The policy entry is dropped entirely once its last statement has been removed.

        :param context: request context used to resolve account, region, name and qualifier
        :param function_name: function name (or ARN) as passed by the caller
        :param statement_id: ``Sid`` of the statement to remove
        :param qualifier: optional version or alias the policy belongs to
        :param revision_id: optional revision id that must match the current one
        :raises ResourceNotFoundException: if the function, the policy or the statement is missing
        :raises PreconditionFailedException: if ``revision_id`` is given and does not match
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        if qualifier is not None:
            self._validate_qualifier_expression(qualifier)

        fn = lambda_stores[account_id][region].functions.get(function_name)
        if fn is None:
            fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
            raise ResourceNotFoundException(f"No policy found for: {fn_arn}", Type="User")

        target_qualifier, _ = self._resolve_fn_qualifier(fn, qualifier)
        fn_policy = fn.permissions.get(target_qualifier)
        if not fn_policy:
            raise ResourceNotFoundException(
                "No policy is associated with the given resource.", Type="User"
            )

        # try to find statement in policy and delete it
        statements = fn_policy.policy.Statement
        matching_statement = next(
            (stmt for stmt in statements if stmt["Sid"] == statement_id), None
        )
        if not matching_statement:
            raise ResourceNotFoundException(
                f"Statement {statement_id} is not found in resource policy.", Type="User"
            )

        current_revision = self._function_revision_id(fn, target_qualifier)
        if revision_id and revision_id != current_revision:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
                Type="User",
            )
        statements.remove(matching_statement)

        # Update revision id for alias or version
        # TODO: re-evaluate data model to prevent this dirty hack just for bumping the revision id
        # TODO: does that need a `with function.lock` for atomic updates of the policy + revision_id?
        if api_utils.qualifier_is_alias(target_qualifier):
            fn.aliases[target_qualifier] = dataclasses.replace(fn.aliases[target_qualifier])
        else:
            # Assumes that a non-alias is a version
            version = fn.versions[target_qualifier]
            fn.versions[target_qualifier] = dataclasses.replace(
                version, config=dataclasses.replace(version.config)
            )

        # remove the policy as a whole when there's no statement left in it
        if not statements:
            del fn.permissions[target_qualifier]
1✔
2724

2725
    def get_policy(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        qualifier: Qualifier = None,
        **kwargs,
    ) -> GetPolicyResponse:
        """
        Return the resource policy stored for a function qualifier.

        The qualifier falls back to ``$LATEST`` when none is given.

        :param context: request context used to resolve account, region, name and qualifier
        :param function_name: function name (or ARN) as passed by the caller
        :param qualifier: optional version or alias whose policy is requested
        :return: the JSON-serialized policy plus the revision id of the alias or version
        :raises ResourceNotFoundException: if no policy is stored for the qualifier
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )

        if qualifier is not None:
            self._validate_qualifier_expression(qualifier)

        fn = self._get_function(function_name, account_id, region)

        effective_qualifier = qualifier if qualifier else "$LATEST"
        fn_policy = fn.permissions.get(effective_qualifier)
        if not fn_policy:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        if api_utils.qualifier_is_alias(effective_qualifier):
            revision = fn.aliases[effective_qualifier].revision_id
        else:
            # Assumes that a non-alias is a version
            revision = fn.versions[effective_qualifier].config.revision_id

        serialized_policy = json.dumps(dataclasses.asdict(fn_policy.policy))
        return GetPolicyResponse(Policy=serialized_policy, RevisionId=revision)
2762

2763
    # =======================================
2764
    # ========  Code signing config  ========
2765
    # =======================================
2766

2767
    def create_code_signing_config(
        self,
        context: RequestContext,
        allowed_publishers: AllowedPublishers,
        description: Description = None,
        code_signing_policies: CodeSigningPolicies = None,
        tags: Tags = None,
        **kwargs,
    ) -> CreateCodeSigningConfigResponse:
        """
        Create a code signing config and store it under its ARN in the caller's store.

        :param context: request context providing account, region and partition
        :param allowed_publishers: publishers stored on the new config
        :param description: optional description stored on the new config
        :param code_signing_policies: optional policies stored on the new config
        :param tags: accepted but not used by this implementation
        :return: response containing the mapped code signing config
        """
        account_id = context.account_id
        region_name = context.region
        store = lambda_stores[account_id][region_name]

        # TODO: can there be duplicates?
        config_id = f"csc-{get_random_hex(17)}"  # e.g. 'csc-077c33b4c19e26036'
        config_arn = f"arn:{context.partition}:lambda:{region_name}:{account_id}:code-signing-config:{config_id}"

        store.code_signing_configs[config_arn] = new_config = CodeSigningConfig(
            csc_id=config_id,
            arn=config_arn,
            allowed_publishers=allowed_publishers,
            policies=code_signing_policies,
            last_modified=api_utils.generate_lambda_date(),
            description=description,
        )
        return CreateCodeSigningConfigResponse(CodeSigningConfig=api_utils.map_csc(new_config))
1✔
2793

2794
    def put_function_code_signing_config(
        self,
        context: RequestContext,
        code_signing_config_arn: CodeSigningConfigArn,
        function_name: FunctionName,
        **kwargs,
    ) -> PutFunctionCodeSigningConfigResponse:
        """
        Attach an existing code signing config to a function.

        :param context: request context used to resolve account, region and function name
        :param code_signing_config_arn: ARN of the code signing config to attach
        :param function_name: function name (or ARN) as passed by the caller
        :return: response echoing the config ARN and the resolved function name
        :raises CodeSigningConfigNotFoundException: if the config ARN is not in the store
        :raises ResourceNotFoundException: if the function is not in the store
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]
        function_name = api_utils.get_function_name(function_name, context)

        # the config is checked before the function, so a missing config wins when both are absent
        if not store.code_signing_configs.get(code_signing_config_arn):
            raise CodeSigningConfigNotFoundException(
                f"The code signing configuration cannot be found. Check that the provided configuration is not deleted: {code_signing_config_arn}.",
                Type="User",
            )

        target_fn = store.functions.get(function_name)
        fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
        if not target_fn:
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")

        target_fn.code_signing_config_arn = code_signing_config_arn
        return PutFunctionCodeSigningConfigResponse(
            CodeSigningConfigArn=code_signing_config_arn, FunctionName=function_name
        )
2821

2822
    def update_code_signing_config(
        self,
        context: RequestContext,
        code_signing_config_arn: CodeSigningConfigArn,
        description: Description = None,
        allowed_publishers: AllowedPublishers = None,
        code_signing_policies: CodeSigningPolicies = None,
        **kwargs,
    ) -> UpdateCodeSigningConfigResponse:
        """
        Update the given fields of a code signing config and refresh its modification date.

        Arguments left as ``None`` keep their stored value.

        :param context: request context providing account and region
        :param code_signing_config_arn: ARN of the config to update
        :param description: new description, if any
        :param allowed_publishers: new allowed publishers, if any
        :param code_signing_policies: new policies, if any
        :return: response containing the mapped, updated config
        :raises ResourceNotFoundException: if the config ARN is not in the store
        """
        store = lambda_stores[context.account_id][context.region]
        current = store.code_signing_configs.get(code_signing_config_arn)
        if not current:
            raise ResourceNotFoundException(
                f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
            )

        # only explicitly provided (non-None) arguments override the stored fields
        requested = {
            "allowed_publishers": allowed_publishers,
            "policies": code_signing_policies,
            "description": description,
        }
        overrides = {field: value for field, value in requested.items() if value is not None}

        updated = dataclasses.replace(
            current, last_modified=api_utils.generate_lambda_date(), **overrides
        )
        store.code_signing_configs[code_signing_config_arn] = updated

        return UpdateCodeSigningConfigResponse(CodeSigningConfig=api_utils.map_csc(updated))
1✔
2851

2852
    def get_code_signing_config(
        self, context: RequestContext, code_signing_config_arn: CodeSigningConfigArn, **kwargs
    ) -> GetCodeSigningConfigResponse:
        """
        Look up a code signing config by ARN in the caller's account/region store.

        :raises ResourceNotFoundException: if the config ARN is not in the store
        """
        configs_by_arn = lambda_stores[context.account_id][context.region].code_signing_configs
        found = configs_by_arn.get(code_signing_config_arn)
        if found:
            return GetCodeSigningConfigResponse(CodeSigningConfig=api_utils.map_csc(found))

        raise ResourceNotFoundException(
            f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
        )
1✔
2863

2864
    def get_function_code_signing_config(
        self, context: RequestContext, function_name: FunctionName, **kwargs
    ) -> GetFunctionCodeSigningConfigResponse:
        """
        Return the code signing config ARN attached to a function.

        The response is empty when the function has no code signing config attached.

        :raises ResourceNotFoundException: if the function is not in the store
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]
        function_name = api_utils.get_function_name(function_name, context)

        target_fn = store.functions.get(function_name)
        fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
        if not target_fn:
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")

        attached_arn = target_fn.code_signing_config_arn
        if not attached_arn:
            return GetFunctionCodeSigningConfigResponse()

        return GetFunctionCodeSigningConfigResponse(
            CodeSigningConfigArn=attached_arn, FunctionName=function_name
        )
1✔
2881

2882
    def delete_function_code_signing_config(
        self, context: RequestContext, function_name: FunctionName, **kwargs
    ) -> None:
        """
        Detach the code signing config from a function by resetting its ARN to ``None``.

        :raises ResourceNotFoundException: if the function is not in the store
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]
        function_name = api_utils.get_function_name(function_name, context)

        target_fn = store.functions.get(function_name)
        fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
        if not target_fn:
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")

        target_fn.code_signing_config_arn = None
1✔
2894

2895
    def delete_code_signing_config(
        self, context: RequestContext, code_signing_config_arn: CodeSigningConfigArn, **kwargs
    ) -> DeleteCodeSigningConfigResponse:
        """
        Remove a code signing config from the caller's account/region store.

        :raises ResourceNotFoundException: if the config ARN is not in the store
        """
        configs_by_arn = lambda_stores[context.account_id][context.region].code_signing_configs

        if not configs_by_arn.get(code_signing_config_arn):
            raise ResourceNotFoundException(
                f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
            )

        del configs_by_arn[code_signing_config_arn]
        return DeleteCodeSigningConfigResponse()
1✔
2909

2910
    def list_code_signing_configs(
        self,
        context: RequestContext,
        marker: String = None,
        max_items: MaxListItems = None,
        **kwargs,
    ) -> ListCodeSigningConfigsResponse:
        """
        Return one page of the code signing configs in the caller's account/region store.

        :param context: request context providing account and region
        :param marker: pagination token forwarded to ``PaginatedList.get_page``
        :param max_items: page size forwarded to ``PaginatedList.get_page``
        """
        store = lambda_stores[context.account_id][context.region]

        def _config_id(mapped_config):
            # key used by the paginator to derive page tokens
            return mapped_config["CodeSigningConfigId"]

        mapped_configs = PaginatedList(
            list(map(api_utils.map_csc, store.code_signing_configs.values()))
        )
        current_page, next_marker = mapped_configs.get_page(_config_id, marker, max_items)
        return ListCodeSigningConfigsResponse(
            CodeSigningConfigs=current_page, NextMarker=next_marker
        )
1✔
2927

2928
    def list_functions_by_code_signing_config(
        self,
        context: RequestContext,
        code_signing_config_arn: CodeSigningConfigArn,
        marker: String = None,
        max_items: MaxListItems = None,
        **kwargs,
    ) -> ListFunctionsByCodeSigningConfigResponse:
        """
        Return one page of the ARNs of all functions that use the given code signing config.

        :param context: request context providing account and region
        :param code_signing_config_arn: ARN of the code signing config to filter by
        :param marker: pagination token forwarded to ``PaginatedList.get_page``
        :param max_items: page size forwarded to ``PaginatedList.get_page``
        :raises ResourceNotFoundException: if the config ARN is not in the store
        """
        account_id = context.account_id
        region_name = context.region
        store = lambda_stores[account_id][region_name]

        if code_signing_config_arn not in store.code_signing_configs:
            raise ResourceNotFoundException(
                f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
            )

        def _identity(arn):
            # the ARN string itself serves as the pagination key
            return arn

        matching_fn_arns = PaginatedList(
            [
                api_utils.unqualified_lambda_arn(candidate.function_name, account_id, region_name)
                for candidate in store.functions.values()
                if candidate.code_signing_config_arn == code_signing_config_arn
            ]
        )
        current_page, next_marker = matching_fn_arns.get_page(_identity, marker, max_items)
        return ListFunctionsByCodeSigningConfigResponse(
            FunctionArns=current_page, NextMarker=next_marker
        )
1✔
2959

2960
    # =======================================
2961
    # =========  Account Settings   =========
2962
    # =======================================
2963

2964
    # CAVE: these settings & usages are *per* region!
2965
    # Lambda quotas: https://docs.aws.amazon.com/lambda/latest/dg/gettingstarted-limits.html
2966
    def get_account_settings(self, context: RequestContext, **kwargs) -> GetAccountSettingsResponse:
        """
        Compute account limits and current usage for the caller's account and region.

        Usage is the function count and the summed code size of all Zip-packaged function
        versions plus all layer versions. Unreserved concurrency is the configured concurrency
        limit minus all reserved and provisioned concurrency of the stored functions.

        :param context: request context providing account and region
        :return: response with ``AccountLimit`` and ``AccountUsage``
        """
        store = lambda_stores[context.account_id][context.region]

        function_count = 0
        total_code_size = 0
        claimed_concurrency = 0
        for fn in store.functions.values():
            function_count += 1
            # Image-based Lambdas do not have a code attribute and count against the ECR quotas instead
            total_code_size += sum(
                version.config.code.code_size
                for version in fn.versions.values()
                if version.config.package_type == PackageType.Zip
            )
            reserved = fn.reserved_concurrent_executions
            if reserved is not None:
                claimed_concurrency += reserved
            claimed_concurrency += sum(
                provisioned.provisioned_concurrent_executions
                for provisioned in fn.provisioned_concurrency_configs.values()
            )

        total_code_size += sum(
            layer_version.code.code_size
            for layer in store.layers.values()
            for layer_version in layer.layer_versions.values()
        )

        limits = AccountLimit(
            TotalCodeSize=config.LAMBDA_LIMITS_TOTAL_CODE_SIZE,
            CodeSizeZipped=config.LAMBDA_LIMITS_CODE_SIZE_ZIPPED,
            CodeSizeUnzipped=config.LAMBDA_LIMITS_CODE_SIZE_UNZIPPED,
            ConcurrentExecutions=config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS,
            UnreservedConcurrentExecutions=config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS
            - claimed_concurrency,
        )
        usage = AccountUsage(TotalCodeSize=total_code_size, FunctionCount=function_count)
        return GetAccountSettingsResponse(AccountLimit=limits, AccountUsage=usage)
2999

3000
    # =======================================
3001
    # ==  Provisioned Concurrency Config   ==
3002
    # =======================================
3003

3004
    def _get_provisioned_config(
        self, context: RequestContext, function_name: str, qualifier: str
    ) -> ProvisionedConcurrencyConfiguration | None:
        """
        Look up the provisioned concurrency config stored for a function qualifier.

        :param context: request context used to resolve account, region and function name
        :param function_name: function name (or ARN) as passed by the caller
        :param qualifier: alias name or version the config is attached to
        :return: the stored config, or ``None`` if the qualifier has no config
        :raises ResourceNotFoundException: if the function, the alias or the version is missing
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        state = lambda_stores[account_id][region]
        function_name = api_utils.get_function_name(function_name, context)
        fn = state.functions.get(function_name)

        if api_utils.qualifier_is_alias(qualifier):
            if not fn or fn.aliases.get(qualifier) is None:
                raise ResourceNotFoundException(
                    f"Cannot find alias arn: {api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)}",
                    Type="User",
                )
        elif api_utils.qualifier_is_version(qualifier):
            if not fn or fn.versions.get(qualifier) is None:
                raise ResourceNotFoundException(
                    f"Function not found: {api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)}",
                    Type="User",
                )
        elif not fn:
            # The qualifier matches neither the alias nor the version pattern and the function
            # is unknown: fail with a proper not-found error instead of dereferencing None below.
            raise ResourceNotFoundException(
                f"Function not found: {api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)}",
                Type="User",
            )

        return fn.provisioned_concurrency_configs.get(qualifier)
1✔
3031

3032
    def put_provisioned_concurrency_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: Qualifier,
        provisioned_concurrent_executions: PositiveInteger,
        **kwargs,
    ) -> PutProvisionedConcurrencyConfigResponse:
        """
        Store a provisioned concurrency config for an alias or a published version.

        The requested value is checked against the function's reserved concurrency, the
        configured concurrency limit and the minimum unreserved concurrency. The config is
        then stored and passed to the version manager of the targeted version.

        :param context: request context used to resolve account, region, name and qualifier
        :param function_name: function name (or ARN) as passed by the caller
        :param qualifier: alias name or published version (``$LATEST`` is rejected)
        :param provisioned_concurrent_executions: requested provisioned concurrency, at least 1
        :return: response reporting the requested value with status ``IN_PROGRESS``
        :raises ValidationException: if the requested value is not positive
        :raises InvalidParameterValueException: if the qualifier is ``$LATEST`` or a limit is exceeded
        :raises ResourceConflictException: if the alias targets an already provisioned version, or
            the version is targeted by an alias that has a provisioned concurrency config
        """
        if provisioned_concurrent_executions <= 0:
            raise ValidationException(
                f"1 validation error detected: Value '{provisioned_concurrent_executions}' at 'provisionedConcurrentExecutions' failed to satisfy constraint: Member must have value greater than or equal to 1"
            )

        if qualifier == "$LATEST":
            raise InvalidParameterValueException(
                "Provisioned Concurrency Configs cannot be applied to unpublished function versions.",
                Type="User",
            )
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        fn = lambda_stores[account_id][region].functions.get(function_name)

        previous_config = self._get_provisioned_config(context, function_name, qualifier)
        if previous_config:  # TODO: merge?
            # TODO: add a test for partial updates (if possible)
            LOG.warning(
                "Partial update of provisioned concurrency config is currently not supported."
            )

        # provisioned concurrency already claimed by the function's other qualifiers
        provisioned_elsewhere = sum(
            other_config.provisioned_concurrent_executions
            for other_qualifier, other_config in fn.provisioned_concurrency_configs.items()
            if other_qualifier != qualifier
        )

        reserved = fn.reserved_concurrent_executions
        if (
            reserved is not None
            and reserved < provisioned_elsewhere + provisioned_concurrent_executions
        ):
            raise InvalidParameterValueException(
                "Requested Provisioned Concurrency should not be greater than the reservedConcurrentExecution for function",
                Type="User",
            )

        if provisioned_concurrent_executions > config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS:
            raise InvalidParameterValueException(
                f"Specified ConcurrentExecutions for function is greater than account's unreserved concurrency"
                f" [{config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS}]."
            )

        account_limit = self.get_account_settings(context)["AccountLimit"]
        remaining_unreserved = (
            account_limit["UnreservedConcurrentExecutions"] - provisioned_concurrent_executions
        )
        if remaining_unreserved < config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY:
            raise InvalidParameterValueException(
                f"Specified ConcurrentExecutions for function decreases account's UnreservedConcurrentExecution below"
                f" its minimum value of [{config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY}]."
            )

        new_config = ProvisionedConcurrencyConfiguration(
            provisioned_concurrent_executions, api_utils.generate_lambda_date()
        )
        # default target; replaced below by the ARN of the concrete version for aliases/versions
        fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)

        if api_utils.qualifier_is_alias(qualifier):
            aliased_version_qualifier = fn.aliases.get(qualifier).function_version
            aliased_version = fn.versions.get(aliased_version_qualifier)
            version_is_provisioned = (
                fn.provisioned_concurrency_configs.get(aliased_version_qualifier) is not None
            )

            if aliased_version and version_is_provisioned:
                raise ResourceConflictException(
                    "Alias can't be used for Provisioned Concurrency configuration on an already Provisioned version",
                    Type="User",
                )
            fn_arn = aliased_version.id.qualified_arn()
        elif api_utils.qualifier_is_version(qualifier):
            published_version = fn.versions.get(qualifier)

            # TODO: might be useful other places, utilize
            provisioned_alias_points_here = any(
                candidate.function_version == qualifier
                and fn.provisioned_concurrency_configs.get(candidate.name) is not None
                for candidate in fn.aliases.values()
            )
            if provisioned_alias_points_here:
                raise ResourceConflictException(
                    "Version is pointed by a Provisioned Concurrency alias", Type="User"
                )

            fn_arn = published_version.id.qualified_arn()

        # resolve the manager first so a failed lookup leaves the stored configs untouched
        version_manager = self.lambda_service.get_lambda_version_manager(fn_arn)

        fn.provisioned_concurrency_configs[qualifier] = new_config

        version_manager.update_provisioned_concurrency_config(
            new_config.provisioned_concurrent_executions
        )

        return PutProvisionedConcurrencyConfigResponse(
            RequestedProvisionedConcurrentExecutions=new_config.provisioned_concurrent_executions,
            AvailableProvisionedConcurrentExecutions=0,
            AllocatedProvisionedConcurrentExecutions=0,
            Status=ProvisionedConcurrencyStatusEnum.IN_PROGRESS,
            # StatusReason=manager.provisioned_state.status_reason,
            LastModified=new_config.last_modified,  # TODO: does change with configuration or also with state changes?
        )
3154

3155
    def get_provisioned_concurrency_config(
        self, context: RequestContext, function_name: FunctionName, qualifier: Qualifier, **kwargs
    ) -> GetProvisionedConcurrencyConfigResponse:
        """
        Return the provisioned concurrency config stored for an alias or published version,
        together with the provisioning state reported by the matching version manager.

        :param context: Request context
        :param function_name: Function identifier as received from the API
        :param qualifier: Alias name or version; ``$LATEST`` is rejected
        :return: Requested executions and last-modified from the stored config, plus
            available/allocated counts, status and status reason from the version manager
        :raises InvalidParameterValueException: if the qualifier is ``$LATEST``
        :raises ProvisionedConcurrencyConfigNotFoundException: if no config is stored
        """
        if qualifier == "$LATEST":
            raise InvalidParameterValueException(
                "The function resource provided must be an alias or a published version.",
                Type="User",
            )
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )

        stored_config = self._get_provisioned_config(context, function_name, qualifier)
        if not stored_config:
            raise ProvisionedConcurrencyConfigNotFoundException(
                "No Provisioned Concurrency Config found for this function", Type="User"
            )

        # TODO: make this compatible with alias pointer migration on update
        # The version manager is looked up with a version-qualified ARN, so an alias is
        # translated into the version it currently points to.
        version_qualifier = qualifier
        if api_utils.qualifier_is_alias(qualifier):
            function = lambda_stores[account_id][region].functions.get(function_name)
            version_qualifier = function.aliases.get(qualifier).function_version

        version_arn = api_utils.qualified_lambda_arn(
            function_name, version_qualifier, account_id, region
        )
        manager = self.lambda_service.get_lambda_version_manager(version_arn)

        return GetProvisionedConcurrencyConfigResponse(
            RequestedProvisionedConcurrentExecutions=stored_config.provisioned_concurrent_executions,
            LastModified=stored_config.last_modified,
            AvailableProvisionedConcurrentExecutions=manager.provisioned_state.available,
            AllocatedProvisionedConcurrentExecutions=manager.provisioned_state.allocated,
            Status=manager.provisioned_state.status,
            StatusReason=manager.provisioned_state.status_reason,
        )
3195

3196
    def list_provisioned_concurrency_configs(
        self,
        context: RequestContext,
        function_name: FunctionName,
        marker: String = None,
        max_items: MaxProvisionedConcurrencyConfigListItems = None,
        **kwargs,
    ) -> ListProvisionedConcurrencyConfigsResponse:
        """
        List all provisioned concurrency configs of a function, one entry per configured qualifier.

        :param context: Request context
        :param function_name: Function identifier as received from the API
        :param marker: Pagination token returned by a previous call
        :param max_items: Maximum number of entries per page
        :return: One page of config entries and the marker of the next page, if any
        :raises ResourceNotFoundException: if the function is not in the store
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        state = lambda_stores[account_id][region]

        function_name = api_utils.get_function_name(function_name, context)
        fn = state.functions.get(function_name)
        if fn is None:
            raise ResourceNotFoundException(
                f"Function not found: {api_utils.unqualified_lambda_arn(function_name, account_id, region)}",
                Type="User",
            )

        configs = []
        for qualifier, pc_config in fn.provisioned_concurrency_configs.items():
            # The version manager is looked up with a version-qualified ARN, so an alias is
            # translated into the version it currently points to.
            if api_utils.qualifier_is_alias(qualifier):
                version_qualifier = fn.aliases.get(qualifier).function_version
            else:
                version_qualifier = qualifier
            version_arn = api_utils.qualified_lambda_arn(
                function_name, version_qualifier, account_id, region
            )
            manager = self.lambda_service.get_lambda_version_manager(version_arn)

            configs.append(
                ProvisionedConcurrencyConfigListItem(
                    # the reported ARN keeps the configured qualifier (alias or version)
                    FunctionArn=api_utils.qualified_lambda_arn(
                        function_name, qualifier, account_id, region
                    ),
                    RequestedProvisionedConcurrentExecutions=pc_config.provisioned_concurrent_executions,
                    AvailableProvisionedConcurrentExecutions=manager.provisioned_state.available,
                    AllocatedProvisionedConcurrentExecutions=manager.provisioned_state.allocated,
                    Status=manager.provisioned_state.status,
                    StatusReason=manager.provisioned_state.status_reason,
                    LastModified=pc_config.last_modified,
                )
            )

        # The page token must be a string that can be compared with ``marker``; the qualified
        # function ARN is unique per entry (same key as list_function_event_invoke_configs).
        page, token = PaginatedList(configs).get_page(
            lambda item: item["FunctionArn"],
            marker,
            max_items,
        )
        return ListProvisionedConcurrencyConfigsResponse(
            ProvisionedConcurrencyConfigs=page, NextMarker=token
        )
3253

3254
    def delete_provisioned_concurrency_config(
        self, context: RequestContext, function_name: FunctionName, qualifier: Qualifier, **kwargs
    ) -> None:
        """
        Remove the provisioned concurrency config of an alias or published version and scale
        the backing version manager down to zero provisioned executions.

        :param context: Request context
        :param function_name: Function identifier as received from the API
        :param qualifier: Alias name or version; ``$LATEST`` is rejected
        :raises InvalidParameterValueException: if the qualifier is ``$LATEST``
        """
        if qualifier == "$LATEST":
            raise InvalidParameterValueException(
                "The function resource provided must be an alias or a published version.",
                Type="User",
            )
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        state = lambda_stores[account_id][region]
        fn = state.functions.get(function_name)

        provisioned_config = self._get_provisioned_config(context, function_name, qualifier)
        # delete is idempotent and doesn't actually care about the provisioned concurrency config not existing
        if not provisioned_config:
            return

        fn.provisioned_concurrency_configs.pop(qualifier)

        # The version manager is looked up with a version-qualified ARN (as in the get/list/put
        # operations), so an alias has to be translated into the version it points to.
        version_qualifier = qualifier
        if api_utils.qualifier_is_alias(qualifier):
            version_qualifier = fn.aliases.get(qualifier).function_version
        fn_arn = api_utils.qualified_lambda_arn(
            function_name, version_qualifier, account_id, region
        )
        manager = self.lambda_service.get_lambda_version_manager(fn_arn)
        manager.update_provisioned_concurrency_config(0)
1✔
3276

3277
    # =======================================
3278
    # =======  Event Invoke Config   ========
3279
    # =======================================
3280

3281
    # "1 validation error detected: Value 'arn:aws:_-/!lambda:<region>:111111111111:function:<function-name:1>' at 'destinationConfig.onFailure.destination' failed to satisfy constraint: Member must satisfy regular expression pattern: ^$|arn:(aws[a-zA-Z0-9-]*):([a-zA-Z0-9\\-])+:([a-z]{2}((-gov)|(-iso(b?)))?-[a-z]+-\\d{1})?:(\\d{12})?:(.*)"
3282
    # "1 validation error detected: Value 'arn:aws:_-/!lambda:<region>:111111111111:function:<function-name:1>' at 'destinationConfig.onFailure.destination' failed to satisfy constraint: Member must satisfy regular expression pattern: ^$|arn:(aws[a-zA-Z0-9-]*):([a-zA-Z0-9\\-])+:([a-z]2((-gov)|(-iso(b?)))?-[a-z]+-\\d1)?:(\\d12)?:(.*)" ... (expected → actual)
3283

3284
    def _validate_destination_config(
1✔
3285
        self, store: LambdaStore, function_name: str, destination_config: DestinationConfig
3286
    ):
3287
        def _validate_destination_arn(destination_arn) -> bool:
1✔
3288
            if not api_utils.DESTINATION_ARN_PATTERN.match(destination_arn):
1✔
3289
                # technically we shouldn't handle this in the provider
3290
                raise ValidationException(
1✔
3291
                    "1 validation error detected: Value '"
3292
                    + destination_arn
3293
                    + r"' at 'destinationConfig.onFailure.destination' failed to satisfy constraint: Member must satisfy regular expression pattern: ^$|arn:(aws[a-zA-Z0-9-]*):([a-zA-Z0-9\-])+:([a-z]{2}((-gov)|(-iso(b?)))?-[a-z]+-\d{1})?:(\d{12})?:(.*)"
3294
                )
3295

3296
            match destination_arn.split(":")[2]:
1✔
3297
                case "lambda":
1✔
3298
                    fn_parts = api_utils.FULL_FN_ARN_PATTERN.search(destination_arn).groupdict()
1✔
3299
                    if fn_parts:
1✔
3300
                        # check if it exists
3301
                        fn = store.functions.get(fn_parts["function_name"])
1✔
3302
                        if not fn:
1✔
3303
                            raise InvalidParameterValueException(
1✔
3304
                                f"The destination ARN {destination_arn} is invalid.", Type="User"
3305
                            )
3306
                        if fn_parts["function_name"] == function_name:
1✔
3307
                            raise InvalidParameterValueException(
1✔
3308
                                "You can't specify the function as a destination for itself.",
3309
                                Type="User",
3310
                            )
3311
                case "sns" | "sqs" | "events":
1✔
3312
                    pass
1✔
3313
                case _:
1✔
3314
                    return False
1✔
3315
            return True
1✔
3316

3317
        validation_err = False
1✔
3318

3319
        failure_destination = destination_config.get("OnFailure", {}).get("Destination")
1✔
3320
        if failure_destination:
1✔
3321
            validation_err = validation_err or not _validate_destination_arn(failure_destination)
1✔
3322

3323
        success_destination = destination_config.get("OnSuccess", {}).get("Destination")
1✔
3324
        if success_destination:
1✔
3325
            validation_err = validation_err or not _validate_destination_arn(success_destination)
1✔
3326

3327
        if validation_err:
1✔
3328
            on_success_part = (
1✔
3329
                f"OnSuccess(destination={success_destination})" if success_destination else "null"
3330
            )
3331
            on_failure_part = (
1✔
3332
                f"OnFailure(destination={failure_destination})" if failure_destination else "null"
3333
            )
3334
            raise InvalidParameterValueException(
1✔
3335
                f"The provided destination config DestinationConfig(onSuccess={on_success_part}, onFailure={on_failure_part}) is invalid.",
3336
                Type="User",
3337
            )
3338

3339
    def put_function_event_invoke_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: Qualifier = None,
        maximum_retry_attempts: MaximumRetryAttempts = None,
        maximum_event_age_in_seconds: MaximumEventAgeInSeconds = None,
        destination_config: DestinationConfig = None,
        **kwargs,
    ) -> FunctionEventInvokeConfig:
        """
        Create or overwrite the event invoke config of a function, version or alias.

        Supported destination ARNs: SQS, SNS, Lambda and EventBridge.

        put_ vs. update_:
            * put replaces whatever config is currently stored
            * update changes individual values and keeps the remaining ones
            * update fails if no config exists yet

        Destination vs. DLQ:
            * "However, a dead-letter queue is part of a function's version-specific configuration, so it is locked in when you publish a version."
            *  "On-failure destinations also support additional targets and include details about the function's response in the invocation record."

        :raises InvalidParameterValueException: if none of the settings is given
        :raises ResourceNotFoundException: if the function or the given qualifier is unknown
        """
        settings = (maximum_event_age_in_seconds, maximum_retry_attempts, destination_config)
        if all(setting is None for setting in settings):
            raise InvalidParameterValueException(
                "You must specify at least one of error handling or destination setting.",
                Type="User",
            )

        account_id, region = api_utils.get_account_and_region(function_name, context)
        state = lambda_stores[account_id][region]
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )

        fn = state.functions.get(function_name)
        if not fn or (
            qualifier and qualifier not in fn.aliases and qualifier not in fn.versions
        ):
            raise ResourceNotFoundException("The function doesn't exist.", Type="User")

        # an unqualified request addresses $LATEST
        qualifier = qualifier or "$LATEST"

        if destination_config:
            self._validate_destination_config(state, function_name, destination_config)

        # normalize: the stored config always carries both OnSuccess and OnFailure entries
        requested_destinations = destination_config or {}
        destination_config = DestinationConfig(
            OnSuccess=OnSuccess(
                Destination=requested_destinations.get("OnSuccess", {}).get("Destination")
            ),
            OnFailure=OnFailure(
                Destination=requested_destinations.get("OnFailure", {}).get("Destination")
            ),
        )

        invoke_config = EventInvokeConfig(
            function_name=function_name,
            qualifier=qualifier,
            maximum_event_age_in_seconds=maximum_event_age_in_seconds,
            maximum_retry_attempts=maximum_retry_attempts,
            last_modified=api_utils.generate_lambda_date(),
            destination_config=destination_config,
        )
        fn.event_invoke_configs[qualifier] = invoke_config

        last_modified = datetime.datetime.strptime(
            invoke_config.last_modified, api_utils.LAMBDA_DATE_FORMAT
        )
        return FunctionEventInvokeConfig(
            LastModified=last_modified,
            FunctionArn=api_utils.qualified_lambda_arn(
                function_name, qualifier, account_id, region
            ),
            DestinationConfig=destination_config,
            MaximumEventAgeInSeconds=maximum_event_age_in_seconds,
            MaximumRetryAttempts=maximum_retry_attempts,
        )
3420

3421
    def get_function_event_invoke_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: Qualifier = None,
        **kwargs,
    ) -> FunctionEventInvokeConfig:
        """
        Return the event invoke config stored for a function qualifier.

        :param context: Request context
        :param function_name: Function identifier as received from the API
        :param qualifier: Version or alias; ``$LATEST`` is used when omitted
        :return: The stored event invoke config
        :raises ResourceNotFoundException: if the function is unknown or has no config for
            the qualifier
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        state = lambda_stores[account_id][region]
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        qualifier = qualifier or "$LATEST"

        # A missing function and a missing config are reported with the same error.
        fn = state.functions.get(function_name)
        invoke_config = fn.event_invoke_configs.get(qualifier) if fn else None
        if not invoke_config:
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
            raise ResourceNotFoundException(
                f"The function {fn_arn} doesn't have an EventInvokeConfig", Type="User"
            )

        last_modified = datetime.datetime.strptime(
            invoke_config.last_modified, api_utils.LAMBDA_DATE_FORMAT
        )
        return FunctionEventInvokeConfig(
            LastModified=last_modified,
            FunctionArn=api_utils.qualified_lambda_arn(
                function_name, qualifier, account_id, region
            ),
            DestinationConfig=invoke_config.destination_config,
            MaximumEventAgeInSeconds=invoke_config.maximum_event_age_in_seconds,
            MaximumRetryAttempts=invoke_config.maximum_retry_attempts,
        )
3460

3461
    def list_function_event_invoke_configs(
        self,
        context: RequestContext,
        function_name: FunctionName,
        marker: String = None,
        max_items: MaxFunctionEventInvokeConfigListItems = None,
        **kwargs,
    ) -> ListFunctionEventInvokeConfigsResponse:
        """
        List the event invoke configs stored for all qualifiers of a function.

        :param context: Request context
        :param function_name: Function identifier as received from the API
        :param marker: Pagination token returned by a previous call
        :param max_items: Maximum number of entries per page
        :return: One page of event invoke configs and the marker of the next page, if any
        :raises ResourceNotFoundException: if the function is not in the store
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        state = lambda_stores[account_id][region]
        # Normalize the identifier to the plain function name before using it as store key and
        # ARN component, as list_provisioned_concurrency_configs does.
        function_name = api_utils.get_function_name(function_name, context)
        fn = state.functions.get(function_name)
        if not fn:
            raise ResourceNotFoundException("The function doesn't exist.", Type="User")

        event_invoke_configs = [
            FunctionEventInvokeConfig(
                # NOTE(review): passed through as stored, while get/put/update parse it with
                # LAMBDA_DATE_FORMAT first - confirm the serializer accepts both forms
                LastModified=c.last_modified,
                FunctionArn=api_utils.qualified_lambda_arn(
                    function_name, c.qualifier, account_id, region
                ),
                MaximumEventAgeInSeconds=c.maximum_event_age_in_seconds,
                MaximumRetryAttempts=c.maximum_retry_attempts,
                DestinationConfig=c.destination_config,
            )
            for c in fn.event_invoke_configs.values()
        ]

        event_invoke_configs = PaginatedList(event_invoke_configs)
        page, token = event_invoke_configs.get_page(
            lambda x: x["FunctionArn"],
            marker,
            max_items,
        )
        return ListFunctionEventInvokeConfigsResponse(
            FunctionEventInvokeConfigs=page, NextMarker=token
        )
3497

3498
    def delete_function_event_invoke_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: Qualifier = None,
        **kwargs,
    ) -> None:
        """
        Delete the event invoke config stored for a function qualifier.

        :param context: Request context
        :param function_name: Function identifier as received from the API
        :param qualifier: Version or alias; ``$LATEST`` is used when omitted
        :raises ResourceNotFoundException: if the function is unknown or has no config for
            the qualifier
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        state = lambda_stores[account_id][region]
        fn = state.functions.get(function_name)
        resolved_qualifier = qualifier or "$LATEST"

        # A missing function and a missing config are reported with the same error.
        if not fn or not fn.event_invoke_configs.get(resolved_qualifier):
            # Build the ARN from the resolved qualifier so an unqualified request is reported
            # against $LATEST, matching get_function_event_invoke_config.
            fn_arn = api_utils.qualified_lambda_arn(
                function_name, resolved_qualifier, account_id, region
            )
            raise ResourceNotFoundException(
                f"The function {fn_arn} doesn't have an EventInvokeConfig", Type="User"
            )

        del fn.event_invoke_configs[resolved_qualifier]
1✔
3525

3526
    def update_function_event_invoke_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: Qualifier = None,
        maximum_retry_attempts: MaximumRetryAttempts = None,
        maximum_event_age_in_seconds: MaximumEventAgeInSeconds = None,
        destination_config: DestinationConfig = None,
        **kwargs,
    ) -> FunctionEventInvokeConfig:
        """
        Update individual settings of an existing event invoke config.

        Works like put, but only the settings that are passed are replaced; every other value
        of the stored config is kept.

        :raises InvalidParameterValueException: if none of the settings is given
        :raises ResourceNotFoundException: if the function or qualifier is unknown, or no
            config exists for the qualifier
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        state = lambda_stores[account_id][region]
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )

        settings = (maximum_event_age_in_seconds, maximum_retry_attempts, destination_config)
        if all(setting is None for setting in settings):
            raise InvalidParameterValueException(
                "You must specify at least one of error handling or destination setting.",
                Type="User",
            )

        fn = state.functions.get(function_name)
        if not fn or (
            qualifier and qualifier not in fn.aliases and qualifier not in fn.versions
        ):
            raise ResourceNotFoundException("The function doesn't exist.", Type="User")

        # an unqualified request addresses $LATEST
        qualifier = qualifier or "$LATEST"

        current_config = fn.event_invoke_configs.get(qualifier)
        if not current_config:
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
            raise ResourceNotFoundException(
                f"The function {fn_arn} doesn't have an EventInvokeConfig", Type="User"
            )

        if destination_config:
            self._validate_destination_config(state, function_name, destination_config)

        # only settings that were actually passed override the stored values
        overrides = {}
        if destination_config is not None:
            overrides["destination_config"] = destination_config
        if maximum_retry_attempts is not None:
            overrides["maximum_retry_attempts"] = maximum_retry_attempts
        if maximum_event_age_in_seconds is not None:
            overrides["maximum_event_age_in_seconds"] = maximum_event_age_in_seconds

        updated_config = dataclasses.replace(
            current_config, last_modified=api_utils.generate_lambda_date(), **overrides
        )
        fn.event_invoke_configs[qualifier] = updated_config

        last_modified = datetime.datetime.strptime(
            updated_config.last_modified, api_utils.LAMBDA_DATE_FORMAT
        )
        return FunctionEventInvokeConfig(
            LastModified=last_modified,
            FunctionArn=api_utils.qualified_lambda_arn(
                function_name, qualifier, account_id, region
            ),
            DestinationConfig=updated_config.destination_config,
            MaximumEventAgeInSeconds=updated_config.maximum_event_age_in_seconds,
            MaximumRetryAttempts=updated_config.maximum_retry_attempts,
        )
3595

3596
    # =======================================
3597
    # ======  Layer & Layer Versions  =======
3598
    # =======================================
3599

3600
    @staticmethod
    def _resolve_layer(
        layer_name_or_arn: str, context: RequestContext
    ) -> Tuple[str, str, str, Optional[str]]:
        """
        Determine where a Lambda layer lives.

        A layer ARN is parsed by ``api_utils.parse_layer_arn``; a plain layer name is located
        in the region and account of the request, without a version.

        :param layer_name_or_arn: Layer name or ARN
        :param context: Request context
        :return: Tuple of region, account ID, layer name, layer version
        """
        if not api_utils.is_layer_arn(layer_name_or_arn):
            return context.region, context.account_id, layer_name_or_arn, None

        return api_utils.parse_layer_arn(layer_name_or_arn)
1✔
3615

3616
    def publish_layer_version(
        self,
        context: RequestContext,
        layer_name: LayerName,
        content: LayerVersionContentInput,
        description: Description = None,
        compatible_runtimes: CompatibleRuntimes = None,
        license_info: LicenseInfo = None,
        compatible_architectures: CompatibleArchitectures = None,
        **kwargs,
    ) -> PublishLayerVersionResponse:
        """
        Publish a new version of a layer.

        The first call for a LayerName creates the layer; every further call with the same
        LayerName adds another version. Note that there are no $LATEST versions with layers!

        :raises ValidationException: if the compatible runtimes or architectures are invalid
        """
        account, region = context.account_id, context.region

        validation_errors = api_utils.validate_layer_runtimes_and_architectures(
            compatible_runtimes, compatible_architectures
        )
        if validation_errors:
            error_count = len(validation_errors)
            plural = "s" if error_count > 1 else ""
            raise ValidationException(
                f"{error_count} validation error{plural} detected: {'; '.join(validation_errors)}"
            )

        state = lambda_stores[account][region]
        with self.create_layer_lock:
            # lock is required to avoid creating two v1 objects for the same name
            if layer_name not in state.layers:
                # no version exists yet, so the layer object itself has to be created first
                state.layers[layer_name] = Layer(
                    arn=api_utils.layer_arn(layer_name=layer_name, account=account, region=region)
                )

        layer = state.layers[layer_name]
        with layer.next_version_lock:
            version_identifier = LambdaLayerVersionIdentifier(
                account_id=account, region=region, layer_name=layer_name
            )
            next_version = version_identifier.generate(next_version=layer.next_version)
            # With user defined layer versions, versions can be created out of order, e.g. a
            # user could replicate layer v2 and then layer v1. The counter therefore only ever
            # moves forward, so existing versions are never overwritten; a version below the
            # "next in line" leaves it untouched.
            if next_version >= layer.next_version:
                layer.next_version = next_version + 1

        # store the archive of the new layer version
        zip_file = content.get("ZipFile")
        if zip_file:
            code = store_lambda_archive(
                archive_file=zip_file,
                function_name=layer_name,
                region_name=region,
                account_id=account,
            )
        else:
            code = store_s3_bucket_archive(
                archive_bucket=content["S3Bucket"],
                archive_key=content["S3Key"],
                archive_version=content.get("S3ObjectVersion"),
                function_name=layer_name,
                region_name=region,
                account_id=account,
            )

        version_key = str(next_version)
        published_version = LayerVersion(
            layer_version_arn=api_utils.layer_version_arn(
                layer_name=layer_name,
                account=account,
                region=region,
                version=version_key,
            ),
            layer_arn=layer.arn,
            version=next_version,
            description=description or "",
            license_info=license_info,
            compatible_runtimes=compatible_runtimes,
            compatible_architectures=compatible_architectures,
            created=api_utils.generate_lambda_date(),
            code=code,
        )
        layer.layer_versions[version_key] = published_version

        return api_utils.map_layer_out(published_version)
1✔
3704

3705
    def get_layer_version(
        self,
        context: RequestContext,
        layer_name: LayerName,
        version_number: LayerVersionNumber,
        **kwargs,
    ) -> GetLayerVersionResponse:
        """
        Return a single version of a layer.

        :param context: Request context
        :param layer_name: Layer name or ARN
        :param version_number: Version to look up, must be at least 1
        :return: The mapped layer version
        :raises InvalidParameterValueException: if ``version_number`` is below 1
        :raises ResourceNotFoundException: if the layer or the version does not exist
        """
        # TODO: handle layer_name as an ARN

        region_name, account_id, layer_name, _ = LambdaProvider._resolve_layer(layer_name, context)
        layer = lambda_stores[account_id][region_name].layers.get(layer_name)

        if version_number < 1:
            raise InvalidParameterValueException("Layer Version Cannot be less than 1", Type="User")

        # An unknown layer and an unknown version are reported with the same error.
        found_version = None
        if layer is not None:
            found_version = layer.layer_versions.get(str(version_number))
        if found_version is None:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )
        return api_utils.map_layer_out(found_version)
1✔
3730

3731
    def get_layer_version_by_arn(
        self, context: RequestContext, arn: LayerVersionArn, **kwargs
    ) -> GetLayerVersionResponse:
        """
        Return the layer version addressed by a layer version ARN.

        :param context: Request context
        :param arn: ARN that must resolve to a layer name and a version
        :return: The mapped layer version
        :raises ValidationException: if no version could be resolved from ``arn``
        :raises ResourceNotFoundException: if the layer or the version does not exist
        """
        region_name, account_id, layer_name, requested_version = LambdaProvider._resolve_layer(
            arn, context
        )

        if not requested_version:
            raise ValidationException(
                f"1 validation error detected: Value '{arn}' at 'arn' failed to satisfy constraint: Member must satisfy regular expression pattern: "
                + "(arn:(aws[a-zA-Z-]*)?:lambda:[a-z]{2}((-gov)|(-iso([a-z]?)))?-[a-z]+-\\d{1}:\\d{12}:layer:[a-zA-Z0-9-_]+:[0-9]+)|(arn:[a-zA-Z0-9-]+:lambda:::awslayer:[a-zA-Z0-9-_]+)"
            )

        # An unknown layer and an unknown version are reported with the same error.
        layer = lambda_stores[account_id][region_name].layers.get(layer_name)
        found_version = layer.layer_versions.get(requested_version) if layer else None
        if not found_version:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        return api_utils.map_layer_out(found_version)
1✔
3758

3759
    def list_layers(
        self,
        context: RequestContext,
        compatible_runtime: Runtime = None,
        marker: String = None,
        max_items: MaxLayerListItems = None,
        compatible_architecture: Architecture = None,
        **kwargs,
    ) -> ListLayersResponse:
        """
        List the layers of the request's account and region, each with its highest version.

        :param context: Request context
        :param compatible_runtime: Runtime filter; validated but not applied yet (see TODO)
        :param marker: Pagination token returned by a previous call
        :param max_items: Maximum number of entries per page
        :param compatible_architecture: Architecture filter; validated but not applied yet
        :return: One page of layers and the marker of the next page, if any
        :raises ValidationException: if the runtime or architecture filter is invalid
        """
        validation_errors = []

        validation_error_arch = api_utils.validate_layer_architecture(compatible_architecture)
        if validation_error_arch:
            validation_errors.append(validation_error_arch)

        validation_error_runtime = api_utils.validate_layer_runtime(compatible_runtime)
        if validation_error_runtime:
            validation_errors.append(validation_error_runtime)

        if validation_errors:
            raise ValidationException(
                f"{len(validation_errors)} validation error{'s' if len(validation_errors) > 1 else ''} detected: {';'.join(validation_errors)}"
            )
        # TODO: handle filter: compatible_runtime
        # TODO: handle filter: compatible_architecture

        state = lambda_stores[context.account_id][context.region]
        layers = state.layers

        # TODO: test how filters behave together with only returning layers here? Does it return the latest "matching" layer, i.e. does it ignore later layer versions that don't match?

        responses: list[LayersListItem] = []
        for layer_name, layer in layers.items():
            # Pick the highest version number explicitly: versions can be stored out of order
            # (see publish_layer_version), so the last stored entry is not necessarily the latest.
            latest_layer_version = max(
                layer.layer_versions.values(), key=lambda v: v.version, default=None
            )
            if latest_layer_version is None:
                # a layer without any version has nothing to report
                continue
            responses.append(
                LayersListItem(
                    LayerName=layer_name,
                    LayerArn=layer.arn,
                    LatestMatchingVersion=api_utils.map_layer_out(latest_layer_version),
                )
            )

        # The page token must be a string that can be compared with ``marker``; the layer ARN
        # is unique per entry.
        page, token = PaginatedList(responses).get_page(
            lambda item: item["LayerArn"],
            marker,
            max_items,
        )

        return ListLayersResponse(NextMarker=token, Layers=page)
×
3812

3813
    def list_layer_versions(
        self,
        context: RequestContext,
        layer_name: LayerName,
        compatible_runtime: Runtime = None,
        marker: String = None,
        max_items: MaxLayerListItems = None,
        compatible_architecture: Architecture = None,
        **kwargs,
    ) -> ListLayerVersionsResponse:
        """
        Return a page of the versions of one layer, highest version number first.

        An unknown layer yields an empty result rather than an error. The runtime and
        architecture filters are validated but not applied yet.

        Raises:
            ValidationException: If the runtime and/or architecture filter fails validation.
        """
        runtimes = [compatible_runtime] if compatible_runtime else []
        architectures = [compatible_architecture] if compatible_architecture else []
        validation_errors = api_utils.validate_layer_runtimes_and_architectures(
            runtimes, architectures
        )
        if validation_errors:
            error_count = len(validation_errors)
            plural_suffix = "s" if error_count > 1 else ""
            raise ValidationException(
                f"{error_count} validation error{plural_suffix} detected: {';'.join(validation_errors)}"
            )

        # `layer_name` may be a name or an ARN; after resolving it holds the plain layer name.
        region_name, account_id, layer_name, _ = LambdaProvider._resolve_layer(
            layer_name, context
        )
        layer = lambda_stores[account_id][region_name].layers.get(layer_name)

        # TODO: Test & handle filter: compatible_runtime
        # TODO: Test & handle filter: compatible_architecture
        mapped_versions = (
            [api_utils.map_layer_out(version) for version in layer.layer_versions.values()]
            if layer is not None
            else []
        )
        newest_first = sorted(mapped_versions, key=lambda item: item["Version"], reverse=True)

        page, token = PaginatedList(newest_first).get_page(
            lambda version: version["LayerVersionArn"],
            marker,
            max_items,
        )
        return ListLayerVersionsResponse(NextMarker=token, LayerVersions=page)
3853

3854
    def delete_layer_version(
        self,
        context: RequestContext,
        layer_name: LayerName,
        version_number: LayerVersionNumber,
        **kwargs,
    ) -> None:
        """
        Remove one version of a layer; a missing layer or version is silently ignored.

        Raises:
            InvalidParameterValueException: If `version_number` is smaller than 1.
        """
        if version_number < 1:
            raise InvalidParameterValueException("Layer Version Cannot be less than 1", Type="User")

        region_name, account_id, layer_name, _ = LambdaProvider._resolve_layer(
            layer_name, context
        )

        layer = lambda_stores[account_id][region_name].layers.get(layer_name, {})
        if not layer:
            return
        # Versions are keyed by their stringified number; the layer entry itself is kept.
        layer.layer_versions.pop(str(version_number), None)
3872

3873
    # =======================================
3874
    # =====  Layer Version Permissions  =====
3875
    # =======================================
3876
    # TODO: lock updates that change revision IDs
3877

3878
    def add_layer_version_permission(
        self,
        context: RequestContext,
        layer_name: LayerName,
        version_number: LayerVersionNumber,
        statement_id: StatementId,
        action: LayerPermissionAllowedAction,
        principal: LayerPermissionAllowedPrincipal,
        organization_id: OrganizationId = None,
        revision_id: String = None,
        **kwargs,
    ) -> AddLayerVersionPermissionResponse:
        """
        Add a statement to the policy of a layer version, creating the policy if there is none.

        Returns:
            The added statement as a JSON document and the revision id of the updated policy.

        Raises:
            ValidationException: If `action` is not `lambda:GetLayerVersion`.
            ResourceNotFoundException: If the layer or the layer version does not exist.
            ResourceConflictException: If a statement with `statement_id` already exists.
            PreconditionFailedException: If `revision_id` is given and does not match the policy.
        """
        # The given `layer_name` (name or ARN) is only used for error messages;
        # `layer_n` is the resolved plain layer name used for lookups.
        region_name, account_id, layer_n, _ = LambdaProvider._resolve_layer(layer_name, context)

        if action != "lambda:GetLayerVersion":
            raise ValidationException(
                f"1 validation error detected: Value '{action}' at 'action' failed to satisfy constraint: Member must satisfy regular expression pattern: lambda:GetLayerVersion"
            )

        version_key = str(version_number)
        layer = lambda_stores[account_id][region_name].layers.get(layer_n)
        layer_version_arn = api_utils.layer_version_arn(
            layer_name, account_id, region_name, version_key
        )

        # A missing layer and a missing version produce the same error.
        target_version = layer.layer_versions.get(version_key) if layer is not None else None
        if target_version is None:
            raise ResourceNotFoundException(
                f"Layer version {layer_version_arn} does not exist.", Type="User"
            )

        # The policy is created before the checks below, so it persists even if they raise.
        if target_version.policy is None:
            target_version.policy = LayerPolicy()
        current_policy = target_version.policy

        if statement_id in current_policy.statements:
            raise ResourceConflictException(
                f"The statement id ({statement_id}) provided already exists. Please provide a new statement id, or remove the existing statement.",
                Type="User",
            )

        if revision_id and current_policy.revision_id != revision_id:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetLayerPolicy API to retrieve the latest Revision Id",
                Type="User",
            )

        new_statement = LayerPolicyStatement(
            sid=statement_id, action=action, principal=principal, organization_id=organization_id
        )

        # Replace the policy object instead of mutating its statements in place.
        target_version.policy = dataclasses.replace(
            current_policy,
            statements={**current_policy.statements, statement_id: new_statement},
        )

        statement_document = {
            "Sid": new_statement.sid,
            "Effect": "Allow",
            "Principal": new_statement.principal,
            "Action": new_statement.action,
            "Resource": target_version.layer_version_arn,
        }
        return AddLayerVersionPermissionResponse(
            Statement=json.dumps(statement_document),
            RevisionId=target_version.policy.revision_id,
        )
3953

3954
    def remove_layer_version_permission(
        self,
        context: RequestContext,
        layer_name: LayerName,
        version_number: LayerVersionNumber,
        statement_id: StatementId,
        revision_id: String = None,
        **kwargs,
    ) -> None:
        """
        Remove a statement from the policy of a layer version.

        Raises:
            ResourceNotFoundException: If the layer, the layer version, or the statement does not
                exist (including the case where the layer version has no policy at all).
            PreconditionFailedException: If `revision_id` is given and does not match the policy.
        """
        # `layer_name` can either be layer name or ARN. It is used to generate error messages.
        # `layer_n` contains the layer name.
        region_name, account_id, layer_n, _ = LambdaProvider._resolve_layer(layer_name, context)

        layer_version_arn = api_utils.layer_version_arn(
            layer_name, account_id, region_name, str(version_number)
        )

        state = lambda_stores[account_id][region_name]
        layer = state.layers.get(layer_n)
        if layer is None:
            raise ResourceNotFoundException(
                f"Layer version {layer_version_arn} does not exist.", Type="User"
            )
        layer_version = layer.layer_versions.get(str(version_number))
        if layer_version is None:
            raise ResourceNotFoundException(
                f"Layer version {layer_version_arn} does not exist.", Type="User"
            )

        policy = layer_version.policy
        if policy is None:
            # No permission was ever added to this layer version, so the statement cannot exist.
            # Without this guard the attribute accesses below fail on `None`.
            # NOTE(review): error message chosen to match the missing-statement case below;
            # confirm against AWS behavior.
            raise ResourceNotFoundException(
                f"Statement {statement_id} is not found in resource policy.", Type="User"
            )

        if revision_id and policy.revision_id != revision_id:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetLayerPolicy API to retrieve the latest Revision Id",
                Type="User",
            )

        if statement_id not in policy.statements:
            raise ResourceNotFoundException(
                f"Statement {statement_id} is not found in resource policy.", Type="User"
            )

        # Replace the policy object instead of mutating its statements in place.
        layer_version.policy = dataclasses.replace(
            policy,
            statements={k: v for k, v in policy.statements.items() if k != statement_id},
        )
4002

4003
    def get_layer_version_policy(
        self,
        context: RequestContext,
        layer_name: LayerName,
        version_number: LayerVersionNumber,
        **kwargs,
    ) -> GetLayerVersionPolicyResponse:
        """
        Return the policy of a layer version as a JSON document together with its revision id.

        Raises:
            ResourceNotFoundException: If the layer or layer version does not exist, or if the
                layer version has no policy.
        """
        # The given `layer_name` (name or ARN) is only used for error messages;
        # `layer_n` is the resolved plain layer name used for lookups.
        region_name, account_id, layer_n, _ = LambdaProvider._resolve_layer(layer_name, context)

        version_key = str(version_number)
        layer_version_arn = api_utils.layer_version_arn(
            layer_name, account_id, region_name, version_key
        )

        layer = lambda_stores[account_id][region_name].layers.get(layer_n)

        # A missing layer and a missing version produce the same error.
        found_version = layer.layer_versions.get(version_key) if layer is not None else None
        if found_version is None:
            raise ResourceNotFoundException(
                f"Layer version {layer_version_arn} does not exist.", Type="User"
            )

        policy = found_version.policy
        if policy is None:
            raise ResourceNotFoundException(
                "No policy is associated with the given resource.", Type="User"
            )

        statement_documents = []
        for policy_statement in policy.statements.values():
            statement_documents.append(
                {
                    "Sid": policy_statement.sid,
                    "Effect": "Allow",
                    "Principal": policy_statement.principal,
                    "Action": policy_statement.action,
                    "Resource": found_version.layer_version_arn,
                }
            )

        policy_document = {
            "Version": policy.version,
            "Id": policy.id,
            "Statement": statement_documents,
        }
        return GetLayerVersionPolicyResponse(
            Policy=json.dumps(policy_document),
            RevisionId=policy.revision_id,
        )
4056

4057
    # =======================================
4058
    # =======  Function Concurrency  ========
4059
    # =======================================
4060
    # (Reserved) function concurrency is scoped to the whole function
4061

4062
    def get_function_concurrency(
        self, context: RequestContext, function_name: FunctionName, **kwargs
    ) -> GetFunctionConcurrencyResponse:
        """Return the reserved concurrent executions configured for the given function."""
        account_id, region = api_utils.get_account_and_region(function_name, context)
        resolved_name = api_utils.get_function_name(function_name, context)
        function = self._get_function(
            function_name=resolved_name, region=region, account_id=account_id
        )
        reserved = function.reserved_concurrent_executions
        return GetFunctionConcurrencyResponse(ReservedConcurrentExecutions=reserved)
4071

4072
    def put_function_concurrency(
        self,
        context: RequestContext,
        function_name: FunctionName,
        reserved_concurrent_executions: ReservedConcurrentExecutions,
        **kwargs,
    ) -> Concurrency:
        """
        Set the reserved concurrent executions of a function.

        Returns:
            The reserved concurrency now stored on the function.

        Raises:
            InvalidParameterValueException: If a qualifier is given, if the account's unreserved
                concurrency would drop below the configured minimum, or if the new value is lower
                than the function's total provisioned concurrency.
            ResourceNotFoundException: If the function does not exist.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)

        function_name, qualifier = api_utils.get_name_and_qualifier(function_name, None, context)
        if qualifier:
            raise InvalidParameterValueException(
                "This operation is permitted on Lambda functions only. Aliases and versions do not support this operation. Please specify either a function name or an unqualified function ARN.",
                Type="User",
            )

        fn = lambda_stores[account_id][region].functions.get(function_name)
        if not fn:
            fn_arn = api_utils.qualified_lambda_arn(
                function_name,
                qualifier="$LATEST",
                account=account_id,
                region=region,
            )
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")

        account_settings = self.get_account_settings(context)
        account_unreserved = account_settings["AccountLimit"]["UnreservedConcurrentExecutions"]

        # The function's current reservation is already deducted from the account's unreserved
        # concurrency. It is added back here because the new value replaces it rather than
        # stacking on top. Joel tested this behavior manually against AWS (2023-11-28).
        currently_reserved = fn.reserved_concurrent_executions or 0
        unreserved_after_update = (
            account_unreserved - reserved_concurrent_executions + currently_reserved
        )
        if unreserved_after_update < config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY:
            raise InvalidParameterValueException(
                f"Specified ReservedConcurrentExecutions for function decreases account's UnreservedConcurrentExecution below its minimum value of [{config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY}]."
            )

        provisioned_total = sum(
            provisioned_config.provisioned_concurrent_executions
            for provisioned_config in fn.provisioned_concurrency_configs.values()
        )
        if provisioned_total > reserved_concurrent_executions:
            raise InvalidParameterValueException(
                f" ReservedConcurrentExecutions  {reserved_concurrent_executions} should not be lower than function's total provisioned concurrency [{provisioned_total}]."
            )

        fn.reserved_concurrent_executions = reserved_concurrent_executions

        return Concurrency(ReservedConcurrentExecutions=fn.reserved_concurrent_executions)
4133

4134
    def delete_function_concurrency(
        self, context: RequestContext, function_name: FunctionName, **kwargs
    ) -> None:
        """
        Remove the reserved concurrency setting from a function.

        Raises:
            ResourceNotFoundException: If the function does not exist.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, _ = api_utils.get_name_and_qualifier(function_name, None, context)
        store = lambda_stores[account_id][region]
        fn = store.functions.get(function_name)
        if not fn:
            # Same not-found handling as put_function_concurrency; previously a missing function
            # caused an AttributeError on `None`.
            fn_arn = api_utils.qualified_lambda_arn(
                function_name,
                qualifier="$LATEST",
                account=account_id,
                region=region,
            )
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")
        fn.reserved_concurrent_executions = None
4142

4143
    # =======================================
4144
    # ===============  TAGS   ===============
4145
    # =======================================
4146
    # Only Function, Event Source Mapping, and Code Signing Config (not currently supported by LocalStack) ARNs are available for tagging in AWS
4147

4148
    def _get_tags(self, resource: TaggableResource) -> dict[str, str]:
        """Return the tags of `resource` as a flat key-to-value mapping."""
        state = self.fetch_lambda_store_for_tagging(resource)
        # The tag store returns a list of {"Key": ..., "Value": ...} entries under "Tags".
        tag_entries = state.TAGS.list_tags_for_resource(resource).get("Tags")
        return {entry["Key"]: entry["Value"] for entry in tag_entries}
4155

4156
    def _store_tags(self, resource: TaggableResource, tags: dict[str, str]):
        """
        Add `tags` to the tags stored for `resource`.

        Raises:
            InvalidParameterValueException: If the merged tag set would exceed
                LAMBDA_TAG_LIMIT_PER_RESOURCE.
        """
        state = self.fetch_lambda_store_for_tagging(resource)

        # Count distinct keys after merging, so re-tagging an existing key is not counted twice.
        existing_tags = state.TAGS.tags.get(resource, {})
        merged_tags = existing_tags | tags
        if len(merged_tags) > LAMBDA_TAG_LIMIT_PER_RESOURCE:
            raise InvalidParameterValueException(
                "Number of tags exceeds resource tag limit.", Type="User"
            )

        state.TAGS.tag_resource(
            resource, [{"Key": tag_key, "Value": tag_value} for tag_key, tag_value in tags.items()]
        )
4165

4166
    def fetch_lambda_store_for_tagging(self, resource: TaggableResource) -> LambdaStore:
1✔
4167
        """
4168
        Takes a resource ARN for a TaggableResource (Lambda Function, Event Source Mapping, or Code Signing Config) and returns a corresponding
4169
        LambdaStore for its region and account.
4170

4171
        In addition, this function validates that the ARN is a valid TaggableResource type, and that the TaggableResource exists.
4172

4173
        Raises:
4174
            ValidationException: If the resource ARN is not a full ARN for a TaggableResource.
4175
            ResourceNotFoundException: If the specified resource does not exist.
4176
            InvalidParameterValueException: If the resource ARN is a qualified Lambda Function.
4177
        """
4178

4179
        def _raise_validation_exception():
1✔
4180
            raise ValidationException(
1✔
4181
                f"1 validation error detected: Value '{resource}' at 'resource' failed to satisfy constraint: Member must satisfy regular expression pattern: {api_utils.TAGGABLE_RESOURCE_ARN_PATTERN}"
4182
            )
4183

4184
        # Check whether the ARN we have been passed is correctly formatted
4185
        parsed_resource_arn: ArnData = None
1✔
4186
        try:
1✔
4187
            parsed_resource_arn = parse_arn(resource)
1✔
4188
        except Exception:
1✔
4189
            _raise_validation_exception()
1✔
4190

4191
        # TODO: Should we be checking whether this is a full ARN?
4192
        region, account_id, resource_type = map(
1✔
4193
            parsed_resource_arn.get, ("region", "account", "resource")
4194
        )
4195

4196
        if not all((region, account_id, resource_type)):
1✔
UNCOV
4197
            _raise_validation_exception()
×
4198

4199
        if not (parts := resource_type.split(":")):
1✔
UNCOV
4200
            _raise_validation_exception()
×
4201

4202
        resource_type, resource_identifier, *qualifier = parts
1✔
4203
        if resource_type not in {"event-source-mapping", "code-signing-config", "function"}:
1✔
4204
            _raise_validation_exception()
1✔
4205

4206
        if qualifier:
1✔
4207
            if resource_type == "function":
1✔
4208
                raise InvalidParameterValueException(
1✔
4209
                    "Tags on function aliases and versions are not supported. Please specify a function ARN.",
4210
                    Type="User",
4211
                )
4212
            _raise_validation_exception()
1✔
4213

4214
        match resource_type:
1✔
4215
            case "event-source-mapping":
1✔
4216
                self._get_esm(resource_identifier, account_id, region)
1✔
4217
            case "code-signing-config":
1✔
4218
                raise NotImplementedError("Resource tagging on CSC not yet implemented.")
4219
            case "function":
1✔
4220
                self._get_function(
1✔
4221
                    function_name=resource_identifier, account_id=account_id, region=region
4222
                )
4223

4224
        # If no exceptions are raised, assume ARN and referenced resource is valid for tag operations
4225
        return lambda_stores[account_id][region]
1✔
4226

4227
    def tag_resource(
        self, context: RequestContext, resource: TaggableResource, tags: Tags, **kwargs
    ) -> None:
        """
        Store `tags` on the given resource.

        Raises:
            InvalidParameterValueException: If `tags` is empty.
        """
        if not tags:
            raise InvalidParameterValueException(
                "An error occurred and the request cannot be processed.", Type="User"
            )
        self._store_tags(resource, tags)

        # Only function resources need the follow-up below.
        resource_id = extract_resource_from_arn(resource)
        if not resource_id or not resource_id.startswith("function"):
            return

        name, _, account, region = function_locators_from_arn(resource)
        function = self._get_function(name, account, region)
        with function.lock:
            # Workaround noted by the original authors ("dirty hack for changed revision id"):
            # $LATEST is replaced by a copy carrying a copied config; the model should be
            # reevaluated to make this unnecessary.
            current_latest = function.versions["$LATEST"]
            copied_config = dataclasses.replace(current_latest.config)
            function.versions["$LATEST"] = dataclasses.replace(
                current_latest, config=copied_config
            )
4247

4248
    def list_tags(
        self, context: RequestContext, resource: TaggableResource, **kwargs
    ) -> ListTagsResponse:
        """Return the tags stored for the given resource ARN."""
        return ListTagsResponse(Tags=self._get_tags(resource))
4253

4254
    def untag_resource(
        self, context: RequestContext, resource: TaggableResource, tag_keys: TagKeyList, **kwargs
    ) -> None:
        """
        Remove the tags with the given keys from the resource.

        Raises:
            ValidationException: If `tag_keys` is empty or missing.
        """
        if not tag_keys:
            # should probably be generalized a bit
            raise ValidationException(
                "1 validation error detected: Value null at 'tagKeys' failed to satisfy constraint: Member must not be null"
            )

        state = self.fetch_lambda_store_for_tagging(resource)
        state.TAGS.untag_resource(resource, tag_keys)

        # Only function resources need the follow-up below.
        resource_id = extract_resource_from_arn(resource)
        if not resource_id or not resource_id.startswith("function"):
            return

        name, _, account, region = function_locators_from_arn(resource)
        function = self._get_function(name, account, region)
        # TODO: Potential race condition
        with function.lock:
            # Workaround noted by the original authors ("dirty hack for changed revision id"):
            # $LATEST is replaced by a copy carrying a copied config; the model should be
            # reevaluated to make this unnecessary.
            current_latest = function.versions["$LATEST"]
            copied_config = dataclasses.replace(current_latest.config)
            function.versions["$LATEST"] = dataclasses.replace(
                current_latest, config=copied_config
            )
4277

4278
    # =======================================
4279
    # =======  LEGACY / DEPRECATED   ========
4280
    # =======================================
4281

4282
    def invoke_async(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        invoke_args: IO[BlobStream],
        **kwargs,
    ) -> InvokeAsyncResponse:
        """
        LEGACY API endpoint that is not implemented here. Even AWS heavily discourages its usage.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc