• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 858585c6-6e27-4bca-a3b8-78d6d978f737

25 Mar 2025 06:43PM UTC coverage: 86.88% (+0.02%) from 86.865%
858585c6-6e27-4bca-a3b8-78d6d978f737

push

circleci

web-flow
[Utils] Add a batch policy utility (#12430)

53 of 56 new or added lines in 1 file covered. (94.64%)

227 existing lines in 12 files now uncovered.

63251 of 72803 relevant lines covered (86.88%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.93
/localstack-core/localstack/services/lambda_/provider.py
1
import base64
1✔
2
import dataclasses
1✔
3
import datetime
1✔
4
import itertools
1✔
5
import json
1✔
6
import logging
1✔
7
import re
1✔
8
import threading
1✔
9
import time
1✔
10
from typing import IO, Any, Optional, Tuple
1✔
11

12
from botocore.exceptions import ClientError
1✔
13

14
from localstack import config
1✔
15
from localstack.aws.api import RequestContext, ServiceException, handler
1✔
16
from localstack.aws.api.lambda_ import (
1✔
17
    AccountLimit,
18
    AccountUsage,
19
    AddLayerVersionPermissionResponse,
20
    AddPermissionRequest,
21
    AddPermissionResponse,
22
    Alias,
23
    AliasConfiguration,
24
    AliasRoutingConfiguration,
25
    AllowedPublishers,
26
    Architecture,
27
    Arn,
28
    Blob,
29
    BlobStream,
30
    CodeSigningConfigArn,
31
    CodeSigningConfigNotFoundException,
32
    CodeSigningPolicies,
33
    CompatibleArchitectures,
34
    CompatibleRuntimes,
35
    Concurrency,
36
    Cors,
37
    CreateCodeSigningConfigResponse,
38
    CreateEventSourceMappingRequest,
39
    CreateFunctionRequest,
40
    CreateFunctionUrlConfigResponse,
41
    DeleteCodeSigningConfigResponse,
42
    Description,
43
    DestinationConfig,
44
    EventSourceMappingConfiguration,
45
    FunctionCodeLocation,
46
    FunctionConfiguration,
47
    FunctionEventInvokeConfig,
48
    FunctionName,
49
    FunctionUrlAuthType,
50
    FunctionUrlQualifier,
51
    GetAccountSettingsResponse,
52
    GetCodeSigningConfigResponse,
53
    GetFunctionCodeSigningConfigResponse,
54
    GetFunctionConcurrencyResponse,
55
    GetFunctionRecursionConfigResponse,
56
    GetFunctionResponse,
57
    GetFunctionUrlConfigResponse,
58
    GetLayerVersionPolicyResponse,
59
    GetLayerVersionResponse,
60
    GetPolicyResponse,
61
    GetProvisionedConcurrencyConfigResponse,
62
    InvalidParameterValueException,
63
    InvocationResponse,
64
    InvocationType,
65
    InvokeAsyncResponse,
66
    InvokeMode,
67
    LambdaApi,
68
    LastUpdateStatus,
69
    LayerName,
70
    LayerPermissionAllowedAction,
71
    LayerPermissionAllowedPrincipal,
72
    LayersListItem,
73
    LayerVersionArn,
74
    LayerVersionContentInput,
75
    LayerVersionNumber,
76
    LicenseInfo,
77
    ListAliasesResponse,
78
    ListCodeSigningConfigsResponse,
79
    ListEventSourceMappingsResponse,
80
    ListFunctionEventInvokeConfigsResponse,
81
    ListFunctionsByCodeSigningConfigResponse,
82
    ListFunctionsResponse,
83
    ListFunctionUrlConfigsResponse,
84
    ListLayersResponse,
85
    ListLayerVersionsResponse,
86
    ListProvisionedConcurrencyConfigsResponse,
87
    ListTagsResponse,
88
    ListVersionsByFunctionResponse,
89
    LogFormat,
90
    LoggingConfig,
91
    LogType,
92
    MasterRegion,
93
    MaxFunctionEventInvokeConfigListItems,
94
    MaximumEventAgeInSeconds,
95
    MaximumRetryAttempts,
96
    MaxItems,
97
    MaxLayerListItems,
98
    MaxListItems,
99
    MaxProvisionedConcurrencyConfigListItems,
100
    NamespacedFunctionName,
101
    NamespacedStatementId,
102
    OnFailure,
103
    OnSuccess,
104
    OrganizationId,
105
    PackageType,
106
    PositiveInteger,
107
    PreconditionFailedException,
108
    ProvisionedConcurrencyConfigListItem,
109
    ProvisionedConcurrencyConfigNotFoundException,
110
    ProvisionedConcurrencyStatusEnum,
111
    PublishLayerVersionResponse,
112
    PutFunctionCodeSigningConfigResponse,
113
    PutFunctionRecursionConfigResponse,
114
    PutProvisionedConcurrencyConfigResponse,
115
    Qualifier,
116
    RecursiveLoop,
117
    ReservedConcurrentExecutions,
118
    ResourceConflictException,
119
    ResourceNotFoundException,
120
    Runtime,
121
    RuntimeVersionConfig,
122
    SnapStart,
123
    SnapStartApplyOn,
124
    SnapStartOptimizationStatus,
125
    SnapStartResponse,
126
    State,
127
    StatementId,
128
    StateReasonCode,
129
    String,
130
    TaggableResource,
131
    TagKeyList,
132
    Tags,
133
    TracingMode,
134
    UnqualifiedFunctionName,
135
    UpdateCodeSigningConfigResponse,
136
    UpdateEventSourceMappingRequest,
137
    UpdateFunctionCodeRequest,
138
    UpdateFunctionConfigurationRequest,
139
    UpdateFunctionUrlConfigResponse,
140
    Version,
141
)
142
from localstack.aws.api.lambda_ import FunctionVersion as FunctionVersionApi
1✔
143
from localstack.aws.api.lambda_ import ServiceException as LambdaServiceException
1✔
144
from localstack.aws.api.pipes import (
1✔
145
    DynamoDBStreamStartPosition,
146
    KinesisStreamStartPosition,
147
)
148
from localstack.aws.connect import connect_to
1✔
149
from localstack.aws.spec import load_service
1✔
150
from localstack.services.edge import ROUTER
1✔
151
from localstack.services.lambda_ import api_utils
1✔
152
from localstack.services.lambda_ import hooks as lambda_hooks
1✔
153
from localstack.services.lambda_.analytics import (
1✔
154
    FunctionOperation,
155
    FunctionStatus,
156
    function_counter,
157
)
158
from localstack.services.lambda_.api_utils import (
1✔
159
    ARCHITECTURES,
160
    STATEMENT_ID_REGEX,
161
    SUBNET_ID_REGEX,
162
    function_locators_from_arn,
163
)
164
from localstack.services.lambda_.event_source_mapping.esm_config_factory import (
1✔
165
    EsmConfigFactory,
166
)
167
from localstack.services.lambda_.event_source_mapping.esm_worker import (
1✔
168
    EsmState,
169
    EsmWorker,
170
)
171
from localstack.services.lambda_.event_source_mapping.esm_worker_factory import (
1✔
172
    EsmWorkerFactory,
173
)
174
from localstack.services.lambda_.event_source_mapping.pipe_utils import get_internal_client
1✔
175
from localstack.services.lambda_.invocation import AccessDeniedException
1✔
176
from localstack.services.lambda_.invocation.execution_environment import (
1✔
177
    EnvironmentStartupTimeoutException,
178
)
179
from localstack.services.lambda_.invocation.lambda_models import (
1✔
180
    AliasRoutingConfig,
181
    CodeSigningConfig,
182
    EventInvokeConfig,
183
    Function,
184
    FunctionResourcePolicy,
185
    FunctionUrlConfig,
186
    FunctionVersion,
187
    ImageConfig,
188
    LambdaEphemeralStorage,
189
    Layer,
190
    LayerPolicy,
191
    LayerPolicyStatement,
192
    LayerVersion,
193
    ProvisionedConcurrencyConfiguration,
194
    RequestEntityTooLargeException,
195
    ResourcePolicy,
196
    UpdateStatus,
197
    ValidationException,
198
    VersionAlias,
199
    VersionFunctionConfiguration,
200
    VersionIdentifier,
201
    VersionState,
202
    VpcConfig,
203
)
204
from localstack.services.lambda_.invocation.lambda_service import (
1✔
205
    LambdaService,
206
    create_image_code,
207
    destroy_code_if_not_used,
208
    lambda_stores,
209
    store_lambda_archive,
210
    store_s3_bucket_archive,
211
)
212
from localstack.services.lambda_.invocation.models import LambdaStore
1✔
213
from localstack.services.lambda_.invocation.runtime_executor import get_runtime_executor
1✔
214
from localstack.services.lambda_.lambda_utils import HINT_LOG
1✔
215
from localstack.services.lambda_.layerfetcher.layer_fetcher import LayerFetcher
1✔
216
from localstack.services.lambda_.provider_utils import (
1✔
217
    LambdaLayerVersionIdentifier,
218
    get_function_version,
219
    get_function_version_from_arn,
220
)
221
from localstack.services.lambda_.runtimes import (
1✔
222
    ALL_RUNTIMES,
223
    DEPRECATED_RUNTIMES,
224
    DEPRECATED_RUNTIMES_UPGRADES,
225
    RUNTIMES_AGGREGATED,
226
    VALID_RUNTIMES,
227
)
228
from localstack.services.lambda_.urlrouter import FunctionUrlRouter
1✔
229
from localstack.services.plugins import ServiceLifecycleHook
1✔
230
from localstack.state import StateVisitor
1✔
231
from localstack.utils.aws.arns import (
1✔
232
    ArnData,
233
    extract_resource_from_arn,
234
    extract_service_from_arn,
235
    get_partition,
236
    lambda_event_source_mapping_arn,
237
    parse_arn,
238
)
239
from localstack.utils.aws.client_types import ServicePrincipal
1✔
240
from localstack.utils.bootstrap import is_api_enabled
1✔
241
from localstack.utils.collections import PaginatedList
1✔
242
from localstack.utils.event_matcher import validate_event_pattern
1✔
243
from localstack.utils.lambda_debug_mode.lambda_debug_mode_session import LambdaDebugModeSession
1✔
244
from localstack.utils.strings import get_random_hex, short_uid, to_bytes, to_str
1✔
245
from localstack.utils.sync import poll_condition
1✔
246
from localstack.utils.urls import localstack_host
1✔
247

248
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)

# Default function timeout and memory size.
# NOTE(review): units are presumably seconds and MB respectively - confirm against the callers.
LAMBDA_DEFAULT_TIMEOUT = 3
LAMBDA_DEFAULT_MEMORY_SIZE = 128

# Upper bounds: tags per resource, and layers a single function may reference.
LAMBDA_TAG_LIMIT_PER_RESOURCE = 50
LAMBDA_LAYERS_LIMIT_PER_FUNCTION = 5

# NOTE(review): presumably the tag key carrying a user-chosen function URL id - confirm against its usage.
TAG_KEY_CUSTOM_URL = "_custom_id_"
# Requirements (from RFC3986 & co): not longer than 63, first char must be
# alpha, then alphanumeric or hyphen, except cannot start or end with hyphen
TAG_KEY_CUSTOM_URL_VALIDATOR = re.compile(r"^[A-Za-z]([A-Za-z0-9\-]{0,61}[A-Za-z0-9])?$")
1✔
260

261

262
class LambdaProvider(LambdaApi, ServiceLifecycleHook):
1✔
263
    lambda_service: LambdaService
1✔
264
    create_fn_lock: threading.RLock
1✔
265
    create_layer_lock: threading.RLock
1✔
266
    router: FunctionUrlRouter
1✔
267
    esm_workers: dict[str, EsmWorker]
1✔
268
    layer_fetcher: LayerFetcher | None
1✔
269

270
    def __init__(self) -> None:
        """Create the Lambda service, the function URL router, the locks and the ESM worker registry."""
        service = LambdaService()
        self.lambda_service = service
        self.router = FunctionUrlRouter(ROUTER, service)
        self.create_fn_lock = threading.RLock()
        self.create_layer_lock = threading.RLock()
        self.esm_workers = {}
        # Start without a layer fetcher; the hook below receives this provider
        # (presumably so that a plugin can set one - confirm against the hook implementations).
        self.layer_fetcher = None
        lambda_hooks.inject_layer_fetcher.run(self)
1✔
278

279
    def accept_state_visitor(self, visitor: StateVisitor):
        """Pass the Lambda stores to the given state visitor.

        :param visitor: visitor whose ``visit`` method is called with ``lambda_stores``
        """
        stores = lambda_stores
        visitor.visit(stores)
×
281

282
    def on_before_start(self):
        """Try to start the Lambda Debug Mode session; any failure is logged instead of raised."""
        try:
            LambdaDebugModeSession.get().ensure_running()
        except Exception as ex:
            LOG.error(
                "Unexpected error encountered when attempting to initialise Lambda Debug Mode '%s'.",
                ex,
            )
292

293
    def on_before_state_reset(self):
        """Stop the current Lambda service ahead of a state reset."""
        current_service = self.lambda_service
        current_service.stop()
×
295

296
    def on_after_state_reset(self):
        """Swap in a fresh Lambda service after a state reset and point the router at it."""
        fresh_service = LambdaService()
        # Same assignment order as a chained assignment: router first, then the provider.
        self.router.lambda_service = fresh_service
        self.lambda_service = fresh_service
×
298

299
    def on_before_state_load(self):
        """Stop the current Lambda service ahead of a state load."""
        current_service = self.lambda_service
        current_service.stop()
×
301

302
    def on_after_state_load(self):
        """Rebuild the runtime side of the provider after the Lambda stores were loaded.

        A fresh ``LambdaService`` is created and handed to the router. Then, for every
        account/region store:

        * every function version is put back into the ``Pending`` state in the store and
          started through the service (waiting up to 5 seconds for the returned future),
        * every provisioned concurrency config is re-applied to the version manager of the
          version it resolves to, either directly or through an alias,
        * every event source mapping gets a new worker, which is created and then
          registered in ``self.esm_workers``.

        Failures while restoring a version or a provisioned concurrency config are logged
        as warnings and do not abort the restore.
        """
        self.lambda_service = LambdaService()
        self.router.lambda_service = self.lambda_service

        for _account_id, regions in lambda_stores.items():
            for _region_name, store in regions.items():
                for function in store.functions.values():
                    for version in function.versions.values():
                        # restore the "Pending" state for every function version and start it
                        try:
                            pending_state = VersionState(
                                state=State.Pending,
                                code=StateReasonCode.Creating,
                                reason="The function is being created.",
                            )
                            pending_version = dataclasses.replace(
                                version,
                                config=dataclasses.replace(version.config, state=pending_state),
                            )
                            function.versions[version.id.qualifier] = pending_version
                            # NOTE(review): the version as loaded (not the Pending copy stored
                            # above) is what is handed to the service - confirm this is intended.
                            start_future = self.lambda_service.create_function_version(version)
                            start_future.result(timeout=5)
                        except Exception:
                            LOG.warning(
                                "Failed to restore function version %s",
                                version.id.qualified_arn(),
                                exc_info=True,
                            )

                    # restore provisioned concurrency per function considering both versions and aliases
                    for qualifier, pc_config in function.provisioned_concurrency_configs.items():
                        target_arn = None
                        try:
                            if api_utils.qualifier_is_alias(qualifier):
                                alias = function.aliases.get(qualifier)
                                aliased_version = function.versions.get(alias.function_version)
                                target_arn = aliased_version.id.qualified_arn()
                            elif api_utils.qualifier_is_version(qualifier):
                                target_arn = function.versions.get(qualifier).id.qualified_arn()
                            else:
                                raise InvalidParameterValueException(
                                    "Invalid qualifier type:"
                                    " Qualifier can only be an alias or a version for provisioned concurrency."
                                )

                            version_manager = self.lambda_service.get_lambda_version_manager(
                                target_arn
                            )
                            version_manager.update_provisioned_concurrency_config(
                                pc_config.provisioned_concurrent_executions
                            )
                        except Exception:
                            LOG.warning(
                                "Failed to restore provisioned concurrency %s for function %s",
                                pc_config,
                                target_arn,
                                exc_info=True,
                            )

                for esm in store.event_source_mappings.values():
                    # Restores event source workers
                    function_arn = esm.get("FunctionArn")

                    def function_has_settled() -> bool:
                        # Invoked synchronously by poll_condition below, so it always sees
                        # the function_arn of the current loop iteration.
                        current = get_function_version_from_arn(function_arn).config.state.state
                        return current in [State.Active, State.Failed]

                    # TODO: How do we know the event source is up?
                    # A basic poll to see if the mapped Lambda function is active/failed
                    if not poll_condition(function_has_settled, timeout=10):
                        LOG.warning(
                            "Creating ESM for Lambda that is not in running state: %s",
                            function_arn,
                        )

                    role = get_function_version_from_arn(function_arn).config.role
                    enabled = esm.get("State", EsmState.DISABLED) not in (
                        EsmState.DISABLED,
                        EsmState.DISABLING,
                    )
                    worker = EsmWorkerFactory(esm, role, enabled).get_esm_worker()

                    # Note: a worker is created in the DISABLED state if not enabled
                    worker.create()
                    # TODO: assigning the esm_worker to the dict only works after .create(). Could it cause a race
                    #  condition if we get a shutdown here and have a worker thread spawned but not accounted for?
                    self.esm_workers[worker.uuid] = worker
×
393

394
    def on_after_init(self):
        """Register the router's routes and let the runtime executor validate its environment."""
        self.router.register_routes()
        executor = get_runtime_executor()
        executor.validate_environment()
1✔
397

398
    def on_before_stop(self) -> None:
        """Stop all ESM workers and the Lambda service, then signal Lambda Debug Mode to stop.

        A failure while signalling the debug mode session is logged instead of raised.
        """
        for worker in self.esm_workers.values():
            worker.stop_for_shutdown()

        # TODO: should probably unregister routes?
        self.lambda_service.stop()

        try:
            LambdaDebugModeSession.get().signal_stop()
        except Exception as ex:
            LOG.error(
                "Unexpected error encountered when attempting to signal Lambda Debug Mode to stop '%s'.",
                ex,
            )
413

414
    @staticmethod
    def _get_function(function_name: str, account_id: str, region: str) -> Function:
        """Look up a function in the store of the given account and region.

        :param function_name: name the function is stored under
        :param account_id: account owning the store
        :param region: region of the store
        :return: the stored function
        :raises ResourceNotFoundException: if the store holds no function under that name
        """
        found = lambda_stores[account_id][region].functions.get(function_name)
        if found:
            return found

        missing_arn = api_utils.unqualified_lambda_arn(
            function_name=function_name,
            account=account_id,
            region=region,
        )
        raise ResourceNotFoundException(
            f"Function not found: {missing_arn}",
            Type="User",
        )
1✔
429

430
    @staticmethod
    def _get_esm(uuid: str, account_id: str, region: str) -> EventSourceMappingConfiguration:
        """Look up an event source mapping in the store of the given account and region.

        :param uuid: UUID the mapping is stored under
        :param account_id: account owning the store
        :param region: region of the store
        :return: the stored event source mapping
        :raises ResourceNotFoundException: if the store holds no mapping under that UUID
        """
        mapping = lambda_stores[account_id][region].event_source_mappings.get(uuid)
        if mapping:
            return mapping

        missing_arn = lambda_event_source_mapping_arn(uuid, account_id, region)
        raise ResourceNotFoundException(
            f"Event source mapping not found: {missing_arn}",
            Type="User",
        )
1✔
441

442
    @staticmethod
    def _validate_qualifier_expression(qualifier: str) -> None:
        """Raise a ``ValidationException`` if ``api_utils.validate_qualifier`` reports errors.

        :param qualifier: qualifier expression to validate
        :raises ValidationException: built from the reported error messages
        """
        error_messages = api_utils.validate_qualifier(qualifier)
        if not error_messages:
            return
        raise ValidationException(
            message=api_utils.construct_validation_exception_message(error_messages)
        )
448

449
    @staticmethod
    def _resolve_fn_qualifier(resolved_fn: Function, qualifier: str | None) -> tuple[str, str]:
        """Resolve a qualifier against an already resolved function.

        Without a qualifier, ``$LATEST`` and the unqualified function ARN are returned.
        With a qualifier, it must name an existing alias or an existing version
        (``$LATEST`` included); the qualified ARN is returned alongside it.

        :param resolved_fn: The resolved lambda function
        :param qualifier: The qualifier to be resolved or None
        :return: Tuple of (resolved qualifier, function arn either qualified or unqualified)
        :raises ResourceNotFoundException: if the qualifier does not name an existing alias or version
        """
        name = resolved_fn.function_name
        # assuming function versions need to live in the same account and region
        latest_id = resolved_fn.latest().id
        account_id, region = latest_id.account, latest_id.region

        if qualifier is None:
            return "$LATEST", api_utils.unqualified_lambda_arn(name, account_id, region)

        qualified_arn = api_utils.qualified_lambda_arn(name, qualifier, account_id, region)
        if api_utils.qualifier_is_alias(qualifier):
            exists = qualifier in resolved_fn.aliases
            not_found_message = f"Cannot find alias arn: {qualified_arn}"
        else:
            # Anything that is neither a version nor "$LATEST" matched the qualifier
            # pattern but is an invalid alias or version, and is reported as not found too.
            is_version_like = api_utils.qualifier_is_version(qualifier) or qualifier == "$LATEST"
            exists = is_version_like and qualifier in resolved_fn.versions
            not_found_message = f"Function not found: {qualified_arn}"

        if not exists:
            raise ResourceNotFoundException(not_found_message, Type="User")
        return qualifier or "$LATEST", qualified_arn
1✔
475

476
    @staticmethod
    def _function_revision_id(resolved_fn: Function, resolved_qualifier: str) -> str:
        """Return the revision id of the alias or version named by ``resolved_qualifier``.

        :param resolved_fn: function holding the alias/version
        :param resolved_qualifier: qualifier that is expected to exist on the function
        :return: revision id of the alias, or of the version's config
        """
        if not api_utils.qualifier_is_alias(resolved_qualifier):
            # Assumes that a non-alias is a version
            return resolved_fn.versions[resolved_qualifier].config.revision_id
        return resolved_fn.aliases[resolved_qualifier].revision_id
1✔
483

484
    def _resolve_vpc_id(self, account_id: str, region_name: str, subnet_id: str) -> str:
        """Return the id of the VPC that contains the given subnet.

        The subnet is looked up through an EC2 client created for the given account and region.

        :param account_id: account used for the EC2 client
        :param region_name: region used for the EC2 client
        :param subnet_id: subnet to describe
        :return: ``VpcId`` of the first subnet returned by ``DescribeSubnets``
        :raises InvalidParameterValueException: if EC2 answers with a client error; the
            original ``ClientError`` is attached as the exception's cause
        """
        ec2_client = connect_to(aws_access_key_id=account_id, region_name=region_name).ec2
        try:
            return ec2_client.describe_subnets(SubnetIds=[subnet_id])["Subnets"][0]["VpcId"]
        except ec2_client.exceptions.ClientError as e:
            code = e.response["Error"]["Code"]
            message = e.response["Error"]["Message"]
            # Chain explicitly so the EC2 error stays visible as the cause in tracebacks.
            raise InvalidParameterValueException(
                f"Error occurred while DescribeSubnets. EC2 Error Code: {code}. EC2 Error Message: {message}",
                Type="User",
            ) from e
495

496
    def _build_vpc_config(
        self,
        account_id: str,
        region_name: str,
        vpc_config: Optional[dict] = None,
    ) -> VpcConfig | None:
        """Build the internal ``VpcConfig`` from a request's VPC configuration.

        :param account_id: account used to resolve the VPC of the first subnet
        :param region_name: region used to resolve the VPC of the first subnet
        :param vpc_config: request dict; ``SubnetIds`` and ``SecurityGroupIds`` are read from it
        :return: ``None`` if no config was given or the ``ec2`` API is not enabled; an empty
            ``VpcConfig`` if no subnet ids were given; otherwise a ``VpcConfig`` whose VPC id
            is resolved from the first subnet
        :raises ValidationException: if the first subnet id does not match ``SUBNET_ID_REGEX``
        :raises InvalidParameterValueException: if the subnet cannot be described via EC2
        """
        if not vpc_config or not is_api_enabled("ec2"):
            return None

        # An explicit ``SubnetIds: None`` is handled like a missing or empty list; it
        # previously fell through to ``subnet_ids[0]`` and failed with a TypeError.
        subnet_ids = vpc_config.get("SubnetIds") or []
        if not subnet_ids:
            return VpcConfig(vpc_id="", security_group_ids=[], subnet_ids=[])

        # Only the first subnet is validated and used to resolve the VPC.
        subnet_id = subnet_ids[0]
        if not SUBNET_ID_REGEX.match(subnet_id):
            raise ValidationException(
                f"1 validation error detected: Value '[{subnet_id}]' at 'vpcConfig.subnetIds' failed to satisfy constraint: Member must satisfy constraint: [Member must have length less than or equal to 1024, Member must have length greater than or equal to 0, Member must satisfy regular expression pattern: ^subnet-[0-9a-z]*$]"
            )

        return VpcConfig(
            vpc_id=self._resolve_vpc_id(account_id, region_name, subnet_id),
            security_group_ids=vpc_config.get("SecurityGroupIds", []),
            subnet_ids=subnet_ids,
        )
520

521
    def _create_version_model(
        self,
        function_name: str,
        region: str,
        account_id: str,
        description: str | None = None,
        revision_id: str | None = None,
        code_sha256: str | None = None,
    ) -> tuple[FunctionVersion, bool]:
        """
        Add a newly numbered version, copied from ``$LATEST``, to the function model.

        Preconditions:
          - RevisionId, if provided, must equal the revision id of ``$LATEST``
          - CodeSha256, if provided, must equal the code hash of ``$LATEST``
            (not enforced for hot-reloaded zip packages)
        If the most recently published version has the same internal revision as ``$LATEST``,
        nothing is published and that previous version is returned instead.

        :param function_name: Function name to be published
        :param region: Region of the function
        :param account_id: Account of the function
        :param description: new description of the version (the one of ``$LATEST`` is kept if missing)
        :param revision_id: Revision id, an error is raised if it does not match the latest revision id
        :param code_sha256: Code sha256, an error is raised if it does not match the latest code hash
        :return: Tuple of (version, True if it was newly created / False if the previous version was reused)
        :raises PreconditionFailedException: on a revision id mismatch
        :raises InvalidParameterValueException: on a code hash mismatch
        """
        latest = get_function_version(
            function_name=function_name, qualifier="$LATEST", account_id=account_id, region=region
        )
        latest_config = latest.config

        if revision_id and latest_config.revision_id != revision_id:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
                Type="User",
            )

        # check if code hashes match if they are specified
        is_zip_package = latest_config.package_type == PackageType.Zip
        if is_zip_package:
            current_hash = latest_config.code.code_sha256
        else:
            current_hash = latest_config.image.code_sha256
        # if the code is a zip package and hot reloaded (hot reloading is currently only supported for zip packagetypes)
        # we cannot enforce the codesha256 check
        is_hot_reloaded_zip_package = is_zip_package and latest_config.code.is_hot_reloading()
        if code_sha256 and current_hash != code_sha256 and not is_hot_reloaded_zip_package:
            raise InvalidParameterValueException(
                f"CodeSHA256 ({code_sha256}) is different from current CodeSHA256 in $LATEST ({current_hash}). Please try again with the CodeSHA256 in $LATEST.",
                Type="User",
            )

        function = lambda_stores[account_id][region].functions.get(function_name)
        changes = {} if description is None else {"description": description}
        # TODO copy environment instead of restarting one, get rid of all the "Pending"s

        with function.lock:
            previous = None
            if function.next_version > 1:
                previous = function.versions.get(str(function.next_version - 1))
            if previous and previous.config.internal_revision == latest_config.internal_revision:
                return previous, False

            # TODO check if there was a change since last version
            version_number = str(function.next_version)
            function.next_version += 1
            version_id = VersionIdentifier(
                function_name=function_name,
                qualifier=version_number,
                region=region,
                account=account_id,
            )

            apply_on = latest_config.snap_start["ApplyOn"]
            if apply_on == SnapStartApplyOn.PublishedVersions:
                optimization_status = SnapStartOptimizationStatus.On
            else:
                optimization_status = SnapStartOptimizationStatus.Off

            published_config = dataclasses.replace(
                latest_config,
                last_update=None,  # versions never have a last update status
                state=VersionState(
                    state=State.Pending,
                    code=StateReasonCode.Creating,
                    reason="The function is being created.",
                ),
                snap_start=SnapStartResponse(
                    ApplyOn=apply_on,
                    OptimizationStatus=optimization_status,
                ),
                **changes,
            )
            published = dataclasses.replace(latest, config=published_config, id=version_id)
            function.versions[version_number] = published
        return published, True
1✔
624

625
    def _publish_version_from_existing_version(
        self,
        function_name: str,
        region: str,
        account_id: str,
        description: str | None = None,
        revision_id: str | None = None,
        code_sha256: str | None = None,
    ) -> FunctionVersion:
        """
        Publish a version from an existing, already initialized ``$LATEST``.

        :param function_name: Function name
        :param region: region
        :param account_id: account id
        :param description: description
        :param revision_id: revision id (check if current version matches)
        :param code_sha256: code sha (check if current code matches)
        :return: the stored new version, or the previously published version if nothing changed
        """
        version, was_published = self._create_version_model(
            function_name=function_name,
            region=region,
            account_id=account_id,
            description=description,
            revision_id=revision_id,
            code_sha256=code_sha256,
        )
        if was_published:
            self.lambda_service.publish_version(version)
            function = lambda_stores[account_id][region].functions.get(function_name)
            # TODO: re-evaluate data model to prevent this dirty hack just for bumping the revision id
            latest = function.versions["$LATEST"]
            refreshed_config = dataclasses.replace(latest.config)
            function.versions["$LATEST"] = dataclasses.replace(latest, config=refreshed_config)
            # Return what is stored under the qualifier after publishing.
            return function.versions.get(version.id.qualifier)
        return version
1✔
664

665
    def _publish_version_with_changes(
        self,
        function_name: str,
        region: str,
        account_id: str,
        description: str | None = None,
        revision_id: str | None = None,
        code_sha256: str | None = None,
    ) -> FunctionVersion:
        """
        Publish a version together with a new latest version (publish on create / update).

        :param function_name: Function name
        :param region: region
        :param account_id: account id
        :param description: description
        :param revision_id: revision id (check if current version matches)
        :param code_sha256: code sha (check if current code matches)
        :return: the new version, or the previously published version if nothing changed
        """
        version, was_published = self._create_version_model(
            function_name=function_name,
            region=region,
            account_id=account_id,
            description=description,
            revision_id=revision_id,
            code_sha256=code_sha256,
        )
        # Only a newly created version model has to be started by the service.
        if was_published:
            self.lambda_service.create_function_version(version)
        return version
1✔
697

698
    @staticmethod
    def _verify_env_variables(env_vars: dict[str, str]):
        """
        Reject environment variables whose compact JSON encoding is too large.

        The variables are serialized without whitespace after separators and measured as UTF-8 bytes.

        :param env_vars: environment variables of the function
        :raises InvalidParameterValueException: if the encoded size exceeds
            ``config.LAMBDA_LIMITS_MAX_FUNCTION_ENVVAR_SIZE_BYTES``
        """
        serialized = json.dumps(env_vars, separators=(",", ":"))
        size_in_bytes = len(serialized.encode("utf-8"))
        if size_in_bytes <= config.LAMBDA_LIMITS_MAX_FUNCTION_ENVVAR_SIZE_BYTES:
            return

        raise InvalidParameterValueException(
            f"Lambda was unable to configure your environment variables because the environment variables you have provided exceeded the 4KB limit. String measured: {serialized}",
            Type="User",
        )
709

710
    @staticmethod
    def _validate_snapstart(snap_start: SnapStart, runtime: Runtime):
        """
        Check that the ``ApplyOn`` value of a SnapStart configuration is a known enum member.

        :param snap_start: SnapStart configuration from the request
        :param runtime: runtime of the function (accepted but not inspected by this check)
        :raises ValidationException: if ``ApplyOn`` is neither ``PublishedVersions`` nor ``None``
        """
        requested_mode = snap_start.get("ApplyOn")
        supported_modes = (SnapStartApplyOn.PublishedVersions, SnapStartApplyOn.None_)
        if requested_mode in supported_modes:
            return

        raise ValidationException(
            f"1 validation error detected: Value '{requested_mode}' at 'snapStart.applyOn' failed to satisfy constraint: Member must satisfy enum value set: [PublishedVersions, None]"
        )
720

721
    def _validate_layers(self, new_layers: list[str], region: str, account_id: str):
        """
        Validate the layer version ARNs referenced by a function.

        Checks, in order: the number of layers, the ARN format (a version must be present), the
        region of each layer, the existence of the layer version (same account) or its
        availability through ``self.layer_fetcher`` (other account), and finally that no two
        versions of the same layer are referenced.

        Side effect: a layer version from another account that is not in the store yet is fetched
        and stored. If the store already holds other versions of that layer, only the fetched
        version is added, so the existing versions are preserved.

        :param new_layers: layer version ARNs requested for the function
        :param region: region of the function (region checks are skipped if falsy)
        :param account_id: account id of the function
        :raises InvalidParameterValueException: too many layers, wrong region (same account),
            unknown layer version (same account), or two versions of the same layer
        :raises ValidationException: if an ARN does not contain a layer version
        :raises AccessDeniedException: wrong region (other account) or the fetcher returned nothing
        :raises NotImplementedError: if an external layer must be fetched but no fetcher is set
        """
        if len(new_layers) > LAMBDA_LAYERS_LIMIT_PER_FUNCTION:
            raise InvalidParameterValueException(
                "Cannot reference more than 5 layers.", Type="User"
            )

        # maps the unversioned layer ARN to the first versioned ARN seen for it
        visited_layers = {}
        for layer_version_arn in new_layers:
            (
                layer_region,
                layer_account_id,
                layer_name,
                layer_version_str,
            ) = api_utils.parse_layer_arn(layer_version_arn)
            if layer_version_str is None:
                raise ValidationException(
                    f"1 validation error detected: Value '[{layer_version_arn}]'"
                    + r" at 'layers' failed to satisfy constraint: Member must satisfy constraint: [Member must have length less than or equal to 140, Member must have length greater than or equal to 1, Member must satisfy regular expression pattern: (arn:[a-zA-Z0-9-]+:lambda:[a-z]{2}((-gov)|(-iso(b?)))?-[a-z]+-\d{1}:\d{12}:layer:[a-zA-Z0-9-_]+:[0-9]+)|(arn:[a-zA-Z0-9-]+:lambda:::awslayer:[a-zA-Z0-9-_]+), Member must not be null]",
                )

            state = lambda_stores[layer_account_id][layer_region]
            layer = state.layers.get(layer_name)
            # layer_version stays None if either the layer or the requested version is unknown
            layer_version = None
            if layer is not None:
                layer_version = layer.layer_versions.get(layer_version_str)

            if layer_account_id == account_id:
                if region and layer_region != region:
                    raise InvalidParameterValueException(
                        f"Layers are not in the same region as the function. "
                        f"Layers are expected to be in region {region}.",
                        Type="User",
                    )
                if layer_version is None:
                    raise InvalidParameterValueException(
                        f"Layer version {layer_version_arn} does not exist.", Type="User"
                    )
            else:  # External layer from other account
                # TODO: validate IAM layer policy here, allowing access by default for now and only checking region
                if region and layer_region != region:
                    # TODO: detect user or role from context when IAM users are implemented
                    user = "user/localstack-testing"
                    raise AccessDeniedException(
                        f"User: arn:{get_partition(region)}:iam::{account_id}:{user} is not authorized to perform: lambda:GetLayerVersion on resource: {layer_version_arn} because no resource-based policy allows the lambda:GetLayerVersion action"
                    )
                if layer_version is None:
                    # Limitation: cannot fetch external layers when using the same account id as the target layer
                    # because we do not want to trigger the layer fetcher for every non-existing layer.
                    if self.layer_fetcher is None:
                        raise NotImplementedError(
                            "Fetching shared layers from AWS is a pro feature."
                        )

                    # Keep the fetched layer in its own name so the stored layer (if any) is
                    # still available to decide between "new layer" and "new version".
                    fetched_layer = self.layer_fetcher.fetch_layer(layer_version_arn)
                    if fetched_layer is None:
                        # TODO: detect user or role from context when IAM users are implemented
                        user = "user/localstack-testing"
                        raise AccessDeniedException(
                            f"User: arn:{get_partition(region)}:iam::{account_id}:{user} is not authorized to perform: lambda:GetLayerVersion on resource: {layer_version_arn} because no resource-based policy allows the lambda:GetLayerVersion action"
                        )

                    # Distinguish between new layer and new layer version
                    if layer is None:
                        # Create whole layer from scratch
                        state.layers[layer_name] = fetched_layer
                    else:
                        # Another version of the same layer is already stored: only add the
                        # fetched version instead of replacing the layer and losing the others.
                        layer.layer_versions[layer_version_str] = (
                            fetched_layer.layer_versions.get(layer_version_str)
                        )

            # The error message names the first stored version of the layer and the conflicting one.
            layer_arn = ":".join(layer_version_arn.split(":")[:-1])
            if layer_arn in visited_layers:
                conflict_layer_version_arn = visited_layers[layer_arn]
                raise InvalidParameterValueException(
                    f"Two different versions of the same layer are not allowed to be referenced in the same function. {conflict_layer_version_arn} and {layer_version_arn} are versions of the same layer.",
                    Type="User",
                )
            visited_layers[layer_arn] = layer_version_arn
1✔
800

801
    @staticmethod
    def map_layers(new_layers: list[str]) -> list[LayerVersion]:
        """
        Resolve layer version ARNs to the layer version objects held in the Lambda stores.

        Each ARN is parsed with ``api_utils.parse_layer_arn`` and looked up in the store of the
        account and region named by the ARN. The layer itself must exist in that store; the result
        keeps the order of ``new_layers``.

        :param new_layers: layer version ARNs
        :return: the stored layer version for each ARN
        """

        def _resolve(layer_version_arn: str):
            region_name, account_id, layer_name, version_key = api_utils.parse_layer_arn(
                layer_version_arn
            )
            stored_layer = lambda_stores[account_id][region_name].layers.get(layer_name)
            return stored_layer.layer_versions.get(version_key)

        return [_resolve(layer_version_arn) for layer_version_arn in new_layers]
1✔
812

813
    def get_function_recursion_config(
        self,
        context: RequestContext,
        function_name: UnqualifiedFunctionName,
        **kwargs,
    ) -> GetFunctionRecursionConfigResponse:
        """
        Return the recursive loop setting stored on a function.

        :param context: request context, used to resolve account, region and function name
        :param function_name: function name as passed by the caller
        :return: response carrying the function's ``recursive_loop`` value
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        resolved_name = api_utils.get_function_name(function_name, context)
        function = self._get_function(
            function_name=resolved_name, region=region, account_id=account_id
        )
        return GetFunctionRecursionConfigResponse(RecursiveLoop=function.recursive_loop)
1✔
823

824
    def put_function_recursion_config(
        self,
        context: RequestContext,
        function_name: UnqualifiedFunctionName,
        recursive_loop: RecursiveLoop,
        **kwargs,
    ) -> PutFunctionRecursionConfigResponse:
        """
        Store a new recursive loop setting on a function.

        The function is looked up first, so a lookup failure is reported before the value of
        ``recursive_loop`` is validated.

        :param context: request context, used to resolve account, region and function name
        :param function_name: function name as passed by the caller
        :param recursive_loop: requested setting; must be a member of ``RecursiveLoop``
        :return: response carrying the stored ``recursive_loop`` value
        :raises ValidationException: if ``recursive_loop`` is not a ``RecursiveLoop`` member
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        resolved_name = api_utils.get_function_name(function_name, context)

        function = self._get_function(
            function_name=resolved_name, region=region, account_id=account_id
        )

        permitted = tuple(RecursiveLoop.__members__.values())
        if recursive_loop not in permitted:
            raise ValidationException(
                f"1 validation error detected: Value '{recursive_loop}' at 'recursiveLoop' failed to satisfy constraint: "
                f"Member must satisfy enum value set: [Terminate, Allow]"
            )

        function.recursive_loop = recursive_loop
        return PutFunctionRecursionConfigResponse(RecursiveLoop=function.recursive_loop)
1✔
845

846
    @handler(operation="CreateFunction", expand=False)
    def create_function(
        self,
        context: RequestContext,
        request: CreateFunctionRequest,
    ) -> FunctionConfiguration:
        """
        Create a function and its ``$LATEST`` version.

        The request is validated first (code and request size, architectures, environment
        variables, layers, role, runtime, SnapStart). The function is then stored under
        ``create_fn_lock`` and the version is handed to the Lambda service. Tags are stored
        afterwards and, if ``Publish`` is set, a version is published. With
        ``config.LAMBDA_SYNCHRONOUS_CREATE`` the call polls (up to 10 seconds) until the version
        is ``Active`` or ``Failed``.

        :param context: request context providing region, account id, partition and raw request
        :param request: CreateFunction request
        :return: configuration of the created (or published) version, with an unqualified ARN
        :raises RequestEntityTooLargeException: zipped code or the whole request is too large
        :raises ValidationException: invalid architectures or role ARN
        :raises InvalidParameterValueException: the role cannot be assumed by Lambda
        :raises ResourceConflictException: a function with this name already exists
        :raises LambdaServiceException: no usable code source for the package type
        """
        context_region = context.region
        context_account_id = context.account_id

        zip_file = request.get("Code", {}).get("ZipFile")
        if zip_file and len(zip_file) > config.LAMBDA_LIMITS_CODE_SIZE_ZIPPED:
            raise RequestEntityTooLargeException(
                f"Zipped size must be smaller than {config.LAMBDA_LIMITS_CODE_SIZE_ZIPPED} bytes"
            )

        if context.request.content_length > config.LAMBDA_LIMITS_CREATE_FUNCTION_REQUEST_SIZE:
            raise RequestEntityTooLargeException(
                f"Request must be smaller than {config.LAMBDA_LIMITS_CREATE_FUNCTION_REQUEST_SIZE} bytes for the CreateFunction operation"
            )

        if architectures := request.get("Architectures"):
            if len(architectures) != 1:
                raise ValidationException(
                    f"1 validation error detected: Value '[{', '.join(architectures)}]' at 'architectures' failed to "
                    f"satisfy constraint: Member must have length less than or equal to 1",
                )
            if architectures[0] not in ARCHITECTURES:
                raise ValidationException(
                    f"1 validation error detected: Value '[{', '.join(architectures)}]' at 'architectures' failed to "
                    f"satisfy constraint: Member must satisfy constraint: [Member must satisfy enum value set: "
                    f"[x86_64, arm64], Member must not be null]",
                )

        if env_vars := request.get("Environment", {}).get("Variables"):
            self._verify_env_variables(env_vars)

        if layers := request.get("Layers", []):
            self._validate_layers(layers, region=context_region, account_id=context_account_id)

        if not api_utils.is_role_arn(request.get("Role")):
            raise ValidationException(
                f"1 validation error detected: Value '{request.get('Role')}'"
                + " at 'role' failed to satisfy constraint: Member must satisfy regular expression pattern: arn:(aws[a-zA-Z-]*)?:iam::\\d{12}:role/?[a-zA-Z_0-9+=,.@\\-_/]+"
            )
        if not self.lambda_service.can_assume_role(request.get("Role"), context.region):
            raise InvalidParameterValueException(
                "The role defined for the function cannot be assumed by Lambda.", Type="User"
            )
        package_type = request.get("PackageType", PackageType.Zip)
        runtime = request.get("Runtime")
        self._validate_runtime(package_type, runtime)

        request_function_name = request.get("FunctionName")

        function_name, *_ = api_utils.get_name_and_qualifier(
            function_arn_or_name=request_function_name,
            qualifier=None,
            context=context,
        )

        if runtime in DEPRECATED_RUNTIMES:
            # Deprecated runtimes that passed validation are allowed, but the user is warned.
            LOG.warning(
                "The Lambda runtime %s is deprecated. "
                "Please upgrade the runtime for the function %s: "
                "https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtimes.html",
                runtime,
                function_name,
            )
        if snap_start := request.get("SnapStart"):
            self._validate_snapstart(snap_start, runtime)
        state = lambda_stores[context_account_id][context_region]

        # The existence check and the store insert happen under one lock so that two concurrent
        # creates of the same name cannot both pass the check.
        with self.create_fn_lock:
            if function_name in state.functions:
                raise ResourceConflictException(f"Function already exist: {function_name}")
            fn = Function(function_name=function_name)
            arn = VersionIdentifier(
                function_name=function_name,
                qualifier="$LATEST",
                region=context_region,
                account=context_account_id,
            )
            # save function code to s3
            code = None
            image = None
            image_config = None
            runtime_version_config = RuntimeVersionConfig(
                # Limitation: the runtime id (presumably sha256 of image) is currently hardcoded
                # Potential implementation: provide (cached) sha256 hash of used Docker image
                RuntimeVersionArn=f"arn:{context.partition}:lambda:{context_region}::runtime:8eeff65f6809a3ce81507fe733fe09b835899b99481ba22fd75b5a7338290ec1"
            )
            request_code = request.get("Code")
            if package_type == PackageType.Zip:
                # TODO verify if correct combination of code is set
                if zip_file := request_code.get("ZipFile"):
                    code = store_lambda_archive(
                        archive_file=zip_file,
                        function_name=function_name,
                        region_name=context_region,
                        account_id=context_account_id,
                    )
                elif s3_bucket := request_code.get("S3Bucket"):
                    s3_key = request_code["S3Key"]
                    s3_object_version = request_code.get("S3ObjectVersion")
                    code = store_s3_bucket_archive(
                        archive_bucket=s3_bucket,
                        archive_key=s3_key,
                        archive_version=s3_object_version,
                        function_name=function_name,
                        region_name=context_region,
                        account_id=context_account_id,
                    )
                else:
                    raise LambdaServiceException("Gotta have s3 bucket or zip file")
            elif package_type == PackageType.Image:
                image = request_code.get("ImageUri")
                if not image:
                    raise LambdaServiceException("Gotta have an image when package type is image")
                image = create_image_code(image_uri=image)

                image_config_req = request.get("ImageConfig", {})
                image_config = ImageConfig(
                    command=image_config_req.get("Command"),
                    entrypoint=image_config_req.get("EntryPoint"),
                    working_directory=image_config_req.get("WorkingDirectory"),
                )
                # Runtime management controls are not available when providing a custom image
                runtime_version_config = None
            if "LoggingConfig" in request:
                logging_config = request["LoggingConfig"]
                LOG.warning(
                    "Advanced Lambda Logging Configuration is currently mocked "
                    "and will not impact the logging behavior. "
                    "Please create a feature request if needed."
                )

                # when switching to JSON, app and system level log is auto set to INFO
                # (values given in the request take precedence over these defaults)
                if logging_config.get("LogFormat", None) == LogFormat.JSON:
                    logging_config = {
                        "ApplicationLogLevel": "INFO",
                        "SystemLogLevel": "INFO",
                        "LogGroup": f"/aws/lambda/{function_name}",
                    } | logging_config
                else:
                    logging_config = (
                        LoggingConfig(
                            LogFormat=LogFormat.Text, LogGroup=f"/aws/lambda/{function_name}"
                        )
                        | logging_config
                    )

            else:
                logging_config = LoggingConfig(
                    LogFormat=LogFormat.Text, LogGroup=f"/aws/lambda/{function_name}"
                )

            version = FunctionVersion(
                id=arn,
                config=VersionFunctionConfiguration(
                    last_modified=api_utils.format_lambda_date(datetime.datetime.now()),
                    description=request.get("Description", ""),
                    role=request["Role"],
                    timeout=request.get("Timeout", LAMBDA_DEFAULT_TIMEOUT),
                    runtime=request.get("Runtime"),
                    memory_size=request.get("MemorySize", LAMBDA_DEFAULT_MEMORY_SIZE),
                    handler=request.get("Handler"),
                    package_type=package_type,
                    environment=env_vars,
                    architectures=request.get("Architectures") or [Architecture.x86_64],
                    tracing_config_mode=request.get("TracingConfig", {}).get(
                        "Mode", TracingMode.PassThrough
                    ),
                    image=image,
                    image_config=image_config,
                    code=code,
                    layers=self.map_layers(layers),
                    internal_revision=short_uid(),
                    ephemeral_storage=LambdaEphemeralStorage(
                        size=request.get("EphemeralStorage", {}).get("Size", 512)
                    ),
                    snap_start=SnapStartResponse(
                        ApplyOn=request.get("SnapStart", {}).get("ApplyOn", SnapStartApplyOn.None_),
                        OptimizationStatus=SnapStartOptimizationStatus.Off,
                    ),
                    runtime_version_config=runtime_version_config,
                    dead_letter_arn=request.get("DeadLetterConfig", {}).get("TargetArn"),
                    vpc_config=self._build_vpc_config(
                        context_account_id, context_region, request.get("VpcConfig")
                    ),
                    state=VersionState(
                        state=State.Pending,
                        code=StateReasonCode.Creating,
                        reason="The function is being created.",
                    ),
                    logging_config=logging_config,
                ),
            )
            fn.versions["$LATEST"] = version
            state.functions[function_name] = fn
        # NOTE(review): only `.labels(...)` is called here and its result is discarded; confirm
        # whether the counter needs an explicit increment call for this event to be recorded.
        function_counter.labels(
            operation=FunctionOperation.create,
            runtime=runtime or "n/a",
            status=FunctionStatus.success,
            invocation_type="n/a",
            package_type=package_type,
        )
        self.lambda_service.create_function_version(version)

        if tags := request.get("Tags"):
            # This will check whether the function exists.
            self._store_tags(arn.unqualified_arn(), tags)

        if request.get("Publish"):
            version = self._publish_version_with_changes(
                function_name=function_name, region=context_region, account_id=context_account_id
            )

        if config.LAMBDA_SYNCHRONOUS_CREATE:
            # block via retrying until "terminal" condition reached before returning
            if not poll_condition(
                lambda: get_function_version(
                    function_name, version.id.qualifier, version.id.account, version.id.region
                ).config.state.state
                in [State.Active, State.Failed],
                timeout=10,
            ):
                LOG.warning(
                    "LAMBDA_SYNCHRONOUS_CREATE is active, but waiting for %s reached timeout.",
                    function_name,
                )

        return api_utils.map_config_out(
            version, return_qualified_arn=False, return_update_status=False
        )
1080

1081
    def _validate_runtime(self, package_type, runtime):
        """
        Validate the runtime of a Zip-packaged function.

        Without ``config.LAMBDA_RUNTIME_VALIDATION`` every runtime in ``ALL_RUNTIMES`` is accepted.
        With it, only the runtimes listed in ``RUNTIMES_AGGREGATED`` are accepted. Other package
        types are not checked.

        :param package_type: package type of the function
        :param runtime: runtime requested for the function
        :raises InvalidParameterValueException: if the runtime is not accepted; deprecated
            runtimes with a known upgrade target get a dedicated message
        """
        runtimes = ALL_RUNTIMES
        if config.LAMBDA_RUNTIME_VALIDATION:
            # RUNTIMES_AGGREGATED is assumed to map a runtime family to a collection of runtimes
            # (TODO confirm against its definition). The collections must be flattened:
            # `chain(values)` would yield the collections themselves, so the membership test
            # below would fail for every runtime.
            runtimes = list(itertools.chain.from_iterable(RUNTIMES_AGGREGATED.values()))

        if package_type == PackageType.Zip and runtime not in runtimes:
            # deprecated runtimes have different error
            if runtime in DEPRECATED_RUNTIMES:
                HINT_LOG.info(
                    "Set env variable LAMBDA_RUNTIME_VALIDATION to 0"
                    " in order to allow usage of deprecated runtimes"
                )
                # raises on its own if an upgrade target is known; otherwise falls through
                self._check_for_recomended_migration_target(runtime)

            raise InvalidParameterValueException(
                f"Value {runtime} at 'runtime' failed to satisfy constraint: Member must satisfy enum value set: {VALID_RUNTIMES} or be a valid ARN",
                Type="User",
            )
1099

1100
    def _check_for_recomended_migration_target(self, deprecated_runtime):
        """
        Raise a dedicated error for deprecated runtimes that have a known upgrade target.

        AWS offers a recommended migration runtime for "newly" deprecated runtimes; this check
        exists to keep parity with the corresponding error message. Runtimes without an entry in
        ``DEPRECATED_RUNTIMES_UPGRADES`` pass through silently.

        :param deprecated_runtime: the deprecated runtime that was requested
        :raises InvalidParameterValueException: if an upgrade target is known for the runtime
        """
        upgrade_target = DEPRECATED_RUNTIMES_UPGRADES.get(deprecated_runtime)
        if upgrade_target is None:
            return

        LOG.debug(
            "The Lambda runtime %s is deprecated. Please upgrade to a supported Lambda runtime such as %s.",
            deprecated_runtime,
            upgrade_target,
        )
        raise InvalidParameterValueException(
            f"The runtime parameter of {deprecated_runtime} is no longer supported for creating or updating AWS Lambda functions. We recommend you use a supported runtime while creating or updating functions.",
            Type="User",
        )
1115

1116
    @handler(operation="UpdateFunctionConfiguration", expand=False)
    def update_function_configuration(
        self, context: RequestContext, request: UpdateFunctionConfigurationRequest
    ) -> FunctionConfiguration:
        """
        Update the configuration of the ``$LATEST`` version of a function.

        Only the settings present in the request are changed. The result is a copy of the current
        ``$LATEST`` version with the changes applied, a fresh modification date and internal
        revision, and an in-progress update status. The copy replaces ``$LATEST`` in the store and
        is passed to ``lambda_service.update_version``.

        :param context: request context, used to resolve account, region and function name
        :param request: UpdateFunctionConfiguration request
        :return: configuration of the new ``$LATEST`` version
        :raises ResourceNotFoundException: if the function does not exist
        :raises PreconditionFailedException: if ``RevisionId`` does not match the latest revision
        :raises ValidationException: if ``Role`` is not a role ARN
        :raises InvalidParameterValueException: if ``Runtime`` is not in ``ALL_RUNTIMES``
        """
        requested_name = request.get("FunctionName")

        # in case we got ARN or partial ARN
        account_id, region = api_utils.get_account_and_region(requested_name, context)
        function_name, _qualifier = api_utils.get_name_and_qualifier(requested_name, None, context)
        store = lambda_stores[account_id][region]

        if function_name not in store.functions:
            missing_arn = api_utils.unqualified_lambda_arn(
                function_name=function_name, region=region, account=account_id
            )
            raise ResourceNotFoundException(
                f"Function not found: {missing_arn}",
                Type="User",
            )
        function = store.functions[function_name]

        # TODO: lock modification of latest version
        # TODO: notify service for changes relevant to re-provisioning of $LATEST
        current_version = function.latest()
        current_config = current_version.config

        revision_id = request.get("RevisionId")
        if revision_id and revision_id != current_version.config.revision_id:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
                Type="User",
            )

        # keyword arguments for dataclasses.replace on the current version config
        changes = {}
        if "EphemeralStorage" in request:
            changes["ephemeral_storage"] = LambdaEphemeralStorage(
                request["EphemeralStorage"].get("Size", 512)
            )  # TODO: do defaults here apply as well?

        if "Role" in request:
            role = request["Role"]
            if not api_utils.is_role_arn(role):
                raise ValidationException(
                    f"1 validation error detected: Value '{role}'"
                    + " at 'role' failed to satisfy constraint: Member must satisfy regular expression pattern: arn:(aws[a-zA-Z-]*)?:iam::\\d{12}:role/?[a-zA-Z_0-9+=,.@\\-_/]+"
                )
            changes["role"] = role

        # settings that are copied from the request without further checks
        for request_key, config_field in (
            ("Description", "description"),
            ("Timeout", "timeout"),
            ("MemorySize", "memory_size"),
        ):
            if request_key in request:
                changes[config_field] = request[request_key]

        if "DeadLetterConfig" in request:
            changes["dead_letter_arn"] = request["DeadLetterConfig"].get("TargetArn")

        vpc_config = request.get("VpcConfig")
        if vpc_config:
            changes["vpc_config"] = self._build_vpc_config(account_id, region, vpc_config)

        if "Handler" in request:
            changes["handler"] = request["Handler"]

        if "Runtime" in request:
            runtime = request["Runtime"]

            if runtime not in ALL_RUNTIMES:
                raise InvalidParameterValueException(
                    f"Value {runtime} at 'runtime' failed to satisfy constraint: Member must satisfy enum value set: {VALID_RUNTIMES} or be a valid ARN",
                    Type="User",
                )
            if runtime in DEPRECATED_RUNTIMES:
                LOG.warning(
                    "The Lambda runtime %s is deprecated. "
                    "Please upgrade the runtime for the function %s: "
                    "https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtimes.html",
                    runtime,
                    function_name,
                )
            changes["runtime"] = runtime

        snap_start = request.get("SnapStart")
        if snap_start:
            # validate against the runtime from this request, else the one currently configured
            effective_runtime = changes.get("runtime") or current_config.runtime
            self._validate_snapstart(snap_start, effective_runtime)
            changes["snap_start"] = SnapStartResponse(
                ApplyOn=snap_start.get("ApplyOn", SnapStartApplyOn.None_),
                OptimizationStatus=SnapStartOptimizationStatus.Off,
            )

        if "Environment" in request:
            env_vars = request["Environment"].get("Variables", {})
            if env_vars:
                self._verify_env_variables(env_vars)
            changes["environment"] = env_vars

        if "Layers" in request:
            requested_layers = request["Layers"]
            if requested_layers:
                self._validate_layers(requested_layers, region=region, account_id=account_id)
            changes["layers"] = self.map_layers(requested_layers)

        if "ImageConfig" in request:
            requested_image_config = request["ImageConfig"]
            changes["image_config"] = ImageConfig(
                command=requested_image_config.get("Command"),
                entrypoint=requested_image_config.get("EntryPoint"),
                working_directory=requested_image_config.get("WorkingDirectory"),
            )

        if "LoggingConfig" in request:
            requested_logging = request["LoggingConfig"]
            LOG.warning(
                "Advanced Lambda Logging Configuration is currently mocked "
                "and will not impact the logging behavior. "
                "Please create a feature request if needed."
            )

            # when switching to JSON, app and system level log is auto set to INFO
            if requested_logging.get("LogFormat", None) == LogFormat.JSON:
                json_defaults = {
                    "ApplicationLogLevel": "INFO",
                    "SystemLogLevel": "INFO",
                }
                requested_logging = json_defaults | requested_logging

            previous_logging = current_config.logging_config

            # partial update: requested keys override the previous configuration
            merged_logging = previous_logging | requested_logging

            # in case we switched from JSON to Text we need to remove LogLevel keys
            if (
                merged_logging.get("LogFormat") == LogFormat.Text
                and previous_logging.get("LogFormat") == LogFormat.JSON
            ):
                for level_key in ("ApplicationLogLevel", "SystemLogLevel"):
                    merged_logging.pop(level_key, None)

            changes["logging_config"] = merged_logging

        if "TracingConfig" in request:
            tracing_mode = request["TracingConfig"].get("Mode")
            if tracing_mode:
                changes["tracing_config_mode"] = tracing_mode

        updated_config = dataclasses.replace(
            current_config,
            last_modified=api_utils.generate_lambda_date(),
            internal_revision=short_uid(),
            last_update=UpdateStatus(
                status=LastUpdateStatus.InProgress,
                code="Creating",
                reason="The function is being created.",
            ),
            **changes,
        )
        updated_version = dataclasses.replace(current_version, config=updated_config)
        function.versions["$LATEST"] = updated_version  # TODO: notify
        self.lambda_service.update_version(new_version=updated_version)

        return api_utils.map_config_out(updated_version)
1✔
1278

1279
    @handler(operation="UpdateFunctionCode", expand=False)
    def update_function_code(
        self, context: RequestContext, request: UpdateFunctionCodeRequest
    ) -> FunctionConfiguration:
        """
        Replace the code (or container image) of a function's ``$LATEST`` version.

        The code source is taken from the first of ``ZipFile``, ``S3Bucket`` (with ``S3Key``) or
        ``ImageUri`` that is set on the request. When ``Publish`` is set, a new version is published
        afterwards and the returned configuration carries the qualified ARN.

        :param context: request context, used to resolve account, region and function name
        :param request: the raw ``UpdateFunctionCode`` request
        :return: configuration of the updated ``$LATEST`` version, or of the newly published version
        :raises ResourceNotFoundException: if the function is not in the store
        :raises PreconditionFailedException: if ``RevisionId`` is given and does not match ``$LATEST``
        :raises InvalidParameterValueException: if the code source does not fit the package type
        :raises ValidationException: if ``Architectures`` is not exactly one supported architecture
        :raises LambdaServiceException: if the request contains no code source at all
        """
        requested_name = request.get("FunctionName")
        account_id, region = api_utils.get_account_and_region(requested_name, context)
        # the qualifier part is not used when updating code
        function_name, _ = api_utils.get_name_and_qualifier(requested_name, None, context)

        state = lambda_stores[account_id][region]
        if function_name not in state.functions:
            function_arn = api_utils.unqualified_lambda_arn(
                function_name=function_name, region=region, account=account_id
            )
            raise ResourceNotFoundException(f"Function not found: {function_arn}", Type="User")
        function = state.functions[function_name]

        expected_revision = request.get("RevisionId")
        if expected_revision and expected_revision != function.latest().config.revision_id:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
                Type="User",
            )

        # TODO verify if correct combination of code is set
        # the code source has to fit the package type of the existing function
        archive_requested = request.get("ZipFile") or request.get("S3Bucket")
        if archive_requested and function.latest().config.package_type == PackageType.Image:
            raise InvalidParameterValueException(
                "Please provide ImageUri when updating a function with packageType Image.",
                Type="User",
            )
        if request.get("ImageUri") and function.latest().config.package_type == PackageType.Zip:
            raise InvalidParameterValueException(
                "Please don't provide ImageUri when updating a function with packageType Zip.",
                Type="User",
            )

        code = None
        image = None
        if zip_file := request.get("ZipFile"):
            code = store_lambda_archive(
                archive_file=zip_file,
                function_name=function_name,
                region_name=region,
                account_id=account_id,
            )
        elif s3_bucket := request.get("S3Bucket"):
            code = store_s3_bucket_archive(
                archive_bucket=s3_bucket,
                archive_key=request["S3Key"],
                archive_version=request.get("S3ObjectVersion"),
                function_name=function_name,
                region_name=region,
                account_id=account_id,
            )
        elif image_uri := request.get("ImageUri"):
            image = create_image_code(image_uri=image_uri)
        else:
            raise LambdaServiceException("Gotta have s3 bucket or zip file or image")

        previous_latest = function.versions.get("$LATEST")
        if code:
            replace_kwargs = {"code": code}
        else:
            replace_kwargs = {"image": image}

        if architectures := request.get("Architectures"):
            rendered_architectures = ", ".join(architectures)
            if len(architectures) != 1:
                raise ValidationException(
                    f"1 validation error detected: Value '[{rendered_architectures}]' at 'architectures' failed to "
                    f"satisfy constraint: Member must have length less than or equal to 1",
                )
            # An empty list of architectures is also forbidden. Further exceptions are tested here for create_function:
            # tests.aws.services.lambda_.test_lambda_api.TestLambdaFunction.test_create_lambda_exceptions
            if architectures[0] not in ARCHITECTURES:
                raise ValidationException(
                    f"1 validation error detected: Value '[{rendered_architectures}]' at 'architectures' failed to "
                    f"satisfy constraint: Member must satisfy constraint: [Member must satisfy enum value set: "
                    f"[x86_64, arm64], Member must not be null]",
                )
            replace_kwargs["architectures"] = architectures

        in_progress = UpdateStatus(
            status=LastUpdateStatus.InProgress,
            code="Creating",
            reason="The function is being created.",
        )
        updated_config = dataclasses.replace(
            previous_latest.config,
            internal_revision=short_uid(),
            last_modified=api_utils.generate_lambda_date(),
            last_update=in_progress,
            **replace_kwargs,
        )
        function_version = dataclasses.replace(previous_latest, config=updated_config)
        function.versions["$LATEST"] = function_version

        self.lambda_service.update_version(new_version=function_version)

        publish = request.get("Publish")
        if publish:
            function_version = self._publish_version_with_changes(
                function_name=function_name, region=region, account_id=account_id
            )
        return api_utils.map_config_out(function_version, return_qualified_arn=bool(publish))
1388

1389
    # TODO: does deleting the latest published version affect the next versions number?
1390
    # TODO: what happens when we call this with a qualifier and a fully qualified ARN? (+ conflicts?)
1391
    # TODO: test different ARN patterns (shorthand ARN?)
1392
    # TODO: test deleting across regions?
1393
    # TODO: test mismatch between context region and region in ARN
1394
    # TODO: test qualifier $LATEST, alias-name and version
1395
    def delete_function(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: Qualifier = None,
        **kwargs,
    ) -> None:
        """
        Delete a whole function, or a single published version when a qualifier is given.

        :param context: request context, used to resolve account, region and function name
        :param function_name: name or ARN of the function
        :param qualifier: optional version to delete; aliases and ``$LATEST`` are rejected
        :raises InvalidParameterValueException: if the qualifier is an alias or ``$LATEST``
        :raises ResourceNotFoundException: if the function is not in the store
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )

        if qualifier and api_utils.qualifier_is_alias(qualifier):
            raise InvalidParameterValueException(
                "Deletion of aliases is not currently supported.",
                Type="User",
            )

        state = lambda_stores[account_id][region]
        if qualifier == "$LATEST":
            raise InvalidParameterValueException(
                "$LATEST version cannot be deleted without deleting the function.", Type="User"
            )

        if function_name not in state.functions:
            function_arn = api_utils.unqualified_lambda_arn(
                function_name=function_name, region=region, account=account_id
            )
            raise ResourceNotFoundException(f"Function not found: {function_arn}", Type="User")
        function = state.functions.get(function_name)

        if qualifier:
            # delete a single version; a qualifier that matches no version is silently ignored
            removed_version = function.versions.pop(qualifier, None)
            if removed_version:
                self.lambda_service.stop_version(removed_version.id.qualified_arn())
                destroy_code_if_not_used(code=removed_version.config.code, function=function)
            return

        # delete the whole function
        # TODO: introduce locking for safe deletion: We could create a new version at the API layer before
        #  the old version gets cleaned up in the internal lambda service.
        function = state.functions.pop(function_name)
        for removed_version in function.versions.values():
            self.lambda_service.stop_version(qualified_arn=removed_version.id.qualified_arn())
            # we can safely destroy the code here
            if removed_version.config.code:
                removed_version.config.code.destroy()
1✔
1443

1444
    def list_functions(
        self,
        context: RequestContext,
        master_region: MasterRegion = None,  # (only relevant for lambda@edge)
        function_version: FunctionVersionApi = None,
        marker: String = None,
        max_items: MaxListItems = None,
        **kwargs,
    ) -> ListFunctionsResponse:
        """
        List the functions of the caller's account and region, one page at a time.

        :param context: request context providing account and region
        :param master_region: accepted but not used by this implementation
        :param function_version: ``ALL`` to list every version of every function; unset lists
            only the latest version of each function
        :param marker: pagination marker (compared against ``FunctionArn``)
        :param max_items: page size
        :return: the requested page plus the marker of the next page, if any
        :raises ValidationException: if ``function_version`` is set to anything other than ``ALL``
        """
        state = lambda_stores[context.account_id][context.region]

        if function_version and function_version != FunctionVersionApi.ALL:
            raise ValidationException(
                f"1 validation error detected: Value '{function_version}'"
                " at 'functionVersion' failed to satisfy constraint: Member must satisfy enum value set: [ALL]"
            )

        include_all_versions = function_version == FunctionVersionApi.ALL
        if include_all_versions:
            # include all versions for all function
            selected_versions = list(
                itertools.chain.from_iterable(
                    fn.versions.values() for fn in state.functions.values()
                )
            )
        else:
            selected_versions = [fn.latest() for fn in state.functions.values()]

        # qualified ARNs are only returned when all versions are listed
        entries = PaginatedList(
            api_utils.map_to_list_response(
                api_utils.map_config_out(fc, return_qualified_arn=include_all_versions)
            )
            for fc in selected_versions
        )
        page, next_marker = entries.get_page(
            lambda entry: entry["FunctionArn"],
            marker,
            max_items,
        )
        return ListFunctionsResponse(Functions=page, NextMarker=next_marker)
1✔
1482

1483
    def get_function(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        qualifier: Qualifier = None,
        **kwargs,
    ) -> GetFunctionResponse:
        """
        Return configuration, code location and tags of a function version.

        An alias qualifier is resolved to the version the alias points to; the alias name is
        passed on when mapping the configuration.

        :param context: request context, used to resolve account, region and function name
        :param function_name: name or ARN of the function
        :param qualifier: optional version or alias
        :raises ResourceNotFoundException: if the function or the given alias does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )

        fn = lambda_stores[account_id][region].functions.get(function_name)
        if fn is None:
            # the reported ARN is qualified only if the caller supplied a qualifier
            if qualifier is None:
                missing_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
            else:
                missing_arn = api_utils.qualified_lambda_arn(
                    function_name, qualifier, account_id, region
                )
            raise ResourceNotFoundException(f"Function not found: {missing_arn}", Type="User")

        alias_name = None
        if qualifier and api_utils.qualifier_is_alias(qualifier):
            if qualifier not in fn.aliases:
                alias_arn = api_utils.qualified_lambda_arn(
                    function_name, qualifier, account_id, region
                )
                raise ResourceNotFoundException(f"Function not found: {alias_arn}", Type="User")
            # continue with the version the alias points to
            alias_name = qualifier
            qualifier = fn.aliases[alias_name].function_version

        version = get_function_version(
            function_name=function_name,
            qualifier=qualifier,
            account_id=account_id,
            region=region,
        )

        tags = self._get_tags(api_utils.unqualified_lambda_arn(function_name, account_id, region))
        # "Tags" is only part of the response if there are any
        additional_fields = {"Tags": tags} if tags else {}

        code_location = None
        if code := version.config.code:
            code_location = FunctionCodeLocation(
                Location=code.generate_presigned_url(), RepositoryType="S3"
            )
        elif image := version.config.image:
            code_location = FunctionCodeLocation(
                ImageUri=image.image_uri,
                RepositoryType=image.repository_type,
                ResolvedImageUri=image.resolved_image_uri,
            )

        configuration = api_utils.map_config_out(
            version, return_qualified_arn=bool(qualifier), alias_name=alias_name
        )
        return GetFunctionResponse(
            Configuration=configuration,
            Code=code_location,  # TODO
            **additional_fields,
            # Concurrency={},  # TODO
        )
1547

1548
    def get_function_configuration(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        qualifier: Qualifier = None,
        **kwargs,
    ) -> FunctionConfiguration:
        """
        Return only the configuration of a function version.

        CAVE: the return value is *not* the same as in ``get_function``; it seems to be only the
        configuration part.

        :param context: request context, used to resolve account, region and function name
        :param function_name: name or ARN of the function
        :param qualifier: optional qualifier; when set, the qualified ARN is returned
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        resolved_name, resolved_qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        version = get_function_version(
            function_name=resolved_name,
            qualifier=resolved_qualifier,
            account_id=account_id,
            region=region,
        )
        return api_utils.map_config_out(version, return_qualified_arn=bool(resolved_qualifier))
1✔
1567

1568
    def invoke(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        invocation_type: InvocationType = None,
        log_type: LogType = None,
        client_context: String = None,
        payload: IO[Blob] = None,
        qualifier: Qualifier = None,
        **kwargs,
    ) -> InvocationResponse:
        """
        Invoke a function through the lambda service and map the result to an API response.

        :param context: request context; also supplies request id and trace context
        :param function_name: name or ARN of the function
        :param invocation_type: ``Event`` yields status 202 and ``DryRun`` status 204, both
            without a payload; anything else yields status 200 with the invocation result
        :param log_type: ``Tail`` adds the base64-encoded last 4096 bytes of the logs
        :param client_context: passed through to the lambda service
        :param payload: optional stream that is read completely and passed as the event
        :param qualifier: optional version or alias
        :raises ServiceException: re-raised unchanged when raised by the lambda service
        :raises LambdaServiceException: for any other failure during the invocation
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )

        started_at = time.perf_counter()
        try:
            # the payload is read inside the try block so that read errors are mapped as well
            invocation_result = self.lambda_service.invoke(
                function_name=function_name,
                qualifier=qualifier,
                region=region,
                account_id=account_id,
                invocation_type=invocation_type,
                client_context=client_context,
                request_id=context.request_id,
                trace_context=context.trace_context,
                payload=payload.read() if payload else None,
            )
        except ServiceException:
            # already an API-level error, pass it on as is
            raise
        except EnvironmentStartupTimeoutException as e:
            raise LambdaServiceException("Internal error while executing lambda") from e
        except Exception as e:
            LOG.error("Error while invoking lambda", exc_info=e)
            raise LambdaServiceException("Internal error while executing lambda") from e

        # asynchronous and dry-run invocations return a bare status code
        if invocation_type == InvocationType.Event:
            return InvocationResponse(StatusCode=202)
        if invocation_type == InvocationType.DryRun:
            return InvocationResponse(StatusCode=204)

        elapsed_ms = (time.perf_counter() - started_at) * 1000
        LOG.debug("Lambda invocation duration: %0.2fms", elapsed_ms)

        response = InvocationResponse(
            StatusCode=200,
            Payload=invocation_result.payload,
            ExecutedVersion=invocation_result.executed_version,
        )
        if invocation_result.is_error:
            response["FunctionError"] = "Unhandled"
        if log_type == LogType.Tail:
            log_tail = to_bytes(invocation_result.logs)[-4096:]
            response["LogResult"] = to_str(base64.b64encode(log_tail))

        return response
1✔
1628

1629
    # Version operations
1630
    def publish_version(
        self,
        context: RequestContext,
        function_name: FunctionName,
        code_sha256: String = None,
        description: Description = None,
        revision_id: String = None,
        **kwargs,
    ) -> FunctionConfiguration:
        """
        Publish a new version of a function and return its configuration with the qualified ARN.

        :param context: request context, used to resolve account, region and function name
        :param function_name: name or ARN of the function
        :param code_sha256: passed on to the internal publishing helper
        :param description: passed on to the internal publishing helper
        :param revision_id: passed on to the internal publishing helper
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        resolved_name = api_utils.get_function_name(function_name, context)
        published = self._publish_version_from_existing_version(
            function_name=resolved_name,
            description=description,
            account_id=account_id,
            region=region,
            revision_id=revision_id,
            code_sha256=code_sha256,
        )
        return api_utils.map_config_out(published, return_qualified_arn=True)
1✔
1650

1651
    def list_versions_by_function(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        marker: String = None,
        max_items: MaxListItems = None,
        **kwargs,
    ) -> ListVersionsByFunctionResponse:
        """
        List all versions of a single function, one page at a time.

        :param context: request context, used to resolve account, region and function name
        :param function_name: name or ARN of the function
        :param marker: pagination marker (the qualified ``FunctionArn`` the page starts at)
        :param max_items: page size
        :return: the requested page plus the marker of the next page, if any
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name = api_utils.get_function_name(function_name, context)
        function = self._get_function(
            function_name=function_name, region=region, account_id=account_id
        )
        versions = [
            api_utils.map_to_list_response(
                api_utils.map_config_out(version=version, return_qualified_arn=True)
            )
            for version in function.versions.values()
        ]
        items = PaginatedList(versions)
        # The marker has to be a string that identifies one entry. Using the whole entry (a dict)
        # as token yields a NextMarker that is not a string and that a string marker from a
        # follow-up request can never match. The qualified ARN is used instead, in line with how
        # list_functions paginates.
        page, token = items.get_page(
            lambda item: item["FunctionArn"],
            marker,
            max_items,
        )
        return ListVersionsByFunctionResponse(Versions=page, NextMarker=token)
1✔
1677

1678
    # Alias
1679

1680
    def _create_routing_config_model(
        self, routing_config_dict: dict[str, float], function_version: FunctionVersion
    ):
        """
        Validate additional version weights of an alias and wrap them in an ``AliasRoutingConfig``.

        :param routing_config_dict: mapping of version qualifier to routing weight
        :param function_version: the version the alias points to
        :return: ``AliasRoutingConfig`` holding the given weights
        :raises InvalidParameterValueException: if there is more than one entry, an entry targets
            the alias' own version, or the alias points to ``$LATEST``
        :raises ValidationException: if a weight is below 0.0 or not below 1.0, or a key is not a
            version qualifier
        """
        if len(routing_config_dict) > 1:
            raise InvalidParameterValueException(
                "Number of items in AdditionalVersionWeights cannot be greater than 1",
                Type="User",
            )

        primary_qualifier = function_version.id.qualifier
        # should be exactly one item here, still iterating, might be supported in the future
        for version_key, weight in routing_config_dict.items():
            if weight < 0.0 or weight >= 1.0:
                raise ValidationException(
                    f"1 validation error detected: Value '{{{version_key}={weight}}}' at 'routingConfig.additionalVersionWeights' failed to satisfy constraint: Map value must satisfy constraint: [Member must have value less than or equal to 1.0, Member must have value greater than or equal to 0.0, Member must not be null]"
                )
            if version_key == primary_qualifier:
                raise InvalidParameterValueException(
                    f"Invalid function version {primary_qualifier}. Function version {primary_qualifier} is already included in routing configuration.",
                    Type="User",
                )
            # check if version target is latest, then no routing config is allowed
            if primary_qualifier == "$LATEST":
                raise InvalidParameterValueException(
                    "$LATEST is not supported for an alias pointing to more than 1 version"
                )
            if not api_utils.qualifier_is_version(version_key):
                raise ValidationException(
                    f"1 validation error detected: Value '{{{version_key}={weight}}}' at 'routingConfig.additionalVersionWeights' failed to satisfy constraint: Map keys must satisfy constraint: [Member must have length less than or equal to 1024, Member must have length greater than or equal to 1, Member must satisfy regular expression pattern: [0-9]+, Member must not be null]"
                )

            # checking if the version in the config exists (the lookup result itself is not needed)
            get_function_version(
                function_name=function_version.id.function_name,
                qualifier=version_key,
                region=function_version.id.region,
                account_id=function_version.id.account,
            )

        return AliasRoutingConfig(version_weights=routing_config_dict)
1✔
1717

1718
    def create_alias(
        self,
        context: RequestContext,
        function_name: FunctionName,
        name: Alias,
        function_version: Version,
        description: Description = None,
        routing_config: AliasRoutingConfiguration = None,
        **kwargs,
    ) -> AliasConfiguration:
        """
        Create an alias that points to an existing version of a function.

        :param context: request context, used to resolve account, region and function name
        :param function_name: name or ARN of the function
        :param name: alias name
        :param function_version: version the alias points to
        :param description: optional description, stored as empty string when not given
        :param routing_config: optional routing config; only ``AdditionalVersionWeights`` is read
        :return: the created alias
        :raises ValidationException: if ``name`` is not a valid alias name
        :raises ResourceConflictException: if an alias with that name already exists
        """
        if not api_utils.qualifier_is_alias(name):
            raise ValidationException(
                f"1 validation error detected: Value '{name}' at 'name' failed to satisfy constraint: Member must satisfy regular expression pattern: (?!^[0-9]+$)([a-zA-Z0-9-_]+)"
            )

        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name = api_utils.get_function_name(function_name, context)
        # looking up the target version up front
        target_version = get_function_version(
            function_name=function_name,
            qualifier=function_version,
            region=region,
            account_id=account_id,
        )
        function = self._get_function(
            function_name=function_name, region=region, account_id=account_id
        )
        # description is always present, if not specified it's an empty string
        if not description:
            description = ""

        with function.lock:
            existing_alias = function.aliases.get(name)
            if existing_alias:
                existing_arn = api_utils.map_alias_out(alias=existing_alias, function=function)[
                    "AliasArn"
                ]
                raise ResourceConflictException(
                    f"Alias already exists: {existing_arn}",
                    Type="User",
                )

            # a routing config is only built if there are additional version weights
            routing_configuration = None
            if routing_config:
                additional_weights = routing_config.get("AdditionalVersionWeights")
                if additional_weights:
                    routing_configuration = self._create_routing_config_model(
                        additional_weights, target_version
                    )

            alias = VersionAlias(
                name=name,
                function_version=function_version,
                description=description,
                routing_configuration=routing_configuration,
            )
            function.aliases[name] = alias
        return api_utils.map_alias_out(alias=alias, function=function)
1✔
1769

1770
    def list_aliases(
        self,
        context: RequestContext,
        function_name: FunctionName,
        function_version: Version = None,
        marker: String = None,
        max_items: MaxListItems = None,
        **kwargs,
    ) -> ListAliasesResponse:
        """
        List the aliases of a function, optionally restricted to one target version.

        :param context: request context, used to resolve account, region and function name
        :param function_name: name or ARN of the function
        :param function_version: if given, only aliases pointing to this version are listed
        :param marker: pagination marker (compared against ``AliasArn``)
        :param max_items: page size
        :return: the requested page plus the marker of the next page, if any
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name = api_utils.get_function_name(function_name, context)
        function = self._get_function(
            function_name=function_name, region=region, account_id=account_id
        )

        matching_aliases = []
        for alias in function.aliases.values():
            if function_version is not None and alias.function_version != function_version:
                continue
            matching_aliases.append(api_utils.map_alias_out(alias, function))

        page, next_marker = PaginatedList(matching_aliases).get_page(
            lambda entry: entry["AliasArn"],
            marker,
            max_items,
        )
        return ListAliasesResponse(Aliases=page, NextMarker=next_marker)
1✔
1798

1799
    def delete_alias(
        self, context: RequestContext, function_name: FunctionName, name: Alias, **kwargs
    ) -> None:
        """
        Remove an alias from a function together with resources registered under its name.

        A non-existing alias is not an error.

        :param context: request context, used to resolve account, region and function name
        :param function_name: name or ARN of the function
        :param name: alias name
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name = api_utils.get_function_name(function_name, context)
        function = self._get_function(
            function_name=function_name, region=region, account_id=account_id
        )
        removed_alias = function.aliases.pop(name, None)

        # cleanup related resources
        function.provisioned_concurrency_configs.pop(name, None)

        # the function URL config is only dropped if the alias actually existed
        if not removed_alias or name not in function.function_url_configs:
            return

        # TODO: Allow for deactivating/unregistering specific Lambda URLs
        url_config = function.function_url_configs.pop(name)
        LOG.debug(
            "Stopping aliased Lambda Function URL %s for %s",
            url_config.url,
            url_config.function_name,
        )
1821

1822
    def get_alias(
        self, context: RequestContext, function_name: FunctionName, name: Alias, **kwargs
    ) -> AliasConfiguration:
        """
        Return the configuration of a single alias.

        :param context: request context, used to resolve account, region and function name
        :param function_name: name or ARN of the function
        :param name: alias name
        :raises ResourceNotFoundException: if the function has no alias with that name
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name = api_utils.get_function_name(function_name, context)
        function = self._get_function(
            function_name=function_name, region=region, account_id=account_id
        )

        alias = function.aliases.get(name)
        if alias:
            return api_utils.map_alias_out(alias=alias, function=function)

        alias_arn = api_utils.qualified_lambda_arn(
            function_name=function_name, qualifier=name, region=region, account=account_id
        )
        raise ResourceNotFoundException(
            f"Cannot find alias arn: {alias_arn}",
            Type="User",
        )
1✔
1836

1837
    def update_alias(
        self,
        context: RequestContext,
        function_name: FunctionName,
        name: Alias,
        function_version: Version = None,
        description: Description = None,
        routing_config: AliasRoutingConfiguration = None,
        revision_id: String = None,
        **kwargs,
    ) -> AliasConfiguration:
        """
        Update target version, description and/or routing config of an existing alias.

        Only arguments that are not ``None`` are applied. The alias is replaced (and the lambda
        service notified) even when nothing changes.

        :param context: request context, used to resolve account, region and function name
        :param function_name: name or ARN of the function
        :param name: alias name
        :param function_version: new version the alias should point to
        :param description: new description
        :param routing_config: new routing config; an empty dict or empty
            ``AdditionalVersionWeights`` removes the routing config
        :param revision_id: if given, has to match the current revision id of the alias
        :return: the updated alias
        :raises ResourceNotFoundException: if the function has no alias with that name
        :raises PreconditionFailedException: if ``revision_id`` does not match
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name = api_utils.get_function_name(function_name, context)
        function = self._get_function(
            function_name=function_name, region=region, account_id=account_id
        )
        if not (alias := function.aliases.get(name)):
            fn_arn = api_utils.qualified_lambda_arn(function_name, name, account_id, region)
            raise ResourceNotFoundException(
                f"Alias not found: {fn_arn}",
                Type="User",
            )
        if revision_id and alias.revision_id != revision_id:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
                Type="User",
            )
        changes = {}
        if function_version is not None:
            changes |= {"function_version": function_version}
        if description is not None:
            changes |= {"description": description}
        if routing_config is not None:
            # if it is an empty dict or AdditionalVersionWeights is empty, set routing config to None
            new_routing_config = None
            if routing_config_dict := routing_config.get("AdditionalVersionWeights"):
                # The weights are validated against the version the alias points to after this
                # update: the new version if one is given, the current one otherwise.
                target_qualifier = (
                    function_version if function_version is not None else alias.function_version
                )
                target_version = get_function_version(
                    function_name=function_name,
                    qualifier=target_qualifier,
                    region=region,
                    account_id=account_id,
                )
                new_routing_config = self._create_routing_config_model(
                    routing_config_dict, target_version
                )
            changes |= {"routing_configuration": new_routing_config}
        # even if no changes are done, we have to update revision id for some reason
        old_alias = alias
        alias = dataclasses.replace(alias, **changes)
        function.aliases[name] = alias

        # TODO: signal lambda service that pointer potentially changed
        self.lambda_service.update_alias(old_alias=old_alias, new_alias=alias, function=function)

        return api_utils.map_alias_out(alias=alias, function=function)
1✔
1885

1886
    # =======================================
1887
    # ======= EVENT SOURCE MAPPINGS =========
1888
    # =======================================
1889
    def check_service_resource_exists(
        self, service: str, resource_arn: str, function_arn: str, function_role_arn: str
    ):
        """
        Check if the service resource exists and if the function has access to it.

        Only ``sqs``, ``sqs-fifo``, ``kinesis`` and ``dynamodb`` are checked; for any other
        service nothing is verified.

        :param service: name of the event source service
        :param resource_arn: ARN of the event source
        :param function_arn: ARN of the function, used as source ARN of the internal client
        :param function_role_arn: role the internal client is created with
        :raises InvalidParameterValueException: if the queue or stream does not exist
        :raises ClientError: any other client error is re-raised unchanged
        """
        parsed_arn = parse_arn(resource_arn)
        source_client = get_internal_client(
            arn=resource_arn,
            role_arn=function_role_arn,
            service_principal=ServicePrincipal.lambda_,
            source_arn=function_arn,
        )

        if service in ("sqs", "sqs-fifo"):
            try:
                # AWS uses `GetQueueAttributes` internally to verify the queue existence, but we need the `QueueUrl`
                # which is not given directly. We build out a dummy `QueueUrl` which can be parsed by SQS to return
                # the right value
                queue_name = parsed_arn["resource"].split("/")[-1]
                queue_url = (
                    f"http://sqs.{parsed_arn['region']}.domain/{parsed_arn['account']}/{queue_name}"
                )
                source_client.get_queue_attributes(QueueUrl=queue_url)
            except ClientError as e:
                error_code = e.response["Error"]["Code"]
                if error_code != "AWS.SimpleQueueService.NonExistentQueue":
                    raise e
                raise InvalidParameterValueException(
                    f"Error occurred while ReceiveMessage. SQS Error Code: {error_code}. SQS Error Message: {e.response['Error']['Message']}",
                    Type="User",
                )
        elif service == "kinesis":
            try:
                source_client.describe_stream(StreamARN=resource_arn)
            except ClientError as e:
                if e.response["Error"]["Code"] != "ResourceNotFoundException":
                    raise e
                raise InvalidParameterValueException(
                    f"Stream not found: {resource_arn}",
                    Type="User",
                )
        elif service == "dynamodb":
            try:
                # note the different parameter casing compared to the kinesis call
                source_client.describe_stream(StreamArn=resource_arn)
            except ClientError as e:
                if e.response["Error"]["Code"] != "ResourceNotFoundException":
                    raise e
                raise InvalidParameterValueException(
                    f"Stream not found: {resource_arn}",
                    Type="User",
                )
×
1941

1942
    @handler("CreateEventSourceMapping", expand=False)
    def create_event_source_mapping(
        self,
        context: RequestContext,
        request: CreateEventSourceMappingRequest,
    ) -> EventSourceMappingConfiguration:
        """Handle ``CreateEventSourceMapping`` by delegating to ``create_event_source_mapping_v2``."""
        esm_configuration = self.create_event_source_mapping_v2(context, request)
        return esm_configuration
1✔
1949

1950
    def create_event_source_mapping_v2(
        self,
        context: RequestContext,
        request: CreateEventSourceMappingRequest,
    ) -> EventSourceMappingConfiguration:
        """
        Validate the request, persist the new event source mapping and create its worker.

        Returns:
            The ESM configuration built by ``EsmConfigFactory`` for this request.
        """
        # Validation raises on bad input; the resolved function name and version are not needed here.
        function_arn, _, state, _, function_role = self.validate_event_source_mapping(
            context, request
        )

        esm_config = EsmConfigFactory(request, context, function_arn).get_esm_config()

        # A copy is stored to avoid a race condition with a potential async update in the store
        esm_uuid = esm_config["UUID"]
        state.event_source_mappings[esm_uuid] = esm_config.copy()

        # TODO: check for potential async race condition update -> think about locking
        worker_factory = EsmWorkerFactory(esm_config, function_role, request.get("Enabled", True))
        esm_worker = worker_factory.get_esm_worker()
        self.esm_workers[esm_worker.uuid] = esm_worker

        # TODO: check StateTransitionReason, LastModified, LastProcessingResult (concurrent updates requires locking!)
        tags = request.get("Tags")
        if tags:
            self._store_tags(esm_config.get("EventSourceMappingArn"), tags)

        esm_worker.create()
        return esm_config
1✔
1973

1974
    def validate_event_source_mapping(self, context, request):
        """
        Validate an event source mapping payload and resolve the function it targets.

        Args:
            context: Request context; provides the operation name and the fallback account/region.
            request: Either a ``CreateEventSourceMapping`` request (carrying ``FunctionName``) or an
                internal ESM configuration (carrying ``FunctionArn``).

        Returns:
            The tuple ``(fn_arn, function_name, state, function_version, function_role)``.

        Raises:
            InvalidParameterValueException: For unsupported destination config, missing or unknown
                event source, invalid starting position, batching or filter settings, or an
                unknown function.
            ValidationException: If ``StartingPosition`` is not a known enum member.
            ResourceConflictException: On create requests, if an equivalent mapping already exists.
            Exception: For an unknown alias/version or an unrecognized qualifier (not yet mapped to
                a service exception, see TODOs below).
        """
        # TODO: test whether stream ARNs are valid sources for Pipes or ESM or whether only DynamoDB table ARNs work
        is_create_esm_request = context.operation.name == self.create_event_source_mapping.operation

        if destination_config := request.get("DestinationConfig"):
            if "OnSuccess" in destination_config:
                raise InvalidParameterValueException(
                    "Unsupported DestinationConfig parameter for given event source mapping type.",
                    Type="User",
                )

        # Determine the source service: self-managed sources are Kafka, everything else is derived from the ARN.
        service = None
        if "SelfManagedEventSource" in request:
            service = "kafka"
            if "SourceAccessConfigurations" not in request:
                raise InvalidParameterValueException(
                    "Required 'sourceAccessConfigurations' parameter is missing.", Type="User"
                )
        if service is None and "EventSourceArn" not in request:
            raise InvalidParameterValueException("Unrecognized event source.", Type="User")
        if service is None:
            service = extract_service_from_arn(request["EventSourceArn"])

        batch_size = api_utils.validate_and_set_batch_size(service, request.get("BatchSize"))
        if service in ["dynamodb", "kinesis"]:
            starting_position = request.get("StartingPosition")
            if not starting_position:
                raise InvalidParameterValueException(
                    "1 validation error detected: Value null at 'startingPosition' failed to satisfy constraint: Member must not be null.",
                    Type="User",
                )

            if starting_position not in KinesisStreamStartPosition.__members__:
                raise ValidationException(
                    f"1 validation error detected: Value '{starting_position}' at 'startingPosition' failed to satisfy constraint: Member must satisfy enum value set: [LATEST, AT_TIMESTAMP, TRIM_HORIZON]"
                )
            # AT_TIMESTAMP is not allowed for DynamoDB Streams
            elif (
                service == "dynamodb"
                and starting_position not in DynamoDBStreamStartPosition.__members__
            ):
                raise InvalidParameterValueException(
                    f"Unsupported starting position for arn type: {request['EventSourceArn']}",
                    Type="User",
                )

        if service in ["sqs", "sqs-fifo"]:
            if batch_size > 10 and request.get("MaximumBatchingWindowInSeconds", 0) == 0:
                raise InvalidParameterValueException(
                    "Maximum batch window in seconds must be greater than 0 if maximum batch size is greater than 10",
                    Type="User",
                )

        if (filter_criteria := request.get("FilterCriteria")) is not None:
            for filter_ in filter_criteria.get("Filters", []):
                pattern_str = filter_.get("Pattern")
                # A pattern must be a non-empty string that also passes event pattern validation.
                if (
                    not pattern_str
                    or not isinstance(pattern_str, str)
                    or not validate_event_pattern(pattern_str)
                ):
                    raise InvalidParameterValueException(
                        "Invalid filter pattern definition.", Type="User"
                    )

        # Can either have a FunctionName (i.e CreateEventSourceMapping request) or
        # an internal EventSourceMappingConfiguration representation
        request_function_name = request.get("FunctionName") or request.get("FunctionArn")
        # can be either a partial arn or a full arn for the version/alias
        function_name, qualifier, account, region = function_locators_from_arn(
            request_function_name
        )
        # TODO: validate `context.region` vs. `region(request["FunctionName"])` vs. `region(request["EventSourceArn"])`
        account = account or context.account_id
        region = region or context.region
        state = lambda_stores[account][region]
        fn = state.functions.get(function_name)
        if not fn:
            raise InvalidParameterValueException("Function does not exist", Type="User")

        if qualifier:
            # make sure the function version/alias exists
            if api_utils.qualifier_is_alias(qualifier):
                fn_alias = fn.aliases.get(qualifier)
                if not fn_alias:
                    raise Exception("unknown alias")  # TODO: cover via test
            elif api_utils.qualifier_is_version(qualifier):
                fn_version = fn.versions.get(qualifier)
                if not fn_version:
                    raise Exception("unknown version")  # TODO: cover via test
            elif qualifier == "$LATEST":
                pass
            else:
                raise Exception("invalid functionname")  # TODO: cover via test
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account, region)

        else:
            fn_arn = api_utils.unqualified_lambda_arn(function_name, account, region)

        function_version = get_function_version_from_arn(fn_arn)
        function_role = function_version.config.role

        if source_arn := request.get("EventSourceArn"):
            self.check_service_resource_exists(service, source_arn, fn_arn, function_role)

        # Duplicate detection only applies when validating a CreateEventSourceMapping request
        if is_create_esm_request:

            def _get_mapping_sources(mapping: dict[str, Any]) -> list[str]:
                """Return the event source ARN, or the Kafka bootstrap servers for self-managed sources."""
                if event_source_arn := mapping.get("EventSourceArn"):
                    return [event_source_arn]
                return (
                    mapping.get("SelfManagedEventSource", {})
                    .get("Endpoints", {})
                    .get("KAFKA_BOOTSTRAP_SERVERS", [])
                )

            # check for event source duplicates
            # TODO: currently validated for sqs, kinesis, and dynamodb
            service_id = load_service(service).service_id
            # The request does not change while scanning the store, so its sources are resolved once.
            request_sources = _get_mapping_sources(request)
            for uuid, mapping in state.event_source_mappings.items():
                # Mappings of other functions can never conflict; skip them before resolving sources.
                if mapping["FunctionArn"] != fn_arn:
                    continue
                if not set(_get_mapping_sources(mapping)).intersection(request_sources):
                    continue

                if service == "sqs":
                    # *shakes fist at SQS*
                    raise ResourceConflictException(
                        f'An event source mapping with {service_id} arn (" {mapping["EventSourceArn"]} ") '
                        f'and function (" {function_name} ") already exists. Please update or delete the '
                        f"existing mapping with UUID {uuid}",
                        Type="User",
                    )
                elif service == "kafka":
                    # Kafka mappings only conflict when they also share at least one topic.
                    if set(mapping["Topics"]).intersection(request["Topics"]):
                        raise ResourceConflictException(
                            f'An event source mapping with event source ("{",".join(request_sources)}"), '
                            f'function ("{fn_arn}"), '
                            f'topics ("{",".join(request["Topics"])}") already exists. Please update or delete the '
                            f"existing mapping with UUID {uuid}",
                            Type="User",
                        )
                else:
                    raise ResourceConflictException(
                        f'The event source arn (" {mapping["EventSourceArn"]} ") and function '
                        f'(" {function_name} ") provided mapping already exists. Please update or delete the '
                        f"existing mapping with UUID {uuid}",
                        Type="User",
                    )
        return fn_arn, function_name, state, function_version, function_role
1✔
2125

2126
    @handler("UpdateEventSourceMapping", expand=False)
    def update_event_source_mapping(
        self,
        context: RequestContext,
        request: UpdateEventSourceMappingRequest,
    ) -> EventSourceMappingConfiguration:
        """Handle ``UpdateEventSourceMapping`` by delegating to ``update_event_source_mapping_v2``."""
        esm_configuration = self.update_event_source_mapping_v2(context, request)
        return esm_configuration
1✔
2133

2134
    def update_event_source_mapping_v2(
        self,
        context: RequestContext,
        request: UpdateEventSourceMappingRequest,
    ) -> EventSourceMappingConfiguration:
        """
        Merge the request into the stored event source mapping and replace its worker.

        Returns:
            The merged ESM configuration, with ``State`` set to the value computed at update time.

        Raises:
            ResourceNotFoundException: If no ``UUID`` is given, or no mapping/worker exists for it.
        """
        # TODO: test and implement this properly (quite complex with many validations and limitations!)
        LOG.warning(
            "Updating Lambda Event Source Mapping is in experimental state and not yet fully tested."
        )
        state = lambda_stores[context.account_id][context.region]

        updates = {**request}
        uuid = updates.pop("UUID", None)
        if not uuid:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        current_esm = state.event_source_mappings.get(uuid)
        current_worker = self.esm_workers.get(uuid)
        if current_esm is None or current_worker is None:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )  # TODO: test?

        # Request values take precedence over the stored ones
        merged_esm = current_esm | updates

        # Validate the merged ESM object; only the resolved function ARN and role are used afterwards.
        function_arn, _, _, _, function_role = self.validate_event_source_mapping(
            context, merged_esm
        )

        # The stored representation is keyed by FunctionArn, not FunctionName
        merged_esm.pop("FunctionName", None)
        if function_arn:
            merged_esm["FunctionArn"] = function_arn

        # Only transition the state if the desired enablement differs from the current one
        enabled = request.get("Enabled")
        if enabled is None:
            merged_esm["State"] = EsmState.UPDATING
        else:
            currently_enabled = current_esm["State"] == EsmState.ENABLED
            if enabled and not currently_enabled:
                merged_esm["State"] = EsmState.ENABLING
            # TODO: What happens when trying to update during an update or failed state?!
            elif not enabled and currently_enabled:
                merged_esm["State"] = EsmState.DISABLING

        # Values only set for the returned response, not saved internally (e.g. transient state).
        # To ensure parity, the state computed here needs to be returned immediately.
        response_overrides = {"State": merged_esm["State"]}

        state.event_source_mappings[uuid] = merged_esm

        # TODO: Currently, we re-create the entire ESM worker. Look into approach with better performance.
        # The factory holds all logic for building a worker from configuration; the new worker is
        # registered first and only activated further below.
        replacement_worker = EsmWorkerFactory(
            merged_esm, function_role, request.get("Enabled", current_worker.enabled)
        ).get_esm_worker()
        self.esm_workers[uuid] = replacement_worker

        # stop() rather than delete() the old worker, since delete() would remove the ESM from the state mapping.
        current_worker.stop()
        # This will either create an EsmWorker in the CREATING state if enabled. Otherwise, the DISABLING state is set.
        replacement_worker.create()

        return {**merged_esm, **response_overrides}
1✔
2204

2205
    def delete_event_source_mapping(
        self, context: RequestContext, uuid: String, **kwargs
    ) -> EventSourceMappingConfiguration:
        """
        Unregister the worker of an event source mapping and trigger its deletion.

        Returns:
            A copy of the stored mapping with ``State`` overridden to ``EsmState.DELETING``.

        Raises:
            ResourceNotFoundException: If no mapping or no worker is registered for ``uuid``.
        """
        store = lambda_stores[context.account_id][context.region]
        esm = store.event_source_mappings.get(uuid)
        if not esm:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        # TODO: add proper locking
        worker = self.esm_workers.pop(uuid, None)
        if not worker:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        # Asynchronous delete in v2
        worker.delete()
        return {**esm, "State": EsmState.DELETING}
1✔
2224

2225
    def get_event_source_mapping(
        self, context: RequestContext, uuid: String, **kwargs
    ) -> EventSourceMappingConfiguration:
        """
        Return the stored event source mapping, refreshed with its worker's current state.

        Note that ``State`` and ``StateTransitionReason`` are written onto the stored mapping itself.

        Raises:
            ResourceNotFoundException: If no mapping or no worker is registered for ``uuid``.
        """
        store = lambda_stores[context.account_id][context.region]
        esm = store.event_source_mappings.get(uuid)
        # Both a missing mapping and a missing worker yield the same not-found error
        worker = self.esm_workers.get(uuid) if esm else None
        if not worker:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        esm["State"] = worker.current_state
        esm["StateTransitionReason"] = worker.state_transition_reason
        return esm
1✔
2242

2243
    def list_event_source_mappings(
        self,
        context: RequestContext,
        event_source_arn: Arn = None,
        function_name: FunctionName = None,
        marker: String = None,
        max_items: MaxListItems = None,
        **kwargs,
    ) -> ListEventSourceMappingsResponse:
        """
        List the event source mappings of the request's account and region, paginated by UUID.

        ``event_source_arn`` filters on an exact ``EventSourceArn`` match; ``function_name`` keeps
        mappings whose ``FunctionArn`` contains the given value.
        """
        store = lambda_stores[context.account_id][context.region]

        # TODO: update and test State and StateTransitionReason for ESM v2
        candidates = store.event_source_mappings.values()

        if event_source_arn:  # TODO: validate pattern
            candidates = [
                esm for esm in candidates if esm.get("EventSourceArn") == event_source_arn
            ]

        if function_name:
            candidates = [esm for esm in candidates if function_name in esm["FunctionArn"]]

        page, next_marker = PaginatedList(candidates).get_page(
            lambda esm: esm["UUID"],
            marker,
            max_items,
        )
        return ListEventSourceMappingsResponse(EventSourceMappings=page, NextMarker=next_marker)
1✔
2270

2271
    def get_source_type_from_request(self, request: dict[str, Any]) -> Optional[str]:
        """
        Derive the event source type from an event source mapping request.

        Returns:
            The service extracted from ``EventSourceArn`` (``"sqs-fifo"`` for SQS FIFO queues),
            ``"kafka"`` for self-managed event sources, or ``None`` if the request has neither.
        """
        event_source_arn = request.get("EventSourceArn", "")
        if event_source_arn:
            service = extract_service_from_arn(event_source_arn)
            # FIFO queues are identified by their mandatory `.fifo` name suffix. A plain substring
            # check would misclassify standard queues that merely contain "fifo" in their name.
            if service == "sqs" and event_source_arn.endswith(".fifo"):
                return "sqs-fifo"
            return service

        if request.get("SelfManagedEventSource"):
            return "kafka"

        return None
×
2279

2280
    # =======================================
2281
    # ============ FUNCTION URLS ============
2282
    # =======================================
2283

2284
    @staticmethod
    def _validate_qualifier(qualifier: str) -> None:
        """
        Reject ``$LATEST`` and version qualifiers.

        An empty/absent qualifier and any qualifier that is not recognized as a version pass.

        Raises:
            ValidationException: If ``qualifier`` is ``$LATEST`` or a version qualifier.
        """
        if not qualifier:
            return
        if qualifier != "$LATEST" and not api_utils.qualifier_is_version(qualifier):
            return
        raise ValidationException(
            f"1 validation error detected: Value '{qualifier}' at 'qualifier' failed to satisfy constraint: Member must satisfy regular expression pattern: ((?!^\\d+$)^[0-9a-zA-Z-_]+$)"
        )
2290

2291
    @staticmethod
    def _validate_invoke_mode(invoke_mode: str) -> None:
        """
        Check that ``invoke_mode``, if given, is a known ``InvokeMode`` value.

        ``RESPONSE_STREAM`` is accepted but only logged as unsupported.

        Raises:
            ValidationException: If ``invoke_mode`` is set to an unknown value.
        """
        known_modes = [InvokeMode.BUFFERED, InvokeMode.RESPONSE_STREAM]
        if invoke_mode and invoke_mode not in known_modes:
            raise ValidationException(
                f"1 validation error detected: Value '{invoke_mode}' at 'invokeMode' failed to satisfy constraint: Member must satisfy enum value set: [RESPONSE_STREAM, BUFFERED]"
            )

        if invoke_mode != InvokeMode.RESPONSE_STREAM:
            return

        # TODO should we actually fail for setting RESPONSE_STREAM?
        #  It should trigger InvokeWithResponseStream which is not implemented
        LOG.warning(
            "The invokeMode 'RESPONSE_STREAM' is not yet supported on LocalStack. The property is only mocked, the execution will still be 'BUFFERED'"
        )
2303

2304
    # TODO: what happens if function state is not active?
2305
    def create_function_url_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        auth_type: FunctionUrlAuthType,
        qualifier: FunctionUrlQualifier = None,
        cors: Cors = None,
        invoke_mode: InvokeMode = None,
        **kwargs,
    ) -> CreateFunctionUrlConfigResponse:
        """
        Create and store a function URL config for a function or one of its aliases.

        The URL subdomain is either random or, if the function carries the ``TAG_KEY_CUSTOM_URL``
        tag with a valid value, derived from that tag (localstack-only).

        Raises:
            ValidationException: If the qualifier or invoke mode is invalid.
            ResourceNotFoundException: If the function or the referenced alias does not exist.
            ResourceConflictException: If a URL config already exists for the qualifier.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        state = lambda_stores[account_id][region]
        self._validate_qualifier(qualifier)
        self._validate_invoke_mode(invoke_mode)

        fn = state.functions.get(function_name)
        if fn is None:
            raise ResourceNotFoundException("Function does not exist", Type="User")

        existing_url_config = fn.function_url_configs.get(qualifier or "$LATEST")
        if existing_url_config:
            raise ResourceConflictException(
                f"Failed to create function url config for [functionArn = {existing_url_config.function_arn}]. Error message:  FunctionUrlConfig exists for this Lambda function",
                Type="User",
            )

        references_missing_alias = (
            qualifier and qualifier != "$LATEST" and qualifier not in fn.aliases
        )
        if references_missing_alias:
            raise ResourceNotFoundException("Function does not exist", Type="User")

        normalized_qualifier = qualifier or "$LATEST"

        if qualifier:
            function_arn = api_utils.qualified_lambda_arn(
                function_name, qualifier, account_id, region
            )
        else:
            function_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)

        custom_id: str | None = None

        tags = self._get_tags(api_utils.unqualified_lambda_arn(function_name, account_id, region))
        if TAG_KEY_CUSTOM_URL in tags:
            # Note: uniqueness of the custom id is not verified here. It would have to be unique
            # across the entire lambda provider, not just this function, and that information
            # is not available at this point.
            custom_tag_value = tags[TAG_KEY_CUSTOM_URL]
            custom_id_tag_value = (
                f"{custom_tag_value}-{qualifier}" if qualifier else custom_tag_value
            )
            if TAG_KEY_CUSTOM_URL_VALIDATOR.match(custom_id_tag_value):
                custom_id = custom_id_tag_value
            else:
                # Note: logging instead of raising, to prioritize strict parity with AWS over
                # the localstack-only custom_id
                LOG.warning(
                    "Invalid custom ID tag value for lambda URL (%s=%s). "
                    "Replaced with default (random id)",
                    TAG_KEY_CUSTOM_URL,
                    custom_id_tag_value,
                )

        # The url_id is the subdomain of the URL being created: random (as in AWS) unless a valid
        # custom id was supplied via the function's tags (localstack-only).
        url_id: str = api_utils.generate_random_url_id() if custom_id is None else custom_id

        host_definition = localstack_host(custom_port=config.GATEWAY_LISTEN[0].port)
        new_url_config = FunctionUrlConfig(
            function_arn=function_arn,
            function_name=function_name,
            cors=cors,
            url_id=url_id,
            url=f"http://{url_id}.lambda-url.{context.region}.{host_definition.host_and_port()}/",  # TODO: https support
            auth_type=auth_type,
            creation_time=api_utils.generate_lambda_date(),
            last_modified_time=api_utils.generate_lambda_date(),
            invoke_mode=invoke_mode,
        )
        fn.function_url_configs[normalized_qualifier] = new_url_config

        # persist and start URL
        # TODO: implement URL invoke
        api_url_config = api_utils.map_function_url_config(new_url_config)

        return CreateFunctionUrlConfigResponse(
            FunctionUrl=api_url_config["FunctionUrl"],
            FunctionArn=api_url_config["FunctionArn"],
            AuthType=api_url_config["AuthType"],
            Cors=api_url_config["Cors"],
            CreationTime=api_url_config["CreationTime"],
            InvokeMode=api_url_config["InvokeMode"],
        )
2409

2410
    def get_function_url_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: FunctionUrlQualifier = None,
        **kwargs,
    ) -> GetFunctionUrlConfigResponse:
        """
        Return the function URL config stored for the function and qualifier (default ``$LATEST``).

        Raises:
            ValidationException: If the qualifier is invalid.
            ResourceNotFoundException: If the function or its URL config does not exist.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]

        fn_name, qualifier = api_utils.get_name_and_qualifier(function_name, qualifier, context)
        self._validate_qualifier(qualifier)

        function = store.functions.get(fn_name)
        if not function:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        stored_config = function.function_url_configs.get(qualifier or "$LATEST")
        if not stored_config:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        return api_utils.map_function_url_config(stored_config)
1✔
2438

2439
    def update_function_url_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: FunctionUrlQualifier = None,
        auth_type: FunctionUrlAuthType = None,
        cors: Cors = None,
        invoke_mode: InvokeMode = None,
        **kwargs,
    ) -> UpdateFunctionUrlConfigResponse:
        """
        Apply the given changes to an existing function URL config and store the result.

        ``cors`` and ``auth_type`` are applied when not ``None``, ``invoke_mode`` when truthy;
        ``last_modified_time`` is always refreshed.

        Raises:
            ValidationException: If the qualifier or invoke mode is invalid.
            ResourceNotFoundException: If the function, alias or URL config does not exist.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]

        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        self._validate_qualifier(qualifier)
        self._validate_invoke_mode(invoke_mode)

        fn = store.functions.get(function_name)
        if not fn:
            raise ResourceNotFoundException("Function does not exist", Type="User")

        normalized_qualifier = qualifier or "$LATEST"

        is_alias = api_utils.qualifier_is_alias(normalized_qualifier)
        if is_alias and normalized_qualifier not in fn.aliases:
            raise ResourceNotFoundException("Function does not exist", Type="User")

        current_config = fn.function_url_configs.get(normalized_qualifier)
        if not current_config:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        # Collect only the fields that were actually supplied
        changes = {"last_modified_time": api_utils.generate_lambda_date()}
        if cors is not None:
            changes["cors"] = cors
        if auth_type is not None:
            changes["auth_type"] = auth_type
        if invoke_mode:
            changes["invoke_mode"] = invoke_mode

        updated_config = dataclasses.replace(current_config, **changes)
        fn.function_url_configs[normalized_qualifier] = updated_config

        return UpdateFunctionUrlConfigResponse(
            FunctionUrl=updated_config.url,
            FunctionArn=updated_config.function_arn,
            AuthType=updated_config.auth_type,
            Cors=updated_config.cors,
            CreationTime=updated_config.creation_time,
            LastModifiedTime=updated_config.last_modified_time,
            InvokeMode=updated_config.invoke_mode,
        )
2497

2498
    def delete_function_url_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: FunctionUrlQualifier = None,
        **kwargs,
    ) -> None:
        """
        Remove the function URL config stored for the function and qualifier (default ``$LATEST``).

        Raises:
            ValidationException: If the qualifier is invalid.
            ResourceNotFoundException: If the function or its URL config does not exist.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]

        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        self._validate_qualifier(qualifier)

        function = store.functions.get(function_name)
        if not function:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        config_key = qualifier or "$LATEST"
        if not function.function_url_configs.get(config_key):
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        del function.function_url_configs[config_key]
1✔
2527

2528
    def list_function_url_configs(
        self,
        context: RequestContext,
        function_name: FunctionName,
        marker: String = None,
        max_items: MaxItems = None,
        **kwargs,
    ) -> ListFunctionUrlConfigsResponse:
        """
        Return one page of the URL configs stored on a function.

        :param marker: pagination marker handed to ``PaginatedList.get_page``
        :param max_items: page size handed to ``PaginatedList.get_page``
        :raises ResourceNotFoundException: if the function is not in the store
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]

        resolved_name = api_utils.get_function_name(function_name, context)
        fn = store.functions.get(resolved_name)
        if not fn:
            raise ResourceNotFoundException("Function does not exist", Type="User")

        def _function_arn_of(url_config):
            # key function passed to get_page
            return url_config["FunctionArn"]

        mapped_configs = PaginatedList(
            list(map(api_utils.map_function_url_config, fn.function_url_configs.values()))
        )
        page, next_marker = mapped_configs.get_page(_function_arn_of, marker, max_items)
        return ListFunctionUrlConfigsResponse(FunctionUrlConfigs=page, NextMarker=next_marker)
1✔
2556

2557
    # =======================================
2558
    # ============  Permissions  ============
2559
    # =======================================
2560

2561
    @handler("AddPermission", expand=False)
    def add_permission(
        self,
        context: RequestContext,
        request: AddPermissionRequest,
    ) -> AddPermissionResponse:
        """
        Append a statement to the resource policy of a function qualifier.

        The policy is created on first use. After the statement is stored, the addressed alias or
        version object is replaced by a copy (see the TODO below: this is done to bump the
        revision id).

        :raises InvalidParameterValueException: if the qualifier is ``$LATEST``
        :raises PreconditionFailedException: if a ``RevisionId`` is given and does not match
        :raises ValidationException: if the statement id does not match ``STATEMENT_ID_REGEX``
        :raises ResourceConflictException: if the statement id already exists in the policy
        :return: response carrying the new statement serialized as JSON
        """
        function_name, qualifier = api_utils.get_name_and_qualifier(
            request.get("FunctionName"), request.get("Qualifier"), context
        )

        # validate qualifier
        if qualifier is not None:
            self._validate_qualifier_expression(qualifier)
            if qualifier == "$LATEST":
                raise InvalidParameterValueException(
                    "We currently do not support adding policies for $LATEST.", Type="User"
                )
        account_id, region = api_utils.get_account_and_region(request.get("FunctionName"), context)

        resolved_fn = self._get_function(function_name, account_id, region)
        resolved_qualifier, fn_arn = self._resolve_fn_qualifier(resolved_fn, qualifier)

        # the current revision id is only looked up when the caller supplied one
        revision_id = request.get("RevisionId")
        if revision_id and revision_id != self._function_revision_id(
            resolved_fn, resolved_qualifier
        ):
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
                Type="User",
            )

        request_sid = request["StatementId"]
        if STATEMENT_ID_REGEX.match(request_sid) is None:
            raise ValidationException(
                f"1 validation error detected: Value '{request_sid}' at 'statementId' failed to satisfy constraint: Member must satisfy regular expression pattern: ([a-zA-Z0-9-_]+)"
            )

        # check for an already existing policy and any conflicts in existing statements
        existing_policy = resolved_fn.permissions.get(resolved_qualifier)
        if existing_policy:
            known_sids = {stmt["Sid"] for stmt in existing_policy.policy.Statement}
            if request_sid in known_sids:
                # uniqueness scope: statement id needs to be unique per qualified function ($LATEST, version, or alias)
                # Counterexample: the same sid can exist within $LATEST, version, and alias
                raise ResourceConflictException(
                    f"The statement id ({request_sid}) provided already exists. Please provide a new statement id, or remove the existing statement.",
                    Type="User",
                )

        permission_statement = api_utils.build_statement(
            partition=context.partition,
            resource_arn=fn_arn,
            statement_id=request_sid,
            action=request["Action"],
            principal=request["Principal"],
            source_arn=request.get("SourceArn"),
            source_account=request.get("SourceAccount"),
            principal_org_id=request.get("PrincipalOrgID"),
            event_source_token=request.get("EventSourceToken"),
            auth_type=request.get("FunctionUrlAuthType"),
        )

        if existing_policy:
            existing_policy.policy.Statement.append(permission_statement)
        else:
            # first statement for this qualifier: create and register the policy
            resolved_fn.permissions[resolved_qualifier] = FunctionResourcePolicy(
                policy=ResourcePolicy(
                    Version="2012-10-17", Id="default", Statement=[permission_statement]
                )
            )

        # Update revision id of alias or version
        # TODO: re-evaluate data model to prevent this dirty hack just for bumping the revision id
        # TODO: does that need a `with function.lock` for atomic updates of the policy + revision_id?
        if api_utils.qualifier_is_alias(resolved_qualifier):
            current_alias = resolved_fn.aliases[resolved_qualifier]
            resolved_fn.aliases[resolved_qualifier] = dataclasses.replace(current_alias)
        else:
            # Assumes that a non-alias is a version
            current_version = resolved_fn.versions[resolved_qualifier]
            refreshed_config = dataclasses.replace(current_version.config)
            resolved_fn.versions[resolved_qualifier] = dataclasses.replace(
                current_version, config=refreshed_config
            )

        return AddPermissionResponse(Statement=json.dumps(permission_statement))
1✔
2643

2644
    def remove_permission(
        self,
        context: RequestContext,
        function_name: FunctionName,
        statement_id: NamespacedStatementId,
        qualifier: Qualifier = None,
        revision_id: String = None,
        **kwargs,
    ) -> None:
        """
        Remove one statement from the resource policy of a function qualifier.

        The addressed alias or version object is replaced by a copy afterwards (see the TODO
        below), and the policy entry is dropped once its last statement is gone.

        :raises ResourceNotFoundException: if the function, its policy, or the statement is missing
        :raises PreconditionFailedException: if ``revision_id`` is given and does not match
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        if qualifier is not None:
            self._validate_qualifier_expression(qualifier)

        resolved_fn = lambda_stores[account_id][region].functions.get(function_name)
        if resolved_fn is None:
            fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
            raise ResourceNotFoundException(f"No policy found for: {fn_arn}", Type="User")

        resolved_qualifier, _ = self._resolve_fn_qualifier(resolved_fn, qualifier)
        function_permission = resolved_fn.permissions.get(resolved_qualifier)
        if not function_permission:
            raise ResourceNotFoundException(
                "No policy is associated with the given resource.", Type="User"
            )

        # try to find statement in policy and delete it (first match wins)
        statements = function_permission.policy.Statement
        statement = next((stmt for stmt in statements if stmt["Sid"] == statement_id), None)
        if not statement:
            raise ResourceNotFoundException(
                f"Statement {statement_id} is not found in resource policy.", Type="User"
            )

        fn_revision_id = self._function_revision_id(resolved_fn, resolved_qualifier)
        if revision_id and fn_revision_id != revision_id:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
                Type="User",
            )
        statements.remove(statement)

        # Update revision id for alias or version
        # TODO: re-evaluate data model to prevent this dirty hack just for bumping the revision id
        # TODO: does that need a `with function.lock` for atomic updates of the policy + revision_id?
        if api_utils.qualifier_is_alias(resolved_qualifier):
            current_alias = resolved_fn.aliases[resolved_qualifier]
            resolved_fn.aliases[resolved_qualifier] = dataclasses.replace(current_alias)
        else:
            # Assumes that a non-alias is a version
            current_version = resolved_fn.versions[resolved_qualifier]
            refreshed_config = dataclasses.replace(current_version.config)
            resolved_fn.versions[resolved_qualifier] = dataclasses.replace(
                current_version, config=refreshed_config
            )

        # remove the policy as a whole when there's no statement left in it
        if len(function_permission.policy.Statement) == 0:
            del resolved_fn.permissions[resolved_qualifier]
1✔
2709

2710
    def get_policy(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        qualifier: Qualifier = None,
        **kwargs,
    ) -> GetPolicyResponse:
        """
        Return the resource policy stored for a function qualifier, serialized as JSON.

        A missing qualifier addresses the ``$LATEST`` entry. The response also carries the revision
        id of the addressed alias or version.

        :raises ResourceNotFoundException: if no policy is stored for the qualifier
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )

        if qualifier is not None:
            self._validate_qualifier_expression(qualifier)

        resolved_fn = self._get_function(function_name, account_id, region)

        resolved_qualifier = qualifier if qualifier else "$LATEST"
        function_permission = resolved_fn.permissions.get(resolved_qualifier)
        if not function_permission:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        if api_utils.qualifier_is_alias(resolved_qualifier):
            fn_revision_id = resolved_fn.aliases[resolved_qualifier].revision_id
        else:
            # Assumes that a non-alias is a version
            fn_revision_id = resolved_fn.versions[resolved_qualifier].config.revision_id

        policy_document = dataclasses.asdict(function_permission.policy)
        return GetPolicyResponse(
            Policy=json.dumps(policy_document),
            RevisionId=fn_revision_id,
        )
2747

2748
    # =======================================
2749
    # ========  Code signing config  ========
2750
    # =======================================
2751

2752
    def create_code_signing_config(
        self,
        context: RequestContext,
        allowed_publishers: AllowedPublishers,
        description: Description = None,
        code_signing_policies: CodeSigningPolicies = None,
        tags: Tags = None,
        **kwargs,
    ) -> CreateCodeSigningConfigResponse:
        """
        Create a code signing config with a random id and store it under its ARN.

        The config is registered in the store of the caller's account and region.
        ``tags`` is accepted but not used by this implementation.
        """
        account = context.account_id
        region = context.region

        # TODO: can there be duplicates?
        csc_id = f"csc-{get_random_hex(17)}"  # e.g. 'csc-077c33b4c19e26036'
        csc_arn = f"arn:{context.partition}:lambda:{region}:{account}:code-signing-config:{csc_id}"

        new_config = CodeSigningConfig(
            csc_id=csc_id,
            arn=csc_arn,
            allowed_publishers=allowed_publishers,
            policies=code_signing_policies,
            last_modified=api_utils.generate_lambda_date(),
            description=description,
        )
        lambda_stores[account][region].code_signing_configs[csc_arn] = new_config
        return CreateCodeSigningConfigResponse(CodeSigningConfig=api_utils.map_csc(new_config))
1✔
2778

2779
    def put_function_code_signing_config(
        self,
        context: RequestContext,
        code_signing_config_arn: CodeSigningConfigArn,
        function_name: FunctionName,
        **kwargs,
    ) -> PutFunctionCodeSigningConfigResponse:
        """
        Attach an existing code signing config to a function.

        The code signing config is checked before the function.

        :raises CodeSigningConfigNotFoundException: if the config ARN is not in the store
        :raises ResourceNotFoundException: if the function is not in the store
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]
        function_name = api_utils.get_function_name(function_name, context)

        if not store.code_signing_configs.get(code_signing_config_arn):
            raise CodeSigningConfigNotFoundException(
                f"The code signing configuration cannot be found. Check that the provided configuration is not deleted: {code_signing_config_arn}.",
                Type="User",
            )

        fn = store.functions.get(function_name)
        if not fn:
            fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")

        fn.code_signing_config_arn = code_signing_config_arn
        return PutFunctionCodeSigningConfigResponse(
            CodeSigningConfigArn=code_signing_config_arn, FunctionName=function_name
        )
2806

2807
    def update_code_signing_config(
        self,
        context: RequestContext,
        code_signing_config_arn: CodeSigningConfigArn,
        description: Description = None,
        allowed_publishers: AllowedPublishers = None,
        code_signing_policies: CodeSigningPolicies = None,
        **kwargs,
    ) -> UpdateCodeSigningConfigResponse:
        """
        Replace a stored code signing config with an updated copy.

        Only arguments that are not ``None`` overwrite the stored values; ``last_modified`` is
        always refreshed.

        :raises ResourceNotFoundException: if the config ARN is not in the store
        """
        store = lambda_stores[context.account_id][context.region]
        current = store.code_signing_configs.get(code_signing_config_arn)
        if not current:
            raise ResourceNotFoundException(
                f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
            )

        # dataclass field name -> requested value; None means "leave unchanged"
        requested = {
            "allowed_publishers": allowed_publishers,
            "policies": code_signing_policies,
            "description": description,
        }
        changes = {field: value for field, value in requested.items() if value is not None}

        new_csc = dataclasses.replace(
            current, last_modified=api_utils.generate_lambda_date(), **changes
        )
        store.code_signing_configs[code_signing_config_arn] = new_csc

        return UpdateCodeSigningConfigResponse(CodeSigningConfig=api_utils.map_csc(new_csc))
1✔
2836

2837
    def get_code_signing_config(
        self, context: RequestContext, code_signing_config_arn: CodeSigningConfigArn, **kwargs
    ) -> GetCodeSigningConfigResponse:
        """
        Return the code signing config stored under the given ARN.

        :raises ResourceNotFoundException: if the config ARN is not in the store
        """
        configs = lambda_stores[context.account_id][context.region].code_signing_configs
        found = configs.get(code_signing_config_arn)
        if found:
            return GetCodeSigningConfigResponse(CodeSigningConfig=api_utils.map_csc(found))

        raise ResourceNotFoundException(
            f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
        )
1✔
2848

2849
    def get_function_code_signing_config(
        self, context: RequestContext, function_name: FunctionName, **kwargs
    ) -> GetFunctionCodeSigningConfigResponse:
        """
        Return the code signing config ARN attached to a function.

        The response is empty when the function has no code signing config attached.

        :raises ResourceNotFoundException: if the function is not in the store
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]
        function_name = api_utils.get_function_name(function_name, context)

        fn = store.functions.get(function_name)
        if not fn:
            fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")

        if not fn.code_signing_config_arn:
            return GetFunctionCodeSigningConfigResponse()

        return GetFunctionCodeSigningConfigResponse(
            CodeSigningConfigArn=fn.code_signing_config_arn, FunctionName=function_name
        )
1✔
2866

2867
    def delete_function_code_signing_config(
        self, context: RequestContext, function_name: FunctionName, **kwargs
    ) -> None:
        """
        Detach the code signing config from a function by resetting its ARN to ``None``.

        :raises ResourceNotFoundException: if the function is not in the store
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]
        function_name = api_utils.get_function_name(function_name, context)

        fn = store.functions.get(function_name)
        if not fn:
            fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")

        fn.code_signing_config_arn = None
1✔
2879

2880
    def delete_code_signing_config(
        self, context: RequestContext, code_signing_config_arn: CodeSigningConfigArn, **kwargs
    ) -> DeleteCodeSigningConfigResponse:
        """
        Delete the code signing config stored under the given ARN.

        :raises ResourceNotFoundException: if the config ARN is not in the store
        """
        configs = lambda_stores[context.account_id][context.region].code_signing_configs

        if not configs.get(code_signing_config_arn):
            raise ResourceNotFoundException(
                f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
            )

        del configs[code_signing_config_arn]

        return DeleteCodeSigningConfigResponse()
1✔
2894

2895
    def list_code_signing_configs(
        self,
        context: RequestContext,
        marker: String = None,
        max_items: MaxListItems = None,
        **kwargs,
    ) -> ListCodeSigningConfigsResponse:
        """
        Return one page of the code signing configs of the caller's account and region.

        :param marker: pagination marker handed to ``PaginatedList.get_page``
        :param max_items: page size handed to ``PaginatedList.get_page``
        """
        store = lambda_stores[context.account_id][context.region]

        def _config_id_of(csc):
            # key function passed to get_page
            return csc["CodeSigningConfigId"]

        mapped_configs = PaginatedList(
            list(map(api_utils.map_csc, store.code_signing_configs.values()))
        )
        page, next_marker = mapped_configs.get_page(_config_id_of, marker, max_items)
        return ListCodeSigningConfigsResponse(CodeSigningConfigs=page, NextMarker=next_marker)
1✔
2912

2913
    def list_functions_by_code_signing_config(
        self,
        context: RequestContext,
        code_signing_config_arn: CodeSigningConfigArn,
        marker: String = None,
        max_items: MaxListItems = None,
        **kwargs,
    ) -> ListFunctionsByCodeSigningConfigResponse:
        """
        Return one page of unqualified ARNs of the functions using a code signing config.

        Only functions in the caller's account and region are considered.

        :raises ResourceNotFoundException: if the config ARN is not in the store
        """
        account = context.account_id
        region = context.region
        store = lambda_stores[account][region]

        if code_signing_config_arn not in store.code_signing_configs:
            raise ResourceNotFoundException(
                f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
            )

        matching_arns = []
        for fn in store.functions.values():
            if fn.code_signing_config_arn == code_signing_config_arn:
                matching_arns.append(
                    api_utils.unqualified_lambda_arn(fn.function_name, account, region)
                )

        def _identity(arn):
            # the ARN itself is the key passed to get_page
            return arn

        page, next_marker = PaginatedList(matching_arns).get_page(_identity, marker, max_items)
        return ListFunctionsByCodeSigningConfigResponse(FunctionArns=page, NextMarker=next_marker)
1✔
2944

2945
    # =======================================
2946
    # =========  Account Settings   =========
2947
    # =======================================
2948

2949
    # CAVE: these settings & usages are *per* region!
2950
    # Lambda quotas: https://docs.aws.amazon.com/lambda/latest/dg/gettingstarted-limits.html
2951
    def get_account_settings(self, context: RequestContext, **kwargs) -> GetAccountSettingsResponse:
        """
        Report the configured limits and the current usage for the caller's account and region.

        Usage counts the functions, the code size of all Zip-packaged function versions, and the
        code size of all layer versions. Unreserved concurrency is the configured concurrency
        limit minus all reserved and provisioned concurrency.
        """
        store = lambda_stores[context.account_id][context.region]

        fn_count = 0
        total_code_size = 0
        claimed_concurrency = 0
        for fn in store.functions.values():
            fn_count += 1
            # Image-based Lambdas do not have a code attribute and count against the ECR quotas instead
            total_code_size += sum(
                version.config.code.code_size
                for version in fn.versions.values()
                if version.config.package_type == PackageType.Zip
            )
            if fn.reserved_concurrent_executions is not None:
                claimed_concurrency += fn.reserved_concurrent_executions
            claimed_concurrency += sum(
                provisioned.provisioned_concurrent_executions
                for provisioned in fn.provisioned_concurrency_configs.values()
            )

        total_code_size += sum(
            layer_version.code.code_size
            for layer in store.layers.values()
            for layer_version in layer.layer_versions.values()
        )

        limits = AccountLimit(
            TotalCodeSize=config.LAMBDA_LIMITS_TOTAL_CODE_SIZE,
            CodeSizeZipped=config.LAMBDA_LIMITS_CODE_SIZE_ZIPPED,
            CodeSizeUnzipped=config.LAMBDA_LIMITS_CODE_SIZE_UNZIPPED,
            ConcurrentExecutions=config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS,
            UnreservedConcurrentExecutions=config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS
            - claimed_concurrency,
        )
        usage = AccountUsage(
            TotalCodeSize=total_code_size,
            FunctionCount=fn_count,
        )
        return GetAccountSettingsResponse(AccountLimit=limits, AccountUsage=usage)
2984

2985
    # =======================================
2986
    # ==  Provisioned Concurrency Config   ==
2987
    # =======================================
2988

2989
    def _get_provisioned_config(
        self, context: RequestContext, function_name: str, qualifier: str
    ) -> ProvisionedConcurrencyConfiguration | None:
        """
        Look up the provisioned concurrency config stored for a function qualifier.

        :param function_name: function name or ARN; account and region are derived from it
        :param qualifier: alias name or version the config is stored under
        :return: the stored config, or ``None`` if the function has none for the qualifier
        :raises ResourceNotFoundException: if the function, the alias, or the version is not in
            the store
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        state = lambda_stores[account_id][region]
        function_name = api_utils.get_function_name(function_name, context)
        fn = state.functions.get(function_name)

        if api_utils.qualifier_is_alias(qualifier):
            if not fn or fn.aliases.get(qualifier) is None:
                raise ResourceNotFoundException(
                    f"Cannot find alias arn: {api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)}",
                    Type="User",
                )
        elif api_utils.qualifier_is_version(qualifier):
            if not fn or fn.versions.get(qualifier) is None:
                raise ResourceNotFoundException(
                    f"Function not found: {api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)}",
                    Type="User",
                )
        elif not fn:
            # The qualifier matched neither branch above and the function is unknown. Without
            # this guard the attribute access below would fail with an AttributeError on None.
            raise ResourceNotFoundException(
                f"Function not found: {api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)}",
                Type="User",
            )

        return fn.provisioned_concurrency_configs.get(qualifier)
1✔
3016

3017
    def put_provisioned_concurrency_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: Qualifier,
        provisioned_concurrent_executions: PositiveInteger,
        **kwargs,
    ) -> PutProvisionedConcurrencyConfigResponse:
        """
        Store a provisioned concurrency config for an alias or a published version.

        The request is validated against the function's reserved concurrency, the configured
        account concurrency limit, and the minimum unreserved concurrency. The config is then
        stored and the version manager of the target version is told the new value.

        :raises ValidationException: if the requested value is not positive
        :raises InvalidParameterValueException: if the qualifier is ``$LATEST`` or a concurrency
            limit would be violated
        :raises ResourceConflictException: if an alias targets an already provisioned version, or
            a version is targeted by a provisioned alias
        :return: response reporting the requested value with status ``IN_PROGRESS``
        """
        if provisioned_concurrent_executions <= 0:
            raise ValidationException(
                f"1 validation error detected: Value '{provisioned_concurrent_executions}' at 'provisionedConcurrentExecutions' failed to satisfy constraint: Member must have value greater than or equal to 1"
            )

        if qualifier == "$LATEST":
            raise InvalidParameterValueException(
                "Provisioned Concurrency Configs cannot be applied to unpublished function versions.",
                Type="User",
            )

        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        fn = lambda_stores[account_id][region].functions.get(function_name)

        # the lookup also raises if the function, alias, or version does not exist
        existing_config = self._get_provisioned_config(context, function_name, qualifier)
        if existing_config:  # TODO: merge?
            # TODO: add a test for partial updates (if possible)
            LOG.warning(
                "Partial update of provisioned concurrency config is currently not supported."
            )

        # provisioned concurrency already claimed by the other qualifiers of this function
        other_provisioned_sum = sum(
            other_config.provisioned_concurrent_executions
            for other_qualifier, other_config in fn.provisioned_concurrency_configs.items()
            if other_qualifier != qualifier
        )

        reserved = fn.reserved_concurrent_executions
        if (
            reserved is not None
            and reserved < other_provisioned_sum + provisioned_concurrent_executions
        ):
            raise InvalidParameterValueException(
                "Requested Provisioned Concurrency should not be greater than the reservedConcurrentExecution for function",
                Type="User",
            )

        if provisioned_concurrent_executions > config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS:
            raise InvalidParameterValueException(
                f"Specified ConcurrentExecutions for function is greater than account's unreserved concurrency"
                f" [{config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS}]."
            )

        account_limits = self.get_account_settings(context)["AccountLimit"]
        remaining_unreserved = (
            account_limits["UnreservedConcurrentExecutions"] - provisioned_concurrent_executions
        )
        if remaining_unreserved < config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY:
            raise InvalidParameterValueException(
                f"Specified ConcurrentExecutions for function decreases account's UnreservedConcurrentExecution below"
                f" its minimum value of [{config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY}]."
            )

        provisioned_config = ProvisionedConcurrencyConfiguration(
            provisioned_concurrent_executions, api_utils.generate_lambda_date()
        )
        fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)

        if api_utils.qualifier_is_alias(qualifier):
            alias = fn.aliases.get(qualifier)
            target_version = fn.versions.get(alias.function_version)

            version_already_provisioned = (
                fn.provisioned_concurrency_configs.get(alias.function_version) is not None
            )
            if target_version and version_already_provisioned:
                raise ResourceConflictException(
                    "Alias can't be used for Provisioned Concurrency configuration on an already Provisioned version",
                    Type="User",
                )
            fn_arn = target_version.id.qualified_arn()
        elif api_utils.qualifier_is_version(qualifier):
            fn_version = fn.versions.get(qualifier)

            # TODO: might be useful other places, utilize
            provisioned_alias_points_here = any(
                alias.function_version == qualifier
                and fn.provisioned_concurrency_configs.get(alias.name) is not None
                for alias in fn.aliases.values()
            )
            if provisioned_alias_points_here:
                raise ResourceConflictException(
                    "Version is pointed by a Provisioned Concurrency alias", Type="User"
                )

            fn_arn = fn_version.id.qualified_arn()

        manager = self.lambda_service.get_lambda_version_manager(fn_arn)

        # store the config first, then tell the version manager the new value
        fn.provisioned_concurrency_configs[qualifier] = provisioned_config
        manager.update_provisioned_concurrency_config(
            provisioned_config.provisioned_concurrent_executions
        )

        return PutProvisionedConcurrencyConfigResponse(
            RequestedProvisionedConcurrentExecutions=provisioned_config.provisioned_concurrent_executions,
            AvailableProvisionedConcurrentExecutions=0,
            AllocatedProvisionedConcurrentExecutions=0,
            Status=ProvisionedConcurrencyStatusEnum.IN_PROGRESS,
            # StatusReason=manager.provisioned_state.status_reason,
            LastModified=provisioned_config.last_modified,  # TODO: does change with configuration or also with state changes?
        )
3139

3140
    def get_provisioned_concurrency_config(
        self, context: RequestContext, function_name: FunctionName, qualifier: Qualifier, **kwargs
    ) -> GetProvisionedConcurrencyConfigResponse:
        """
        Return the provisioned concurrency config of an alias or a published version.

        The requested value and modification date come from the stored config. Availability,
        allocation, and status come from the version manager of the target version; for an alias
        this is the version the alias points to.

        :raises InvalidParameterValueException: if the qualifier is ``$LATEST``
        :raises ProvisionedConcurrencyConfigNotFoundException: if no config is stored for the
            qualifier
        """
        if qualifier == "$LATEST":
            raise InvalidParameterValueException(
                "The function resource provided must be an alias or a published version.",
                Type="User",
            )
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )

        provisioned_config = self._get_provisioned_config(context, function_name, qualifier)
        if not provisioned_config:
            raise ProvisionedConcurrencyConfigNotFoundException(
                "No Provisioned Concurrency Config found for this function", Type="User"
            )

        # TODO: make this compatible with alias pointer migration on update
        version_qualifier = qualifier
        if api_utils.qualifier_is_alias(qualifier):
            fn = lambda_stores[account_id][region].functions.get(function_name)
            version_qualifier = fn.aliases.get(qualifier).function_version
        fn_arn = api_utils.qualified_lambda_arn(
            function_name, version_qualifier, account_id, region
        )

        provisioned_state = self.lambda_service.get_lambda_version_manager(
            fn_arn
        ).provisioned_state

        return GetProvisionedConcurrencyConfigResponse(
            RequestedProvisionedConcurrentExecutions=provisioned_config.provisioned_concurrent_executions,
            LastModified=provisioned_config.last_modified,
            AvailableProvisionedConcurrentExecutions=provisioned_state.available,
            AllocatedProvisionedConcurrentExecutions=provisioned_state.allocated,
            Status=provisioned_state.status,
            StatusReason=provisioned_state.status_reason,
        )
3180

3181
    def list_provisioned_concurrency_configs(
        self,
        context: RequestContext,
        function_name: FunctionName,
        marker: String = None,
        max_items: MaxProvisionedConcurrencyConfigListItems = None,
        **kwargs,
    ) -> ListProvisionedConcurrencyConfigsResponse:
        """
        List every provisioned concurrency config of a function, one page at a time.

        :param context: Request context
        :param function_name: Function identifier as accepted by ``api_utils.get_function_name``
        :param marker: Pagination token from a previous call
        :param max_items: Maximum number of items per page
        :return: One page of config items and the marker for the next page
        :raises ResourceNotFoundException: if the function does not exist in the store
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        state = lambda_stores[account_id][region]

        function_name = api_utils.get_function_name(function_name, context)
        fn = state.functions.get(function_name)
        if fn is None:
            raise ResourceNotFoundException(
                f"Function not found: {api_utils.unqualified_lambda_arn(function_name, account_id, region)}",
                Type="User",
            )

        def _to_list_item(qualifier, pc_config) -> ProvisionedConcurrencyConfigListItem:
            # The version manager is looked up by version ARN, so an alias is
            # first translated into the version it points to.
            version_qualifier = qualifier
            if api_utils.qualifier_is_alias(qualifier):
                version_qualifier = fn.aliases.get(qualifier).function_version
            version_arn = api_utils.qualified_lambda_arn(
                function_name, version_qualifier, account_id, region
            )
            manager = self.lambda_service.get_lambda_version_manager(version_arn)

            # The reported ARN keeps the qualifier the config was stored under.
            return ProvisionedConcurrencyConfigListItem(
                FunctionArn=api_utils.qualified_lambda_arn(
                    function_name, qualifier, account_id, region
                ),
                RequestedProvisionedConcurrentExecutions=pc_config.provisioned_concurrent_executions,
                AvailableProvisionedConcurrentExecutions=manager.provisioned_state.available,
                AllocatedProvisionedConcurrentExecutions=manager.provisioned_state.allocated,
                Status=manager.provisioned_state.status,
                StatusReason=manager.provisioned_state.status_reason,
                LastModified=pc_config.last_modified,
            )

        paginated_configs = PaginatedList(
            [
                _to_list_item(qualifier, pc_config)
                for qualifier, pc_config in fn.provisioned_concurrency_configs.items()
            ]
        )
        page, token = paginated_configs.get_page(
            lambda x: x,
            marker,
            max_items,
        )
        return ListProvisionedConcurrencyConfigsResponse(
            ProvisionedConcurrencyConfigs=page, NextMarker=token
        )
3238

3239
    def delete_provisioned_concurrency_config(
        self, context: RequestContext, function_name: FunctionName, qualifier: Qualifier, **kwargs
    ) -> None:
        """
        Remove the provisioned concurrency config of an alias or published version and
        scale the backing version manager down to zero provisioned executions.

        Deleting a config that does not exist is a no-op.

        :param context: Request context
        :param function_name: Function identifier as accepted by ``api_utils.get_name_and_qualifier``
        :param qualifier: Alias name or version; ``$LATEST`` is rejected
        :raises InvalidParameterValueException: if ``qualifier`` is ``$LATEST``
        """
        if qualifier == "$LATEST":
            raise InvalidParameterValueException(
                "The function resource provided must be an alias or a published version.",
                Type="User",
            )
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        state = lambda_stores[account_id][region]
        fn = state.functions.get(function_name)

        provisioned_config = self._get_provisioned_config(context, function_name, qualifier)
        # delete is idempotent and doesn't actually care about the provisioned concurrency config not existing
        if not provisioned_config:
            return

        fn.provisioned_concurrency_configs.pop(qualifier)

        # The version manager is looked up by version ARN (as in the get/list
        # operations), so an alias must be translated into the version it points to.
        # Using the alias-qualified ARN here would look up a key that is not a version.
        version_qualifier = qualifier
        if api_utils.qualifier_is_alias(qualifier):
            alias = fn.aliases.get(qualifier)
            if alias is not None:
                version_qualifier = alias.function_version

        fn_arn = api_utils.qualified_lambda_arn(
            function_name, version_qualifier, account_id, region
        )
        manager = self.lambda_service.get_lambda_version_manager(fn_arn)
        manager.update_provisioned_concurrency_config(0)
1✔
3261

3262
    # =======================================
3263
    # =======  Event Invoke Config   ========
3264
    # =======================================
3265

3266
    # "1 validation error detected: Value 'arn:aws:_-/!lambda:<region>:111111111111:function:<function-name:1>' at 'destinationConfig.onFailure.destination' failed to satisfy constraint: Member must satisfy regular expression pattern: ^$|arn:(aws[a-zA-Z0-9-]*):([a-zA-Z0-9\\-])+:([a-z]{2}((-gov)|(-iso(b?)))?-[a-z]+-\\d{1})?:(\\d{12})?:(.*)"
3267
    # "1 validation error detected: Value 'arn:aws:_-/!lambda:<region>:111111111111:function:<function-name:1>' at 'destinationConfig.onFailure.destination' failed to satisfy constraint: Member must satisfy regular expression pattern: ^$|arn:(aws[a-zA-Z0-9-]*):([a-zA-Z0-9\\-])+:([a-z]2((-gov)|(-iso(b?)))?-[a-z]+-\\d1)?:(\\d12)?:(.*)" ... (expected → actual)
3268

3269
    def _validate_destination_config(
1✔
3270
        self, store: LambdaStore, function_name: str, destination_config: DestinationConfig
3271
    ):
3272
        def _validate_destination_arn(destination_arn) -> bool:
1✔
3273
            if not api_utils.DESTINATION_ARN_PATTERN.match(destination_arn):
1✔
3274
                # technically we shouldn't handle this in the provider
3275
                raise ValidationException(
1✔
3276
                    "1 validation error detected: Value '"
3277
                    + destination_arn
3278
                    + r"' at 'destinationConfig.onFailure.destination' failed to satisfy constraint: Member must satisfy regular expression pattern: ^$|arn:(aws[a-zA-Z0-9-]*):([a-zA-Z0-9\-])+:([a-z]{2}((-gov)|(-iso(b?)))?-[a-z]+-\d{1})?:(\d{12})?:(.*)"
3279
                )
3280

3281
            match destination_arn.split(":")[2]:
1✔
3282
                case "lambda":
1✔
3283
                    fn_parts = api_utils.FULL_FN_ARN_PATTERN.search(destination_arn).groupdict()
1✔
3284
                    if fn_parts:
1✔
3285
                        # check if it exists
3286
                        fn = store.functions.get(fn_parts["function_name"])
1✔
3287
                        if not fn:
1✔
3288
                            raise InvalidParameterValueException(
1✔
3289
                                f"The destination ARN {destination_arn} is invalid.", Type="User"
3290
                            )
3291
                        if fn_parts["function_name"] == function_name:
1✔
3292
                            raise InvalidParameterValueException(
1✔
3293
                                "You can't specify the function as a destination for itself.",
3294
                                Type="User",
3295
                            )
3296
                case "sns" | "sqs" | "events":
1✔
3297
                    pass
1✔
3298
                case _:
1✔
3299
                    return False
1✔
3300
            return True
1✔
3301

3302
        validation_err = False
1✔
3303

3304
        failure_destination = destination_config.get("OnFailure", {}).get("Destination")
1✔
3305
        if failure_destination:
1✔
3306
            validation_err = validation_err or not _validate_destination_arn(failure_destination)
1✔
3307

3308
        success_destination = destination_config.get("OnSuccess", {}).get("Destination")
1✔
3309
        if success_destination:
1✔
3310
            validation_err = validation_err or not _validate_destination_arn(success_destination)
1✔
3311

3312
        if validation_err:
1✔
3313
            on_success_part = (
1✔
3314
                f"OnSuccess(destination={success_destination})" if success_destination else "null"
3315
            )
3316
            on_failure_part = (
1✔
3317
                f"OnFailure(destination={failure_destination})" if failure_destination else "null"
3318
            )
3319
            raise InvalidParameterValueException(
1✔
3320
                f"The provided destination config DestinationConfig(onSuccess={on_success_part}, onFailure={on_failure_part}) is invalid.",
3321
                Type="User",
3322
            )
3323

3324
    def put_function_event_invoke_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: Qualifier = None,
        maximum_retry_attempts: MaximumRetryAttempts = None,
        maximum_event_age_in_seconds: MaximumEventAgeInSeconds = None,
        destination_config: DestinationConfig = None,
        **kwargs,
    ) -> FunctionEventInvokeConfig:
        """
        Create or overwrite the event invoke config of a function, version or alias.

        Destination ARNs can be:
        * SQS arn
        * SNS arn
        * Lambda arn
        * EventBridge arn

        Differences between put_ and update_:
            * put overwrites any existing config
            * update changes only the supplied values and keeps the rest of the existing config
            * update fails on non-existing configs

        Differences between destination and DLQ
            * "However, a dead-letter queue is part of a function's version-specific configuration, so it is locked in when you publish a version."
            *  "On-failure destinations also support additional targets and include details about the function's response in the invocation record."

        :raises InvalidParameterValueException: if none of the three settings is supplied
        :raises ResourceNotFoundException: if the function or the given qualifier is unknown
        """
        nothing_to_set = (
            maximum_event_age_in_seconds is None
            and maximum_retry_attempts is None
            and destination_config is None
        )
        if nothing_to_set:
            raise InvalidParameterValueException(
                "You must specify at least one of error handling or destination setting.",
                Type="User",
            )

        account_id, region = api_utils.get_account_and_region(function_name, context)
        state = lambda_stores[account_id][region]
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        fn = state.functions.get(function_name)
        if not fn or (
            qualifier and qualifier not in fn.aliases and qualifier not in fn.versions
        ):
            raise ResourceNotFoundException("The function doesn't exist.", Type="User")

        # an unqualified request targets $LATEST
        qualifier = qualifier or "$LATEST"

        # validate and normalize destination config
        if destination_config:
            self._validate_destination_config(state, function_name, destination_config)

        requested_destinations = destination_config or {}
        normalized_destinations = DestinationConfig(
            OnSuccess=OnSuccess(
                Destination=requested_destinations.get("OnSuccess", {}).get("Destination")
            ),
            OnFailure=OnFailure(
                Destination=requested_destinations.get("OnFailure", {}).get("Destination")
            ),
        )

        invoke_config = EventInvokeConfig(
            function_name=function_name,
            qualifier=qualifier,
            maximum_event_age_in_seconds=maximum_event_age_in_seconds,
            maximum_retry_attempts=maximum_retry_attempts,
            last_modified=api_utils.generate_lambda_date(),
            destination_config=normalized_destinations,
        )
        fn.event_invoke_configs[qualifier] = invoke_config

        last_modified = datetime.datetime.strptime(
            invoke_config.last_modified, api_utils.LAMBDA_DATE_FORMAT
        )
        return FunctionEventInvokeConfig(
            LastModified=last_modified,
            FunctionArn=api_utils.qualified_lambda_arn(
                function_name, qualifier, account_id, region
            ),
            DestinationConfig=normalized_destinations,
            MaximumEventAgeInSeconds=maximum_event_age_in_seconds,
            MaximumRetryAttempts=maximum_retry_attempts,
        )
3405

3406
    def get_function_event_invoke_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: Qualifier = None,
        **kwargs,
    ) -> FunctionEventInvokeConfig:
        """
        Return the event invoke config stored for a function, version or alias.

        :param context: Request context
        :param function_name: Function identifier as accepted by ``api_utils.get_name_and_qualifier``
        :param qualifier: Version or alias; defaults to ``$LATEST``
        :return: The stored config with its last-modified date parsed into a ``datetime``
        :raises ResourceNotFoundException: if the function is unknown or has no config
            for the qualifier
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        state = lambda_stores[account_id][region]
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        qualifier = qualifier or "$LATEST"

        # A missing function and a missing config are reported with the same error.
        fn = state.functions.get(function_name)
        invoke_config = fn.event_invoke_configs.get(qualifier) if fn else None
        if not invoke_config:
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
            raise ResourceNotFoundException(
                f"The function {fn_arn} doesn't have an EventInvokeConfig", Type="User"
            )

        last_modified = datetime.datetime.strptime(
            invoke_config.last_modified, api_utils.LAMBDA_DATE_FORMAT
        )
        return FunctionEventInvokeConfig(
            LastModified=last_modified,
            FunctionArn=api_utils.qualified_lambda_arn(
                function_name, qualifier, account_id, region
            ),
            DestinationConfig=invoke_config.destination_config,
            MaximumEventAgeInSeconds=invoke_config.maximum_event_age_in_seconds,
            MaximumRetryAttempts=invoke_config.maximum_retry_attempts,
        )
3445

3446
    def list_function_event_invoke_configs(
        self,
        context: RequestContext,
        function_name: FunctionName,
        marker: String = None,
        max_items: MaxFunctionEventInvokeConfigListItems = None,
        **kwargs,
    ) -> ListFunctionEventInvokeConfigsResponse:
        """
        List all event invoke configs of a function, one page at a time.

        :param context: Request context
        :param function_name: Function identifier as accepted by ``api_utils.get_function_name``
        :param marker: Pagination token from a previous call
        :param max_items: Maximum number of items per page
        :return: One page of configs and the marker for the next page
        :raises ResourceNotFoundException: if the function does not exist in the store
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        state = lambda_stores[account_id][region]

        # The store is keyed by plain function name, so normalise the identifier
        # first (same as list_provisioned_concurrency_configs) instead of using
        # the raw request value as the key.
        function_name = api_utils.get_function_name(function_name, context)
        fn = state.functions.get(function_name)
        if not fn:
            raise ResourceNotFoundException("The function doesn't exist.", Type="User")

        event_invoke_configs = [
            FunctionEventInvokeConfig(
                # parse the stored date string, consistent with the put/get/update operations
                LastModified=datetime.datetime.strptime(
                    c.last_modified, api_utils.LAMBDA_DATE_FORMAT
                ),
                FunctionArn=api_utils.qualified_lambda_arn(
                    function_name, c.qualifier, account_id, region
                ),
                MaximumEventAgeInSeconds=c.maximum_event_age_in_seconds,
                MaximumRetryAttempts=c.maximum_retry_attempts,
                DestinationConfig=c.destination_config,
            )
            for c in fn.event_invoke_configs.values()
        ]

        event_invoke_configs = PaginatedList(event_invoke_configs)
        page, token = event_invoke_configs.get_page(
            lambda x: x["FunctionArn"],
            marker,
            max_items,
        )
        return ListFunctionEventInvokeConfigsResponse(
            FunctionEventInvokeConfigs=page, NextMarker=token
        )
3482

3483
    def delete_function_event_invoke_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: Qualifier = None,
        **kwargs,
    ) -> None:
        """
        Delete the event invoke config stored for a function, version or alias.

        :param context: Request context
        :param function_name: Function identifier as accepted by ``api_utils.get_name_and_qualifier``
        :param qualifier: Version or alias; the config stored under ``$LATEST`` is used when omitted
        :raises ResourceNotFoundException: if the function is unknown or has no config
            for the qualifier
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        store = lambda_stores[account_id][region]
        fn = store.functions.get(function_name)
        config_key = qualifier or "$LATEST"
        fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)

        # A missing function and a missing config are reported with the same error.
        if not fn or not fn.event_invoke_configs.get(config_key):
            raise ResourceNotFoundException(
                f"The function {fn_arn} doesn't have an EventInvokeConfig", Type="User"
            )

        fn.event_invoke_configs.pop(config_key)
1✔
3510

3511
    def update_function_event_invoke_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: Qualifier = None,
        maximum_retry_attempts: MaximumRetryAttempts = None,
        maximum_event_age_in_seconds: MaximumEventAgeInSeconds = None,
        destination_config: DestinationConfig = None,
        **kwargs,
    ) -> FunctionEventInvokeConfig:
        """
        Update selected fields of an existing event invoke config.

        Works like the put operation, but only the supplied settings replace the stored
        values and the config must already exist.

        :raises InvalidParameterValueException: if none of the three settings is supplied
        :raises ResourceNotFoundException: if the function or qualifier is unknown, or no
            config is stored for the qualifier
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        state = lambda_stores[account_id][region]
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )

        nothing_to_update = (
            maximum_event_age_in_seconds is None
            and maximum_retry_attempts is None
            and destination_config is None
        )
        if nothing_to_update:
            raise InvalidParameterValueException(
                "You must specify at least one of error handling or destination setting.",
                Type="User",
            )

        fn = state.functions.get(function_name)
        if not fn or (
            qualifier and qualifier not in fn.aliases and qualifier not in fn.versions
        ):
            raise ResourceNotFoundException("The function doesn't exist.", Type="User")

        # an unqualified request targets $LATEST
        qualifier = qualifier or "$LATEST"

        current_config = fn.event_invoke_configs.get(qualifier)
        if not current_config:
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
            raise ResourceNotFoundException(
                f"The function {fn_arn} doesn't have an EventInvokeConfig", Type="User"
            )

        if destination_config:
            self._validate_destination_config(state, function_name, destination_config)

        # only settings that were actually supplied override the stored values
        overrides = {}
        if destination_config is not None:
            overrides["destination_config"] = destination_config
        if maximum_retry_attempts is not None:
            overrides["maximum_retry_attempts"] = maximum_retry_attempts
        if maximum_event_age_in_seconds is not None:
            overrides["maximum_event_age_in_seconds"] = maximum_event_age_in_seconds

        updated_config = dataclasses.replace(
            current_config, last_modified=api_utils.generate_lambda_date(), **overrides
        )
        fn.event_invoke_configs[qualifier] = updated_config

        last_modified = datetime.datetime.strptime(
            updated_config.last_modified, api_utils.LAMBDA_DATE_FORMAT
        )
        return FunctionEventInvokeConfig(
            LastModified=last_modified,
            FunctionArn=api_utils.qualified_lambda_arn(
                function_name, qualifier, account_id, region
            ),
            DestinationConfig=updated_config.destination_config,
            MaximumEventAgeInSeconds=updated_config.maximum_event_age_in_seconds,
            MaximumRetryAttempts=updated_config.maximum_retry_attempts,
        )
3580

3581
    # =======================================
3582
    # ======  Layer & Layer Versions  =======
3583
    # =======================================
3584

3585
    @staticmethod
    def _resolve_layer(
        layer_name_or_arn: str, context: RequestContext
    ) -> Tuple[str, str, str, Optional[str]]:
        """
        Work out where a Lambda layer lives.

        A layer ARN is parsed by ``api_utils.parse_layer_arn``; a plain layer name is
        located in the region and account of the request, without a version.

        :param layer_name_or_arn: Layer name or ARN
        :param context: Request context
        :return: Tuple of region, account ID, layer name, layer version
        """
        if not api_utils.is_layer_arn(layer_name_or_arn):
            return context.region, context.account_id, layer_name_or_arn, None

        return api_utils.parse_layer_arn(layer_name_or_arn)
1✔
3600

3601
    def publish_layer_version(
        self,
        context: RequestContext,
        layer_name: LayerName,
        content: LayerVersionContentInput,
        description: Description = None,
        compatible_runtimes: CompatibleRuntimes = None,
        license_info: LicenseInfo = None,
        compatible_architectures: CompatibleArchitectures = None,
        **kwargs,
    ) -> PublishLayerVersionResponse:
        """
        Publish a new version of a layer.

        The first publish for a LayerName creates the layer; every later publish with the
        same LayerName adds a new version. Note that there are no $LATEST versions with layers!

        :raises ValidationException: if the compatible runtimes or architectures are invalid
        """
        account = context.account_id
        region = context.region

        validation_errors = api_utils.validate_layer_runtimes_and_architectures(
            compatible_runtimes, compatible_architectures
        )
        if validation_errors:
            error_count = len(validation_errors)
            plural_suffix = "s" if error_count > 1 else ""
            raise ValidationException(
                f"{error_count} validation error{plural_suffix} detected: {'; '.join(validation_errors)}"
            )

        state = lambda_stores[account][region]
        with self.create_layer_lock:
            # no layer object yet for this name: create it
            # lock is required to avoid creating two v1 objects for the same name
            if layer_name not in state.layers:
                state.layers[layer_name] = Layer(
                    arn=api_utils.layer_arn(layer_name=layer_name, account=account, region=region)
                )

        layer = state.layers[layer_name]
        with layer.next_version_lock:
            version_identifier = LambdaLayerVersionIdentifier(
                account_id=account, region=region, layer_name=layer_name
            )
            next_version = version_identifier.generate(next_version=layer.next_version)
            # With user defined layer versions, versions can be created out of order,
            # ie. a user could replicate layer v2 then layer v1. The counter must only ever
            # move forward so that existing versions are never overwritten; a version below
            # the "next in line" therefore leaves the counter untouched.
            if next_version >= layer.next_version:
                layer.next_version = next_version + 1

        # store the layer archive, either from the inline zip or from S3
        archive_owner = {
            "function_name": layer_name,
            "region_name": region,
            "account_id": account,
        }
        if content.get("ZipFile"):
            layer_code = store_lambda_archive(archive_file=content["ZipFile"], **archive_owner)
        else:
            layer_code = store_s3_bucket_archive(
                archive_bucket=content["S3Bucket"],
                archive_key=content["S3Key"],
                archive_version=content.get("S3ObjectVersion"),
                **archive_owner,
            )

        version_arn = api_utils.layer_version_arn(
            layer_name=layer_name,
            account=account,
            region=region,
            version=str(next_version),
        )
        published_version = LayerVersion(
            layer_version_arn=version_arn,
            layer_arn=layer.arn,
            version=next_version,
            description=description or "",
            license_info=license_info,
            compatible_runtimes=compatible_runtimes,
            compatible_architectures=compatible_architectures,
            created=api_utils.generate_lambda_date(),
            code=layer_code,
        )

        # versions are keyed by their string representation
        layer.layer_versions[str(next_version)] = published_version

        return api_utils.map_layer_out(published_version)
1✔
3689

3690
    def get_layer_version(
        self,
        context: RequestContext,
        layer_name: LayerName,
        version_number: LayerVersionNumber,
        **kwargs,
    ) -> GetLayerVersionResponse:
        """
        Return a single layer version, addressed by layer name (or ARN) and version number.

        :param context: Request context
        :param layer_name: Layer name or ARN, resolved via ``_resolve_layer``
        :param version_number: Layer version number, must be at least 1
        :raises InvalidParameterValueException: if ``version_number`` is below 1
        :raises ResourceNotFoundException: if the layer or the version does not exist
        """
        region_name, account_id, layer_name, _ = LambdaProvider._resolve_layer(layer_name, context)
        store = lambda_stores[account_id][region_name]

        layer = store.layers.get(layer_name)
        if version_number < 1:
            raise InvalidParameterValueException("Layer Version Cannot be less than 1", Type="User")

        # An unknown layer and an unknown version produce the same error.
        # Versions are keyed by their string representation.
        found_version = None
        if layer is not None:
            found_version = layer.layer_versions.get(str(version_number))
        if found_version is None:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )
        return api_utils.map_layer_out(found_version)
1✔
3715

3716
    def get_layer_version_by_arn(
        self, context: RequestContext, arn: LayerVersionArn, **kwargs
    ) -> GetLayerVersionResponse:
        """
        Return a single layer version, addressed by its layer version ARN.

        :param context: Request context
        :param arn: Layer version ARN; it must resolve to a layer version
        :raises ValidationException: if no version could be resolved from ``arn``
        :raises ResourceNotFoundException: if the layer or the version does not exist
        """
        region_name, account_id, layer_name, version_key = LambdaProvider._resolve_layer(
            arn, context
        )

        if not version_key:
            raise ValidationException(
                f"1 validation error detected: Value '{arn}' at 'arn' failed to satisfy constraint: Member must satisfy regular expression pattern: "
                + "(arn:(aws[a-zA-Z-]*)?:lambda:[a-z]{2}((-gov)|(-iso([a-z]?)))?-[a-z]+-\\d{1}:\\d{12}:layer:[a-zA-Z0-9-_]+:[0-9]+)|(arn:[a-zA-Z0-9-]+:lambda:::awslayer:[a-zA-Z0-9-_]+)"
            )

        store = lambda_stores[account_id][region_name]

        # An unknown layer and an unknown version produce the same error.
        layer = store.layers.get(layer_name)
        found_version = layer.layer_versions.get(version_key) if layer else None
        if not found_version:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        return api_utils.map_layer_out(found_version)
1✔
3743

3744
    def list_layers(
        self,
        context: RequestContext,
        compatible_runtime: Runtime = None,
        marker: String = None,
        max_items: MaxLayerListItems = None,
        compatible_architecture: Architecture = None,
        **kwargs,
    ) -> ListLayersResponse:
        """
        List the layers of the requesting account and region, each with its latest version.

        The runtime and architecture filters are validated but not applied yet (see TODOs).

        :param context: Request context
        :param compatible_runtime: Runtime filter (validated only)
        :param marker: Pagination token from a previous call
        :param max_items: Maximum number of items per page
        :param compatible_architecture: Architecture filter (validated only)
        :return: One page of layers and the marker for the next page
        :raises ValidationException: if the runtime or architecture filter is invalid
        """
        validation_errors = []

        validation_error_arch = api_utils.validate_layer_architecture(compatible_architecture)
        if validation_error_arch:
            validation_errors.append(validation_error_arch)

        validation_error_runtime = api_utils.validate_layer_runtime(compatible_runtime)
        if validation_error_runtime:
            validation_errors.append(validation_error_runtime)

        if validation_errors:
            raise ValidationException(
                f"{len(validation_errors)} validation error{'s' if len(validation_errors) > 1 else ''} detected: {';'.join(validation_errors)}"
            )
        # TODO: handle filter: compatible_runtime
        # TODO: handle filter: compatible_architecture

        state = lambda_stores[context.account_id][context.region]
        layers = state.layers

        # TODO: test how filters behave together with only returning layers here? Does it return the latest "matching" layer, i.e. does it ignore later layer versions that don't match?

        responses: list[LayersListItem] = []
        for layer_name, layer in layers.items():
            if not layer.layer_versions:
                # a layer without any version has no latest version to report
                continue
            # Pick the highest version number explicitly. The previous code called
            # sorted() and discarded its result, so it returned the most recently
            # inserted version, which is wrong when versions are created out of order.
            latest_layer_version = max(layer.layer_versions.values(), key=lambda x: x.version)
            responses.append(
                LayersListItem(
                    LayerName=layer_name,
                    LayerArn=layer.arn,
                    LatestMatchingVersion=api_utils.map_layer_out(latest_layer_version),
                )
            )

        responses = PaginatedList(responses)
        page, token = responses.get_page(
            lambda version: version,
            marker,
            max_items,
        )

        return ListLayersResponse(NextMarker=token, Layers=page)
×
3797

3798
    def list_layer_versions(
        self,
        context: RequestContext,
        layer_name: LayerName,
        compatible_runtime: Runtime = None,
        marker: String = None,
        max_items: MaxLayerListItems = None,
        compatible_architecture: Architecture = None,
        **kwargs,
    ) -> ListLayerVersionsResponse:
        """
        Return a page of the versions of one layer, newest version first.

        The runtime and architecture filter values are validated but not applied yet (see TODOs).
        An unknown layer yields an empty list.

        Raises:
            ValidationException: If the given runtime or architecture filter value is invalid.
        """
        runtimes = [compatible_runtime] if compatible_runtime else []
        architectures = [compatible_architecture] if compatible_architecture else []
        validation_errors = api_utils.validate_layer_runtimes_and_architectures(
            runtimes, architectures
        )
        if validation_errors:
            raise ValidationException(
                f"{len(validation_errors)} validation error{'s' if len(validation_errors) > 1 else ''} detected: {';'.join(validation_errors)}"
            )

        region_name, account_id, resolved_name, _ = LambdaProvider._resolve_layer(
            layer_name, context
        )
        layer = lambda_stores[account_id][region_name].layers.get(resolved_name)

        # TODO: Test & handle filter: compatible_runtime
        # TODO: Test & handle filter: compatible_architecture
        stored_versions = layer.layer_versions.values() if layer is not None else ()
        mapped_versions = sorted(
            (api_utils.map_layer_out(stored) for stored in stored_versions),
            key=lambda entry: entry["Version"],
            reverse=True,
        )

        # The layer version ARN serves as the pagination token.
        page, token = PaginatedList(mapped_versions).get_page(
            lambda entry: entry["LayerVersionArn"],
            marker,
            max_items,
        )
        return ListLayerVersionsResponse(NextMarker=token, LayerVersions=page)
1✔
3838

3839
    def delete_layer_version(
        self,
        context: RequestContext,
        layer_name: LayerName,
        version_number: LayerVersionNumber,
        **kwargs,
    ) -> None:
        """
        Remove a single version from a layer; unknown layers or versions are silently ignored.

        Raises:
            InvalidParameterValueException: If `version_number` is lower than 1.
        """
        if version_number < 1:
            raise InvalidParameterValueException("Layer Version Cannot be less than 1", Type="User")

        region_name, account_id, resolved_name, _ = LambdaProvider._resolve_layer(
            layer_name, context
        )

        layer = lambda_stores[account_id][region_name].layers.get(resolved_name, {})
        if not layer:
            return
        # Versions are keyed by their string representation.
        layer.layer_versions.pop(str(version_number), None)
1✔
3857

3858
    # =======================================
3859
    # =====  Layer Version Permissions  =====
3860
    # =======================================
3861
    # TODO: lock updates that change revision IDs
3862

3863
    def add_layer_version_permission(
        self,
        context: RequestContext,
        layer_name: LayerName,
        version_number: LayerVersionNumber,
        statement_id: StatementId,
        action: LayerPermissionAllowedAction,
        principal: LayerPermissionAllowedPrincipal,
        organization_id: OrganizationId = None,
        revision_id: String = None,
        **kwargs,
    ) -> AddLayerVersionPermissionResponse:
        """
        Add a statement to the policy of a layer version, creating the policy if none exists yet.

        Raises:
            ValidationException: If `action` is anything other than "lambda:GetLayerVersion".
            ResourceNotFoundException: If the layer or the layer version does not exist.
            ResourceConflictException: If a statement with `statement_id` already exists.
            PreconditionFailedException: If `revision_id` is given and differs from the policy's.
        """
        # `layer_name` may be a plain name or an ARN and is only used for error messages;
        # `resolved_name` is the plain layer name used for store lookups.
        region_name, account_id, resolved_name, _ = LambdaProvider._resolve_layer(
            layer_name, context
        )

        if action != "lambda:GetLayerVersion":
            raise ValidationException(
                f"1 validation error detected: Value '{action}' at 'action' failed to satisfy constraint: Member must satisfy regular expression pattern: lambda:GetLayerVersion"
            )

        layer = lambda_stores[account_id][region_name].layers.get(resolved_name)

        layer_version_arn = api_utils.layer_version_arn(
            layer_name, account_id, region_name, str(version_number)
        )

        # A missing layer and a missing version are reported with the same error.
        layer_version = None
        if layer is not None:
            layer_version = layer.layer_versions.get(str(version_number))
        if layer_version is None:
            raise ResourceNotFoundException(
                f"Layer version {layer_version_arn} does not exist.", Type="User"
            )

        # Lazily create the policy on first use.
        if layer_version.policy is None:
            layer_version.policy = LayerPolicy()
        current_policy = layer_version.policy

        if statement_id in current_policy.statements:
            raise ResourceConflictException(
                f"The statement id ({statement_id}) provided already exists. Please provide a new statement id, or remove the existing statement.",
                Type="User",
            )

        if revision_id and current_policy.revision_id != revision_id:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetLayerPolicy API to retrieve the latest Revision Id",
                Type="User",
            )

        statement = LayerPolicyStatement(
            sid=statement_id, action=action, principal=principal, organization_id=organization_id
        )

        # Build a new policy object rather than mutating the existing statements mapping.
        updated_statements = dict(current_policy.statements)
        updated_statements[statement_id] = statement
        layer_version.policy = dataclasses.replace(current_policy, statements=updated_statements)

        statement_document = {
            "Sid": statement.sid,
            "Effect": "Allow",
            "Principal": statement.principal,
            "Action": statement.action,
            "Resource": layer_version.layer_version_arn,
        }
        return AddLayerVersionPermissionResponse(
            Statement=json.dumps(statement_document),
            RevisionId=layer_version.policy.revision_id,
        )
3938

3939
    def remove_layer_version_permission(
        self,
        context: RequestContext,
        layer_name: LayerName,
        version_number: LayerVersionNumber,
        statement_id: StatementId,
        revision_id: String = None,
        **kwargs,
    ) -> None:
        """
        Remove a statement from the policy of a layer version.

        Raises:
            ResourceNotFoundException: If the layer, the layer version, or the statement does not
                exist (a layer version without any policy has no statements).
            PreconditionFailedException: If `revision_id` is given and differs from the policy's.
        """
        # `layer_name` can either be layer name or ARN. It is used to generate error messages.
        # `layer_n` contains the layer name.
        region_name, account_id, layer_n, _ = LambdaProvider._resolve_layer(layer_name, context)

        layer_version_arn = api_utils.layer_version_arn(
            layer_name, account_id, region_name, str(version_number)
        )

        state = lambda_stores[account_id][region_name]
        layer = state.layers.get(layer_n)
        if layer is None:
            raise ResourceNotFoundException(
                f"Layer version {layer_version_arn} does not exist.", Type="User"
            )
        layer_version = layer.layer_versions.get(str(version_number))
        if layer_version is None:
            raise ResourceNotFoundException(
                f"Layer version {layer_version_arn} does not exist.", Type="User"
            )

        policy = layer_version.policy
        if policy is None:
            # No permission was ever added, so the statement cannot exist. Raise the regular
            # "statement not found" error instead of failing on attribute access of None.
            # NOTE(review): exact AWS error message for a version without policy is unconfirmed.
            raise ResourceNotFoundException(
                f"Statement {statement_id} is not found in resource policy.", Type="User"
            )

        if revision_id and policy.revision_id != revision_id:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetLayerPolicy API to retrieve the latest Revision Id",
                Type="User",
            )

        if statement_id not in policy.statements:
            raise ResourceNotFoundException(
                f"Statement {statement_id} is not found in resource policy.", Type="User"
            )

        # Build a new policy object without the removed statement rather than mutating in place.
        layer_version.policy = dataclasses.replace(
            policy,
            statements={k: v for k, v in policy.statements.items() if k != statement_id},
        )
3987

3988
    def get_layer_version_policy(
        self,
        context: RequestContext,
        layer_name: LayerName,
        version_number: LayerVersionNumber,
        **kwargs,
    ) -> GetLayerVersionPolicyResponse:
        """
        Return the policy of a layer version as a JSON document, together with its revision id.

        Raises:
            ResourceNotFoundException: If the layer or layer version does not exist, or if the
                layer version has no policy.
        """
        # `layer_name` may be a plain name or an ARN and is only used for error messages;
        # `resolved_name` is the plain layer name used for store lookups.
        region_name, account_id, resolved_name, _ = LambdaProvider._resolve_layer(
            layer_name, context
        )

        layer_version_arn = api_utils.layer_version_arn(
            layer_name, account_id, region_name, str(version_number)
        )

        layer = lambda_stores[account_id][region_name].layers.get(resolved_name)

        # A missing layer and a missing version are reported with the same error.
        layer_version = None
        if layer is not None:
            layer_version = layer.layer_versions.get(str(version_number))
        if layer_version is None:
            raise ResourceNotFoundException(
                f"Layer version {layer_version_arn} does not exist.", Type="User"
            )

        policy = layer_version.policy
        if policy is None:
            raise ResourceNotFoundException(
                "No policy is associated with the given resource.", Type="User"
            )

        statement_documents = []
        for ps in policy.statements.values():
            statement_documents.append(
                {
                    "Sid": ps.sid,
                    "Effect": "Allow",
                    "Principal": ps.principal,
                    "Action": ps.action,
                    "Resource": layer_version.layer_version_arn,
                }
            )
        policy_document = {
            "Version": policy.version,
            "Id": policy.id,
            "Statement": statement_documents,
        }

        return GetLayerVersionPolicyResponse(
            Policy=json.dumps(policy_document),
            RevisionId=policy.revision_id,
        )
4041

4042
    # =======================================
4043
    # =======  Function Concurrency  ========
4044
    # =======================================
4045
    # (Reserved) function concurrency is scoped to the whole function
4046

4047
    def get_function_concurrency(
        self, context: RequestContext, function_name: FunctionName, **kwargs
    ) -> GetFunctionConcurrencyResponse:
        """Return the reserved concurrent executions configured for the given function."""
        account_id, region = api_utils.get_account_and_region(function_name, context)
        plain_name = api_utils.get_function_name(function_name, context)
        fn = self._get_function(function_name=plain_name, region=region, account_id=account_id)
        reserved = fn.reserved_concurrent_executions
        return GetFunctionConcurrencyResponse(ReservedConcurrentExecutions=reserved)
4056

4057
    def put_function_concurrency(
        self,
        context: RequestContext,
        function_name: FunctionName,
        reserved_concurrent_executions: ReservedConcurrentExecutions,
        **kwargs,
    ) -> Concurrency:
        """
        Set the reserved concurrency of a function and return the stored value.

        Raises:
            InvalidParameterValueException: If `function_name` carries a qualifier, if the new
                reservation would push the account's unreserved concurrency below the configured
                minimum, or if it is lower than the function's total provisioned concurrency.
            ResourceNotFoundException: If the function does not exist.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)

        function_name, qualifier = api_utils.get_name_and_qualifier(function_name, None, context)
        if qualifier:
            raise InvalidParameterValueException(
                "This operation is permitted on Lambda functions only. Aliases and versions do not support this operation. Please specify either a function name or an unqualified function ARN.",
                Type="User",
            )

        fn = lambda_stores[account_id][region].functions.get(function_name)
        if not fn:
            fn_arn = api_utils.qualified_lambda_arn(
                function_name,
                qualifier="$LATEST",
                account=account_id,
                region=region,
            )
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")

        account_limit = self.get_account_settings(context)["AccountLimit"]
        unreserved_concurrent_executions = account_limit["UnreservedConcurrentExecutions"]

        # The existing reserved concurrent executions for the same function are already deducted in
        # unreserved_concurrent_executions but must not count because the new one will replace the existing one.
        # Joel tested this behavior manually against AWS (2023-11-28).
        currently_reserved = fn.reserved_concurrent_executions or 0
        unreserved_after_update = (
            unreserved_concurrent_executions - reserved_concurrent_executions + currently_reserved
        )
        if unreserved_after_update < config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY:
            raise InvalidParameterValueException(
                f"Specified ReservedConcurrentExecutions for function decreases account's UnreservedConcurrentExecution below its minimum value of [{config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY}]."
            )

        # The reservation must cover the provisioned concurrency configured on the function.
        total_provisioned_concurrency = sum(
            provisioned.provisioned_concurrent_executions
            for provisioned in fn.provisioned_concurrency_configs.values()
        )
        if total_provisioned_concurrency > reserved_concurrent_executions:
            raise InvalidParameterValueException(
                f" ReservedConcurrentExecutions  {reserved_concurrent_executions} should not be lower than function's total provisioned concurrency [{total_provisioned_concurrency}]."
            )

        fn.reserved_concurrent_executions = reserved_concurrent_executions

        return Concurrency(ReservedConcurrentExecutions=fn.reserved_concurrent_executions)
1✔
4118

4119
    def delete_function_concurrency(
        self, context: RequestContext, function_name: FunctionName, **kwargs
    ) -> None:
        """
        Remove the reserved concurrency setting of a function.

        Raises:
            ResourceNotFoundException: If the function does not exist.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, _ = api_utils.get_name_and_qualifier(function_name, None, context)
        store = lambda_stores[account_id][region]
        fn = store.functions.get(function_name)
        if fn is None:
            # Report an unknown function the same way put_function_concurrency does, instead of
            # failing with an AttributeError on None.
            fn_arn = api_utils.qualified_lambda_arn(
                function_name,
                qualifier="$LATEST",
                account=account_id,
                region=region,
            )
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")
        fn.reserved_concurrent_executions = None
1✔
4127

4128
    # =======================================
4129
    # ===============  TAGS   ===============
4130
    # =======================================
4131
    # Only Function, Event Source Mapping, and Code Signing Config (not currently supported by LocalStack) ARNs are available for tagging in AWS
4132

4133
    def _get_tags(self, resource: TaggableResource) -> dict[str, str]:
        """Return the tags of `resource` as a plain key -> value mapping."""
        store = self.fetch_lambda_store_for_tagging(resource)
        # The tag store returns a list of {"Key": ..., "Value": ...} entries under "Tags".
        tag_entries = store.TAGS.list_tags_for_resource(resource).get("Tags")
        return {entry["Key"]: entry["Value"] for entry in tag_entries}
1✔
4140

4141
    def _store_tags(self, resource: TaggableResource, tags: dict[str, str]):
        """
        Add `tags` to `resource`.

        Raises:
            InvalidParameterValueException: If the existing and new tags combined exceed
                LAMBDA_TAG_LIMIT_PER_RESOURCE.
        """
        store = self.fetch_lambda_store_for_tagging(resource)

        # Keys already present are overwritten, so only the merged key set counts towards the limit.
        combined = store.TAGS.tags.get(resource, {}) | tags
        if len(combined) > LAMBDA_TAG_LIMIT_PER_RESOURCE:
            raise InvalidParameterValueException(
                "Number of tags exceeds resource tag limit.", Type="User"
            )

        store.TAGS.tag_resource(
            resource, [{"Key": name, "Value": content} for name, content in tags.items()]
        )
1✔
4150

4151
    def fetch_lambda_store_for_tagging(self, resource: TaggableResource) -> LambdaStore:
1✔
4152
        """
4153
        Takes a resource ARN for a TaggableResource (Lambda Function, Event Source Mapping, or Code Signing Config) and returns a corresponding
4154
        LambdaStore for its region and account.
4155

4156
        In addition, this function validates that the ARN is a valid TaggableResource type, and that the TaggableResource exists.
4157

4158
        Raises:
4159
            ValidationException: If the resource ARN is not a full ARN for a TaggableResource.
4160
            ResourceNotFoundException: If the specified resource does not exist.
4161
            InvalidParameterValueException: If the resource ARN is a qualified Lambda Function.
4162
        """
4163

4164
        def _raise_validation_exception():
1✔
4165
            raise ValidationException(
1✔
4166
                f"1 validation error detected: Value '{resource}' at 'resource' failed to satisfy constraint: Member must satisfy regular expression pattern: {api_utils.TAGGABLE_RESOURCE_ARN_PATTERN}"
4167
            )
4168

4169
        # Check whether the ARN we have been passed is correctly formatted
4170
        parsed_resource_arn: ArnData = None
1✔
4171
        try:
1✔
4172
            parsed_resource_arn = parse_arn(resource)
1✔
4173
        except Exception:
1✔
4174
            _raise_validation_exception()
1✔
4175

4176
        # TODO: Should we be checking whether this is a full ARN?
4177
        region, account_id, resource_type = map(
1✔
4178
            parsed_resource_arn.get, ("region", "account", "resource")
4179
        )
4180

4181
        if not all((region, account_id, resource_type)):
1✔
UNCOV
4182
            _raise_validation_exception()
×
4183

4184
        if not (parts := resource_type.split(":")):
1✔
UNCOV
4185
            _raise_validation_exception()
×
4186

4187
        resource_type, resource_identifier, *qualifier = parts
1✔
4188
        if resource_type not in {"event-source-mapping", "code-signing-config", "function"}:
1✔
4189
            _raise_validation_exception()
1✔
4190

4191
        if qualifier:
1✔
4192
            if resource_type == "function":
1✔
4193
                raise InvalidParameterValueException(
1✔
4194
                    "Tags on function aliases and versions are not supported. Please specify a function ARN.",
4195
                    Type="User",
4196
                )
4197
            _raise_validation_exception()
1✔
4198

4199
        match resource_type:
1✔
4200
            case "event-source-mapping":
1✔
4201
                self._get_esm(resource_identifier, account_id, region)
1✔
4202
            case "code-signing-config":
1✔
4203
                raise NotImplementedError("Resource tagging on CSC not yet implemented.")
4204
            case "function":
1✔
4205
                self._get_function(
1✔
4206
                    function_name=resource_identifier, account_id=account_id, region=region
4207
                )
4208

4209
        # If no exceptions are raised, assume ARN and referenced resource is valid for tag operations
4210
        return lambda_stores[account_id][region]
1✔
4211

4212
    def tag_resource(
        self, context: RequestContext, resource: TaggableResource, tags: Tags, **kwargs
    ) -> None:
        """
        Store `tags` on `resource`; for function ARNs the $LATEST version object is also replaced.

        Raises:
            InvalidParameterValueException: If `tags` is empty.
        """
        if not tags:
            raise InvalidParameterValueException(
                "An error occurred and the request cannot be processed.", Type="User"
            )
        self._store_tags(resource, tags)

        resource_id = extract_resource_from_arn(resource)
        if not (resource_id and resource_id.startswith("function")):
            return

        name, _, account, region = function_locators_from_arn(resource)
        function = self._get_function(name, account, region)
        with function.lock:
            # dirty hack for changed revision id, should reevaluate model to prevent this:
            current_latest = function.versions["$LATEST"]
            copied_config = dataclasses.replace(current_latest.config)
            function.versions["$LATEST"] = dataclasses.replace(
                current_latest, config=copied_config
            )
4232

4233
    def list_tags(
        self, context: RequestContext, resource: TaggableResource, **kwargs
    ) -> ListTagsResponse:
        """Return the tags stored for `resource`."""
        return ListTagsResponse(Tags=self._get_tags(resource))
1✔
4238

4239
    def untag_resource(
        self, context: RequestContext, resource: TaggableResource, tag_keys: TagKeyList, **kwargs
    ) -> None:
        """
        Remove the tags named in `tag_keys` from `resource`; for function ARNs the $LATEST version
        object is also replaced.

        Raises:
            ValidationException: If `tag_keys` is empty.
        """
        if not tag_keys:
            raise ValidationException(
                "1 validation error detected: Value null at 'tagKeys' failed to satisfy constraint: Member must not be null"
            )  # should probably be generalized a bit

        store = self.fetch_lambda_store_for_tagging(resource)
        store.TAGS.untag_resource(resource, tag_keys)

        resource_id = extract_resource_from_arn(resource)
        if not (resource_id and resource_id.startswith("function")):
            return

        name, _, account, region = function_locators_from_arn(resource)
        function = self._get_function(name, account, region)
        # TODO: Potential race condition
        with function.lock:
            # dirty hack for changed revision id, should reevaluate model to prevent this:
            current_latest = function.versions["$LATEST"]
            copied_config = dataclasses.replace(current_latest.config)
            function.versions["$LATEST"] = dataclasses.replace(
                current_latest, config=copied_config
            )
4262

4263
    # =======================================
4264
    # =======  LEGACY / DEPRECATED   ========
4265
    # =======================================
4266

4267
    def invoke_async(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        invoke_args: IO[BlobStream],
        **kwargs,
    ) -> InvokeAsyncResponse:
        """
        LEGACY API endpoint. Even AWS heavily discourages its usage.

        Raises:
            NotImplementedError: Always; this operation is not implemented.
        """
        raise NotImplementedError()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc