• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 20565403496

29 Dec 2025 05:11AM UTC coverage: 84.103% (-2.8%) from 86.921%
20565403496

Pull #13567

github

web-flow
Merge 4816837a5 into 2417384aa
Pull Request #13567: Update ASF APIs

67166 of 79862 relevant lines covered (84.1%)

0.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.08
/localstack-core/localstack/utils/aws/arns.py
1
import logging
1✔
2
import re
1✔
3
from functools import cache
1✔
4
from typing import TypedDict
1✔
5

6
from botocore.utils import ArnParser, InvalidArnException
1✔
7

8
from localstack.aws.accounts import DEFAULT_AWS_ACCOUNT_ID
1✔
9
from localstack.aws.connect import connect_to
1✔
10
from localstack.utils.strings import long_uid
1✔
11

12
LOG = logging.getLogger(__name__)
1✔
13

14
#
15
# Partition Utilities
16
#
17

18
DEFAULT_PARTITION = "aws"
1✔
19
REGION_PREFIX_TO_PARTITION = {
1✔
20
    # (region prefix, aws partition)
21
    "cn-": "aws-cn",
22
    "us-gov-": "aws-us-gov",
23
    "us-iso-": "aws-iso",
24
    "us-isob-": "aws-iso-b",
25
}
26
PARTITION_NAMES = list(REGION_PREFIX_TO_PARTITION.values()) + [DEFAULT_PARTITION]
1✔
27
ARN_PARTITION_REGEX = r"^arn:(" + "|".join(sorted(PARTITION_NAMES)) + ")"
1✔
28

29

30
def get_partition(region: str | None) -> str:
1✔
31
    if not region:
1✔
32
        return DEFAULT_PARTITION
1✔
33
    if region in PARTITION_NAMES:
1✔
34
        return region
×
35
    for prefix in REGION_PREFIX_TO_PARTITION:
1✔
36
        if region.startswith(prefix):
1✔
37
            return REGION_PREFIX_TO_PARTITION[prefix]
×
38
    return DEFAULT_PARTITION
1✔
39

40

41
#
42
# ARN parsing utilities
43
#
44

45

46
class ArnData(TypedDict):
1✔
47
    partition: str
1✔
48
    service: str
1✔
49
    region: str
1✔
50
    account: str
1✔
51
    resource: str
1✔
52

53

54
_arn_parser = ArnParser()
1✔
55

56

57
def parse_arn(arn: str) -> ArnData:
1✔
58
    """
59
    Uses a botocore ArnParser to parse an arn.
60

61
    :param arn: the arn string to parse
62
    :returns: a dictionary containing the ARN components
63
    :raises InvalidArnException: if the arn is invalid
64
    """
65
    return _arn_parser.parse_arn(arn)
1✔
66

67

68
def extract_account_id_from_arn(arn: str) -> str | None:
1✔
69
    try:
1✔
70
        return parse_arn(arn).get("account")
1✔
71
    except InvalidArnException:
1✔
72
        return None
1✔
73

74

75
def extract_region_from_arn(arn: str) -> str | None:
1✔
76
    try:
1✔
77
        return parse_arn(arn).get("region")
1✔
78
    except InvalidArnException:
1✔
79
        return None
1✔
80

81

82
def extract_service_from_arn(arn: str) -> str | None:
1✔
83
    try:
1✔
84
        return parse_arn(arn).get("service")
1✔
85
    except InvalidArnException:
×
86
        return None
×
87

88

89
def extract_resource_from_arn(arn: str) -> str | None:
1✔
90
    try:
1✔
91
        return parse_arn(arn).get("resource")
1✔
92
    except InvalidArnException:
×
93
        return None
×
94

95

96
#
97
# Generic ARN builder
98
#
99

100

101
def _resource_arn(
1✔
102
    name: str, pattern: str, account_id: str, region_name: str, allow_colons=False
103
) -> str:
104
    if ":" in name and not allow_colons:
1✔
105
        return name
1✔
106
    if len(pattern.split("%s")) == 4:
1✔
107
        return pattern % (get_partition(region_name), account_id, name)
1✔
108
    return pattern % (get_partition(region_name), region_name, account_id, name)
1✔
109

110

111
#
112
# ARN builders for specific resource types
113
#
114

115
#
116
# IAM
117
#
118

119

120
def iam_role_arn(role_name: str, account_id: str, region_name: str) -> str:
1✔
121
    if not role_name:
1✔
122
        return role_name
×
123
    if re.match(f"{ARN_PARTITION_REGEX}:iam::", role_name):
1✔
124
        return role_name
×
125
    return f"arn:{get_partition(region_name)}:iam::{account_id}:role/{role_name}"
1✔
126

127

128
def iam_resource_arn(resource: str, account_id: str, role: str = None) -> str:
1✔
129
    if not role:
1✔
130
        role = f"role-{resource}"
1✔
131
    # Only used in tests, so we can hardcode the region for now
132
    return iam_role_arn(role_name=role, account_id=account_id, region_name="us-east-1")
1✔
133

134

135
#
136
# Secretsmanager
137
#
138

139

140
def secretsmanager_secret_arn(
1✔
141
    secret_id: str, account_id: str, region_name: str, random_suffix: str = None
142
) -> str:
143
    if ":" in (secret_id or ""):
×
144
        return secret_id
×
145
    pattern = "arn:%s:secretsmanager:%s:%s:secret:%s"
×
146
    arn = _resource_arn(secret_id, pattern, account_id=account_id, region_name=region_name)
×
147
    if random_suffix:
×
148
        arn += f"-{random_suffix}"
×
149
    return arn
×
150

151

152
#
153
# Cloudformation
154
#
155

156

157
def cloudformation_stack_arn(
1✔
158
    stack_name: str, stack_id: str, account_id: str, region_name: str
159
) -> str:
160
    pattern = f"arn:%s:cloudformation:%s:%s:stack/%s/{stack_id}"
1✔
161
    return _resource_arn(stack_name, pattern, account_id=account_id, region_name=region_name)
1✔
162

163

164
def cloudformation_change_set_arn(
1✔
165
    change_set_name: str, change_set_id: str, account_id: str, region_name: str
166
) -> str:
167
    pattern = f"arn:%s:cloudformation:%s:%s:changeSet/%s/{change_set_id}"
1✔
168
    return _resource_arn(change_set_name, pattern, account_id=account_id, region_name=region_name)
1✔
169

170

171
#
172
# DynamoDB
173
#
174

175

176
def dynamodb_table_arn(table_name: str, account_id: str, region_name: str) -> str:
1✔
177
    table_name = table_name.split(":table/")[-1]
1✔
178
    pattern = "arn:%s:dynamodb:%s:%s:table/%s"
1✔
179
    return _resource_arn(table_name, pattern, account_id=account_id, region_name=region_name)
1✔
180

181

182
def dynamodb_stream_arn(
1✔
183
    table_name: str, latest_stream_label: str, account_id: str, region_name: str
184
) -> str:
185
    return f"arn:{get_partition(region_name)}:dynamodb:{region_name}:{account_id}:table/{table_name}/stream/{latest_stream_label}"
1✔
186

187

188
#
189
# Cloudwatch
190
#
191

192

193
def cloudwatch_alarm_arn(alarm_name: str, account_id: str, region_name: str) -> str:
1✔
194
    # format pattern directly as alarm_name can include ":" and this is not supported by the helper _resource_arn
195
    return (
1✔
196
        f"arn:{get_partition(region_name)}:cloudwatch:{region_name}:{account_id}:alarm:{alarm_name}"
197
    )
198

199

200
def cloudwatch_dashboard_arn(dashboard_name: str, account_id: str, region_name: str) -> str:
1✔
201
    pattern = "arn:%s:cloudwatch::%s:dashboard/%s"
1✔
202
    return _resource_arn(dashboard_name, pattern, account_id=account_id, region_name=region_name)
1✔
203

204

205
#
206
# Logs
207
#
208

209

210
def log_group_arn(group_name: str, account_id: str, region_name: str) -> str:
1✔
211
    pattern = "arn:%s:logs:%s:%s:log-group:%s"
1✔
212
    return _resource_arn(group_name, pattern, account_id=account_id, region_name=region_name)
1✔
213

214

215
#
216
# Events
217
#
218

219

220
def events_archive_arn(archive_name: str, account_id: str, region_name: str) -> str:
1✔
221
    pattern = "arn:%s:events:%s:%s:archive/%s"
1✔
222
    return _resource_arn(archive_name, pattern, account_id=account_id, region_name=region_name)
1✔
223

224

225
def event_bus_arn(bus_name: str, account_id: str, region_name: str) -> str:
1✔
226
    pattern = "arn:%s:events:%s:%s:event-bus/%s"
1✔
227
    return _resource_arn(bus_name, pattern, account_id=account_id, region_name=region_name)
1✔
228

229

230
def events_replay_arn(replay_name: str, account_id: str, region_name: str) -> str:
1✔
231
    pattern = "arn:%s:events:%s:%s:replay/%s"
1✔
232
    return _resource_arn(replay_name, pattern, account_id=account_id, region_name=region_name)
1✔
233

234

235
def events_rule_arn(
1✔
236
    rule_name: str, account_id: str, region_name: str, event_bus_name: str = "default"
237
) -> str:
238
    pattern = "arn:%s:events:%s:%s:rule/%s"
1✔
239
    if event_bus_name != "default":
1✔
240
        rule_name = f"{event_bus_name}/{rule_name}"
1✔
241
    return _resource_arn(rule_name, pattern, account_id=account_id, region_name=region_name)
1✔
242

243

244
def events_connection_arn(
1✔
245
    connection_name: str, connection_id: str, account_id: str, region_name: str
246
) -> str:
247
    name = f"{connection_name}/{connection_id}"
1✔
248
    pattern = "arn:%s:events:%s:%s:connection/%s"
1✔
249
    return _resource_arn(name, pattern, account_id=account_id, region_name=region_name)
1✔
250

251

252
def events_api_destination_arn(
1✔
253
    api_destination_name: str, api_destination_id: str, account_id: str, region_name: str
254
) -> str:
255
    name = f"{api_destination_name}/{api_destination_id}"
1✔
256
    pattern = "arn:%s:events:%s:%s:api-destination/%s"
1✔
257
    return _resource_arn(name, pattern, account_id=account_id, region_name=region_name)
1✔
258

259

260
#
261
# Lambda
262
#
263

264

265
def lambda_function_arn(function_name: str, account_id: str, region_name: str) -> str:
1✔
266
    return lambda_function_or_layer_arn(
1✔
267
        "function", function_name, version=None, account_id=account_id, region_name=region_name
268
    )
269

270

271
def lambda_layer_arn(layer_name: str, account_id: str, region_name: str) -> str:
1✔
272
    return lambda_function_or_layer_arn(
×
273
        "layer", layer_name, version=None, account_id=account_id, region_name=region_name
274
    )
275

276

277
def lambda_code_signing_arn(code_signing_id: str, account_id: str, region_name: str) -> str:
1✔
278
    pattern = "arn:%s:lambda:%s:%s:code-signing-config:%s"
×
279
    return _resource_arn(code_signing_id, pattern, account_id=account_id, region_name=region_name)
×
280

281

282
def lambda_event_source_mapping_arn(uuid: str, account_id: str, region_name: str) -> str:
1✔
283
    pattern = "arn:%s:lambda:%s:%s:event-source-mapping:%s"
1✔
284
    return _resource_arn(uuid, pattern, account_id=account_id, region_name=region_name)
1✔
285

286

287
def capacity_provider_arn(capacity_provider_name: str, account_id: str, region_name: str) -> str:
1✔
288
    pattern = "arn:%s:lambda:%s:%s:capacity-provider:%s"
1✔
289
    return _resource_arn(
1✔
290
        capacity_provider_name, pattern, account_id=account_id, region_name=region_name
291
    )
292

293

294
def lambda_function_or_layer_arn(
1✔
295
    type: str,
296
    entity_name: str,
297
    version: str | None,
298
    account_id: str,
299
    region_name: str,
300
) -> str:
301
    pattern = "arn:([a-z-]+):lambda:.*:.*:(function|layer):.*"
1✔
302
    if re.match(pattern, entity_name):
1✔
303
        return entity_name
×
304
    if ":" in entity_name:
1✔
305
        client = connect_to(aws_access_key_id=account_id, region_name=region_name).lambda_
×
306
        entity_name, _, alias = entity_name.rpartition(":")
×
307
        try:
×
308
            alias_response = client.get_alias(FunctionName=entity_name, Name=alias)
×
309
            version = alias_response["FunctionVersion"]
×
310

311
        except Exception as e:
×
312
            msg = f"Alias {alias} of {entity_name} not found"
×
313
            LOG.info("%s: %s", msg, e)
×
314
            raise Exception(msg)
×
315

316
    result = (
1✔
317
        f"arn:{get_partition(region_name)}:lambda:{region_name}:{account_id}:{type}:{entity_name}"
318
    )
319
    if version:
1✔
320
        result = f"{result}:{version}"
×
321
    return result
1✔
322

323

324
#
325
# Stepfunctions
326
#
327

328

329
def stepfunctions_state_machine_arn(name: str, account_id: str, region_name: str) -> str:
1✔
330
    pattern = "arn:%s:states:%s:%s:stateMachine:%s"
1✔
331
    return _resource_arn(name, pattern, account_id=account_id, region_name=region_name)
1✔
332

333

334
def stepfunctions_standard_execution_arn(state_machine_arn: str, execution_name: str) -> str:
1✔
335
    arn_data: ArnData = parse_arn(state_machine_arn)
1✔
336
    standard_execution_arn = ":".join(
1✔
337
        [
338
            "arn",
339
            arn_data["partition"],
340
            arn_data["service"],
341
            arn_data["region"],
342
            arn_data["account"],
343
            "execution",
344
            "".join(arn_data["resource"].split(":")[1:]),
345
            execution_name,
346
        ]
347
    )
348
    return standard_execution_arn
1✔
349

350

351
def stepfunctions_express_execution_arn(state_machine_arn: str, execution_name: str) -> str:
1✔
352
    arn_data: ArnData = parse_arn(state_machine_arn)
1✔
353
    express_execution_arn = ":".join(
1✔
354
        [
355
            "arn",
356
            arn_data["partition"],
357
            arn_data["service"],
358
            arn_data["region"],
359
            arn_data["account"],
360
            "express",
361
            "".join(arn_data["resource"].split(":")[1:]),
362
            execution_name,
363
            long_uid(),
364
        ]
365
    )
366
    return express_execution_arn
1✔
367

368

369
def stepfunctions_activity_arn(name: str, account_id: str, region_name: str) -> str:
1✔
370
    pattern = "arn:%s:states:%s:%s:activity:%s"
1✔
371
    return _resource_arn(name, pattern, account_id=account_id, region_name=region_name)
1✔
372

373

374
#
375
# Cognito
376
#
377

378

379
def cognito_user_pool_arn(user_pool_id: str, account_id: str, region_name: str) -> str:
1✔
380
    pattern = "arn:%s:cognito-idp:%s:%s:userpool/%s"
×
381
    return _resource_arn(user_pool_id, pattern, account_id=account_id, region_name=region_name)
×
382

383

384
#
385
# Kinesis
386
#
387

388

389
def kinesis_stream_arn(stream_name: str, account_id: str, region_name: str) -> str:
1✔
390
    pattern = "arn:%s:kinesis:%s:%s:stream/%s"
1✔
391
    return _resource_arn(stream_name, pattern, account_id, region_name)
1✔
392

393

394
#
395
# Elasticsearch
396
#
397

398

399
def elasticsearch_domain_arn(domain_name: str, account_id: str, region_name: str) -> str:
1✔
400
    pattern = "arn:%s:es:%s:%s:domain/%s"
×
401
    return _resource_arn(domain_name, pattern, account_id=account_id, region_name=region_name)
×
402

403

404
#
405
# Firehose
406
#
407

408

409
def firehose_stream_arn(stream_name: str, account_id: str, region_name: str) -> str:
1✔
410
    pattern = "arn:%s:firehose:%s:%s:deliverystream/%s"
1✔
411
    return _resource_arn(stream_name, pattern, account_id=account_id, region_name=region_name)
1✔
412

413

414
#
415
# KMS
416
#
417

418

419
def kms_key_arn(key_id: str, account_id: str, region_name: str) -> str:
1✔
420
    pattern = "arn:%s:kms:%s:%s:key/%s"
1✔
421
    return _resource_arn(key_id, pattern, account_id=account_id, region_name=region_name)
1✔
422

423

424
def kms_alias_arn(alias_name: str, account_id: str, region_name: str):
1✔
425
    if not alias_name.startswith("alias/"):
1✔
426
        alias_name = "alias/" + alias_name
×
427
    pattern = "arn:%s:kms:%s:%s:%s"
1✔
428
    return _resource_arn(alias_name, pattern, account_id=account_id, region_name=region_name)
1✔
429

430

431
#
432
# SSM
433
#
434

435

436
def ssm_parameter_arn(param_name: str, account_id: str, region_name: str) -> str:
1✔
437
    pattern = "arn:%s:ssm:%s:%s:parameter/%s"
×
438
    param_name = param_name.lstrip("/")
×
439
    return _resource_arn(param_name, pattern, account_id=account_id, region_name=region_name)
×
440

441

442
#
443
# S3
444
#
445

446

447
def s3_bucket_arn(bucket_name_or_arn: str, region="us-east-1") -> str:
1✔
448
    bucket_name = s3_bucket_name(bucket_name_or_arn)
1✔
449
    return f"arn:{get_partition(region)}:s3:::{bucket_name}"
1✔
450

451

452
#
453
# SQS
454
#
455

456

457
def sqs_queue_arn(queue_name: str, account_id: str, region_name: str) -> str:
1✔
458
    queue_name = queue_name.split("/")[-1]
1✔
459
    return f"arn:{get_partition(region_name)}:sqs:{region_name}:{account_id}:{queue_name}"
1✔
460

461

462
#
463
# APIGW
464
#
465

466

467
def apigateway_restapi_arn(api_id: str, account_id: str, region_name: str) -> str:
1✔
468
    return (
1✔
469
        f"arn:{get_partition(region_name)}:apigateway:{region_name}:{account_id}:/restapis/{api_id}"
470
    )
471

472

473
def apigateway_invocations_arn(lambda_uri: str, region_name: str) -> str:
1✔
474
    return f"arn:{get_partition(region_name)}:apigateway:{region_name}:lambda:path/2015-03-31/functions/{lambda_uri}/invocations"
1✔
475

476

477
#
478
# SNS
479
#
480

481

482
def sns_topic_arn(topic_name: str, account_id: str, region_name: str) -> str:
1✔
483
    return f"arn:{get_partition(region_name)}:sns:{region_name}:{account_id}:{topic_name}"
1✔
484

485

486
def sns_platform_application_arn(
1✔
487
    platform_application_name: str, platform: str, account_id: str, region_name: str
488
) -> str:
489
    return f"arn:{get_partition(region_name)}:sns:{region_name}:{account_id}:app/{platform}/{platform_application_name}"
×
490

491

492
#
493
# ECR
494
#
495

496

497
def ecr_repository_arn(name: str, account_id: str, region_name: str) -> str:
1✔
498
    pattern = "arn:%s:ecr:%s:%s:repository/%s"
1✔
499
    return _resource_arn(name, pattern, account_id=account_id, region_name=region_name)
1✔
500

501

502
#
503
# Route53
504
#
505

506

507
def route53_resolver_firewall_rule_group_arn(id: str, account_id: str, region_name: str) -> str:
1✔
508
    pattern = "arn:%s:route53resolver:%s:%s:firewall-rule-group/%s"
1✔
509
    return _resource_arn(id, pattern, account_id=account_id, region_name=region_name)
1✔
510

511

512
def route53_resolver_firewall_domain_list_arn(id: str, account_id: str, region_name: str) -> str:
1✔
513
    pattern = "arn:%s:route53resolver:%s:%s:firewall-domain-list/%s"
1✔
514
    return _resource_arn(id, pattern, account_id=account_id, region_name=region_name)
1✔
515

516

517
def route53_resolver_firewall_rule_group_associations_arn(
1✔
518
    id: str, account_id: str, region_name: str
519
) -> str:
520
    pattern = "arn:%s:route53resolver:%s:%s:firewall-rule-group-association/%s"
×
521
    return _resource_arn(id, pattern, account_id=account_id, region_name=region_name)
×
522

523

524
def route53_resolver_query_log_config_arn(id: str, account_id: str, region_name: str) -> str:
1✔
525
    pattern = "arn:%s:route53resolver:%s:%s:resolver-query-log-config/%s"
1✔
526
    return _resource_arn(id, pattern, account_id=account_id, region_name=region_name)
1✔
527

528

529
#
530
# SES
531
#
532

533

534
def ses_identity_arn(email: str, account_id: str, region_name: str) -> str:
1✔
535
    pattern = "arn:%s:ses:%s:%s:identity/%s"
×
536
    return _resource_arn(email, pattern, account_id=account_id, region_name=region_name)
×
537

538

539
#
540
# Other ARN related helpers
541
#
542

543

544
def opensearch_domain_name(domain_arn: str) -> str:
1✔
545
    return domain_arn.rpartition("/")[2]
×
546

547

548
def firehose_name(firehose_arn: str) -> str:
1✔
549
    return firehose_arn.split("/")[-1]
1✔
550

551

552
def kinesis_stream_name(kinesis_arn: str) -> str:
1✔
553
    return kinesis_arn.split(":stream/")[-1]
1✔
554

555

556
def lambda_function_name(name_or_arn: str) -> str:
1✔
557
    if ":" in name_or_arn:
1✔
558
        arn = parse_arn(name_or_arn)
1✔
559
        if arn["service"] != "lambda":
1✔
560
            raise ValueError(f"arn is not a lambda arn {name_or_arn}")
1✔
561

562
        return parse_arn(name_or_arn)["resource"].split(":")[1]
1✔
563
    else:
564
        return name_or_arn
1✔
565

566

567
@cache
1✔
568
def sqs_queue_url_for_arn(queue_arn: str) -> str:
1✔
569
    """
570
    Return the SQS queue URL for the given queue ARN.
571
    """
572
    if "://" in queue_arn:
1✔
573
        return queue_arn
×
574

575
    try:
1✔
576
        arn = parse_arn(queue_arn)
1✔
577
        account_id = arn["account"]
1✔
578
        region_name = arn["region"]
1✔
579
        queue_name = arn["resource"]
1✔
580
    except InvalidArnException:
×
581
        account_id = DEFAULT_AWS_ACCOUNT_ID
×
582
        region_name = None
×
583
        queue_name = queue_arn
×
584

585
    sqs_client = connect_to(region_name=region_name).sqs
1✔
586
    result = sqs_client.get_queue_url(QueueName=queue_name, QueueOwnerAWSAccountId=account_id)[
1✔
587
        "QueueUrl"
588
    ]
589
    return result
1✔
590

591

592
def sqs_queue_name(queue_arn: str) -> str:
1✔
593
    if ":" in queue_arn:
1✔
594
        return parse_arn(queue_arn)["resource"]
1✔
595
    else:
596
        return queue_arn
×
597

598

599
def s3_bucket_name(bucket_name_or_arn: str) -> str:
1✔
600
    return bucket_name_or_arn.split(":::")[-1]
1✔
601

602

603
def is_arn(possible_arn: str) -> bool:
1✔
604
    try:
1✔
605
        parse_arn(possible_arn)
1✔
606
        return True
1✔
607
    except InvalidArnException:
1✔
608
        return False
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc