• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 08a45e28-4998-4845-a88b-f2c425830a31

21 Feb 2025 08:33PM UTC coverage: 86.896% (+0.01%) from 86.883%
08a45e28-4998-4845-a88b-f2c425830a31

push

circleci

web-flow
fix SNS FIFO ordering (#12285)

Co-authored-by: Daniel Fangl <daniel.fangl@localstack.cloud>

70 of 79 new or added lines in 2 files covered. (88.61%)

117 existing lines in 8 files now uncovered.

61670 of 70970 relevant lines covered (86.9%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.92
/localstack-core/localstack/services/lambda_/provider.py
1
import base64
1✔
2
import dataclasses
1✔
3
import datetime
1✔
4
import itertools
1✔
5
import json
1✔
6
import logging
1✔
7
import re
1✔
8
import threading
1✔
9
import time
1✔
10
from typing import IO, Any, Optional, Tuple
1✔
11

12
from localstack import config
1✔
13
from localstack.aws.api import RequestContext, ServiceException, handler
1✔
14
from localstack.aws.api.lambda_ import (
1✔
15
    AccountLimit,
16
    AccountUsage,
17
    AddLayerVersionPermissionResponse,
18
    AddPermissionRequest,
19
    AddPermissionResponse,
20
    Alias,
21
    AliasConfiguration,
22
    AliasRoutingConfiguration,
23
    AllowedPublishers,
24
    Architecture,
25
    Arn,
26
    Blob,
27
    BlobStream,
28
    CodeSigningConfigArn,
29
    CodeSigningConfigNotFoundException,
30
    CodeSigningPolicies,
31
    CompatibleArchitectures,
32
    CompatibleRuntimes,
33
    Concurrency,
34
    Cors,
35
    CreateCodeSigningConfigResponse,
36
    CreateEventSourceMappingRequest,
37
    CreateFunctionRequest,
38
    CreateFunctionUrlConfigResponse,
39
    DeleteCodeSigningConfigResponse,
40
    Description,
41
    DestinationConfig,
42
    EventSourceMappingConfiguration,
43
    FunctionCodeLocation,
44
    FunctionConfiguration,
45
    FunctionEventInvokeConfig,
46
    FunctionName,
47
    FunctionUrlAuthType,
48
    FunctionUrlQualifier,
49
    GetAccountSettingsResponse,
50
    GetCodeSigningConfigResponse,
51
    GetFunctionCodeSigningConfigResponse,
52
    GetFunctionConcurrencyResponse,
53
    GetFunctionRecursionConfigResponse,
54
    GetFunctionResponse,
55
    GetFunctionUrlConfigResponse,
56
    GetLayerVersionPolicyResponse,
57
    GetLayerVersionResponse,
58
    GetPolicyResponse,
59
    GetProvisionedConcurrencyConfigResponse,
60
    InvalidParameterValueException,
61
    InvocationResponse,
62
    InvocationType,
63
    InvokeAsyncResponse,
64
    InvokeMode,
65
    LambdaApi,
66
    LastUpdateStatus,
67
    LayerName,
68
    LayerPermissionAllowedAction,
69
    LayerPermissionAllowedPrincipal,
70
    LayersListItem,
71
    LayerVersionArn,
72
    LayerVersionContentInput,
73
    LayerVersionNumber,
74
    LicenseInfo,
75
    ListAliasesResponse,
76
    ListCodeSigningConfigsResponse,
77
    ListEventSourceMappingsResponse,
78
    ListFunctionEventInvokeConfigsResponse,
79
    ListFunctionsByCodeSigningConfigResponse,
80
    ListFunctionsResponse,
81
    ListFunctionUrlConfigsResponse,
82
    ListLayersResponse,
83
    ListLayerVersionsResponse,
84
    ListProvisionedConcurrencyConfigsResponse,
85
    ListTagsResponse,
86
    ListVersionsByFunctionResponse,
87
    LogFormat,
88
    LoggingConfig,
89
    LogType,
90
    MasterRegion,
91
    MaxFunctionEventInvokeConfigListItems,
92
    MaximumEventAgeInSeconds,
93
    MaximumRetryAttempts,
94
    MaxItems,
95
    MaxLayerListItems,
96
    MaxListItems,
97
    MaxProvisionedConcurrencyConfigListItems,
98
    NamespacedFunctionName,
99
    NamespacedStatementId,
100
    OnFailure,
101
    OnSuccess,
102
    OrganizationId,
103
    PackageType,
104
    PositiveInteger,
105
    PreconditionFailedException,
106
    ProvisionedConcurrencyConfigListItem,
107
    ProvisionedConcurrencyConfigNotFoundException,
108
    ProvisionedConcurrencyStatusEnum,
109
    PublishLayerVersionResponse,
110
    PutFunctionCodeSigningConfigResponse,
111
    PutFunctionRecursionConfigResponse,
112
    PutProvisionedConcurrencyConfigResponse,
113
    Qualifier,
114
    RecursiveLoop,
115
    ReservedConcurrentExecutions,
116
    ResourceConflictException,
117
    ResourceNotFoundException,
118
    Runtime,
119
    RuntimeVersionConfig,
120
    SnapStart,
121
    SnapStartApplyOn,
122
    SnapStartOptimizationStatus,
123
    SnapStartResponse,
124
    State,
125
    StatementId,
126
    StateReasonCode,
127
    String,
128
    TaggableResource,
129
    TagKeyList,
130
    Tags,
131
    TracingMode,
132
    UnqualifiedFunctionName,
133
    UpdateCodeSigningConfigResponse,
134
    UpdateEventSourceMappingRequest,
135
    UpdateFunctionCodeRequest,
136
    UpdateFunctionConfigurationRequest,
137
    UpdateFunctionUrlConfigResponse,
138
    Version,
139
)
140
from localstack.aws.api.lambda_ import FunctionVersion as FunctionVersionApi
1✔
141
from localstack.aws.api.lambda_ import ServiceException as LambdaServiceException
1✔
142
from localstack.aws.connect import connect_to
1✔
143
from localstack.aws.spec import load_service
1✔
144
from localstack.services.edge import ROUTER
1✔
145
from localstack.services.lambda_ import api_utils
1✔
146
from localstack.services.lambda_ import hooks as lambda_hooks
1✔
147
from localstack.services.lambda_.api_utils import (
1✔
148
    ARCHITECTURES,
149
    STATEMENT_ID_REGEX,
150
    SUBNET_ID_REGEX,
151
    function_locators_from_arn,
152
)
153
from localstack.services.lambda_.event_source_mapping.esm_config_factory import (
1✔
154
    EsmConfigFactory,
155
)
156
from localstack.services.lambda_.event_source_mapping.esm_worker import (
1✔
157
    EsmState,
158
    EsmWorker,
159
)
160
from localstack.services.lambda_.event_source_mapping.esm_worker_factory import (
1✔
161
    EsmWorkerFactory,
162
)
163
from localstack.services.lambda_.invocation import AccessDeniedException
1✔
164
from localstack.services.lambda_.invocation.execution_environment import (
1✔
165
    EnvironmentStartupTimeoutException,
166
)
167
from localstack.services.lambda_.invocation.lambda_models import (
1✔
168
    AliasRoutingConfig,
169
    CodeSigningConfig,
170
    EventInvokeConfig,
171
    Function,
172
    FunctionResourcePolicy,
173
    FunctionUrlConfig,
174
    FunctionVersion,
175
    ImageConfig,
176
    LambdaEphemeralStorage,
177
    Layer,
178
    LayerPolicy,
179
    LayerPolicyStatement,
180
    LayerVersion,
181
    ProvisionedConcurrencyConfiguration,
182
    RequestEntityTooLargeException,
183
    ResourcePolicy,
184
    UpdateStatus,
185
    ValidationException,
186
    VersionAlias,
187
    VersionFunctionConfiguration,
188
    VersionIdentifier,
189
    VersionState,
190
    VpcConfig,
191
)
192
from localstack.services.lambda_.invocation.lambda_service import (
1✔
193
    LambdaService,
194
    create_image_code,
195
    destroy_code_if_not_used,
196
    lambda_stores,
197
    store_lambda_archive,
198
    store_s3_bucket_archive,
199
)
200
from localstack.services.lambda_.invocation.models import LambdaStore
1✔
201
from localstack.services.lambda_.invocation.runtime_executor import get_runtime_executor
1✔
202
from localstack.services.lambda_.lambda_utils import HINT_LOG
1✔
203
from localstack.services.lambda_.layerfetcher.layer_fetcher import LayerFetcher
1✔
204
from localstack.services.lambda_.provider_utils import (
1✔
205
    LambdaLayerVersionIdentifier,
206
    get_function_version,
207
    get_function_version_from_arn,
208
)
209
from localstack.services.lambda_.runtimes import (
1✔
210
    ALL_RUNTIMES,
211
    DEPRECATED_RUNTIMES,
212
    DEPRECATED_RUNTIMES_UPGRADES,
213
    RUNTIMES_AGGREGATED,
214
    VALID_RUNTIMES,
215
)
216
from localstack.services.lambda_.urlrouter import FunctionUrlRouter
1✔
217
from localstack.services.plugins import ServiceLifecycleHook
1✔
218
from localstack.state import StateVisitor
1✔
219
from localstack.utils.aws.arns import (
1✔
220
    ArnData,
221
    extract_resource_from_arn,
222
    extract_service_from_arn,
223
    get_partition,
224
    lambda_event_source_mapping_arn,
225
    parse_arn,
226
)
227
from localstack.utils.bootstrap import is_api_enabled
1✔
228
from localstack.utils.collections import PaginatedList
1✔
229
from localstack.utils.event_matcher import validate_event_pattern
1✔
230
from localstack.utils.lambda_debug_mode.lambda_debug_mode_session import LambdaDebugModeSession
1✔
231
from localstack.utils.strings import get_random_hex, short_uid, to_bytes, to_str
1✔
232
from localstack.utils.sync import poll_condition
1✔
233
from localstack.utils.urls import localstack_host
1✔
234

235
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)

# Defaults for function configuration.
# NOTE(review): presumably seconds and MB respectively -- confirm against the call sites.
LAMBDA_DEFAULT_TIMEOUT = 3
LAMBDA_DEFAULT_MEMORY_SIZE = 128

# Upper bounds enforced by the provider; the layer limit is checked in `_validate_layers`.
LAMBDA_TAG_LIMIT_PER_RESOURCE = 50
LAMBDA_LAYERS_LIMIT_PER_FUNCTION = 5

# Tag key for a custom URL id.
# NOTE(review): the code that reads this tag is not part of this excerpt -- confirm its exact use.
TAG_KEY_CUSTOM_URL = "_custom_id_"
# Requirements (from RFC3986 & co): not longer than 63, first char must be
# alpha, then alphanumeric or hyphen, except cannot start or end with hyphen
TAG_KEY_CUSTOM_URL_VALIDATOR = re.compile(r"^[A-Za-z]([A-Za-z0-9\-]{0,61}[A-Za-z0-9])?$")
1✔
247

248

249
class LambdaProvider(LambdaApi, ServiceLifecycleHook):
    """Lambda provider implementing ``LambdaApi`` together with the ``ServiceLifecycleHook`` callbacks."""

    # Service object the provider delegates to; replaced by a fresh instance after state reset/load.
    lambda_service: LambdaService
    # Re-entrant locks created in __init__.
    # NOTE(review): presumably guard function / layer creation -- usage is outside this excerpt.
    create_fn_lock: threading.RLock
    create_layer_lock: threading.RLock
    # Function URL router; it holds a reference to the current `lambda_service`.
    router: FunctionUrlRouter
    # Event source mapping workers, keyed by the worker's uuid.
    esm_workers: dict[str, EsmWorker]
    # Initialised to None in __init__; the `inject_layer_fetcher` hook is run right afterwards
    # with the provider as argument (presumably so a plugin can set a fetcher -- confirm).
    layer_fetcher: LayerFetcher | None
1✔
256

257
    def __init__(self) -> None:
        """Create the Lambda service, the function URL router, the locks and the ESM worker registry."""
        service = LambdaService()
        self.lambda_service = service
        self.create_fn_lock = threading.RLock()
        self.create_layer_lock = threading.RLock()
        # The router is bound to the same service instance the provider uses.
        self.router = FunctionUrlRouter(ROUTER, service)
        self.esm_workers = {}
        # Start without a layer fetcher, then run the hook with this provider instance.
        self.layer_fetcher = None
        lambda_hooks.inject_layer_fetcher.run(self)
1✔
265

266
    def accept_state_visitor(self, visitor: StateVisitor):
        """Expose the Lambda stores to the given state visitor.

        :param visitor: visitor whose ``visit`` method is called with ``lambda_stores``
        """
        visitor.visit(lambda_stores)
×
268

269
    def on_before_start(self):
        """Try to start the Lambda Debug Mode session; any failure is logged instead of raised."""
        try:
            LambdaDebugModeSession.get().ensure_running()
        except Exception as ex:
            LOG.error(
                "Unexpected error encountered when attempting to initialise Lambda Debug Mode '%s'.",
                ex,
            )
279

280
    def on_before_state_reset(self):
        """Stop the current Lambda service before the state is reset."""
        self.lambda_service.stop()
×
282

283
    def on_after_state_reset(self):
        """Swap in a fresh Lambda service and point the function URL router at it."""
        fresh_service = LambdaService()
        self.router.lambda_service = fresh_service
        self.lambda_service = fresh_service
×
285

286
    def on_before_state_load(self):
        """Stop the current Lambda service before state is loaded."""
        self.lambda_service.stop()
×
288

289
    def on_after_state_load(self):
        """Rebuild runtime objects from the loaded Lambda stores.

        Creates a fresh Lambda service (and rebinds the router to it), then walks every
        account/region store to:

        1. put each stored function version back into ``Pending`` and ask the service to create it,
        2. re-apply provisioned concurrency configs (keyed by alias or version qualifier),
        3. create an event source mapping worker for every stored mapping.

        Failures in steps 1 and 2 are logged as warnings and do not abort the restore.
        """
        self.lambda_service = LambdaService()
        self.router.lambda_service = self.lambda_service

        for account_id, account_bundle in lambda_stores.items():
            for region_name, state in account_bundle.items():
                for fn in state.functions.values():
                    for fn_version in fn.versions.values():
                        # restore the "Pending" state for every function version and start it
                        try:
                            new_state = VersionState(
                                state=State.Pending,
                                code=StateReasonCode.Creating,
                                reason="The function is being created.",
                            )
                            new_config = dataclasses.replace(fn_version.config, state=new_state)
                            new_version = dataclasses.replace(fn_version, config=new_config)
                            # Re-assigning an existing key keeps the dict size unchanged while iterating.
                            fn.versions[fn_version.id.qualifier] = new_version
                            # NOTE(review): the stored (pre-replacement) `fn_version` is passed here,
                            # not `new_version` -- confirm this is intended.
                            # Wait at most 5 seconds for the returned future.
                            self.lambda_service.create_function_version(fn_version).result(
                                timeout=5
                            )
                        except Exception:
                            LOG.warning(
                                "Failed to restore function version %s",
                                fn_version.id.qualified_arn(),
                                exc_info=True,
                            )
                    # restore provisioned concurrency per function considering both versions and aliases
                    for (
                        provisioned_qualifier,
                        provisioned_config,
                    ) in fn.provisioned_concurrency_configs.items():
                        # Kept outside the try so the warning below can log it (None if never resolved).
                        fn_arn = None
                        try:
                            if api_utils.qualifier_is_alias(provisioned_qualifier):
                                # An alias is resolved to the version it points to.
                                # NOTE(review): a missing alias/version makes `.get` return None; the
                                # resulting AttributeError is absorbed by the broad except below.
                                alias = fn.aliases.get(provisioned_qualifier)
                                resolved_version = fn.versions.get(alias.function_version)
                                fn_arn = resolved_version.id.qualified_arn()
                            elif api_utils.qualifier_is_version(provisioned_qualifier):
                                fn_version = fn.versions.get(provisioned_qualifier)
                                fn_arn = fn_version.id.qualified_arn()
                            else:
                                raise InvalidParameterValueException(
                                    "Invalid qualifier type:"
                                    " Qualifier can only be an alias or a version for provisioned concurrency."
                                )

                            manager = self.lambda_service.get_lambda_version_manager(fn_arn)
                            manager.update_provisioned_concurrency_config(
                                provisioned_config.provisioned_concurrent_executions
                            )
                        except Exception:
                            LOG.warning(
                                "Failed to restore provisioned concurrency %s for function %s",
                                provisioned_config,
                                fn_arn,
                                exc_info=True,
                            )

                for esm in state.event_source_mappings.values():
                    # Restores event source workers
                    function_arn = esm.get("FunctionArn")

                    # TODO: How do we know the event source is up?
                    # A basic poll to see if the mapped Lambda function is active/failed
                    if not poll_condition(
                        lambda: get_function_version_from_arn(function_arn).config.state.state
                        in [State.Active, State.Failed],
                        timeout=10,
                    ):
                        # The worker is still created below even if the poll timed out.
                        LOG.warning(
                            "Creating ESM for Lambda that is not in running state: %s",
                            function_arn,
                        )

                    function_version = get_function_version_from_arn(function_arn)
                    function_role = function_version.config.role

                    # A mapping without a "State" entry is treated as disabled.
                    is_esm_enabled = esm.get("State", EsmState.DISABLED) not in (
                        EsmState.DISABLED,
                        EsmState.DISABLING,
                    )
                    esm_worker = EsmWorkerFactory(
                        esm, function_role, is_esm_enabled
                    ).get_esm_worker()

                    # Note: a worker is created in the DISABLED state if not enabled
                    esm_worker.create()
                    # TODO: assigning the esm_worker to the dict only works after .create(). Could it cause a race
                    #  condition if we get a shutdown here and have a worker thread spawned but not accounted for?
                    self.esm_workers[esm_worker.uuid] = esm_worker
×
380

381
    def on_after_init(self):
        """Register the function URL routes and validate the runtime executor's environment."""
        self.router.register_routes()
        get_runtime_executor().validate_environment()
1✔
384

385
    def on_before_stop(self) -> None:
        """Shut down ESM workers and the Lambda service, then signal Lambda Debug Mode to stop.

        A failure while signalling the debug mode session is logged and not raised.
        """
        for worker in self.esm_workers.values():
            worker.stop_for_shutdown()

        # TODO: should probably unregister routes?
        self.lambda_service.stop()

        # Best effort: tell the Lambda Debug Mode session object to stop.
        try:
            LambdaDebugModeSession.get().signal_stop()
        except Exception as ex:
            LOG.error(
                "Unexpected error encountered when attempting to signal Lambda Debug Mode to stop '%s'.",
                ex,
            )
400

401
    @staticmethod
    def _get_function(function_name: str, account_id: str, region: str) -> Function:
        """Look up a function in the store of the given account and region.

        :param function_name: name of the function to look up
        :param account_id: account owning the store
        :param region: region of the store
        :return: the stored function
        :raises ResourceNotFoundException: if no function is stored under that name
        """
        store = lambda_stores[account_id][region]
        if found := store.functions.get(function_name):
            return found

        missing_arn = api_utils.unqualified_lambda_arn(
            function_name=function_name,
            account=account_id,
            region=region,
        )
        raise ResourceNotFoundException(f"Function not found: {missing_arn}", Type="User")
1✔
416

417
    @staticmethod
    def _get_esm(uuid: str, account_id: str, region: str) -> EventSourceMappingConfiguration:
        """Look up an event source mapping by uuid in the store of the given account and region.

        :param uuid: identifier of the event source mapping
        :param account_id: account owning the store
        :param region: region of the store
        :return: the stored event source mapping configuration
        :raises ResourceNotFoundException: if no mapping is stored under that uuid
        """
        store = lambda_stores[account_id][region]
        if mapping := store.event_source_mappings.get(uuid):
            return mapping

        missing_arn = lambda_event_source_mapping_arn(uuid, account_id, region)
        raise ResourceNotFoundException(
            f"Event source mapping not found: {missing_arn}",
            Type="User",
        )
1✔
428

429
    @staticmethod
    def _validate_qualifier_expression(qualifier: str) -> None:
        """Raise a ``ValidationException`` if ``api_utils.validate_qualifier`` reports any errors.

        :param qualifier: qualifier expression to validate
        :raises ValidationException: with a message built from the reported errors
        """
        problems = api_utils.validate_qualifier(qualifier)
        if not problems:
            return
        raise ValidationException(
            message=api_utils.construct_validation_exception_message(problems)
        )
435

436
    @staticmethod
    def _resolve_fn_qualifier(resolved_fn: Function, qualifier: str | None) -> tuple[str, str]:
        """Resolve a qualifier against an existing function.

        :param resolved_fn: The resolved lambda function
        :param qualifier: The qualifier to be resolved or None
        :return: Tuple of (resolved qualifier, function arn either qualified or unqualified);
            without a qualifier this is ``"$LATEST"`` and the unqualified arn
        :raises ResourceNotFoundException: if the qualifier names no existing alias or version
        """
        name = resolved_fn.function_name
        # assuming function versions need to live in the same account and region
        latest_id = resolved_fn.latest().id
        account_id, region = latest_id.account, latest_id.region

        if qualifier is None:
            return "$LATEST", api_utils.unqualified_lambda_arn(name, account_id, region)

        qualified_arn = api_utils.qualified_lambda_arn(name, qualifier, account_id, region)
        if api_utils.qualifier_is_alias(qualifier):
            exists = qualifier in resolved_fn.aliases
            not_found_message = f"Cannot find alias arn: {qualified_arn}"
        elif api_utils.qualifier_is_version(qualifier) or qualifier == "$LATEST":
            exists = qualifier in resolved_fn.versions
            not_found_message = f"Function not found: {qualified_arn}"
        else:
            # matches qualifier pattern but invalid alias or version
            exists = False
            not_found_message = f"Function not found: {qualified_arn}"

        if not exists:
            raise ResourceNotFoundException(not_found_message, Type="User")
        return qualifier or "$LATEST", qualified_arn
1✔
462

463
    @staticmethod
    def _function_revision_id(resolved_fn: Function, resolved_qualifier: str) -> str:
        """Return the revision id of the alias or version named by ``resolved_qualifier``.

        :param resolved_fn: function holding the aliases and versions
        :param resolved_qualifier: an already resolved alias name or version qualifier
        :return: the alias' revision id, or the version config's revision id
        """
        if not api_utils.qualifier_is_alias(resolved_qualifier):
            # Assumes that a non-alias is a version
            return resolved_fn.versions[resolved_qualifier].config.revision_id
        return resolved_fn.aliases[resolved_qualifier].revision_id
1✔
470

471
    def _resolve_vpc_id(self, account_id: str, region_name: str, subnet_id: str) -> str:
        """Ask EC2 (DescribeSubnets) for the VPC id a subnet belongs to.

        :param account_id: account used as access key id for the EC2 client
        :param region_name: region of the EC2 client
        :param subnet_id: subnet to look up
        :return: the ``VpcId`` of the first subnet in the response
        :raises InvalidParameterValueException: if EC2 answers with a client error
        """
        ec2_client = connect_to(aws_access_key_id=account_id, region_name=region_name).ec2
        try:
            response = ec2_client.describe_subnets(SubnetIds=[subnet_id])
            return response["Subnets"][0]["VpcId"]
        except ec2_client.exceptions.ClientError as e:
            error = e.response["Error"]
            code, message = error["Code"], error["Message"]
            raise InvalidParameterValueException(
                f"Error occurred while DescribeSubnets. EC2 Error Code: {code}. EC2 Error Message: {message}",
                Type="User",
            )
482

483
    def _build_vpc_config(
        self,
        account_id: str,
        region_name: str,
        vpc_config: Optional[dict] = None,
    ) -> VpcConfig | None:
        """Build the internal ``VpcConfig`` model from a request's VPC configuration.

        :param account_id: account used to resolve the VPC id via EC2
        :param region_name: region used to resolve the VPC id via EC2
        :param vpc_config: request dict; the keys read are ``SubnetIds`` and ``SecurityGroupIds``
        :return: ``None`` if no config was given or the EC2 API is not enabled; an empty
            ``VpcConfig`` if there are no subnet ids; otherwise a ``VpcConfig`` whose VPC id is
            resolved from the first subnet
        :raises ValidationException: if the first subnet id does not match ``SUBNET_ID_REGEX``
        :raises InvalidParameterValueException: if EC2 fails to describe the subnet
        """
        if not vpc_config or not is_api_enabled("ec2"):
            return None

        subnet_ids = vpc_config.get("SubnetIds", [])
        # Treat an explicit ``None`` like an empty list. The previous guard
        # (``is not None and len(...) == 0``) let ``None`` fall through to ``subnet_ids[0]``,
        # which raised a TypeError.
        if not subnet_ids:
            return VpcConfig(vpc_id="", security_group_ids=[], subnet_ids=[])

        # Only the first subnet is validated and used to resolve the VPC id.
        subnet_id = subnet_ids[0]
        if not SUBNET_ID_REGEX.match(subnet_id):
            raise ValidationException(
                f"1 validation error detected: Value '[{subnet_id}]' at 'vpcConfig.subnetIds' failed to satisfy constraint: Member must satisfy constraint: [Member must have length less than or equal to 1024, Member must have length greater than or equal to 0, Member must satisfy regular expression pattern: ^subnet-[0-9a-z]*$]"
            )

        return VpcConfig(
            vpc_id=self._resolve_vpc_id(account_id, region_name, subnet_id),
            security_group_ids=vpc_config.get("SecurityGroupIds", []),
            subnet_ids=subnet_ids,
        )
507

508
    def _create_version_model(
        self,
        function_name: str,
        region: str,
        account_id: str,
        description: str | None = None,
        revision_id: str | None = None,
        code_sha256: str | None = None,
    ) -> tuple[FunctionVersion, bool]:
        """
        Release a new version to the model if all restrictions are met.
        Restrictions:
          - CodeSha256, if provided, must equal the current latest version code hash
          - RevisionId, if provided, must equal the current latest version revision id
          - Some changes have been done to the latest version since last publish
        Will return a tuple of the version, and whether the version was published (True) or the latest available version was taken (False).
        This can happen if the latest version has not been changed since the last version publish, in this case the last version will be returned.

        This method only updates the stored model (version counter and ``function.versions``);
        it does not call the Lambda service.

        :param function_name: Function name to be published
        :param region: Region of the function
        :param account_id: Account of the function
        :param description: new description of the version (will be the description of the function if missing)
        :param revision_id: Revision id, function will raise error if it does not match latest revision id
        :param code_sha256: Code sha256, function will raise error if it does not match latest code hash
        :return: Tuple of (published version, whether version was released or last released version returned, since nothing changed)
        :raises PreconditionFailedException: if ``revision_id`` is given and differs from $LATEST's revision id
        :raises InvalidParameterValueException: if ``code_sha256`` is given, differs from $LATEST's code hash
            and the code is not a hot-reloaded zip package
        """
        # NOTE(review): $LATEST is read here, before `function.lock` is acquired further down.
        current_latest_version = get_function_version(
            function_name=function_name, qualifier="$LATEST", account_id=account_id, region=region
        )
        if revision_id and current_latest_version.config.revision_id != revision_id:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
                Type="User",
            )

        # check if code hashes match if they are specified
        # Zip packages carry the hash on `config.code`, other package types on `config.image`.
        current_hash = (
            current_latest_version.config.code.code_sha256
            if current_latest_version.config.package_type == PackageType.Zip
            else current_latest_version.config.image.code_sha256
        )
        # if the code is a zip package and hot reloaded (hot reloading is currently only supported for zip packagetypes)
        # we cannot enforce the codesha256 check
        is_hot_reloaded_zip_package = (
            current_latest_version.config.package_type == PackageType.Zip
            and current_latest_version.config.code.is_hot_reloading()
        )
        if code_sha256 and current_hash != code_sha256 and not is_hot_reloaded_zip_package:
            raise InvalidParameterValueException(
                f"CodeSHA256 ({code_sha256}) is different from current CodeSHA256 in $LATEST ({current_hash}). Please try again with the CodeSHA256 in $LATEST.",
                Type="User",
            )

        state = lambda_stores[account_id][region]
        function = state.functions.get(function_name)
        # Config fields to override on the new version; only the description can differ.
        changes = {}
        if description is not None:
            changes["description"] = description
        # TODO copy environment instead of restarting one, get rid of all the "Pending"s

        with function.lock:
            # If the most recently published version has the same internal revision as $LATEST,
            # nothing changed since the last publish: return that version instead of a new one.
            if function.next_version > 1 and (
                prev_version := function.versions.get(str(function.next_version - 1))
            ):
                if (
                    prev_version.config.internal_revision
                    == current_latest_version.config.internal_revision
                ):
                    return prev_version, False
            # TODO check if there was a change since last version
            # Claim the next version number and advance the counter while holding the lock.
            next_version = str(function.next_version)
            function.next_version += 1
            new_id = VersionIdentifier(
                function_name=function_name,
                qualifier=next_version,
                region=region,
                account=account_id,
            )
            # SnapStart optimization is reported as On only when it applies to published versions.
            apply_on = current_latest_version.config.snap_start["ApplyOn"]
            optimization_status = SnapStartOptimizationStatus.Off
            if apply_on == SnapStartApplyOn.PublishedVersions:
                optimization_status = SnapStartOptimizationStatus.On
            snap_start = SnapStartResponse(
                ApplyOn=apply_on,
                OptimizationStatus=optimization_status,
            )
            # The new version is a copy of $LATEST with a new id, a Pending state and the overrides.
            new_version = dataclasses.replace(
                current_latest_version,
                config=dataclasses.replace(
                    current_latest_version.config,
                    last_update=None,  # versions never have a last update status
                    state=VersionState(
                        state=State.Pending,
                        code=StateReasonCode.Creating,
                        reason="The function is being created.",
                    ),
                    snap_start=snap_start,
                    **changes,
                ),
                id=new_id,
            )
            function.versions[next_version] = new_version
        return new_version, True
1✔
611

612
    def _publish_version_from_existing_version(
        self,
        function_name: str,
        region: str,
        account_id: str,
        description: str | None = None,
        revision_id: str | None = None,
        code_sha256: str | None = None,
    ) -> FunctionVersion:
        """
        Publish a version based on an existing, already initialized $LATEST.

        If the version model reports that nothing was newly released, the version it returned
        is handed back as is. Otherwise the version is published through the Lambda service
        and the $LATEST entry is replaced by a copy of itself.

        :param function_name: Function name
        :param region: region
        :param account_id: account id
        :param description: description
        :param revision_id: revision id (check if current version matches)
        :param code_sha256: code sha (check if current code matches)
        :return: new version
        """
        version, was_released = self._create_version_model(
            function_name=function_name,
            region=region,
            account_id=account_id,
            description=description,
            revision_id=revision_id,
            code_sha256=code_sha256,
        )
        if not was_released:
            return version

        self.lambda_service.publish_version(version)
        fn = lambda_stores[account_id][region].functions.get(function_name)
        # TODO: re-evaluate data model to prevent this dirty hack just for bumping the revision id
        current_latest = fn.versions["$LATEST"]
        copied_config = dataclasses.replace(current_latest.config)
        fn.versions["$LATEST"] = dataclasses.replace(current_latest, config=copied_config)
        # Return whatever is stored under the new qualifier after publishing.
        return fn.versions.get(version.id.qualifier)
1✔
651

652
    def _publish_version_with_changes(
        self,
        function_name: str,
        region: str,
        account_id: str,
        description: str | None = None,
        revision_id: str | None = None,
        code_sha256: str | None = None,
    ) -> FunctionVersion:
        """
        Publish a version together with a new latest version (publish on create / update).

        The Lambda service is only asked to create the function version when the version
        model actually released a new one.

        :param function_name: Function name
        :param region: region
        :param account_id: account id
        :param description: description
        :param revision_id: revision id (check if current version matches)
        :param code_sha256: code sha (check if current code matches)
        :return: new version
        """
        version, was_released = self._create_version_model(
            function_name=function_name,
            region=region,
            account_id=account_id,
            description=description,
            revision_id=revision_id,
            code_sha256=code_sha256,
        )
        if was_released:
            self.lambda_service.create_function_version(version)
        return version
1✔
684

685
    @staticmethod
    def _verify_env_variables(env_vars: dict[str, str]):
        """Reject environment variables whose compact JSON encoding exceeds the configured size.

        :param env_vars: environment variables of the function
        :raises InvalidParameterValueException: if the UTF-8 encoded compact JSON dump is larger
            than ``config.LAMBDA_LIMITS_MAX_FUNCTION_ENVVAR_SIZE_BYTES``
        """
        # Compact separators: no whitespace is counted towards the size.
        serialized = json.dumps(env_vars, separators=(",", ":"))
        size_in_bytes = len(serialized.encode("utf-8"))
        if size_in_bytes <= config.LAMBDA_LIMITS_MAX_FUNCTION_ENVVAR_SIZE_BYTES:
            return
        raise InvalidParameterValueException(
            f"Lambda was unable to configure your environment variables because the environment variables you have provided exceeded the 4KB limit. String measured: {serialized}",
            Type="User",
        )
696

697
    @staticmethod
    def _validate_snapstart(snap_start: SnapStart, runtime: Runtime):
        """
        Check that the requested SnapStart ``ApplyOn`` value is an accepted enum member.

        :param snap_start: SnapStart settings taken from the request
        :param runtime: runtime of the function (not inspected by this check)
        :raises ValidationException: if ``ApplyOn`` is neither ``PublishedVersions`` nor ``None``
        """
        accepted_values = (SnapStartApplyOn.PublishedVersions, SnapStartApplyOn.None_)
        requested = snap_start.get("ApplyOn")
        if requested in accepted_values:
            return

        raise ValidationException(
            f"1 validation error detected: Value '{requested}' at 'snapStart.applyOn' failed to satisfy constraint: Member must satisfy enum value set: [PublishedVersions, None]"
        )
707

708
    def _validate_layers(self, new_layers: list[str], region: str, account_id: str):
        """
        Validate the layer version ARNs referenced by a function.

        Layers owned by ``account_id`` must already exist in the store. Layer versions of other
        accounts that are not stored yet are retrieved through ``self.layer_fetcher`` and added to the
        store of the layer's account and region.

        :param new_layers: layer version ARNs to validate
        :param region: region of the function; the region checks are skipped if this is falsy
        :param account_id: account id of the function
        :raises InvalidParameterValueException: if too many layers are referenced, a layer of the
            same account is in another region or does not exist, or two versions of the same layer
            are referenced
        :raises ValidationException: if an ARN does not contain a layer version
        :raises AccessDeniedException: if an external layer is in another region or cannot be fetched
        :raises NotImplementedError: if an external layer has to be fetched but no layer fetcher is set
        """
        if len(new_layers) > LAMBDA_LAYERS_LIMIT_PER_FUNCTION:
            raise InvalidParameterValueException(
                "Cannot reference more than 5 layers.", Type="User"
            )

        # maps the unversioned layer ARN to the first versioned ARN seen for it
        visited_layers = {}
        for layer_version_arn in new_layers:
            (
                layer_region,
                layer_account_id,
                layer_name,
                layer_version_str,
            ) = api_utils.parse_layer_arn(layer_version_arn)
            if layer_version_str is None:
                raise ValidationException(
                    f"1 validation error detected: Value '[{layer_version_arn}]'"
                    + r" at 'layers' failed to satisfy constraint: Member must satisfy constraint: [Member must have length less than or equal to 140, Member must have length greater than or equal to 1, Member must satisfy regular expression pattern: (arn:[a-zA-Z0-9-]+:lambda:[a-z]{2}((-gov)|(-iso(b?)))?-[a-z]+-\d{1}:\d{12}:layer:[a-zA-Z0-9-_]+:[0-9]+)|(arn:[a-zA-Z0-9-]+:lambda:::awslayer:[a-zA-Z0-9-_]+), Member must not be null]",
                )

            state = lambda_stores[layer_account_id][layer_region]
            # layer as currently stored (if any); kept separate from a freshly fetched layer below
            stored_layer = state.layers.get(layer_name)
            layer_version = None
            if stored_layer is not None:
                layer_version = stored_layer.layer_versions.get(layer_version_str)

            if layer_account_id == account_id:
                if region and layer_region != region:
                    raise InvalidParameterValueException(
                        f"Layers are not in the same region as the function. "
                        f"Layers are expected to be in region {region}.",
                        Type="User",
                    )
                # layer_version is None both for a missing layer and for a missing version
                if layer_version is None:
                    raise InvalidParameterValueException(
                        f"Layer version {layer_version_arn} does not exist.", Type="User"
                    )
            else:  # External layer from other account
                # TODO: validate IAM layer policy here, allowing access by default for now and only checking region
                if region and layer_region != region:
                    # TODO: detect user or role from context when IAM users are implemented
                    user = "user/localstack-testing"
                    raise AccessDeniedException(
                        f"User: arn:{get_partition(region)}:iam::{account_id}:{user} is not authorized to perform: lambda:GetLayerVersion on resource: {layer_version_arn} because no resource-based policy allows the lambda:GetLayerVersion action"
                    )
                if layer_version is None:
                    # Limitation: cannot fetch external layers when using the same account id as the target layer
                    # because we do not want to trigger the layer fetcher for every non-existing layer.
                    if self.layer_fetcher is None:
                        raise NotImplementedError(
                            "Fetching shared layers from AWS is a pro feature."
                        )

                    fetched_layer = self.layer_fetcher.fetch_layer(layer_version_arn)
                    if fetched_layer is None:
                        # TODO: detect user or role from context when IAM users are implemented
                        user = "user/localstack-testing"
                        raise AccessDeniedException(
                            f"User: arn:{get_partition(region)}:iam::{account_id}:{user} is not authorized to perform: lambda:GetLayerVersion on resource: {layer_version_arn} because no resource-based policy allows the lambda:GetLayerVersion action"
                        )

                    # Distinguish between new layer and new layer version. The decision is based on
                    # whether the layer was already stored: previously the check used layer_version,
                    # which is always None here, so an existing layer (and its other versions) was
                    # replaced by the fetched one.
                    if stored_layer is None:
                        # Create whole layer from scratch
                        state.layers[layer_name] = fetched_layer
                    else:
                        # Create layer version if another version of the same layer already exists
                        stored_layer.layer_versions[layer_version_str] = (
                            fetched_layer.layer_versions.get(layer_version_str)
                        )

            # only the first two matches in the array are considered for the error message
            layer_arn = ":".join(layer_version_arn.split(":")[:-1])
            if layer_arn in visited_layers:
                conflict_layer_version_arn = visited_layers[layer_arn]
                raise InvalidParameterValueException(
                    f"Two different versions of the same layer are not allowed to be referenced in the same function. {conflict_layer_version_arn} and {layer_version_arn} are versions of the same layer.",
                    Type="User",
                )
            visited_layers[layer_arn] = layer_version_arn
1✔
787

788
    @staticmethod
    def map_layers(new_layers: list[str]) -> list[LayerVersion]:
        """
        Look up the stored layer version for each layer version ARN.

        :param new_layers: layer version ARNs
        :return: the stored layer versions, in the order of ``new_layers``
        """
        resolved = []
        for arn in new_layers:
            layer_region, layer_account, name, version_key = api_utils.parse_layer_arn(arn)
            stored_layer = lambda_stores[layer_account][layer_region].layers.get(name)
            resolved.append(stored_layer.layer_versions.get(version_key))
        return resolved
1✔
799

800
    def get_function_recursion_config(
        self,
        context: RequestContext,
        function_name: UnqualifiedFunctionName,
        **kwargs,
    ) -> GetFunctionRecursionConfigResponse:
        """Return the ``recursive_loop`` setting stored on the requested function."""
        account_id, region = api_utils.get_account_and_region(function_name, context)
        resolved_name = api_utils.get_function_name(function_name, context)
        target = self._get_function(
            function_name=resolved_name, region=region, account_id=account_id
        )
        return GetFunctionRecursionConfigResponse(RecursiveLoop=target.recursive_loop)
1✔
810

811
    def put_function_recursion_config(
        self,
        context: RequestContext,
        function_name: UnqualifiedFunctionName,
        recursive_loop: RecursiveLoop,
        **kwargs,
    ) -> PutFunctionRecursionConfigResponse:
        """
        Store a new ``recursive_loop`` setting on the requested function.

        :raises ValidationException: if ``recursive_loop`` is not a member of ``RecursiveLoop``
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        resolved_name = api_utils.get_function_name(function_name, context)

        # the function lookup comes first, so it takes precedence over the value validation
        target = self._get_function(
            function_name=resolved_name, region=region, account_id=account_id
        )

        if recursive_loop not in list(RecursiveLoop.__members__.values()):
            raise ValidationException(
                f"1 validation error detected: Value '{recursive_loop}' at 'recursiveLoop' failed to satisfy constraint: "
                f"Member must satisfy enum value set: [Terminate, Allow]"
            )

        target.recursive_loop = recursive_loop
        return PutFunctionRecursionConfigResponse(RecursiveLoop=target.recursive_loop)
1✔
832

833
    @handler(operation="CreateFunction", expand=False)
    def create_function(
        self,
        context: RequestContext,
        request: CreateFunctionRequest,
    ) -> FunctionConfiguration:
        """
        Create a function together with its ``$LATEST`` version.

        The request is validated first (code and request size, architectures, environment
        variables, layers, role, runtime, SnapStart). The code archive or image is then stored and
        the function is registered in the store while holding ``self.create_fn_lock``. Afterwards
        the version is handed to the Lambda service, tags are stored, and a version is published
        if ``Publish`` is set.

        :param context: request context providing region, account id and the raw request
        :param request: CreateFunction request
        :return: function configuration of ``$LATEST``, or of the published version if ``Publish``
            is set
        :raises RequestEntityTooLargeException: if the zip file or the request is too large
        :raises ValidationException: if the architectures or the role are malformed
        :raises InvalidParameterValueException: if the role cannot be assumed by Lambda
        :raises ResourceConflictException: if a function with the same name is already stored
        :raises LambdaServiceException: if no code source matching the package type is provided
        """
        context_region = context.region
        context_account_id = context.account_id

        zip_file = request.get("Code", {}).get("ZipFile")
        if zip_file and len(zip_file) > config.LAMBDA_LIMITS_CODE_SIZE_ZIPPED:
            raise RequestEntityTooLargeException(
                f"Zipped size must be smaller than {config.LAMBDA_LIMITS_CODE_SIZE_ZIPPED} bytes"
            )

        if context.request.content_length > config.LAMBDA_LIMITS_CREATE_FUNCTION_REQUEST_SIZE:
            raise RequestEntityTooLargeException(
                f"Request must be smaller than {config.LAMBDA_LIMITS_CREATE_FUNCTION_REQUEST_SIZE} bytes for the CreateFunction operation"
            )

        if architectures := request.get("Architectures"):
            if len(architectures) != 1:
                raise ValidationException(
                    f"1 validation error detected: Value '[{', '.join(architectures)}]' at 'architectures' failed to "
                    f"satisfy constraint: Member must have length less than or equal to 1",
                )
            if architectures[0] not in ARCHITECTURES:
                raise ValidationException(
                    f"1 validation error detected: Value '[{', '.join(architectures)}]' at 'architectures' failed to "
                    f"satisfy constraint: Member must satisfy constraint: [Member must satisfy enum value set: "
                    f"[x86_64, arm64], Member must not be null]",
                )

        # env_vars and layers are bound by the walrus even if falsy; both are reused below
        if env_vars := request.get("Environment", {}).get("Variables"):
            self._verify_env_variables(env_vars)

        if layers := request.get("Layers", []):
            self._validate_layers(layers, region=context_region, account_id=context_account_id)

        if not api_utils.is_role_arn(request.get("Role")):
            raise ValidationException(
                f"1 validation error detected: Value '{request.get('Role')}'"
                + " at 'role' failed to satisfy constraint: Member must satisfy regular expression pattern: arn:(aws[a-zA-Z-]*)?:iam::\\d{12}:role/?[a-zA-Z_0-9+=,.@\\-_/]+"
            )
        if not self.lambda_service.can_assume_role(request.get("Role"), context.region):
            raise InvalidParameterValueException(
                "The role defined for the function cannot be assumed by Lambda.", Type="User"
            )
        package_type = request.get("PackageType", PackageType.Zip)
        runtime = request.get("Runtime")
        self._validate_runtime(package_type, runtime)

        request_function_name = request.get("FunctionName")

        function_name, *_ = api_utils.get_name_and_qualifier(
            function_arn_or_name=request_function_name,
            qualifier=None,
            context=context,
        )

        if runtime in DEPRECATED_RUNTIMES:
            LOG.warning(
                "The Lambda runtime %s is deprecated. "
                "Please upgrade the runtime for the function %s: "
                "https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtimes.html",
                runtime,
                function_name,
            )
        if snap_start := request.get("SnapStart"):
            self._validate_snapstart(snap_start, runtime)
        state = lambda_stores[context_account_id][context_region]

        # existence check and registration happen under the same lock
        with self.create_fn_lock:
            if function_name in state.functions:
                raise ResourceConflictException(f"Function already exist: {function_name}")
            fn = Function(function_name=function_name)
            arn = VersionIdentifier(
                function_name=function_name,
                qualifier="$LATEST",
                region=context_region,
                account=context_account_id,
            )
            # save function code to s3
            code = None
            image = None
            image_config = None
            runtime_version_config = RuntimeVersionConfig(
                # Limitation: the runtime id (presumably sha256 of image) is currently hardcoded
                # Potential implementation: provide (cached) sha256 hash of used Docker image
                RuntimeVersionArn=f"arn:{context.partition}:lambda:{context_region}::runtime:8eeff65f6809a3ce81507fe733fe09b835899b99481ba22fd75b5a7338290ec1"
            )
            request_code = request.get("Code")
            if package_type == PackageType.Zip:
                # TODO verify if correct combination of code is set
                if zip_file := request_code.get("ZipFile"):
                    code = store_lambda_archive(
                        archive_file=zip_file,
                        function_name=function_name,
                        region_name=context_region,
                        account_id=context_account_id,
                    )
                elif s3_bucket := request_code.get("S3Bucket"):
                    s3_key = request_code["S3Key"]
                    s3_object_version = request_code.get("S3ObjectVersion")
                    code = store_s3_bucket_archive(
                        archive_bucket=s3_bucket,
                        archive_key=s3_key,
                        archive_version=s3_object_version,
                        function_name=function_name,
                        region_name=context_region,
                        account_id=context_account_id,
                    )
                else:
                    raise LambdaServiceException("Gotta have s3 bucket or zip file")
            elif package_type == PackageType.Image:
                image = request_code.get("ImageUri")
                if not image:
                    raise LambdaServiceException("Gotta have an image when package type is image")
                image = create_image_code(image_uri=image)

                image_config_req = request.get("ImageConfig", {})
                image_config = ImageConfig(
                    command=image_config_req.get("Command"),
                    entrypoint=image_config_req.get("EntryPoint"),
                    working_directory=image_config_req.get("WorkingDirectory"),
                )
                # Runtime management controls are not available when providing a custom image
                runtime_version_config = None
            if "LoggingConfig" in request:
                logging_config = request["LoggingConfig"]
                LOG.warning(
                    "Advanced Lambda Logging Configuration is currently mocked "
                    "and will not impact the logging behavior. "
                    "Please create a feature request if needed."
                )

                # when switching to JSON, app and system level log is auto set to INFO
                # (defaults on the left of "|": values from the request take precedence)
                if logging_config.get("LogFormat", None) == LogFormat.JSON:
                    logging_config = {
                        "ApplicationLogLevel": "INFO",
                        "SystemLogLevel": "INFO",
                        "LogGroup": f"/aws/lambda/{function_name}",
                    } | logging_config
                else:
                    logging_config = (
                        LoggingConfig(
                            LogFormat=LogFormat.Text, LogGroup=f"/aws/lambda/{function_name}"
                        )
                        | logging_config
                    )

            else:
                logging_config = LoggingConfig(
                    LogFormat=LogFormat.Text, LogGroup=f"/aws/lambda/{function_name}"
                )

            version = FunctionVersion(
                id=arn,
                config=VersionFunctionConfiguration(
                    last_modified=api_utils.format_lambda_date(datetime.datetime.now()),
                    description=request.get("Description", ""),
                    role=request["Role"],
                    timeout=request.get("Timeout", LAMBDA_DEFAULT_TIMEOUT),
                    runtime=request.get("Runtime"),
                    memory_size=request.get("MemorySize", LAMBDA_DEFAULT_MEMORY_SIZE),
                    handler=request.get("Handler"),
                    package_type=package_type,
                    environment=env_vars,
                    architectures=request.get("Architectures") or [Architecture.x86_64],
                    tracing_config_mode=request.get("TracingConfig", {}).get(
                        "Mode", TracingMode.PassThrough
                    ),
                    image=image,
                    image_config=image_config,
                    code=code,
                    layers=self.map_layers(layers),
                    internal_revision=short_uid(),
                    ephemeral_storage=LambdaEphemeralStorage(
                        size=request.get("EphemeralStorage", {}).get("Size", 512)
                    ),
                    snap_start=SnapStartResponse(
                        ApplyOn=request.get("SnapStart", {}).get("ApplyOn", SnapStartApplyOn.None_),
                        OptimizationStatus=SnapStartOptimizationStatus.Off,
                    ),
                    runtime_version_config=runtime_version_config,
                    dead_letter_arn=request.get("DeadLetterConfig", {}).get("TargetArn"),
                    vpc_config=self._build_vpc_config(
                        context_account_id, context_region, request.get("VpcConfig")
                    ),
                    state=VersionState(
                        state=State.Pending,
                        code=StateReasonCode.Creating,
                        reason="The function is being created.",
                    ),
                    logging_config=logging_config,
                ),
            )
            fn.versions["$LATEST"] = version
            state.functions[function_name] = fn
        self.lambda_service.create_function_version(version)

        if tags := request.get("Tags"):
            # This will check whether the function exists.
            self._store_tags(arn.unqualified_arn(), tags)

        if request.get("Publish"):
            # from here on "version" refers to the published version instead of $LATEST
            version = self._publish_version_with_changes(
                function_name=function_name, region=context_region, account_id=context_account_id
            )

        if config.LAMBDA_SYNCHRONOUS_CREATE:
            # block via retrying until "terminal" condition reached before returning
            if not poll_condition(
                lambda: get_function_version(
                    function_name, version.id.qualifier, version.id.account, version.id.region
                ).config.state.state
                in [State.Active, State.Failed],
                timeout=10,
            ):
                LOG.warning(
                    "LAMBDA_SYNCHRONOUS_CREATE is active, but waiting for %s reached timeout.",
                    function_name,
                )

        return api_utils.map_config_out(
            version, return_qualified_arn=False, return_update_status=False
        )
1060

1061
    def _validate_runtime(self, package_type, runtime):
        """
        Ensure that the runtime of a Zip-packaged function is accepted.

        With ``config.LAMBDA_RUNTIME_VALIDATION`` enabled, the runtime is checked against the
        runtimes listed in ``RUNTIMES_AGGREGATED``, otherwise against ``ALL_RUNTIMES``. Other
        package types are not checked.

        :param package_type: package type of the function
        :param runtime: runtime identifier from the request
        :raises InvalidParameterValueException: if the runtime is not accepted; deprecated runtimes
            may raise the more specific error of ``_check_for_recomended_migration_target``
        """
        runtimes = ALL_RUNTIMES
        if config.LAMBDA_RUNTIME_VALIDATION:
            # assumes RUNTIMES_AGGREGATED maps a runtime family to a collection of runtimes -- TODO confirm.
            # chain() with a single argument only iterates over the collections themselves, so the
            # membership test below could never match a runtime; from_iterable flattens them.
            runtimes = list(itertools.chain.from_iterable(RUNTIMES_AGGREGATED.values()))

        if package_type == PackageType.Zip and runtime not in runtimes:
            # deprecated runtimes have different error
            if runtime in DEPRECATED_RUNTIMES:
                HINT_LOG.info(
                    "Set env variable LAMBDA_RUNTIME_VALIDATION to 0"
                    " in order to allow usage of deprecated runtimes"
                )
                # raises on its own if a migration target is known, otherwise falls through
                self._check_for_recomended_migration_target(runtime)

            raise InvalidParameterValueException(
                f"Value {runtime} at 'runtime' failed to satisfy constraint: Member must satisfy enum value set: {VALID_RUNTIMES} or be a valid ARN",
                Type="User",
            )
1079

1080
    def _check_for_recomended_migration_target(self, deprecated_runtime):
        """
        Raise the "no longer supported" error for deprecated runtimes with a known upgrade target.

        Returns without raising if ``DEPRECATED_RUNTIMES_UPGRADES`` has no entry for the runtime.

        :param deprecated_runtime: the deprecated runtime identifier
        :raises InvalidParameterValueException: if an upgrade target is known for the runtime
        """
        # AWS offers a recommended runtime for migration for "newly" deprecated runtimes;
        # the code below is needed to preserve parity with the error messages
        upgrade_target = DEPRECATED_RUNTIMES_UPGRADES.get(deprecated_runtime)
        if upgrade_target is None:
            return

        LOG.debug(
            "The Lambda runtime %s is deprecated. Please upgrade to a supported Lambda runtime such as %s.",
            deprecated_runtime,
            upgrade_target,
        )
        raise InvalidParameterValueException(
            f"The runtime parameter of {deprecated_runtime} is no longer supported for creating or updating AWS Lambda functions. We recommend you use a supported runtime while creating or updating functions.",
            Type="User",
        )
1095

1096
    @handler(operation="UpdateFunctionConfiguration", expand=False)
    def update_function_configuration(
        self, context: RequestContext, request: UpdateFunctionConfigurationRequest
    ) -> FunctionConfiguration:
        """
        Update the configuration of the ``$LATEST`` version of a function.

        Only settings present in the request are changed; everything else is copied from the
        current ``$LATEST`` configuration. The new version replaces ``$LATEST`` in the store and
        is passed to the Lambda service.

        :raises ResourceNotFoundException: if the function is not stored
        :raises PreconditionFailedException: if ``RevisionId`` does not match the current revision
        :raises ValidationException: if the role is malformed
        :raises InvalidParameterValueException: if the runtime is unknown
        """
        requested_name = request.get("FunctionName")

        # in case we got ARN or partial ARN
        account_id, region = api_utils.get_account_and_region(requested_name, context)
        function_name, _qualifier = api_utils.get_name_and_qualifier(requested_name, None, context)
        store = lambda_stores[account_id][region]

        if function_name not in store.functions:
            raise ResourceNotFoundException(
                f"Function not found: {api_utils.unqualified_lambda_arn(function_name=function_name, region=region, account=account_id)}",
                Type="User",
            )
        function = store.functions[function_name]

        # TODO: lock modification of latest version
        # TODO: notify service for changes relevant to re-provisioning of $LATEST
        latest_version = function.latest()
        current_config = latest_version.config

        expected_revision = request.get("RevisionId")
        if expected_revision and expected_revision != current_config.revision_id:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
                Type="User",
            )

        # keyword overrides applied on top of the current configuration at the end
        config_changes = {}
        if "EphemeralStorage" in request:
            storage_size = request.get("EphemeralStorage", {}).get("Size", 512)
            # TODO: do defaults here apply as well?
            config_changes["ephemeral_storage"] = LambdaEphemeralStorage(storage_size)

        if "Role" in request:
            new_role = request["Role"]
            if not api_utils.is_role_arn(new_role):
                raise ValidationException(
                    f"1 validation error detected: Value '{request.get('Role')}'"
                    + " at 'role' failed to satisfy constraint: Member must satisfy regular expression pattern: arn:(aws[a-zA-Z-]*)?:iam::\\d{12}:role/?[a-zA-Z_0-9+=,.@\\-_/]+"
                )
            config_changes["role"] = new_role

        if "Description" in request:
            config_changes["description"] = request["Description"]

        if "Timeout" in request:
            config_changes["timeout"] = request["Timeout"]

        if "MemorySize" in request:
            config_changes["memory_size"] = request["MemorySize"]

        if "DeadLetterConfig" in request:
            dead_letter_config = request.get("DeadLetterConfig", {})
            config_changes["dead_letter_arn"] = dead_letter_config.get("TargetArn")

        if vpc_config := request.get("VpcConfig"):
            config_changes["vpc_config"] = self._build_vpc_config(account_id, region, vpc_config)

        if "Handler" in request:
            config_changes["handler"] = request["Handler"]

        if "Runtime" in request:
            new_runtime = request["Runtime"]

            if new_runtime not in ALL_RUNTIMES:
                raise InvalidParameterValueException(
                    f"Value {new_runtime} at 'runtime' failed to satisfy constraint: Member must satisfy enum value set: {VALID_RUNTIMES} or be a valid ARN",
                    Type="User",
                )
            if new_runtime in DEPRECATED_RUNTIMES:
                LOG.warning(
                    "The Lambda runtime %s is deprecated. "
                    "Please upgrade the runtime for the function %s: "
                    "https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtimes.html",
                    new_runtime,
                    function_name,
                )
            config_changes["runtime"] = new_runtime

        if snap_start := request.get("SnapStart"):
            # validate against the runtime from this request, falling back to the current one
            effective_runtime = config_changes.get("runtime") or current_config.runtime
            self._validate_snapstart(snap_start, effective_runtime)
            config_changes["snap_start"] = SnapStartResponse(
                ApplyOn=snap_start.get("ApplyOn", SnapStartApplyOn.None_),
                OptimizationStatus=SnapStartOptimizationStatus.Off,
            )

        if "Environment" in request:
            env_vars = request.get("Environment", {}).get("Variables", {})
            if env_vars:
                self._verify_env_variables(env_vars)
            # a falsy value is stored as-is
            config_changes["environment"] = env_vars

        if "Layers" in request:
            requested_layers = request["Layers"]
            if requested_layers:
                self._validate_layers(requested_layers, region=region, account_id=account_id)
            config_changes["layers"] = self.map_layers(requested_layers)

        if "ImageConfig" in request:
            requested_image_config = request["ImageConfig"]
            config_changes["image_config"] = ImageConfig(
                command=requested_image_config.get("Command"),
                entrypoint=requested_image_config.get("EntryPoint"),
                working_directory=requested_image_config.get("WorkingDirectory"),
            )

        if "LoggingConfig" in request:
            requested_logging = request["LoggingConfig"]
            LOG.warning(
                "Advanced Lambda Logging Configuration is currently mocked "
                "and will not impact the logging behavior. "
                "Please create a feature request if needed."
            )

            # when switching to JSON, app and system level log is auto set to INFO
            if requested_logging.get("LogFormat", None) == LogFormat.JSON:
                json_level_defaults = {
                    "ApplicationLogLevel": "INFO",
                    "SystemLogLevel": "INFO",
                }
                requested_logging = json_level_defaults | requested_logging

            previous_logging = current_config.logging_config

            # partial update: requested keys override the previous configuration
            merged_logging = previous_logging | requested_logging

            # in case we switched from JSON to Text we need to remove LogLevel keys
            switched_to_text = (
                merged_logging.get("LogFormat") == LogFormat.Text
                and previous_logging.get("LogFormat") == LogFormat.JSON
            )
            if switched_to_text:
                merged_logging.pop("ApplicationLogLevel", None)
                merged_logging.pop("SystemLogLevel", None)

            config_changes["logging_config"] = merged_logging

        if "TracingConfig" in request:
            tracing_mode = request.get("TracingConfig", {}).get("Mode")
            if tracing_mode:
                config_changes["tracing_config_mode"] = tracing_mode

        updated_config = dataclasses.replace(
            current_config,
            last_modified=api_utils.generate_lambda_date(),
            internal_revision=short_uid(),
            last_update=UpdateStatus(
                status=LastUpdateStatus.InProgress,
                code="Creating",
                reason="The function is being created.",
            ),
            **config_changes,
        )
        new_latest_version = dataclasses.replace(latest_version, config=updated_config)
        function.versions["$LATEST"] = new_latest_version  # TODO: notify
        self.lambda_service.update_version(new_version=new_latest_version)

        return api_utils.map_config_out(new_latest_version)
1✔
1258

1259
    @handler(operation="UpdateFunctionCode", expand=False)
    def update_function_code(
        self, context: RequestContext, request: UpdateFunctionCodeRequest
    ) -> FunctionConfiguration:
        """Replace the code (or image) of the function's ``$LATEST`` version.

        The new code is taken from ``ZipFile``, ``S3Bucket``/``S3Key`` or ``ImageUri`` of the
        request (checked in that order). When ``Publish`` is set, a version is published after the
        update and that published version is returned with its qualified ARN.

        :param context: request context, used to resolve account, region and function name
        :param request: the raw ``UpdateFunctionCode`` request
        :return: the mapped configuration of the updated (or freshly published) version
        :raises ResourceNotFoundException: the function is not present in the store
        :raises PreconditionFailedException: ``RevisionId`` is set but differs from the latest one
        :raises InvalidParameterValueException: the code source does not fit the package type
        :raises ValidationException: ``Architectures`` is not exactly one supported architecture
        :raises LambdaServiceException: none of ZipFile, S3Bucket or ImageUri is provided
        """
        requested_name = request.get("FunctionName")
        account_id, region = api_utils.get_account_and_region(requested_name, context)
        function_name, _ = api_utils.get_name_and_qualifier(requested_name, None, context)

        store = lambda_stores[account_id][region]
        if function_name not in store.functions:
            missing_arn = api_utils.unqualified_lambda_arn(
                function_name=function_name, region=region, account=account_id
            )
            raise ResourceNotFoundException(f"Function not found: {missing_arn}", Type="User")
        function = store.functions[function_name]

        expected_revision = request.get("RevisionId")
        if expected_revision and expected_revision != function.latest().config.revision_id:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
                Type="User",
            )

        # TODO verify if correct combination of code is set
        archive_source_given = request.get("ZipFile") or request.get("S3Bucket")
        if archive_source_given and function.latest().config.package_type == PackageType.Image:
            raise InvalidParameterValueException(
                "Please provide ImageUri when updating a function with packageType Image.",
                Type="User",
            )
        if request.get("ImageUri") and function.latest().config.package_type == PackageType.Zip:
            raise InvalidParameterValueException(
                "Please don't provide ImageUri when updating a function with packageType Zip.",
                Type="User",
            )

        # Exactly one of `code` / `image` ends up set, depending on the source that was provided.
        code = None
        image = None
        if zip_file := request.get("ZipFile"):
            code = store_lambda_archive(
                archive_file=zip_file,
                function_name=function_name,
                region_name=region,
                account_id=account_id,
            )
        elif s3_bucket := request.get("S3Bucket"):
            code = store_s3_bucket_archive(
                archive_bucket=s3_bucket,
                archive_key=request["S3Key"],
                archive_version=request.get("S3ObjectVersion"),
                function_name=function_name,
                region_name=region,
                account_id=account_id,
            )
        elif image_uri := request.get("ImageUri"):
            image = create_image_code(image_uri=image_uri)
        else:
            raise LambdaServiceException("Gotta have s3 bucket or zip file or image")

        previous_latest = function.versions.get("$LATEST")
        replace_kwargs = {"code": code} if code else {"image": image}

        architectures = request.get("Architectures")
        if architectures:
            if len(architectures) != 1:
                listed = ", ".join(architectures)
                raise ValidationException(
                    f"1 validation error detected: Value '[{listed}]' at 'architectures' "
                    "failed to satisfy constraint: Member must have length less than or equal to 1",
                )
            # An empty list of architectures is also forbidden. Further exceptions are tested here
            # for create_function:
            # tests.aws.services.lambda_.test_lambda_api.TestLambdaFunction.test_create_lambda_exceptions
            if architectures[0] not in ARCHITECTURES:
                listed = ", ".join(architectures)
                raise ValidationException(
                    f"1 validation error detected: Value '[{listed}]' at 'architectures' "
                    "failed to satisfy constraint: Member must satisfy constraint: "
                    "[Member must satisfy enum value set: [x86_64, arm64], Member must not be null]",
                )
            replace_kwargs["architectures"] = architectures

        # The update is flagged as in progress on the new config before it is handed to the service.
        updated_config = dataclasses.replace(
            previous_latest.config,
            internal_revision=short_uid(),
            last_modified=api_utils.generate_lambda_date(),
            last_update=UpdateStatus(
                status=LastUpdateStatus.InProgress,
                code="Creating",
                reason="The function is being created.",
            ),
            **replace_kwargs,
        )
        function_version = dataclasses.replace(previous_latest, config=updated_config)
        function.versions["$LATEST"] = function_version
        self.lambda_service.update_version(new_version=function_version)

        publish = request.get("Publish")
        if publish:
            function_version = self._publish_version_with_changes(
                function_name=function_name, region=region, account_id=account_id
            )
        return api_utils.map_config_out(function_version, return_qualified_arn=bool(publish))
1368

1369
    # TODO: does deleting the latest published version affect the next versions number?
1370
    # TODO: what happens when we call this with a qualifier and a fully qualified ARN? (+ conflicts?)
1371
    # TODO: test different ARN patterns (shorthand ARN?)
1372
    # TODO: test deleting across regions?
1373
    # TODO: test mismatch between context region and region in ARN
1374
    # TODO: test qualifier $LATEST, alias-name and version
1375
    def delete_function(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: Qualifier = None,
        **kwargs,
    ) -> None:
        """Delete a whole function, or a single version of it when a qualifier is given.

        :param context: request context, used to resolve account, region and function name
        :param function_name: name or ARN of the function
        :param qualifier: optional version qualifier; aliases and ``$LATEST`` are rejected
        :raises InvalidParameterValueException: the qualifier is an alias or ``$LATEST``
        :raises ResourceNotFoundException: the function is not present in the store
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )

        if qualifier and api_utils.qualifier_is_alias(qualifier):
            raise InvalidParameterValueException(
                "Deletion of aliases is not currently supported.",
                Type="User",
            )

        store = lambda_stores[account_id][region]
        if qualifier == "$LATEST":
            raise InvalidParameterValueException(
                "$LATEST version cannot be deleted without deleting the function.", Type="User"
            )

        if function_name not in store.functions:
            missing_arn = api_utils.unqualified_lambda_arn(
                function_name=function_name, region=region, account=account_id
            )
            raise ResourceNotFoundException(f"Function not found: {missing_arn}", Type="User")

        if qualifier:
            # delete a single version of the function; an unknown version is silently ignored
            function = store.functions.get(function_name)
            removed_version = function.versions.pop(qualifier, None)
            if removed_version:
                self.lambda_service.stop_version(removed_version.id.qualified_arn())
                destroy_code_if_not_used(code=removed_version.config.code, function=function)
            return

        # delete the whole function
        # TODO: introduce locking for safe deletion: We could create a new version at the API layer
        #  before the old version gets cleaned up in the internal lambda service.
        removed_function = store.functions.pop(function_name)
        for fn_version in removed_function.versions.values():
            self.lambda_service.stop_version(qualified_arn=fn_version.id.qualified_arn())
            # we can safely destroy the code here
            if fn_version.config.code:
                fn_version.config.code.destroy()
1✔
1423

1424
    def list_functions(
        self,
        context: RequestContext,
        master_region: MasterRegion = None,  # (only relevant for lambda@edge)
        function_version: FunctionVersionApi = None,
        marker: String = None,
        max_items: MaxListItems = None,
        **kwargs,
    ) -> ListFunctionsResponse:
        """List the functions of the caller's account and region, one page at a time.

        Without ``function_version`` only the latest version of each function is listed (with
        unqualified ARNs); with ``ALL`` every version of every function is listed (with qualified
        ARNs).

        :raises ValidationException: ``function_version`` is set to anything other than ``ALL``
        """
        state = lambda_stores[context.account_id][context.region]

        list_all_versions = function_version == FunctionVersionApi.ALL
        if function_version and not list_all_versions:
            raise ValidationException(
                f"1 validation error detected: Value '{function_version}'"
                " at 'functionVersion' failed to satisfy constraint: Member must satisfy enum value set: [ALL]"
            )

        if list_all_versions:
            # include all versions for all function
            selected = [v for f in state.functions.values() for v in f.versions.values()]
        else:
            selected = [f.latest() for f in state.functions.values()]

        entries = []
        for fn_version in selected:
            mapped = api_utils.map_config_out(fn_version, return_qualified_arn=list_all_versions)
            entries.append(api_utils.map_to_list_response(mapped))

        # the function ARN of an entry serves as the pagination token
        page, token = PaginatedList(entries).get_page(
            lambda entry: entry["FunctionArn"],
            marker,
            max_items,
        )
        return ListFunctionsResponse(Functions=page, NextMarker=token)
1✔
1462

1463
    def get_function(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        qualifier: Qualifier = None,
        **kwargs,
    ) -> GetFunctionResponse:
        """Return configuration, code location and tags of a function version.

        An alias qualifier is resolved to the version it points to; the alias name is passed on to
        the configuration mapping.

        :param context: request context, used to resolve account, region and function name
        :param function_name: name or ARN of the function
        :param qualifier: optional version or alias
        :raises ResourceNotFoundException: the function or the requested alias does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )

        fn = lambda_stores[account_id][region].functions.get(function_name)
        if fn is None:
            # the reported ARN is qualified only if the caller supplied a qualifier
            if qualifier is None:
                missing_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
            else:
                missing_arn = api_utils.qualified_lambda_arn(
                    function_name, qualifier, account_id, region
                )
            raise ResourceNotFoundException(f"Function not found: {missing_arn}", Type="User")

        alias_name = None
        if qualifier and api_utils.qualifier_is_alias(qualifier):
            if qualifier not in fn.aliases:
                alias_arn = api_utils.qualified_lambda_arn(
                    function_name, qualifier, account_id, region
                )
                raise ResourceNotFoundException(f"Function not found: {alias_arn}", Type="User")
            # continue with the version the alias points to
            alias_name = qualifier
            qualifier = fn.aliases[alias_name].function_version

        version = get_function_version(
            function_name=function_name,
            qualifier=qualifier,
            account_id=account_id,
            region=region,
        )

        tags = self._get_tags(api_utils.unqualified_lambda_arn(function_name, account_id, region))
        additional_fields = {"Tags": tags} if tags else {}

        code_location = None
        if code := version.config.code:
            code_location = FunctionCodeLocation(
                Location=code.generate_presigned_url(), RepositoryType="S3"
            )
        elif image := version.config.image:
            code_location = FunctionCodeLocation(
                ImageUri=image.image_uri,
                RepositoryType=image.repository_type,
                ResolvedImageUri=image.resolved_image_uri,
            )

        configuration = api_utils.map_config_out(
            version, return_qualified_arn=bool(qualifier), alias_name=alias_name
        )
        return GetFunctionResponse(
            Configuration=configuration,
            Code=code_location,  # TODO
            **additional_fields,
            # Concurrency={},  # TODO
        )
1527

1528
    def get_function_configuration(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        qualifier: Qualifier = None,
        **kwargs,
    ) -> FunctionConfiguration:
        """Return only the mapped configuration of a function version.

        The ARN in the result is qualified when a qualifier was resolved for the request.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        # CAVE: THIS RETURN VALUE IS *NOT* THE SAME AS IN get_function (!) but seems to be only
        # configuration part?
        resolved_name, resolved_qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        fn_version = get_function_version(
            function_name=resolved_name,
            qualifier=resolved_qualifier,
            account_id=account_id,
            region=region,
        )
        return api_utils.map_config_out(
            fn_version, return_qualified_arn=bool(resolved_qualifier)
        )
1✔
1547

1548
    def invoke(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        invocation_type: InvocationType = None,
        log_type: LogType = None,
        client_context: String = None,
        payload: IO[Blob] = None,
        qualifier: Qualifier = None,
        **kwargs,
    ) -> InvocationResponse:
        """Invoke a function through the lambda service and build the API response.

        ``Event`` invocations answer with status 202 and ``DryRun`` with 204, both without a
        payload. Every other invocation answers with status 200, the payload and the executed
        version, plus ``FunctionError`` on an error result and the last 4096 bytes of the logs
        (base64 encoded) when ``log_type`` is ``Tail``.

        :raises ServiceException: re-raised unchanged when raised by the lambda service
        :raises LambdaServiceException: for any other failure during the invocation
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )

        started_at = time.perf_counter()
        try:
            invocation_result = self.lambda_service.invoke(
                function_name=function_name,
                qualifier=qualifier,
                region=region,
                account_id=account_id,
                invocation_type=invocation_type,
                client_context=client_context,
                request_id=context.request_id,
                trace_context=context.trace_context,
                payload=payload.read() if payload else None,
            )
        except ServiceException:
            # API-level errors are passed through to the caller as they are
            raise
        except EnvironmentStartupTimeoutException as startup_timeout:
            raise LambdaServiceException(
                "Internal error while executing lambda"
            ) from startup_timeout
        except Exception as unexpected:
            LOG.error("Error while invoking lambda", exc_info=unexpected)
            raise LambdaServiceException("Internal error while executing lambda") from unexpected

        if invocation_type == InvocationType.Event:
            # This happens when invocation type is event
            return InvocationResponse(StatusCode=202)
        if invocation_type == InvocationType.DryRun:
            # This happens when invocation type is dryrun
            return InvocationResponse(StatusCode=204)

        elapsed_ms = (time.perf_counter() - started_at) * 1000
        LOG.debug("Lambda invocation duration: %0.2fms", elapsed_ms)

        response = InvocationResponse(
            StatusCode=200,
            Payload=invocation_result.payload,
            ExecutedVersion=invocation_result.executed_version,
        )

        if invocation_result.is_error:
            response["FunctionError"] = "Unhandled"

        if log_type == LogType.Tail:
            # only the trailing 4096 bytes of the log output are returned
            log_tail = to_bytes(invocation_result.logs)[-4096:]
            response["LogResult"] = to_str(base64.b64encode(log_tail))

        return response
1✔
1608

1609
    # Version operations
1610
    def publish_version(
        self,
        context: RequestContext,
        function_name: FunctionName,
        code_sha256: String = None,
        description: Description = None,
        revision_id: String = None,
        **kwargs,
    ) -> FunctionConfiguration:
        """Publish a new version from the existing function and return its configuration.

        The returned configuration carries the qualified ARN of the published version.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        resolved_name = api_utils.get_function_name(function_name, context)
        published = self._publish_version_from_existing_version(
            function_name=resolved_name,
            description=description,
            account_id=account_id,
            region=region,
            revision_id=revision_id,
            code_sha256=code_sha256,
        )
        return api_utils.map_config_out(published, return_qualified_arn=True)
1✔
1630

1631
    def list_versions_by_function(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        marker: String = None,
        max_items: MaxListItems = None,
        **kwargs,
    ) -> ListVersionsByFunctionResponse:
        """List all versions of a single function, one page at a time.

        :param context: request context, used to resolve account, region and function name
        :param function_name: name or ARN of the function
        :param marker: pagination token returned by a previous call
        :param max_items: maximum number of versions per page
        :return: the page of versions (with qualified ARNs) and the token of the next page, if any
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name = api_utils.get_function_name(function_name, context)
        function = self._get_function(
            function_name=function_name, region=region, account_id=account_id
        )
        versions = [
            api_utils.map_to_list_response(
                api_utils.map_config_out(version=version, return_qualified_arn=True)
            )
            for version in function.versions.values()
        ]
        items = PaginatedList(versions)
        # Key the pagination on the qualified function ARN (distinct per version) instead of the
        # whole item: the marker is a string, so a dict-valued key can never be used as a token.
        # This matches how list_functions derives its pagination token.
        page, token = items.get_page(
            lambda item: item["FunctionArn"],
            marker,
            max_items,
        )
        return ListVersionsByFunctionResponse(Versions=page, NextMarker=token)
1✔
1657

1658
    # Alias
1659

1660
    def _create_routing_config_model(
        self, routing_config_dict: dict[str, float], function_version: FunctionVersion
    ):
        """Validate additional version weights of an alias and wrap them in a routing config.

        :param routing_config_dict: mapping of version qualifier to routing weight
        :param function_version: the version the alias itself points to
        :return: an ``AliasRoutingConfig`` holding the given weights
        :raises InvalidParameterValueException: more than one entry is given, an entry names the
            alias' own target version, or the alias targets ``$LATEST``
        :raises ValidationException: a weight is below 0.0 or not below 1.0, or a key is not a
            version qualifier
        """
        if len(routing_config_dict) > 1:
            raise InvalidParameterValueException(
                "Number of items in AdditionalVersionWeights cannot be greater than 1",
                Type="User",
            )
        # should be exactly one item here, still iterating, might be supported in the future
        for routed_version, weight in routing_config_dict.items():
            target_qualifier = function_version.id.qualifier
            if weight >= 1.0 or weight < 0.0:
                raise ValidationException(
                    f"1 validation error detected: Value '{{{routed_version}={weight}}}' at "
                    "'routingConfig.additionalVersionWeights' failed to satisfy constraint: "
                    "Map value must satisfy constraint: "
                    "[Member must have value less than or equal to 1.0, "
                    "Member must have value greater than or equal to 0.0, "
                    "Member must not be null]"
                )
            if routed_version == target_qualifier:
                raise InvalidParameterValueException(
                    f"Invalid function version {target_qualifier}. "
                    f"Function version {target_qualifier} is already included in routing configuration.",
                    Type="User",
                )
            # check if version target is latest, then no routing config is allowed
            if target_qualifier == "$LATEST":
                raise InvalidParameterValueException(
                    "$LATEST is not supported for an alias pointing to more than 1 version"
                )
            if not api_utils.qualifier_is_version(routed_version):
                raise ValidationException(
                    f"1 validation error detected: Value '{{{routed_version}={weight}}}' at "
                    "'routingConfig.additionalVersionWeights' failed to satisfy constraint: "
                    "Map keys must satisfy constraint: "
                    "[Member must have length less than or equal to 1024, "
                    "Member must have length greater than or equal to 1, "
                    "Member must satisfy regular expression pattern: [0-9]+, "
                    "Member must not be null]"
                )

            # checking if the version in the config exists
            version_id = function_version.id
            get_function_version(
                function_name=version_id.function_name,
                qualifier=routed_version,
                region=version_id.region,
                account_id=version_id.account,
            )
        return AliasRoutingConfig(version_weights=routing_config_dict)
1✔
1697

1698
    def create_alias(
        self,
        context: RequestContext,
        function_name: FunctionName,
        name: Alias,
        function_version: Version,
        description: Description = None,
        routing_config: AliasRoutingConfiguration = None,
        **kwargs,
    ) -> AliasConfiguration:
        """Create an alias pointing to a version of the function.

        :param name: alias name; must satisfy the alias qualifier pattern
        :param function_version: version the alias points to
        :param description: optional description, stored as an empty string when not given
        :param routing_config: optional routing config with ``AdditionalVersionWeights``
        :raises ValidationException: ``name`` is not a valid alias name
        :raises ResourceConflictException: an alias with that name already exists
        """
        if not api_utils.qualifier_is_alias(name):
            raise ValidationException(
                f"1 validation error detected: Value '{name}' at 'name' failed to satisfy "
                "constraint: Member must satisfy regular expression pattern: "
                "(?!^[0-9]+$)([a-zA-Z0-9-_]+)"
            )

        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name = api_utils.get_function_name(function_name, context)
        # resolved first; the result is needed for validating the routing config below
        target_version = get_function_version(
            function_name=function_name,
            qualifier=function_version,
            region=region,
            account_id=account_id,
        )
        function = self._get_function(
            function_name=function_name, region=region, account_id=account_id
        )

        with function.lock:
            existing_alias = function.aliases.get(name)
            if existing_alias:
                existing_arn = api_utils.map_alias_out(alias=existing_alias, function=function)[
                    "AliasArn"
                ]
                raise ResourceConflictException(
                    f"Alias already exists: {existing_arn}",
                    Type="User",
                )

            additional_weights = (
                routing_config.get("AdditionalVersionWeights") if routing_config else None
            )
            routing_configuration = None
            if additional_weights:
                routing_configuration = self._create_routing_config_model(
                    additional_weights, target_version
                )

            alias = VersionAlias(
                name=name,
                function_version=function_version,
                # description is always present, if not specified it's an empty string
                description=description or "",
                routing_configuration=routing_configuration,
            )
            function.aliases[name] = alias
        return api_utils.map_alias_out(alias=alias, function=function)
1✔
1749

1750
    def list_aliases(
        self,
        context: RequestContext,
        function_name: FunctionName,
        function_version: Version = None,
        marker: String = None,
        max_items: MaxListItems = None,
        **kwargs,
    ) -> ListAliasesResponse:
        """List the aliases of a function, optionally only those pointing to one version.

        :param function_version: when given, only aliases whose version equals it are listed
        :param marker: pagination token returned by a previous call
        :param max_items: maximum number of aliases per page
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name = api_utils.get_function_name(function_name, context)
        function = self._get_function(
            function_name=function_name, region=region, account_id=account_id
        )

        mapped_aliases = []
        for alias in function.aliases.values():
            if function_version is None or alias.function_version == function_version:
                mapped_aliases.append(api_utils.map_alias_out(alias, function))

        # the alias ARN serves as the pagination token
        page, token = PaginatedList(mapped_aliases).get_page(
            lambda mapped: mapped["AliasArn"],
            marker,
            max_items,
        )
        return ListAliasesResponse(Aliases=page, NextMarker=token)
1✔
1778

1779
    def delete_alias(
        self, context: RequestContext, function_name: FunctionName, name: Alias, **kwargs
    ) -> None:
        """Remove an alias and the resources registered under its name.

        A missing alias is not an error. The provisioned concurrency config stored under the alias
        name is always removed; the function URL config only if the alias actually existed.
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name = api_utils.get_function_name(function_name, context)
        function = self._get_function(
            function_name=function_name, region=region, account_id=account_id
        )
        removed_alias = function.aliases.pop(name, None)

        # cleanup related resources
        function.provisioned_concurrency_configs.pop(name, None)

        # TODO: Allow for deactivating/unregistering specific Lambda URLs
        if not removed_alias or name not in function.function_url_configs:
            return
        url_config = function.function_url_configs.pop(name)
        LOG.debug(
            "Stopping aliased Lambda Function URL %s for %s",
            url_config.url,
            url_config.function_name,
        )
1801

1802
    def get_alias(
        self, context: RequestContext, function_name: FunctionName, name: Alias, **kwargs
    ) -> AliasConfiguration:
        """Return the configuration of a single alias of the function.

        :raises ResourceNotFoundException: the function has no alias with that name
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name = api_utils.get_function_name(function_name, context)
        function = self._get_function(
            function_name=function_name, region=region, account_id=account_id
        )
        alias = function.aliases.get(name)
        if not alias:
            alias_arn = api_utils.qualified_lambda_arn(
                function_name=function_name, qualifier=name, region=region, account=account_id
            )
            raise ResourceNotFoundException(
                f"Cannot find alias arn: {alias_arn}",
                Type="User",
            )
        return api_utils.map_alias_out(alias=alias, function=function)
1✔
1816

1817
    def update_alias(
        self,
        context: RequestContext,
        function_name: FunctionName,
        name: Alias,
        function_version: Version = None,
        description: Description = None,
        routing_config: AliasRoutingConfiguration = None,
        revision_id: String = None,
        **kwargs,
    ) -> AliasConfiguration:
        """Update target version, description and/or routing config of an existing alias.

        :param name: name of the alias to update
        :param function_version: new target version; unchanged when ``None``
        :param description: new description; unchanged when ``None``
        :param routing_config: new routing config; an empty config (or empty
            ``AdditionalVersionWeights``) clears the routing config, ``None`` leaves it unchanged
        :param revision_id: when given, must match the alias' current revision id
        :return: the mapped configuration of the updated alias
        :raises ResourceNotFoundException: the function has no alias with that name
        :raises PreconditionFailedException: ``revision_id`` does not match the current revision
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name = api_utils.get_function_name(function_name, context)
        function = self._get_function(
            function_name=function_name, region=region, account_id=account_id
        )
        if not (alias := function.aliases.get(name)):
            fn_arn = api_utils.qualified_lambda_arn(function_name, name, account_id, region)
            raise ResourceNotFoundException(
                f"Alias not found: {fn_arn}",
                Type="User",
            )
        if revision_id and alias.revision_id != revision_id:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
                Type="User",
            )
        changes = {}
        if function_version is not None:
            changes |= {"function_version": function_version}
        if description is not None:
            changes |= {"description": description}
        if routing_config is not None:
            # if it is an empty dict or AdditionalVersionWeights is empty, set routing config to None
            new_routing_config = None
            if routing_config_dict := routing_config.get("AdditionalVersionWeights"):
                # The weights are validated against the version the alias points to after this
                # update: the newly requested one if given, otherwise the current target.
                target_qualifier = (
                    function_version if function_version is not None else alias.function_version
                )
                target_version = get_function_version(
                    function_name=function_name,
                    qualifier=target_qualifier,
                    region=region,
                    account_id=account_id,
                )
                new_routing_config = self._create_routing_config_model(
                    routing_config_dict, target_version
                )
            changes |= {"routing_configuration": new_routing_config}
        # even if no changes are done, we have to update revision id for some reason
        old_alias = alias
        alias = dataclasses.replace(alias, **changes)
        function.aliases[name] = alias

        # TODO: signal lambda service that pointer potentially changed
        self.lambda_service.update_alias(old_alias=old_alias, new_alias=alias, function=function)

        return api_utils.map_alias_out(alias=alias, function=function)
1✔
1865

1866
    # =======================================
1867
    # ======= EVENT SOURCE MAPPINGS =========
1868
    # =======================================
1869

1870
    @handler("CreateEventSourceMapping", expand=False)
    def create_event_source_mapping(
        self,
        context: RequestContext,
        request: CreateEventSourceMappingRequest,
    ) -> EventSourceMappingConfiguration:
        """Handle ``CreateEventSourceMapping`` by delegating to the v2 implementation."""
        esm_configuration = self.create_event_source_mapping_v2(context, request)
        return esm_configuration
1✔
1877

1878
    def create_event_source_mapping_v2(
        self,
        context: RequestContext,
        request: CreateEventSourceMappingRequest,
    ) -> EventSourceMappingConfiguration:
        """Validate the request, register the event source mapping and create its worker.

        A copy of the new config is kept in the store, the worker is tracked in
        ``self.esm_workers`` under its UUID, and tags from the request are stored for the
        mapping's ARN before the worker is created.

        :return: the configuration of the newly created event source mapping
        """
        # Validations
        function_arn, _function_name, state = self.validate_event_source_mapping(
            context, request
        )

        esm_config = EsmConfigFactory(request, context, function_arn).get_esm_config()

        # Copy esm_config to avoid a race condition with potential async update in the store
        mapping_uuid = esm_config["UUID"]
        state.event_source_mappings[mapping_uuid] = esm_config.copy()

        function_role = get_function_version_from_arn(function_arn).config.role
        is_enabled = request.get("Enabled", True)
        # TODO: check for potential async race condition update -> think about locking
        worker_factory = EsmWorkerFactory(esm_config, function_role, is_enabled)
        esm_worker = worker_factory.get_esm_worker()
        self.esm_workers[esm_worker.uuid] = esm_worker

        # TODO: check StateTransitionReason, LastModified, LastProcessingResult (concurrent updates
        #  requires locking!)
        tags = request.get("Tags")
        if tags:
            self._store_tags(esm_config.get("EventSourceMappingArn"), tags)
        esm_worker.create()
        return esm_config
1✔
1901

1902
    def validate_event_source_mapping(self, context, request):
        """Validate an event source mapping and resolve the function it targets.

        ``request`` is either a ``CreateEventSourceMapping`` request (carrying
        ``FunctionName``) or an internal ``EventSourceMappingConfiguration``
        representation (carrying ``FunctionArn``).

        :param context: request context; provides the operation name and the
            fallback account id / region when the function reference has none
        :param request: the mapping definition to validate
        :return: tuple ``(function_arn, function_name, state)`` where ``state`` is
            the Lambda store of the resolved account and region
        :raises InvalidParameterValueException: for an unsupported destination
            config, a missing/unrecognized event source, a missing starting
            position, an invalid SQS batch window, an invalid filter pattern or
            an unknown function
        :raises ResourceConflictException: for create requests, when a stored
            mapping already links an overlapping event source to the same
            function ARN (for Kafka only if the topics overlap as well)
        :raises Exception: if the qualifier names an unknown alias/version or is
            not a valid qualifier
        """
        # TODO: test whether stream ARNs are valid sources for Pipes or ESM or whether only DynamoDB table ARNs work
        is_create_esm_request = context.operation.name == self.create_event_source_mapping.operation

        if destination_config := request.get("DestinationConfig"):
            if "OnSuccess" in destination_config:
                raise InvalidParameterValueException(
                    "Unsupported DestinationConfig parameter for given event source mapping type.",
                    Type="User",
                )

        service = None
        if "SelfManagedEventSource" in request:
            service = "kafka"
            if "SourceAccessConfigurations" not in request:
                raise InvalidParameterValueException(
                    "Required 'sourceAccessConfigurations' parameter is missing.", Type="User"
                )
        if service is None and "EventSourceArn" not in request:
            raise InvalidParameterValueException("Unrecognized event source.", Type="User")
        if service is None:
            service = extract_service_from_arn(request["EventSourceArn"])

        batch_size = api_utils.validate_and_set_batch_size(service, request.get("BatchSize"))
        if service in ["dynamodb", "kinesis"] and "StartingPosition" not in request:
            raise InvalidParameterValueException(
                "1 validation error detected: Value null at 'startingPosition' failed to satisfy constraint: Member must not be null.",
                Type="User",
            )
        if service in ["sqs", "sqs-fifo"]:
            if batch_size > 10 and request.get("MaximumBatchingWindowInSeconds", 0) == 0:
                raise InvalidParameterValueException(
                    "Maximum batch window in seconds must be greater than 0 if maximum batch size is greater than 10",
                    Type="User",
                )

        if (filter_criteria := request.get("FilterCriteria")) is not None:
            for filter_ in filter_criteria.get("Filters", []):
                pattern_str = filter_.get("Pattern")
                # A missing, non-string or syntactically invalid pattern is rejected
                # with the same error.
                if (
                    not pattern_str
                    or not isinstance(pattern_str, str)
                    or not validate_event_pattern(pattern_str)
                ):
                    raise InvalidParameterValueException(
                        "Invalid filter pattern definition.", Type="User"
                    )

        # Can either have a FunctionName (i.e CreateEventSourceMapping request) or
        # an internal EventSourceMappingConfiguration representation
        request_function_name = request.get("FunctionName") or request.get("FunctionArn")
        # can be either a partial arn or a full arn for the version/alias
        function_name, qualifier, account, region = function_locators_from_arn(
            request_function_name
        )
        # TODO: validate `context.region` vs. `region(request["FunctionName"])` vs. `region(request["EventSourceArn"])`
        account = account or context.account_id
        region = region or context.region
        state = lambda_stores[account][region]
        fn = state.functions.get(function_name)
        if not fn:
            raise InvalidParameterValueException("Function does not exist", Type="User")

        if qualifier:
            # make sure the function version/alias exists
            if api_utils.qualifier_is_alias(qualifier):
                fn_alias = fn.aliases.get(qualifier)
                if not fn_alias:
                    raise Exception("unknown alias")  # TODO: cover via test
            elif api_utils.qualifier_is_version(qualifier):
                fn_version = fn.versions.get(qualifier)
                if not fn_version:
                    raise Exception("unknown version")  # TODO: cover via test
            elif qualifier == "$LATEST":
                pass
            else:
                raise Exception("invalid functionname")  # TODO: cover via test
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account, region)
        else:
            fn_arn = api_utils.unqualified_lambda_arn(function_name, account, region)

        # Duplicate detection only applies when validating a CreateEventSourceMapping request
        if is_create_esm_request:

            def _get_mapping_sources(mapping: dict[str, Any]) -> list[str]:
                # Either the single event source ARN or, for self-managed sources,
                # the configured Kafka bootstrap servers.
                if event_source_arn := mapping.get("EventSourceArn"):
                    return [event_source_arn]
                return (
                    mapping.get("SelfManagedEventSource", {})
                    .get("Endpoints", {})
                    .get("KAFKA_BOOTSTRAP_SERVERS", [])
                )

            # check for event source duplicates
            # TODO: currently validated for sqs, kinesis, and dynamodb
            service_id = load_service(service).service_id
            # The request does not change while scanning the store, so its sources
            # are resolved once instead of once per stored mapping.
            request_sources = _get_mapping_sources(request)
            for uuid, mapping in state.event_source_mappings.items():
                mapping_sources = _get_mapping_sources(mapping)
                if mapping["FunctionArn"] != fn_arn or not set(mapping_sources).intersection(
                    request_sources
                ):
                    continue

                if service == "sqs":
                    # *shakes fist at SQS*
                    raise ResourceConflictException(
                        f'An event source mapping with {service_id} arn (" {mapping["EventSourceArn"]} ") '
                        f'and function (" {function_name} ") already exists. Please update or delete the '
                        f"existing mapping with UUID {uuid}",
                        Type="User",
                    )
                elif service == "kafka":
                    # Same cluster and function only conflict if the topics overlap too.
                    if set(mapping["Topics"]).intersection(request["Topics"]):
                        raise ResourceConflictException(
                            f'An event source mapping with event source ("{",".join(request_sources)}"), '
                            f'function ("{fn_arn}"), '
                            f'topics ("{",".join(request["Topics"])}") already exists. Please update or delete the '
                            f"existing mapping with UUID {uuid}",
                            Type="User",
                        )
                else:
                    raise ResourceConflictException(
                        f'The event source arn (" {mapping["EventSourceArn"]} ") and function '
                        f'(" {function_name} ") provided mapping already exists. Please update or delete the '
                        f"existing mapping with UUID {uuid}",
                        Type="User",
                    )
        return fn_arn, function_name, state
2030

2031
    @handler("UpdateEventSourceMapping", expand=False)
    def update_event_source_mapping(
        self,
        context: RequestContext,
        request: UpdateEventSourceMappingRequest,
    ) -> EventSourceMappingConfiguration:
        """Handle the ``UpdateEventSourceMapping`` operation.

        The unexpanded request is forwarded as-is to
        :meth:`update_event_source_mapping_v2`, whose result is returned.
        """
        updated_configuration = self.update_event_source_mapping_v2(context, request)
        return updated_configuration
2038

2039
    def update_event_source_mapping_v2(
        self,
        context: RequestContext,
        request: UpdateEventSourceMappingRequest,
    ) -> EventSourceMappingConfiguration:
        """Apply an update to a stored event source mapping and replace its worker.

        The stored mapping is merged with the request, re-validated, written back
        to the store, and a new worker built from the merged configuration takes
        the place of the old one (which is stopped).

        :param context: request context; selects the account/region store
        :param request: the ``UpdateEventSourceMapping`` request (must carry ``UUID``)
        :return: the merged mapping, with ``State`` as determined during the update
        :raises ResourceNotFoundException: if no UUID is given or no mapping/worker
            is registered for it
        """
        # TODO: test and implement this properly (quite complex with many validations and limitations!)
        LOG.warning(
            "Updating Lambda Event Source Mapping is in experimental state and not yet fully tested."
        )
        store = lambda_stores[context.account_id][context.region]

        # Work on a shallow copy so the UUID can be split off from the updates.
        updates = {**request}
        uuid = updates.pop("UUID", None)
        if not uuid:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        previous_mapping = store.event_source_mappings.get(uuid)
        previous_worker = self.esm_workers.get(uuid)
        if previous_mapping is None or previous_worker is None:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )  # TODO: test?

        # Values from the request overwrite the stored ones.
        merged_mapping = {**previous_mapping, **updates}

        # Validate the merged ESM object; only the resolved function ARN is used,
        # otherwise we only care whether an exception is raised.
        function_arn, _, _ = self.validate_event_source_mapping(context, merged_mapping)

        # The stored representation keeps FunctionArn, not FunctionName.
        merged_mapping.pop("FunctionName", None)
        if function_arn:
            merged_mapping["FunctionArn"] = function_arn

        # Only transition the state if the desired state differs from the current one.
        enabled = request.get("Enabled")
        if enabled is None:
            merged_mapping["State"] = EsmState.UPDATING
        elif enabled and previous_mapping["State"] != EsmState.ENABLED:
            merged_mapping["State"] = EsmState.ENABLING
        # TODO: What happens when trying to update during an update or failed state?!
        elif not enabled and previous_mapping["State"] == EsmState.ENABLED:
            merged_mapping["State"] = EsmState.DISABLING

        # To ensure parity, the state decided here must be returned immediately.
        # It is captured now and only applied to the response, not saved separately.
        response_state = merged_mapping["State"]

        store.event_source_mappings[uuid] = merged_mapping

        # TODO: Currently, we re-create the entire ESM worker. Look into approach with better performance.
        function_role = get_function_version_from_arn(function_arn).config.role
        replacement_worker = EsmWorkerFactory(
            merged_mapping, function_role, request.get("Enabled", previous_worker.enabled)
        ).get_esm_worker()
        # The factory holds all logic for building a worker from configuration; the
        # new worker is registered here but not activated yet.
        self.esm_workers[uuid] = replacement_worker

        # We should stop() the worker since the delete() will remove the ESM from the state mapping.
        previous_worker.stop()
        # This will either create an EsmWorker in the CREATING state if enabled. Otherwise, the DISABLING state is set.
        replacement_worker.create()

        return {**merged_mapping, "State": response_state}
2109

2110
    def delete_event_source_mapping(
        self, context: RequestContext, uuid: String, **kwargs
    ) -> EventSourceMappingConfiguration:
        """Unregister the worker of an event source mapping and ask it to delete itself.

        :param context: request context; selects the account/region store
        :param uuid: identifier of the mapping to delete
        :return: the stored mapping with ``State`` overridden to ``DELETING``
        :raises ResourceNotFoundException: if no mapping or no worker exists for ``uuid``
        """
        store = lambda_stores[context.account_id][context.region]
        stored_mapping = store.event_source_mappings.get(uuid)
        if not stored_mapping:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        # TODO: add proper locking
        worker = self.esm_workers.pop(uuid, None)
        if not worker:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        # Asynchronous delete in v2
        worker.delete()
        return {**stored_mapping, "State": EsmState.DELETING}
2129

2130
    def get_event_source_mapping(
        self, context: RequestContext, uuid: String, **kwargs
    ) -> EventSourceMappingConfiguration:
        """Return a stored event source mapping, refreshed with its worker's state.

        The stored mapping object itself is updated with the worker's current
        ``State`` and ``StateTransitionReason`` before being returned.

        :param context: request context; selects the account/region store
        :param uuid: identifier of the mapping
        :raises ResourceNotFoundException: if no mapping or no worker exists for ``uuid``
        """
        store = lambda_stores[context.account_id][context.region]
        stored_mapping = store.event_source_mappings.get(uuid)
        # The worker is only looked up once the mapping itself is known to exist.
        worker = self.esm_workers.get(uuid) if stored_mapping else None
        if not worker:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        stored_mapping["State"] = worker.current_state
        stored_mapping["StateTransitionReason"] = worker.state_transition_reason
        return stored_mapping
2147

2148
    def list_event_source_mappings(
        self,
        context: RequestContext,
        event_source_arn: Arn = None,
        function_name: FunctionName = None,
        marker: String = None,
        max_items: MaxListItems = None,
        **kwargs,
    ) -> ListEventSourceMappingsResponse:
        """List the stored event source mappings, optionally filtered, one page at a time.

        :param context: request context; selects the account/region store
        :param event_source_arn: if given, keep only mappings with this exact ``EventSourceArn``
        :param function_name: if given, keep only mappings whose ``FunctionArn`` contains it
        :param marker: pagination marker
        :param max_items: page size
        """
        store = lambda_stores[context.account_id][context.region]
        # TODO: update and test State and StateTransitionReason for ESM v2

        def _is_selected(esm) -> bool:
            # TODO: validate pattern
            if event_source_arn and esm.get("EventSourceArn") != event_source_arn:
                return False
            # NOTE(review): substring containment, so a name that is a prefix of
            # another function's name matches both - confirm this is intended.
            if function_name and function_name not in esm["FunctionArn"]:
                return False
            return True

        selected = PaginatedList(
            [esm for esm in store.event_source_mappings.values() if _is_selected(esm)]
        )
        page, next_marker = selected.get_page(lambda esm: esm["UUID"], marker, max_items)
        return ListEventSourceMappingsResponse(EventSourceMappings=page, NextMarker=next_marker)
2175

2176
    def get_source_type_from_request(self, request: dict[str, Any]) -> Optional[str]:
        """Derive the event source type of a mapping request.

        :param request: mapping definition; ``EventSourceArn`` takes precedence
            over ``SelfManagedEventSource``
        :return: the service extracted from the event source ARN (``"sqs-fifo"``
            for SQS FIFO queues), ``"kafka"`` for self-managed event sources, or
            ``None`` if the request names neither
        """
        event_source_arn = request.get("EventSourceArn")
        if event_source_arn:
            service = extract_service_from_arn(event_source_arn)
            # FIFO queues are identified by the mandatory ".fifo" name suffix. A
            # plain substring test would also flag standard queues whose ARN just
            # happens to contain "fifo" somewhere (e.g. a queue named "fifo-import").
            if service == "sqs" and event_source_arn.endswith(".fifo"):
                return "sqs-fifo"
            return service
        if request.get("SelfManagedEventSource"):
            return "kafka"
        return None
2184

2185
    # =======================================
2186
    # ============ FUNCTION URLS ============
2187
    # =======================================
2188

2189
    @staticmethod
    def _validate_qualifier(qualifier: str) -> None:
        """Reject qualifiers that function URL operations do not accept.

        ``$LATEST`` and version qualifiers are refused; an empty/missing
        qualifier and any other value pass.

        :raises ValidationException: if ``qualifier`` is ``$LATEST`` or a version
        """
        if qualifier != "$LATEST":
            # Nothing to reject unless the (non-empty) qualifier is a version.
            if not qualifier or not api_utils.qualifier_is_version(qualifier):
                return
        raise ValidationException(
            f"1 validation error detected: Value '{qualifier}' at 'qualifier' failed to satisfy constraint: Member must satisfy regular expression pattern: ((?!^\\d+$)^[0-9a-zA-Z-_]+$)"
        )
2195

2196
    @staticmethod
    def _validate_invoke_mode(invoke_mode: str) -> None:
        """Check that ``invoke_mode``, if set, is one of the known invoke modes.

        A falsy value is accepted without further checks. ``RESPONSE_STREAM`` is
        accepted but logged as not supported.

        :raises ValidationException: if ``invoke_mode`` is set to an unknown value
        """
        if not invoke_mode:
            return

        if invoke_mode not in (InvokeMode.BUFFERED, InvokeMode.RESPONSE_STREAM):
            raise ValidationException(
                f"1 validation error detected: Value '{invoke_mode}' at 'invokeMode' failed to satisfy constraint: Member must satisfy enum value set: [RESPONSE_STREAM, BUFFERED]"
            )

        if invoke_mode == InvokeMode.RESPONSE_STREAM:
            # TODO should we actually fail for setting RESPONSE_STREAM?
            #  It should trigger InvokeWithResponseStream which is not implemented
            LOG.warning(
                "The invokeMode 'RESPONSE_STREAM' is not yet supported on LocalStack. The property is only mocked, the execution will still be 'BUFFERED'"
            )
2208

2209
    # TODO: what happens if function state is not active?
2210
    def create_function_url_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        auth_type: FunctionUrlAuthType,
        qualifier: FunctionUrlQualifier = None,
        cors: Cors = None,
        invoke_mode: InvokeMode = None,
        **kwargs,
    ) -> CreateFunctionUrlConfigResponse:
        """Create a function URL config for a function or one of its aliases.

        :param context: request context; fallback for account id and region
        :param function_name: function name or ARN
        :param auth_type: auth type stored on the URL config
        :param qualifier: optional alias (``$LATEST`` and versions are rejected)
        :param cors: optional CORS settings stored on the URL config
        :param invoke_mode: optional invoke mode stored on the URL config
        :return: the created URL config mapped to its API representation
        :raises ValidationException: for an invalid qualifier or invoke mode
        :raises ResourceNotFoundException: if the function or alias does not exist
        :raises ResourceConflictException: if a URL config already exists for the qualifier
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        state = lambda_stores[account_id][region]
        self._validate_qualifier(qualifier)
        self._validate_invoke_mode(invoke_mode)

        fn = state.functions.get(function_name)
        if fn is None:
            raise ResourceNotFoundException("Function does not exist", Type="User")

        normalized_qualifier = qualifier or "$LATEST"

        url_config = fn.function_url_configs.get(normalized_qualifier)
        if url_config:
            raise ResourceConflictException(
                f"Failed to create function url config for [functionArn = {url_config.function_arn}]. Error message:  FunctionUrlConfig exists for this Lambda function",
                Type="User",
            )

        if qualifier and qualifier != "$LATEST" and qualifier not in fn.aliases:
            raise ResourceNotFoundException("Function does not exist", Type="User")

        unqualified_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
        function_arn = (
            api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
            if qualifier
            else unqualified_arn
        )

        custom_id: str | None = None

        tags = self._get_tags(unqualified_arn)
        if TAG_KEY_CUSTOM_URL in tags:
            # Note: I really wanted to add verification here that the
            # url_id is unique, so we could surface that to the user ASAP.
            # However, it seems like that information isn't available yet,
            # since (as far as I can tell) we call
            # self.router.register_routes() once, in a single shot, for all
            # of the routes -- and we need to verify that it's unique not
            # just for this particular lambda function, but for the entire
            # lambda provider. Therefore... that idea proved non-trivial!
            custom_id_tag_value = (
                f"{tags[TAG_KEY_CUSTOM_URL]}-{qualifier}" if qualifier else tags[TAG_KEY_CUSTOM_URL]
            )
            if TAG_KEY_CUSTOM_URL_VALIDATOR.match(custom_id_tag_value):
                custom_id = custom_id_tag_value
            else:
                # Note: we're logging here instead of raising to prioritize
                # strict parity with AWS over the localstack-only custom_id
                LOG.warning(
                    "Invalid custom ID tag value for lambda URL (%s=%s). "
                    "Replaced with default (random id)",
                    TAG_KEY_CUSTOM_URL,
                    custom_id_tag_value,
                )

        # The url_id is the subdomain used for the URL we're creating. This
        # is either created randomly (as in AWS), or can be passed as a tag
        # to the lambda itself (localstack-only).
        url_id: str = api_utils.generate_random_url_id() if custom_id is None else custom_id

        host_definition = localstack_host(custom_port=config.GATEWAY_LISTEN[0].port)
        # One timestamp for both fields, so a freshly created config never reports
        # a modification time that differs from its creation time.
        created_at = api_utils.generate_lambda_date()
        fn.function_url_configs[normalized_qualifier] = FunctionUrlConfig(
            function_arn=function_arn,
            function_name=function_name,
            cors=cors,
            url_id=url_id,
            # Use the region the function is stored under (it may come from the
            # function ARN), so the URL agrees with the store and the function ARN.
            url=f"http://{url_id}.lambda-url.{region}.{host_definition.host_and_port()}/",  # TODO: https support
            auth_type=auth_type,
            creation_time=created_at,
            last_modified_time=created_at,
            invoke_mode=invoke_mode,
        )

        # persist and start URL
        # TODO: implement URL invoke
        api_url_config = api_utils.map_function_url_config(
            fn.function_url_configs[normalized_qualifier]
        )

        return CreateFunctionUrlConfigResponse(
            FunctionUrl=api_url_config["FunctionUrl"],
            FunctionArn=api_url_config["FunctionArn"],
            AuthType=api_url_config["AuthType"],
            Cors=api_url_config["Cors"],
            CreationTime=api_url_config["CreationTime"],
            InvokeMode=api_url_config["InvokeMode"],
        )
2314

2315
    def get_function_url_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: FunctionUrlQualifier = None,
        **kwargs,
    ) -> GetFunctionUrlConfigResponse:
        """Return the URL config stored for a function and qualifier.

        :param context: request context; fallback for account id and region
        :param function_name: function name or ARN
        :param qualifier: optional qualifier; ``$LATEST`` is used for the lookup if absent
        :raises ValidationException: for an invalid qualifier
        :raises ResourceNotFoundException: if the function or its URL config does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]

        fn_name, qualifier = api_utils.get_name_and_qualifier(function_name, qualifier, context)
        self._validate_qualifier(qualifier)

        fn = store.functions.get(fn_name)
        # A missing function and a missing URL config are reported identically.
        stored_config = fn.function_url_configs.get(qualifier or "$LATEST") if fn else None
        if not stored_config:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        return api_utils.map_function_url_config(stored_config)
2343

2344
    def update_function_url_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: FunctionUrlQualifier = None,
        auth_type: FunctionUrlAuthType = None,
        cors: Cors = None,
        invoke_mode: InvokeMode = None,
        **kwargs,
    ) -> UpdateFunctionUrlConfigResponse:
        """Replace the stored URL config with a copy carrying the requested changes.

        ``cors`` and ``auth_type`` are applied when not ``None``; ``invoke_mode``
        is applied when truthy. ``last_modified_time`` is always refreshed.

        :raises ValidationException: for an invalid qualifier or invoke mode
        :raises ResourceNotFoundException: if the function, the alias or the URL
            config does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]

        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        self._validate_qualifier(qualifier)
        self._validate_invoke_mode(invoke_mode)

        fn = store.functions.get(function_name)
        effective_qualifier = qualifier or "$LATEST"
        # An unknown function and an unknown alias produce the same error.
        if not fn or (
            api_utils.qualifier_is_alias(effective_qualifier)
            and effective_qualifier not in fn.aliases
        ):
            raise ResourceNotFoundException("Function does not exist", Type="User")

        current_config = fn.function_url_configs.get(effective_qualifier)
        if not current_config:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        # Collect only the fields that should differ from the current config.
        changes = {"last_modified_time": api_utils.generate_lambda_date()}
        if cors is not None:
            changes["cors"] = cors
        if auth_type is not None:
            changes["auth_type"] = auth_type
        if invoke_mode:
            changes["invoke_mode"] = invoke_mode

        updated_config = dataclasses.replace(current_config, **changes)
        fn.function_url_configs[effective_qualifier] = updated_config

        return UpdateFunctionUrlConfigResponse(
            FunctionUrl=updated_config.url,
            FunctionArn=updated_config.function_arn,
            AuthType=updated_config.auth_type,
            Cors=updated_config.cors,
            CreationTime=updated_config.creation_time,
            LastModifiedTime=updated_config.last_modified_time,
            InvokeMode=updated_config.invoke_mode,
        )
2402

2403
    def delete_function_url_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: FunctionUrlQualifier = None,
        **kwargs,
    ) -> None:
        """Remove the URL config stored for a function and qualifier.

        :param context: request context; fallback for account id and region
        :param function_name: function name or ARN
        :param qualifier: optional qualifier; ``$LATEST`` is used for the lookup if absent
        :raises ValidationException: for an invalid qualifier
        :raises ResourceNotFoundException: if the function or its URL config does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]

        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        self._validate_qualifier(qualifier)

        fn = store.functions.get(function_name)
        effective_qualifier = qualifier or "$LATEST"
        # A missing function and a missing URL config are reported identically.
        stored_config = fn.function_url_configs.get(effective_qualifier) if fn else None
        if not stored_config:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        del fn.function_url_configs[effective_qualifier]
2432

2433
    def list_function_url_configs(
        self,
        context: RequestContext,
        function_name: FunctionName,
        marker: String = None,
        max_items: MaxItems = None,
        **kwargs,
    ) -> ListFunctionUrlConfigsResponse:
        """List the URL configs of a function, one page at a time.

        Pages are keyed by each config's ``FunctionArn``.

        :raises ResourceNotFoundException: if the function does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]

        fn = store.functions.get(api_utils.get_function_name(function_name, context))
        if not fn:
            raise ResourceNotFoundException("Function does not exist", Type="User")

        # Convert every stored config to its API representation before paginating.
        mapped_configs = PaginatedList(
            [api_utils.map_function_url_config(conf) for conf in fn.function_url_configs.values()]
        )
        page, next_marker = mapped_configs.get_page(
            lambda mapped: mapped["FunctionArn"],
            marker,
            max_items,
        )
        return ListFunctionUrlConfigsResponse(FunctionUrlConfigs=page, NextMarker=next_marker)
2461

2462
    # =======================================
2463
    # ============  Permissions  ============
2464
    # =======================================
2465

2466
    @handler("AddPermission", expand=False)
    def add_permission(
        self,
        context: RequestContext,
        request: AddPermissionRequest,
    ) -> AddPermissionResponse:
        """Append a statement to the resource policy of a function version or alias.

        :param context: request context; fallback for account id and region, and
            source of the partition used in the statement
        :param request: the ``AddPermission`` request
        :return: response carrying the new statement serialized as JSON
        :raises InvalidParameterValueException: if the qualifier is ``$LATEST``
        :raises PreconditionFailedException: if ``RevisionId`` is given and does
            not match the current revision id
        :raises ValidationException: if ``StatementId`` does not match the
            statement id pattern
        :raises ResourceConflictException: if the statement id already exists in
            the policy of the resolved qualifier
        """
        raw_function_name = request.get("FunctionName")
        function_name, qualifier = api_utils.get_name_and_qualifier(
            raw_function_name, request.get("Qualifier"), context
        )

        # validate qualifier
        if qualifier is not None:
            self._validate_qualifier_expression(qualifier)
            if qualifier == "$LATEST":
                raise InvalidParameterValueException(
                    "We currently do not support adding policies for $LATEST.", Type="User"
                )
        account_id, region = api_utils.get_account_and_region(raw_function_name, context)

        resolved_fn = self._get_function(function_name, account_id, region)
        resolved_qualifier, fn_arn = self._resolve_fn_qualifier(resolved_fn, qualifier)

        # The revision id is only compared if the caller supplied one.
        revision_id = request.get("RevisionId")
        if revision_id and revision_id != self._function_revision_id(
            resolved_fn, resolved_qualifier
        ):
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
                Type="User",
            )

        request_sid = request["StatementId"]
        if not STATEMENT_ID_REGEX.match(request_sid):
            raise ValidationException(
                f"1 validation error detected: Value '{request_sid}' at 'statementId' failed to satisfy constraint: Member must satisfy regular expression pattern: ([a-zA-Z0-9-_]+)"
            )

        # check for an already existing policy and any conflicts in existing statements
        existing_policy = resolved_fn.permissions.get(resolved_qualifier)
        if existing_policy and any(
            statement["Sid"] == request_sid for statement in existing_policy.policy.Statement
        ):
            # uniqueness scope: statement id needs to be unique per qualified function ($LATEST, version, or alias)
            # Counterexample: the same sid can exist within $LATEST, version, and alias
            raise ResourceConflictException(
                f"The statement id ({request_sid}) provided already exists. Please provide a new statement id, or remove the existing statement.",
                Type="User",
            )

        permission_statement = api_utils.build_statement(
            partition=context.partition,
            resource_arn=fn_arn,
            statement_id=request_sid,
            action=request["Action"],
            principal=request["Principal"],
            source_arn=request.get("SourceArn"),
            source_account=request.get("SourceAccount"),
            principal_org_id=request.get("PrincipalOrgID"),
            event_source_token=request.get("EventSourceToken"),
            auth_type=request.get("FunctionUrlAuthType"),
        )

        if existing_policy:
            existing_policy.policy.Statement.append(permission_statement)
        else:
            # First statement for this qualifier: start a fresh default policy.
            created_policy = FunctionResourcePolicy(
                policy=ResourcePolicy(Version="2012-10-17", Id="default", Statement=[])
            )
            created_policy.policy.Statement.append(permission_statement)
            resolved_fn.permissions[resolved_qualifier] = created_policy

        # Update revision id of alias or version
        # TODO: re-evaluate data model to prevent this dirty hack just for bumping the revision id
        # TODO: does that need a `with function.lock` for atomic updates of the policy + revision_id?
        if api_utils.qualifier_is_alias(resolved_qualifier):
            current_alias = resolved_fn.aliases[resolved_qualifier]
            resolved_fn.aliases[resolved_qualifier] = dataclasses.replace(current_alias)
        else:
            # Assumes that a non-alias is a version
            current_version = resolved_fn.versions[resolved_qualifier]
            refreshed_config = dataclasses.replace(current_version.config)
            resolved_fn.versions[resolved_qualifier] = dataclasses.replace(
                current_version, config=refreshed_config
            )
        return AddPermissionResponse(Statement=json.dumps(permission_statement))
2548

2549
    def remove_permission(
        self,
        context: RequestContext,
        function_name: FunctionName,
        statement_id: NamespacedStatementId,
        qualifier: Qualifier = None,
        revision_id: String = None,
        **kwargs,
    ) -> None:
        """
        Remove a single statement from the resource policy of a qualified function.

        :param context: request context used to derive account, region and defaults
        :param function_name: function name or ARN
        :param statement_id: ``Sid`` of the policy statement to remove
        :param qualifier: optional version or alias the policy is attached to
        :param revision_id: optional revision id that must match the current one
        :raises ResourceNotFoundException: if the function, its policy, or the statement is missing
        :raises PreconditionFailedException: if ``revision_id`` is given and does not match
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        if qualifier is not None:
            self._validate_qualifier_expression(qualifier)

        store = lambda_stores[account_id][region]
        resolved_fn = store.functions.get(function_name)
        if resolved_fn is None:
            fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
            raise ResourceNotFoundException(f"No policy found for: {fn_arn}", Type="User")

        resolved_qualifier, _ = self._resolve_fn_qualifier(resolved_fn, qualifier)
        function_permission = resolved_fn.permissions.get(resolved_qualifier)
        if not function_permission:
            raise ResourceNotFoundException(
                "No policy is associated with the given resource.", Type="User"
            )

        # locate the first statement carrying the requested Sid
        statements = function_permission.policy.Statement
        target_statement = next(
            (candidate for candidate in statements if candidate["Sid"] == statement_id), None
        )
        if not target_statement:
            raise ResourceNotFoundException(
                f"Statement {statement_id} is not found in resource policy.", Type="User"
            )

        # the revision check only happens once the statement is known to exist
        current_revision_id = self._function_revision_id(resolved_fn, resolved_qualifier)
        if revision_id and revision_id != current_revision_id:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetFunction/GetAlias API to retrieve the latest Revision Id",
                Type="User",
            )
        statements.remove(target_statement)

        # Update revision id for alias or version
        # TODO: re-evaluate data model to prevent this dirty hack just for bumping the revision id
        # TODO: does that need a `with function.lock` for atomic updates of the policy + revision_id?
        if api_utils.qualifier_is_alias(resolved_qualifier):
            current_alias = resolved_fn.aliases[resolved_qualifier]
            resolved_fn.aliases[resolved_qualifier] = dataclasses.replace(current_alias)
        else:
            # Assumes that a non-alias is a version
            current_version = resolved_fn.versions[resolved_qualifier]
            refreshed_config = dataclasses.replace(current_version.config)
            resolved_fn.versions[resolved_qualifier] = dataclasses.replace(
                current_version, config=refreshed_config
            )

        # remove the policy as a whole when there's no statement left in it
        if not function_permission.policy.Statement:
            del resolved_fn.permissions[resolved_qualifier]
1✔
2614

2615
    def get_policy(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        qualifier: Qualifier = None,
        **kwargs,
    ) -> GetPolicyResponse:
        """
        Return the JSON-serialized resource policy and the revision id of a qualified function.

        :param context: request context used to derive account, region and defaults
        :param function_name: function name or ARN
        :param qualifier: optional version or alias; ``$LATEST`` is used when absent
        :raises ResourceNotFoundException: if no policy is stored for the resolved qualifier
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        if qualifier is not None:
            self._validate_qualifier_expression(qualifier)

        resolved_fn = self._get_function(function_name, account_id, region)

        resolved_qualifier = qualifier or "$LATEST"
        function_permission = resolved_fn.permissions.get(resolved_qualifier)
        if not function_permission:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        # the revision id lives on the alias itself, or on the config of a version
        if api_utils.qualifier_is_alias(resolved_qualifier):
            current_revision_id = resolved_fn.aliases[resolved_qualifier].revision_id
        else:
            # Assumes that a non-alias is a version
            current_revision_id = resolved_fn.versions[resolved_qualifier].config.revision_id

        policy_document = json.dumps(dataclasses.asdict(function_permission.policy))
        return GetPolicyResponse(Policy=policy_document, RevisionId=current_revision_id)
2652

2653
    # =======================================
2654
    # ========  Code signing config  ========
2655
    # =======================================
2656

2657
    def create_code_signing_config(
        self,
        context: RequestContext,
        allowed_publishers: AllowedPublishers,
        description: Description = None,
        code_signing_policies: CodeSigningPolicies = None,
        tags: Tags = None,
        **kwargs,
    ) -> CreateCodeSigningConfigResponse:
        """
        Create a code signing config in the caller's account/region store and return its mapped form.

        The ``tags`` argument is accepted but not used by this implementation.
        """
        account = context.account_id
        region = context.region
        store = lambda_stores[account][region]

        # TODO: can there be duplicates?
        new_id = f"csc-{get_random_hex(17)}"  # e.g. 'csc-077c33b4c19e26036'
        new_arn = f"arn:{context.partition}:lambda:{region}:{account}:code-signing-config:{new_id}"
        store.code_signing_configs[new_arn] = created = CodeSigningConfig(
            csc_id=new_id,
            arn=new_arn,
            allowed_publishers=allowed_publishers,
            policies=code_signing_policies,
            last_modified=api_utils.generate_lambda_date(),
            description=description,
        )
        return CreateCodeSigningConfigResponse(CodeSigningConfig=api_utils.map_csc(created))
1✔
2683

2684
    def put_function_code_signing_config(
        self,
        context: RequestContext,
        code_signing_config_arn: CodeSigningConfigArn,
        function_name: FunctionName,
        **kwargs,
    ) -> PutFunctionCodeSigningConfigResponse:
        """
        Attach an existing code signing config to a function.

        :raises CodeSigningConfigNotFoundException: if the config ARN is not in the store
        :raises ResourceNotFoundException: if the function does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]
        function_name = api_utils.get_function_name(function_name, context)

        # the config is validated before the function
        if not store.code_signing_configs.get(code_signing_config_arn):
            raise CodeSigningConfigNotFoundException(
                f"The code signing configuration cannot be found. Check that the provided configuration is not deleted: {code_signing_config_arn}.",
                Type="User",
            )

        target_fn = store.functions.get(function_name)
        fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
        if not target_fn:
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")

        target_fn.code_signing_config_arn = code_signing_config_arn
        return PutFunctionCodeSigningConfigResponse(
            CodeSigningConfigArn=code_signing_config_arn, FunctionName=function_name
        )
2711

2712
    def update_code_signing_config(
        self,
        context: RequestContext,
        code_signing_config_arn: CodeSigningConfigArn,
        description: Description = None,
        allowed_publishers: AllowedPublishers = None,
        code_signing_policies: CodeSigningPolicies = None,
        **kwargs,
    ) -> UpdateCodeSigningConfigResponse:
        """
        Apply the provided (non-``None``) fields to a stored code signing config.

        The stored entry is replaced with a copy carrying a new ``last_modified`` value.

        :raises ResourceNotFoundException: if the config ARN is not in the store
        """
        store = lambda_stores[context.account_id][context.region]
        current = store.code_signing_configs.get(code_signing_config_arn)
        if not current:
            raise ResourceNotFoundException(
                f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
            )

        # only arguments that were actually supplied override the stored values
        requested = {
            "allowed_publishers": allowed_publishers,
            "policies": code_signing_policies,
            "description": description,
        }
        overrides = {field: value for field, value in requested.items() if value is not None}
        updated = dataclasses.replace(
            current, last_modified=api_utils.generate_lambda_date(), **overrides
        )
        store.code_signing_configs[code_signing_config_arn] = updated

        return UpdateCodeSigningConfigResponse(CodeSigningConfig=api_utils.map_csc(updated))
1✔
2741

2742
    def get_code_signing_config(
        self, context: RequestContext, code_signing_config_arn: CodeSigningConfigArn, **kwargs
    ) -> GetCodeSigningConfigResponse:
        """
        Look up a code signing config by ARN in the caller's account/region store.

        :raises ResourceNotFoundException: if the config ARN is not in the store
        """
        store = lambda_stores[context.account_id][context.region]
        found = store.code_signing_configs.get(code_signing_config_arn)
        if found:
            return GetCodeSigningConfigResponse(CodeSigningConfig=api_utils.map_csc(found))

        raise ResourceNotFoundException(
            f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
        )
1✔
2753

2754
    def get_function_code_signing_config(
        self, context: RequestContext, function_name: FunctionName, **kwargs
    ) -> GetFunctionCodeSigningConfigResponse:
        """
        Return the code signing config ARN attached to a function, or an empty response if none.

        :raises ResourceNotFoundException: if the function does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]
        function_name = api_utils.get_function_name(function_name, context)
        target_fn = store.functions.get(function_name)
        fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
        if not target_fn:
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")

        # no attached config yields an empty response rather than an error
        if not target_fn.code_signing_config_arn:
            return GetFunctionCodeSigningConfigResponse()

        return GetFunctionCodeSigningConfigResponse(
            CodeSigningConfigArn=target_fn.code_signing_config_arn, FunctionName=function_name
        )
1✔
2771

2772
    def delete_function_code_signing_config(
        self, context: RequestContext, function_name: FunctionName, **kwargs
    ) -> None:
        """
        Detach any code signing config from a function by resetting its ARN reference to ``None``.

        :raises ResourceNotFoundException: if the function does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        store = lambda_stores[account_id][region]
        function_name = api_utils.get_function_name(function_name, context)
        target_fn = store.functions.get(function_name)
        fn_arn = api_utils.unqualified_lambda_arn(function_name, account_id, region)
        if target_fn:
            target_fn.code_signing_config_arn = None
            return

        raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")
1✔
2784

2785
    def delete_code_signing_config(
        self, context: RequestContext, code_signing_config_arn: CodeSigningConfigArn, **kwargs
    ) -> DeleteCodeSigningConfigResponse:
        """
        Remove a code signing config from the caller's account/region store.

        Functions that still reference the ARN are not touched by this method.

        :raises ResourceNotFoundException: if the config ARN is not in the store
        """
        configs = lambda_stores[context.account_id][context.region].code_signing_configs

        if not configs.get(code_signing_config_arn):
            raise ResourceNotFoundException(
                f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
            )

        del configs[code_signing_config_arn]
        return DeleteCodeSigningConfigResponse()
1✔
2799

2800
    def list_code_signing_configs(
        self,
        context: RequestContext,
        marker: String = None,
        max_items: MaxListItems = None,
        **kwargs,
    ) -> ListCodeSigningConfigsResponse:
        """
        Return one page of the code signing configs stored for the caller's account/region.

        Pagination tokens are derived from each entry's ``CodeSigningConfigId``.
        """
        store = lambda_stores[context.account_id][context.region]

        mapped_configs = PaginatedList(
            api_utils.map_csc(stored) for stored in store.code_signing_configs.values()
        )
        current_page, next_marker = mapped_configs.get_page(
            lambda entry: entry["CodeSigningConfigId"],
            marker,
            max_items,
        )
        return ListCodeSigningConfigsResponse(
            CodeSigningConfigs=current_page, NextMarker=next_marker
        )
1✔
2817

2818
    def list_functions_by_code_signing_config(
        self,
        context: RequestContext,
        code_signing_config_arn: CodeSigningConfigArn,
        marker: String = None,
        max_items: MaxListItems = None,
        **kwargs,
    ) -> ListFunctionsByCodeSigningConfigResponse:
        """
        Return one page of unqualified ARNs of functions that reference the given code signing config.

        :raises ResourceNotFoundException: if the config ARN is not in the store
        """
        account = context.account_id
        region = context.region
        store = lambda_stores[account][region]

        if code_signing_config_arn not in store.code_signing_configs:
            raise ResourceNotFoundException(
                f"The Lambda code signing configuration {code_signing_config_arn} can not be found."
            )

        matching_arns = []
        for candidate in store.functions.values():
            if candidate.code_signing_config_arn == code_signing_config_arn:
                matching_arns.append(
                    api_utils.unqualified_lambda_arn(candidate.function_name, account, region)
                )

        # the ARN string itself serves as the pagination token
        current_page, next_marker = PaginatedList(matching_arns).get_page(
            lambda arn: arn,
            marker,
            max_items,
        )
        return ListFunctionsByCodeSigningConfigResponse(
            FunctionArns=current_page, NextMarker=next_marker
        )
1✔
2849

2850
    # =======================================
2851
    # =========  Account Settings   =========
2852
    # =======================================
2853

2854
    # CAVEAT: these settings & usages are *per* region!
2855
    # Lambda quotas: https://docs.aws.amazon.com/lambda/latest/dg/gettingstarted-limits.html
2856
    def get_account_settings(self, context: RequestContext, **kwargs) -> GetAccountSettingsResponse:
        """
        Report the configured limits and the current usage for the caller's account/region store.

        Usage sums the code size of all Zip-packaged function versions and all layer versions.
        Unreserved concurrency is the configured limit minus every function's reserved
        concurrency and every provisioned concurrency config.
        """
        store = lambda_stores[context.account_id][context.region]

        total_code_size = 0
        claimed_concurrency = 0
        for function in store.functions.values():
            # Image-based Lambdas do not have a code attribute and count against the ECR quotas instead
            total_code_size += sum(
                version.config.code.code_size
                for version in function.versions.values()
                if version.config.package_type == PackageType.Zip
            )
            if function.reserved_concurrent_executions is not None:
                claimed_concurrency += function.reserved_concurrent_executions
            claimed_concurrency += sum(
                provisioned.provisioned_concurrent_executions
                for provisioned in function.provisioned_concurrency_configs.values()
            )

        total_code_size += sum(
            layer_version.code.code_size
            for layer in store.layers.values()
            for layer_version in layer.layer_versions.values()
        )

        limits = AccountLimit(
            TotalCodeSize=config.LAMBDA_LIMITS_TOTAL_CODE_SIZE,
            CodeSizeZipped=config.LAMBDA_LIMITS_CODE_SIZE_ZIPPED,
            CodeSizeUnzipped=config.LAMBDA_LIMITS_CODE_SIZE_UNZIPPED,
            ConcurrentExecutions=config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS,
            UnreservedConcurrentExecutions=config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS
            - claimed_concurrency,
        )
        usage = AccountUsage(
            TotalCodeSize=total_code_size,
            FunctionCount=len(store.functions),
        )
        return GetAccountSettingsResponse(AccountLimit=limits, AccountUsage=usage)
2889

2890
    # =======================================
2891
    # ==  Provisioned Concurrency Config   ==
2892
    # =======================================
2893

2894
    def _get_provisioned_config(
        self, context: RequestContext, function_name: str, qualifier: str
    ) -> ProvisionedConcurrencyConfiguration | None:
        """
        Look up the provisioned concurrency config stored for a function qualifier.

        :param context: request context used to derive account, region and defaults
        :param function_name: function name or ARN
        :param qualifier: alias name or version the config is keyed by
        :return: the stored config, or ``None`` if the qualifier has no config
        :raises ResourceNotFoundException: if the function, alias, or version does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        state = lambda_stores[account_id][region]
        function_name = api_utils.get_function_name(function_name, context)
        fn = state.functions.get(function_name)
        if api_utils.qualifier_is_alias(qualifier):
            fn_alias = fn.aliases.get(qualifier) if fn else None
            if fn_alias is None:
                raise ResourceNotFoundException(
                    f"Cannot find alias arn: {api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)}",
                    Type="User",
                )
        elif api_utils.qualifier_is_version(qualifier):
            fn_version = fn.versions.get(qualifier) if fn else None
            if fn_version is None:
                raise ResourceNotFoundException(
                    f"Function not found: {api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)}",
                    Type="User",
                )
        elif not fn:
            # The qualifier matches neither the alias nor the version pattern, so neither
            # branch above caught the missing function. Fail with a proper not-found error
            # instead of an AttributeError on ``None`` below.
            raise ResourceNotFoundException(
                f"Function not found: {api_utils.unqualified_lambda_arn(function_name, account_id, region)}",
                Type="User",
            )

        return fn.provisioned_concurrency_configs.get(qualifier)
1✔
2921

2922
    def put_provisioned_concurrency_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: Qualifier,
        provisioned_concurrent_executions: PositiveInteger,
        **kwargs,
    ) -> PutProvisionedConcurrencyConfigResponse:
        """
        Store a provisioned concurrency config for an alias or published version and hand the
        requested value to the version manager of the underlying version.

        Checks run in this order: positive value, qualifier is not ``$LATEST``, function and
        qualifier exist, function-level reserved concurrency, account concurrency limit,
        minimum unreserved concurrency, and alias/version conflicts.

        :raises ValidationException: if the requested value is not positive
        :raises InvalidParameterValueException: for ``$LATEST`` or when a concurrency limit is exceeded
        :raises ResourceConflictException: if the alias target or a pointing alias is already provisioned
        """
        if provisioned_concurrent_executions <= 0:
            raise ValidationException(
                f"1 validation error detected: Value '{provisioned_concurrent_executions}' at 'provisionedConcurrentExecutions' failed to satisfy constraint: Member must have value greater than or equal to 1"
            )

        if qualifier == "$LATEST":
            raise InvalidParameterValueException(
                "Provisioned Concurrency Configs cannot be applied to unpublished function versions.",
                Type="User",
            )
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        store = lambda_stores[account_id][region]
        fn = store.functions.get(function_name)

        # also validates that the function and the alias/version exist
        existing_config = self._get_provisioned_config(context, function_name, qualifier)

        if existing_config:  # TODO: merge?
            # TODO: add a test for partial updates (if possible)
            LOG.warning(
                "Partial update of provisioned concurrency config is currently not supported."
            )

        # provisioned concurrency already claimed by the function's other qualifiers
        provisioned_elsewhere = sum(
            other_config.provisioned_concurrent_executions
            for other_qualifier, other_config in fn.provisioned_concurrency_configs.items()
            if other_qualifier != qualifier
        )
        requested_total = provisioned_elsewhere + provisioned_concurrent_executions
        reserved = fn.reserved_concurrent_executions
        if reserved is not None and reserved < requested_total:
            raise InvalidParameterValueException(
                "Requested Provisioned Concurrency should not be greater than the reservedConcurrentExecution for function",
                Type="User",
            )

        if provisioned_concurrent_executions > config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS:
            raise InvalidParameterValueException(
                f"Specified ConcurrentExecutions for function is greater than account's unreserved concurrency"
                f" [{config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS}]."
            )

        account_limits = self.get_account_settings(context)["AccountLimit"]
        remaining_unreserved = (
            account_limits["UnreservedConcurrentExecutions"] - provisioned_concurrent_executions
        )
        if remaining_unreserved < config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY:
            raise InvalidParameterValueException(
                f"Specified ConcurrentExecutions for function decreases account's UnreservedConcurrentExecution below"
                f" its minimum value of [{config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY}]."
            )

        new_config = ProvisionedConcurrencyConfiguration(
            provisioned_concurrent_executions, api_utils.generate_lambda_date()
        )
        # default ARN; overridden below with the ARN of the concrete version
        fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)

        if api_utils.qualifier_is_alias(qualifier):
            alias = fn.aliases.get(qualifier)
            target_version = fn.versions.get(alias.function_version)

            target_already_provisioned = (
                fn.provisioned_concurrency_configs.get(alias.function_version) is not None
            )
            if target_version and target_already_provisioned:
                raise ResourceConflictException(
                    "Alias can't be used for Provisioned Concurrency configuration on an already Provisioned version",
                    Type="User",
                )
            fn_arn = target_version.id.qualified_arn()
        elif api_utils.qualifier_is_version(qualifier):
            fn_version = fn.versions.get(qualifier)

            # TODO: might be useful other places, utilize
            provisioned_alias_points_here = any(
                candidate.function_version == qualifier
                and fn.provisioned_concurrency_configs.get(candidate.name) is not None
                for candidate in fn.aliases.values()
            )
            if provisioned_alias_points_here:
                raise ResourceConflictException(
                    "Version is pointed by a Provisioned Concurrency alias", Type="User"
                )

            fn_arn = fn_version.id.qualified_arn()

        manager = self.lambda_service.get_lambda_version_manager(fn_arn)

        fn.provisioned_concurrency_configs[qualifier] = new_config

        manager.update_provisioned_concurrency_config(new_config.provisioned_concurrent_executions)

        return PutProvisionedConcurrencyConfigResponse(
            RequestedProvisionedConcurrentExecutions=new_config.provisioned_concurrent_executions,
            AvailableProvisionedConcurrentExecutions=0,
            AllocatedProvisionedConcurrentExecutions=0,
            Status=ProvisionedConcurrencyStatusEnum.IN_PROGRESS,
            # StatusReason=manager.provisioned_state.status_reason,
            LastModified=new_config.last_modified,  # TODO: does change with configuration or also with state changes?
        )
3044

3045
    def get_provisioned_concurrency_config(
        self, context: RequestContext, function_name: FunctionName, qualifier: Qualifier, **kwargs
    ) -> GetProvisionedConcurrencyConfigResponse:
        """
        Return the stored provisioned concurrency config of an alias or published version,
        together with the provisioned state reported by the matching version manager.

        :raises InvalidParameterValueException: if the qualifier is ``$LATEST``
        :raises ProvisionedConcurrencyConfigNotFoundException: if the qualifier has no config
        """
        if qualifier == "$LATEST":
            raise InvalidParameterValueException(
                "The function resource provided must be an alias or a published version.",
                Type="User",
            )
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )

        stored_config = self._get_provisioned_config(context, function_name, qualifier)
        if not stored_config:
            raise ProvisionedConcurrencyConfigNotFoundException(
                "No Provisioned Concurrency Config found for this function", Type="User"
            )

        # TODO: make this compatible with alias pointer migration on update
        version_qualifier = qualifier
        if api_utils.qualifier_is_alias(qualifier):
            # the version manager is looked up by the version the alias points to
            fn = lambda_stores[account_id][region].functions.get(function_name)
            version_qualifier = fn.aliases.get(qualifier).function_version
        fn_arn = api_utils.qualified_lambda_arn(
            function_name, version_qualifier, account_id, region
        )

        ver_manager = self.lambda_service.get_lambda_version_manager(fn_arn)

        return GetProvisionedConcurrencyConfigResponse(
            RequestedProvisionedConcurrentExecutions=stored_config.provisioned_concurrent_executions,
            LastModified=stored_config.last_modified,
            AvailableProvisionedConcurrentExecutions=ver_manager.provisioned_state.available,
            AllocatedProvisionedConcurrentExecutions=ver_manager.provisioned_state.allocated,
            Status=ver_manager.provisioned_state.status,
            StatusReason=ver_manager.provisioned_state.status_reason,
        )
3085

3086
    def list_provisioned_concurrency_configs(
        self,
        context: RequestContext,
        function_name: FunctionName,
        marker: String = None,
        max_items: MaxProvisionedConcurrencyConfigListItems = None,
        **kwargs,
    ) -> ListProvisionedConcurrencyConfigsResponse:
        """
        Return one page of the provisioned concurrency configs stored for a function.

        Each entry reports the ARN of the configured qualifier (alias or version) together
        with the provisioned state of the version manager backing it.

        :param context: request context used to derive account, region and defaults
        :param function_name: function name or ARN
        :param marker: pagination token returned by a previous call
        :param max_items: maximum number of entries per page
        :raises ResourceNotFoundException: if the function does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        state = lambda_stores[account_id][region]

        function_name = api_utils.get_function_name(function_name, context)
        fn = state.functions.get(function_name)
        if fn is None:
            raise ResourceNotFoundException(
                f"Function not found: {api_utils.unqualified_lambda_arn(function_name, account_id, region)}",
                Type="User",
            )

        configs = []
        for qualifier, pc_config in fn.provisioned_concurrency_configs.items():
            # the version manager is looked up by the version an alias points to
            if api_utils.qualifier_is_alias(qualifier):
                alias = fn.aliases.get(qualifier)
                fn_arn = api_utils.qualified_lambda_arn(
                    function_name, alias.function_version, account_id, region
                )
            else:
                fn_arn = api_utils.qualified_lambda_arn(
                    function_name, qualifier, account_id, region
                )

            manager = self.lambda_service.get_lambda_version_manager(fn_arn)

            configs.append(
                ProvisionedConcurrencyConfigListItem(
                    FunctionArn=api_utils.qualified_lambda_arn(
                        function_name, qualifier, account_id, region
                    ),
                    RequestedProvisionedConcurrentExecutions=pc_config.provisioned_concurrent_executions,
                    AvailableProvisionedConcurrentExecutions=manager.provisioned_state.available,
                    AllocatedProvisionedConcurrentExecutions=manager.provisioned_state.allocated,
                    Status=manager.provisioned_state.status,
                    StatusReason=manager.provisioned_state.status_reason,
                    LastModified=pc_config.last_modified,
                )
            )

        provisioned_concurrency_configs = PaginatedList(configs)
        # Key the pagination token on the entry's ARN (unique per qualifier) so that the
        # marker is a string; using the whole list item as the token would put a
        # non-string value into NextMarker.
        page, token = provisioned_concurrency_configs.get_page(
            lambda item: item["FunctionArn"],
            marker,
            max_items,
        )
        return ListProvisionedConcurrencyConfigsResponse(
            ProvisionedConcurrencyConfigs=page, NextMarker=token
        )
3143

3144
    def delete_provisioned_concurrency_config(
        self, context: RequestContext, function_name: FunctionName, qualifier: Qualifier, **kwargs
    ) -> None:
        """
        Remove the provisioned concurrency config of an alias or published version and reset
        the provisioned concurrency of the backing version manager to 0.

        :param context: request context used to derive account, region and defaults
        :param function_name: function name or ARN
        :param qualifier: alias name or published version the config is keyed by
        :raises InvalidParameterValueException: if the qualifier is ``$LATEST``
        :raises ResourceNotFoundException: if the function, alias, or version does not exist
        """
        if qualifier == "$LATEST":
            raise InvalidParameterValueException(
                "The function resource provided must be an alias or a published version.",
                Type="User",
            )
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        state = lambda_stores[account_id][region]
        fn = state.functions.get(function_name)

        provisioned_config = self._get_provisioned_config(context, function_name, qualifier)
        # delete is idempotent and doesn't actually care about the provisioned concurrency config not existing
        if not provisioned_config:
            return

        fn.provisioned_concurrency_configs.pop(qualifier)

        # The version manager is looked up by the ARN of a concrete version, so an alias has
        # to be resolved to the version it points to (as the put/get/list operations do).
        # `_get_provisioned_config` has already verified that the alias exists.
        version_qualifier = qualifier
        if api_utils.qualifier_is_alias(qualifier):
            version_qualifier = fn.aliases.get(qualifier).function_version
        fn_arn = api_utils.qualified_lambda_arn(
            function_name, version_qualifier, account_id, region
        )
        manager = self.lambda_service.get_lambda_version_manager(fn_arn)
        manager.update_provisioned_concurrency_config(0)
1✔
3166

3167
    # =======================================
3168
    # =======  Event Invoke Config   ========
3169
    # =======================================
3170

3171
    # "1 validation error detected: Value 'arn:aws:_-/!lambda:<region>:111111111111:function:<function-name:1>' at 'destinationConfig.onFailure.destination' failed to satisfy constraint: Member must satisfy regular expression pattern: ^$|arn:(aws[a-zA-Z0-9-]*):([a-zA-Z0-9\\-])+:([a-z]{2}((-gov)|(-iso(b?)))?-[a-z]+-\\d{1})?:(\\d{12})?:(.*)"
3172
    # "1 validation error detected: Value 'arn:aws:_-/!lambda:<region>:111111111111:function:<function-name:1>' at 'destinationConfig.onFailure.destination' failed to satisfy constraint: Member must satisfy regular expression pattern: ^$|arn:(aws[a-zA-Z0-9-]*):([a-zA-Z0-9\\-])+:([a-z]2((-gov)|(-iso(b?)))?-[a-z]+-\\d1)?:(\\d12)?:(.*)" ... (expected → actual)
3173

3174
    def _validate_destination_config(
1✔
3175
        self, store: LambdaStore, function_name: str, destination_config: DestinationConfig
3176
    ):
3177
        def _validate_destination_arn(destination_arn) -> bool:
1✔
3178
            if not api_utils.DESTINATION_ARN_PATTERN.match(destination_arn):
1✔
3179
                # technically we shouldn't handle this in the provider
3180
                raise ValidationException(
1✔
3181
                    "1 validation error detected: Value '"
3182
                    + destination_arn
3183
                    + r"' at 'destinationConfig.onFailure.destination' failed to satisfy constraint: Member must satisfy regular expression pattern: ^$|arn:(aws[a-zA-Z0-9-]*):([a-zA-Z0-9\-])+:([a-z]{2}((-gov)|(-iso(b?)))?-[a-z]+-\d{1})?:(\d{12})?:(.*)"
3184
                )
3185

3186
            match destination_arn.split(":")[2]:
1✔
3187
                case "lambda":
1✔
3188
                    fn_parts = api_utils.FULL_FN_ARN_PATTERN.search(destination_arn).groupdict()
1✔
3189
                    if fn_parts:
1✔
3190
                        # check if it exists
3191
                        fn = store.functions.get(fn_parts["function_name"])
1✔
3192
                        if not fn:
1✔
3193
                            raise InvalidParameterValueException(
1✔
3194
                                f"The destination ARN {destination_arn} is invalid.", Type="User"
3195
                            )
3196
                        if fn_parts["function_name"] == function_name:
1✔
3197
                            raise InvalidParameterValueException(
1✔
3198
                                "You can't specify the function as a destination for itself.",
3199
                                Type="User",
3200
                            )
3201
                case "sns" | "sqs" | "events":
1✔
3202
                    pass
1✔
3203
                case _:
1✔
3204
                    return False
1✔
3205
            return True
1✔
3206

3207
        validation_err = False
1✔
3208

3209
        failure_destination = destination_config.get("OnFailure", {}).get("Destination")
1✔
3210
        if failure_destination:
1✔
3211
            validation_err = validation_err or not _validate_destination_arn(failure_destination)
1✔
3212

3213
        success_destination = destination_config.get("OnSuccess", {}).get("Destination")
1✔
3214
        if success_destination:
1✔
3215
            validation_err = validation_err or not _validate_destination_arn(success_destination)
1✔
3216

3217
        if validation_err:
1✔
3218
            on_success_part = (
1✔
3219
                f"OnSuccess(destination={success_destination})" if success_destination else "null"
3220
            )
3221
            on_failure_part = (
1✔
3222
                f"OnFailure(destination={failure_destination})" if failure_destination else "null"
3223
            )
3224
            raise InvalidParameterValueException(
1✔
3225
                f"The provided destination config DestinationConfig(onSuccess={on_success_part}, onFailure={on_failure_part}) is invalid.",
3226
                Type="User",
3227
            )
3228

3229
    def put_function_event_invoke_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: Qualifier = None,
        maximum_retry_attempts: MaximumRetryAttempts = None,
        maximum_event_age_in_seconds: MaximumEventAgeInSeconds = None,
        destination_config: DestinationConfig = None,
        **kwargs,
    ) -> FunctionEventInvokeConfig:
        """
        Create or overwrite the event invoke config of a function, version or alias.

        Destination ARNs can be:
        * SQS arn
        * SNS arn
        * Lambda arn
        * EventBridge arn

        put_ vs. update_:
            * put replaces any existing config as a whole
            * update changes only the given values and keeps the remaining ones
            * update fails if no config exists yet

        Destination vs. DLQ:
            * "However, a dead-letter queue is part of a function's version-specific configuration, so it is locked in when you publish a version."
            * "On-failure destinations also support additional targets and include details about the function's response in the invocation record."

        :raises InvalidParameterValueException: if none of the settings is specified
        :raises ResourceNotFoundException: if the function or the given qualifier is unknown
        """
        requested_settings = (
            maximum_event_age_in_seconds,
            maximum_retry_attempts,
            destination_config,
        )
        if all(setting is None for setting in requested_settings):
            raise InvalidParameterValueException(
                "You must specify at least one of error handling or destination setting.",
                Type="User",
            )

        account_id, region = api_utils.get_account_and_region(function_name, context)
        state = lambda_stores[account_id][region]
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )

        fn = state.functions.get(function_name)
        if not fn:
            raise ResourceNotFoundException("The function doesn't exist.", Type="User")
        if qualifier and qualifier not in fn.aliases and qualifier not in fn.versions:
            raise ResourceNotFoundException("The function doesn't exist.", Type="User")

        qualifier = qualifier or "$LATEST"

        # validate and normalize destination config
        if destination_config:
            self._validate_destination_config(state, function_name, destination_config)

        requested_destinations = destination_config or {}
        # both members are always present in the stored config, possibly with a None destination
        destination_config = DestinationConfig(
            OnSuccess=OnSuccess(
                Destination=requested_destinations.get("OnSuccess", {}).get("Destination")
            ),
            OnFailure=OnFailure(
                Destination=requested_destinations.get("OnFailure", {}).get("Destination")
            ),
        )

        invoke_config = EventInvokeConfig(
            function_name=function_name,
            qualifier=qualifier,
            maximum_event_age_in_seconds=maximum_event_age_in_seconds,
            maximum_retry_attempts=maximum_retry_attempts,
            last_modified=api_utils.generate_lambda_date(),
            destination_config=destination_config,
        )
        fn.event_invoke_configs[qualifier] = invoke_config

        last_modified = datetime.datetime.strptime(
            invoke_config.last_modified, api_utils.LAMBDA_DATE_FORMAT
        )
        return FunctionEventInvokeConfig(
            LastModified=last_modified,
            FunctionArn=api_utils.qualified_lambda_arn(
                function_name, qualifier, account_id, region
            ),
            DestinationConfig=destination_config,
            MaximumEventAgeInSeconds=maximum_event_age_in_seconds,
            MaximumRetryAttempts=maximum_retry_attempts,
        )
3310

3311
    def get_function_event_invoke_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: Qualifier = None,
        **kwargs,
    ) -> FunctionEventInvokeConfig:
        """
        Return the event invoke config stored for a function qualifier.

        A missing qualifier is treated as ``$LATEST``.

        :raises ResourceNotFoundException: if the function is unknown or has no config stored
            for the qualifier
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        state = lambda_stores[account_id][region]
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        qualifier = qualifier or "$LATEST"

        fn = state.functions.get(function_name)
        # an unknown function and a missing config are reported with the same error
        config = fn.event_invoke_configs.get(qualifier) if fn else None
        if not config:
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
            raise ResourceNotFoundException(
                f"The function {fn_arn} doesn't have an EventInvokeConfig", Type="User"
            )

        last_modified = datetime.datetime.strptime(
            config.last_modified, api_utils.LAMBDA_DATE_FORMAT
        )
        return FunctionEventInvokeConfig(
            LastModified=last_modified,
            FunctionArn=api_utils.qualified_lambda_arn(
                function_name, qualifier, account_id, region
            ),
            DestinationConfig=config.destination_config,
            MaximumEventAgeInSeconds=config.maximum_event_age_in_seconds,
            MaximumRetryAttempts=config.maximum_retry_attempts,
        )
3350

3351
    def list_function_event_invoke_configs(
        self,
        context: RequestContext,
        function_name: FunctionName,
        marker: String = None,
        max_items: MaxFunctionEventInvokeConfigListItems = None,
        **kwargs,
    ) -> ListFunctionEventInvokeConfigsResponse:
        """
        List all event invoke configs stored for a function, paginated by function ARN.

        :param context: request context
        :param function_name: function name or ARN
        :param marker: pagination marker returned by a previous call
        :param max_items: maximum number of items per page
        :raises ResourceNotFoundException: if the function does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        state = lambda_stores[account_id][region]
        # The function may be addressed by ARN. Resolve the plain name the same way the sibling
        # event invoke config operations do, otherwise the store lookup and the generated ARNs
        # would be based on the raw ARN string.
        function_name, _ = api_utils.get_name_and_qualifier(function_name, None, context)
        fn = state.functions.get(function_name)
        if not fn:
            raise ResourceNotFoundException("The function doesn't exist.", Type="User")

        event_invoke_configs = [
            FunctionEventInvokeConfig(
                LastModified=c.last_modified,
                FunctionArn=api_utils.qualified_lambda_arn(
                    function_name, c.qualifier, account_id, region
                ),
                MaximumEventAgeInSeconds=c.maximum_event_age_in_seconds,
                MaximumRetryAttempts=c.maximum_retry_attempts,
                DestinationConfig=c.destination_config,
            )
            for c in fn.event_invoke_configs.values()
        ]

        event_invoke_configs = PaginatedList(event_invoke_configs)
        page, token = event_invoke_configs.get_page(
            lambda x: x["FunctionArn"],
            marker,
            max_items,
        )
        return ListFunctionEventInvokeConfigsResponse(
            FunctionEventInvokeConfigs=page, NextMarker=token
        )
3387

3388
    def delete_function_event_invoke_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: Qualifier = None,
        **kwargs,
    ) -> None:
        """
        Delete the event invoke config stored for a function qualifier.

        A missing qualifier is treated as ``$LATEST``.

        :raises ResourceNotFoundException: if the function is unknown or has no config stored
            for the qualifier
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )
        state = lambda_stores[account_id][region]
        fn = state.functions.get(function_name)
        resolved_qualifier = qualifier or "$LATEST"

        # an unknown function and a missing config are reported with the same error
        config = fn.event_invoke_configs.get(resolved_qualifier) if fn else None
        if not config:
            # Build the ARN from the resolved qualifier, as the get/update operations do,
            # so the message never depends on how a missing qualifier is rendered.
            fn_arn = api_utils.qualified_lambda_arn(
                function_name, resolved_qualifier, account_id, region
            )
            raise ResourceNotFoundException(
                f"The function {fn_arn} doesn't have an EventInvokeConfig", Type="User"
            )

        del fn.event_invoke_configs[resolved_qualifier]
1✔
3415

3416
    def update_function_event_invoke_config(
        self,
        context: RequestContext,
        function_name: FunctionName,
        qualifier: Qualifier = None,
        maximum_retry_attempts: MaximumRetryAttempts = None,
        maximum_event_age_in_seconds: MaximumEventAgeInSeconds = None,
        destination_config: DestinationConfig = None,
        **kwargs,
    ) -> FunctionEventInvokeConfig:
        """
        Update individual fields of an existing event invoke config.

        Works like put, but only the given (non-None) values replace the stored ones.

        :raises InvalidParameterValueException: if none of the settings is specified
        :raises ResourceNotFoundException: if the function or qualifier is unknown, or if no
            config exists for the qualifier
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        state = lambda_stores[account_id][region]
        function_name, qualifier = api_utils.get_name_and_qualifier(
            function_name, qualifier, context
        )

        requested_settings = (
            maximum_event_age_in_seconds,
            maximum_retry_attempts,
            destination_config,
        )
        if all(setting is None for setting in requested_settings):
            raise InvalidParameterValueException(
                "You must specify at least one of error handling or destination setting.",
                Type="User",
            )

        fn = state.functions.get(function_name)
        if not fn:
            raise ResourceNotFoundException("The function doesn't exist.", Type="User")
        if qualifier and qualifier not in fn.aliases and qualifier not in fn.versions:
            raise ResourceNotFoundException("The function doesn't exist.", Type="User")

        qualifier = qualifier or "$LATEST"

        current_config = fn.event_invoke_configs.get(qualifier)
        if not current_config:
            fn_arn = api_utils.qualified_lambda_arn(function_name, qualifier, account_id, region)
            raise ResourceNotFoundException(
                f"The function {fn_arn} doesn't have an EventInvokeConfig", Type="User"
            )

        if destination_config:
            self._validate_destination_config(state, function_name, destination_config)

        # only values that were actually passed replace the stored ones
        changes = {}
        if destination_config is not None:
            changes["destination_config"] = destination_config
        if maximum_retry_attempts is not None:
            changes["maximum_retry_attempts"] = maximum_retry_attempts
        if maximum_event_age_in_seconds is not None:
            changes["maximum_event_age_in_seconds"] = maximum_event_age_in_seconds

        new_config = dataclasses.replace(
            current_config, last_modified=api_utils.generate_lambda_date(), **changes
        )
        fn.event_invoke_configs[qualifier] = new_config

        last_modified = datetime.datetime.strptime(
            new_config.last_modified, api_utils.LAMBDA_DATE_FORMAT
        )
        return FunctionEventInvokeConfig(
            LastModified=last_modified,
            FunctionArn=api_utils.qualified_lambda_arn(
                function_name, qualifier, account_id, region
            ),
            DestinationConfig=new_config.destination_config,
            MaximumEventAgeInSeconds=new_config.maximum_event_age_in_seconds,
            MaximumRetryAttempts=new_config.maximum_retry_attempts,
        )
3485

3486
    # =======================================
3487
    # ======  Layer & Layer Versions  =======
3488
    # =======================================
3489

3490
    @staticmethod
    def _resolve_layer(
        layer_name_or_arn: str, context: RequestContext
    ) -> Tuple[str, str, str, Optional[str]]:
        """
        Resolve the locator attributes of a Lambda layer given by name or ARN.

        :param layer_name_or_arn: Layer name or ARN
        :param context: Request context, supplies region and account for plain layer names
        :return: Tuple of region, account ID, layer name, layer version
        """
        if not api_utils.is_layer_arn(layer_name_or_arn):
            # plain name: locate the layer via the request context, no version is known
            return context.region, context.account_id, layer_name_or_arn, None

        return api_utils.parse_layer_arn(layer_name_or_arn)
1✔
3505

3506
    def publish_layer_version(
        self,
        context: RequestContext,
        layer_name: LayerName,
        content: LayerVersionContentInput,
        description: Description = None,
        compatible_runtimes: CompatibleRuntimes = None,
        license_info: LicenseInfo = None,
        compatible_architectures: CompatibleArchitectures = None,
        **kwargs,
    ) -> PublishLayerVersionResponse:
        """
        Publish a new version of a layer.

        The first call for a LayerName creates the layer itself; every further call with the same
        LayerName adds a new version. Note that layers have no $LATEST version!

        :raises ValidationException: if the compatible runtimes or architectures are invalid
        """
        account, region = context.account_id, context.region

        validation_errors = api_utils.validate_layer_runtimes_and_architectures(
            compatible_runtimes, compatible_architectures
        )
        if validation_errors:
            error_count = len(validation_errors)
            plural_suffix = "s" if error_count > 1 else ""
            raise ValidationException(
                f"{error_count} validation error{plural_suffix} detected: {'; '.join(validation_errors)}"
            )

        state = lambda_stores[account][region]
        with self.create_layer_lock:
            # the lock is required to avoid creating two layer objects for the same name
            if layer_name not in state.layers:
                state.layers[layer_name] = Layer(
                    arn=api_utils.layer_arn(layer_name=layer_name, account=account, region=region)
                )

        layer = state.layers[layer_name]
        with layer.next_version_lock:
            version_identifier = LambdaLayerVersionIdentifier(
                account_id=account, region=region, layer_name=layer_name
            )
            next_version = version_identifier.generate(next_version=layer.next_version)
            # With user defined layer versions, versions can be created out of order,
            # i.e. a user could replicate layer v2 and then layer v1. The counter is only moved
            # forward, never back, to avoid overwriting existing versions.
            if next_version >= layer.next_version:
                layer.next_version = next_version + 1

        # store the archive of the new layer version
        if content.get("ZipFile"):
            code = store_lambda_archive(
                archive_file=content["ZipFile"],
                function_name=layer_name,
                region_name=region,
                account_id=account,
            )
        else:
            code = store_s3_bucket_archive(
                archive_bucket=content["S3Bucket"],
                archive_key=content["S3Key"],
                archive_version=content.get("S3ObjectVersion"),
                function_name=layer_name,
                region_name=region,
                account_id=account,
            )

        version_key = str(next_version)
        new_layer_version = LayerVersion(
            layer_version_arn=api_utils.layer_version_arn(
                layer_name=layer_name,
                account=account,
                region=region,
                version=version_key,
            ),
            layer_arn=layer.arn,
            version=next_version,
            description=description or "",
            license_info=license_info,
            compatible_runtimes=compatible_runtimes,
            compatible_architectures=compatible_architectures,
            created=api_utils.generate_lambda_date(),
            code=code,
        )
        layer.layer_versions[version_key] = new_layer_version

        return api_utils.map_layer_out(new_layer_version)
1✔
3594

3595
    def get_layer_version(
        self,
        context: RequestContext,
        layer_name: LayerName,
        version_number: LayerVersionNumber,
        **kwargs,
    ) -> GetLayerVersionResponse:
        """
        Return a single version of a layer.

        :param layer_name: layer name or ARN, resolved through ``_resolve_layer``
        :param version_number: version to return, must be at least 1
        :raises InvalidParameterValueException: if the version number is below 1
        :raises ResourceNotFoundException: if the layer or the version does not exist
        """
        region_name, account_id, layer_name, _ = LambdaProvider._resolve_layer(layer_name, context)
        layer = lambda_stores[account_id][region_name].layers.get(layer_name)

        # the version number is checked before reporting a missing layer
        if version_number < 1:
            raise InvalidParameterValueException("Layer Version Cannot be less than 1", Type="User")

        layer_version = None if layer is None else layer.layer_versions.get(str(version_number))
        if layer_version is None:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )
        return api_utils.map_layer_out(layer_version)
1✔
3620

3621
    def get_layer_version_by_arn(
        self, context: RequestContext, arn: LayerVersionArn, **kwargs
    ) -> GetLayerVersionResponse:
        """
        Return the layer version addressed by a layer version ARN.

        :raises ValidationException: if no version could be resolved from the given ARN
        :raises ResourceNotFoundException: if the layer or the version does not exist
        """
        region_name, account_id, layer_name, version = LambdaProvider._resolve_layer(arn, context)

        if not version:
            arn_pattern = "(arn:(aws[a-zA-Z-]*)?:lambda:[a-z]{2}((-gov)|(-iso([a-z]?)))?-[a-z]+-\\d{1}:\\d{12}:layer:[a-zA-Z0-9-_]+:[0-9]+)|(arn:[a-zA-Z0-9-]+:lambda:::awslayer:[a-zA-Z0-9-_]+)"
            raise ValidationException(
                f"1 validation error detected: Value '{arn}' at 'arn' failed to satisfy constraint: Member must satisfy regular expression pattern: "
                + arn_pattern
            )

        layer = lambda_stores[account_id][region_name].layers.get(layer_name)
        # a missing layer and a missing version are reported with the same error
        layer_version = layer.layer_versions.get(version) if layer else None
        if not layer_version:
            raise ResourceNotFoundException(
                "The resource you requested does not exist.", Type="User"
            )

        return api_utils.map_layer_out(layer_version)
1✔
3648

3649
    def list_layers(
        self,
        context: RequestContext,
        compatible_runtime: Runtime = None,
        marker: String = None,
        max_items: MaxLayerListItems = None,
        compatible_architecture: Architecture = None,
        **kwargs,
    ) -> ListLayersResponse:
        """
        List the layers of the requesting account and region with their latest version.

        :param context: request context, supplies account and region
        :param compatible_runtime: runtime filter (validated, but not applied yet)
        :param marker: pagination marker returned by a previous call
        :param max_items: maximum number of items per page
        :param compatible_architecture: architecture filter (validated, but not applied yet)
        :raises ValidationException: if the runtime or architecture filter is invalid
        """
        validation_errors = []

        validation_error_arch = api_utils.validate_layer_architecture(compatible_architecture)
        if validation_error_arch:
            validation_errors.append(validation_error_arch)

        validation_error_runtime = api_utils.validate_layer_runtime(compatible_runtime)
        if validation_error_runtime:
            validation_errors.append(validation_error_runtime)

        if validation_errors:
            raise ValidationException(
                f"{len(validation_errors)} validation error{'s' if len(validation_errors) > 1 else ''} detected: {';'.join(validation_errors)}"
            )
        # TODO: handle filter: compatible_runtime
        # TODO: handle filter: compatible_architecture

        state = lambda_stores[context.account_id][context.region]
        layers = state.layers

        # TODO: test how filters behave together with only returning layers here? Does it return the latest "matching" layer, i.e. does it ignore later layer versions that don't match?

        responses: list[LayersListItem] = []
        for layer_name, layer in layers.items():
            if not layer.layer_versions:
                # deleting versions keeps the layer object around, so a layer can end up
                # without any version; there is nothing to report for it
                continue
            # Pick the highest version number explicitly: versions can be created out of order,
            # so the insertion order of the dict is not reliable.
            latest_layer_version = max(
                layer.layer_versions.values(), key=lambda layer_version: layer_version.version
            )
            responses.append(
                LayersListItem(
                    LayerName=layer_name,
                    LayerArn=layer.arn,
                    LatestMatchingVersion=api_utils.map_layer_out(latest_layer_version),
                )
            )

        responses = PaginatedList(responses)
        page, token = responses.get_page(
            # use a string key for the marker, like the other list operations in this provider
            lambda layer_item: layer_item["LayerArn"],
            marker,
            max_items,
        )

        return ListLayersResponse(NextMarker=token, Layers=page)
×
3702

3703
    def list_layer_versions(
        self,
        context: RequestContext,
        layer_name: LayerName,
        compatible_runtime: Runtime = None,
        marker: String = None,
        max_items: MaxLayerListItems = None,
        compatible_architecture: Architecture = None,
        **kwargs,
    ) -> ListLayerVersionsResponse:
        """
        List the versions of a layer, highest version first, paginated by layer version ARN.

        An unknown layer yields an empty list.

        :raises ValidationException: if the runtime or architecture filter is invalid
        """
        runtimes = [compatible_runtime] if compatible_runtime else []
        architectures = [compatible_architecture] if compatible_architecture else []
        validation_errors = api_utils.validate_layer_runtimes_and_architectures(
            runtimes, architectures
        )
        if validation_errors:
            raise ValidationException(
                f"{len(validation_errors)} validation error{'s' if len(validation_errors) > 1 else ''} detected: {';'.join(validation_errors)}"
            )

        region_name, account_id, layer_name, _ = LambdaProvider._resolve_layer(layer_name, context)
        layer = lambda_stores[account_id][region_name].layers.get(layer_name)

        # TODO: Test & handle filter: compatible_runtime
        # TODO: Test & handle filter: compatible_architecture
        if layer is None:
            mapped_versions = []
        else:
            mapped_versions = [
                api_utils.map_layer_out(stored_version)
                for stored_version in layer.layer_versions.values()
            ]
        mapped_versions.sort(key=lambda item: item["Version"], reverse=True)

        page, token = PaginatedList(mapped_versions).get_page(
            lambda item: item["LayerVersionArn"],
            marker,
            max_items,
        )
        return ListLayerVersionsResponse(NextMarker=token, LayerVersions=page)
1✔
3743

3744
    def delete_layer_version(
        self,
        context: RequestContext,
        layer_name: LayerName,
        version_number: LayerVersionNumber,
        **kwargs,
    ) -> None:
        """
        Delete a version of a layer; unknown layers and versions are silently ignored.

        :raises InvalidParameterValueException: if the version number is below 1
        """
        if version_number < 1:
            raise InvalidParameterValueException("Layer Version Cannot be less than 1", Type="User")

        region_name, account_id, layer_name, _ = LambdaProvider._resolve_layer(layer_name, context)
        layer = lambda_stores[account_id][region_name].layers.get(layer_name)
        if not layer:
            return

        # the layer object itself is kept, only the version entry is removed
        layer.layer_versions.pop(str(version_number), None)
1✔
3762

3763
    # =======================================
3764
    # =====  Layer Version Permissions  =====
3765
    # =======================================
3766
    # TODO: lock updates that change revision IDs
3767

3768
    def add_layer_version_permission(
        self,
        context: RequestContext,
        layer_name: LayerName,
        version_number: LayerVersionNumber,
        statement_id: StatementId,
        action: LayerPermissionAllowedAction,
        principal: LayerPermissionAllowedPrincipal,
        organization_id: OrganizationId = None,
        revision_id: String = None,
        **kwargs,
    ) -> AddLayerVersionPermissionResponse:
        """
        Add a permission statement to the policy of a layer version.

        The policy stored on the layer version is only created/replaced once every validation has
        passed, so a rejected request leaves the layer version untouched.

        :param context: request context, used to resolve region and account of the layer
        :param layer_name: layer name or layer ARN
        :param version_number: version of the layer the statement is added to
        :param statement_id: identifier of the new statement, must not exist in the policy yet
        :param action: only ``lambda:GetLayerVersion`` is accepted
        :param principal: principal stored in the statement
        :param organization_id: optional organization id stored with the statement
        :param revision_id: if given, must equal the revision id of the current policy
        :return: the JSON-serialized statement and the revision id of the updated policy
        :raises ValidationException: if ``action`` is not ``lambda:GetLayerVersion``
        :raises ResourceNotFoundException: if the layer or the layer version does not exist
        :raises ResourceConflictException: if ``statement_id`` is already part of the policy
        :raises PreconditionFailedException: if ``revision_id`` does not match the policy
        """
        # `layer_name` can either be layer name or ARN. It is used to generate error messages.
        # `layer_n` contains the layer name.
        region_name, account_id, layer_n, _ = LambdaProvider._resolve_layer(layer_name, context)

        if action != "lambda:GetLayerVersion":
            raise ValidationException(
                f"1 validation error detected: Value '{action}' at 'action' failed to satisfy constraint: Member must satisfy regular expression pattern: lambda:GetLayerVersion"
            )

        store = lambda_stores[account_id][region_name]
        layer = store.layers.get(layer_n)

        layer_version_arn = api_utils.layer_version_arn(
            layer_name, account_id, region_name, str(version_number)
        )

        if layer is None:
            raise ResourceNotFoundException(
                f"Layer version {layer_version_arn} does not exist.", Type="User"
            )
        layer_version = layer.layer_versions.get(str(version_number))
        if layer_version is None:
            raise ResourceNotFoundException(
                f"Layer version {layer_version_arn} does not exist.", Type="User"
            )

        # Validate against a local policy object. A missing policy is represented by a fresh
        # LayerPolicy that is NOT attached to the layer version yet: attaching it before the checks
        # below would leave an empty policy behind whenever the request is rejected.
        policy = layer_version.policy if layer_version.policy is not None else LayerPolicy()

        if statement_id in policy.statements:
            raise ResourceConflictException(
                f"The statement id ({statement_id}) provided already exists. Please provide a new statement id, or remove the existing statement.",
                Type="User",
            )

        if revision_id and policy.revision_id != revision_id:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetLayerPolicy API to retrieve the latest Revision Id",
                Type="User",
            )

        statement = LayerPolicyStatement(
            sid=statement_id, action=action, principal=principal, organization_id=organization_id
        )

        # Replace the policy object instead of mutating the statements mapping in place.
        layer_version.policy = dataclasses.replace(
            policy, statements={**policy.statements, statement_id: statement}
        )

        return AddLayerVersionPermissionResponse(
            Statement=json.dumps(
                {
                    "Sid": statement.sid,
                    "Effect": "Allow",
                    "Principal": statement.principal,
                    "Action": statement.action,
                    "Resource": layer_version.layer_version_arn,
                }
            ),
            RevisionId=layer_version.policy.revision_id,
        )
3843

3844
    def remove_layer_version_permission(
        self,
        context: RequestContext,
        layer_name: LayerName,
        version_number: LayerVersionNumber,
        statement_id: StatementId,
        revision_id: String = None,
        **kwargs,
    ) -> None:
        """
        Remove a permission statement from the policy of a layer version.

        :param context: request context, used to resolve region and account of the layer
        :param layer_name: layer name or layer ARN
        :param version_number: version of the layer the statement is removed from
        :param statement_id: identifier of the statement to remove
        :param revision_id: if given, must equal the revision id of the current policy
        :raises ResourceNotFoundException: if the layer, the layer version, or the statement does
            not exist (a layer version without any policy has no statements)
        :raises PreconditionFailedException: if ``revision_id`` does not match the policy
        """
        # `layer_name` can either be layer name or ARN. It is used to generate error messages.
        # `layer_n` contains the layer name.
        region_name, account_id, layer_n, _ = LambdaProvider._resolve_layer(layer_name, context)

        layer_version_arn = api_utils.layer_version_arn(
            layer_name, account_id, region_name, str(version_number)
        )

        state = lambda_stores[account_id][region_name]
        layer = state.layers.get(layer_n)
        if layer is None:
            raise ResourceNotFoundException(
                f"Layer version {layer_version_arn} does not exist.", Type="User"
            )
        layer_version = layer.layer_versions.get(str(version_number))
        if layer_version is None:
            raise ResourceNotFoundException(
                f"Layer version {layer_version_arn} does not exist.", Type="User"
            )

        # The policy is only created by the first add_layer_version_permission call, so it may
        # still be None here. Without a policy there is no revision id to compare against.
        policy = layer_version.policy

        if policy is not None and revision_id and policy.revision_id != revision_id:
            raise PreconditionFailedException(
                "The Revision Id provided does not match the latest Revision Id. "
                "Call the GetLayerPolicy API to retrieve the latest Revision Id",
                Type="User",
            )

        # NOTE(review): the exact AWS error for a layer version without any policy was not
        # verified; it is reported like any other unknown statement id.
        if policy is None or statement_id not in policy.statements:
            raise ResourceNotFoundException(
                f"Statement {statement_id} is not found in resource policy.", Type="User"
            )

        # Replace the policy object instead of mutating the statements mapping in place.
        layer_version.policy = dataclasses.replace(
            policy,
            statements={k: v for k, v in policy.statements.items() if k != statement_id},
        )
3892

3893
    def get_layer_version_policy(
        self,
        context: RequestContext,
        layer_name: LayerName,
        version_number: LayerVersionNumber,
        **kwargs,
    ) -> GetLayerVersionPolicyResponse:
        """
        Return the JSON policy document and revision id of a layer version.

        :raises ResourceNotFoundException: if the layer or layer version does not exist, or if the
            layer version has no policy attached
        """
        # `layer_name` may be a plain name or an ARN and is what ends up in error messages;
        # `layer_n` is the resolved plain layer name used for the store lookup.
        region_name, account_id, layer_n, _ = LambdaProvider._resolve_layer(layer_name, context)

        version_key = str(version_number)
        layer_version_arn = api_utils.layer_version_arn(
            layer_name, account_id, region_name, version_key
        )

        layer = lambda_stores[account_id][region_name].layers.get(layer_n)
        # An unknown layer and an unknown version are reported with the same error.
        layer_version = None if layer is None else layer.layer_versions.get(version_key)
        if layer_version is None:
            raise ResourceNotFoundException(
                f"Layer version {layer_version_arn} does not exist.", Type="User"
            )

        policy = layer_version.policy
        if policy is None:
            raise ResourceNotFoundException(
                "No policy is associated with the given resource.", Type="User"
            )

        rendered_statements = [
            {
                "Sid": ps.sid,
                "Effect": "Allow",
                "Principal": ps.principal,
                "Action": ps.action,
                "Resource": layer_version.layer_version_arn,
            }
            for ps in policy.statements.values()
        ]
        policy_document = {
            "Version": policy.version,
            "Id": policy.id,
            "Statement": rendered_statements,
        }
        return GetLayerVersionPolicyResponse(
            Policy=json.dumps(policy_document),
            RevisionId=policy.revision_id,
        )
3946

3947
    # =======================================
3948
    # =======  Function Concurrency  ========
3949
    # =======================================
3950
    # (Reserved) function concurrency is scoped to the whole function
3951

3952
    def get_function_concurrency(
        self, context: RequestContext, function_name: FunctionName, **kwargs
    ) -> GetFunctionConcurrencyResponse:
        """Return the reserved concurrent executions configured on the given function."""
        account_id, region = api_utils.get_account_and_region(function_name, context)
        resolved_name = api_utils.get_function_name(function_name, context)
        function = self._get_function(
            function_name=resolved_name, region=region, account_id=account_id
        )
        reserved = function.reserved_concurrent_executions
        return GetFunctionConcurrencyResponse(ReservedConcurrentExecutions=reserved)
3961

3962
    def put_function_concurrency(
        self,
        context: RequestContext,
        function_name: FunctionName,
        reserved_concurrent_executions: ReservedConcurrentExecutions,
        **kwargs,
    ) -> Concurrency:
        """
        Set the reserved concurrent executions of a function.

        :raises InvalidParameterValueException: if a qualifier is given, if the account's unreserved
            concurrency would drop below the configured minimum, or if the new value is lower than
            the function's total provisioned concurrency
        :raises ResourceNotFoundException: if the function does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)

        function_name, qualifier = api_utils.get_name_and_qualifier(function_name, None, context)
        if qualifier:
            raise InvalidParameterValueException(
                "This operation is permitted on Lambda functions only. Aliases and versions do not support this operation. Please specify either a function name or an unqualified function ARN.",
                Type="User",
            )

        fn = lambda_stores[account_id][region].functions.get(function_name)
        if not fn:
            fn_arn = api_utils.qualified_lambda_arn(
                function_name,
                qualifier="$LATEST",
                account=account_id,
                region=region,
            )
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")

        account_limit = self.get_account_settings(context)["AccountLimit"]
        unreserved_concurrent_executions = account_limit["UnreservedConcurrentExecutions"]

        # The function's current reservation is already subtracted from the account's unreserved
        # pool. Since the new value replaces it, it is added back before checking the minimum.
        # Joel tested this behavior manually against AWS (2023-11-28).
        currently_reserved = fn.reserved_concurrent_executions or 0
        remaining_unreserved = (
            unreserved_concurrent_executions - reserved_concurrent_executions + currently_reserved
        )
        if remaining_unreserved < config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY:
            raise InvalidParameterValueException(
                f"Specified ReservedConcurrentExecutions for function decreases account's UnreservedConcurrentExecution below its minimum value of [{config.LAMBDA_LIMITS_MINIMUM_UNRESERVED_CONCURRENCY}]."
            )

        total_provisioned_concurrency = sum(
            provisioned_config.provisioned_concurrent_executions
            for provisioned_config in fn.provisioned_concurrency_configs.values()
        )
        if total_provisioned_concurrency > reserved_concurrent_executions:
            raise InvalidParameterValueException(
                f" ReservedConcurrentExecutions  {reserved_concurrent_executions} should not be lower than function's total provisioned concurrency [{total_provisioned_concurrency}]."
            )

        fn.reserved_concurrent_executions = reserved_concurrent_executions

        return Concurrency(ReservedConcurrentExecutions=fn.reserved_concurrent_executions)
1✔
4023

4024
    def delete_function_concurrency(
        self, context: RequestContext, function_name: FunctionName, **kwargs
    ) -> None:
        """
        Remove the reserved concurrent executions setting from a function.

        :param context: request context, used to resolve region and account of the function
        :param function_name: function name or ARN
        :raises ResourceNotFoundException: if the function does not exist
        """
        account_id, region = api_utils.get_account_and_region(function_name, context)
        # The qualifier part of the result is not used by this operation.
        function_name, _ = api_utils.get_name_and_qualifier(function_name, None, context)
        store = lambda_stores[account_id][region]
        fn = store.functions.get(function_name)
        if not fn:
            # Same not-found error as put_function_concurrency, instead of failing with an
            # AttributeError on `None`.
            fn_arn = api_utils.qualified_lambda_arn(
                function_name,
                qualifier="$LATEST",
                account=account_id,
                region=region,
            )
            raise ResourceNotFoundException(f"Function not found: {fn_arn}", Type="User")
        fn.reserved_concurrent_executions = None
1✔
4032

4033
    # =======================================
4034
    # ===============  TAGS   ===============
4035
    # =======================================
4036
    # only Function, Event Source Mapping, and Code Signing Config (not currently supported by LocalStack) ARNs are available for tagging in AWS
4037

4038
    def _get_tags(self, resource: TaggableResource) -> dict[str, str]:
        """Return the tags of ``resource`` as a plain ``{key: value}`` mapping."""
        store = self.fetch_lambda_store_for_tagging(resource)
        # The tagging service hands back a list of {"Key": ..., "Value": ...} entries.
        tag_entries = store.TAGS.list_tags_for_resource(resource).get("Tags")
        return {entry["Key"]: entry["Value"] for entry in tag_entries}
1✔
4045

4046
    def _store_tags(self, resource: TaggableResource, tags: dict[str, str]):
        """
        Store ``tags`` on ``resource``.

        :raises InvalidParameterValueException: if the existing tags merged with ``tags`` exceed
            ``LAMBDA_TAG_LIMIT_PER_RESOURCE``
        """
        store = self.fetch_lambda_store_for_tagging(resource)

        # Count the union of already stored and new tags, so overwritten keys count only once.
        merged_tags = store.TAGS.tags.get(resource, {}) | tags
        if len(merged_tags) > LAMBDA_TAG_LIMIT_PER_RESOURCE:
            raise InvalidParameterValueException(
                "Number of tags exceeds resource tag limit.", Type="User"
            )

        store.TAGS.tag_resource(
            resource, [{"Key": name, "Value": content} for name, content in tags.items()]
        )
1✔
4055

4056
    def fetch_lambda_store_for_tagging(self, resource: TaggableResource) -> LambdaStore:
1✔
4057
        """
4058
        Takes a resource ARN for a TaggableResource (Lambda Function, Event Source Mapping, or Code Signing Config) and returns a corresponding
4059
        LambdaStore for its region and account.
4060

4061
        In addition, this function validates that the ARN is a valid TaggableResource type, and that the TaggableResource exists.
4062

4063
        Raises:
4064
            ValidationException: If the resource ARN is not a full ARN for a TaggableResource.
4065
            ResourceNotFoundException: If the specified resource does not exist.
4066
            InvalidParameterValueException: If the resource ARN is a qualified Lambda Function.
4067
        """
4068

4069
        def _raise_validation_exception():
1✔
4070
            raise ValidationException(
1✔
4071
                f"1 validation error detected: Value '{resource}' at 'resource' failed to satisfy constraint: Member must satisfy regular expression pattern: {api_utils.TAGGABLE_RESOURCE_ARN_PATTERN}"
4072
            )
4073

4074
        # Check whether the ARN we have been passed is correctly formatted
4075
        parsed_resource_arn: ArnData = None
1✔
4076
        try:
1✔
4077
            parsed_resource_arn = parse_arn(resource)
1✔
4078
        except Exception:
1✔
4079
            _raise_validation_exception()
1✔
4080

4081
        # TODO: Should we be checking whether this is a full ARN?
4082
        region, account_id, resource_type = map(
1✔
4083
            parsed_resource_arn.get, ("region", "account", "resource")
4084
        )
4085

4086
        if not all((region, account_id, resource_type)):
1✔
UNCOV
4087
            _raise_validation_exception()
×
4088

4089
        if not (parts := resource_type.split(":")):
1✔
UNCOV
4090
            _raise_validation_exception()
×
4091

4092
        resource_type, resource_identifier, *qualifier = parts
1✔
4093
        if resource_type not in {"event-source-mapping", "code-signing-config", "function"}:
1✔
4094
            _raise_validation_exception()
1✔
4095

4096
        if qualifier:
1✔
4097
            if resource_type == "function":
1✔
4098
                raise InvalidParameterValueException(
1✔
4099
                    "Tags on function aliases and versions are not supported. Please specify a function ARN.",
4100
                    Type="User",
4101
                )
4102
            _raise_validation_exception()
1✔
4103

4104
        match resource_type:
1✔
4105
            case "event-source-mapping":
1✔
4106
                self._get_esm(resource_identifier, account_id, region)
1✔
4107
            case "code-signing-config":
1✔
4108
                raise NotImplementedError("Resource tagging on CSC not yet implemented.")
4109
            case "function":
1✔
4110
                self._get_function(
1✔
4111
                    function_name=resource_identifier, account_id=account_id, region=region
4112
                )
4113

4114
        # If no exceptions are raised, assume ARN and referenced resource is valid for tag operations
4115
        return lambda_stores[account_id][region]
1✔
4116

4117
    def tag_resource(
        self, context: RequestContext, resource: TaggableResource, tags: Tags, **kwargs
    ) -> None:
        """
        Add ``tags`` to the given taggable resource.

        :raises InvalidParameterValueException: if ``tags`` is empty
        """
        if not tags:
            raise InvalidParameterValueException(
                "An error occurred and the request cannot be processed.", Type="User"
            )
        self._store_tags(resource, tags)

        resource_id = extract_resource_from_arn(resource)
        if not resource_id or not resource_id.startswith("function"):
            # Only function resources need the $LATEST refresh below.
            return

        name, _, account, region = function_locators_from_arn(resource)
        function = self._get_function(name, account, region)
        with function.lock:
            # dirty hack for changed revision id, should reevaluate model to prevent this:
            current_latest = function.versions["$LATEST"]
            copied_config = dataclasses.replace(current_latest.config)
            function.versions["$LATEST"] = dataclasses.replace(
                current_latest, config=copied_config
            )
4136
                )
4137

4138
    def list_tags(
        self, context: RequestContext, resource: TaggableResource, **kwargs
    ) -> ListTagsResponse:
        """Return all tags stored for the given taggable resource."""
        return ListTagsResponse(Tags=self._get_tags(resource))
1✔
4143

4144
    def untag_resource(
        self, context: RequestContext, resource: TaggableResource, tag_keys: TagKeyList, **kwargs
    ) -> None:
        """
        Remove the tags with the given keys from a taggable resource.

        :raises ValidationException: if ``tag_keys`` is empty
        """
        if not tag_keys:
            raise ValidationException(
                "1 validation error detected: Value null at 'tagKeys' failed to satisfy constraint: Member must not be null"
            )  # should probably be generalized a bit

        store = self.fetch_lambda_store_for_tagging(resource)
        store.TAGS.untag_resource(resource, tag_keys)

        resource_id = extract_resource_from_arn(resource)
        if not resource_id or not resource_id.startswith("function"):
            # Only function resources need the $LATEST refresh below.
            return

        name, _, account, region = function_locators_from_arn(resource)
        function = self._get_function(name, account, region)
        # TODO: Potential race condition
        with function.lock:
            # dirty hack for changed revision id, should reevaluate model to prevent this:
            current_latest = function.versions["$LATEST"]
            copied_config = dataclasses.replace(current_latest.config)
            function.versions["$LATEST"] = dataclasses.replace(
                current_latest, config=copied_config
            )
4167

4168
    # =======================================
4169
    # =======  LEGACY / DEPRECATED   ========
4170
    # =======================================
4171

4172
    def invoke_async(
        self,
        context: RequestContext,
        function_name: NamespacedFunctionName,
        invoke_args: IO[BlobStream],
        **kwargs,
    ) -> InvokeAsyncResponse:
        """
        LEGACY API endpoint. Even AWS heavily discourages its usage.

        Not supported by this provider: every call raises ``NotImplementedError``.
        """
        raise NotImplementedError()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc