• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 17601295436

10 Sep 2025 12:27AM UTC coverage: 86.847% (+0.01%) from 86.837%
17601295436

push

github

web-flow
CFn: improve parity of describing failed change sets (#13123)

12 of 12 new or added lines in 1 file covered. (100.0%)

231 existing lines in 7 files now uncovered.

67122 of 77288 relevant lines covered (86.85%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.82
/localstack-core/localstack/services/lambda_/api_utils.py
1
"""Utilities related to Lambda API operations such as ARN handling, validations, and output formatting.
2
Everything related to behavior or implicit functionality goes into `lambda_utils.py`.
3
"""
4

5
import datetime
1✔
6
import random
1✔
7
import re
1✔
8
import string
1✔
9
from typing import TYPE_CHECKING, Any
1✔
10

11
from localstack import config
1✔
12
from localstack.aws.api import CommonServiceException, RequestContext
1✔
13
from localstack.aws.api import lambda_ as api_spec
1✔
14
from localstack.aws.api.lambda_ import (
1✔
15
    AliasConfiguration,
16
    Architecture,
17
    DeadLetterConfig,
18
    EnvironmentResponse,
19
    EphemeralStorage,
20
    FunctionConfiguration,
21
    FunctionUrlAuthType,
22
    ImageConfig,
23
    ImageConfigResponse,
24
    InvalidParameterValueException,
25
    LayerVersionContentOutput,
26
    PublishLayerVersionResponse,
27
    ResourceNotFoundException,
28
    TracingConfig,
29
    VpcConfigResponse,
30
)
31
from localstack.services.lambda_.invocation import AccessDeniedException
1✔
32
from localstack.services.lambda_.runtimes import ALL_RUNTIMES, VALID_LAYER_RUNTIMES, VALID_RUNTIMES
1✔
33
from localstack.utils.aws.arns import ARN_PARTITION_REGEX, get_partition
1✔
34
from localstack.utils.collections import merge_recursive
1✔
35

36
if TYPE_CHECKING:
1✔
UNCOV
37
    from localstack.services.lambda_.invocation.lambda_models import (
×
38
        CodeSigningConfig,
39
        Function,
40
        FunctionUrlConfig,
41
        FunctionVersion,
42
        LayerVersion,
43
        VersionAlias,
44
    )
UNCOV
45
    from localstack.services.lambda_.invocation.models import LambdaStore
×
46

47

48
# Pattern for a full (both with and without qualifier) lambda function ARN
49
FULL_FN_ARN_PATTERN = re.compile(
1✔
50
    rf"{ARN_PARTITION_REGEX}:lambda:(?P<region_name>[^:]+):(?P<account_id>\d{{12}}):function:(?P<function_name>[^:]+)(:(?P<qualifier>.*))?$"
51
)
52

53
# Pattern for a full (both with and without qualifier) lambda layer ARN
54
# TODO: It looks like they added `|(arn:[a-zA-Z0-9-]+:lambda:::awslayer:[a-zA-Z0-9-_]+` in 2024-11
55
LAYER_VERSION_ARN_PATTERN = re.compile(
1✔
56
    rf"{ARN_PARTITION_REGEX}:lambda:(?P<region_name>[^:]+):(?P<account_id>\d{{12}}):layer:(?P<layer_name>[^:]+)(:(?P<layer_version>\d+))?$"
57
)
58

59

60
# Pattern for a valid destination arn
61
DESTINATION_ARN_PATTERN = re.compile(
1✔
62
    r"^$|arn:(aws[a-zA-Z0-9-]*):([a-zA-Z0-9\-])+:([a-z]{2}(-gov)?-[a-z]+-\d{1})?:(\d{12})?:(.*)"
63
)
64

65
AWS_FUNCTION_NAME_REGEX = re.compile(
1✔
66
    "^(arn:(aws[a-zA-Z-]*)?:lambda:)?([a-z]{2}((-gov)|(-iso([a-z]?)))?-[a-z]+-\\d{1}:)?(\\d{12}:)?(function:)?([a-zA-Z0-9-_.]+)(:(\\$LATEST|[a-zA-Z0-9-_]+))?$"
67
)
68

69
# Pattern for extracting various attributes from a full or partial ARN or just a function name.
70
FUNCTION_NAME_REGEX = re.compile(
1✔
71
    r"(arn:(aws[a-zA-Z-]*):lambda:)?((?P<region>[a-z]{2}(-gov)?-[a-z]+-\d{1}):)?(?:(?P<account>\d{12}):)?(function:)?(?P<name>[a-zA-Z0-9-_\.]+)(:(?P<qualifier>\$LATEST|[a-zA-Z0-9-_]+))?"
72
)  # also length 1-170 incl.
73
# Pattern for a lambda function handler
74
HANDLER_REGEX = re.compile(r"[^\s]+")
1✔
75
# Pattern for a valid kms key
76
KMS_KEY_ARN_REGEX = re.compile(r"(arn:(aws[a-zA-Z-]*)?:[a-z0-9-.]+:.*)|()")
1✔
77
# Pattern for a valid IAM role assumed by a lambda function
78
ROLE_REGEX = re.compile(r"arn:(aws[a-zA-Z-]*)?:iam::\d{12}:role/?[a-zA-Z_0-9+=,.@\-_/]+")
1✔
79
# Pattern for a valid AWS account
80
AWS_ACCOUNT_REGEX = re.compile(r"\d{12}")
1✔
81
# Pattern for a signing job arn
82
SIGNING_JOB_ARN_REGEX = re.compile(
1✔
83
    r"arn:(aws[a-zA-Z0-9-]*):([a-zA-Z0-9\-])+:([a-z]{2}(-gov)?-[a-z]+-\d{1})?:(\d{12})?:(.*)"
84
)
85
# Pattern for a signing profile version arn
86
SIGNING_PROFILE_VERSION_ARN_REGEX = re.compile(
1✔
87
    r"arn:(aws[a-zA-Z0-9-]*):([a-zA-Z0-9\-])+:([a-z]{2}(-gov)?-[a-z]+-\d{1})?:(\d{12})?:(.*)"
88
)
89
# Combined pattern for alias and version based on AWS error using "(|[a-zA-Z0-9$_-]+)"
90
QUALIFIER_REGEX = re.compile(r"(^[a-zA-Z0-9$_-]+$)")
1✔
91
# Pattern for a version qualifier
92
VERSION_REGEX = re.compile(r"^[0-9]+$")
1✔
93
# Pattern for an alias qualifier
94
# Rules: https://docs.aws.amazon.com/lambda/latest/dg/API_CreateAlias.html#SSS-CreateAlias-request-Name
95
# The original regex from AWS misses ^ and $ in the second regex, which allowed for partial substring matches
96
ALIAS_REGEX = re.compile(r"(?!^[0-9]+$)(^[a-zA-Z0-9-_]+$)")
1✔
97
# Permission statement id
98
STATEMENT_ID_REGEX = re.compile(r"^[a-zA-Z0-9-_]+$")
1✔
99
# Pattern for a valid SubnetId
100
SUBNET_ID_REGEX = re.compile(r"^subnet-[0-9a-z]*$")
1✔
101

102

103
URL_CHAR_SET = string.ascii_lowercase + string.digits
1✔
104
# Date format as returned by the lambda service
105
LAMBDA_DATE_FORMAT = "%Y-%m-%dT%H:%M:%S.%f+0000"
1✔
106

107
# An unordered list of all Lambda CPU architectures supported by LocalStack.
108
ARCHITECTURES = [Architecture.arm64, Architecture.x86_64]
1✔
109

110
# ARN pattern returned in validation exception messages.
111
# Some exceptions from AWS return a '\.' in the function name regex
112
# pattern therefore we can sub this value in when appropriate.
113
ARN_NAME_PATTERN_VALIDATION_TEMPLATE = "(arn:(aws[a-zA-Z-]*)?:lambda:)?([a-z]{{2}}((-gov)|(-iso([a-z]?)))?-[a-z]+-\\d{{1}}:)?(\\d{{12}}:)?(function:)?([a-zA-Z0-9-_{0}]+)(:(\\$LATEST|[a-zA-Z0-9-_]+))?"
1✔
114

115
# AWS response when invalid ARNs are used in Tag operations.
116
TAGGABLE_RESOURCE_ARN_PATTERN = "arn:(aws[a-zA-Z-]*):lambda:[a-z]{2}((-gov)|(-iso([a-z]?)))?-[a-z]+-\\d{1}:\\d{12}:(function:[a-zA-Z0-9-_]+(:(\\$LATEST|[a-zA-Z0-9-_]+))?|layer:([a-zA-Z0-9-_]+)|code-signing-config:csc-[a-z0-9]{17}|event-source-mapping:[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12})"
1✔
117

118

119
def validate_function_name(function_name_or_arn: str, operation_type: str):
    """
    Validate a function name or ARN and collect AWS-style validation messages.

    The length limit defaults to 170. For `CreateFunction` it is 64 when the input is a bare
    function name, otherwise 140; `DeleteFunction` also uses 140. For `GetFunction` and `Invoke`,
    the pattern reported in the error message additionally contains an escaped dot.

    :param function_name_or_arn: Function name, partial ARN, or full ARN to validate
    :param operation_type: Name of the API operation being performed
    :return: list of validation error messages (empty if the input is valid)
    """
    parsed_name = function_locators_from_arn(function_name_or_arn)[0]

    dot_placeholder = ""
    length_limit = 170
    if operation_type in ("GetFunction", "Invoke"):
        dot_placeholder = r"\."
    elif operation_type == "CreateFunction" and parsed_name == function_name_or_arn:
        # only a function name
        length_limit = 64
    elif operation_type in ("CreateFunction", "DeleteFunction"):
        length_limit = 140
    reported_pattern = ARN_NAME_PATTERN_VALIDATION_TEMPLATE.format(dot_placeholder)

    prefix = f"Value '{function_name_or_arn}' at 'functionName' failed to satisfy constraint: "
    errors = []

    if len(function_name_or_arn) > length_limit:
        errors.append(f"{prefix}Member must have length less than or equal to {length_limit}")

    # The input must match the AWS pattern AND yield a non-empty function name
    name_is_valid = AWS_FUNCTION_NAME_REGEX.match(function_name_or_arn) and parsed_name
    if not name_is_valid:
        errors.append(
            f"{prefix}Member must satisfy regular expression pattern: {reported_pattern}"
        )

    return errors
1✔
144

145

146
def validate_qualifier(qualifier: str):
    """
    Validate a qualifier and collect AWS-style validation messages.

    :param qualifier: Qualifier to validate
    :return: list of validation error messages (empty if the qualifier is valid)
    """
    failed_constraints = []

    if len(qualifier) > 128:
        failed_constraints.append("Member must have length less than or equal to 128")

    if QUALIFIER_REGEX.match(qualifier) is None:
        failed_constraints.append(
            "Member must satisfy regular expression pattern: (|[a-zA-Z0-9$_-]+)"
        )

    prefix = f"Value '{qualifier}' at 'qualifier' failed to satisfy constraint: "
    return [prefix + constraint for constraint in failed_constraints]
1✔
164

165

166
def construct_validation_exception_message(validation_errors):
    """
    Combine individual validation messages into a single exception message.

    :param validation_errors: Collection of validation error messages
    :return: Summary message with error count and the messages joined by "; ", or None if there
        are no errors
    """
    if not validation_errors:
        return None

    count = len(validation_errors)
    plural_suffix = "s" if count > 1 else ""
    joined = "; ".join(validation_errors)
    return f"{count} validation error{plural_suffix} detected: {joined}"
×
171

172

173
def map_function_url_config(model: "FunctionUrlConfig") -> api_spec.FunctionUrlConfig:
    """
    Map a function URL config model to its API representation.

    :param model: Function URL config model
    :return: API-level function URL config
    """
    fields = {
        "FunctionUrl": model.url,
        "FunctionArn": model.function_arn,
        "CreationTime": model.creation_time,
        "LastModifiedTime": model.last_modified_time,
        "Cors": model.cors,
        "AuthType": model.auth_type,
        "InvokeMode": model.invoke_mode,
    }
    return api_spec.FunctionUrlConfig(**fields)
183

184

185
def map_csc(model: "CodeSigningConfig") -> api_spec.CodeSigningConfig:
    """
    Map a code signing config model to its API representation.

    :param model: Code signing config model
    :return: API-level code signing config
    """
    fields = {
        "CodeSigningConfigId": model.csc_id,
        "CodeSigningConfigArn": model.arn,
        "Description": model.description,
        "AllowedPublishers": model.allowed_publishers,
        "CodeSigningPolicies": model.policies,
        "LastModified": model.last_modified,
    }
    return api_spec.CodeSigningConfig(**fields)
194

195

196
def get_config_for_url(store: "LambdaStore", url_id: str) -> "FunctionUrlConfig | None":
    """
    Get a config object when resolving a URL

    :param store: Lambda Store
    :param url_id: unique url ID (prefixed domain when calling the function)
    :return: FunctionUrlConfig that belongs to this ID, or None if no config has this ID
    """
    # TODO: quite inefficient: optimize
    all_url_configs = (
        url_config
        for function in store.functions.values()
        for url_config in function.function_url_configs.values()
    )
    return next(
        (url_config for url_config in all_url_configs if url_config.url_id == url_id),
        None,
    )
×
211

212

213
def is_qualifier_expression(qualifier: str) -> bool:
    """
    Checks if a given qualifier is a syntactically accepted expression.
    It is not necessarily a valid alias or version.

    :param qualifier: Qualifier to check
    :return: True if syntactically accepted qualifier expression, false otherwise
    """
    matched = QUALIFIER_REGEX.match(qualifier)
    return matched is not None
1✔
221

222

223
def qualifier_is_version(qualifier: str) -> bool:
    """
    Checks if a given qualifier represents a version

    :param qualifier: Qualifier to check
    :return: True if it matches the version pattern, false otherwise
    """
    matched = VERSION_REGEX.match(qualifier)
    return matched is not None
1✔
231

232

233
def qualifier_is_alias(qualifier: str) -> bool:
    """
    Checks if a given qualifier represents an alias

    :param qualifier: Qualifier to check
    :return: True if it matches the alias pattern, false otherwise
    """
    matched = ALIAS_REGEX.match(qualifier)
    return matched is not None
1✔
241

242

243
def get_function_name(function_arn_or_name: str, context: RequestContext) -> str:
    """
    Return function name from a given arn.
    Delegates to `get_name_and_qualifier` (without a qualifier), so the same region, account and
    validation checks apply if an arn is provided.

    :param function_arn_or_name: Function arn or only name
    :param context: Request context
    :return: function name
    """
    resolved = get_name_and_qualifier(function_arn_or_name, qualifier=None, context=context)
    return resolved[0]
1✔
253

254

255
def function_locators_from_arn(arn: str) -> tuple[str | None, str | None, str | None, str | None]:
    """
    Takes a full or partial arn, or a name, and extracts its components.

    :param arn: Given arn (or name)
    :return: tuple with (name, qualifier, account, region). Components that are missing from the
        input are None; all four are None if the input does not match at all
    """
    matched = FUNCTION_NAME_REGEX.match(arn)
    if matched is None:
        return None, None, None, None

    return matched.group("name", "qualifier", "account", "region")
1✔
271

272

273
def get_account_and_region(function_arn_or_name: str, context: RequestContext) -> tuple[str, str]:
    """
    Takes a full ARN, partial ARN or a name. Returns account ID and region from ARN if available, else
    falls back to context account ID and region.

    Lambda allows cross-account access. This function should be used to resolve the correct Store based on the ARN.

    :param function_arn_or_name: Function arn (full or partial) or only name
    :param context: Request context providing the fallback account ID and region
    :return: tuple with (account ID, region)
    """
    locators = function_locators_from_arn(function_arn_or_name)
    arn_account, arn_region = locators[2], locators[3]

    account_id = arn_account if arn_account else context.account_id
    region = arn_region if arn_region else context.region
    return account_id, region
1✔
282

283

284
def get_name_and_qualifier(
    function_arn_or_name: str, qualifier: str | None, context: RequestContext
) -> tuple[str, str | None]:
    """
    Takes a full or partial arn, or a name and a qualifier.

    The checks run in this order: account, region, name/qualifier validation, qualifier mismatch.

    :param function_arn_or_name: Given arn (or name)
    :param qualifier: A qualifier for the function (or None)
    :param context: Request context
    :return: tuple with (name, qualifier). Qualifier is none if missing
    :raises: `ResourceNotFoundException` when the context's region differs from the ARN's region
    :raises: `AccessDeniedException` when the context's account ID differs from the ARN's account ID
        and the operation is not in `_supported_resource_based_operations`
    :raises: `ValidationException` (as `CommonServiceException`) when a function ARN/name or qualifier
        fails validation checks; for `CreateFunction` with only a function name, an
        `InvalidParameterValueException` is raised instead
    :raises: `InvalidParameterValueException` when a qualified arn is provided and the qualifier does not match (but is given)
    """
    function_name, arn_qualifier, account, region = function_locators_from_arn(function_arn_or_name)
    operation_type = context.operation.name

    # Cross-account ARNs are only rejected for operations outside the resource-based operations set
    if operation_type not in _supported_resource_based_operations:
        if account and account != context.account_id:
            raise AccessDeniedException(None)

    # TODO: should this only run if operation type is unsupported?
    if region and region != context.region:
        raise ResourceNotFoundException(
            f"Functions from '{region}' are not reachable in this region ('{context.region}')",
            Type="User",
        )

    # Collect all validation errors first so they can be reported in a single message
    validation_errors = []
    if function_arn_or_name:
        validation_errors.extend(validate_function_name(function_arn_or_name, operation_type))

    if qualifier:
        validation_errors.extend(validate_qualifier(qualifier))

    is_only_function_name = function_arn_or_name == function_name
    if validation_errors:
        message = construct_validation_exception_message(validation_errors)
        # Edge-case where the error type is not ValidationException
        if (
            operation_type == "CreateFunction"
            and is_only_function_name
            and arn_qualifier is None
            and region is None
        ):  # just name OR partial
            raise InvalidParameterValueException(message=message, Type="User")
        raise CommonServiceException(message=message, code="ValidationException")

    # An explicitly given qualifier must agree with the one embedded in the ARN (if any)
    if qualifier and arn_qualifier and arn_qualifier != qualifier:
        raise InvalidParameterValueException(
            "The derived qualifier from the function name does not match the specified qualifier.",
            Type="User",
        )

    # The explicit qualifier wins; otherwise fall back to the ARN's qualifier (may be None)
    qualifier = qualifier or arn_qualifier
    return function_name, qualifier
1✔
341

342

343
def build_statement(
    partition: str,
    resource_arn: str,
    statement_id: str,
    action: str,
    principal: str,
    source_arn: str | None = None,
    source_account: str | None = None,
    principal_org_id: str | None = None,
    event_source_token: str | None = None,
    auth_type: FunctionUrlAuthType | None = None,
) -> dict[str, Any]:
    """
    Build an "Allow" policy statement for the given resource, action and principal.

    :param partition: Partition used when expanding an account ID principal into a root ARN
    :param resource_arn: ARN placed in the statement's "Resource"
    :param statement_id: Statement ID placed in the statement's "Sid"
    :param action: Action placed in the statement's "Action"
    :param principal: Service principal (ending in ".amazonaws.com"), account ID, IAM ARN, or "*"
    :param source_arn: If set, adds an "ArnLike" condition on "AWS:SourceArn"
    :param source_account: If set, adds a "StringEquals" condition on "AWS:SourceAccount"
    :param principal_org_id: If set, adds a "StringEquals" condition on "aws:PrincipalOrgID"
    :param event_source_token: If set, adds a "StringEquals" condition on "lambda:EventSourceToken"
    :param auth_type: If set, adds a "StringEquals" condition on "lambda:FunctionUrlAuthType"
    :return: Policy statement; "Condition" is only included if at least one condition applies
    :raises: `InvalidParameterValueException` when the principal matches none of the accepted forms
    """
    # See AWS service principals for comprehensive docs:
    # https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_elements_principal.html
    # TODO: validate against actual list of IAM-supported AWS services (e.g., lambda.amazonaws.com)
    if principal.endswith(".amazonaws.com"):
        principal_entry = {"Service": principal}
    elif is_aws_account(principal):
        principal_entry = {"AWS": f"arn:{partition}:iam::{principal}:root"}
    # TODO: potentially validate against IAM?
    elif re.match(f"{ARN_PARTITION_REGEX}:iam:", principal):
        principal_entry = {"AWS": principal}
    elif principal == "*":
        principal_entry = principal
    # TODO: unclear whether above matching is complete?
    else:
        raise InvalidParameterValueException(
            "The provided principal was invalid. Please check the principal and try again.",
            Type="User",
        )

    statement = {
        "Sid": statement_id,
        "Effect": "Allow",
        "Action": action,
        "Resource": resource_arn,
        "Principal": principal_entry,
    }

    # (condition operator, condition key, value); merged in this order, skipping unset values
    condition_sources = (
        ("StringEquals", "lambda:FunctionUrlAuthType", auth_type),
        ("StringEquals", "aws:PrincipalOrgID", principal_org_id),
        ("StringEquals", "AWS:SourceAccount", source_account),
        ("StringEquals", "lambda:EventSourceToken", event_source_token),
        ("ArnLike", "AWS:SourceArn", source_arn),
    )
    condition = {}
    for operator, condition_key, value in condition_sources:
        if value:
            condition = merge_recursive(condition, {operator: {condition_key: value}})

    if condition:
        statement["Condition"] = condition

    return statement
1✔
406

407

408
def generate_random_url_id() -> str:
    """
    Generate a random url ID consisting of 32 characters drawn from `URL_CHAR_SET` ([0-9a-z])

    :return: Random 32 character url ID
    """
    url_chars = random.choices(URL_CHAR_SET, k=32)
    return "".join(url_chars)
1✔
414

415

416
def unqualified_lambda_arn(function_name: str, account: str, region: str):
    """
    Generate an unqualified lambda arn; the partition is derived from the region

    :param function_name: Function name (not an arn!)
    :param account: Account ID
    :param region: Region
    :return: Unqualified lambda arn
    """
    partition = get_partition(region)
    return f"arn:{partition}:lambda:{region}:{account}:function:{function_name}"
1✔
426

427

428
def qualified_lambda_arn(
    function_name: str, qualifier: str | None, account: str, region: str
) -> str:
    """
    Generate a qualified lambda arn

    :param function_name: Function name (not an arn!)
    :param qualifier: qualifier (will be set to $LATEST if not present)
    :param account: Account ID
    :param region: Region
    :return: Qualified lambda arn
    """
    effective_qualifier = qualifier if qualifier else "$LATEST"
    base_arn = unqualified_lambda_arn(function_name=function_name, account=account, region=region)
    return f"{base_arn}:{effective_qualifier}"
1✔
442

443

444
def lambda_arn(function_name: str, qualifier: str | None, account: str, region: str) -> str:
    """
    Return the lambda arn for the given parameters: qualified if a qualifier is supplied,
    unqualified otherwise

    :param function_name: Function name
    :param qualifier: Qualifier. May be left out, then the returning arn does not have one either
    :param account: Account ID
    :param region: Region of the Lambda
    :return: Lambda Arn with or without qualifier
    """
    if not qualifier:
        return unqualified_lambda_arn(function_name=function_name, account=account, region=region)

    return qualified_lambda_arn(
        function_name=function_name, qualifier=qualifier, account=account, region=region
    )
1✔
460

461

462
def is_role_arn(role_arn: str) -> bool:
    """
    Returns true if the provided string matches the role arn pattern, false otherwise

    :param role_arn: Potential role arn
    :return: Boolean indicating if input is a role arn
    """
    matched = ROLE_REGEX.match(role_arn)
    return matched is not None
1✔
470

471

472
def is_aws_account(aws_account: str) -> bool:
    """
    Returns true if the provided string is an AWS account ID, false otherwise

    The whole string has to match the account pattern: inputs that merely start with an account ID
    (e.g., more digits than an account ID has, or an ID followed by other characters) are rejected.

    :param aws_account: Potential AWS account
    :return: Boolean indicating if input is an AWS account
    """
    # `AWS_ACCOUNT_REGEX` is not anchored, so `match` would accept any string with a valid prefix
    return AWS_ACCOUNT_REGEX.fullmatch(aws_account) is not None
1✔
480

481

482
def format_lambda_date(date_to_format: datetime.datetime) -> str:
    """
    Format a given datetime to a string generated with the lambda date format

    :param date_to_format: Datetime to format
    :return: Datetime rendered using `LAMBDA_DATE_FORMAT`
    """
    formatted = date_to_format.strftime(LAMBDA_DATE_FORMAT)
    return formatted
1✔
485

486

487
def generate_lambda_date() -> str:
    """
    Get the current date as string generated with the lambda date format

    :return: Current UTC time rendered using the lambda date format
    """
    # The lambda date format hard-codes a "+0000" offset, so the timestamp has to be taken in UTC;
    # a naive local `now()` would be mislabeled on hosts whose timezone is not UTC.
    return format_lambda_date(datetime.datetime.now(tz=datetime.timezone.utc))
1✔
490

491

492
def map_update_status_config(version: "FunctionVersion") -> dict[str, str]:
    """
    Map the last update information of a version model to dict output

    :param version: Function version model
    :return: dict with the "LastUpdateStatus*" entries that are set; empty if there is no last update
    """
    last_update = version.config.last_update
    if not last_update:
        return {}

    candidates = (
        ("LastUpdateStatus", last_update.status),
        ("LastUpdateStatusReasonCode", last_update.code),
        ("LastUpdateStatusReason", last_update.reason),
    )
    # Only render entries that actually carry a value
    return {key: value for key, value in candidates if value}
1✔
503

504

505
def map_state_config(version: "FunctionVersion") -> dict[str, str]:
    """
    Map the state of a version model to dict output

    :param version: Function version model
    :return: dict with the "State*" entries that are set; empty if the version has no state
    """
    version_state = version.config.state
    if not version_state:
        return {}

    candidates = (
        ("State", version_state.state),
        ("StateReason", version_state.reason),
        ("StateReasonCode", version_state.code),
    )
    # Only render entries that actually carry a value
    return {key: value for key, value in candidates if value}
1✔
516

517

518
def map_config_out(
    version: "FunctionVersion",
    return_qualified_arn: bool = False,
    return_update_status: bool = True,
    alias_name: str | None = None,
) -> FunctionConfiguration:
    """
    Map function version to function configuration

    :param version: Function version model to map
    :param return_qualified_arn: Whether the returned "FunctionArn" is qualified; ignored if
        `alias_name` is set
    :param return_update_status: Whether the "LastUpdateStatus*" entries are included
    :param alias_name: If set, the returned "FunctionArn" uses this alias as its qualifier
    :return: Function configuration; optional entries are only rendered if present on the version
    """

    # handle optional entries that shouldn't be rendered at all if not present
    optional_kwargs = {}
    if return_update_status:
        optional_kwargs.update(map_update_status_config(version))
    optional_kwargs.update(map_state_config(version))

    if version.config.architectures:
        optional_kwargs["Architectures"] = version.config.architectures

    if version.config.dead_letter_arn:
        optional_kwargs["DeadLetterConfig"] = DeadLetterConfig(
            TargetArn=version.config.dead_letter_arn
        )

    if version.config.vpc_config:
        optional_kwargs["VpcConfig"] = VpcConfigResponse(
            VpcId=version.config.vpc_config.vpc_id,
            SubnetIds=version.config.vpc_config.subnet_ids,
            SecurityGroupIds=version.config.vpc_config.security_group_ids,
        )

    # Unlike the other optional entries, the environment is rendered for any non-None value
    if version.config.environment is not None:
        optional_kwargs["Environment"] = EnvironmentResponse(
            Variables=version.config.environment
        )  # TODO: Errors key?

    if version.config.layers:
        optional_kwargs["Layers"] = [
            {"Arn": layer.layer_version_arn, "CodeSize": layer.code.code_size}
            for layer in version.config.layers
        ]
    if version.config.image_config:
        image_config = ImageConfig()
        if version.config.image_config.command:
            image_config["Command"] = version.config.image_config.command
        if version.config.image_config.entrypoint:
            image_config["EntryPoint"] = version.config.image_config.entrypoint
        if version.config.image_config.working_directory:
            image_config["WorkingDirectory"] = version.config.image_config.working_directory
        # Skip the response entry entirely if none of the image config fields were set
        if image_config:
            optional_kwargs["ImageConfigResponse"] = ImageConfigResponse(ImageConfig=image_config)
    if version.config.code:
        optional_kwargs["CodeSize"] = version.config.code.code_size
        optional_kwargs["CodeSha256"] = version.config.code.code_sha256
    elif version.config.image:
        # Image-based versions report a code size of 0
        optional_kwargs["CodeSize"] = 0
        optional_kwargs["CodeSha256"] = version.config.image.code_sha256

    # output for an alias qualifier is completely the same except for the returned ARN
    if alias_name:
        # Replace the last ARN segment (the qualifier) with the alias name
        function_arn = f"{':'.join(version.id.qualified_arn().split(':')[:-1])}:{alias_name}"
    else:
        function_arn = (
            version.id.qualified_arn() if return_qualified_arn else version.id.unqualified_arn()
        )

    func_conf = FunctionConfiguration(
        RevisionId=version.config.revision_id,
        FunctionName=version.id.function_name,
        FunctionArn=function_arn,
        LastModified=version.config.last_modified,
        Version=version.id.qualifier,
        Description=version.config.description,
        Role=version.config.role,
        Timeout=version.config.timeout,
        Runtime=version.config.runtime,
        Handler=version.config.handler,
        MemorySize=version.config.memory_size,
        PackageType=version.config.package_type,
        TracingConfig=TracingConfig(Mode=version.config.tracing_config_mode),
        EphemeralStorage=EphemeralStorage(Size=version.config.ephemeral_storage.size),
        SnapStart=version.config.snap_start,
        RuntimeVersionConfig=version.config.runtime_version_config,
        LoggingConfig=version.config.logging_config,
        **optional_kwargs,
    )
    return func_conf
1✔
603

604

605
def map_to_list_response(config: FunctionConfiguration) -> FunctionConfiguration:
    """
    Remove values not usually presented in list operations from function config output

    :param config: Function configuration (left unmodified)
    :return: Shallow copy of the configuration without the state, last update status and runtime
        version config entries
    """
    omitted_keys = (
        "State",
        "StateReason",
        "StateReasonCode",
        "LastUpdateStatus",
        "LastUpdateStatusReason",
        "LastUpdateStatusReasonCode",
        "RuntimeVersionConfig",
    )
    list_entry = config.copy()
    for omitted_key in omitted_keys:
        # Keys may be absent, so pop with a default instead of deleting
        list_entry.pop(omitted_key, None)
    return list_entry
1✔
619

620

621
def map_alias_out(alias: "VersionAlias", function: "Function") -> AliasConfiguration:
    """
    Map alias model to alias configuration output

    :param alias: Alias model to map
    :param function: Function the alias belongs to; its latest version provides the base arn
    :return: Alias configuration; "RoutingConfig" is only included if the alias has a routing
        configuration
    """
    unqualified_arn = function.latest().id.unqualified_arn()
    alias_fields = {
        "AliasArn": f"{unqualified_arn}:{alias.name}",
        "Description": alias.description,
        "FunctionVersion": alias.function_version,
        "Name": alias.name,
        "RevisionId": alias.revision_id,
    }

    routing = alias.routing_configuration
    if routing:
        alias_fields["RoutingConfig"] = {"AdditionalVersionWeights": routing.version_weights}

    return AliasConfiguration(**alias_fields)
639

640

641
def validate_and_set_batch_size(service: str, batch_size: int | None = None) -> int:
1✔
642
    min_batch_size = 1
1✔
643

644
    BATCH_SIZE_RANGES = {
1✔
645
        "kafka": (100, 10_000),
646
        "kinesis": (100, 10_000),
647
        "dynamodb": (100, 10_000),
648
        "sqs-fifo": (10, 10),
649
        "sqs": (10, 10_000),
650
        "mq": (100, 10_000),
651
    }
652
    svc_range = BATCH_SIZE_RANGES.get(service)
1✔
653

654
    if svc_range:
1✔
655
        default_batch_size, max_batch_size = svc_range
1✔
656

657
        if batch_size is None:
1✔
658
            batch_size = default_batch_size
1✔
659

660
        if batch_size < min_batch_size or batch_size > max_batch_size:
1✔
UNCOV
661
            raise InvalidParameterValueException("out of bounds todo", Type="User")  # TODO: test
×
662

663
    return batch_size
1✔
664

665

666
def map_layer_out(layer_version: "LayerVersion") -> PublishLayerVersionResponse:
    """
    Map a layer version model to the publish layer version response

    :param layer_version: Layer version model to map
    :return: Response including the layer content with a presigned URL for its code
    """
    code = layer_version.code
    content = LayerVersionContentOutput(
        Location=code.generate_presigned_url(endpoint_url=config.external_service_url()),
        CodeSha256=code.code_sha256,
        CodeSize=code.code_size,
        # SigningProfileVersionArn="", # same as in function configuration
        # SigningJobArn="" # same as in function configuration
    )
    return PublishLayerVersionResponse(
        Content=content,
        LicenseInfo=layer_version.license_info,
        Description=layer_version.description,
        CompatibleArchitectures=layer_version.compatible_architectures,
        CompatibleRuntimes=layer_version.compatible_runtimes,
        CreatedDate=layer_version.created,
        LayerArn=layer_version.layer_arn,
        LayerVersionArn=layer_version.layer_version_arn,
        Version=layer_version.version,
    )
686

687

688
def layer_arn(layer_name: str, account: str, region: str):
    """
    Generate a layer arn (without version); the partition is derived from the region

    :param layer_name: Layer name
    :param account: Account ID
    :param region: Region
    :return: Layer arn
    """
    partition = get_partition(region)
    return f"arn:{partition}:lambda:{region}:{account}:layer:{layer_name}"
1✔
690

691

692
def layer_version_arn(layer_name: str, account: str, region: str, version: str):
    """
    Generate a layer version arn; the partition is derived from the region

    :param layer_name: Layer name
    :param account: Account ID
    :param region: Region
    :param version: Layer version
    :return: Layer version arn
    """
    partition = get_partition(region)
    return f"arn:{partition}:lambda:{region}:{account}:layer:{layer_name}:{version}"
1✔
694

695

696
def parse_layer_arn(layer_version_arn: str) -> tuple[str, str, str, str]:
    """
    Split a layer (version) arn into its components

    The input is expected to match `LAYER_VERSION_ARN_PATTERN`; there is no handling for
    non-matching input.

    :param layer_version_arn: Layer arn, with or without version
    :return: tuple with (region, account ID, layer name, layer version)
    """
    matched = LAYER_VERSION_ARN_PATTERN.match(layer_version_arn)
    return matched.group("region_name", "account_id", "layer_name", "layer_version")
700

701

702
def validate_layer_runtime(compatible_runtime: str) -> str | None:
1✔
703
    if compatible_runtime is not None and compatible_runtime not in ALL_RUNTIMES:
1✔
704
        return f"Value '{compatible_runtime}' at 'compatibleRuntime' failed to satisfy constraint: Member must satisfy enum value set: {VALID_LAYER_RUNTIMES}"
1✔
705
    return None
1✔
706

707

708
def validate_layer_architecture(compatible_architecture: str) -> str | None:
1✔
709
    if compatible_architecture is not None and compatible_architecture not in ARCHITECTURES:
1✔
710
        return f"Value '{compatible_architecture}' at 'compatibleArchitecture' failed to satisfy constraint: Member must satisfy enum value set: [x86_64, arm64]"
1✔
711
    return None
1✔
712

713

714
def validate_layer_runtimes_and_architectures(
    compatible_runtimes: list[str], compatible_architectures: list[str]
):
    """
    Validate the compatible runtimes and architectures of a layer

    :param compatible_runtimes: Runtimes to validate (may be empty or None)
    :param compatible_architectures: Architectures to validate (may be empty or None)
    :return: list of validation error messages (empty if everything is valid)
    """
    errors = []

    unknown_runtimes = compatible_runtimes and set(compatible_runtimes).difference(ALL_RUNTIMES)
    if unknown_runtimes:
        listed_runtimes = ", ".join(compatible_runtimes)
        errors.append(
            f"Value '[{listed_runtimes}]' at 'compatibleRuntimes' failed to satisfy constraint: "
            f"Member must satisfy enum value set: {VALID_RUNTIMES}"
        )

    unknown_architectures = compatible_architectures and set(
        compatible_architectures
    ).difference(ARCHITECTURES)
    if unknown_architectures:
        listed_architectures = ", ".join(compatible_architectures)
        errors.append(
            f"Value '[{listed_architectures}]' at 'compatibleArchitectures' failed to satisfy constraint: "
            "Member must satisfy constraint: [Member must satisfy enum value set: [x86_64, arm64]]"
        )

    return errors
1✔
730

731

732
def is_layer_arn(layer_name: str) -> bool:
    """
    Checks if the given string matches the layer arn pattern (with or without version)

    :param layer_name: Layer name or arn to check
    :return: True if the input matches `LAYER_VERSION_ARN_PATTERN`, false otherwise
    """
    matched = LAYER_VERSION_ARN_PATTERN.match(layer_name)
    return bool(matched)
1✔
734

735

736
# See Lambda API actions that support resource-based IAM policies
# https://docs.aws.amazon.com/lambda/latest/dg/access-control-resource-based.html#permissions-resource-api
# Operations listed here skip the account ID check in `get_name_and_qualifier`; for all other
# operations, an ARN from a different account than the request context is denied.
_supported_resource_based_operations = {
    "CreateAlias",
    "DeleteAlias",
    "DeleteFunction",
    "DeleteFunctionConcurrency",
    "DeleteFunctionEventInvokeConfig",
    "DeleteProvisionedConcurrencyConfig",
    "GetAlias",
    "GetFunction",
    "GetFunctionConcurrency",
    "GetFunctionConfiguration",
    "GetFunctionEventInvokeConfig",
    "GetPolicy",
    "GetProvisionedConcurrencyConfig",
    "Invoke",
    "ListAliases",
    "ListFunctionEventInvokeConfigs",
    "ListProvisionedConcurrencyConfigs",
    "ListTags",
    "ListVersionsByFunction",
    "PublishVersion",
    "PutFunctionConcurrency",
    "PutFunctionEventInvokeConfig",
    "PutProvisionedConcurrencyConfig",
    "TagResource",
    "UntagResource",
    "UpdateAlias",
    "UpdateFunctionCode",
    "UpdateFunctionEventInvokeConfig",
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc