• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 08a45e28-4998-4845-a88b-f2c425830a31

21 Feb 2025 08:33PM UTC coverage: 86.896% (+0.01%) from 86.883%
08a45e28-4998-4845-a88b-f2c425830a31

push

circleci

web-flow
fix SNS FIFO ordering (#12285)

Co-authored-by: Daniel Fangl <daniel.fangl@localstack.cloud>

70 of 79 new or added lines in 2 files covered. (88.61%)

117 existing lines in 8 files now uncovered.

61670 of 70970 relevant lines covered (86.9%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.81
/localstack-core/localstack/services/lambda_/api_utils.py
1
"""Utilities related to Lambda API operations such as ARN handling, validations, and output formatting.
2
Everything related to behavior or implicit functionality goes into `lambda_utils.py`.
3
"""
4

5
import datetime
1✔
6
import random
1✔
7
import re
1✔
8
import string
1✔
9
from typing import TYPE_CHECKING, Any, Optional, Tuple
1✔
10

11
from localstack.aws.api import CommonServiceException, RequestContext
1✔
12
from localstack.aws.api import lambda_ as api_spec
1✔
13
from localstack.aws.api.lambda_ import (
1✔
14
    AliasConfiguration,
15
    Architecture,
16
    DeadLetterConfig,
17
    EnvironmentResponse,
18
    EphemeralStorage,
19
    FunctionConfiguration,
20
    FunctionUrlAuthType,
21
    ImageConfig,
22
    ImageConfigResponse,
23
    InvalidParameterValueException,
24
    LayerVersionContentOutput,
25
    PublishLayerVersionResponse,
26
    ResourceNotFoundException,
27
    TracingConfig,
28
    VpcConfigResponse,
29
)
30
from localstack.services.lambda_.invocation import AccessDeniedException
1✔
31
from localstack.services.lambda_.runtimes import ALL_RUNTIMES, VALID_LAYER_RUNTIMES, VALID_RUNTIMES
1✔
32
from localstack.utils.aws.arns import ARN_PARTITION_REGEX, get_partition
1✔
33
from localstack.utils.collections import merge_recursive
1✔
34

35
if TYPE_CHECKING:
1✔
36
    from localstack.services.lambda_.invocation.lambda_models import (
×
37
        CodeSigningConfig,
38
        Function,
39
        FunctionUrlConfig,
40
        FunctionVersion,
41
        LayerVersion,
42
        VersionAlias,
43
    )
44
    from localstack.services.lambda_.invocation.models import LambdaStore
×
45

46

47
# Pattern for a full (both with and without qualifier) lambda function ARN
48
FULL_FN_ARN_PATTERN = re.compile(
1✔
49
    rf"{ARN_PARTITION_REGEX}:lambda:(?P<region_name>[^:]+):(?P<account_id>\d{{12}}):function:(?P<function_name>[^:]+)(:(?P<qualifier>.*))?$"
50
)
51

52
# Pattern for a full (both with and without qualifier) lambda layer ARN
53
# TODO: It looks like they added `|(arn:[a-zA-Z0-9-]+:lambda:::awslayer:[a-zA-Z0-9-_]+` in 2024-11
54
LAYER_VERSION_ARN_PATTERN = re.compile(
1✔
55
    rf"{ARN_PARTITION_REGEX}:lambda:(?P<region_name>[^:]+):(?P<account_id>\d{{12}}):layer:(?P<layer_name>[^:]+)(:(?P<layer_version>\d+))?$"
56
)
57

58

59
# Pattern for a valid destination arn
60
DESTINATION_ARN_PATTERN = re.compile(
1✔
61
    r"^$|arn:(aws[a-zA-Z0-9-]*):([a-zA-Z0-9\-])+:([a-z]{2}(-gov)?-[a-z]+-\d{1})?:(\d{12})?:(.*)"
62
)
63

64
AWS_FUNCTION_NAME_REGEX = re.compile(
1✔
65
    "^(arn:(aws[a-zA-Z-]*)?:lambda:)?([a-z]{2}((-gov)|(-iso([a-z]?)))?-[a-z]+-\\d{1}:)?(\\d{12}:)?(function:)?([a-zA-Z0-9-_.]+)(:(\\$LATEST|[a-zA-Z0-9-_]+))?$"
66
)
67

68
# Pattern for extracting various attributes from a full or partial ARN or just a function name.
69
FUNCTION_NAME_REGEX = re.compile(
1✔
70
    r"(arn:(aws[a-zA-Z-]*):lambda:)?((?P<region>[a-z]{2}(-gov)?-[a-z]+-\d{1}):)?(?:(?P<account>\d{12}):)?(function:)?(?P<name>[a-zA-Z0-9-_\.]+)(:(?P<qualifier>\$LATEST|[a-zA-Z0-9-_]+))?"
71
)  # also length 1-170 incl.
72
# Pattern for a lambda function handler
73
HANDLER_REGEX = re.compile(r"[^\s]+")
1✔
74
# Pattern for a valid kms key
75
KMS_KEY_ARN_REGEX = re.compile(r"(arn:(aws[a-zA-Z-]*)?:[a-z0-9-.]+:.*)|()")
1✔
76
# Pattern for a valid IAM role assumed by a lambda function
77
ROLE_REGEX = re.compile(r"arn:(aws[a-zA-Z-]*)?:iam::\d{12}:role/?[a-zA-Z_0-9+=,.@\-_/]+")
1✔
78
# Pattern for a valid AWS account
79
AWS_ACCOUNT_REGEX = re.compile(r"\d{12}")
1✔
80
# Pattern for a signing job arn
81
SIGNING_JOB_ARN_REGEX = re.compile(
1✔
82
    r"arn:(aws[a-zA-Z0-9-]*):([a-zA-Z0-9\-])+:([a-z]{2}(-gov)?-[a-z]+-\d{1})?:(\d{12})?:(.*)"
83
)
84
# Pattern for a signing profile version arn
85
SIGNING_PROFILE_VERSION_ARN_REGEX = re.compile(
1✔
86
    r"arn:(aws[a-zA-Z0-9-]*):([a-zA-Z0-9\-])+:([a-z]{2}(-gov)?-[a-z]+-\d{1})?:(\d{12})?:(.*)"
87
)
88
# Combined pattern for alias and version based on AWS error using "(|[a-zA-Z0-9$_-]+)"
89
QUALIFIER_REGEX = re.compile(r"(^[a-zA-Z0-9$_-]+$)")
1✔
90
# Pattern for a version qualifier
91
VERSION_REGEX = re.compile(r"^[0-9]+$")
1✔
92
# Pattern for an alias qualifier
93
# Rules: https://docs.aws.amazon.com/lambda/latest/dg/API_CreateAlias.html#SSS-CreateAlias-request-Name
94
# The original regex from AWS misses ^ and $ in the second regex, which allowed for partial substring matches
95
ALIAS_REGEX = re.compile(r"(?!^[0-9]+$)(^[a-zA-Z0-9-_]+$)")
1✔
96
# Permission statement id
97
STATEMENT_ID_REGEX = re.compile(r"^[a-zA-Z0-9-_]+$")
1✔
98
# Pattern for a valid SubnetId
99
SUBNET_ID_REGEX = re.compile(r"^subnet-[0-9a-z]*$")
1✔
100

101

102
URL_CHAR_SET = string.ascii_lowercase + string.digits
1✔
103
# Date format as returned by the lambda service
104
LAMBDA_DATE_FORMAT = "%Y-%m-%dT%H:%M:%S.%f+0000"
1✔
105

106
# An unordered list of all Lambda CPU architectures supported by LocalStack.
107
ARCHITECTURES = [Architecture.arm64, Architecture.x86_64]
1✔
108

109
# ARN pattern returned in validation exception messages.
110
# Some exceptions from AWS return a '\.' in the function name regex
111
# pattern therefore we can sub this value in when appropriate.
112
ARN_NAME_PATTERN_VALIDATION_TEMPLATE = "(arn:(aws[a-zA-Z-]*)?:lambda:)?([a-z]{{2}}((-gov)|(-iso([a-z]?)))?-[a-z]+-\\d{{1}}:)?(\\d{{12}}:)?(function:)?([a-zA-Z0-9-_{0}]+)(:(\\$LATEST|[a-zA-Z0-9-_]+))?"
1✔
113

114
# AWS response when invalid ARNs are used in Tag operations.
115
TAGGABLE_RESOURCE_ARN_PATTERN = "arn:(aws[a-zA-Z-]*):lambda:[a-z]{2}((-gov)|(-iso([a-z]?)))?-[a-z]+-\\d{1}:\\d{12}:(function:[a-zA-Z0-9-_]+(:(\\$LATEST|[a-zA-Z0-9-_]+))?|layer:([a-zA-Z0-9-_]+)|code-signing-config:csc-[a-z0-9]{17}|event-source-mapping:[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12})"
1✔
116

117

118
def validate_function_name(function_name_or_arn: str, operation_type: str):
    """Collect validation error messages for a function name or (partial) ARN.

    :param function_name_or_arn: Function name, partial ARN or full ARN as supplied by the caller
    :param operation_type: Name of the API operation; it selects the length limit and the
        pattern variant quoted in the error message
    :return: List of validation messages, empty if the value passes both checks
    """
    parsed_name = function_locators_from_arn(function_name_or_arn)[0]

    # Defaults used by every operation that is not special-cased below.
    extra_name_chars = ""
    length_limit = 170
    if operation_type in ("GetFunction", "Invoke"):
        # For these operations the reported pattern additionally contains '\.'.
        extra_name_chars = r"\."
    elif operation_type == "CreateFunction" and parsed_name == function_name_or_arn:
        # Only a function name was given (no ARN parts).
        length_limit = 64
    elif operation_type in ("CreateFunction", "DeleteFunction"):
        length_limit = 140
    reported_pattern = ARN_NAME_PATTERN_VALIDATION_TEMPLATE.format(extra_name_chars)

    prefix = f"Value '{function_name_or_arn}' at 'functionName' failed to satisfy constraint: "
    errors = []
    if len(function_name_or_arn) > length_limit:
        errors.append(f"{prefix}Member must have length less than or equal to {length_limit}")

    # Fails when either the AWS pattern rejects the input or no name could be extracted.
    if not (AWS_FUNCTION_NAME_REGEX.match(function_name_or_arn) and parsed_name):
        errors.append(
            f"{prefix}Member must satisfy regular expression pattern: {reported_pattern}"
        )

    return errors
1✔
143

144

145
def validate_qualifier(qualifier: str):
    """Collect validation error messages for a qualifier.

    :param qualifier: Qualifier to check
    :return: List of validation messages (length check first, then pattern check), empty if valid
    """
    failed_constraints = []
    if len(qualifier) > 128:
        failed_constraints.append("Member must have length less than or equal to 128")
    if QUALIFIER_REGEX.match(qualifier) is None:
        failed_constraints.append(
            "Member must satisfy regular expression pattern: (|[a-zA-Z0-9$_-]+)"
        )

    prefix = f"Value '{qualifier}' at 'qualifier' failed to satisfy constraint: "
    return [prefix + constraint for constraint in failed_constraints]
1✔
163

164

165
def construct_validation_exception_message(validation_errors):
    """Join individual validation messages into one summary message.

    :param validation_errors: Collection of validation message strings
    :return: "<n> validation error(s) detected: <msg>; <msg>..." or None if there are no errors
    """
    if not validation_errors:
        return None

    count = len(validation_errors)
    noun = "validation errors" if count > 1 else "validation error"
    details = "; ".join(validation_errors)
    return f"{count} {noun} detected: {details}"
×
170

171

172
def map_function_url_config(model: "FunctionUrlConfig") -> api_spec.FunctionUrlConfig:
    """Map a function URL config model to its API representation.

    :param model: Internal function URL config model
    :return: API-shaped function URL config
    """
    fields = {
        "FunctionUrl": model.url,
        "FunctionArn": model.function_arn,
        "CreationTime": model.creation_time,
        "LastModifiedTime": model.last_modified_time,
        "Cors": model.cors,
        "AuthType": model.auth_type,
        "InvokeMode": model.invoke_mode,
    }
    return api_spec.FunctionUrlConfig(**fields)
182

183

184
def map_csc(model: "CodeSigningConfig") -> api_spec.CodeSigningConfig:
    """Map a code signing config model to its API representation.

    :param model: Internal code signing config model
    :return: API-shaped code signing config
    """
    fields = {
        "CodeSigningConfigId": model.csc_id,
        "CodeSigningConfigArn": model.arn,
        "Description": model.description,
        "AllowedPublishers": model.allowed_publishers,
        "CodeSigningPolicies": model.policies,
        "LastModified": model.last_modified,
    }
    return api_spec.CodeSigningConfig(**fields)
193

194

195
def get_config_for_url(store: "LambdaStore", url_id: str) -> "Optional[FunctionUrlConfig]":
    """
    Get a config object when resolving a URL

    This is a linear scan over every URL config of every function in the store.
    TODO: quite inefficient: optimize

    :param store: Lambda Store
    :param url_id: unique url ID (prefixed domain when calling the function)
    :return: FunctionUrlConfig that belongs to this ID, or None if no config has this ID
    """
    # Only the values are needed; the function names and qualifiers used as keys are irrelevant.
    for fn in store.functions.values():
        for fn_url_config in fn.function_url_configs.values():
            if fn_url_config.url_id == url_id:
                return fn_url_config
    return None
×
210

211

212
def is_qualifier_expression(qualifier: str) -> bool:
    """Tell whether a qualifier is syntactically acceptable.

    A positive result does not imply that the qualifier is a valid alias or version.

    :param qualifier: Qualifier to check
    :return: True if the qualifier matches the combined alias/version pattern, False otherwise
    """
    return QUALIFIER_REGEX.match(qualifier) is not None
1✔
220

221

222
def qualifier_is_version(qualifier: str) -> bool:
    """
    Tell whether a qualifier has the shape of a version (digits only)

    :param qualifier: Qualifier to check
    :return: True if it matches the version pattern, False otherwise
    """
    return VERSION_REGEX.match(qualifier) is not None
1✔
230

231

232
def qualifier_is_alias(qualifier: str) -> bool:
    """
    Tell whether a qualifier has the shape of an alias name

    :param qualifier: Qualifier to check
    :return: True if it matches the alias pattern, False otherwise
    """
    return ALIAS_REGEX.match(qualifier) is not None
1✔
240

241

242
def get_function_name(function_arn_or_name: str, context: RequestContext) -> str:
    """
    Extract the function name from an ARN (or pass a plain name through).

    Delegates to `get_name_and_qualifier` without a qualifier, so the same region/account
    checks and validations apply when an ARN is provided.

    :param function_arn_or_name: Function arn or only name
    :param context: Request context
    :return: function name
    """
    return get_name_and_qualifier(function_arn_or_name, qualifier=None, context=context)[0]
1✔
252

253

254
def function_locators_from_arn(arn: str) -> tuple[str | None, str | None, str | None, str | None]:
    """
    Split a full ARN, a partial ARN, or a plain name into its locator parts

    :param arn: Given arn (or name)
    :return: tuple with (name, qualifier, account, region). Parts missing from the input are None;
        all four are None if the input does not match at all
    """
    parsed = FUNCTION_NAME_REGEX.match(arn)
    if parsed is None:
        return None, None, None, None

    # Passing several group names returns them as one tuple, in the order given.
    return parsed.group("name", "qualifier", "account", "region")
1✔
270

271

272
def get_account_and_region(function_arn_or_name: str, context: RequestContext) -> Tuple[str, str]:
    """
    Resolve the account ID and region for a full ARN, partial ARN or plain name.

    Values present in the ARN take precedence; anything missing falls back to the account ID
    and region of the request context.

    Lambda allows cross-account access. This function should be used to resolve the correct Store based on the ARN.
    """
    locators = function_locators_from_arn(function_arn_or_name)
    arn_account, arn_region = locators[2], locators[3]
    return arn_account or context.account_id, arn_region or context.region
1✔
281

282

283
def get_name_and_qualifier(
    function_arn_or_name: str, qualifier: str | None, context: RequestContext
) -> tuple[str, str | None]:
    """
    Takes a full or partial arn, or a name and a qualifier.

    The checks run in a fixed order: account, region, name/qualifier validation, and finally
    agreement between the ARN's qualifier and the explicit qualifier.

    :param function_arn_or_name: Given arn (or name)
    :param qualifier: A qualifier for the function (or None)
    :param context: Request context
    :return: tuple with (name, qualifier). Qualifier is none if missing
    :raises: `AccessDeniedException` when the context's account ID differs from the ARN's account ID
        and the operation is not listed in `_supported_resource_based_operations`
    :raises: `ResourceNotFoundException` when the context's region differs from the ARN's region
    :raises: `ValidationException` when a function ARN/name or qualifier fails validation checks
        (`InvalidParameterValueException` instead for CreateFunction with just a function name)
    :raises: `InvalidParameterValueException` when a qualified arn is provided and the qualifier does not match (but is given)
    """
    function_name, arn_qualifier, account, region = function_locators_from_arn(function_arn_or_name)
    operation_type = context.operation.name

    # Cross-account ARNs are only tolerated for operations in the resource-based set.
    if operation_type not in _supported_resource_based_operations:
        if account and account != context.account_id:
            raise AccessDeniedException(None)

    # TODO: should this only run if operation type is unsupported?
    if region and region != context.region:
        raise ResourceNotFoundException(
            f"Functions from '{region}' are not reachable in this region ('{context.region}')",
            Type="User",
        )

    # Gather all validation messages first so they can be reported in a single exception.
    validation_errors = []
    if function_arn_or_name:
        validation_errors.extend(validate_function_name(function_arn_or_name, operation_type))

    if qualifier:
        validation_errors.extend(validate_qualifier(qualifier))

    is_only_function_name = function_arn_or_name == function_name
    if validation_errors:
        message = construct_validation_exception_message(validation_errors)
        # Edge-case where the error type is not ValidationException
        if (
            operation_type == "CreateFunction"
            and is_only_function_name
            and arn_qualifier is None
            and region is None
        ):  # just name OR partial
            raise InvalidParameterValueException(message=message, Type="User")
        raise CommonServiceException(message=message, code="ValidationException")

    # A mismatch is only an error if both an explicit and an ARN-embedded qualifier are present.
    if qualifier and arn_qualifier and arn_qualifier != qualifier:
        raise InvalidParameterValueException(
            "The derived qualifier from the function name does not match the specified qualifier.",
            Type="User",
        )

    # The explicit qualifier wins; otherwise fall back to the one embedded in the ARN (may be None).
    qualifier = qualifier or arn_qualifier
    return function_name, qualifier
1✔
340

341

342
def build_statement(
    partition: str,
    resource_arn: str,
    statement_id: str,
    action: str,
    principal: str,
    source_arn: Optional[str] = None,
    source_account: Optional[str] = None,
    principal_org_id: Optional[str] = None,
    event_source_token: Optional[str] = None,
    auth_type: Optional[FunctionUrlAuthType] = None,
) -> dict[str, Any]:
    """Build a single "Allow" policy statement for a resource-based policy.

    :param partition: Partition used when expanding an account ID principal into a root ARN
    :param resource_arn: Value of the statement's "Resource"
    :param statement_id: Value of the statement's "Sid"
    :param action: Value of the statement's "Action"
    :param principal: Service principal (ends with ".amazonaws.com"), account ID, IAM ARN, or "*"
    :param source_arn: If set, adds an "ArnLike" condition on "AWS:SourceArn"
    :param source_account: If set, adds a "StringEquals" condition on "AWS:SourceAccount"
    :param principal_org_id: If set, adds a "StringEquals" condition on "aws:PrincipalOrgID"
    :param event_source_token: If set, adds a "StringEquals" condition on "lambda:EventSourceToken"
    :param auth_type: If set, adds a "StringEquals" condition on "lambda:FunctionUrlAuthType"
    :return: Statement dict; "Condition" is only present if at least one condition was added
    :raises: `InvalidParameterValueException` when the principal matches none of the accepted forms
    """
    statement = {
        "Sid": statement_id,
        "Effect": "Allow",
        "Action": action,
        "Resource": resource_arn,
    }

    # See AWS service principals for comprehensive docs:
    # https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_elements_principal.html
    # TODO: validate against actual list of IAM-supported AWS services (e.g., lambda.amazonaws.com)
    if principal.endswith(".amazonaws.com"):
        statement["Principal"] = {"Service": principal}
    elif is_aws_account(principal):
        # An account ID is expanded to the ARN of that account's root.
        statement["Principal"] = {"AWS": f"arn:{partition}:iam::{principal}:root"}
    # TODO: potentially validate against IAM?
    elif re.match(f"{ARN_PARTITION_REGEX}:iam:", principal):
        statement["Principal"] = {"AWS": principal}
    elif principal == "*":
        statement["Principal"] = principal
    # TODO: unclear whether above matching is complete?
    else:
        raise InvalidParameterValueException(
            "The provided principal was invalid. Please check the principal and try again.",
            Type="User",
        )

    # Each optional argument contributes one condition entry. The entries are combined with
    # merge_recursive; presumably conditions sharing an operator such as "StringEquals" end up
    # in one nested dict - confirm against merge_recursive.
    condition = dict()
    if auth_type:
        update = {"StringEquals": {"lambda:FunctionUrlAuthType": auth_type}}
        condition = merge_recursive(condition, update)

    if principal_org_id:
        update = {"StringEquals": {"aws:PrincipalOrgID": principal_org_id}}
        condition = merge_recursive(condition, update)

    if source_account:
        update = {"StringEquals": {"AWS:SourceAccount": source_account}}
        condition = merge_recursive(condition, update)

    if event_source_token:
        update = {"StringEquals": {"lambda:EventSourceToken": event_source_token}}
        condition = merge_recursive(condition, update)

    if source_arn:
        update = {"ArnLike": {"AWS:SourceArn": source_arn}}
        condition = merge_recursive(condition, update)

    # Leave the "Condition" key out entirely when nothing was added.
    if condition:
        statement["Condition"] = condition

    return statement
1✔
405

406

407
def generate_random_url_id() -> str:
    """
    Create a random url ID: 32 characters drawn from URL_CHAR_SET ([0-9a-z])
    """
    picked_chars = random.choices(URL_CHAR_SET, k=32)
    return "".join(picked_chars)
1✔
413

414

415
def unqualified_lambda_arn(function_name: str, account: str, region: str):
    """
    Build a lambda function arn without a qualifier

    :param function_name: Function name (not an arn!)
    :param account: Account ID
    :param region: Region; also used to look up the partition
    :return: Unqualified lambda arn
    """
    partition = get_partition(region)
    return f"arn:{partition}:lambda:{region}:{account}:function:{function_name}"
1✔
425

426

427
def qualified_lambda_arn(
    function_name: str, qualifier: Optional[str], account: str, region: str
) -> str:
    """
    Build a lambda function arn that always ends in a qualifier

    :param function_name: Function name (not an arn!)
    :param qualifier: qualifier (an empty or missing qualifier becomes $LATEST)
    :param account: Account ID
    :param region: Region
    :return: Qualified lambda arn
    """
    base_arn = unqualified_lambda_arn(function_name=function_name, account=account, region=region)
    effective_qualifier = qualifier if qualifier else "$LATEST"
    return f"{base_arn}:{effective_qualifier}"
1✔
441

442

443
def lambda_arn(function_name: str, qualifier: Optional[str], account: str, region: str) -> str:
    """
    Build the lambda arn for the given parameters: qualified if a qualifier is supplied,
    unqualified otherwise

    :param function_name: Function name
    :param qualifier: Qualifier. May be left out, then the returning arn does not have one either
    :param account: Account ID
    :param region: Region of the Lambda
    :return: Lambda Arn with or without qualifier
    """
    if not qualifier:
        return unqualified_lambda_arn(function_name=function_name, account=account, region=region)

    return qualified_lambda_arn(
        function_name=function_name, qualifier=qualifier, account=account, region=region
    )
1✔
459

460

461
def is_role_arn(role_arn: str) -> bool:
    """
    Tell whether the provided string starts with a match of the IAM role arn pattern

    :param role_arn: Potential role arn
    :return: Boolean indicating if input is a role arn
    """
    return ROLE_REGEX.match(role_arn) is not None
1✔
469

470

471
def is_aws_account(aws_account: str) -> bool:
    """
    Returns true if the provided string is an AWS account ID (exactly 12 digits), false otherwise

    :param aws_account: Potential AWS account
    :return: Boolean indicating if input is an AWS account
    """
    # fullmatch is required: the pattern carries no anchors, so a plain match() would also accept
    # any longer string that merely begins with 12 digits (e.g. a 13-digit number).
    return AWS_ACCOUNT_REGEX.fullmatch(aws_account) is not None
1✔
479

480

481
def format_lambda_date(date_to_format: datetime.datetime) -> str:
    """Render a datetime as a string in the lambda date format (LAMBDA_DATE_FORMAT)"""
    formatted = date_to_format.strftime(LAMBDA_DATE_FORMAT)
    return formatted
1✔
484

485

486
def generate_lambda_date() -> str:
    """Get the current UTC date as string generated with the lambda date format"""
    # The lambda date format ends in a literal "+0000", so the timestamp has to be taken in UTC.
    # A naive datetime.now() is local time and would be mislabelled on hosts not running in UTC.
    return format_lambda_date(datetime.datetime.now(tz=datetime.timezone.utc))
1✔
489

490

491
def map_update_status_config(version: "FunctionVersion") -> dict[str, str]:
    """Map the last-update information of a version model to dict output.

    :param version: Function version whose `config.last_update` is rendered
    :return: Dict with the LastUpdateStatus* keys whose values are set; empty if there is no
        last update
    """
    last_update = version.config.last_update
    if not last_update:
        return {}

    candidates = (
        ("LastUpdateStatus", last_update.status),
        ("LastUpdateStatusReasonCode", last_update.code),
        ("LastUpdateStatusReason", last_update.reason),
    )
    # Unset (falsy) entries are left out rather than rendered as empty values.
    return {key: value for key, value in candidates if value}
1✔
502

503

504
def map_state_config(version: "FunctionVersion") -> dict[str, str]:
    """Map the state information of a version model to dict output.

    :param version: Function version whose `config.state` is rendered
    :return: Dict with the State* keys whose values are set; empty if there is no state
    """
    version_state = version.config.state
    if not version_state:
        return {}

    candidates = (
        ("State", version_state.state),
        ("StateReason", version_state.reason),
        ("StateReasonCode", version_state.code),
    )
    # Unset (falsy) entries are left out rather than rendered as empty values.
    return {key: value for key, value in candidates if value}
1✔
515

516

517
def map_config_out(
    version: "FunctionVersion",
    return_qualified_arn: bool = False,
    return_update_status: bool = True,
    alias_name: str | None = None,
) -> FunctionConfiguration:
    """Map a function version to its function configuration output.

    :param version: Function version to render
    :param return_qualified_arn: Use the qualified ARN as FunctionArn (ignored if alias_name is set)
    :param return_update_status: Include the LastUpdateStatus* entries
    :param alias_name: If set, FunctionArn is the qualified ARN with its last segment replaced by
        this alias name
    :return: Function configuration for the version
    """
    cfg = version.config

    # Entries collected here are not rendered at all when they are not present.
    extras = {}
    if return_update_status:
        extras.update(map_update_status_config(version))
    extras.update(map_state_config(version))

    if cfg.architectures:
        extras["Architectures"] = cfg.architectures

    if cfg.dead_letter_arn:
        extras["DeadLetterConfig"] = DeadLetterConfig(TargetArn=cfg.dead_letter_arn)

    if vpc := cfg.vpc_config:
        extras["VpcConfig"] = VpcConfigResponse(
            VpcId=vpc.vpc_id,
            SubnetIds=vpc.subnet_ids,
            SecurityGroupIds=vpc.security_group_ids,
        )

    # An empty environment is still rendered; only None is omitted.
    if cfg.environment is not None:
        extras["Environment"] = EnvironmentResponse(Variables=cfg.environment)  # TODO: Errors key?

    if cfg.layers:
        extras["Layers"] = [
            {"Arn": layer.layer_version_arn, "CodeSize": layer.code.code_size}
            for layer in cfg.layers
        ]

    if image_cfg := cfg.image_config:
        rendered_image_config = ImageConfig()
        image_entries = (
            ("Command", image_cfg.command),
            ("EntryPoint", image_cfg.entrypoint),
            ("WorkingDirectory", image_cfg.working_directory),
        )
        for key, value in image_entries:
            if value:
                rendered_image_config[key] = value
        if rendered_image_config:
            extras["ImageConfigResponse"] = ImageConfigResponse(ImageConfig=rendered_image_config)

    if cfg.code:
        extras["CodeSize"] = cfg.code.code_size
        extras["CodeSha256"] = cfg.code.code_sha256
    elif cfg.image:
        extras["CodeSize"] = 0
        extras["CodeSha256"] = cfg.image.code_sha256

    # output for an alias qualifier is completely the same except for the returned ARN
    if alias_name:
        arn_without_qualifier = version.id.qualified_arn().rpartition(":")[0]
        function_arn = f"{arn_without_qualifier}:{alias_name}"
    elif return_qualified_arn:
        function_arn = version.id.qualified_arn()
    else:
        function_arn = version.id.unqualified_arn()

    return FunctionConfiguration(
        RevisionId=cfg.revision_id,
        FunctionName=version.id.function_name,
        FunctionArn=function_arn,
        LastModified=cfg.last_modified,
        Version=version.id.qualifier,
        Description=cfg.description,
        Role=cfg.role,
        Timeout=cfg.timeout,
        Runtime=cfg.runtime,
        Handler=cfg.handler,
        MemorySize=cfg.memory_size,
        PackageType=cfg.package_type,
        TracingConfig=TracingConfig(Mode=cfg.tracing_config_mode),
        EphemeralStorage=EphemeralStorage(Size=cfg.ephemeral_storage.size),
        SnapStart=cfg.snap_start,
        RuntimeVersionConfig=cfg.runtime_version_config,
        LoggingConfig=cfg.logging_config,
        **extras,
    )
1✔
602

603

604
def map_to_list_response(config: FunctionConfiguration) -> FunctionConfiguration:
    """Return a shallow copy of a function config without the entries omitted in list operations.

    :param config: Function configuration; it is not modified
    :return: Copy without the State*, LastUpdateStatus* and RuntimeVersionConfig entries
    """
    omitted_in_lists = (
        "State",
        "StateReason",
        "StateReasonCode",
        "LastUpdateStatus",
        "LastUpdateStatusReason",
        "LastUpdateStatusReasonCode",
        "RuntimeVersionConfig",
    )
    trimmed = config.copy()
    for key in omitted_in_lists:
        # A default is passed so that keys missing from the config are skipped silently.
        trimmed.pop(key, None)
    return trimmed
1✔
618

619

620
def map_alias_out(alias: "VersionAlias", function: "Function") -> AliasConfiguration:
    """Map an alias model to its alias configuration output.

    :param alias: Alias to render
    :param function: Function the alias belongs to; its latest version supplies the base ARN
    :return: Alias configuration; RoutingConfig is only included if the alias has a routing
        configuration
    """
    base_arn = function.latest().id.unqualified_arn()

    routing_kwargs = {}
    if alias.routing_configuration:
        routing_kwargs["RoutingConfig"] = {
            "AdditionalVersionWeights": alias.routing_configuration.version_weights
        }

    return AliasConfiguration(
        AliasArn=f"{base_arn}:{alias.name}",
        Description=alias.description,
        FunctionVersion=alias.function_version,
        Name=alias.name,
        RevisionId=alias.revision_id,
        **routing_kwargs,
    )
638

639

640
def validate_and_set_batch_size(service: str, batch_size: Optional[int] = None) -> int:
    """Apply the per-service default batch size and check it against the service's bounds.

    :param service: Event source service key (e.g. "sqs", "kinesis")
    :param batch_size: Requested batch size, or None to use the service default
    :return: The effective batch size. For a service without known limits the input is returned
        unchanged (including None)
    :raises: `InvalidParameterValueException` if the batch size is below 1 or above the
        service maximum
    """
    # service -> (default batch size, maximum batch size)
    limits_by_service = {
        "kafka": (100, 10_000),
        "kinesis": (100, 10_000),
        "dynamodb": (100, 10_000),
        "sqs-fifo": (10, 10),
        "sqs": (10, 10_000),
        "mq": (100, 10_000),
    }
    lower_bound = 1

    limits = limits_by_service.get(service)
    if not limits:
        return batch_size

    default_size, upper_bound = limits
    effective_size = default_size if batch_size is None else batch_size
    if not (lower_bound <= effective_size <= upper_bound):
        raise InvalidParameterValueException("out of bounds todo", Type="User")  # TODO: test

    return effective_size
1✔
663

664

665
def map_layer_out(layer_version: "LayerVersion") -> PublishLayerVersionResponse:
    """Map a layer version model to its API output.

    :param layer_version: Layer version to render
    :return: Layer version response; the content location is a freshly generated presigned URL
    """
    code = layer_version.code
    content = LayerVersionContentOutput(
        Location=code.generate_presigned_url(),
        CodeSha256=code.code_sha256,
        CodeSize=code.code_size,
        # SigningProfileVersionArn="", # same as in function configuration
        # SigningJobArn="" # same as in function configuration
    )
    return PublishLayerVersionResponse(
        Content=content,
        LicenseInfo=layer_version.license_info,
        Description=layer_version.description,
        CompatibleArchitectures=layer_version.compatible_architectures,
        CompatibleRuntimes=layer_version.compatible_runtimes,
        CreatedDate=layer_version.created,
        LayerArn=layer_version.layer_arn,
        LayerVersionArn=layer_version.layer_version_arn,
        Version=layer_version.version,
    )
683

684

685
def layer_arn(layer_name: str, account: str, region: str):
    """Build the arn of a layer (without a version) for the given account and region."""
    partition = get_partition(region)
    return f"arn:{partition}:lambda:{region}:{account}:layer:{layer_name}"
1✔
687

688

689
def layer_version_arn(layer_name: str, account: str, region: str, version: str):
    """Build the arn of a specific layer version for the given account and region."""
    partition = get_partition(region)
    return f"arn:{partition}:lambda:{region}:{account}:layer:{layer_name}:{version}"
1✔
691

692

693
def parse_layer_arn(layer_version_arn: str) -> Tuple[str, str, str, str]:
    """Split a layer (version) arn into (region, account ID, layer name, layer version)."""
    parsed = LAYER_VERSION_ARN_PATTERN.match(layer_version_arn)
    # NOTE(review): for an input that does not match the pattern `parsed` is None and the
    # attribute access below fails; callers are presumably expected to pass a valid layer arn.
    return parsed.group("region_name", "account_id", "layer_name", "layer_version")
697

698

699
def validate_layer_runtime(compatible_runtime: str) -> str | None:
    """Validate a single compatible runtime of a layer.

    :param compatible_runtime: Runtime to check (None is accepted)
    :return: Validation message if the runtime is not in ALL_RUNTIMES, None otherwise
    """
    if compatible_runtime is None or compatible_runtime in ALL_RUNTIMES:
        return None
    return f"Value '{compatible_runtime}' at 'compatibleRuntime' failed to satisfy constraint: Member must satisfy enum value set: {VALID_LAYER_RUNTIMES}"
1✔
703

704

705
def validate_layer_architecture(compatible_architecture: str) -> str | None:
    """Validate a single compatible architecture of a layer.

    :param compatible_architecture: Architecture to check (None is accepted)
    :return: Validation message if the architecture is not in ARCHITECTURES, None otherwise
    """
    if compatible_architecture is None or compatible_architecture in ARCHITECTURES:
        return None
    return f"Value '{compatible_architecture}' at 'compatibleArchitecture' failed to satisfy constraint: Member must satisfy enum value set: [x86_64, arm64]"
1✔
709

710

711
def validate_layer_runtimes_and_architectures(
    compatible_runtimes: list[str], compatible_architectures: list[str]
):
    """Validate the compatible runtimes and architectures given for a layer.

    :param compatible_runtimes: Runtimes to check (may be empty or None)
    :param compatible_architectures: Architectures to check (may be empty or None)
    :return: List of validation messages (runtimes first), empty if everything is valid
    """
    problems = []

    # A single entry outside the known set is enough to fail the whole list.
    if compatible_runtimes and set(compatible_runtimes).difference(ALL_RUNTIMES):
        given_runtimes = ", ".join(compatible_runtimes)
        problems.append(
            f"Value '[{given_runtimes}]' at 'compatibleRuntimes' failed to satisfy constraint: Member must satisfy enum value set: {VALID_RUNTIMES}"
        )

    if compatible_architectures and set(compatible_architectures).difference(ARCHITECTURES):
        given_architectures = ", ".join(compatible_architectures)
        problems.append(
            f"Value '[{given_architectures}]' at 'compatibleArchitectures' failed to satisfy constraint: Member must satisfy constraint: [Member must satisfy enum value set: [x86_64, arm64]]"
        )

    return problems
1✔
727

728

729
def is_layer_arn(layer_name: str) -> bool:
    """Tell whether the given string is a layer arn (with or without a version)."""
    return bool(LAYER_VERSION_ARN_PATTERN.match(layer_name))
1✔
731

732

733
# See Lambda API actions that support resource-based IAM policies
734
# https://docs.aws.amazon.com/lambda/latest/dg/access-control-resource-based.html#permissions-resource-api
735
# Operation names for which get_name_and_qualifier skips its account check, i.e. an ARN from
# another account does not raise AccessDeniedException there.
_supported_resource_based_operations = {
    "CreateAlias",
    "DeleteAlias",
    "DeleteFunction",
    "DeleteFunctionConcurrency",
    "DeleteFunctionEventInvokeConfig",
    "DeleteProvisionedConcurrencyConfig",
    "GetAlias",
    "GetFunction",
    "GetFunctionConcurrency",
    "GetFunctionConfiguration",
    "GetFunctionEventInvokeConfig",
    "GetPolicy",
    "GetProvisionedConcurrencyConfig",
    "Invoke",
    "ListAliases",
    "ListFunctionEventInvokeConfigs",
    "ListProvisionedConcurrencyConfigs",
    "ListTags",
    "ListVersionsByFunction",
    "PublishVersion",
    "PutFunctionConcurrency",
    "PutFunctionEventInvokeConfig",
    "PutProvisionedConcurrencyConfig",
    "TagResource",
    "UntagResource",
    "UpdateAlias",
    "UpdateFunctionCode",
    "UpdateFunctionEventInvokeConfig",
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc