• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 19821277742

01 Dec 2025 08:16AM UTC coverage: 86.821% (-0.04%) from 86.863%
19821277742

push

github

web-flow
Add Lambda Managed Instances (#13440)

Co-authored-by: Joel Scheuner <joel.scheuner.dev@gmail.com>
Co-authored-by: Anisa Oshafi <anisaoshafi@gmail.com>
Co-authored-by: Cristopher Pinzón <cristopher.pinzon@gmail.com>
Co-authored-by: Alexander Rashed <alexander.rashed@localstack.cloud>
Co-authored-by: Dominik Schubert <dominik.schubert91@gmail.com>
Co-authored-by: Mathieu Cloutier <79954947+cloutierMat@users.noreply.github.com>
Co-authored-by: Simon Walker <simon.walker@localstack.cloud>

127 of 181 new or added lines in 11 files covered. (70.17%)

17 existing lines in 5 files now uncovered.

69556 of 80114 relevant lines covered (86.82%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.0
/localstack-core/localstack/services/lambda_/provider.py
1
import base64
1✔
2
import dataclasses
1✔
3
import datetime
1✔
4
import itertools
1✔
5
import json
1✔
6
import logging
1✔
7
import re
1✔
8
import threading
1✔
9
import time
1✔
10
from typing import IO, Any
1✔
11

12
from botocore.exceptions import ClientError
1✔
13

14
from localstack import config
1✔
15
from localstack.aws.api import RequestContext, ServiceException, handler
1✔
16
from localstack.aws.api.lambda_ import (
1✔
17
    AccountLimit,
18
    AccountUsage,
19
    AddLayerVersionPermissionResponse,
20
    AddPermissionRequest,
21
    AddPermissionResponse,
22
    Alias,
23
    AliasConfiguration,
24
    AliasRoutingConfiguration,
25
    AllowedPublishers,
26
    Architecture,
27
    Arn,
28
    Blob,
29
    BlobStream,
30
    CapacityProviderConfig,
31
    CodeSigningConfigArn,
32
    CodeSigningConfigNotFoundException,
33
    CodeSigningPolicies,
34
    CompatibleArchitectures,
35
    CompatibleRuntimes,
36
    Concurrency,
37
    Cors,
38
    CreateCodeSigningConfigResponse,
39
    CreateEventSourceMappingRequest,
40
    CreateFunctionRequest,
41
    CreateFunctionUrlConfigResponse,
42
    DeleteCodeSigningConfigResponse,
43
    DeleteFunctionResponse,
44
    Description,
45
    DestinationConfig,
46
    EventSourceMappingConfiguration,
47
    FunctionCodeLocation,
48
    FunctionConfiguration,
49
    FunctionEventInvokeConfig,
50
    FunctionName,
51
    FunctionUrlAuthType,
52
    FunctionUrlQualifier,
53
    FunctionVersionLatestPublished,
54
    GetAccountSettingsResponse,
55
    GetCodeSigningConfigResponse,
56
    GetFunctionCodeSigningConfigResponse,
57
    GetFunctionConcurrencyResponse,
58
    GetFunctionRecursionConfigResponse,
59
    GetFunctionResponse,
60
    GetFunctionUrlConfigResponse,
61
    GetLayerVersionPolicyResponse,
62
    GetLayerVersionResponse,
63
    GetPolicyResponse,
64
    GetProvisionedConcurrencyConfigResponse,
65
    InvalidParameterValueException,
66
    InvocationResponse,
67
    InvocationType,
68
    InvokeAsyncResponse,
69
    InvokeMode,
70
    LambdaApi,
71
    LambdaManagedInstancesCapacityProviderConfig,
72
    LastUpdateStatus,
73
    LayerName,
74
    LayerPermissionAllowedAction,
75
    LayerPermissionAllowedPrincipal,
76
    LayersListItem,
77
    LayerVersionArn,
78
    LayerVersionContentInput,
79
    LayerVersionNumber,
80
    LicenseInfo,
81
    ListAliasesResponse,
82
    ListCodeSigningConfigsResponse,
83
    ListEventSourceMappingsResponse,
84
    ListFunctionEventInvokeConfigsResponse,
85
    ListFunctionsByCodeSigningConfigResponse,
86
    ListFunctionsResponse,
87
    ListFunctionUrlConfigsResponse,
88
    ListLayersResponse,
89
    ListLayerVersionsResponse,
90
    ListProvisionedConcurrencyConfigsResponse,
91
    ListTagsResponse,
92
    ListVersionsByFunctionResponse,
93
    LogFormat,
94
    LoggingConfig,
95
    LogType,
96
    MasterRegion,
97
    MaxFunctionEventInvokeConfigListItems,
98
    MaximumEventAgeInSeconds,
99
    MaximumRetryAttempts,
100
    MaxItems,
101
    MaxLayerListItems,
102
    MaxListItems,
103
    MaxProvisionedConcurrencyConfigListItems,
104
    NamespacedFunctionName,
105
    NamespacedStatementId,
106
    NumericLatestPublishedOrAliasQualifier,
107
    OnFailure,
108
    OnSuccess,
109
    OrganizationId,
110
    PackageType,
111
    PositiveInteger,
112
    PreconditionFailedException,
113
    ProvisionedConcurrencyConfigListItem,
114
    ProvisionedConcurrencyConfigNotFoundException,
115
    ProvisionedConcurrencyStatusEnum,
116
    PublishLayerVersionResponse,
117
    PutFunctionCodeSigningConfigResponse,
118
    PutFunctionRecursionConfigResponse,
119
    PutProvisionedConcurrencyConfigResponse,
120
    Qualifier,
121
    RecursiveLoop,
122
    ReservedConcurrentExecutions,
123
    ResourceConflictException,
124
    ResourceNotFoundException,
125
    Runtime,
126
    RuntimeVersionConfig,
127
    SnapStart,
128
    SnapStartApplyOn,
129
    SnapStartOptimizationStatus,
130
    SnapStartResponse,
131
    State,
132
    StatementId,
133
    StateReasonCode,
134
    String,
135
    TaggableResource,
136
    TagKeyList,
137
    Tags,
138
    TracingMode,
139
    UnqualifiedFunctionName,
140
    UpdateCodeSigningConfigResponse,
141
    UpdateEventSourceMappingRequest,
142
    UpdateFunctionCodeRequest,
143
    UpdateFunctionConfigurationRequest,
144
    UpdateFunctionUrlConfigResponse,
145
    VersionWithLatestPublished,
146
)
147
from localstack.aws.api.lambda_ import FunctionVersion as FunctionVersionApi
1✔
148
from localstack.aws.api.lambda_ import ServiceException as LambdaServiceException
1✔
149
from localstack.aws.api.pipes import (
1✔
150
    DynamoDBStreamStartPosition,
151
    KinesisStreamStartPosition,
152
)
153
from localstack.aws.connect import connect_to
1✔
154
from localstack.aws.spec import load_service
1✔
155
from localstack.services.edge import ROUTER
1✔
156
from localstack.services.lambda_ import api_utils
1✔
157
from localstack.services.lambda_ import hooks as lambda_hooks
1✔
158
from localstack.services.lambda_.analytics import (
1✔
159
    FunctionOperation,
160
    FunctionStatus,
161
    function_counter,
162
)
163
from localstack.services.lambda_.api_utils import (
1✔
164
    ARCHITECTURES,
165
    STATEMENT_ID_REGEX,
166
    SUBNET_ID_REGEX,
167
    function_locators_from_arn,
168
)
169
from localstack.services.lambda_.event_source_mapping.esm_config_factory import (
1✔
170
    EsmConfigFactory,
171
)
172
from localstack.services.lambda_.event_source_mapping.esm_worker import (
1✔
173
    EsmState,
174
    EsmWorker,
175
)
176
from localstack.services.lambda_.event_source_mapping.esm_worker_factory import (
1✔
177
    EsmWorkerFactory,
178
)
179
from localstack.services.lambda_.event_source_mapping.pipe_utils import get_internal_client
1✔
180
from localstack.services.lambda_.invocation import AccessDeniedException
1✔
181
from localstack.services.lambda_.invocation.execution_environment import (
1✔
182
    EnvironmentStartupTimeoutException,
183
)
184
from localstack.services.lambda_.invocation.lambda_models import (
1✔
185
    AliasRoutingConfig,
186
    CodeSigningConfig,
187
    EventInvokeConfig,
188
    Function,
189
    FunctionResourcePolicy,
190
    FunctionUrlConfig,
191
    FunctionVersion,
192
    ImageConfig,
193
    LambdaEphemeralStorage,
194
    Layer,
195
    LayerPolicy,
196
    LayerPolicyStatement,
197
    LayerVersion,
198
    ProvisionedConcurrencyConfiguration,
199
    RequestEntityTooLargeException,
200
    ResourcePolicy,
201
    UpdateStatus,
202
    ValidationException,
203
    VersionAlias,
204
    VersionFunctionConfiguration,
205
    VersionIdentifier,
206
    VersionState,
207
    VpcConfig,
208
)
209
from localstack.services.lambda_.invocation.lambda_service import (
1✔
210
    LambdaService,
211
    create_image_code,
212
    destroy_code_if_not_used,
213
    lambda_stores,
214
    store_lambda_archive,
215
    store_s3_bucket_archive,
216
)
217
from localstack.services.lambda_.invocation.models import CapacityProvider as CapacityProviderModel
1✔
218
from localstack.services.lambda_.invocation.models import LambdaStore
1✔
219
from localstack.services.lambda_.invocation.runtime_executor import get_runtime_executor
1✔
220
from localstack.services.lambda_.lambda_utils import HINT_LOG
1✔
221
from localstack.services.lambda_.layerfetcher.layer_fetcher import LayerFetcher
1✔
222
from localstack.services.lambda_.provider_utils import (
1✔
223
    LambdaLayerVersionIdentifier,
224
    get_function_version,
225
    get_function_version_from_arn,
226
)
227
from localstack.services.lambda_.runtimes import (
1✔
228
    ALL_RUNTIMES,
229
    DEPRECATED_RUNTIMES,
230
    DEPRECATED_RUNTIMES_UPGRADES,
231
    RUNTIMES_AGGREGATED,
232
    SNAP_START_SUPPORTED_RUNTIMES,
233
    VALID_RUNTIMES,
234
)
235
from localstack.services.lambda_.urlrouter import FunctionUrlRouter
1✔
236
from localstack.services.plugins import ServiceLifecycleHook
1✔
237
from localstack.state import StateVisitor
1✔
238
from localstack.utils.aws.arns import (
1✔
239
    ArnData,
240
    capacity_provider_arn,
241
    extract_resource_from_arn,
242
    extract_service_from_arn,
243
    get_partition,
244
    lambda_event_source_mapping_arn,
245
    parse_arn,
246
)
247
from localstack.utils.aws.client_types import ServicePrincipal
1✔
248
from localstack.utils.bootstrap import is_api_enabled
1✔
249
from localstack.utils.collections import PaginatedList, merge_recursive
1✔
250
from localstack.utils.event_matcher import validate_event_pattern
1✔
251
from localstack.utils.strings import get_random_hex, short_uid, to_bytes, to_str
1✔
252
from localstack.utils.sync import poll_condition
1✔
253
from localstack.utils.urls import localstack_host
1✔
254

255
LOG = logging.getLogger(__name__)
1✔
256

257
LAMBDA_DEFAULT_TIMEOUT = 3
1✔
258
LAMBDA_DEFAULT_MEMORY_SIZE = 128
1✔
259

260
LAMBDA_TAG_LIMIT_PER_RESOURCE = 50
1✔
261
LAMBDA_LAYERS_LIMIT_PER_FUNCTION = 5
1✔
262

263
TAG_KEY_CUSTOM_URL = "_custom_id_"
1✔
264
# Requirements (from RFC3986 & co): not longer than 63, first char must be
265
# alpha, then alphanumeric or hyphen, except cannot start or end with hyphen
266
TAG_KEY_CUSTOM_URL_VALIDATOR = re.compile(r"^[A-Za-z]([A-Za-z0-9\-]{0,61}[A-Za-z0-9])?$")
1✔
267

268

269
class LambdaProvider(LambdaApi, ServiceLifecycleHook):
1✔
270
    lambda_service: LambdaService
1✔
271
    create_fn_lock: threading.RLock
1✔
272
    create_layer_lock: threading.RLock
1✔
273
    router: FunctionUrlRouter
1✔
274
    esm_workers: dict[str, EsmWorker]
1✔
275
    layer_fetcher: LayerFetcher | None
1✔
276

277
    def __init__(self) -> None:
1✔
278
        self.lambda_service = LambdaService()
1✔
279
        self.create_fn_lock = threading.RLock()
1✔
280
        self.create_layer_lock = threading.RLock()
1✔
281
        self.router = FunctionUrlRouter(ROUTER, self.lambda_service)
1✔
282
        self.esm_workers = {}
1✔
283
        self.layer_fetcher = None
1✔
284
        lambda_hooks.inject_layer_fetcher.run(self)
1✔
285

286
    def accept_state_visitor(self, visitor: StateVisitor):
1✔
287
        visitor.visit(lambda_stores)
×
288

289
    def on_before_state_reset(self):
1✔
290
        self.lambda_service.stop()
×
291

292
    def on_after_state_reset(self):
1✔
293
        self.router.lambda_service = self.lambda_service = LambdaService()
×
294

295
    def on_before_state_load(self):
1✔
296
        self.lambda_service.stop()
×
297

298
    def on_after_state_load(self):
1✔
299
        self.lambda_service = LambdaService()
×
300
        self.router.lambda_service = self.lambda_service
×
301

302
        for account_id, account_bundle in lambda_stores.items():
×
303
            for region_name, state in account_bundle.items():
×
304
                for fn in state.functions.values():
×
305
                    for fn_version in fn.versions.values():
×
306
                        # restore the "Pending" state for every function version and start it
307
                        try:
×
308
                            new_state = VersionState(
×
309
                                state=State.Pending,
310
                                code=StateReasonCode.Creating,
311
                                reason="The function is being created.",
312
                            )
313
                            new_config = dataclasses.replace(fn_version.config, state=new_state)
×
314
                            new_version = dataclasses.replace(fn_version, config=new_config)
×
315
                            fn.versions[fn_version.id.qualifier] = new_version
×
316
                            # TODO: consider skipping this for $LATEST versions of functions with a capacity provider
UNCOV
317
                            self.lambda_service.create_function_version(fn_version).result(
×
318
                                timeout=5
319
                            )
320
                        except Exception:
×
321
                            LOG.warning(
×
322
                                "Failed to restore function version %s",
323
                                fn_version.id.qualified_arn(),
324
                                exc_info=LOG.isEnabledFor(logging.DEBUG),
325
                            )
326
                    # restore provisioned concurrency per function considering both versions and aliases
327
                    for (
×
328
                        provisioned_qualifier,
329
                        provisioned_config,
330
                    ) in fn.provisioned_concurrency_configs.items():
331
                        fn_arn = None
×
332
                        try:
×
333
                            if api_utils.qualifier_is_alias(provisioned_qualifier):
×
334
                                alias = fn.aliases.get(provisioned_qualifier)
×
335
                                resolved_version = fn.versions.get(alias.function_version)
×
336
                                fn_arn = resolved_version.id.qualified_arn()
×
337
                            elif api_utils.qualifier_is_version(provisioned_qualifier):
×
338
                                fn_version = fn.versions.get(provisioned_qualifier)
×
339
                                fn_arn = fn_version.id.qualified_arn()
×
340
                            else:
341
                                raise InvalidParameterValueException(
×
342
                                    "Invalid qualifier type:"
343
                                    " Qualifier can only be an alias or a version for provisioned concurrency."
344
                                )
345

346
                            manager = self.lambda_service.get_lambda_version_manager(fn_arn)
×
347
                            manager.update_provisioned_concurrency_config(
×
348
                                provisioned_config.provisioned_concurrent_executions
349
                            )
350
                        except Exception:
×
351
                            LOG.warning(
×
352
                                "Failed to restore provisioned concurrency %s for function %s",
353
                                provisioned_config,
354
                                fn_arn,
355
                                exc_info=LOG.isEnabledFor(logging.DEBUG),
356
                            )
357

358
                for esm in state.event_source_mappings.values():
×
359
                    # Restores event source workers
360
                    function_arn = esm.get("FunctionArn")
×
361

362
                    # TODO: How do we know the event source is up?
363
                    # A basic poll to see if the mapped Lambda function is active/failed
364
                    if not poll_condition(
×
365
                        lambda: get_function_version_from_arn(function_arn).config.state.state
366
                        in [State.Active, State.Failed],
367
                        timeout=10,
368
                    ):
369
                        LOG.warning(
×
370
                            "Creating ESM for Lambda that is not in running state: %s",
371
                            function_arn,
372
                        )
373

374
                    function_version = get_function_version_from_arn(function_arn)
×
375
                    function_role = function_version.config.role
×
376

377
                    is_esm_enabled = esm.get("State", EsmState.DISABLED) not in (
×
378
                        EsmState.DISABLED,
379
                        EsmState.DISABLING,
380
                    )
381
                    esm_worker = EsmWorkerFactory(
×
382
                        esm, function_role, is_esm_enabled
383
                    ).get_esm_worker()
384

385
                    # Note: a worker is created in the DISABLED state if not enabled
386
                    esm_worker.create()
×
387
                    # TODO: assigning the esm_worker to the dict only works after .create(). Could it cause a race
388
                    #  condition if we get a shutdown here and have a worker thread spawned but not accounted for?
389
                    self.esm_workers[esm_worker.uuid] = esm_worker
×
390

391
    def on_after_init(self):
1✔
392
        self.router.register_routes()
1✔
393
        get_runtime_executor().validate_environment()
1✔
394

395
    def on_before_stop(self) -> None:
1✔
396
        for esm_worker in self.esm_workers.values():
1✔
397
            esm_worker.stop_for_shutdown()
1✔
398

399
        # TODO: should probably unregister routes?
400
        self.lambda_service.stop()
1✔
401

402
    @staticmethod
1✔
403
    def _get_function(function_name: str, account_id: str, region: str) -> Function:
1✔
404
        state = lambda_stores[account_id][region]
1✔
405
        function = state.functions.get(function_name)
1✔
406
        if not function:
1✔
407
            arn = api_utils.unqualified_lambda_arn(
1✔
408
                function_name=function_name,
409
                account=account_id,
410
                region=region,
411
            )
412
            raise ResourceNotFoundException(
1✔
413
                f"Function not found: {arn}",
414
                Type="User",
415
            )
416
        return function
1✔
417

418
    @staticmethod
1✔
419
    def _get_esm(uuid: str, account_id: str, region: str) -> EventSourceMappingConfiguration:
1✔
420
        state = lambda_stores[account_id][region]
1✔
421
        esm = state.event_source_mappings.get(uuid)
1✔
422
        if not esm:
1✔
423
            arn = lambda_event_source_mapping_arn(uuid, account_id, region)
1✔
424
            raise ResourceNotFoundException(
1✔
425
                f"Event source mapping not found: {arn}",
426
                Type="User",
427
            )
428
        return esm
1✔
429

430
    @staticmethod
1✔
431
    def _get_capacity_provider(
1✔
432
        capacity_provider_name: str,
433
        account_id: str,
434
        region: str,
435
        error_msg_template: str = "Capacity provider not found: {}",
436
    ) -> CapacityProviderModel:
437
        state = lambda_stores[account_id][region]
1✔
438
        cp = state.capacity_providers.get(capacity_provider_name)
1✔
439
        if not cp:
1✔
440
            arn = capacity_provider_arn(capacity_provider_name, account_id, region)
1✔
441
            raise ResourceNotFoundException(
1✔
442
                error_msg_template.format(arn),
443
                Type="User",
444
            )
NEW
445
        return cp
×
446

447
    @staticmethod
1✔
448
    def _validate_qualifier_expression(qualifier: str) -> None:
1✔
449
        if error_messages := api_utils.validate_qualifier(qualifier):
1✔
450
            raise ValidationException(
×
451
                message=api_utils.construct_validation_exception_message(error_messages)
452
            )
453

454
    @staticmethod
1✔
455
    def _resolve_fn_qualifier(resolved_fn: Function, qualifier: str | None) -> tuple[str, str]:
1✔
456
        """Attempts to resolve a given qualifier and returns a qualifier that exists or
457
        raises an appropriate ResourceNotFoundException.
458

459
        :param resolved_fn: The resolved lambda function
460
        :param qualifier: The qualifier to be resolved or None
461
        :return: Tuple of (resolved qualifier, function arn either qualified or unqualified)"""
462
        function_name = resolved_fn.function_name
1✔
463
        # assuming function versions need to live in the same account and region
464
        account_id = resolved_fn.latest().id.account
1✔
465
        region = resolved_fn.latest().id.region
1✔
466
        fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
1✔
467
        if qualifier is not None:
1✔
468
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
1✔
469
            if api_utils.qualifier_is_alias(qualifier):
1✔
470
                if qualifier not in resolved_fn.aliases:
1✔
471
                    raise ResourceNotFoundException(f"Cannot find alias arn: {fn_arn}", Type="User")
1✔
472
            elif api_utils.qualifier_is_version(qualifier) or qualifier == "$LATEST":
1✔
473
                if qualifier not in resolved_fn.versions:
1✔
474
                    raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")
1✔
475
            else:
476
                # matches qualifier pattern but invalid alias or version
477
                raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")
1✔
478
        resolved_qualifier = qualifier or "$LATEST"
1✔
479
        return resolved_qualifier, fn_arn
1✔
480

481
    @staticmethod
1✔
482
    def _function_revision_id(resolved_fn: Function, resolved_qualifier: str) -> str:
1✔
483
        if api_utils.qualifier_is_alias(resolved_qualifier):
1✔
484
            return resolved_fn.aliases[resolved_qualifier].revision_id
1✔
485
        # Assumes that a non-alias is a version
486
        else:
487
            return resolved_fn.versions[resolved_qualifier].config.revision_id
1✔
488

489
    def _resolve_vpc_id(self, account_id: str, region_name: str, subnet_id: str) -> str:
1✔
490
        ec2_client = connect_to(aws_access_key_id=account_id, region_name=region_name).ec2
1✔
491
        try:
1✔
492
            return ec2_client.describe_subnets(SubnetIds=[subnet_id])["Subnets"][0]["VpcId"]
1✔
493
        except ec2_client.exceptions.ClientError as e:
1✔
494
            code = e.response["Error"]["Code"]
1✔
495
            message = e.response["Error"]["Message"]
1✔
496
            raise InvalidParameterValueException(
1✔
497
                f"Error occurred while DescribeSubnets. EC2 Error Code: {code}. EC2 Error Message: {message}",
498
                Type="User",
499
            )
500

501
    def _build_vpc_config(
1✔
502
        self,
503
        account_id: str,
504
        region_name: str,
505
        vpc_config: dict | None = None,
506
    ) -> VpcConfig | None:
507
        if not vpc_config or not is_api_enabled("ec2"):
1✔
508
            return None
1✔
509

510
        subnet_ids = vpc_config.get("SubnetIds", [])
1✔
511
        if subnet_ids is not None and len(subnet_ids) == 0:
1✔
512
            return VpcConfig(vpc_id="", security_group_ids=[], subnet_ids=[])
1✔
513

514
        subnet_id = subnet_ids[0]
1✔
515
        if not bool(SUBNET_ID_REGEX.match(subnet_id)):
1✔
516
            raise ValidationException(
1✔
517
                f"1 validation error detected: Value '[{subnet_id}]' at 'vpcConfig.subnetIds' failed to satisfy constraint: Member must satisfy constraint: [Member must have length less than or equal to 1024, Member must have length greater than or equal to 0, Member must satisfy regular expression pattern: subnet-[0-9a-z]*]"
518
            )
519

520
        return VpcConfig(
1✔
521
            vpc_id=self._resolve_vpc_id(account_id, region_name, subnet_id),
522
            security_group_ids=vpc_config.get("SecurityGroupIds", []),
523
            subnet_ids=subnet_ids,
524
        )
525

526
    def _create_version_model(
1✔
527
        self,
528
        function_name: str,
529
        region: str,
530
        account_id: str,
531
        description: str | None = None,
532
        revision_id: str | None = None,
533
        code_sha256: str | None = None,
534
        publish_to: FunctionVersionLatestPublished | None = None,
535
        is_active: bool = False,
536
    ) -> tuple[FunctionVersion, bool]:
537
        """
538
        Release a new version to the model if all restrictions are met.
539
        Restrictions:
540
          - CodeSha256, if provided, must equal the current latest version code hash
541
          - RevisionId, if provided, must equal the current latest version revision id
542
          - Some changes have been done to the latest version since last publish
543
        Will return a tuple of the version, and whether the version was published (True) or the latest available version was taken (False).
544
        This can happen if the latest version has not been changed since the last version publish, in this case the last version will be returned.
545

546
        :param function_name: Function name to be published
547
        :param region: Region of the function
548
        :param account_id: Account of the function
549
        :param description: new description of the version (will be the description of the function if missing)
550
        :param revision_id: Revision id, function will raise error if it does not match latest revision id
551
        :param code_sha256: Code sha256, function will raise error if it does not match latest code hash
552
        :return: Tuple of (published version, whether version was released or last released version returned, since nothing changed)
553
        """
554
        current_latest_version = get_function_version(
1✔
555
            function_name=function_name, qualifier="$LATEST", account_id=account_id, region=region
556
        )
557
        if revision_id and current_latest_version.config.revision_id != revision_id:
1✔
558
            raise PreconditionFailedException(
1✔
559
                "The Revision Id provided does not match the latest Revision Id. Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
560
                Type="User",
561
            )
562

563
        # check if code hashes match if they are specified
564
        current_hash = (
1✔
565
            current_latest_version.config.code.code_sha256
566
            if current_latest_version.config.package_type == PackageType.Zip
567
            else current_latest_version.config.image.code_sha256
568
        )
569
        # if the code is a zip package and hot reloaded (hot reloading is currently only supported for zip packagetypes)
570
        # we cannot enforce the codesha256 check
571
        is_hot_reloaded_zip_package = (
1✔
572
            current_latest_version.config.package_type == PackageType.Zip
573
            and current_latest_version.config.code.is_hot_reloading()
574
        )
575
        if code_sha256 and current_hash != code_sha256 and not is_hot_reloaded_zip_package:
1✔
576
            raise InvalidParameterValueException(
1✔
577
                f"CodeSHA256 ({code_sha256}) is different from current CodeSHA256 in $LATEST ({current_hash}). Please try again with the CodeSHA256 in $LATEST.",
578
                Type="User",
579
            )
580

581
        state = lambda_stores[account_id][region]
1✔
582
        function = state.functions.get(function_name)
1✔
583
        changes = {}
1✔
584
        if description is not None:
1✔
585
            changes["description"] = description
1✔
586
        # TODO copy environment instead of restarting one, get rid of all the "Pending"s
587

588
        with function.lock:
1✔
589
            if function.next_version > 1 and (
1✔
590
                prev_version := function.versions.get(str(function.next_version - 1))
591
            ):
592
                if (
1✔
593
                    prev_version.config.internal_revision
594
                    == current_latest_version.config.internal_revision
595
                ):
596
                    return prev_version, False
1✔
597
            # TODO check if there was a change since last version
598
            if publish_to == FunctionVersionLatestPublished.LATEST_PUBLISHED:
1✔
NEW
599
                qualifier = "$LATEST.PUBLISHED"
×
600
            else:
601
                qualifier = str(function.next_version)
1✔
602
                function.next_version += 1
1✔
603
            new_id = VersionIdentifier(
1✔
604
                function_name=function_name,
605
                qualifier=qualifier,
606
                region=region,
607
                account=account_id,
608
            )
609

610
            if current_latest_version.config.CapacityProviderConfig:
1✔
611
                # for lambda managed functions, snap start is not supported
NEW
612
                snap_start = None
×
613
            else:
614
                apply_on = current_latest_version.config.snap_start["ApplyOn"]
1✔
615
                optimization_status = SnapStartOptimizationStatus.Off
1✔
616
                if apply_on == SnapStartApplyOn.PublishedVersions:
1✔
NEW
617
                    optimization_status = SnapStartOptimizationStatus.On
×
618
                snap_start = SnapStartResponse(
1✔
619
                    ApplyOn=apply_on,
620
                    OptimizationStatus=optimization_status,
621
                )
622

623
            last_update = None
1✔
624
            new_state = VersionState(
1✔
625
                state=State.Pending,
626
                code=StateReasonCode.Creating,
627
                reason="The function is being created.",
628
            )
629
            if publish_to == FunctionVersionLatestPublished.LATEST_PUBLISHED:
1✔
NEW
630
                last_update = UpdateStatus(
×
631
                    status=LastUpdateStatus.InProgress,
632
                    code="Updating",
633
                    reason="The function is being updated.",
634
                )
NEW
635
                if is_active:
×
NEW
636
                    new_state = VersionState(state=State.Active)
×
637
            new_version = dataclasses.replace(
1✔
638
                current_latest_version,
639
                config=dataclasses.replace(
640
                    current_latest_version.config,
641
                    last_update=last_update,
642
                    state=new_state,
643
                    snap_start=snap_start,
644
                    **changes,
645
                ),
646
                id=new_id,
647
            )
648
            function.versions[qualifier] = new_version
1✔
649
        return new_version, True
1✔
650

651
    def _publish_version_from_existing_version(
1✔
652
        self,
653
        function_name: str,
654
        region: str,
655
        account_id: str,
656
        description: str | None = None,
657
        revision_id: str | None = None,
658
        code_sha256: str | None = None,
659
        publish_to: FunctionVersionLatestPublished | None = None,
660
    ) -> FunctionVersion:
661
        """
662
        Publish version from an existing, already initialized LATEST
663

664
        :param function_name: Function name
665
        :param region: region
666
        :param account_id: account id
667
        :param description: description
668
        :param revision_id: revision id (check if current version matches)
669
        :param code_sha256: code sha (check if current code matches)
670
        :return: new version
671
        """
672
        is_active = True if publish_to == FunctionVersionLatestPublished.LATEST_PUBLISHED else False
1✔
673
        new_version, changed = self._create_version_model(
1✔
674
            function_name=function_name,
675
            region=region,
676
            account_id=account_id,
677
            description=description,
678
            revision_id=revision_id,
679
            code_sha256=code_sha256,
680
            publish_to=publish_to,
681
            is_active=is_active,
682
        )
683
        if not changed:
1✔
684
            return new_version
1✔
685

686
        if new_version.config.CapacityProviderConfig:
1✔
NEW
687
            self.lambda_service.publish_version_async(new_version)
×
688
        else:
689
            self.lambda_service.publish_version(new_version)
1✔
690
        state = lambda_stores[account_id][region]
1✔
691
        function = state.functions.get(function_name)
1✔
692

693
        # Update revision id for $LATEST version
694
        # TODO: re-evaluate data model to prevent this dirty hack just for bumping the revision id
695
        latest_version = function.versions["$LATEST"]
1✔
696
        function.versions["$LATEST"] = dataclasses.replace(
1✔
697
            latest_version, config=dataclasses.replace(latest_version.config)
698
        )
699
        if new_version.config.CapacityProviderConfig:
1✔
700
            # publish_version happens async for functions with a capacity provider.
701
            # Therefore, we return the new_version with State=Pending or LastUpdateStatus=InProgress ($LATEST.PUBLISHED)
NEW
702
            return new_version
×
703
        else:
704
            # Regular functions yield an Active state modified during `publish_version` (sync).
705
            # Therefore, we need to get the updated version from the store.
706
            updated_version = function.versions.get(new_version.id.qualifier)
1✔
707
            return updated_version
1✔
708

709
    def _publish_version_with_changes(
1✔
710
        self,
711
        function_name: str,
712
        region: str,
713
        account_id: str,
714
        description: str | None = None,
715
        revision_id: str | None = None,
716
        code_sha256: str | None = None,
717
        publish_to: FunctionVersionLatestPublished | None = None,
718
        is_active: bool = False,
719
    ) -> FunctionVersion:
720
        """
721
        Publish version together with a new latest version (publish on create / update)
722

723
        :param function_name: Function name
724
        :param region: region
725
        :param account_id: account id
726
        :param description: description
727
        :param revision_id: revision id (check if current version matches)
728
        :param code_sha256: code sha (check if current code matches)
729
        :return: new version
730
        """
731
        new_version, changed = self._create_version_model(
1✔
732
            function_name=function_name,
733
            region=region,
734
            account_id=account_id,
735
            description=description,
736
            revision_id=revision_id,
737
            code_sha256=code_sha256,
738
            publish_to=publish_to,
739
            is_active=is_active,
740
        )
741
        if not changed:
1✔
742
            return new_version
×
743
        self.lambda_service.create_function_version(new_version)
1✔
744
        return new_version
1✔
745

746
    @staticmethod
1✔
747
    def _verify_env_variables(env_vars: dict[str, str]):
1✔
748
        dumped_env_vars = json.dumps(env_vars, separators=(",", ":"))
1✔
749
        if (
1✔
750
            len(dumped_env_vars.encode("utf-8"))
751
            > config.LAMBDA_LIMITS_MAX_FUNCTION_ENVVAR_SIZE_BYTES
752
        ):
753
            raise InvalidParameterValueException(
1✔
754
                f"Lambda was unable to configure your environment variables because the environment variables you have provided exceeded the 4KB limit. String measured: {dumped_env_vars}",
755
                Type="User",
756
            )
757

758
    @staticmethod
1✔
759
    def _validate_snapstart(snap_start: SnapStart, runtime: Runtime):
1✔
760
        apply_on = snap_start.get("ApplyOn")
1✔
761
        if apply_on not in [
1✔
762
            SnapStartApplyOn.PublishedVersions,
763
            SnapStartApplyOn.None_,
764
        ]:
765
            raise ValidationException(
1✔
766
                f"1 validation error detected: Value '{apply_on}' at 'snapStart.applyOn' failed to satisfy constraint: Member must satisfy enum value set: [PublishedVersions, None]"
767
            )
768

769
        if runtime not in SNAP_START_SUPPORTED_RUNTIMES:
1✔
770
            raise InvalidParameterValueException(
×
771
                f"{runtime} is not supported for SnapStart enabled functions.", Type="User"
772
            )
773

774
    def _validate_layers(self, new_layers: list[str], region: str, account_id: str):
1✔
775
        if len(new_layers) > LAMBDA_LAYERS_LIMIT_PER_FUNCTION:
1✔
776
            raise InvalidParameterValueException(
1✔
777
                "Cannot reference more than 5 layers.", Type="User"
778
            )
779

780
        visited_layers = {}
1✔
781
        for layer_version_arn in new_layers:
1✔
782
            (
1✔
783
                layer_region,
784
                layer_account_id,
785
                layer_name,
786
                layer_version_str,
787
            ) = api_utils.parse_layer_arn(layer_version_arn)
788
            if layer_version_str is None:
1✔
789
                raise ValidationException(
1✔
790
                    f"1 validation error detected: Value '[{layer_version_arn}]'"
791
                    + " at 'layers' failed to satisfy constraint: Member must satisfy constraint: [Member must have length less than or equal to 2048, Member must have length greater than or equal to 1, Member must satisfy regular expression pattern: "
792
                    + "(arn:(aws[a-zA-Z-]*)?:lambda:(eusc-)?[a-z]{2}((-gov)|(-iso([a-z]?)))?-[a-z]+-\\d{1}:\\d{12}:layer:[a-zA-Z0-9-_]+:[0-9]+)|(arn:[a-zA-Z0-9-]+:lambda:::awslayer:[a-zA-Z0-9-_]+), Member must not be null]",
793
                )
794

795
            state = lambda_stores[layer_account_id][layer_region]
1✔
796
            layer = state.layers.get(layer_name)
1✔
797
            layer_version = None
1✔
798
            if layer is not None:
1✔
799
                layer_version = layer.layer_versions.get(layer_version_str)
1✔
800
            if layer_account_id == account_id:
1✔
801
                if region and layer_region != region:
1✔
802
                    raise InvalidParameterValueException(
1✔
803
                        f"Layers are not in the same region as the function. "
804
                        f"Layers are expected to be in region {region}.",
805
                        Type="User",
806
                    )
807
                if layer is None or layer.layer_versions.get(layer_version_str) is None:
1✔
808
                    raise InvalidParameterValueException(
1✔
809
                        f"Layer version {layer_version_arn} does not exist.", Type="User"
810
                    )
811
            else:  # External layer from other account
812
                # TODO: validate IAM layer policy here, allowing access by default for now and only checking region
813
                if region and layer_region != region:
×
814
                    # TODO: detect user or role from context when IAM users are implemented
815
                    user = "user/localstack-testing"
×
816
                    raise AccessDeniedException(
×
817
                        f"User: arn:{get_partition(region)}:iam::{account_id}:{user} is not authorized to perform: lambda:GetLayerVersion on resource: {layer_version_arn} because no resource-based policy allows the lambda:GetLayerVersion action"
818
                    )
819
                if layer is None or layer_version is None:
×
820
                    # Limitation: cannot fetch external layers when using the same account id as the target layer
821
                    # because we do not want to trigger the layer fetcher for every non-existing layer.
822
                    if self.layer_fetcher is None:
×
823
                        raise NotImplementedError(
824
                            "Fetching shared layers from AWS is a pro feature."
825
                        )
826

827
                    layer = self.layer_fetcher.fetch_layer(layer_version_arn)
×
828
                    if layer is None:
×
829
                        # TODO: detect user or role from context when IAM users are implemented
830
                        user = "user/localstack-testing"
×
831
                        raise AccessDeniedException(
×
832
                            f"User: arn:{get_partition(region)}:iam::{account_id}:{user} is not authorized to perform: lambda:GetLayerVersion on resource: {layer_version_arn} because no resource-based policy allows the lambda:GetLayerVersion action"
833
                        )
834

835
                    # Distinguish between new layer and new layer version
836
                    if layer_version is None:
×
837
                        # Create whole layer from scratch
838
                        state.layers[layer_name] = layer
×
839
                    else:
840
                        # Create layer version if another version of the same layer already exists
841
                        state.layers[layer_name].layer_versions[layer_version_str] = (
×
842
                            layer.layer_versions.get(layer_version_str)
843
                        )
844

845
            # only the first two matches in the array are considered for the error message
846
            layer_arn = ":".join(layer_version_arn.split(":")[:-1])
1✔
847
            if layer_arn in visited_layers:
1✔
848
                conflict_layer_version_arn = visited_layers[layer_arn]
1✔
849
                raise InvalidParameterValueException(
1✔
850
                    f"Two different versions of the same layer are not allowed to be referenced in the same function. {conflict_layer_version_arn} and {layer_version_arn} are versions of the same layer.",
851
                    Type="User",
852
                )
853
            visited_layers[layer_arn] = layer_version_arn
1✔
854

855
    @staticmethod
1✔
856
    def map_layers(new_layers: list[str]) -> list[LayerVersion]:
1✔
857
        layers = []
1✔
858
        for layer_version_arn in new_layers:
1✔
859
            region_name, account_id, layer_name, layer_version = api_utils.parse_layer_arn(
1✔
860
                layer_version_arn
861
            )
862
            layer = lambda_stores[account_id][region_name].layers.get(layer_name)
1✔
863
            layer_version = layer.layer_versions.get(layer_version)
1✔
864
            layers.append(layer_version)
1✔
865
        return layers
1✔
866

867
    def get_function_recursion_config(
1✔
868
        self,
869
        context: RequestContext,
870
        function_name: UnqualifiedFunctionName,
871
        **kwargs,
872
    ) -> GetFunctionRecursionConfigResponse:
873
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
874
        function_name = api_utils.get_function_name(function_name, context)
1✔
875
        fn = self._get_function(function_name=function_name, region=region, account_id=account_id)
1✔
876
        return GetFunctionRecursionConfigResponse(RecursiveLoop=fn.recursive_loop)
1✔
877

878
    def put_function_recursion_config(
1✔
879
        self,
880
        context: RequestContext,
881
        function_name: UnqualifiedFunctionName,
882
        recursive_loop: RecursiveLoop,
883
        **kwargs,
884
    ) -> PutFunctionRecursionConfigResponse:
885
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
886
        function_name = api_utils.get_function_name(function_name, context)
1✔
887

888
        fn = self._get_function(function_name=function_name, region=region, account_id=account_id)
1✔
889

890
        allowed_values = list(RecursiveLoop.__members__.values())
1✔
891
        if recursive_loop not in allowed_values:
1✔
892
            raise ValidationException(
1✔
893
                f"1 validation error detected: Value '{recursive_loop}' at 'recursiveLoop' failed to satisfy constraint: "
894
                f"Member must satisfy enum value set: [Terminate, Allow]"
895
            )
896

897
        fn.recursive_loop = recursive_loop
1✔
898
        return PutFunctionRecursionConfigResponse(RecursiveLoop=fn.recursive_loop)
1✔
899

900
    @handler(operation="CreateFunction", expand=False)
1✔
901
    def create_function(
1✔
902
        self,
903
        context: RequestContext,
904
        request: CreateFunctionRequest,
905
    ) -> FunctionConfiguration:
906
        context_region = context.region
1✔
907
        context_account_id = context.account_id
1✔
908

909
        zip_file = request.get("Code", {}).get("ZipFile")
1✔
910
        if zip_file and len(zip_file) > config.LAMBDA_LIMITS_CODE_SIZE_ZIPPED:
1✔
911
            raise RequestEntityTooLargeException(
1✔
912
                f"Zipped size must be smaller than {config.LAMBDA_LIMITS_CODE_SIZE_ZIPPED} bytes"
913
            )
914

915
        if context.request.content_length > config.LAMBDA_LIMITS_CREATE_FUNCTION_REQUEST_SIZE:
1✔
916
            raise RequestEntityTooLargeException(
1✔
917
                f"Request must be smaller than {config.LAMBDA_LIMITS_CREATE_FUNCTION_REQUEST_SIZE} bytes for the CreateFunction operation"
918
            )
919

920
        if architectures := request.get("Architectures"):
1✔
921
            if len(architectures) != 1:
1✔
922
                raise ValidationException(
1✔
923
                    f"1 validation error detected: Value '[{', '.join(architectures)}]' at 'architectures' failed to "
924
                    f"satisfy constraint: Member must have length less than or equal to 1",
925
                )
926
            if architectures[0] not in ARCHITECTURES:
1✔
927
                raise ValidationException(
1✔
928
                    f"1 validation error detected: Value '[{', '.join(architectures)}]' at 'architectures' failed to "
929
                    f"satisfy constraint: Member must satisfy constraint: [Member must satisfy enum value set: "
930
                    f"[x86_64, arm64], Member must not be null]",
931
                )
932

933
        if env_vars := request.get("Environment", {}).get("Variables"):
1✔
934
            self._verify_env_variables(env_vars)
1✔
935

936
        if layers := request.get("Layers", []):
1✔
937
            self._validate_layers(layers, region=context_region, account_id=context_account_id)
1✔
938

939
        if not api_utils.is_role_arn(request.get("Role")):
1✔
940
            raise ValidationException(
1✔
941
                f"1 validation error detected: Value '{request.get('Role')}'"
942
                + " at 'role' failed to satisfy constraint: Member must satisfy regular expression pattern: arn:(aws[a-zA-Z-]*)?:iam::\\d{12}:role/?[a-zA-Z_0-9+=,.@\\-_/]+"
943
            )
944
        if not self.lambda_service.can_assume_role(request.get("Role"), context.region):
1✔
945
            raise InvalidParameterValueException(
×
946
                "The role defined for the function cannot be assumed by Lambda.", Type="User"
947
            )
948
        package_type = request.get("PackageType", PackageType.Zip)
1✔
949
        runtime = request.get("Runtime")
1✔
950
        self._validate_runtime(package_type, runtime)
1✔
951

952
        request_function_name = request.get("FunctionName")
1✔
953

954
        function_name, *_ = api_utils.get_name_and_qualifier(
1✔
955
            function_arn_or_name=request_function_name,
956
            qualifier=None,
957
            context=context,
958
        )
959

960
        if runtime in DEPRECATED_RUNTIMES:
1✔
961
            LOG.warning(
1✔
962
                "The Lambda runtime %s} is deprecated. "
963
                "Please upgrade the runtime for the function %s: "
964
                "https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtimes.html",
965
                runtime,
966
                function_name,
967
            )
968
        if snap_start := request.get("SnapStart"):
1✔
969
            self._validate_snapstart(snap_start, runtime)
1✔
970
        state = lambda_stores[context_account_id][context_region]
1✔
971

972
        with self.create_fn_lock:
1✔
973
            if function_name in state.functions:
1✔
974
                raise ResourceConflictException(f"Function already exist: {function_name}")
×
975
            fn = Function(function_name=function_name)
1✔
976
            arn = VersionIdentifier(
1✔
977
                function_name=function_name,
978
                qualifier="$LATEST",
979
                region=context_region,
980
                account=context_account_id,
981
            )
982
            # save function code to s3
983
            code = None
1✔
984
            image = None
1✔
985
            image_config = None
1✔
986
            runtime_version_config = RuntimeVersionConfig(
1✔
987
                # Limitation: the runtime id (presumably sha256 of image) is currently hardcoded
988
                # Potential implementation: provide (cached) sha256 hash of used Docker image
989
                RuntimeVersionArn=f"arn:{context.partition}:lambda:{context_region}::runtime:8eeff65f6809a3ce81507fe733fe09b835899b99481ba22fd75b5a7338290ec1"
990
            )
991
            request_code = request.get("Code")
1✔
992
            if package_type == PackageType.Zip:
1✔
993
                # TODO verify if correct combination of code is set
994
                if zip_file := request_code.get("ZipFile"):
1✔
995
                    code = store_lambda_archive(
1✔
996
                        archive_file=zip_file,
997
                        function_name=function_name,
998
                        region_name=context_region,
999
                        account_id=context_account_id,
1000
                    )
1001
                elif s3_bucket := request_code.get("S3Bucket"):
1✔
1002
                    s3_key = request_code["S3Key"]
1✔
1003
                    s3_object_version = request_code.get("S3ObjectVersion")
1✔
1004
                    code = store_s3_bucket_archive(
1✔
1005
                        archive_bucket=s3_bucket,
1006
                        archive_key=s3_key,
1007
                        archive_version=s3_object_version,
1008
                        function_name=function_name,
1009
                        region_name=context_region,
1010
                        account_id=context_account_id,
1011
                    )
1012
                else:
1013
                    raise LambdaServiceException("A ZIP file or S3 bucket is required")
×
1014
            elif package_type == PackageType.Image:
1✔
1015
                image = request_code.get("ImageUri")
1✔
1016
                if not image:
1✔
1017
                    raise LambdaServiceException(
×
1018
                        "An image is required when the package type is set to 'image'"
1019
                    )
1020
                image = create_image_code(image_uri=image)
1✔
1021

1022
                image_config_req = request.get("ImageConfig", {})
1✔
1023
                image_config = ImageConfig(
1✔
1024
                    command=image_config_req.get("Command"),
1025
                    entrypoint=image_config_req.get("EntryPoint"),
1026
                    working_directory=image_config_req.get("WorkingDirectory"),
1027
                )
1028
                # Runtime management controls are not available when providing a custom image
1029
                runtime_version_config = None
1✔
1030

1031
            # TODO: validations and figure out in which order
1032
            capacity_provider_config = None
1✔
1033
            memory_size = request.get("MemorySize", LAMBDA_DEFAULT_MEMORY_SIZE)
1✔
1034
            if "CapacityProviderConfig" in request:
1✔
NEW
1035
                capacity_provider_config = request["CapacityProviderConfig"]
×
NEW
1036
                default_config = CapacityProviderConfig(
×
1037
                    LambdaManagedInstancesCapacityProviderConfig=LambdaManagedInstancesCapacityProviderConfig(
1038
                        ExecutionEnvironmentMemoryGiBPerVCpu=2.0,
1039
                        PerExecutionEnvironmentMaxConcurrency=16,
1040
                    )
1041
                )
NEW
1042
                capacity_provider_config = merge_recursive(default_config, capacity_provider_config)
×
NEW
1043
                memory_size = 2048
×
NEW
1044
                if request.get("LoggingConfig", {}).get("LogFormat") == LogFormat.Text:
×
NEW
1045
                    raise InvalidParameterValueException(
×
1046
                        'LogLevel is not supported when LogFormat is set to "Text". Remove LogLevel from your request or change the LogFormat to "JSON" and try again.',
1047
                        Type="User",
1048
                    )
1049
            if "LoggingConfig" in request:
1✔
1050
                logging_config = request["LoggingConfig"]
1✔
1051
                LOG.warning(
1✔
1052
                    "Advanced Lambda Logging Configuration is currently mocked "
1053
                    "and will not impact the logging behavior. "
1054
                    "Please create a feature request if needed."
1055
                )
1056

1057
                # when switching to JSON, app and system level log is auto set to INFO
1058
                if logging_config.get("LogFormat", None) == LogFormat.JSON:
1✔
1059
                    logging_config = {
1✔
1060
                        "ApplicationLogLevel": "INFO",
1061
                        "SystemLogLevel": "INFO",
1062
                        "LogGroup": f"/aws/lambda/{function_name}",
1063
                    } | logging_config
1064
                else:
1065
                    logging_config = (
×
1066
                        LoggingConfig(
1067
                            LogFormat=LogFormat.Text, LogGroup=f"/aws/lambda/{function_name}"
1068
                        )
1069
                        | logging_config
1070
                    )
1071

1072
            elif capacity_provider_config:
1✔
NEW
1073
                logging_config = LoggingConfig(
×
1074
                    LogFormat=LogFormat.JSON,
1075
                    LogGroup=f"/aws/lambda/{function_name}",
1076
                    ApplicationLogLevel="INFO",
1077
                    SystemLogLevel="INFO",
1078
                )
1079
            else:
1080
                logging_config = LoggingConfig(
1✔
1081
                    LogFormat=LogFormat.Text, LogGroup=f"/aws/lambda/{function_name}"
1082
                )
1083
            snap_start = (
1✔
1084
                None
1085
                if capacity_provider_config
1086
                else SnapStartResponse(
1087
                    ApplyOn=request.get("SnapStart", {}).get("ApplyOn", SnapStartApplyOn.None_),
1088
                    OptimizationStatus=SnapStartOptimizationStatus.Off,
1089
                )
1090
            )
1091
            version = FunctionVersion(
1✔
1092
                id=arn,
1093
                config=VersionFunctionConfiguration(
1094
                    last_modified=api_utils.format_lambda_date(datetime.datetime.now()),
1095
                    description=request.get("Description", ""),
1096
                    role=request["Role"],
1097
                    timeout=request.get("Timeout", LAMBDA_DEFAULT_TIMEOUT),
1098
                    runtime=request.get("Runtime"),
1099
                    memory_size=memory_size,
1100
                    handler=request.get("Handler"),
1101
                    package_type=package_type,
1102
                    environment=env_vars,
1103
                    architectures=request.get("Architectures") or [Architecture.x86_64],
1104
                    tracing_config_mode=request.get("TracingConfig", {}).get(
1105
                        "Mode", TracingMode.PassThrough
1106
                    ),
1107
                    image=image,
1108
                    image_config=image_config,
1109
                    code=code,
1110
                    layers=self.map_layers(layers),
1111
                    internal_revision=short_uid(),
1112
                    ephemeral_storage=LambdaEphemeralStorage(
1113
                        size=request.get("EphemeralStorage", {}).get("Size", 512)
1114
                    ),
1115
                    snap_start=snap_start,
1116
                    runtime_version_config=runtime_version_config,
1117
                    dead_letter_arn=request.get("DeadLetterConfig", {}).get("TargetArn"),
1118
                    vpc_config=self._build_vpc_config(
1119
                        context_account_id, context_region, request.get("VpcConfig")
1120
                    ),
1121
                    state=VersionState(
1122
                        state=State.Pending,
1123
                        code=StateReasonCode.Creating,
1124
                        reason="The function is being created.",
1125
                    ),
1126
                    logging_config=logging_config,
1127
                    # TODO: might need something like **optional_kwargs if None
1128
                    #   -> Test with regular GetFunction (i.e., without a capacity provider)
1129
                    CapacityProviderConfig=capacity_provider_config,
1130
                ),
1131
            )
1132
            version_post_response = None
1✔
1133
            if capacity_provider_config:
1✔
NEW
1134
                version_post_response = dataclasses.replace(
×
1135
                    version,
1136
                    config=dataclasses.replace(
1137
                        version.config,
1138
                        last_update=UpdateStatus(status=LastUpdateStatus.Successful),
1139
                        state=VersionState(state=State.ActiveNonInvocable),
1140
                    ),
1141
                )
1142
            fn.versions["$LATEST"] = version_post_response or version
1✔
1143
            state.functions[function_name] = fn
1✔
1144
        function_counter.labels(
1✔
1145
            operation=FunctionOperation.create,
1146
            runtime=runtime or "n/a",
1147
            status=FunctionStatus.success,
1148
            invocation_type="n/a",
1149
            package_type=package_type,
1150
        )
1151
        # TODO: consider potential other side effects of not having a function version for $LATEST
1152
        # Provisioning happens upon publishing for functions using a capacity provider
1153
        if not capacity_provider_config:
1✔
1154
            self.lambda_service.create_function_version(version)
1✔
1155

1156
        if tags := request.get("Tags"):
1✔
1157
            # This will check whether the function exists.
1158
            self._store_tags(arn.unqualified_arn(), tags)
1✔
1159

1160
        if request.get("Publish"):
1✔
1161
            version = self._publish_version_with_changes(
1✔
1162
                function_name=function_name,
1163
                region=context_region,
1164
                account_id=context_account_id,
1165
                publish_to=request.get("PublishTo"),
1166
            )
1167

1168
        if config.LAMBDA_SYNCHRONOUS_CREATE:
1✔
1169
            # block via retrying until "terminal" condition reached before returning
1170
            if not poll_condition(
×
1171
                lambda: get_function_version(
1172
                    function_name, version.id.qualifier, version.id.account, version.id.region
1173
                ).config.state.state
1174
                in [State.Active, State.ActiveNonInvocable, State.Failed],
1175
                timeout=10,
1176
            ):
1177
                LOG.warning(
×
1178
                    "LAMBDA_SYNCHRONOUS_CREATE is active, but waiting for %s reached timeout.",
1179
                    function_name,
1180
                )
1181

1182
        return api_utils.map_config_out(
1✔
1183
            version, return_qualified_arn=False, return_update_status=False
1184
        )
1185

1186
    def _validate_runtime(self, package_type, runtime):
1✔
1187
        runtimes = ALL_RUNTIMES
1✔
1188
        if config.LAMBDA_RUNTIME_VALIDATION:
1✔
1189
            runtimes = list(itertools.chain(RUNTIMES_AGGREGATED.values()))
1✔
1190

1191
        if package_type == PackageType.Zip and runtime not in runtimes:
1✔
1192
            # deprecated runtimes have different error
1193
            if runtime in DEPRECATED_RUNTIMES:
1✔
1194
                HINT_LOG.info(
1✔
1195
                    "Set env variable LAMBDA_RUNTIME_VALIDATION to 0"
1196
                    " in order to allow usage of deprecated runtimes"
1197
                )
1198
                self._check_for_recomended_migration_target(runtime)
1✔
1199

1200
            raise InvalidParameterValueException(
1✔
1201
                f"Value {runtime} at 'runtime' failed to satisfy constraint: Member must satisfy enum value set: {VALID_RUNTIMES} or be a valid ARN",
1202
                Type="User",
1203
            )
1204

1205
    def _check_for_recomended_migration_target(self, deprecated_runtime):
1✔
1206
        # AWS offers recommended runtime for migration for "newly" deprecated runtimes
1207
        # in order to preserve parity with error messages we need the code bellow
1208
        latest_runtime = DEPRECATED_RUNTIMES_UPGRADES.get(deprecated_runtime)
1✔
1209

1210
        if latest_runtime is not None:
1✔
1211
            LOG.debug(
1✔
1212
                "The Lambda runtime %s is deprecated. Please upgrade to a supported Lambda runtime such as %s.",
1213
                deprecated_runtime,
1214
                latest_runtime,
1215
            )
1216
            raise InvalidParameterValueException(
1✔
1217
                f"The runtime parameter of {deprecated_runtime} is no longer supported for creating or updating AWS Lambda functions. We recommend you use a supported runtime while creating or updating functions.",
1218
                Type="User",
1219
            )
1220

1221
    @handler(operation="UpdateFunctionConfiguration", expand=False)
1✔
1222
    def update_function_configuration(
1✔
1223
        self, context: RequestContext, request: UpdateFunctionConfigurationRequest
1224
    ) -> FunctionConfiguration:
1225
        """updates the $LATEST version of the function"""
1226
        function_name = request.get("FunctionName")
1✔
1227

1228
        # in case we got ARN or partial ARN
1229
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1230
        function_name, qualifier = api_utils.get_name_and_qualifier(function_name, None, context)
1✔
1231
        state = lambda_stores[account_id][region]
1✔
1232

1233
        if function_name not in state.functions:
1✔
1234
            raise ResourceNotFoundException(
×
1235
                f"Function not found: {api_utils.unqualified_lambda_arn(function_name=function_name, region=region, account=account_id)}",
1236
                Type="User",
1237
            )
1238
        function = state.functions[function_name]
1✔
1239

1240
        # TODO: lock modification of latest version
1241
        # TODO: notify service for changes relevant to re-provisioning of $LATEST
1242
        latest_version = function.latest()
1✔
1243
        latest_version_config = latest_version.config
1✔
1244

1245
        revision_id = request.get("RevisionId")
1✔
1246
        if revision_id and revision_id != latest_version.config.revision_id:
1✔
1247
            raise PreconditionFailedException(
1✔
1248
                "The Revision Id provided does not match the latest Revision Id. "
1249
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
1250
                Type="User",
1251
            )
1252

1253
        replace_kwargs = {}
1✔
1254
        if "EphemeralStorage" in request:
1✔
1255
            replace_kwargs["ephemeral_storage"] = LambdaEphemeralStorage(
×
1256
                request.get("EphemeralStorage", {}).get("Size", 512)
1257
            )  # TODO: do defaults here apply as well?
1258

1259
        if "Role" in request:
1✔
1260
            if not api_utils.is_role_arn(request["Role"]):
1✔
1261
                raise ValidationException(
1✔
1262
                    f"1 validation error detected: Value '{request.get('Role')}'"
1263
                    + " at 'role' failed to satisfy constraint: Member must satisfy regular expression pattern: arn:(aws[a-zA-Z-]*)?:iam::\\d{12}:role/?[a-zA-Z_0-9+=,.@\\-_/]+"
1264
                )
1265
            replace_kwargs["role"] = request["Role"]
1✔
1266

1267
        if "Description" in request:
1✔
1268
            replace_kwargs["description"] = request["Description"]
1✔
1269

1270
        if "Timeout" in request:
1✔
1271
            replace_kwargs["timeout"] = request["Timeout"]
1✔
1272

1273
        if "MemorySize" in request:
1✔
1274
            replace_kwargs["memory_size"] = request["MemorySize"]
1✔
1275

1276
        if "DeadLetterConfig" in request:
1✔
1277
            replace_kwargs["dead_letter_arn"] = request.get("DeadLetterConfig", {}).get("TargetArn")
1✔
1278

1279
        if vpc_config := request.get("VpcConfig"):
1✔
1280
            replace_kwargs["vpc_config"] = self._build_vpc_config(account_id, region, vpc_config)
1✔
1281

1282
        if "Handler" in request:
1✔
1283
            replace_kwargs["handler"] = request["Handler"]
1✔
1284

1285
        if "Runtime" in request:
1✔
1286
            runtime = request["Runtime"]
1✔
1287

1288
            if runtime not in ALL_RUNTIMES:
1✔
1289
                raise InvalidParameterValueException(
1✔
1290
                    f"Value {runtime} at 'runtime' failed to satisfy constraint: Member must satisfy enum value set: {VALID_RUNTIMES} or be a valid ARN",
1291
                    Type="User",
1292
                )
1293
            if runtime in DEPRECATED_RUNTIMES:
1✔
1294
                LOG.warning(
×
1295
                    "The Lambda runtime %s is deprecated. "
1296
                    "Please upgrade the runtime for the function %s: "
1297
                    "https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtimes.html",
1298
                    runtime,
1299
                    function_name,
1300
                )
1301
            replace_kwargs["runtime"] = request["Runtime"]
1✔
1302

1303
        if snap_start := request.get("SnapStart"):
1✔
1304
            runtime = replace_kwargs.get("runtime") or latest_version_config.runtime
1✔
1305
            self._validate_snapstart(snap_start, runtime)
1✔
1306
            replace_kwargs["snap_start"] = SnapStartResponse(
1✔
1307
                ApplyOn=snap_start.get("ApplyOn", SnapStartApplyOn.None_),
1308
                OptimizationStatus=SnapStartOptimizationStatus.Off,
1309
            )
1310

1311
        if "Environment" in request:
1✔
1312
            if env_vars := request.get("Environment", {}).get("Variables", {}):
1✔
1313
                self._verify_env_variables(env_vars)
1✔
1314
            replace_kwargs["environment"] = env_vars
1✔
1315

1316
        if "Layers" in request:
1✔
1317
            new_layers = request["Layers"]
1✔
1318
            if new_layers:
1✔
1319
                self._validate_layers(new_layers, region=region, account_id=account_id)
1✔
1320
            replace_kwargs["layers"] = self.map_layers(new_layers)
1✔
1321

1322
        if "ImageConfig" in request:
1✔
1323
            new_image_config = request["ImageConfig"]
1✔
1324
            replace_kwargs["image_config"] = ImageConfig(
1✔
1325
                command=new_image_config.get("Command"),
1326
                entrypoint=new_image_config.get("EntryPoint"),
1327
                working_directory=new_image_config.get("WorkingDirectory"),
1328
            )
1329

1330
        if "LoggingConfig" in request:
1✔
1331
            logging_config = request["LoggingConfig"]
1✔
1332
            LOG.warning(
1✔
1333
                "Advanced Lambda Logging Configuration is currently mocked "
1334
                "and will not impact the logging behavior. "
1335
                "Please create a feature request if needed."
1336
            )
1337

1338
            # when switching to JSON, app and system level log is auto set to INFO
1339
            if logging_config.get("LogFormat", None) == LogFormat.JSON:
1✔
1340
                logging_config = {
1✔
1341
                    "ApplicationLogLevel": "INFO",
1342
                    "SystemLogLevel": "INFO",
1343
                } | logging_config
1344

1345
            last_config = latest_version_config.logging_config
1✔
1346

1347
            # add partial update
1348
            new_logging_config = last_config | logging_config
1✔
1349

1350
            # in case we switched from JSON to Text we need to remove LogLevel keys
1351
            if (
1✔
1352
                new_logging_config.get("LogFormat") == LogFormat.Text
1353
                and last_config.get("LogFormat") == LogFormat.JSON
1354
            ):
1355
                new_logging_config.pop("ApplicationLogLevel", None)
1✔
1356
                new_logging_config.pop("SystemLogLevel", None)
1✔
1357

1358
            replace_kwargs["logging_config"] = new_logging_config
1✔
1359

1360
        if "TracingConfig" in request:
1✔
1361
            new_mode = request.get("TracingConfig", {}).get("Mode")
×
1362
            if new_mode:
×
1363
                replace_kwargs["tracing_config_mode"] = new_mode
×
1364

1365
        new_latest_version = dataclasses.replace(
1✔
1366
            latest_version,
1367
            config=dataclasses.replace(
1368
                latest_version_config,
1369
                last_modified=api_utils.generate_lambda_date(),
1370
                internal_revision=short_uid(),
1371
                last_update=UpdateStatus(
1372
                    status=LastUpdateStatus.InProgress,
1373
                    code="Creating",
1374
                    reason="The function is being created.",
1375
                ),
1376
                **replace_kwargs,
1377
            ),
1378
        )
1379
        function.versions["$LATEST"] = new_latest_version  # TODO: notify
1✔
1380
        self.lambda_service.update_version(new_version=new_latest_version)
1✔
1381

1382
        return api_utils.map_config_out(new_latest_version)
1✔
1383

1384
    @handler(operation="UpdateFunctionCode", expand=False)
1✔
1385
    def update_function_code(
1✔
1386
        self, context: RequestContext, request: UpdateFunctionCodeRequest
1387
    ) -> FunctionConfiguration:
1388
        """updates the $LATEST version of the function"""
1389
        # only supports normal zip packaging atm
1390
        # if request.get("Publish"):
1391
        #     self.lambda_service.create_function_version()
1392

1393
        function_name = request.get("FunctionName")
1✔
1394
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1395
        function_name, qualifier = api_utils.get_name_and_qualifier(function_name, None, context)
1✔
1396

1397
        store = lambda_stores[account_id][region]
1✔
1398
        if function_name not in store.functions:
1✔
1399
            raise ResourceNotFoundException(
×
1400
                f"Function not found: {api_utils.unqualified_lambda_arn(function_name=function_name, region=region, account=account_id)}",
1401
                Type="User",
1402
            )
1403
        function = store.functions[function_name]
1✔
1404

1405
        revision_id = request.get("RevisionId")
1✔
1406
        if revision_id and revision_id != function.latest().config.revision_id:
1✔
1407
            raise PreconditionFailedException(
1✔
1408
                "The Revision Id provided does not match the latest Revision Id. "
1409
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
1410
                Type="User",
1411
            )
1412

1413
        # TODO verify if correct combination of code is set
1414
        image = None
1✔
1415
        if (
1✔
1416
            request.get("ZipFile") or request.get("S3Bucket")
1417
        ) and function.latest().config.package_type == PackageType.Image:
1418
            raise InvalidParameterValueException(
1✔
1419
                "Please provide ImageUri when updating a function with packageType Image.",
1420
                Type="User",
1421
            )
1422
        elif request.get("ImageUri") and function.latest().config.package_type == PackageType.Zip:
1✔
1423
            raise InvalidParameterValueException(
1✔
1424
                "Please don't provide ImageUri when updating a function with packageType Zip.",
1425
                Type="User",
1426
            )
1427

1428
        if zip_file := request.get("ZipFile"):
1✔
1429
            code = store_lambda_archive(
1✔
1430
                archive_file=zip_file,
1431
                function_name=function_name,
1432
                region_name=region,
1433
                account_id=account_id,
1434
            )
1435
        elif s3_bucket := request.get("S3Bucket"):
1✔
1436
            s3_key = request["S3Key"]
1✔
1437
            s3_object_version = request.get("S3ObjectVersion")
1✔
1438
            code = store_s3_bucket_archive(
1✔
1439
                archive_bucket=s3_bucket,
1440
                archive_key=s3_key,
1441
                archive_version=s3_object_version,
1442
                function_name=function_name,
1443
                region_name=region,
1444
                account_id=account_id,
1445
            )
1446
        elif image := request.get("ImageUri"):
1✔
1447
            code = None
1✔
1448
            image = create_image_code(image_uri=image)
1✔
1449
        else:
1450
            raise LambdaServiceException("A ZIP file, S3 bucket, or image is required")
×
1451

1452
        old_function_version = function.versions.get("$LATEST")
1✔
1453
        replace_kwargs = {"code": code} if code else {"image": image}
1✔
1454

1455
        if architectures := request.get("Architectures"):
1✔
1456
            if len(architectures) != 1:
×
1457
                raise ValidationException(
×
1458
                    f"1 validation error detected: Value '[{', '.join(architectures)}]' at 'architectures' failed to "
1459
                    f"satisfy constraint: Member must have length less than or equal to 1",
1460
                )
1461
            # An empty list of architectures is also forbidden. Further exceptions are tested here for create_function:
1462
            # tests.aws.services.lambda_.test_lambda_api.TestLambdaFunction.test_create_lambda_exceptions
1463
            if architectures[0] not in ARCHITECTURES:
×
1464
                raise ValidationException(
×
1465
                    f"1 validation error detected: Value '[{', '.join(architectures)}]' at 'architectures' failed to "
1466
                    f"satisfy constraint: Member must satisfy constraint: [Member must satisfy enum value set: "
1467
                    f"[x86_64, arm64], Member must not be null]",
1468
                )
1469
            replace_kwargs["architectures"] = architectures
×
1470

1471
        config = dataclasses.replace(
1✔
1472
            old_function_version.config,
1473
            internal_revision=short_uid(),
1474
            last_modified=api_utils.generate_lambda_date(),
1475
            last_update=UpdateStatus(
1476
                status=LastUpdateStatus.InProgress,
1477
                code="Creating",
1478
                reason="The function is being created.",
1479
            ),
1480
            **replace_kwargs,
1481
        )
1482
        function_version = dataclasses.replace(old_function_version, config=config)
1✔
1483
        function.versions["$LATEST"] = function_version
1✔
1484

1485
        self.lambda_service.update_version(new_version=function_version)
1✔
1486
        if request.get("Publish"):
1✔
1487
            function_version = self._publish_version_with_changes(
1✔
1488
                function_name=function_name,
1489
                region=region,
1490
                account_id=account_id,
1491
                # TODO: validations for PublishTo without Publish=True
1492
                publish_to=request.get("PublishTo"),
1493
                is_active=True,
1494
            )
1495
        return api_utils.map_config_out(
1✔
1496
            function_version, return_qualified_arn=bool(request.get("Publish"))
1497
        )
1498

1499
    # TODO: does deleting the latest published version affect the next versions number?
1500
    # TODO: what happens when we call this with a qualifier and a fully qualified ARN? (+ conflicts?)
1501
    # TODO: test different ARN patterns (shorthand ARN?)
1502
    # TODO: test deleting across regions?
1503
    # TODO: test mismatch between context region and region in ARN
1504
    # TODO: test qualifier $LATEST, alias-name and version
1505
    def delete_function(
1✔
1506
        self,
1507
        context: RequestContext,
1508
        function_name: NamespacedFunctionName,
1509
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
1510
        **kwargs,
1511
    ) -> DeleteFunctionResponse:
1512
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1513
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
1514
            function_name, qualifier, context
1515
        )
1516

1517
        if qualifier and api_utils.qualifier_is_alias(qualifier):
1✔
1518
            raise InvalidParameterValueException(
×
1519
                "Deletion of aliases is not currently supported.",
1520
                Type="User",
1521
            )
1522

1523
        store = lambda_stores[account_id][region]
1✔
1524
        if qualifier == "$LATEST":
1✔
1525
            raise InvalidParameterValueException(
1✔
1526
                "$LATEST version cannot be deleted without deleting the function.", Type="User"
1527
            )
1528

1529
        if function_name not in store.functions:
1✔
1530
            e = ResourceNotFoundException(
1✔
1531
                f"Function not found: {api_utils.unqualified_lambda_arn(function_name=function_name, region=region, account=account_id)}",
1532
                Type="User",
1533
            )
1534
            raise e
1✔
1535
        function = store.functions.get(function_name)
1✔
1536

1537
        function_has_capacity_provider = False
1✔
1538
        if qualifier:
1✔
1539
            # delete a version of the function
1540
            version = function.versions.pop(qualifier, None)
1✔
1541
            if version:
1✔
1542
                if version.config.CapacityProviderConfig:
1✔
NEW
1543
                    function_has_capacity_provider = True
×
1544
                self.lambda_service.stop_version(version.id.qualified_arn())
1✔
1545
                destroy_code_if_not_used(code=version.config.code, function=function)
1✔
1546
        else:
1547
            # delete the whole function
1548
            # TODO: introduce locking for safe deletion: We could create a new version at the API layer before
1549
            #  the old version gets cleaned up in the internal lambda service.
1550
            function = store.functions.pop(function_name)
1✔
1551
            for version in function.versions.values():
1✔
1552
                # Functions with a capacity provider do NOT have a version manager for $LATEST because only
1553
                # published versions are invokable.
1554
                if version.config.CapacityProviderConfig:
1✔
NEW
1555
                    function_has_capacity_provider = True
×
NEW
1556
                    if version.id.qualifier == "$LATEST":
×
NEW
1557
                        pass
×
1558
                else:
1559
                    self.lambda_service.stop_version(qualified_arn=version.id.qualified_arn())
1✔
1560
                # we can safely destroy the code here
1561
                if version.config.code:
1✔
1562
                    version.config.code.destroy()
1✔
1563

1564
        return DeleteFunctionResponse(StatusCode=202 if function_has_capacity_provider else 204)
1✔
1565

1566
    def list_functions(
1✔
1567
        self,
1568
        context: RequestContext,
1569
        master_region: MasterRegion = None,  # (only relevant for lambda@edge)
1570
        function_version: FunctionVersionApi = None,
1571
        marker: String = None,
1572
        max_items: MaxListItems = None,
1573
        **kwargs,
1574
    ) -> ListFunctionsResponse:
1575
        state = lambda_stores[context.account_id][context.region]
1✔
1576

1577
        if function_version and function_version != FunctionVersionApi.ALL:
1✔
1578
            raise ValidationException(
1✔
1579
                f"1 validation error detected: Value '{function_version}'"
1580
                + " at 'functionVersion' failed to satisfy constraint: Member must satisfy enum value set: [ALL]"
1581
            )
1582

1583
        if function_version == FunctionVersionApi.ALL:
1✔
1584
            # include all versions for all function
1585
            versions = [v for f in state.functions.values() for v in f.versions.values()]
1✔
1586
            return_qualified_arn = True
1✔
1587
        else:
1588
            versions = [f.latest() for f in state.functions.values()]
1✔
1589
            return_qualified_arn = False
1✔
1590

1591
        versions = [
1✔
1592
            api_utils.map_to_list_response(
1593
                api_utils.map_config_out(fc, return_qualified_arn=return_qualified_arn)
1594
            )
1595
            for fc in versions
1596
        ]
1597
        versions = PaginatedList(versions)
1✔
1598
        page, token = versions.get_page(
1✔
1599
            lambda version: version["FunctionArn"],
1600
            marker,
1601
            max_items,
1602
        )
1603
        return ListFunctionsResponse(Functions=page, NextMarker=token)
1✔
1604

1605
    def get_function(
1✔
1606
        self,
1607
        context: RequestContext,
1608
        function_name: NamespacedFunctionName,
1609
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
1610
        **kwargs,
1611
    ) -> GetFunctionResponse:
1612
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1613
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
1614
            function_name, qualifier, context
1615
        )
1616

1617
        fn = lambda_stores[account_id][region].functions.get(function_name)
1✔
1618
        if fn is None:
1✔
1619
            if qualifier is None:
1✔
1620
                raise ResourceNotFoundException(
1✔
1621
                    f"Function not found: {api_utils.unqualified_lambda_arn(function_name, account_id, region)}",
1622
                    Type="User",
1623
                )
1624
            else:
1625
                raise ResourceNotFoundException(
1✔
1626
                    f"Function not found: {api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)}",
1627
                    Type="User",
1628
                )
1629
        alias_name = None
1✔
1630
        if qualifier and api_utils.qualifier_is_alias(qualifier):
1✔
1631
            if qualifier not in fn.aliases:
1✔
1632
                alias_arn = api_utils.qualified_lambda_arn(
1✔
1633
                    function_name, qualifier, account_id, region
1634
                )
1635
                raise ResourceNotFoundException(f"Function not found: {alias_arn}", Type="User")
1✔
1636
            alias_name = qualifier
1✔
1637
            qualifier = fn.aliases[alias_name].function_version
1✔
1638

1639
        version = get_function_version(
1✔
1640
            function_name=function_name,
1641
            qualifier=qualifier,
1642
            account_id=account_id,
1643
            region=region,
1644
        )
1645
        tags = self._get_tags(api_utils.unqualified_lambda_arn(function_name, account_id, region))
1✔
1646
        additional_fields = {}
1✔
1647
        if tags:
1✔
1648
            additional_fields["Tags"] = tags
1✔
1649
        code_location = None
1✔
1650
        if code := version.config.code:
1✔
1651
            code_location = FunctionCodeLocation(
1✔
1652
                Location=code.generate_presigned_url(endpoint_url=config.external_service_url()),
1653
                RepositoryType="S3",
1654
            )
1655
        elif image := version.config.image:
1✔
1656
            code_location = FunctionCodeLocation(
1✔
1657
                ImageUri=image.image_uri,
1658
                RepositoryType=image.repository_type,
1659
                ResolvedImageUri=image.resolved_image_uri,
1660
            )
1661
        concurrency = None
1✔
1662
        if fn.reserved_concurrent_executions:
1✔
1663
            concurrency = Concurrency(
1✔
1664
                ReservedConcurrentExecutions=fn.reserved_concurrent_executions
1665
            )
1666

1667
        return GetFunctionResponse(
1✔
1668
            Configuration=api_utils.map_config_out(
1669
                version, return_qualified_arn=bool(qualifier), alias_name=alias_name
1670
            ),
1671
            Code=code_location,  # TODO
1672
            Concurrency=concurrency,
1673
            **additional_fields,
1674
        )
1675

1676
    def get_function_configuration(
1✔
1677
        self,
1678
        context: RequestContext,
1679
        function_name: NamespacedFunctionName,
1680
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
1681
        **kwargs,
1682
    ) -> FunctionConfiguration:
1683
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1684
        # CAVE: THIS RETURN VALUE IS *NOT* THE SAME AS IN get_function (!) but seems to be only configuration part?
1685
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
1686
            function_name, qualifier, context
1687
        )
1688
        version = get_function_version(
1✔
1689
            function_name=function_name,
1690
            qualifier=qualifier,
1691
            account_id=account_id,
1692
            region=region,
1693
        )
1694
        return api_utils.map_config_out(version, return_qualified_arn=bool(qualifier))
1✔
1695

1696
    def invoke(
1✔
1697
        self,
1698
        context: RequestContext,
1699
        function_name: NamespacedFunctionName,
1700
        invocation_type: InvocationType | None = None,
1701
        log_type: LogType | None = None,
1702
        client_context: String | None = None,
1703
        payload: IO[Blob] | None = None,
1704
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
1705
        **kwargs,
1706
    ) -> InvocationResponse:
1707
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1708
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
1709
            function_name, qualifier, context
1710
        )
1711

1712
        user_agent = context.request.user_agent.string
1✔
1713

1714
        time_before = time.perf_counter()
1✔
1715
        try:
1✔
1716
            invocation_result = self.lambda_service.invoke(
1✔
1717
                function_name=function_name,
1718
                qualifier=qualifier,
1719
                region=region,
1720
                account_id=account_id,
1721
                invocation_type=invocation_type,
1722
                client_context=client_context,
1723
                request_id=context.request_id,
1724
                trace_context=context.trace_context,
1725
                payload=payload.read() if payload else None,
1726
                user_agent=user_agent,
1727
            )
1728
        except ServiceException:
1✔
1729
            raise
1✔
1730
        except EnvironmentStartupTimeoutException as e:
1✔
1731
            raise LambdaServiceException(
1✔
1732
                f"[{context.request_id}] Timeout while starting up lambda environment for function {function_name}:{qualifier}"
1733
            ) from e
1734
        except Exception as e:
1✔
1735
            LOG.error(
1✔
1736
                "[%s] Error while invoking lambda %s",
1737
                context.request_id,
1738
                function_name,
1739
                exc_info=LOG.isEnabledFor(logging.DEBUG),
1740
            )
1741
            raise LambdaServiceException(
1✔
1742
                f"[{context.request_id}] Internal error while executing lambda {function_name}:{qualifier}. Caused by {type(e).__name__}: {e}"
1743
            ) from e
1744

1745
        if invocation_type == InvocationType.Event:
1✔
1746
            # This happens when invocation type is event
1747
            return InvocationResponse(StatusCode=202)
1✔
1748
        if invocation_type == InvocationType.DryRun:
1✔
1749
            # This happens when invocation type is dryrun
1750
            return InvocationResponse(StatusCode=204)
1✔
1751
        LOG.debug("Lambda invocation duration: %0.2fms", (time.perf_counter() - time_before) * 1000)
1✔
1752

1753
        response = InvocationResponse(
1✔
1754
            StatusCode=200,
1755
            Payload=invocation_result.payload,
1756
            ExecutedVersion=invocation_result.executed_version,
1757
        )
1758

1759
        if invocation_result.is_error:
1✔
1760
            response["FunctionError"] = "Unhandled"
1✔
1761

1762
        if log_type == LogType.Tail:
1✔
1763
            response["LogResult"] = to_str(
1✔
1764
                base64.b64encode(to_bytes(invocation_result.logs)[-4096:])
1765
            )
1766

1767
        return response
1✔
1768

1769
    # Version operations
1770
    def publish_version(
1✔
1771
        self,
1772
        context: RequestContext,
1773
        function_name: FunctionName,
1774
        code_sha256: String | None = None,
1775
        description: Description | None = None,
1776
        revision_id: String | None = None,
1777
        publish_to: FunctionVersionLatestPublished | None = None,
1778
        **kwargs,
1779
    ) -> FunctionConfiguration:
1780
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1781
        function_name = api_utils.get_function_name(function_name, context)
1✔
1782
        new_version = self._publish_version_from_existing_version(
1✔
1783
            function_name=function_name,
1784
            description=description,
1785
            account_id=account_id,
1786
            region=region,
1787
            revision_id=revision_id,
1788
            code_sha256=code_sha256,
1789
            publish_to=publish_to,
1790
        )
1791
        return api_utils.map_config_out(new_version, return_qualified_arn=True)
1✔
1792

1793
    def list_versions_by_function(
1✔
1794
        self,
1795
        context: RequestContext,
1796
        function_name: NamespacedFunctionName,
1797
        marker: String = None,
1798
        max_items: MaxListItems = None,
1799
        **kwargs,
1800
    ) -> ListVersionsByFunctionResponse:
1801
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1802
        function_name = api_utils.get_function_name(function_name, context)
1✔
1803
        function = self._get_function(
1✔
1804
            function_name=function_name, region=region, account_id=account_id
1805
        )
1806
        versions = [
1✔
1807
            api_utils.map_to_list_response(
1808
                api_utils.map_config_out(version=version, return_qualified_arn=True)
1809
            )
1810
            for version in function.versions.values()
1811
        ]
1812
        items = PaginatedList(versions)
1✔
1813
        page, token = items.get_page(
1✔
1814
            lambda item: item,
1815
            marker,
1816
            max_items,
1817
        )
1818
        return ListVersionsByFunctionResponse(Versions=page, NextMarker=token)
1✔
1819

1820
    # Alias
1821

1822
    def _create_routing_config_model(
1✔
1823
        self, routing_config_dict: dict[str, float], function_version: FunctionVersion
1824
    ):
1825
        if len(routing_config_dict) > 1:
1✔
1826
            raise InvalidParameterValueException(
1✔
1827
                "Number of items in AdditionalVersionWeights cannot be greater than 1",
1828
                Type="User",
1829
            )
1830
        # should be exactly one item here, still iterating, might be supported in the future
1831
        for key, value in routing_config_dict.items():
1✔
1832
            if value < 0.0 or value >= 1.0:
1✔
1833
                raise ValidationException(
1✔
1834
                    f"1 validation error detected: Value '{{{key}={value}}}' at 'routingConfig.additionalVersionWeights' failed to satisfy constraint: Map value must satisfy constraint: [Member must have value less than or equal to 1.0, Member must have value greater than or equal to 0.0, Member must not be null]"
1835
                )
1836
            if key == function_version.id.qualifier:
1✔
1837
                raise InvalidParameterValueException(
1✔
1838
                    f"Invalid function version {function_version.id.qualifier}. Function version {function_version.id.qualifier} is already included in routing configuration.",
1839
                    Type="User",
1840
                )
1841
            # check if version target is latest, then no routing config is allowed
1842
            if function_version.id.qualifier == "$LATEST":
1✔
1843
                raise InvalidParameterValueException(
1✔
1844
                    "$LATEST is not supported for an alias pointing to more than 1 version"
1845
                )
1846
            if not api_utils.qualifier_is_version(key):
1✔
1847
                raise ValidationException(
1✔
1848
                    f"1 validation error detected: Value '{{{key}={value}}}' at 'routingConfig.additionalVersionWeights' failed to satisfy constraint: Map keys must satisfy constraint: [Member must have length less than or equal to 1024, Member must have length greater than or equal to 1, Member must satisfy regular expression pattern: [0-9]+]"
1849
                )
1850

1851
            # checking if the version in the config exists
1852
            get_function_version(
1✔
1853
                function_name=function_version.id.function_name,
1854
                qualifier=key,
1855
                region=function_version.id.region,
1856
                account_id=function_version.id.account,
1857
            )
1858
        return AliasRoutingConfig(version_weights=routing_config_dict)
1✔
1859

1860
    def create_alias(
1✔
1861
        self,
1862
        context: RequestContext,
1863
        function_name: FunctionName,
1864
        name: Alias,
1865
        function_version: VersionWithLatestPublished,
1866
        description: Description = None,
1867
        routing_config: AliasRoutingConfiguration = None,
1868
        **kwargs,
1869
    ) -> AliasConfiguration:
1870
        if not api_utils.qualifier_is_alias(name):
1✔
1871
            raise ValidationException(
1✔
1872
                f"1 validation error detected: Value '{name}' at 'name' failed to satisfy constraint: Member must satisfy regular expression pattern: (?!^[0-9]+$)([a-zA-Z0-9-_]+)"
1873
            )
1874

1875
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1876
        function_name = api_utils.get_function_name(function_name, context)
1✔
1877
        target_version = get_function_version(
1✔
1878
            function_name=function_name,
1879
            qualifier=function_version,
1880
            region=region,
1881
            account_id=account_id,
1882
        )
1883
        function = self._get_function(
1✔
1884
            function_name=function_name, region=region, account_id=account_id
1885
        )
1886
        # description is always present, if not specified it's an empty string
1887
        description = description or ""
1✔
1888
        with function.lock:
1✔
1889
            if existing_alias := function.aliases.get(name):
1✔
1890
                raise ResourceConflictException(
1✔
1891
                    f"Alias already exists: {api_utils.map_alias_out(alias=existing_alias, function=function)['AliasArn']}",
1892
                    Type="User",
1893
                )
1894
            # checking if the version exists
1895
            routing_configuration = None
1✔
1896
            if routing_config and (
1✔
1897
                routing_config_dict := routing_config.get("AdditionalVersionWeights")
1898
            ):
1899
                routing_configuration = self._create_routing_config_model(
1✔
1900
                    routing_config_dict, target_version
1901
                )
1902

1903
            alias = VersionAlias(
1✔
1904
                name=name,
1905
                function_version=function_version,
1906
                description=description,
1907
                routing_configuration=routing_configuration,
1908
            )
1909
            function.aliases[name] = alias
1✔
1910
        return api_utils.map_alias_out(alias=alias, function=function)
1✔
1911

1912
    def list_aliases(
1✔
1913
        self,
1914
        context: RequestContext,
1915
        function_name: FunctionName,
1916
        function_version: VersionWithLatestPublished = None,
1917
        marker: String = None,
1918
        max_items: MaxListItems = None,
1919
        **kwargs,
1920
    ) -> ListAliasesResponse:
1921
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1922
        function_name = api_utils.get_function_name(function_name, context)
1✔
1923
        function = self._get_function(
1✔
1924
            function_name=function_name, region=region, account_id=account_id
1925
        )
1926
        aliases = [
1✔
1927
            api_utils.map_alias_out(alias, function)
1928
            for alias in function.aliases.values()
1929
            if function_version is None or alias.function_version == function_version
1930
        ]
1931

1932
        aliases = PaginatedList(aliases)
1✔
1933
        page, token = aliases.get_page(
1✔
1934
            lambda alias: alias["AliasArn"],
1935
            marker,
1936
            max_items,
1937
        )
1938

1939
        return ListAliasesResponse(Aliases=page, NextMarker=token)
1✔
1940

1941
    def delete_alias(
1✔
1942
        self, context: RequestContext, function_name: FunctionName, name: Alias, **kwargs
1943
    ) -> None:
1944
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1945
        function_name = api_utils.get_function_name(function_name, context)
1✔
1946
        function = self._get_function(
1✔
1947
            function_name=function_name, region=region, account_id=account_id
1948
        )
1949
        version_alias = function.aliases.pop(name, None)
1✔
1950

1951
        # cleanup related resources
1952
        if name in function.provisioned_concurrency_configs:
1✔
1953
            function.provisioned_concurrency_configs.pop(name)
1✔
1954

1955
        # TODO: Allow for deactivating/unregistering specific Lambda URLs
1956
        if version_alias and name in function.function_url_configs:
1✔
1957
            url_config = function.function_url_configs.pop(name)
1✔
1958
            LOG.debug(
1✔
1959
                "Stopping aliased Lambda Function URL %s for %s",
1960
                url_config.url,
1961
                url_config.function_name,
1962
            )
1963

1964
    def get_alias(
1✔
1965
        self, context: RequestContext, function_name: FunctionName, name: Alias, **kwargs
1966
    ) -> AliasConfiguration:
1967
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1968
        function_name = api_utils.get_function_name(function_name, context)
1✔
1969
        function = self._get_function(
1✔
1970
            function_name=function_name, region=region, account_id=account_id
1971
        )
1972
        if not (alias := function.aliases.get(name)):
1✔
1973
            raise ResourceNotFoundException(
1✔
1974
                f"Cannot find alias arn: {api_utils.qualified_lambda_arn(function_name=function_name, qualifier=name, region=region, account=account_id)}",
1975
                Type="User",
1976
            )
1977
        return api_utils.map_alias_out(alias=alias, function=function)
1✔
1978

1979
    def update_alias(
1✔
1980
        self,
1981
        context: RequestContext,
1982
        function_name: FunctionName,
1983
        name: Alias,
1984
        function_version: VersionWithLatestPublished = None,
1985
        description: Description = None,
1986
        routing_config: AliasRoutingConfiguration = None,
1987
        revision_id: String = None,
1988
        **kwargs,
1989
    ) -> AliasConfiguration:
1990
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
1991
        function_name = api_utils.get_function_name(function_name, context)
1✔
1992
        function = self._get_function(
1✔
1993
            function_name=function_name, region=region, account_id=account_id
1994
        )
1995
        if not (alias := function.aliases.get(name)):
1✔
1996
            fn_arn = api_utils.qualified_lambda_arn(function_name, name, account_id, region)
1✔
1997
            raise ResourceNotFoundException(
1✔
1998
                f"Alias not found: {fn_arn}",
1999
                Type="User",
2000
            )
2001
        if revision_id and alias.revision_id != revision_id:
1✔
2002
            raise PreconditionFailedException(
1✔
2003
                "The Revision Id provided does not match the latest Revision Id. "
2004
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
2005
                Type="User",
2006
            )
2007
        changes = {}
1✔
2008
        if function_version is not None:
1✔
2009
            changes |= {"function_version": function_version}
1✔
2010
        if description is not None:
1✔
2011
            changes |= {"description": description}
1✔
2012
        if routing_config is not None:
1✔
2013
            # if it is an empty dict or AdditionalVersionWeights is empty, set routing config to None
2014
            new_routing_config = None
1✔
2015
            if routing_config_dict := routing_config.get("AdditionalVersionWeights"):
1✔
2016
                new_routing_config = self._create_routing_config_model(routing_config_dict)
×
2017
            changes |= {"routing_configuration": new_routing_config}
1✔
2018
        # even if no changes are done, we have to update revision id for some reason
2019
        old_alias = alias
1✔
2020
        alias = dataclasses.replace(alias, **changes)
1✔
2021
        function.aliases[name] = alias
1✔
2022

2023
        # TODO: signal lambda service that pointer potentially changed
2024
        self.lambda_service.update_alias(old_alias=old_alias, new_alias=alias, function=function)
1✔
2025

2026
        return api_utils.map_alias_out(alias=alias, function=function)
1✔
2027

2028
    # =======================================
2029
    # ======= EVENT SOURCE MAPPINGS =========
2030
    # =======================================
2031
    def check_service_resource_exists(
1✔
2032
        self, service: str, resource_arn: str, function_arn: str, function_role_arn: str
2033
    ):
2034
        """
2035
        Check if the service resource exists and if the function has access to it.
2036

2037
        Raises:
2038
            InvalidParameterValueException: If the service resource does not exist or the function does not have access to it.
2039
        """
2040
        arn = parse_arn(resource_arn)
1✔
2041
        source_client = get_internal_client(
1✔
2042
            arn=resource_arn,
2043
            role_arn=function_role_arn,
2044
            service_principal=ServicePrincipal.lambda_,
2045
            source_arn=function_arn,
2046
        )
2047
        if service in ["sqs", "sqs-fifo"]:
1✔
2048
            try:
1✔
2049
                # AWS uses `GetQueueAttributes` internally to verify the queue existence, but we need the `QueueUrl`
2050
                # which is not given directly. We build out a dummy `QueueUrl` which can be parsed by SQS to return
2051
                # the right value
2052
                queue_name = arn["resource"].split("/")[-1]
1✔
2053
                queue_url = f"http://sqs.{arn['region']}.domain/{arn['account']}/{queue_name}"
1✔
2054
                source_client.get_queue_attributes(QueueUrl=queue_url)
1✔
2055
            except ClientError as e:
1✔
2056
                error_code = e.response["Error"]["Code"]
1✔
2057
                if error_code == "AWS.SimpleQueueService.NonExistentQueue":
1✔
2058
                    raise InvalidParameterValueException(
1✔
2059
                        f"Error occurred while ReceiveMessage. SQS Error Code: {error_code}. SQS Error Message: {e.response['Error']['Message']}",
2060
                        Type="User",
2061
                    )
2062
                raise e
×
2063
        elif service in ["kinesis"]:
1✔
2064
            try:
1✔
2065
                source_client.describe_stream(StreamARN=resource_arn)
1✔
2066
            except ClientError as e:
1✔
2067
                if e.response["Error"]["Code"] == "ResourceNotFoundException":
1✔
2068
                    raise InvalidParameterValueException(
1✔
2069
                        f"Stream not found: {resource_arn}",
2070
                        Type="User",
2071
                    )
2072
                raise e
×
2073
        elif service in ["dynamodb"]:
1✔
2074
            try:
1✔
2075
                source_client.describe_stream(StreamArn=resource_arn)
1✔
2076
            except ClientError as e:
1✔
2077
                if e.response["Error"]["Code"] == "ResourceNotFoundException":
1✔
2078
                    raise InvalidParameterValueException(
1✔
2079
                        f"Stream not found: {resource_arn}",
2080
                        Type="User",
2081
                    )
2082
                raise e
×
2083

2084
    @handler("CreateEventSourceMapping", expand=False)
1✔
2085
    def create_event_source_mapping(
1✔
2086
        self,
2087
        context: RequestContext,
2088
        request: CreateEventSourceMappingRequest,
2089
    ) -> EventSourceMappingConfiguration:
2090
        return self.create_event_source_mapping_v2(context, request)
1✔
2091

2092
    def create_event_source_mapping_v2(
1✔
2093
        self,
2094
        context: RequestContext,
2095
        request: CreateEventSourceMappingRequest,
2096
    ) -> EventSourceMappingConfiguration:
2097
        # Validations
2098
        function_arn, function_name, state, function_version, function_role = (
1✔
2099
            self.validate_event_source_mapping(context, request)
2100
        )
2101

2102
        esm_config = EsmConfigFactory(request, context, function_arn).get_esm_config()
1✔
2103

2104
        # Copy esm_config to avoid a race condition with potential async update in the store
2105
        state.event_source_mappings[esm_config["UUID"]] = esm_config.copy()
1✔
2106
        enabled = request.get("Enabled", True)
1✔
2107
        # TODO: check for potential async race condition update -> think about locking
2108
        esm_worker = EsmWorkerFactory(esm_config, function_role, enabled).get_esm_worker()
1✔
2109
        self.esm_workers[esm_worker.uuid] = esm_worker
1✔
2110
        # TODO: check StateTransitionReason, LastModified, LastProcessingResult (concurrent updates requires locking!)
2111
        if tags := request.get("Tags"):
1✔
2112
            self._store_tags(esm_config.get("EventSourceMappingArn"), tags)
1✔
2113
        esm_worker.create()
1✔
2114
        return esm_config
1✔
2115

2116
    def validate_event_source_mapping(self, context, request):
1✔
2117
        # TODO: test whether stream ARNs are valid sources for Pipes or ESM or whether only DynamoDB table ARNs work
2118
        # TODO: Validate MaxRecordAgeInSeconds (i.e cannot subceed 60s but can be -1) and MaxRetryAttempts parameters.
2119
        # See https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-maximumrecordageinseconds
2120
        is_create_esm_request = context.operation.name == self.create_event_source_mapping.operation
1✔
2121

2122
        if destination_config := request.get("DestinationConfig"):
1✔
2123
            if "OnSuccess" in destination_config:
1✔
2124
                raise InvalidParameterValueException(
1✔
2125
                    "Unsupported DestinationConfig parameter for given event source mapping type.",
2126
                    Type="User",
2127
                )
2128

2129
        service = None
1✔
2130
        if "SelfManagedEventSource" in request:
1✔
2131
            service = "kafka"
×
2132
            if "SourceAccessConfigurations" not in request:
×
2133
                raise InvalidParameterValueException(
×
2134
                    "Required 'sourceAccessConfigurations' parameter is missing.", Type="User"
2135
                )
2136
        if service is None and "EventSourceArn" not in request:
1✔
2137
            raise InvalidParameterValueException("Unrecognized event source.", Type="User")
1✔
2138
        if service is None:
1✔
2139
            service = extract_service_from_arn(request["EventSourceArn"])
1✔
2140

2141
        batch_size = api_utils.validate_and_set_batch_size(service, request.get("BatchSize"))
1✔
2142
        if service in ["dynamodb", "kinesis"]:
1✔
2143
            starting_position = request.get("StartingPosition")
1✔
2144
            if not starting_position:
1✔
2145
                raise InvalidParameterValueException(
1✔
2146
                    "1 validation error detected: Value null at 'startingPosition' failed to satisfy constraint: Member must not be null.",
2147
                    Type="User",
2148
                )
2149

2150
            if starting_position not in KinesisStreamStartPosition.__members__:
1✔
2151
                raise ValidationException(
1✔
2152
                    f"1 validation error detected: Value '{starting_position}' at 'startingPosition' failed to satisfy constraint: Member must satisfy enum value set: [LATEST, AT_TIMESTAMP, TRIM_HORIZON]"
2153
                )
2154
            # AT_TIMESTAMP is not allowed for DynamoDB Streams
2155
            elif (
1✔
2156
                service == "dynamodb"
2157
                and starting_position not in DynamoDBStreamStartPosition.__members__
2158
            ):
2159
                raise InvalidParameterValueException(
1✔
2160
                    f"Unsupported starting position for arn type: {request['EventSourceArn']}",
2161
                    Type="User",
2162
                )
2163

2164
        if service in ["sqs", "sqs-fifo"]:
1✔
2165
            if batch_size > 10 and request.get("MaximumBatchingWindowInSeconds", 0) == 0:
1✔
2166
                raise InvalidParameterValueException(
1✔
2167
                    "Maximum batch window in seconds must be greater than 0 if maximum batch size is greater than 10",
2168
                    Type="User",
2169
                )
2170

2171
        if (filter_criteria := request.get("FilterCriteria")) is not None:
1✔
2172
            for filter_ in filter_criteria.get("Filters", []):
1✔
2173
                pattern_str = filter_.get("Pattern")
1✔
2174
                if not pattern_str or not isinstance(pattern_str, str):
1✔
2175
                    raise InvalidParameterValueException(
×
2176
                        "Invalid filter pattern definition.", Type="User"
2177
                    )
2178

2179
                if not validate_event_pattern(pattern_str):
1✔
2180
                    raise InvalidParameterValueException(
1✔
2181
                        "Invalid filter pattern definition.", Type="User"
2182
                    )
2183

2184
        # Can either have a FunctionName (i.e CreateEventSourceMapping request) or
2185
        # an internal EventSourceMappingConfiguration representation
2186
        request_function_name = request.get("FunctionName") or request.get("FunctionArn")
1✔
2187
        # can be either a partial arn or a full arn for the version/alias
2188
        function_name, qualifier, account, region = function_locators_from_arn(
1✔
2189
            request_function_name
2190
        )
2191
        # TODO: validate `context.region` vs. `region(request["FunctionName"])` vs. `region(request["EventSourceArn"])`
2192
        account = account or context.account_id
1✔
2193
        region = region or context.region
1✔
2194
        state = lambda_stores[account][region]
1✔
2195
        fn = state.functions.get(function_name)
1✔
2196
        if not fn:
1✔
2197
            raise InvalidParameterValueException("Function does not exist", Type="User")
1✔
2198

2199
        if qualifier:
1✔
2200
            # make sure the function version/alias exists
2201
            if api_utils.qualifier_is_alias(qualifier):
1✔
2202
                fn_alias = fn.aliases.get(qualifier)
1✔
2203
                if not fn_alias:
1✔
2204
                    raise Exception("unknown alias")  # TODO: cover via test
×
2205
            elif api_utils.qualifier_is_version(qualifier):
1✔
2206
                fn_version = fn.versions.get(qualifier)
1✔
2207
                if not fn_version:
1✔
2208
                    raise Exception("unknown version")  # TODO: cover via test
×
2209
            elif qualifier == "$LATEST":
1✔
2210
                pass
1✔
2211
            else:
2212
                raise Exception("invalid functionname")  # TODO: cover via test
×
2213
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account, region)
1✔
2214

2215
        else:
2216
            fn_arn = api_utils.unqualified_lambda_arn(function_name, account, region)
1✔
2217

2218
        function_version = get_function_version_from_arn(fn_arn)
1✔
2219
        function_role = function_version.config.role
1✔
2220

2221
        if source_arn := request.get("EventSourceArn"):
1✔
2222
            self.check_service_resource_exists(service, source_arn, fn_arn, function_role)
1✔
2223
        # Check we are validating a CreateEventSourceMapping request
2224
        if is_create_esm_request:
1✔
2225

2226
            def _get_mapping_sources(mapping: dict[str, Any]) -> list[str]:
1✔
2227
                if event_source_arn := mapping.get("EventSourceArn"):
1✔
2228
                    return [event_source_arn]
1✔
2229
                return (
×
2230
                    mapping.get("SelfManagedEventSource", {})
2231
                    .get("Endpoints", {})
2232
                    .get("KAFKA_BOOTSTRAP_SERVERS", [])
2233
                )
2234

2235
            # check for event source duplicates
2236
            # TODO: currently validated for sqs, kinesis, and dynamodb
2237
            service_id = load_service(service).service_id
1✔
2238
            for uuid, mapping in state.event_source_mappings.items():
1✔
2239
                mapping_sources = _get_mapping_sources(mapping)
1✔
2240
                request_sources = _get_mapping_sources(request)
1✔
2241
                if mapping["FunctionArn"] == fn_arn and (
1✔
2242
                    set(mapping_sources).intersection(request_sources)
2243
                ):
2244
                    if service == "sqs":
1✔
2245
                        # *shakes fist at SQS*
2246
                        raise ResourceConflictException(
1✔
2247
                            f'An event source mapping with {service_id} arn (" {mapping["EventSourceArn"]} ") '
2248
                            f'and function (" {function_name} ") already exists. Please update or delete the '
2249
                            f"existing mapping with UUID {uuid}",
2250
                            Type="User",
2251
                        )
2252
                    elif service == "kafka":
1✔
2253
                        if set(mapping["Topics"]).intersection(request["Topics"]):
×
2254
                            raise ResourceConflictException(
×
2255
                                f'An event source mapping with event source ("{",".join(request_sources)}"), '
2256
                                f'function ("{fn_arn}"), '
2257
                                f'topics ("{",".join(request["Topics"])}") already exists. Please update or delete the '
2258
                                f"existing mapping with UUID {uuid}",
2259
                                Type="User",
2260
                            )
2261
                    else:
2262
                        raise ResourceConflictException(
1✔
2263
                            f'The event source arn (" {mapping["EventSourceArn"]} ") and function '
2264
                            f'(" {function_name} ") provided mapping already exists. Please update or delete the '
2265
                            f"existing mapping with UUID {uuid}",
2266
                            Type="User",
2267
                        )
2268
        return fn_arn, function_name, state, function_version, function_role
1✔
2269

2270
    @handler("UpdateEventSourceMapping", expand=False)
1✔
2271
    def update_event_source_mapping(
1✔
2272
        self,
2273
        context: RequestContext,
2274
        request: UpdateEventSourceMappingRequest,
2275
    ) -> EventSourceMappingConfiguration:
2276
        return self.update_event_source_mapping_v2(context, request)
1✔
2277

2278
    def update_event_source_mapping_v2(
1✔
2279
        self,
2280
        context: RequestContext,
2281
        request: UpdateEventSourceMappingRequest,
2282
    ) -> EventSourceMappingConfiguration:
2283
        # TODO: test and implement this properly (quite complex with many validations and limitations!)
2284
        LOG.warning(
1✔
2285
            "Updating Lambda Event Source Mapping is in experimental state and not yet fully tested."
2286
        )
2287
        state = lambda_stores[context.account_id][context.region]
1✔
2288
        request_data = {**request}
1✔
2289
        uuid = request_data.pop("UUID", None)
1✔
2290
        if not uuid:
1✔
2291
            raise ResourceNotFoundException(
×
2292
                "The resource you requested does not exist.", Type="User"
2293
            )
2294
        old_event_source_mapping = state.event_source_mappings.get(uuid)
1✔
2295
        esm_worker = self.esm_workers.get(uuid)
1✔
2296
        if old_event_source_mapping is None or esm_worker is None:
1✔
2297
            raise ResourceNotFoundException(
1✔
2298
                "The resource you requested does not exist.", Type="User"
2299
            )  # TODO: test?
2300

2301
        # normalize values to overwrite
2302
        event_source_mapping = old_event_source_mapping | request_data
1✔
2303

2304
        temp_params = {}  # values only set for the returned response, not saved internally (e.g. transient state)
1✔
2305

2306
        # Validate the newly updated ESM object. We ignore the output here since we only care whether an Exception is raised.
2307
        function_arn, _, _, function_version, function_role = self.validate_event_source_mapping(
1✔
2308
            context, event_source_mapping
2309
        )
2310

2311
        # remove the FunctionName field
2312
        event_source_mapping.pop("FunctionName", None)
1✔
2313

2314
        if function_arn:
1✔
2315
            event_source_mapping["FunctionArn"] = function_arn
1✔
2316

2317
        # Only apply update if the desired state differs
2318
        enabled = request.get("Enabled")
1✔
2319
        if enabled is not None:
1✔
2320
            if enabled and old_event_source_mapping["State"] != EsmState.ENABLED:
1✔
2321
                event_source_mapping["State"] = EsmState.ENABLING
1✔
2322
            # TODO: What happens when trying to update during an update or failed state?!
2323
            elif not enabled and old_event_source_mapping["State"] == EsmState.ENABLED:
1✔
2324
                event_source_mapping["State"] = EsmState.DISABLING
1✔
2325
        else:
2326
            event_source_mapping["State"] = EsmState.UPDATING
1✔
2327

2328
        # To ensure parity, certain responses need to be immediately returned
2329
        temp_params["State"] = event_source_mapping["State"]
1✔
2330

2331
        state.event_source_mappings[uuid] = event_source_mapping
1✔
2332

2333
        # TODO: Currently, we re-create the entire ESM worker. Look into approach with better performance.
2334
        worker_factory = EsmWorkerFactory(
1✔
2335
            event_source_mapping, function_role, request.get("Enabled", esm_worker.enabled)
2336
        )
2337

2338
        # Get a new ESM worker object but do not active it, since the factory holds all logic for creating new worker from configuration.
2339
        updated_esm_worker = worker_factory.get_esm_worker()
1✔
2340
        self.esm_workers[uuid] = updated_esm_worker
1✔
2341

2342
        # We should stop() the worker since the delete() will remove the ESM from the state mapping.
2343
        esm_worker.stop()
1✔
2344
        # This will either create an EsmWorker in the CREATING state if enabled. Otherwise, the DISABLING state is set.
2345
        updated_esm_worker.create()
1✔
2346

2347
        return {**event_source_mapping, **temp_params}
1✔
2348

2349
    def delete_event_source_mapping(
1✔
2350
        self, context: RequestContext, uuid: String, **kwargs
2351
    ) -> EventSourceMappingConfiguration:
2352
        state = lambda_stores[context.account_id][context.region]
1✔
2353
        event_source_mapping = state.event_source_mappings.get(uuid)
1✔
2354
        if not event_source_mapping:
1✔
2355
            raise ResourceNotFoundException(
1✔
2356
                "The resource you requested does not exist.", Type="User"
2357
            )
2358
        esm = state.event_source_mappings[uuid]
1✔
2359
        # TODO: add proper locking
2360
        esm_worker = self.esm_workers.pop(uuid, None)
1✔
2361
        # Asynchronous delete in v2
2362
        if not esm_worker:
1✔
2363
            raise ResourceNotFoundException(
×
2364
                "The resource you requested does not exist.", Type="User"
2365
            )
2366
        esm_worker.delete()
1✔
2367
        return {**esm, "State": EsmState.DELETING}
1✔
2368

2369
    def get_event_source_mapping(
1✔
2370
        self, context: RequestContext, uuid: String, **kwargs
2371
    ) -> EventSourceMappingConfiguration:
2372
        state = lambda_stores[context.account_id][context.region]
1✔
2373
        event_source_mapping = state.event_source_mappings.get(uuid)
1✔
2374
        if not event_source_mapping:
1✔
2375
            raise ResourceNotFoundException(
1✔
2376
                "The resource you requested does not exist.", Type="User"
2377
            )
2378
        esm_worker = self.esm_workers.get(uuid)
1✔
2379
        if not esm_worker:
1✔
2380
            raise ResourceNotFoundException(
×
2381
                "The resource you requested does not exist.", Type="User"
2382
            )
2383
        event_source_mapping["State"] = esm_worker.current_state
1✔
2384
        event_source_mapping["StateTransitionReason"] = esm_worker.state_transition_reason
1✔
2385
        return event_source_mapping
1✔
2386

2387
    def list_event_source_mappings(
1✔
2388
        self,
2389
        context: RequestContext,
2390
        event_source_arn: Arn = None,
2391
        function_name: FunctionName = None,
2392
        marker: String = None,
2393
        max_items: MaxListItems = None,
2394
        **kwargs,
2395
    ) -> ListEventSourceMappingsResponse:
2396
        state = lambda_stores[context.account_id][context.region]
1✔
2397

2398
        esms = state.event_source_mappings.values()
1✔
2399
        # TODO: update and test State and StateTransitionReason for ESM v2
2400

2401
        if event_source_arn:  # TODO: validate pattern
1✔
2402
            esms = [e for e in esms if e.get("EventSourceArn") == event_source_arn]
1✔
2403

2404
        if function_name:
1✔
2405
            esms = [e for e in esms if function_name in e["FunctionArn"]]
1✔
2406

2407
        esms = PaginatedList(esms)
1✔
2408
        page, token = esms.get_page(
1✔
2409
            lambda x: x["UUID"],
2410
            marker,
2411
            max_items,
2412
        )
2413
        return ListEventSourceMappingsResponse(EventSourceMappings=page, NextMarker=token)
1✔
2414

2415
    def get_source_type_from_request(self, request: dict[str, Any]) -> str:
1✔
2416
        if event_source_arn := request.get("EventSourceArn", ""):
×
2417
            service = extract_service_from_arn(event_source_arn)
×
2418
            if service == "sqs" and "fifo" in event_source_arn:
×
2419
                service = "sqs-fifo"
×
2420
            return service
×
2421
        elif request.get("SelfManagedEventSource"):
×
2422
            return "kafka"
×
2423

2424
    # =======================================
2425
    # ============ FUNCTION URLS ============
2426
    # =======================================
2427

2428
    @staticmethod
1✔
2429
    def _validate_qualifier(qualifier: str) -> None:
1✔
2430
        if qualifier == "$LATEST" or (qualifier and api_utils.qualifier_is_version(qualifier)):
1✔
2431
            raise ValidationException(
1✔
2432
                f"1 validation error detected: Value '{qualifier}' at 'qualifier' failed to satisfy constraint: Member must satisfy regular expression pattern: ((?!^\\d+$)^[0-9a-zA-Z-_]+$)"
2433
            )
2434

2435
    @staticmethod
1✔
2436
    def _validate_invoke_mode(invoke_mode: str) -> None:
1✔
2437
        if invoke_mode and invoke_mode not in [InvokeMode.BUFFERED, InvokeMode.RESPONSE_STREAM]:
1✔
2438
            raise ValidationException(
1✔
2439
                f"1 validation error detected: Value '{invoke_mode}' at 'invokeMode' failed to satisfy constraint: Member must satisfy enum value set: [RESPONSE_STREAM, BUFFERED]"
2440
            )
2441
        if invoke_mode == InvokeMode.RESPONSE_STREAM:
1✔
2442
            # TODO should we actually fail for setting RESPONSE_STREAM?
2443
            #  It should trigger InvokeWithResponseStream which is not implemented
2444
            LOG.warning(
1✔
2445
                "The invokeMode 'RESPONSE_STREAM' is not yet supported on LocalStack. The property is only mocked, the execution will still be 'BUFFERED'"
2446
            )
2447

2448
    # TODO: what happens if function state is not active?
2449
    def create_function_url_config(
1✔
2450
        self,
2451
        context: RequestContext,
2452
        function_name: FunctionName,
2453
        auth_type: FunctionUrlAuthType,
2454
        qualifier: FunctionUrlQualifier = None,
2455
        cors: Cors = None,
2456
        invoke_mode: InvokeMode = None,
2457
        **kwargs,
2458
    ) -> CreateFunctionUrlConfigResponse:
2459
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2460
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
2461
            function_name, qualifier, context
2462
        )
2463
        state = lambda_stores[account_id][region]
1✔
2464
        self._validate_qualifier(qualifier)
1✔
2465
        self._validate_invoke_mode(invoke_mode)
1✔
2466

2467
        fn = state.functions.get(function_name)
1✔
2468
        if fn is None:
1✔
2469
            raise ResourceNotFoundException("Function does not exist", Type="User")
1✔
2470

2471
        url_config = fn.function_url_configs.get(qualifier or "$LATEST")
1✔
2472
        if url_config:
1✔
2473
            raise ResourceConflictException(
1✔
2474
                f"Failed to create function url config for [functionArn = {url_config.function_arn}]. Error message:  FunctionUrlConfig exists for this Lambda function",
2475
                Type="User",
2476
            )
2477

2478
        if qualifier and qualifier != "$LATEST" and qualifier not in fn.aliases:
1✔
2479
            raise ResourceNotFoundException("Function does not exist", Type="User")
1✔
2480

2481
        normalized_qualifier = qualifier or "$LATEST"
1✔
2482

2483
        function_arn = (
1✔
2484
            api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
2485
            if qualifier
2486
            else api_utils.unqualified_lambda_arn(function_name, account_id, region)
2487
        )
2488

2489
        custom_id: str | None = None
1✔
2490

2491
        tags = self._get_tags(api_utils.unqualified_lambda_arn(function_name, account_id, region))
1✔
2492
        if TAG_KEY_CUSTOM_URL in tags:
1✔
2493
            # Note: I really wanted to add verification here that the
2494
            # url_id is unique, so we could surface that to the user ASAP.
2495
            # However, it seems like that information isn't available yet,
2496
            # since (as far as I can tell) we call
2497
            # self.router.register_routes() once, in a single shot, for all
2498
            # of the routes -- and we need to verify that it's unique not
2499
            # just for this particular lambda function, but for the entire
2500
            # lambda provider. Therefore... that idea proved non-trivial!
2501
            custom_id_tag_value = (
1✔
2502
                f"{tags[TAG_KEY_CUSTOM_URL]}-{qualifier}" if qualifier else tags[TAG_KEY_CUSTOM_URL]
2503
            )
2504
            if TAG_KEY_CUSTOM_URL_VALIDATOR.match(custom_id_tag_value):
1✔
2505
                custom_id = custom_id_tag_value
1✔
2506

2507
            else:
2508
                # Note: we're logging here instead of raising to prioritize
2509
                # strict parity with AWS over the localstack-only custom_id
2510
                LOG.warning(
1✔
2511
                    "Invalid custom ID tag value for lambda URL (%s=%s). "
2512
                    "Replaced with default (random id)",
2513
                    TAG_KEY_CUSTOM_URL,
2514
                    custom_id_tag_value,
2515
                )
2516

2517
        # The url_id is the subdomain used for the URL we're creating. This
2518
        # is either created randomly (as in AWS), or can be passed as a tag
2519
        # to the lambda itself (localstack-only).
2520
        url_id: str
2521
        if custom_id is None:
1✔
2522
            url_id = api_utils.generate_random_url_id()
1✔
2523
        else:
2524
            url_id = custom_id
1✔
2525

2526
        host_definition = localstack_host(custom_port=config.GATEWAY_LISTEN[0].port)
1✔
2527
        fn.function_url_configs[normalized_qualifier] = FunctionUrlConfig(
1✔
2528
            function_arn=function_arn,
2529
            function_name=function_name,
2530
            cors=cors,
2531
            url_id=url_id,
2532
            url=f"http://{url_id}.lambda-url.{context.region}.{host_definition.host_and_port()}/",  # TODO: https support
2533
            auth_type=auth_type,
2534
            creation_time=api_utils.generate_lambda_date(),
2535
            last_modified_time=api_utils.generate_lambda_date(),
2536
            invoke_mode=invoke_mode,
2537
        )
2538

2539
        # persist and start URL
2540
        # TODO: implement URL invoke
2541
        api_url_config = api_utils.map_function_url_config(
1✔
2542
            fn.function_url_configs[normalized_qualifier]
2543
        )
2544

2545
        return CreateFunctionUrlConfigResponse(
1✔
2546
            FunctionUrl=api_url_config["FunctionUrl"],
2547
            FunctionArn=api_url_config["FunctionArn"],
2548
            AuthType=api_url_config["AuthType"],
2549
            Cors=api_url_config["Cors"],
2550
            CreationTime=api_url_config["CreationTime"],
2551
            InvokeMode=api_url_config["InvokeMode"],
2552
        )
2553

2554
    def get_function_url_config(
1✔
2555
        self,
2556
        context: RequestContext,
2557
        function_name: FunctionName,
2558
        qualifier: FunctionUrlQualifier = None,
2559
        **kwargs,
2560
    ) -> GetFunctionUrlConfigResponse:
2561
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2562
        state = lambda_stores[account_id][region]
1✔
2563

2564
        fn_name, qualifier = api_utils.get_name_and_qualifier(function_name, qualifier, context)
1✔
2565

2566
        self._validate_qualifier(qualifier)
1✔
2567

2568
        resolved_fn = state.functions.get(fn_name)
1✔
2569
        if not resolved_fn:
1✔
2570
            raise ResourceNotFoundException(
1✔
2571
                "The resource you requested does not exist.", Type="User"
2572
            )
2573

2574
        qualifier = qualifier or "$LATEST"
1✔
2575
        url_config = resolved_fn.function_url_configs.get(qualifier)
1✔
2576
        if not url_config:
1✔
2577
            raise ResourceNotFoundException(
1✔
2578
                "The resource you requested does not exist.", Type="User"
2579
            )
2580

2581
        return api_utils.map_function_url_config(url_config)
1✔
2582

2583
    def update_function_url_config(
1✔
2584
        self,
2585
        context: RequestContext,
2586
        function_name: FunctionName,
2587
        qualifier: FunctionUrlQualifier = None,
2588
        auth_type: FunctionUrlAuthType = None,
2589
        cors: Cors = None,
2590
        invoke_mode: InvokeMode = None,
2591
        **kwargs,
2592
    ) -> UpdateFunctionUrlConfigResponse:
2593
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2594
        state = lambda_stores[account_id][region]
1✔
2595

2596
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
2597
            function_name, qualifier, context
2598
        )
2599
        self._validate_qualifier(qualifier)
1✔
2600
        self._validate_invoke_mode(invoke_mode)
1✔
2601

2602
        fn = state.functions.get(function_name)
1✔
2603
        if not fn:
1✔
2604
            raise ResourceNotFoundException("Function does not exist", Type="User")
1✔
2605

2606
        normalized_qualifier = qualifier or "$LATEST"
1✔
2607

2608
        if (
1✔
2609
            api_utils.qualifier_is_alias(normalized_qualifier)
2610
            and normalized_qualifier not in fn.aliases
2611
        ):
2612
            raise ResourceNotFoundException("Function does not exist", Type="User")
1✔
2613

2614
        url_config = fn.function_url_configs.get(normalized_qualifier)
1✔
2615
        if not url_config:
1✔
2616
            raise ResourceNotFoundException(
1✔
2617
                "The resource you requested does not exist.", Type="User"
2618
            )
2619

2620
        changes = {
1✔
2621
            "last_modified_time": api_utils.generate_lambda_date(),
2622
            **({"cors": cors} if cors is not None else {}),
2623
            **({"auth_type": auth_type} if auth_type is not None else {}),
2624
        }
2625

2626
        if invoke_mode:
1✔
2627
            changes["invoke_mode"] = invoke_mode
1✔
2628

2629
        new_url_config = dataclasses.replace(url_config, **changes)
1✔
2630
        fn.function_url_configs[normalized_qualifier] = new_url_config
1✔
2631

2632
        return UpdateFunctionUrlConfigResponse(
1✔
2633
            FunctionUrl=new_url_config.url,
2634
            FunctionArn=new_url_config.function_arn,
2635
            AuthType=new_url_config.auth_type,
2636
            Cors=new_url_config.cors,
2637
            CreationTime=new_url_config.creation_time,
2638
            LastModifiedTime=new_url_config.last_modified_time,
2639
            InvokeMode=new_url_config.invoke_mode,
2640
        )
2641

2642
    def delete_function_url_config(
1✔
2643
        self,
2644
        context: RequestContext,
2645
        function_name: FunctionName,
2646
        qualifier: FunctionUrlQualifier = None,
2647
        **kwargs,
2648
    ) -> None:
2649
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2650
        state = lambda_stores[account_id][region]
1✔
2651

2652
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
2653
            function_name, qualifier, context
2654
        )
2655
        self._validate_qualifier(qualifier)
1✔
2656

2657
        resolved_fn = state.functions.get(function_name)
1✔
2658
        if not resolved_fn:
1✔
2659
            raise ResourceNotFoundException(
1✔
2660
                "The resource you requested does not exist.", Type="User"
2661
            )
2662

2663
        qualifier = qualifier or "$LATEST"
1✔
2664
        url_config = resolved_fn.function_url_configs.get(qualifier)
1✔
2665
        if not url_config:
1✔
2666
            raise ResourceNotFoundException(
1✔
2667
                "The resource you requested does not exist.", Type="User"
2668
            )
2669

2670
        del resolved_fn.function_url_configs[qualifier]
1✔
2671

2672
    def list_function_url_configs(
1✔
2673
        self,
2674
        context: RequestContext,
2675
        function_name: FunctionName,
2676
        marker: String = None,
2677
        max_items: MaxItems = None,
2678
        **kwargs,
2679
    ) -> ListFunctionUrlConfigsResponse:
2680
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2681
        state = lambda_stores[account_id][region]
1✔
2682

2683
        fn_name = api_utils.get_function_name(function_name, context)
1✔
2684
        resolved_fn = state.functions.get(fn_name)
1✔
2685
        if not resolved_fn:
1✔
2686
            raise ResourceNotFoundException("Function does not exist", Type="User")
1✔
2687

2688
        url_configs = [
1✔
2689
            api_utils.map_function_url_config(fn_conf)
2690
            for fn_conf in resolved_fn.function_url_configs.values()
2691
        ]
2692
        url_configs = PaginatedList(url_configs)
1✔
2693
        page, token = url_configs.get_page(
1✔
2694
            lambda url_config: url_config["FunctionArn"],
2695
            marker,
2696
            max_items,
2697
        )
2698
        url_configs = page
1✔
2699
        return ListFunctionUrlConfigsResponse(FunctionUrlConfigs=url_configs, NextMarker=token)
1✔
2700

2701
    # =======================================
2702
    # ============  Permissions  ============
2703
    # =======================================
2704

2705
    @handler("AddPermission", expand=False)
1✔
2706
    def add_permission(
1✔
2707
        self,
2708
        context: RequestContext,
2709
        request: AddPermissionRequest,
2710
    ) -> AddPermissionResponse:
2711
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
2712
            request.get("FunctionName"), request.get("Qualifier"), context
2713
        )
2714

2715
        # validate qualifier
2716
        if qualifier is not None:
1✔
2717
            self._validate_qualifier_expression(qualifier)
1✔
2718
            if qualifier == "$LATEST":
1✔
2719
                raise InvalidParameterValueException(
1✔
2720
                    "We currently do not support adding policies for $LATEST.", Type="User"
2721
                )
2722
        account_id, region = api_utils.get_account_and_region(request.get("FunctionName"), context)
1✔
2723

2724
        resolved_fn = self._get_function(function_name, account_id, region)
1✔
2725
        resolved_qualifier, fn_arn = self._resolve_fn_qualifier(resolved_fn, qualifier)
1✔
2726

2727
        revision_id = request.get("RevisionId")
1✔
2728
        if revision_id:
1✔
2729
            fn_revision_id = self._function_revision_id(resolved_fn, resolved_qualifier)
1✔
2730
            if revision_id != fn_revision_id:
1✔
2731
                raise PreconditionFailedException(
1✔
2732
                    "The Revision Id provided does not match the latest Revision Id. "
2733
                    "Call the GetPolicy API to retrieve the latest Revision Id",
2734
                    Type="User",
2735
                )
2736

2737
        request_sid = request["StatementId"]
1✔
2738
        if not bool(STATEMENT_ID_REGEX.match(request_sid)):
1✔
2739
            raise ValidationException(
1✔
2740
                f"1 validation error detected: Value '{request_sid}' at 'statementId' failed to satisfy constraint: Member must satisfy regular expression pattern: ([a-zA-Z0-9-_]+)"
2741
            )
2742
        # check for an already existing policy and any conflicts in existing statements
2743
        existing_policy = resolved_fn.permissions.get(resolved_qualifier)
1✔
2744
        if existing_policy:
1✔
2745
            if request_sid in [s["Sid"] for s in existing_policy.policy.Statement]:
1✔
2746
                # uniqueness scope: statement id needs to be unique per qualified function ($LATEST, version, or alias)
2747
                # Counterexample: the same sid can exist within $LATEST, version, and alias
2748
                raise ResourceConflictException(
1✔
2749
                    f"The statement id ({request_sid}) provided already exists. Please provide a new statement id, or remove the existing statement.",
2750
                    Type="User",
2751
                )
2752

2753
        permission_statement = api_utils.build_statement(
1✔
2754
            partition=context.partition,
2755
            resource_arn=fn_arn,
2756
            statement_id=request["StatementId"],
2757
            action=request["Action"],
2758
            principal=request["Principal"],
2759
            source_arn=request.get("SourceArn"),
2760
            source_account=request.get("SourceAccount"),
2761
            principal_org_id=request.get("PrincipalOrgID"),
2762
            event_source_token=request.get("EventSourceToken"),
2763
            auth_type=request.get("FunctionUrlAuthType"),
2764
        )
2765
        new_policy = existing_policy
1✔
2766
        if not existing_policy:
1✔
2767
            new_policy = FunctionResourcePolicy(
1✔
2768
                policy=ResourcePolicy(Version="2012-10-17", Id="default", Statement=[])
2769
            )
2770
        new_policy.policy.Statement.append(permission_statement)
1✔
2771
        if not existing_policy:
1✔
2772
            resolved_fn.permissions[resolved_qualifier] = new_policy
1✔
2773

2774
        # Update revision id of alias or version
2775
        # TODO: re-evaluate data model to prevent this dirty hack just for bumping the revision id
2776
        # TODO: does that need a `with function.lock` for atomic updates of the policy + revision_id?
2777
        if api_utils.qualifier_is_alias(resolved_qualifier):
1✔
2778
            resolved_alias = resolved_fn.aliases[resolved_qualifier]
1✔
2779
            resolved_fn.aliases[resolved_qualifier] = dataclasses.replace(resolved_alias)
1✔
2780
        # Assumes that a non-alias is a version
2781
        else:
2782
            resolved_version = resolved_fn.versions[resolved_qualifier]
1✔
2783
            resolved_fn.versions[resolved_qualifier] = dataclasses.replace(
1✔
2784
                resolved_version, config=dataclasses.replace(resolved_version.config)
2785
            )
2786
        return AddPermissionResponse(Statement=json.dumps(permission_statement))
1✔
2787

2788
    def remove_permission(
1✔
2789
        self,
2790
        context: RequestContext,
2791
        function_name: NamespacedFunctionName,
2792
        statement_id: NamespacedStatementId,
2793
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
2794
        revision_id: String | None = None,
2795
        **kwargs,
2796
    ) -> None:
2797
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2798
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
2799
            function_name, qualifier, context
2800
        )
2801
        if qualifier is not None:
1✔
2802
            self._validate_qualifier_expression(qualifier)
1✔
2803

2804
        state = lambda_stores[account_id][region]
1✔
2805
        resolved_fn = state.functions.get(function_name)
1✔
2806
        if resolved_fn is None:
1✔
2807
            fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
1✔
2808
            raise ResourceNotFoundException(f"No policy found for: {fn_arn}", Type="User")
1✔
2809

2810
        resolved_qualifier, _ = self._resolve_fn_qualifier(resolved_fn, qualifier)
1✔
2811
        function_permission = resolved_fn.permissions.get(resolved_qualifier)
1✔
2812
        if not function_permission:
1✔
2813
            raise ResourceNotFoundException(
1✔
2814
                "No policy is associated with the given resource.", Type="User"
2815
            )
2816

2817
        # try to find statement in policy and delete it
2818
        statement = None
1✔
2819
        for s in function_permission.policy.Statement:
1✔
2820
            if s["Sid"] == statement_id:
1✔
2821
                statement = s
1✔
2822
                break
1✔
2823

2824
        if not statement:
1✔
2825
            raise ResourceNotFoundException(
1✔
2826
                f"Statement {statement_id} is not found in resource policy.", Type="User"
2827
            )
2828
        fn_revision_id = self._function_revision_id(resolved_fn, resolved_qualifier)
1✔
2829
        if revision_id and revision_id != fn_revision_id:
1✔
UNCOV
2830
            raise PreconditionFailedException(
×
2831
                "The Revision Id provided does not match the latest Revision Id. "
2832
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
2833
                Type="User",
2834
            )
2835
        function_permission.policy.Statement.remove(statement)
1✔
2836

2837
        # Update revision id for alias or version
2838
        # TODO: re-evaluate data model to prevent this dirty hack just for bumping the revision id
2839
        # TODO: does that need a `with function.lock` for atomic updates of the policy + revision_id?
2840
        if api_utils.qualifier_is_alias(resolved_qualifier):
1✔
2841
            resolved_alias = resolved_fn.aliases[resolved_qualifier]
×
2842
            resolved_fn.aliases[resolved_qualifier] = dataclasses.replace(resolved_alias)
×
2843
        # Assumes that a non-alias is a version
2844
        else:
2845
            resolved_version = resolved_fn.versions[resolved_qualifier]
1✔
2846
            resolved_fn.versions[resolved_qualifier] = dataclasses.replace(
1✔
2847
                resolved_version, config=dataclasses.replace(resolved_version.config)
2848
            )
2849

2850
        # remove the policy as a whole when there's no statement left in it
2851
        if len(function_permission.policy.Statement) == 0:
1✔
2852
            del resolved_fn.permissions[resolved_qualifier]
1✔
2853

2854
    def get_policy(
1✔
2855
        self,
2856
        context: RequestContext,
2857
        function_name: NamespacedFunctionName,
2858
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
2859
        **kwargs,
2860
    ) -> GetPolicyResponse:
2861
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2862
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
2863
            function_name, qualifier, context
2864
        )
2865

2866
        if qualifier is not None:
1✔
2867
            self._validate_qualifier_expression(qualifier)
1✔
2868

2869
        resolved_fn = self._get_function(function_name, account_id, region)
1✔
2870

2871
        resolved_qualifier = qualifier or "$LATEST"
1✔
2872
        function_permission = resolved_fn.permissions.get(resolved_qualifier)
1✔
2873
        if not function_permission:
1✔
2874
            raise ResourceNotFoundException(
1✔
2875
                "The resource you requested does not exist.", Type="User"
2876
            )
2877

2878
        fn_revision_id = None
1✔
2879
        if api_utils.qualifier_is_alias(resolved_qualifier):
1✔
2880
            resolved_alias = resolved_fn.aliases[resolved_qualifier]
1✔
2881
            fn_revision_id = resolved_alias.revision_id
1✔
2882
        # Assumes that a non-alias is a version
2883
        else:
2884
            resolved_version = resolved_fn.versions[resolved_qualifier]
1✔
2885
            fn_revision_id = resolved_version.config.revision_id
1✔
2886

2887
        return GetPolicyResponse(
1✔
2888
            Policy=json.dumps(dataclasses.asdict(function_permission.policy)),
2889
            RevisionId=fn_revision_id,
2890
        )
2891

2892
    # =======================================
2893
    # ========  Code signing config  ========
2894
    # =======================================
2895

2896
    def create_code_signing_config(
1✔
2897
        self,
2898
        context: RequestContext,
2899
        allowed_publishers: AllowedPublishers,
2900
        description: Description | None = None,
2901
        code_signing_policies: CodeSigningPolicies | None = None,
2902
        tags: Tags | None = None,
2903
        **kwargs,
2904
    ) -> CreateCodeSigningConfigResponse:
2905
        account = context.account_id
1✔
2906
        region = context.region
1✔
2907

2908
        state = lambda_stores[account][region]
1✔
2909
        # TODO: can there be duplicates?
2910
        csc_id = f"csc-{get_random_hex(17)}"  # e.g. 'csc-077c33b4c19e26036'
1✔
2911
        csc_arn = f"arn:{context.partition}:lambda:{region}:{account}:code-signing-config:{csc_id}"
1✔
2912
        csc = CodeSigningConfig(
1✔
2913
            csc_id=csc_id,
2914
            arn=csc_arn,
2915
            allowed_publishers=allowed_publishers,
2916
            policies=code_signing_policies,
2917
            last_modified=api_utils.generate_lambda_date(),
2918
            description=description,
2919
        )
2920
        state.code_signing_configs[csc_arn] = csc
1✔
2921
        return CreateCodeSigningConfigResponse(CodeSigningConfig=api_utils.map_csc(csc))
1✔
2922

2923
    def put_function_code_signing_config(
1✔
2924
        self,
2925
        context: RequestContext,
2926
        code_signing_config_arn: CodeSigningConfigArn,
2927
        function_name: NamespacedFunctionName,
2928
        **kwargs,
2929
    ) -> PutFunctionCodeSigningConfigResponse:
2930
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2931
        state = lambda_stores[account_id][region]
1✔
2932
        function_name = api_utils.get_function_name(function_name, context)
1✔
2933

2934
        csc = state.code_signing_configs.get(code_signing_config_arn)
1✔
2935
        if not csc:
1✔
2936
            raise CodeSigningConfigNotFoundException(
1✔
2937
                f"The code signing configuration cannot be found. Check that the provided configuration is not deleted: {code_signing_config_arn}.",
2938
                Type="User",
2939
            )
2940

2941
        fn = state.functions.get(function_name)
1✔
2942
        fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
1✔
2943
        if not fn:
1✔
2944
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")
1✔
2945

2946
        fn.code_signing_config_arn = code_signing_config_arn
1✔
2947
        return PutFunctionCodeSigningConfigResponse(
1✔
2948
            CodeSigningConfigArn=code_signing_config_arn, FunctionName=function_name
2949
        )
2950

2951
    def update_code_signing_config(
1✔
2952
        self,
2953
        context: RequestContext,
2954
        code_signing_config_arn: CodeSigningConfigArn,
2955
        description: Description = None,
2956
        allowed_publishers: AllowedPublishers = None,
2957
        code_signing_policies: CodeSigningPolicies = None,
2958
        **kwargs,
2959
    ) -> UpdateCodeSigningConfigResponse:
2960
        state = lambda_stores[context.account_id][context.region]
1✔
2961
        csc = state.code_signing_configs.get(code_signing_config_arn)
1✔
2962
        if not csc:
1✔
2963
            raise ResourceNotFoundException(
1✔
2964
                f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
2965
            )
2966

2967
        changes = {
1✔
2968
            **(
2969
                {"allowed_publishers": allowed_publishers} if allowed_publishers is not None else {}
2970
            ),
2971
            **({"policies": code_signing_policies} if code_signing_policies is not None else {}),
2972
            **({"description": description} if description is not None else {}),
2973
        }
2974
        new_csc = dataclasses.replace(
1✔
2975
            csc, last_modified=api_utils.generate_lambda_date(), **changes
2976
        )
2977
        state.code_signing_configs[code_signing_config_arn] = new_csc
1✔
2978

2979
        return UpdateCodeSigningConfigResponse(CodeSigningConfig=api_utils.map_csc(new_csc))
1✔
2980

2981
    def get_code_signing_config(
1✔
2982
        self, context: RequestContext, code_signing_config_arn: CodeSigningConfigArn, **kwargs
2983
    ) -> GetCodeSigningConfigResponse:
2984
        state = lambda_stores[context.account_id][context.region]
1✔
2985
        csc = state.code_signing_configs.get(code_signing_config_arn)
1✔
2986
        if not csc:
1✔
2987
            raise ResourceNotFoundException(
1✔
2988
                f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
2989
            )
2990

2991
        return GetCodeSigningConfigResponse(CodeSigningConfig=api_utils.map_csc(csc))
1✔
2992

2993
    def get_function_code_signing_config(
1✔
2994
        self, context: RequestContext, function_name: NamespacedFunctionName, **kwargs
2995
    ) -> GetFunctionCodeSigningConfigResponse:
2996
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
2997
        state = lambda_stores[account_id][region]
1✔
2998
        function_name = api_utils.get_function_name(function_name, context)
1✔
2999
        fn = state.functions.get(function_name)
1✔
3000
        fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
1✔
3001
        if not fn:
1✔
3002
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")
1✔
3003

3004
        if fn.code_signing_config_arn:
1✔
3005
            return GetFunctionCodeSigningConfigResponse(
1✔
3006
                CodeSigningConfigArn=fn.code_signing_config_arn, FunctionName=function_name
3007
            )
3008

3009
        return GetFunctionCodeSigningConfigResponse()
1✔
3010

3011
    def delete_function_code_signing_config(
1✔
3012
        self, context: RequestContext, function_name: NamespacedFunctionName, **kwargs
3013
    ) -> None:
3014
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3015
        state = lambda_stores[account_id][region]
1✔
3016
        function_name = api_utils.get_function_name(function_name, context)
1✔
3017
        fn = state.functions.get(function_name)
1✔
3018
        fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
1✔
3019
        if not fn:
1✔
3020
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")
1✔
3021

3022
        fn.code_signing_config_arn = None
1✔
3023

3024
    def delete_code_signing_config(
1✔
3025
        self, context: RequestContext, code_signing_config_arn: CodeSigningConfigArn, **kwargs
3026
    ) -> DeleteCodeSigningConfigResponse:
3027
        state = lambda_stores[context.account_id][context.region]
1✔
3028

3029
        csc = state.code_signing_configs.get(code_signing_config_arn)
1✔
3030
        if not csc:
1✔
3031
            raise ResourceNotFoundException(
1✔
3032
                f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
3033
            )
3034

3035
        del state.code_signing_configs[code_signing_config_arn]
1✔
3036

3037
        return DeleteCodeSigningConfigResponse()
1✔
3038

3039
    def list_code_signing_configs(
1✔
3040
        self,
3041
        context: RequestContext,
3042
        marker: String = None,
3043
        max_items: MaxListItems = None,
3044
        **kwargs,
3045
    ) -> ListCodeSigningConfigsResponse:
3046
        state = lambda_stores[context.account_id][context.region]
1✔
3047

3048
        cscs = [api_utils.map_csc(csc) for csc in state.code_signing_configs.values()]
1✔
3049
        cscs = PaginatedList(cscs)
1✔
3050
        page, token = cscs.get_page(
1✔
3051
            lambda csc: csc["CodeSigningConfigId"],
3052
            marker,
3053
            max_items,
3054
        )
3055
        return ListCodeSigningConfigsResponse(CodeSigningConfigs=page, NextMarker=token)
1✔
3056

3057
    def list_functions_by_code_signing_config(
1✔
3058
        self,
3059
        context: RequestContext,
3060
        code_signing_config_arn: CodeSigningConfigArn,
3061
        marker: String = None,
3062
        max_items: MaxListItems = None,
3063
        **kwargs,
3064
    ) -> ListFunctionsByCodeSigningConfigResponse:
3065
        account = context.account_id
1✔
3066
        region = context.region
1✔
3067

3068
        state = lambda_stores[account][region]
1✔
3069

3070
        if code_signing_config_arn not in state.code_signing_configs:
1✔
3071
            raise ResourceNotFoundException(
1✔
3072
                f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
3073
            )
3074

3075
        fn_arns = [
1✔
3076
            api_utils.unqualified_lambda_arn(fn.function_name, account, region)
3077
            for fn in state.functions.values()
3078
            if fn.code_signing_config_arn == code_signing_config_arn
3079
        ]
3080

3081
        cscs = PaginatedList(fn_arns)
1✔
3082
        page, token = cscs.get_page(
1✔
3083
            lambda x: x,
3084
            marker,
3085
            max_items,
3086
        )
3087
        return ListFunctionsByCodeSigningConfigResponse(FunctionArns=page, NextMarker=token)
1✔
3088

3089
    # =======================================
3090
    # =========  Account Settings   =========
3091
    # =======================================
3092

3093
    # CAVE: these settings & usages are *per* region!
3094
    # Lambda quotas: https://docs.aws.amazon.com/lambda/latest/dg/gettingstarted-limits.html
3095
    def get_account_settings(self, context: RequestContext, **kwargs) -> GetAccountSettingsResponse:
1✔
3096
        state = lambda_stores[context.account_id][context.region]
1✔
3097

3098
        fn_count = 0
1✔
3099
        code_size_sum = 0
1✔
3100
        reserved_concurrency_sum = 0
1✔
3101
        for fn in state.functions.values():
1✔
3102
            fn_count += 1
1✔
3103
            for fn_version in fn.versions.values():
1✔
3104
                # Image-based Lambdas do not have a code attribute and count against the ECR quotas instead
3105
                if fn_version.config.package_type == PackageType.Zip:
1✔
3106
                    code_size_sum += fn_version.config.code.code_size
1✔
3107
            if fn.reserved_concurrent_executions is not None:
1✔
3108
                reserved_concurrency_sum += fn.reserved_concurrent_executions
1✔
3109
            for c in fn.provisioned_concurrency_configs.values():
1✔
3110
                reserved_concurrency_sum += c.provisioned_concurrent_executions
1✔
3111
        for layer in state.layers.values():
1✔
3112
            for layer_version in layer.layer_versions.values():
1✔
3113
                code_size_sum += layer_version.code.code_size
1✔
3114
        return GetAccountSettingsResponse(
1✔
3115
            AccountLimit=AccountLimit(
3116
                TotalCodeSize=config.LAMBDA_LIMITS_TOTAL_CODE_SIZE,
3117
                CodeSizeZipped=config.LAMBDA_LIMITS_CODE_SIZE_ZIPPED,
3118
                CodeSizeUnzipped=config.LAMBDA_LIMITS_CODE_SIZE_UNZIPPED,
3119
                ConcurrentExecutions=config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS,
3120
                UnreservedConcurrentExecutions=config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS
3121
                - reserved_concurrency_sum,
3122
            ),
3123
            AccountUsage=AccountUsage(
3124
                TotalCodeSize=code_size_sum,
3125
                FunctionCount=fn_count,
3126
            ),
3127
        )
3128

3129
    # =======================================
3130
    # ==  Provisioned Concurrency Config   ==
3131
    # =======================================
3132

3133
    def _get_provisioned_config(
1✔
3134
        self, context: RequestContext, function_name: str, qualifier: str
3135
    ) -> ProvisionedConcurrencyConfiguration | None:
3136
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3137
        state = lambda_stores[account_id][region]
1✔
3138
        function_name = api_utils.get_function_name(function_name, context)
1✔
3139
        fn = state.functions.get(function_name)
1✔
3140
        if api_utils.qualifier_is_alias(qualifier):
1✔
3141
            fn_alias = None
1✔
3142
            if fn:
1✔
3143
                fn_alias = fn.aliases.get(qualifier)
1✔
3144
            if fn_alias is None:
1✔
3145
                raise ResourceNotFoundException(
1✔
3146
                    f"Cannot find alias arn: {api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)}",
3147
                    Type="User",
3148
                )
3149
        elif api_utils.qualifier_is_version(qualifier):
1✔
3150
            fn_version = None
1✔
3151
            if fn:
1✔
3152
                fn_version = fn.versions.get(qualifier)
1✔
3153
            if fn_version is None:
1✔
3154
                raise ResourceNotFoundException(
1✔
3155
                    f"Function not found: {api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)}",
3156
                    Type="User",
3157
                )
3158

3159
        return fn.provisioned_concurrency_configs.get(qualifier)
1✔
3160

3161
    def put_provisioned_concurrency_config(
1✔
3162
        self,
3163
        context: RequestContext,
3164
        function_name: FunctionName,
3165
        qualifier: Qualifier,
3166
        provisioned_concurrent_executions: PositiveInteger,
3167
        **kwargs,
3168
    ) -> PutProvisionedConcurrencyConfigResponse:
3169
        if provisioned_concurrent_executions <= 0:
1✔
3170
            raise ValidationException(
1✔
3171
                f"1 validation error detected: Value '{provisioned_concurrent_executions}' at 'provisionedConcurrentExecutions' failed to satisfy constraint: Member must have value greater than or equal to 1"
3172
            )
3173

3174
        if qualifier == "$LATEST":
1✔
3175
            raise InvalidParameterValueException(
1✔
3176
                "Provisioned Concurrency Configs cannot be applied to unpublished function versions.",
3177
                Type="User",
3178
            )
3179
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3180
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
3181
            function_name, qualifier, context
3182
        )
3183
        state = lambda_stores[account_id][region]
1✔
3184
        fn = state.functions.get(function_name)
1✔
3185

3186
        provisioned_config = self._get_provisioned_config(context, function_name, qualifier)
1✔
3187

3188
        if provisioned_config:  # TODO: merge?
1✔
3189
            # TODO: add a test for partial updates (if possible)
3190
            LOG.warning(
1✔
3191
                "Partial update of provisioned concurrency config is currently not supported."
3192
            )
3193

3194
        other_provisioned_sum = sum(
1✔
3195
            [
3196
                provisioned_configs.provisioned_concurrent_executions
3197
                for provisioned_qualifier, provisioned_configs in fn.provisioned_concurrency_configs.items()
3198
                if provisioned_qualifier != qualifier
3199
            ]
3200
        )
3201

3202
        if (
1✔
3203
            fn.reserved_concurrent_executions is not None
3204
            and fn.reserved_concurrent_executions
3205
            < other_provisioned_sum + provisioned_concurrent_executions
3206
        ):
3207
            raise InvalidParameterValueException(
1✔
3208
                "Requested Provisioned Concurrency should not be greater than the reservedConcurrentExecution for function",
3209
                Type="User",
3210
            )
3211

3212
        if provisioned_concurrent_executions > config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS:
1✔
3213
            raise InvalidParameterValueException(
1✔
3214
                f"Specified ConcurrentExecutions for function is greater than account's unreserved concurrency"
3215
                f" [{config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS}]."
3216
            )
3217

3218
        settings = self.get_account_settings(context)
1✔
3219
        unreserved_concurrent_executions = settings["AccountLimit"][
1✔
3220
            "UnreservedConcurrentExecutions"
3221
        ]
3222
        if (
1✔
3223
            unreserved_concurrent_executions - provisioned_concurrent_executions
3224
            < config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY
3225
        ):
3226
            raise InvalidParameterValueException(
1✔
3227
                f"Specified ConcurrentExecutions for function decreases account's UnreservedConcurrentExecution below"
3228
                f" its minimum value of [{config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY}]."
3229
            )
3230

3231
        provisioned_config = ProvisionedConcurrencyConfiguration(
1✔
3232
            provisioned_concurrent_executions, api_utils.generate_lambda_date()
3233
        )
3234
        fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
1✔
3235

3236
        if api_utils.qualifier_is_alias(qualifier):
1✔
3237
            alias = fn.aliases.get(qualifier)
1✔
3238
            resolved_version = fn.versions.get(alias.function_version)
1✔
3239

3240
            if (
1✔
3241
                resolved_version
3242
                and fn.provisioned_concurrency_configs.get(alias.function_version) is not None
3243
            ):
3244
                raise ResourceConflictException(
1✔
3245
                    "Alias can't be used for Provisioned Concurrency configuration on an already Provisioned version",
3246
                    Type="User",
3247
                )
3248
            fn_arn = resolved_version.id.qualified_arn()
1✔
3249
        elif api_utils.qualifier_is_version(qualifier):
1✔
3250
            fn_version = fn.versions.get(qualifier)
1✔
3251

3252
            # TODO: might be useful other places, utilize
3253
            pointing_aliases = []
1✔
3254
            for alias in fn.aliases.values():
1✔
3255
                if (
1✔
3256
                    alias.function_version == qualifier
3257
                    and fn.provisioned_concurrency_configs.get(alias.name) is not None
3258
                ):
3259
                    pointing_aliases.append(alias.name)
1✔
3260
            if pointing_aliases:
1✔
3261
                raise ResourceConflictException(
1✔
3262
                    "Version is pointed by a Provisioned Concurrency alias", Type="User"
3263
                )
3264

3265
            fn_arn = fn_version.id.qualified_arn()
1✔
3266

3267
        manager = self.lambda_service.get_lambda_version_manager(fn_arn)
1✔
3268

3269
        fn.provisioned_concurrency_configs[qualifier] = provisioned_config
1✔
3270

3271
        manager.update_provisioned_concurrency_config(
1✔
3272
            provisioned_config.provisioned_concurrent_executions
3273
        )
3274

3275
        return PutProvisionedConcurrencyConfigResponse(
1✔
3276
            RequestedProvisionedConcurrentExecutions=provisioned_config.provisioned_concurrent_executions,
3277
            AvailableProvisionedConcurrentExecutions=0,
3278
            AllocatedProvisionedConcurrentExecutions=0,
3279
            Status=ProvisionedConcurrencyStatusEnum.IN_PROGRESS,
3280
            # StatusReason=manager.provisioned_state.status_reason,
3281
            LastModified=provisioned_config.last_modified,  # TODO: does change with configuration or also with state changes?
3282
        )
3283

3284
    def get_provisioned_concurrency_config(
1✔
3285
        self, context: RequestContext, function_name: FunctionName, qualifier: Qualifier, **kwargs
3286
    ) -> GetProvisionedConcurrencyConfigResponse:
3287
        if qualifier == "$LATEST":
1✔
3288
            raise InvalidParameterValueException(
1✔
3289
                "The function resource provided must be an alias or a published version.",
3290
                Type="User",
3291
            )
3292
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3293
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
3294
            function_name, qualifier, context
3295
        )
3296

3297
        provisioned_config = self._get_provisioned_config(context, function_name, qualifier)
1✔
3298
        if not provisioned_config:
1✔
3299
            raise ProvisionedConcurrencyConfigNotFoundException(
1✔
3300
                "No Provisioned Concurrency Config found for this function", Type="User"
3301
            )
3302

3303
        # TODO: make this compatible with alias pointer migration on update
3304
        if api_utils.qualifier_is_alias(qualifier):
1✔
3305
            state = lambda_stores[account_id][region]
1✔
3306
            fn = state.functions.get(function_name)
1✔
3307
            alias = fn.aliases.get(qualifier)
1✔
3308
            fn_arn = api_utils.qualified_lambda_arn(
1✔
3309
                function_name, alias.function_version, account_id, region
3310
            )
3311
        else:
3312
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
1✔
3313

3314
        ver_manager = self.lambda_service.get_lambda_version_manager(fn_arn)
1✔
3315

3316
        return GetProvisionedConcurrencyConfigResponse(
1✔
3317
            RequestedProvisionedConcurrentExecutions=provisioned_config.provisioned_concurrent_executions,
3318
            LastModified=provisioned_config.last_modified,
3319
            AvailableProvisionedConcurrentExecutions=ver_manager.provisioned_state.available,
3320
            AllocatedProvisionedConcurrentExecutions=ver_manager.provisioned_state.allocated,
3321
            Status=ver_manager.provisioned_state.status,
3322
            StatusReason=ver_manager.provisioned_state.status_reason,
3323
        )
3324

3325
    def list_provisioned_concurrency_configs(
1✔
3326
        self,
3327
        context: RequestContext,
3328
        function_name: FunctionName,
3329
        marker: String = None,
3330
        max_items: MaxProvisionedConcurrencyConfigListItems = None,
3331
        **kwargs,
3332
    ) -> ListProvisionedConcurrencyConfigsResponse:
3333
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3334
        state = lambda_stores[account_id][region]
1✔
3335

3336
        function_name = api_utils.get_function_name(function_name, context)
1✔
3337
        fn = state.functions.get(function_name)
1✔
3338
        if fn is None:
1✔
3339
            raise ResourceNotFoundException(
1✔
3340
                f"Function not found: {api_utils.unqualified_lambda_arn(function_name, account_id, region)}",
3341
                Type="User",
3342
            )
3343

3344
        configs = []
1✔
3345
        for qualifier, pc_config in fn.provisioned_concurrency_configs.items():
1✔
3346
            if api_utils.qualifier_is_alias(qualifier):
×
3347
                alias = fn.aliases.get(qualifier)
×
3348
                fn_arn = api_utils.qualified_lambda_arn(
×
3349
                    function_name, alias.function_version, account_id, region
3350
                )
3351
            else:
3352
                fn_arn = api_utils.qualified_lambda_arn(
×
3353
                    function_name, qualifier, account_id, region
3354
                )
3355

3356
            manager = self.lambda_service.get_lambda_version_manager(fn_arn)
×
3357

3358
            configs.append(
×
3359
                ProvisionedConcurrencyConfigListItem(
3360
                    FunctionArn=api_utils.qualified_lambda_arn(
3361
                        function_name, qualifier, account_id, region
3362
                    ),
3363
                    RequestedProvisionedConcurrentExecutions=pc_config.provisioned_concurrent_executions,
3364
                    AvailableProvisionedConcurrentExecutions=manager.provisioned_state.available,
3365
                    AllocatedProvisionedConcurrentExecutions=manager.provisioned_state.allocated,
3366
                    Status=manager.provisioned_state.status,
3367
                    StatusReason=manager.provisioned_state.status_reason,
3368
                    LastModified=pc_config.last_modified,
3369
                )
3370
            )
3371

3372
        provisioned_concurrency_configs = configs
1✔
3373
        provisioned_concurrency_configs = PaginatedList(provisioned_concurrency_configs)
1✔
3374
        page, token = provisioned_concurrency_configs.get_page(
1✔
3375
            lambda x: x,
3376
            marker,
3377
            max_items,
3378
        )
3379
        return ListProvisionedConcurrencyConfigsResponse(
1✔
3380
            ProvisionedConcurrencyConfigs=page, NextMarker=token
3381
        )
3382

3383
    def delete_provisioned_concurrency_config(
1✔
3384
        self, context: RequestContext, function_name: FunctionName, qualifier: Qualifier, **kwargs
3385
    ) -> None:
3386
        if qualifier == "$LATEST":
1✔
3387
            raise InvalidParameterValueException(
1✔
3388
                "The function resource provided must be an alias or a published version.",
3389
                Type="User",
3390
            )
3391
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3392
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
3393
            function_name, qualifier, context
3394
        )
3395
        state = lambda_stores[account_id][region]
1✔
3396
        fn = state.functions.get(function_name)
1✔
3397

3398
        provisioned_config = self._get_provisioned_config(context, function_name, qualifier)
1✔
3399
        # delete is idempotent and doesn't actually care about the provisioned concurrency config not existing
3400
        if provisioned_config:
1✔
3401
            fn.provisioned_concurrency_configs.pop(qualifier)
1✔
3402
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
1✔
3403
            manager = self.lambda_service.get_lambda_version_manager(fn_arn)
1✔
3404
            manager.update_provisioned_concurrency_config(0)
1✔
3405

3406
    # =======================================
3407
    # =======  Event Invoke Config   ========
3408
    # =======================================
3409

3410
    # "1 validation error detected: Value 'arn:aws:_-/!lambda:<region>:111111111111:function:<function-name:1>' at 'destinationConfig.onFailure.destination' failed to satisfy constraint: Member must satisfy regular expression pattern: ^$|arn:(aws[a-zA-Z0-9-]*):([a-zA-Z0-9\\-])+:([a-z]{2}((-gov)|(-iso(b?)))?-[a-z]+-\\d{1})?:(\\d{12})?:(.*)"
3411
    # "1 validation error detected: Value 'arn:aws:_-/!lambda:<region>:111111111111:function:<function-name:1>' at 'destinationConfig.onFailure.destination' failed to satisfy constraint: Member must satisfy regular expression pattern: ^$|arn:(aws[a-zA-Z0-9-]*):([a-zA-Z0-9\\-])+:([a-z]2((-gov)|(-iso(b?)))?-[a-z]+-\\d1)?:(\\d12)?:(.*)" ... (expected → actual)
3412

3413
    def _validate_destination_config(
1✔
3414
        self, store: LambdaStore, function_name: str, destination_config: DestinationConfig
3415
    ):
3416
        def _validate_destination_arn(destination_arn) -> bool:
1✔
3417
            if not api_utils.DESTINATION_ARN_PATTERN.match(destination_arn):
1✔
3418
                # technically we shouldn't handle this in the provider
3419
                raise ValidationException(
1✔
3420
                    "1 validation error detected: Value '"
3421
                    + destination_arn
3422
                    + "' at 'destinationConfig.onFailure.destination' failed to satisfy constraint: Member must satisfy regular expression pattern: "
3423
                    + "$|kafka://([^.]([a-zA-Z0-9\\-_.]{0,248}))|arn:(aws[a-zA-Z0-9-]*):([a-zA-Z0-9\\-])+:((eusc-)?[a-z]{2}((-gov)|(-iso([a-z]?)))?-[a-z]+-\\d{1})?:(\\d{12})?:(.*)"
3424
                )
3425

3426
            match destination_arn.split(":")[2]:
1✔
3427
                case "lambda":
1✔
3428
                    fn_parts = api_utils.FULL_FN_ARN_PATTERN.search(destination_arn).groupdict()
1✔
3429
                    if fn_parts:
1✔
3430
                        # check if it exists
3431
                        fn = store.functions.get(fn_parts["function_name"])
1✔
3432
                        if not fn:
1✔
3433
                            raise InvalidParameterValueException(
1✔
3434
                                f"The destination ARN {destination_arn} is invalid.", Type="User"
3435
                            )
3436
                        if fn_parts["function_name"] == function_name:
1✔
3437
                            raise InvalidParameterValueException(
1✔
3438
                                "You can't specify the function as a destination for itself.",
3439
                                Type="User",
3440
                            )
3441
                case "sns" | "sqs" | "events":
1✔
3442
                    pass
1✔
3443
                case _:
1✔
3444
                    return False
1✔
3445
            return True
1✔
3446

3447
        validation_err = False
1✔
3448

3449
        failure_destination = destination_config.get("OnFailure", {}).get("Destination")
1✔
3450
        if failure_destination:
1✔
3451
            validation_err = validation_err or not _validate_destination_arn(failure_destination)
1✔
3452

3453
        success_destination = destination_config.get("OnSuccess", {}).get("Destination")
1✔
3454
        if success_destination:
1✔
3455
            validation_err = validation_err or not _validate_destination_arn(success_destination)
1✔
3456

3457
        if validation_err:
1✔
3458
            on_success_part = (
1✔
3459
                f"OnSuccess(destination={success_destination})" if success_destination else "null"
3460
            )
3461
            on_failure_part = (
1✔
3462
                f"OnFailure(destination={failure_destination})" if failure_destination else "null"
3463
            )
3464
            raise InvalidParameterValueException(
1✔
3465
                f"The provided destination config DestinationConfig(onSuccess={on_success_part}, onFailure={on_failure_part}) is invalid.",
3466
                Type="User",
3467
            )
3468

3469
    def put_function_event_invoke_config(
1✔
3470
        self,
3471
        context: RequestContext,
3472
        function_name: FunctionName,
3473
        qualifier: NumericLatestPublishedOrAliasQualifier = None,
3474
        maximum_retry_attempts: MaximumRetryAttempts = None,
3475
        maximum_event_age_in_seconds: MaximumEventAgeInSeconds = None,
3476
        destination_config: DestinationConfig = None,
3477
        **kwargs,
3478
    ) -> FunctionEventInvokeConfig:
3479
        """
3480
        Destination ARNs can be:
3481
        * SQS arn
3482
        * SNS arn
3483
        * Lambda arn
3484
        * EventBridge arn
3485

3486
        Differences between put_ and update_:
3487
            * put overwrites any existing config
3488
            * update allows changes only single values while keeping the rest of existing ones
3489
            * update fails on non-existing configs
3490

3491
        Differences between destination and DLQ
3492
            * "However, a dead-letter queue is part of a function's version-specific configuration, so it is locked in when you publish a version."
3493
            *  "On-failure destinations also support additional targets and include details about the function's response in the invocation record."
3494

3495
        """
3496
        if (
1✔
3497
            maximum_event_age_in_seconds is None
3498
            and maximum_retry_attempts is None
3499
            and destination_config is None
3500
        ):
3501
            raise InvalidParameterValueException(
1✔
3502
                "You must specify at least one of error handling or destination setting.",
3503
                Type="User",
3504
            )
3505
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3506
        state = lambda_stores[account_id][region]
1✔
3507
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
3508
            function_name, qualifier, context
3509
        )
3510
        fn = state.functions.get(function_name)
1✔
3511
        if not fn or (qualifier and not (qualifier in fn.aliases or qualifier in fn.versions)):
1✔
3512
            raise ResourceNotFoundException("The function doesn't exist.", Type="User")
1✔
3513

3514
        qualifier = qualifier or "$LATEST"
1✔
3515

3516
        # validate and normalize destination config
3517
        if destination_config:
1✔
3518
            self._validate_destination_config(state, function_name, destination_config)
1✔
3519

3520
        destination_config = DestinationConfig(
1✔
3521
            OnSuccess=OnSuccess(
3522
                Destination=(destination_config or {}).get("OnSuccess", {}).get("Destination")
3523
            ),
3524
            OnFailure=OnFailure(
3525
                Destination=(destination_config or {}).get("OnFailure", {}).get("Destination")
3526
            ),
3527
        )
3528

3529
        config = EventInvokeConfig(
1✔
3530
            function_name=function_name,
3531
            qualifier=qualifier,
3532
            maximum_event_age_in_seconds=maximum_event_age_in_seconds,
3533
            maximum_retry_attempts=maximum_retry_attempts,
3534
            last_modified=api_utils.generate_lambda_date(),
3535
            destination_config=destination_config,
3536
        )
3537
        fn.event_invoke_configs[qualifier] = config
1✔
3538

3539
        return FunctionEventInvokeConfig(
1✔
3540
            LastModified=datetime.datetime.strptime(
3541
                config.last_modified, api_utils.LAMBDA_DATE_FORMAT
3542
            ),
3543
            FunctionArn=api_utils.qualified_lambda_arn(
3544
                function_name, qualifier or "$LATEST", account_id, region
3545
            ),
3546
            DestinationConfig=destination_config,
3547
            MaximumEventAgeInSeconds=maximum_event_age_in_seconds,
3548
            MaximumRetryAttempts=maximum_retry_attempts,
3549
        )
3550

3551
    def get_function_event_invoke_config(
1✔
3552
        self,
3553
        context: RequestContext,
3554
        function_name: FunctionName,
3555
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
3556
        **kwargs,
3557
    ) -> FunctionEventInvokeConfig:
3558
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3559
        state = lambda_stores[account_id][region]
1✔
3560
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
3561
            function_name, qualifier, context
3562
        )
3563

3564
        qualifier = qualifier or "$LATEST"
1✔
3565
        fn = state.functions.get(function_name)
1✔
3566
        if not fn:
1✔
3567
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
1✔
3568
            raise ResourceNotFoundException(
1✔
3569
                f"The function {fn_arn} doesn't have an EventInvokeConfig", Type="User"
3570
            )
3571

3572
        config = fn.event_invoke_configs.get(qualifier)
1✔
3573
        if not config:
1✔
3574
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
1✔
3575
            raise ResourceNotFoundException(
1✔
3576
                f"The function {fn_arn} doesn't have an EventInvokeConfig", Type="User"
3577
            )
3578

3579
        return FunctionEventInvokeConfig(
1✔
3580
            LastModified=datetime.datetime.strptime(
3581
                config.last_modified, api_utils.LAMBDA_DATE_FORMAT
3582
            ),
3583
            FunctionArn=api_utils.qualified_lambda_arn(
3584
                function_name, qualifier, account_id, region
3585
            ),
3586
            DestinationConfig=config.destination_config,
3587
            MaximumEventAgeInSeconds=config.maximum_event_age_in_seconds,
3588
            MaximumRetryAttempts=config.maximum_retry_attempts,
3589
        )
3590

3591
    def list_function_event_invoke_configs(
1✔
3592
        self,
3593
        context: RequestContext,
3594
        function_name: FunctionName,
3595
        marker: String = None,
3596
        max_items: MaxFunctionEventInvokeConfigListItems = None,
3597
        **kwargs,
3598
    ) -> ListFunctionEventInvokeConfigsResponse:
3599
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3600
        state = lambda_stores[account_id][region]
1✔
3601
        fn = state.functions.get(function_name)
1✔
3602
        if not fn:
1✔
3603
            raise ResourceNotFoundException("The function doesn't exist.", Type="User")
1✔
3604

3605
        event_invoke_configs = [
1✔
3606
            FunctionEventInvokeConfig(
3607
                LastModified=c.last_modified,
3608
                FunctionArn=api_utils.qualified_lambda_arn(
3609
                    function_name, c.qualifier, account_id, region
3610
                ),
3611
                MaximumEventAgeInSeconds=c.maximum_event_age_in_seconds,
3612
                MaximumRetryAttempts=c.maximum_retry_attempts,
3613
                DestinationConfig=c.destination_config,
3614
            )
3615
            for c in fn.event_invoke_configs.values()
3616
        ]
3617

3618
        event_invoke_configs = PaginatedList(event_invoke_configs)
1✔
3619
        page, token = event_invoke_configs.get_page(
1✔
3620
            lambda x: x["FunctionArn"],
3621
            marker,
3622
            max_items,
3623
        )
3624
        return ListFunctionEventInvokeConfigsResponse(
1✔
3625
            FunctionEventInvokeConfigs=page, NextMarker=token
3626
        )
3627

3628
    def delete_function_event_invoke_config(
1✔
3629
        self,
3630
        context: RequestContext,
3631
        function_name: FunctionName,
3632
        qualifier: NumericLatestPublishedOrAliasQualifier | None = None,
3633
        **kwargs,
3634
    ) -> None:
3635
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3636
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
3637
            function_name, qualifier, context
3638
        )
3639
        state = lambda_stores[account_id][region]
1✔
3640
        fn = state.functions.get(function_name)
1✔
3641
        resolved_qualifier = qualifier or "$LATEST"
1✔
3642
        fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
1✔
3643
        if not fn:
1✔
3644
            raise ResourceNotFoundException(
1✔
3645
                f"The function {fn_arn} doesn't have an EventInvokeConfig", Type="User"
3646
            )
3647

3648
        config = fn.event_invoke_configs.get(resolved_qualifier)
1✔
3649
        if not config:
1✔
3650
            raise ResourceNotFoundException(
1✔
3651
                f"The function {fn_arn} doesn't have an EventInvokeConfig", Type="User"
3652
            )
3653

3654
        del fn.event_invoke_configs[resolved_qualifier]
1✔
3655

3656
    def update_function_event_invoke_config(
1✔
3657
        self,
3658
        context: RequestContext,
3659
        function_name: FunctionName,
3660
        qualifier: NumericLatestPublishedOrAliasQualifier = None,
3661
        maximum_retry_attempts: MaximumRetryAttempts = None,
3662
        maximum_event_age_in_seconds: MaximumEventAgeInSeconds = None,
3663
        destination_config: DestinationConfig = None,
3664
        **kwargs,
3665
    ) -> FunctionEventInvokeConfig:
3666
        # like put but only update single fields via replace
3667
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
3668
        state = lambda_stores[account_id][region]
1✔
3669
        function_name, qualifier = api_utils.get_name_and_qualifier(
1✔
3670
            function_name, qualifier, context
3671
        )
3672

3673
        if (
1✔
3674
            maximum_event_age_in_seconds is None
3675
            and maximum_retry_attempts is None
3676
            and destination_config is None
3677
        ):
3678
            raise InvalidParameterValueException(
×
3679
                "You must specify at least one of error handling or destination setting.",
3680
                Type="User",
3681
            )
3682

3683
        fn = state.functions.get(function_name)
1✔
3684
        if not fn or (qualifier and not (qualifier in fn.aliases or qualifier in fn.versions)):
1✔
3685
            raise ResourceNotFoundException("The function doesn't exist.", Type="User")
1✔
3686

3687
        qualifier = qualifier or "$LATEST"
1✔
3688

3689
        config = fn.event_invoke_configs.get(qualifier)
1✔
3690
        if not config:
1✔
3691
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
1✔
3692
            raise ResourceNotFoundException(
1✔
3693
                f"The function {fn_arn} doesn't have an EventInvokeConfig", Type="User"
3694
            )
3695

3696
        if destination_config:
1✔
3697
            self._validate_destination_config(state, function_name, destination_config)
×
3698

3699
        optional_kwargs = {
1✔
3700
            k: v
3701
            for k, v in {
3702
                "destination_config": destination_config,
3703
                "maximum_retry_attempts": maximum_retry_attempts,
3704
                "maximum_event_age_in_seconds": maximum_event_age_in_seconds,
3705
            }.items()
3706
            if v is not None
3707
        }
3708

3709
        new_config = dataclasses.replace(
1✔
3710
            config, last_modified=api_utils.generate_lambda_date(), **optional_kwargs
3711
        )
3712
        fn.event_invoke_configs[qualifier] = new_config
1✔
3713

3714
        return FunctionEventInvokeConfig(
1✔
3715
            LastModified=datetime.datetime.strptime(
3716
                new_config.last_modified, api_utils.LAMBDA_DATE_FORMAT
3717
            ),
3718
            FunctionArn=api_utils.qualified_lambda_arn(
3719
                function_name, qualifier or "$LATEST", account_id, region
3720
            ),
3721
            DestinationConfig=new_config.destination_config,
3722
            MaximumEventAgeInSeconds=new_config.maximum_event_age_in_seconds,
3723
            MaximumRetryAttempts=new_config.maximum_retry_attempts,
3724
        )
3725

3726
    # =======================================
3727
    # ======  Layer & Layer Versions  =======
3728
    # =======================================
3729

3730
    @staticmethod
1✔
3731
    def _resolve_layer(
1✔
3732
        layer_name_or_arn: str, context: RequestContext
3733
    ) -> tuple[str, str, str, str | None]:
3734
        """
3735
        Return locator attributes for a given Lambda layer.
3736

3737
        :param layer_name_or_arn: Layer name or ARN
3738
        :param context: Request context
3739
        :return: Tuple of region, account ID, layer name, layer version
3740
        """
3741
        if api_utils.is_layer_arn(layer_name_or_arn):
1✔
3742
            return api_utils.parse_layer_arn(layer_name_or_arn)
1✔
3743

3744
        return context.region, context.account_id, layer_name_or_arn, None
1✔
3745

3746
    def publish_layer_version(
1✔
3747
        self,
3748
        context: RequestContext,
3749
        layer_name: LayerName,
3750
        content: LayerVersionContentInput,
3751
        description: Description | None = None,
3752
        compatible_runtimes: CompatibleRuntimes | None = None,
3753
        license_info: LicenseInfo | None = None,
3754
        compatible_architectures: CompatibleArchitectures | None = None,
3755
        **kwargs,
3756
    ) -> PublishLayerVersionResponse:
3757
        """
3758
        On first use of a LayerName a new layer is created and for each subsequent call with the same LayerName a new version is created.
3759
        Note that there are no $LATEST versions with layers!
3760

3761
        """
3762
        account = context.account_id
1✔
3763
        region = context.region
1✔
3764

3765
        validation_errors = api_utils.validate_layer_runtimes_and_architectures(
1✔
3766
            compatible_runtimes, compatible_architectures
3767
        )
3768
        if validation_errors:
1✔
3769
            raise ValidationException(
1✔
3770
                f"{len(validation_errors)} validation error{'s' if len(validation_errors) > 1 else ''} detected: {'; '.join(validation_errors)}"
3771
            )
3772

3773
        state = lambda_stores[account][region]
1✔
3774
        with self.create_layer_lock:
1✔
3775
            if layer_name not in state.layers:
1✔
3776
                # we don't have a version so create new layer object
3777
                # lock is required to avoid creating two v1 objects for the same name
3778
                layer = Layer(
1✔
3779
                    arn=api_utils.layer_arn(layer_name=layer_name, account=account, region=region)
3780
                )
3781
                state.layers[layer_name] = layer
1✔
3782

3783
        layer = state.layers[layer_name]
1✔
3784
        with layer.next_version_lock:
1✔
3785
            next_version = LambdaLayerVersionIdentifier(
1✔
3786
                account_id=account, region=region, layer_name=layer_name
3787
            ).generate(next_version=layer.next_version)
3788
            # When creating a layer with user defined layer version, it is possible that we
3789
            # create layer versions out of order.
3790
            # ie. a user could replicate layer v2 then layer v1. It is important to always keep the maximum possible
3791
            # value for next layer to avoid overwriting existing versions
3792
            if layer.next_version <= next_version:
1✔
3793
                # We don't need to update layer.next_version if the created version is lower than the "next in line"
3794
                layer.next_version = max(next_version, layer.next_version) + 1
1✔
3795

3796
        # creating a new layer
3797
        if content.get("ZipFile"):
1✔
3798
            code = store_lambda_archive(
1✔
3799
                archive_file=content["ZipFile"],
3800
                function_name=layer_name,
3801
                region_name=region,
3802
                account_id=account,
3803
            )
3804
        else:
3805
            code = store_s3_bucket_archive(
1✔
3806
                archive_bucket=content["S3Bucket"],
3807
                archive_key=content["S3Key"],
3808
                archive_version=content.get("S3ObjectVersion"),
3809
                function_name=layer_name,
3810
                region_name=region,
3811
                account_id=account,
3812
            )
3813

3814
        new_layer_version = LayerVersion(
1✔
3815
            layer_version_arn=api_utils.layer_version_arn(
3816
                layer_name=layer_name,
3817
                account=account,
3818
                region=region,
3819
                version=str(next_version),
3820
            ),
3821
            layer_arn=layer.arn,
3822
            version=next_version,
3823
            description=description or "",
3824
            license_info=license_info,
3825
            compatible_runtimes=compatible_runtimes,
3826
            compatible_architectures=compatible_architectures,
3827
            created=api_utils.generate_lambda_date(),
3828
            code=code,
3829
        )
3830

3831
        layer.layer_versions[str(next_version)] = new_layer_version
1✔
3832

3833
        return api_utils.map_layer_out(new_layer_version)
1✔
3834

3835
    def get_layer_version(
1✔
3836
        self,
3837
        context: RequestContext,
3838
        layer_name: LayerName,
3839
        version_number: LayerVersionNumber,
3840
        **kwargs,
3841
    ) -> GetLayerVersionResponse:
3842
        # TODO: handle layer_name as an ARN
3843

3844
        region_name, account_id, layer_name, _ = LambdaProvider._resolve_layer(layer_name, context)
1✔
3845
        state = lambda_stores[account_id][region_name]
1✔
3846

3847
        layer = state.layers.get(layer_name)
1✔
3848
        if version_number < 1:
1✔
3849
            raise InvalidParameterValueException("Layer Version Cannot be less than 1", Type="User")
1✔
3850
        if layer is None:
1✔
3851
            raise ResourceNotFoundException(
1✔
3852
                "The resource you requested does not exist.", Type="User"
3853
            )
3854
        layer_version = layer.layer_versions.get(str(version_number))
1✔
3855
        if layer_version is None:
1✔
3856
            raise ResourceNotFoundException(
1✔
3857
                "The resource you requested does not exist.", Type="User"
3858
            )
3859
        return api_utils.map_layer_out(layer_version)
1✔
3860

3861
    def get_layer_version_by_arn(
1✔
3862
        self, context: RequestContext, arn: LayerVersionArn, **kwargs
3863
    ) -> GetLayerVersionResponse:
3864
        region_name, account_id, layer_name, layer_version = LambdaProvider._resolve_layer(
1✔
3865
            arn, context
3866
        )
3867

3868
        if not layer_version:
1✔
3869
            raise ValidationException(
1✔
3870
                f"1 validation error detected: Value '{arn}' at 'arn' failed to satisfy constraint: Member must satisfy regular expression pattern: "
3871
                + "(arn:(aws[a-zA-Z-]*)?:lambda:(eusc-)?[a-z]{2}((-gov)|(-iso([a-z]?)))?-[a-z]+-\\d{1}:\\d{12}:layer:[a-zA-Z0-9-_]+:[0-9]+)|(arn:[a-zA-Z0-9-]+:lambda:::awslayer:[a-zA-Z0-9-_]+)"
3872
            )
3873

3874
        store = lambda_stores[account_id][region_name]
1✔
3875
        if not (layers := store.layers.get(layer_name)):
1✔
3876
            raise ResourceNotFoundException(
×
3877
                "The resource you requested does not exist.", Type="User"
3878
            )
3879

3880
        layer_version = layers.layer_versions.get(layer_version)
1✔
3881

3882
        if not layer_version:
1✔
3883
            raise ResourceNotFoundException(
1✔
3884
                "The resource you requested does not exist.", Type="User"
3885
            )
3886

3887
        return api_utils.map_layer_out(layer_version)
1✔
3888

3889
    def list_layers(
1✔
3890
        self,
3891
        context: RequestContext,
3892
        compatible_runtime: Runtime | None = None,
3893
        marker: String | None = None,
3894
        max_items: MaxLayerListItems | None = None,
3895
        compatible_architecture: Architecture | None = None,
3896
        **kwargs,
3897
    ) -> ListLayersResponse:
3898
        validation_errors = []
1✔
3899

3900
        validation_error_arch = api_utils.validate_layer_architecture(compatible_architecture)
1✔
3901
        if validation_error_arch:
1✔
3902
            validation_errors.append(validation_error_arch)
1✔
3903

3904
        validation_error_runtime = api_utils.validate_layer_runtime(compatible_runtime)
1✔
3905
        if validation_error_runtime:
1✔
3906
            validation_errors.append(validation_error_runtime)
1✔
3907

3908
        if validation_errors:
1✔
3909
            raise ValidationException(
1✔
3910
                f"{len(validation_errors)} validation error{'s' if len(validation_errors) > 1 else ''} detected: {';'.join(validation_errors)}"
3911
            )
3912
        # TODO: handle filter: compatible_runtime
3913
        # TODO: handle filter: compatible_architecture
3914

3915
        state = lambda_stores[context.account_id][context.region]
×
3916
        layers = state.layers
×
3917

3918
        # TODO: test how filters behave together with only returning layers here? Does it return the latest "matching" layer, i.e. does it ignore later layer versions that don't match?
3919

3920
        responses: list[LayersListItem] = []
×
3921
        for layer_name, layer in layers.items():
×
3922
            # fetch latest version
3923
            layer_versions = list(layer.layer_versions.values())
×
3924
            sorted(layer_versions, key=lambda x: x.version)
×
3925
            latest_layer_version = layer_versions[-1]
×
3926
            responses.append(
×
3927
                LayersListItem(
3928
                    LayerName=layer_name,
3929
                    LayerArn=layer.arn,
3930
                    LatestMatchingVersion=api_utils.map_layer_out(latest_layer_version),
3931
                )
3932
            )
3933

3934
        responses = PaginatedList(responses)
×
3935
        page, token = responses.get_page(
×
3936
            lambda version: version,
3937
            marker,
3938
            max_items,
3939
        )
3940

3941
        return ListLayersResponse(NextMarker=token, Layers=page)
×
3942

3943
    def list_layer_versions(
1✔
3944
        self,
3945
        context: RequestContext,
3946
        layer_name: LayerName,
3947
        compatible_runtime: Runtime | None = None,
3948
        marker: String | None = None,
3949
        max_items: MaxLayerListItems | None = None,
3950
        compatible_architecture: Architecture | None = None,
3951
        **kwargs,
3952
    ) -> ListLayerVersionsResponse:
3953
        validation_errors = api_utils.validate_layer_runtimes_and_architectures(
1✔
3954
            [compatible_runtime] if compatible_runtime else [],
3955
            [compatible_architecture] if compatible_architecture else [],
3956
        )
3957
        if validation_errors:
1✔
3958
            raise ValidationException(
×
3959
                f"{len(validation_errors)} validation error{'s' if len(validation_errors) > 1 else ''} detected: {';'.join(validation_errors)}"
3960
            )
3961

3962
        region_name, account_id, layer_name, layer_version = LambdaProvider._resolve_layer(
1✔
3963
            layer_name, context
3964
        )
3965
        state = lambda_stores[account_id][region_name]
1✔
3966

3967
        # TODO: Test & handle filter: compatible_runtime
3968
        # TODO: Test & handle filter: compatible_architecture
3969
        all_layer_versions = []
1✔
3970
        layer = state.layers.get(layer_name)
1✔
3971
        if layer is not None:
1✔
3972
            for layer_version in layer.layer_versions.values():
1✔
3973
                all_layer_versions.append(api_utils.map_layer_out(layer_version))
1✔
3974

3975
        all_layer_versions.sort(key=lambda x: x["Version"], reverse=True)
1✔
3976
        all_layer_versions = PaginatedList(all_layer_versions)
1✔
3977
        page, token = all_layer_versions.get_page(
1✔
3978
            lambda version: version["LayerVersionArn"],
3979
            marker,
3980
            max_items,
3981
        )
3982
        return ListLayerVersionsResponse(NextMarker=token, LayerVersions=page)
1✔
3983

3984
    def delete_layer_version(
1✔
3985
        self,
3986
        context: RequestContext,
3987
        layer_name: LayerName,
3988
        version_number: LayerVersionNumber,
3989
        **kwargs,
3990
    ) -> None:
3991
        if version_number < 1:
1✔
3992
            raise InvalidParameterValueException("Layer Version Cannot be less than 1", Type="User")
1✔
3993

3994
        region_name, account_id, layer_name, layer_version = LambdaProvider._resolve_layer(
1✔
3995
            layer_name, context
3996
        )
3997

3998
        store = lambda_stores[account_id][region_name]
1✔
3999
        layer = store.layers.get(layer_name, {})
1✔
4000
        if layer:
1✔
4001
            layer.layer_versions.pop(str(version_number), None)
1✔
4002

4003
    # =======================================
4004
    # =====  Layer Version Permissions  =====
4005
    # =======================================
4006
    # TODO: lock updates that change revision IDs
4007

4008
    def add_layer_version_permission(
1✔
4009
        self,
4010
        context: RequestContext,
4011
        layer_name: LayerName,
4012
        version_number: LayerVersionNumber,
4013
        statement_id: StatementId,
4014
        action: LayerPermissionAllowedAction,
4015
        principal: LayerPermissionAllowedPrincipal,
4016
        organization_id: OrganizationId = None,
4017
        revision_id: String = None,
4018
        **kwargs,
4019
    ) -> AddLayerVersionPermissionResponse:
4020
        # `layer_name` can either be layer name or ARN. It is used to generate error messages.
4021
        # `layer_n` contains the layer name.
4022
        region_name, account_id, layer_n, _ = LambdaProvider._resolve_layer(layer_name, context)
1✔
4023

4024
        if action != "lambda:GetLayerVersion":
1✔
4025
            raise ValidationException(
1✔
4026
                f"1 validation error detected: Value '{action}' at 'action' failed to satisfy constraint: Member must satisfy regular expression pattern: lambda:GetLayerVersion"
4027
            )
4028

4029
        store = lambda_stores[account_id][region_name]
1✔
4030
        layer = store.layers.get(layer_n)
1✔
4031

4032
        layer_version_arn = api_utils.layer_version_arn(
1✔
4033
            layer_name, account_id, region_name, str(version_number)
4034
        )
4035

4036
        if layer is None:
1✔
4037
            raise ResourceNotFoundException(
1✔
4038
                f"Layer version {layer_version_arn} does not exist.", Type="User"
4039
            )
4040
        layer_version = layer.layer_versions.get(str(version_number))
1✔
4041
        if layer_version is None:
1✔
4042
            raise ResourceNotFoundException(
1✔
4043
                f"Layer version {layer_version_arn} does not exist.", Type="User"
4044
            )
4045
        # do we have a policy? if not set one
4046
        if layer_version.policy is None:
1✔
4047
            layer_version.policy = LayerPolicy()
1✔
4048

4049
        if statement_id in layer_version.policy.statements:
1✔
4050
            raise ResourceConflictException(
1✔
4051
                f"The statement id ({statement_id}) provided already exists. Please provide a new statement id, or remove the existing statement.",
4052
                Type="User",
4053
            )
4054

4055
        if revision_id and layer_version.policy.revision_id != revision_id:
1✔
4056
            raise PreconditionFailedException(
1✔
4057
                "The Revision Id provided does not match the latest Revision Id. "
4058
                "Call the GetLayerPolicy API to retrieve the latest Revision Id",
4059
                Type="User",
4060
            )
4061

4062
        statement = LayerPolicyStatement(
1✔
4063
            sid=statement_id, action=action, principal=principal, organization_id=organization_id
4064
        )
4065

4066
        old_statements = layer_version.policy.statements
1✔
4067
        layer_version.policy = dataclasses.replace(
1✔
4068
            layer_version.policy, statements={**old_statements, statement_id: statement}
4069
        )
4070

4071
        return AddLayerVersionPermissionResponse(
1✔
4072
            Statement=json.dumps(
4073
                {
4074
                    "Sid": statement.sid,
4075
                    "Effect": "Allow",
4076
                    "Principal": statement.principal,
4077
                    "Action": statement.action,
4078
                    "Resource": layer_version.layer_version_arn,
4079
                }
4080
            ),
4081
            RevisionId=layer_version.policy.revision_id,
4082
        )
4083

4084
    def remove_layer_version_permission(
1✔
4085
        self,
4086
        context: RequestContext,
4087
        layer_name: LayerName,
4088
        version_number: LayerVersionNumber,
4089
        statement_id: StatementId,
4090
        revision_id: String = None,
4091
        **kwargs,
4092
    ) -> None:
4093
        # `layer_name` can either be layer name or ARN. It is used to generate error messages.
4094
        # `layer_n` contains the layer name.
4095
        region_name, account_id, layer_n, layer_version = LambdaProvider._resolve_layer(
1✔
4096
            layer_name, context
4097
        )
4098

4099
        layer_version_arn = api_utils.layer_version_arn(
1✔
4100
            layer_name, account_id, region_name, str(version_number)
4101
        )
4102

4103
        state = lambda_stores[account_id][region_name]
1✔
4104
        layer = state.layers.get(layer_n)
1✔
4105
        if layer is None:
1✔
4106
            raise ResourceNotFoundException(
1✔
4107
                f"Layer version {layer_version_arn} does not exist.", Type="User"
4108
            )
4109
        layer_version = layer.layer_versions.get(str(version_number))
1✔
4110
        if layer_version is None:
1✔
4111
            raise ResourceNotFoundException(
1✔
4112
                f"Layer version {layer_version_arn} does not exist.", Type="User"
4113
            )
4114

4115
        if revision_id and layer_version.policy.revision_id != revision_id:
1✔
4116
            raise PreconditionFailedException(
1✔
4117
                "The Revision Id provided does not match the latest Revision Id. "
4118
                "Call the GetLayerPolicy API to retrieve the latest Revision Id",
4119
                Type="User",
4120
            )
4121

4122
        if statement_id not in layer_version.policy.statements:
1✔
4123
            raise ResourceNotFoundException(
1✔
4124
                f"Statement {statement_id} is not found in resource policy.", Type="User"
4125
            )
4126

4127
        old_statements = layer_version.policy.statements
1✔
4128
        layer_version.policy = dataclasses.replace(
1✔
4129
            layer_version.policy,
4130
            statements={k: v for k, v in old_statements.items() if k != statement_id},
4131
        )
4132

4133
    def get_layer_version_policy(
1✔
4134
        self,
4135
        context: RequestContext,
4136
        layer_name: LayerName,
4137
        version_number: LayerVersionNumber,
4138
        **kwargs,
4139
    ) -> GetLayerVersionPolicyResponse:
4140
        # `layer_name` can either be layer name or ARN. It is used to generate error messages.
4141
        # `layer_n` contains the layer name.
4142
        region_name, account_id, layer_n, _ = LambdaProvider._resolve_layer(layer_name, context)
1✔
4143

4144
        layer_version_arn = api_utils.layer_version_arn(
1✔
4145
            layer_name, account_id, region_name, str(version_number)
4146
        )
4147

4148
        store = lambda_stores[account_id][region_name]
1✔
4149
        layer = store.layers.get(layer_n)
1✔
4150

4151
        if layer is None:
1✔
4152
            raise ResourceNotFoundException(
1✔
4153
                f"Layer version {layer_version_arn} does not exist.", Type="User"
4154
            )
4155

4156
        layer_version = layer.layer_versions.get(str(version_number))
1✔
4157
        if layer_version is None:
1✔
4158
            raise ResourceNotFoundException(
1✔
4159
                f"Layer version {layer_version_arn} does not exist.", Type="User"
4160
            )
4161

4162
        if layer_version.policy is None:
1✔
4163
            raise ResourceNotFoundException(
1✔
4164
                "No policy is associated with the given resource.", Type="User"
4165
            )
4166

4167
        return GetLayerVersionPolicyResponse(
1✔
4168
            Policy=json.dumps(
4169
                {
4170
                    "Version": layer_version.policy.version,
4171
                    "Id": layer_version.policy.id,
4172
                    "Statement": [
4173
                        {
4174
                            "Sid": ps.sid,
4175
                            "Effect": "Allow",
4176
                            "Principal": ps.principal,
4177
                            "Action": ps.action,
4178
                            "Resource": layer_version.layer_version_arn,
4179
                        }
4180
                        for ps in layer_version.policy.statements.values()
4181
                    ],
4182
                }
4183
            ),
4184
            RevisionId=layer_version.policy.revision_id,
4185
        )
4186

4187
    # =======================================
4188
    # =======  Function Concurrency  ========
4189
    # =======================================
4190
    # (Reserved) function concurrency is scoped to the whole function
4191

4192
    def get_function_concurrency(
1✔
4193
        self, context: RequestContext, function_name: FunctionName, **kwargs
4194
    ) -> GetFunctionConcurrencyResponse:
4195
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
4196
        function_name = api_utils.get_function_name(function_name, context)
1✔
4197
        fn = self._get_function(function_name=function_name, region=region, account_id=account_id)
1✔
4198
        return GetFunctionConcurrencyResponse(
1✔
4199
            ReservedConcurrentExecutions=fn.reserved_concurrent_executions
4200
        )
4201

4202
    def put_function_concurrency(
1✔
4203
        self,
4204
        context: RequestContext,
4205
        function_name: FunctionName,
4206
        reserved_concurrent_executions: ReservedConcurrentExecutions,
4207
        **kwargs,
4208
    ) -> Concurrency:
4209
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
4210

4211
        function_name, qualifier = api_utils.get_name_and_qualifier(function_name, None, context)
1✔
4212
        if qualifier:
1✔
4213
            raise InvalidParameterValueException(
1✔
4214
                "This operation is permitted on Lambda functions only. Aliases and versions do not support this operation. Please specify either a function name or an unqualified function ARN.",
4215
                Type="User",
4216
            )
4217

4218
        store = lambda_stores[account_id][region]
1✔
4219
        fn = store.functions.get(function_name)
1✔
4220
        if not fn:
1✔
4221
            fn_arn = api_utils.qualified_lambda_arn(
1✔
4222
                function_name,
4223
                qualifier="$LATEST",
4224
                account=account_id,
4225
                region=region,
4226
            )
4227
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")
1✔
4228

4229
        settings = self.get_account_settings(context)
1✔
4230
        unreserved_concurrent_executions = settings["AccountLimit"][
1✔
4231
            "UnreservedConcurrentExecutions"
4232
        ]
4233

4234
        # The existing reserved concurrent executions for the same function are already deduced in
4235
        # unreserved_concurrent_executions but must not count because the new one will replace the existing one.
4236
        # Joel tested this behavior manually against AWS (2023-11-28).
4237
        existing_reserved_concurrent_executions = (
1✔
4238
            fn.reserved_concurrent_executions if fn.reserved_concurrent_executions else 0
4239
        )
4240
        if (
1✔
4241
            unreserved_concurrent_executions
4242
            - reserved_concurrent_executions
4243
            + existing_reserved_concurrent_executions
4244
        ) < config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY:
4245
            raise InvalidParameterValueException(
1✔
4246
                f"Specified ReservedConcurrentExecutions for function decreases account's UnreservedConcurrentExecution below its minimum value of [{config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY}]."
4247
            )
4248

4249
        total_provisioned_concurrency = sum(
1✔
4250
            [
4251
                provisioned_configs.provisioned_concurrent_executions
4252
                for provisioned_configs in fn.provisioned_concurrency_configs.values()
4253
            ]
4254
        )
4255
        if total_provisioned_concurrency > reserved_concurrent_executions:
1✔
4256
            raise InvalidParameterValueException(
1✔
4257
                f" ReservedConcurrentExecutions  {reserved_concurrent_executions} should not be lower than function's total provisioned concurrency [{total_provisioned_concurrency}]."
4258
            )
4259

4260
        fn.reserved_concurrent_executions = reserved_concurrent_executions
1✔
4261

4262
        return Concurrency(ReservedConcurrentExecutions=fn.reserved_concurrent_executions)
1✔
4263

4264
    def delete_function_concurrency(
1✔
4265
        self, context: RequestContext, function_name: FunctionName, **kwargs
4266
    ) -> None:
4267
        account_id, region = api_utils.get_account_and_region(function_name, context)
1✔
4268
        function_name, qualifier = api_utils.get_name_and_qualifier(function_name, None, context)
1✔
4269
        store = lambda_stores[account_id][region]
1✔
4270
        fn = store.functions.get(function_name)
1✔
4271
        fn.reserved_concurrent_executions = None
1✔
4272

4273
    # =======================================
4274
    # ===============  TAGS   ===============
4275
    # =======================================
4276
    # only Function, Event Source Mapping, and Code Signing Config (not currently supported by LocalStack) ARNs an are available for tagging in AWS
4277

4278
    def _get_tags(self, resource: TaggableResource) -> dict[str, str]:
1✔
4279
        state = self.fetch_lambda_store_for_tagging(resource)
1✔
4280
        lambda_adapted_tags = {
1✔
4281
            tag["Key"]: tag["Value"]
4282
            for tag in state.TAGS.list_tags_for_resource(resource).get("Tags")
4283
        }
4284
        return lambda_adapted_tags
1✔
4285

4286
    def _store_tags(self, resource: TaggableResource, tags: dict[str, str]):
1✔
4287
        state = self.fetch_lambda_store_for_tagging(resource)
1✔
4288
        if len(state.TAGS.tags.get(resource, {}) | tags) > LAMBDA_TAG_LIMIT_PER_RESOURCE:
1✔
4289
            raise InvalidParameterValueException(
1✔
4290
                "Number of tags exceeds resource tag limit.", Type="User"
4291
            )
4292

4293
        tag_svc_adapted_tags = [{"Key": key, "Value": value} for key, value in tags.items()]
1✔
4294
        state.TAGS.tag_resource(resource, tag_svc_adapted_tags)
1✔
4295

4296
    def fetch_lambda_store_for_tagging(self, resource: TaggableResource) -> LambdaStore:
1✔
4297
        """
4298
        Takes a resource ARN for a TaggableResource (Lambda Function, Event Source Mapping, Code Signing Config, or Capacity Provider) and returns a corresponding
4299
        LambdaStore for its region and account.
4300

4301
        In addition, this function validates that the ARN is a valid TaggableResource type, and that the TaggableResource exists.
4302

4303
        Raises:
4304
            ValidationException: If the resource ARN is not a full ARN for a TaggableResource.
4305
            ResourceNotFoundException: If the specified resource does not exist.
4306
            InvalidParameterValueException: If the resource ARN is a qualified Lambda Function.
4307
        """
4308

4309
        def _raise_validation_exception():
1✔
4310
            raise ValidationException(
1✔
4311
                f"1 validation error detected: Value '{resource}' at 'resource' failed to satisfy constraint: Member must satisfy regular expression pattern: {api_utils.TAGGABLE_RESOURCE_ARN_PATTERN}"
4312
            )
4313

4314
        # Check whether the ARN we have been passed is correctly formatted
4315
        parsed_resource_arn: ArnData = None
1✔
4316
        try:
1✔
4317
            parsed_resource_arn = parse_arn(resource)
1✔
4318
        except Exception:
1✔
4319
            _raise_validation_exception()
1✔
4320

4321
        # TODO: Should we be checking whether this is a full ARN?
4322
        region, account_id, resource_type = map(
1✔
4323
            parsed_resource_arn.get, ("region", "account", "resource")
4324
        )
4325

4326
        if not all((region, account_id, resource_type)):
1✔
4327
            _raise_validation_exception()
×
4328

4329
        if not (parts := resource_type.split(":")):
1✔
4330
            _raise_validation_exception()
×
4331

4332
        resource_type, resource_identifier, *qualifier = parts
1✔
4333

4334
        # Qualifier validation raises before checking for NotFound
4335
        if qualifier:
1✔
4336
            if resource_type == "function":
1✔
4337
                raise InvalidParameterValueException(
1✔
4338
                    "Tags on function aliases and versions are not supported. Please specify a function ARN.",
4339
                    Type="User",
4340
                )
4341
            _raise_validation_exception()
1✔
4342

4343
        if resource_type == "event-source-mapping":
1✔
4344
            self._get_esm(resource_identifier, account_id, region)
1✔
4345
        elif resource_type == "code-signing-config":
1✔
4346
            raise NotImplementedError("Resource tagging on CSC not yet implemented.")
4347
        elif resource_type == "function":
1✔
4348
            self._get_function(
1✔
4349
                function_name=resource_identifier, account_id=account_id, region=region
4350
            )
4351
        elif resource_type == "capacity-provider":
1✔
4352
            self._get_capacity_provider(resource_identifier, account_id, region)
1✔
4353
        else:
4354
            _raise_validation_exception()
1✔
4355

4356
        # If no exceptions are raised, assume ARN and referenced resource is valid for tag operations
4357
        return lambda_stores[account_id][region]
1✔
4358

4359
    def tag_resource(
1✔
4360
        self, context: RequestContext, resource: TaggableResource, tags: Tags, **kwargs
4361
    ) -> None:
4362
        if not tags:
1✔
4363
            raise InvalidParameterValueException(
1✔
4364
                "An error occurred and the request cannot be processed.", Type="User"
4365
            )
4366
        self._store_tags(resource, tags)
1✔
4367

4368
        if (resource_id := extract_resource_from_arn(resource)) and resource_id.startswith(
1✔
4369
            "function"
4370
        ):
4371
            name, _, account, region = function_locators_from_arn(resource)
1✔
4372
            function = self._get_function(name, account, region)
1✔
4373
            with function.lock:
1✔
4374
                # dirty hack for changed revision id, should reevaluate model to prevent this:
4375
                latest_version = function.versions["$LATEST"]
1✔
4376
                function.versions["$LATEST"] = dataclasses.replace(
1✔
4377
                    latest_version, config=dataclasses.replace(latest_version.config)
4378
                )
4379

4380
    def list_tags(
1✔
4381
        self, context: RequestContext, resource: TaggableResource, **kwargs
4382
    ) -> ListTagsResponse:
4383
        tags = self._get_tags(resource)
1✔
4384
        return ListTagsResponse(Tags=tags)
1✔
4385

4386
    def untag_resource(
1✔
4387
        self, context: RequestContext, resource: TaggableResource, tag_keys: TagKeyList, **kwargs
4388
    ) -> None:
4389
        if not tag_keys:
1✔
4390
            raise ValidationException(
1✔
4391
                "1 validation error detected: Value null at 'tagKeys' failed to satisfy constraint: Member must not be null"
4392
            )  # should probably be generalized a bit
4393

4394
        state = self.fetch_lambda_store_for_tagging(resource)
1✔
4395
        state.TAGS.untag_resource(resource, tag_keys)
1✔
4396

4397
        if (resource_id := extract_resource_from_arn(resource)) and resource_id.startswith(
1✔
4398
            "function"
4399
        ):
4400
            name, _, account, region = function_locators_from_arn(resource)
1✔
4401
            function = self._get_function(name, account, region)
1✔
4402
            # TODO: Potential race condition
4403
            with function.lock:
1✔
4404
                # dirty hack for changed revision id, should reevaluate model to prevent this:
4405
                latest_version = function.versions["$LATEST"]
1✔
4406
                function.versions["$LATEST"] = dataclasses.replace(
1✔
4407
                    latest_version, config=dataclasses.replace(latest_version.config)
4408
                )
4409

4410
    # =======================================
4411
    # =======  LEGACY / DEPRECATED   ========
4412
    # =======================================
4413

4414
    def invoke_async(
1✔
4415
        self,
4416
        context: RequestContext,
4417
        function_name: NamespacedFunctionName,
4418
        invoke_args: IO[BlobStream],
4419
        **kwargs,
4420
    ) -> InvokeAsyncResponse:
4421
        """LEGACY API endpoint. Even AWS heavily discourages its usage."""
4422
        raise NotImplementedError
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc