• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 17265699519

27 Aug 2025 11:28AM UTC coverage: 86.827% (-0.01%) from 86.837%
17265699519

push

github

web-flow
Fix SQS tests failing due to missing snapshot update after #12957 (#13062)

67057 of 77231 relevant lines covered (86.83%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.1
/localstack-core/localstack/utils/aws/arns.py
1
import logging
1✔
2
import re
1✔
3
from functools import cache
1✔
4
from typing import Optional, TypedDict
1✔
5

6
from botocore.utils import ArnParser, InvalidArnException
1✔
7

8
from localstack.aws.accounts import DEFAULT_AWS_ACCOUNT_ID
1✔
9
from localstack.aws.connect import connect_to
1✔
10
from localstack.utils.strings import long_uid
1✔
11

12
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
1✔
13

14
#
15
# Partition Utilities
16
#
17

18
# Partition used when a region is missing or matches no known region prefix.
DEFAULT_PARTITION = "aws"

# Region-name prefix -> partition name used in ARNs for regions with that prefix.
REGION_PREFIX_TO_PARTITION = {
    "cn-": "aws-cn",
    "us-gov-": "aws-us-gov",
    "us-iso-": "aws-iso",
    "us-isob-": "aws-iso-b",
}

# All partition names known to this module; the default partition comes last.
PARTITION_NAMES = [*REGION_PREFIX_TO_PARTITION.values(), DEFAULT_PARTITION]

# Regex matching the "arn:<partition>" prefix of an ARN in any known partition.
ARN_PARTITION_REGEX = f"^arn:({'|'.join(sorted(PARTITION_NAMES))})"
1✔
28

29

30
def get_partition(region: Optional[str]) -> str:
    """
    Resolve the ARN partition for a region name.

    :param region: a region name, a partition name, or None/empty
    :returns: the default partition for a missing region, ``region`` itself if it already is a
        known partition name, the partition of the first matching region prefix, or the default
        partition if nothing matches
    """
    if not region:
        return DEFAULT_PARTITION
    if region in PARTITION_NAMES:
        return region
    return next(
        (
            partition
            for prefix, partition in REGION_PREFIX_TO_PARTITION.items()
            if region.startswith(prefix)
        ),
        DEFAULT_PARTITION,
    )
1✔
39

40

41
#
42
# ARN parsing utilities
43
#
44

45

46
class ArnData(TypedDict):
    """Components of an ARN of the form ``arn:<partition>:<service>:<region>:<account>:<resource>``."""

    # partition segment
    partition: str
    # service segment
    service: str
    # region segment
    region: str
    # account segment
    account: str
    # resource segment (everything after the account)
    resource: str
1✔
52

53

54
# Shared botocore ArnParser instance, created once at import time.
_arn_parser = ArnParser()
1✔
55

56

57
def parse_arn(arn: str) -> ArnData:
    """
    Split an ARN string into its components with the module's botocore ``ArnParser``.

    :param arn: the ARN string to parse
    :returns: a dictionary containing the ARN components
    :raises InvalidArnException: if the ARN is invalid
    """
    arn_components = _arn_parser.parse_arn(arn)
    return arn_components
1✔
66

67

68
def extract_account_id_from_arn(arn: str) -> Optional[str]:
    """
    Return the account component of an ARN.

    :param arn: the ARN string to inspect
    :returns: the ``account`` component, or None if the ARN cannot be parsed
    """
    try:
        arn_data = parse_arn(arn)
    except InvalidArnException:
        return None
    return arn_data.get("account")
1✔
73

74

75
def extract_region_from_arn(arn: str) -> Optional[str]:
    """
    Return the region component of an ARN.

    :param arn: the ARN string to inspect
    :returns: the ``region`` component, or None if the ARN cannot be parsed
    """
    try:
        arn_data = parse_arn(arn)
    except InvalidArnException:
        return None
    return arn_data.get("region")
1✔
80

81

82
def extract_service_from_arn(arn: str) -> Optional[str]:
    """
    Return the service component of an ARN.

    :param arn: the ARN string to inspect
    :returns: the ``service`` component, or None if the ARN cannot be parsed
    """
    try:
        arn_data = parse_arn(arn)
    except InvalidArnException:
        return None
    return arn_data.get("service")
×
87

88

89
def extract_resource_from_arn(arn: str) -> Optional[str]:
    """
    Return the resource component of an ARN.

    :param arn: the ARN string to inspect
    :returns: the ``resource`` component, or None if the ARN cannot be parsed
    """
    try:
        arn_data = parse_arn(arn)
    except InvalidArnException:
        return None
    return arn_data.get("resource")
×
94

95

96
#
97
# Generic ARN builder
98
#
99

100

101
def _resource_arn(name: str, pattern: str, account_id: str, region_name: str) -> str:
    """
    Fill a ``%s``-style ARN pattern for a resource.

    :param name: resource name; a value containing ":" is returned unchanged
    :param pattern: ARN template with three placeholders (partition, account, name) or
        four placeholders (partition, region, account, name)
    :param account_id: account ID to embed in the ARN
    :param region_name: region to embed in the ARN; also determines the partition
    :returns: the formatted ARN, or ``name`` itself if it contains ":"
    """
    if ":" in name:
        return name
    partition = get_partition(region_name)
    if pattern.count("%s") == 3:
        # region-less pattern: partition, account, name
        values = (partition, account_id, name)
    else:
        values = (partition, region_name, account_id, name)
    return pattern % values
1✔
107

108

109
#
110
# ARN builders for specific resource types
111
#
112

113
#
114
# IAM
115
#
116

117

118
def iam_role_arn(role_name: str, account_id: str, region_name: str) -> str:
    """
    Build the ARN of an IAM role.

    :param role_name: role name; a falsy value or an existing IAM ARN is returned unchanged
    :param account_id: account ID to embed in the ARN
    :param region_name: region used only to determine the partition
    :returns: the role ARN, or ``role_name`` unchanged in the cases above
    """
    if not role_name or re.match(f"{ARN_PARTITION_REGEX}:iam::", role_name):
        return role_name
    partition = get_partition(region_name)
    return f"arn:{partition}:iam::{account_id}:role/{role_name}"
1✔
124

125

126
def iam_resource_arn(resource: str, account_id: str, role: Optional[str] = None) -> str:
    """
    Build an IAM role ARN for a resource, deriving the role name when none is given.

    :param resource: resource name; used to derive the role name ``role-<resource>``
    :param account_id: account ID to embed in the ARN
    :param role: explicit role name (or IAM ARN); if falsy, ``role-<resource>`` is used
    :returns: the role ARN as produced by ``iam_role_arn``
    """
    if not role:
        role = f"role-{resource}"
    # Only used in tests, so we can hardcode the region for now
    return iam_role_arn(role_name=role, account_id=account_id, region_name="us-east-1")
1✔
131

132

133
#
134
# Secretsmanager
135
#
136

137

138
def secretsmanager_secret_arn(
    secret_id: str, account_id: str, region_name: str, random_suffix: Optional[str] = None
) -> str:
    """
    Build the ARN of a Secrets Manager secret.

    :param secret_id: secret name; a value containing ":" is returned unchanged, without
        appending ``random_suffix``
    :param account_id: account ID to embed in the ARN
    :param region_name: region to embed in the ARN; also determines the partition
    :param random_suffix: optional suffix appended to the ARN as ``-<suffix>``
    :returns: the secret ARN
    """
    # NOTE(review): a None secret_id passes this check but then fails inside _resource_arn.
    if ":" in (secret_id or ""):
        return secret_id
    pattern = "arn:%s:secretsmanager:%s:%s:secret:%s"
    arn = _resource_arn(secret_id, pattern, account_id=account_id, region_name=region_name)
    if random_suffix:
        arn += f"-{random_suffix}"
    return arn
×
148

149

150
#
151
# Cloudformation
152
#
153

154

155
def cloudformation_stack_arn(
    stack_name: str, stack_id: str, account_id: str, region_name: str
) -> str:
    """
    Build the ARN of a CloudFormation stack (``...:stack/<stack_name>/<stack_id>``).

    :param stack_name: stack name; a value containing ":" is returned unchanged
    :param stack_id: stack ID used as the last path element of the ARN
    :param account_id: account ID to embed in the ARN
    :param region_name: region to embed in the ARN; also determines the partition
    :returns: the stack ARN
    """
    # the stack ID is baked into the template; the remaining placeholders are filled by the helper
    stack_template = f"arn:%s:cloudformation:%s:%s:stack/%s/{stack_id}"
    return _resource_arn(
        stack_name, stack_template, account_id=account_id, region_name=region_name
    )
1✔
160

161

162
def cloudformation_change_set_arn(
    change_set_name: str, change_set_id: str, account_id: str, region_name: str
) -> str:
    """
    Build the ARN of a CloudFormation change set
    (``...:changeSet/<change_set_name>/<change_set_id>``).

    :param change_set_name: change set name; a value containing ":" is returned unchanged
    :param change_set_id: change set ID used as the last path element of the ARN
    :param account_id: account ID to embed in the ARN
    :param region_name: region to embed in the ARN; also determines the partition
    :returns: the change set ARN
    """
    # the change set ID is baked into the template; the helper fills the other placeholders
    change_set_template = f"arn:%s:cloudformation:%s:%s:changeSet/%s/{change_set_id}"
    return _resource_arn(
        change_set_name, change_set_template, account_id=account_id, region_name=region_name
    )
1✔
167

168

169
#
170
# DynamoDB
171
#
172

173

174
def dynamodb_table_arn(table_name: str, account_id: str, region_name: str) -> str:
    """
    Build the ARN of a DynamoDB table.

    :param table_name: table name, or a string containing ``:table/`` of which only the part
        after the last ``:table/`` is used
    :param account_id: account ID to embed in the ARN
    :param region_name: region to embed in the ARN; also determines the partition
    :returns: the table ARN
    """
    bare_name = table_name.rpartition(":table/")[2]
    return _resource_arn(
        bare_name,
        "arn:%s:dynamodb:%s:%s:table/%s",
        account_id=account_id,
        region_name=region_name,
    )
1✔
178

179

180
def dynamodb_stream_arn(
    table_name: str, latest_stream_label: str, account_id: str, region_name: str
) -> str:
    """
    Build the ARN of a DynamoDB stream (``...:table/<table_name>/stream/<label>``).

    :param table_name: name of the table the stream belongs to
    :param latest_stream_label: stream label used as the last path element
    :param account_id: account ID to embed in the ARN
    :param region_name: region to embed in the ARN; also determines the partition
    :returns: the stream ARN
    """
    partition = get_partition(region_name)
    table_arn = f"arn:{partition}:dynamodb:{region_name}:{account_id}:table/{table_name}"
    return f"{table_arn}/stream/{latest_stream_label}"
1✔
184

185

186
#
187
# Cloudwatch
188
#
189

190

191
def cloudwatch_alarm_arn(alarm_name: str, account_id: str, region_name: str) -> str:
    """
    Build the ARN of a CloudWatch alarm.

    :param alarm_name: alarm name; may contain ":" and is always embedded as-is
    :param account_id: account ID to embed in the ARN
    :param region_name: region to embed in the ARN; also determines the partition
    :returns: the alarm ARN
    """
    # formatted directly: alarm names can include ":", which _resource_arn would pass through
    partition = get_partition(region_name)
    return f"arn:{partition}:cloudwatch:{region_name}:{account_id}:alarm:{alarm_name}"
196

197

198
def cloudwatch_dashboard_arn(dashboard_name: str, account_id: str, region_name: str) -> str:
    """
    Build the ARN of a CloudWatch dashboard; the ARN has an empty region segment.

    :param dashboard_name: dashboard name; a value containing ":" is returned unchanged
    :param account_id: account ID to embed in the ARN
    :param region_name: region used only to determine the partition
    :returns: the dashboard ARN
    """
    return _resource_arn(
        dashboard_name,
        "arn:%s:cloudwatch::%s:dashboard/%s",
        account_id=account_id,
        region_name=region_name,
    )
1✔
201

202

203
#
204
# Logs
205
#
206

207

208
def log_group_arn(group_name: str, account_id: str, region_name: str) -> str:
    """
    Build the ARN of a log group.

    :param group_name: log group name; a value containing ":" is returned unchanged
    :param account_id: account ID to embed in the ARN
    :param region_name: region to embed in the ARN; also determines the partition
    :returns: the log group ARN
    """
    return _resource_arn(
        group_name,
        "arn:%s:logs:%s:%s:log-group:%s",
        account_id=account_id,
        region_name=region_name,
    )
1✔
211

212

213
#
214
# Events
215
#
216

217

218
def events_archive_arn(archive_name: str, account_id: str, region_name: str) -> str:
    """
    Build the ARN of an events archive.

    :param archive_name: archive name; a value containing ":" is returned unchanged
    :param account_id: account ID to embed in the ARN
    :param region_name: region to embed in the ARN; also determines the partition
    :returns: the archive ARN
    """
    return _resource_arn(
        archive_name,
        "arn:%s:events:%s:%s:archive/%s",
        account_id=account_id,
        region_name=region_name,
    )
1✔
221

222

223
def event_bus_arn(bus_name: str, account_id: str, region_name: str) -> str:
    """
    Build the ARN of an event bus.

    :param bus_name: event bus name; a value containing ":" is returned unchanged
    :param account_id: account ID to embed in the ARN
    :param region_name: region to embed in the ARN; also determines the partition
    :returns: the event bus ARN
    """
    return _resource_arn(
        bus_name,
        "arn:%s:events:%s:%s:event-bus/%s",
        account_id=account_id,
        region_name=region_name,
    )
1✔
226

227

228
def events_replay_arn(replay_name: str, account_id: str, region_name: str) -> str:
    """
    Build the ARN of an events replay.

    :param replay_name: replay name; a value containing ":" is returned unchanged
    :param account_id: account ID to embed in the ARN
    :param region_name: region to embed in the ARN; also determines the partition
    :returns: the replay ARN
    """
    return _resource_arn(
        replay_name,
        "arn:%s:events:%s:%s:replay/%s",
        account_id=account_id,
        region_name=region_name,
    )
1✔
231

232

233
def events_rule_arn(
    rule_name: str, account_id: str, region_name: str, event_bus_name: str = "default"
) -> str:
    """
    Build the ARN of an events rule.

    :param rule_name: rule name
    :param account_id: account ID to embed in the ARN
    :param region_name: region to embed in the ARN; also determines the partition
    :param event_bus_name: owning event bus; for any bus other than "default" the resource
        becomes ``rule/<event_bus_name>/<rule_name>``
    :returns: the rule ARN, or the (possibly bus-qualified) name unchanged if it contains ":"
    """
    qualified_name = (
        rule_name if event_bus_name == "default" else f"{event_bus_name}/{rule_name}"
    )
    return _resource_arn(
        qualified_name,
        "arn:%s:events:%s:%s:rule/%s",
        account_id=account_id,
        region_name=region_name,
    )
1✔
240

241

242
def events_connection_arn(
    connection_name: str, connection_id: str, account_id: str, region_name: str
) -> str:
    """
    Build the ARN of an events connection (``...:connection/<name>/<id>``).

    :param connection_name: connection name
    :param connection_id: connection ID appended after the name
    :param account_id: account ID to embed in the ARN
    :param region_name: region to embed in the ARN; also determines the partition
    :returns: the connection ARN, or ``<name>/<id>`` unchanged if that string contains ":"
    """
    return _resource_arn(
        f"{connection_name}/{connection_id}",
        "arn:%s:events:%s:%s:connection/%s",
        account_id=account_id,
        region_name=region_name,
    )
1✔
248

249

250
def events_api_destination_arn(
    api_destination_name: str, api_destination_id: str, account_id: str, region_name: str
) -> str:
    """
    Build the ARN of an events API destination (``...:api-destination/<name>/<id>``).

    :param api_destination_name: API destination name
    :param api_destination_id: API destination ID appended after the name
    :param account_id: account ID to embed in the ARN
    :param region_name: region to embed in the ARN; also determines the partition
    :returns: the API destination ARN, or ``<name>/<id>`` unchanged if that string contains ":"
    """
    return _resource_arn(
        f"{api_destination_name}/{api_destination_id}",
        "arn:%s:events:%s:%s:api-destination/%s",
        account_id=account_id,
        region_name=region_name,
    )
1✔
256

257

258
#
259
# Lambda
260
#
261

262

263
def lambda_function_arn(function_name: str, account_id: str, region_name: str) -> str:
    """
    Build the ARN of a Lambda function; delegates to ``lambda_function_or_layer_arn``.

    :param function_name: function name, passed through as the entity name
    :param account_id: account ID to embed in the ARN
    :param region_name: region to embed in the ARN
    :returns: the function ARN
    """
    return lambda_function_or_layer_arn(
        type="function",
        entity_name=function_name,
        version=None,
        account_id=account_id,
        region_name=region_name,
    )
267

268

269
def lambda_layer_arn(layer_name: str, account_id: str, region_name: str) -> str:
    """
    Build the ARN of a Lambda layer; delegates to ``lambda_function_or_layer_arn``.

    :param layer_name: layer name, passed through as the entity name
    :param account_id: account ID to embed in the ARN
    :param region_name: region to embed in the ARN
    :returns: the layer ARN
    """
    return lambda_function_or_layer_arn(
        type="layer",
        entity_name=layer_name,
        version=None,
        account_id=account_id,
        region_name=region_name,
    )
273

274

275
def lambda_code_signing_arn(code_signing_id: str, account_id: str, region_name: str) -> str:
    """
    Build the ARN of a Lambda code signing config.

    :param code_signing_id: code signing config ID; a value containing ":" is returned unchanged
    :param account_id: account ID to embed in the ARN
    :param region_name: region to embed in the ARN; also determines the partition
    :returns: the code signing config ARN
    """
    return _resource_arn(
        code_signing_id,
        "arn:%s:lambda:%s:%s:code-signing-config:%s",
        account_id=account_id,
        region_name=region_name,
    )
×
278

279

280
def lambda_event_source_mapping_arn(uuid: str, account_id: str, region_name: str) -> str:
    """
    Build the ARN of a Lambda event source mapping.

    :param uuid: event source mapping UUID; a value containing ":" is returned unchanged
    :param account_id: account ID to embed in the ARN
    :param region_name: region to embed in the ARN; also determines the partition
    :returns: the event source mapping ARN
    """
    return _resource_arn(
        uuid,
        "arn:%s:lambda:%s:%s:event-source-mapping:%s",
        account_id=account_id,
        region_name=region_name,
    )
1✔
283

284

285
def lambda_function_or_layer_arn(
    type: str,
    entity_name: str,
    version: Optional[str],
    account_id: str,
    region_name: str,
) -> str:
    """
    Build the ARN of a Lambda function or layer.

    :param type: resource type segment of the ARN ("function" or "layer"); the name shadows the
        builtin but is kept because callers may pass it by keyword
    :param entity_name: function/layer name, a full Lambda function/layer ARN (returned
        unchanged), or ``<name>:<alias>``, in which case the alias is resolved to a version
        through the Lambda ``get_alias`` API
    :param version: optional version appended as the last ARN segment; replaced by the resolved
        alias version when ``entity_name`` carries an alias
    :param account_id: account ID to embed in the ARN
    :param region_name: region to embed in the ARN; also determines the partition
    :returns: the function or layer ARN
    :raises Exception: if the alias lookup fails; the original error is chained as the cause
    """
    pattern = r"arn:([a-z-]+):lambda:.*:.*:(function|layer):.*"
    if re.match(pattern, entity_name):
        return entity_name
    if ":" in entity_name:
        # the account ID is passed as the access key ID of the client
        client = connect_to(aws_access_key_id=account_id, region_name=region_name).lambda_
        entity_name, _, alias = entity_name.rpartition(":")
        try:
            alias_response = client.get_alias(FunctionName=entity_name, Name=alias)
            version = alias_response["FunctionVersion"]
        except Exception as e:
            msg = f"Alias {alias} of {entity_name} not found"
            LOG.info("%s: %s", msg, e)
            # chain the underlying error so the root cause stays visible in the traceback
            raise Exception(msg) from e

    result = (
        f"arn:{get_partition(region_name)}:lambda:{region_name}:{account_id}:{type}:{entity_name}"
    )
    if version:
        result = f"{result}:{version}"
    return result
1✔
313

314

315
#
316
# Stepfunctions
317
#
318

319

320
def stepfunctions_state_machine_arn(name: str, account_id: str, region_name: str) -> str:
    """
    Build the ARN of a Step Functions state machine.

    :param name: state machine name; a value containing ":" is returned unchanged
    :param account_id: account ID to embed in the ARN
    :param region_name: region to embed in the ARN; also determines the partition
    :returns: the state machine ARN
    """
    return _resource_arn(
        name,
        "arn:%s:states:%s:%s:stateMachine:%s",
        account_id=account_id,
        region_name=region_name,
    )
1✔
323

324

325
def stepfunctions_standard_execution_arn(state_machine_arn: str, execution_name: str) -> str:
    """
    Derive a standard execution ARN from a state machine ARN.

    :param state_machine_arn: ARN of the state machine being executed
    :param execution_name: name of the execution, used as the last ARN segment
    :returns: ``arn:<partition>:<service>:<region>:<account>:execution:<name>:<execution_name>``
    :raises InvalidArnException: if ``state_machine_arn`` cannot be parsed
    """
    parsed: ArnData = parse_arn(state_machine_arn)
    # Drop the leading resource-type segment. Any remaining segments are concatenated
    # without a separator.
    _resource_type, *name_segments = parsed["resource"].split(":")
    segments = (
        "arn",
        parsed["partition"],
        parsed["service"],
        parsed["region"],
        parsed["account"],
        "execution",
        "".join(name_segments),
        execution_name,
    )
    return ":".join(segments)
1✔
340

341

342
def stepfunctions_express_execution_arn(state_machine_arn: str, execution_name: str) -> str:
    """
    Derive an express execution ARN from a state machine ARN.

    :param state_machine_arn: ARN of the state machine being executed
    :param execution_name: name of the execution
    :returns: ``arn:<partition>:<service>:<region>:<account>:express:<name>:<execution_name>:<uid>``
        where ``<uid>`` is a fresh value from ``long_uid()``
    :raises InvalidArnException: if ``state_machine_arn`` cannot be parsed
    """
    parsed: ArnData = parse_arn(state_machine_arn)
    # Drop the leading resource-type segment. Any remaining segments are concatenated
    # without a separator.
    _resource_type, *name_segments = parsed["resource"].split(":")
    segments = (
        "arn",
        parsed["partition"],
        parsed["service"],
        parsed["region"],
        parsed["account"],
        "express",
        "".join(name_segments),
        execution_name,
        long_uid(),
    )
    return ":".join(segments)
1✔
358

359

360
def stepfunctions_activity_arn(name: str, account_id: str, region_name: str) -> str:
    """
    Build the ARN of a Step Functions activity.

    :param name: activity name; a value containing ":" is returned unchanged
    :param account_id: account ID to embed in the ARN
    :param region_name: region to embed in the ARN; also determines the partition
    :returns: the activity ARN
    """
    return _resource_arn(
        name,
        "arn:%s:states:%s:%s:activity:%s",
        account_id=account_id,
        region_name=region_name,
    )
1✔
363

364

365
#
366
# Cognito
367
#
368

369

370
def cognito_user_pool_arn(user_pool_id: str, account_id: str, region_name: str) -> str:
    """
    Build the ARN of a Cognito user pool.

    :param user_pool_id: user pool ID; a value containing ":" is returned unchanged
    :param account_id: account ID to embed in the ARN
    :param region_name: region to embed in the ARN; also determines the partition
    :returns: the user pool ARN
    """
    return _resource_arn(
        user_pool_id,
        "arn:%s:cognito-idp:%s:%s:userpool/%s",
        account_id=account_id,
        region_name=region_name,
    )
×
373

374

375
#
376
# Kinesis
377
#
378

379

380
def kinesis_stream_arn(stream_name: str, account_id: str, region_name: str) -> str:
    """
    Build the ARN of a Kinesis stream.

    :param stream_name: stream name; a value containing ":" is returned unchanged
    :param account_id: account ID to embed in the ARN
    :param region_name: region to embed in the ARN; also determines the partition
    :returns: the stream ARN
    """
    return _resource_arn(
        stream_name,
        "arn:%s:kinesis:%s:%s:stream/%s",
        account_id=account_id,
        region_name=region_name,
    )
1✔
383

384

385
#
386
# Elasticsearch
387
#
388

389

390
def elasticsearch_domain_arn(domain_name: str, account_id: str, region_name: str) -> str:
    """
    Build the ARN of an Elasticsearch domain.

    :param domain_name: domain name; a value containing ":" is returned unchanged
    :param account_id: account ID to embed in the ARN
    :param region_name: region to embed in the ARN; also determines the partition
    :returns: the domain ARN
    """
    return _resource_arn(
        domain_name,
        "arn:%s:es:%s:%s:domain/%s",
        account_id=account_id,
        region_name=region_name,
    )
×
393

394

395
#
396
# Firehose
397
#
398

399

400
def firehose_stream_arn(stream_name: str, account_id: str, region_name: str) -> str:
    """
    Build the ARN of a Firehose delivery stream.

    :param stream_name: delivery stream name; a value containing ":" is returned unchanged
    :param account_id: account ID to embed in the ARN
    :param region_name: region to embed in the ARN; also determines the partition
    :returns: the delivery stream ARN
    """
    return _resource_arn(
        stream_name,
        "arn:%s:firehose:%s:%s:deliverystream/%s",
        account_id=account_id,
        region_name=region_name,
    )
1✔
403

404

405
#
406
# KMS
407
#
408

409

410
def kms_key_arn(key_id: str, account_id: str, region_name: str) -> str:
    """
    Build the ARN of a KMS key.

    :param key_id: key ID; a value containing ":" is returned unchanged
    :param account_id: account ID to embed in the ARN
    :param region_name: region to embed in the ARN; also determines the partition
    :returns: the key ARN
    """
    return _resource_arn(
        key_id,
        "arn:%s:kms:%s:%s:key/%s",
        account_id=account_id,
        region_name=region_name,
    )
1✔
413

414

415
def kms_alias_arn(alias_name: str, account_id: str, region_name: str) -> str:
    """
    Build the ARN of a KMS alias.

    :param alias_name: alias name with or without the ``alias/`` prefix; a value containing ":"
        (e.g. an existing alias ARN) is returned unchanged
    :param account_id: account ID to embed in the ARN
    :param region_name: region to embed in the ARN; also determines the partition
    :returns: the alias ARN
    """
    if ":" in alias_name:
        # Already an ARN: return it before the prefix step, which would otherwise
        # turn it into the invalid "alias/arn:...".
        return alias_name
    if not alias_name.startswith("alias/"):
        alias_name = "alias/" + alias_name
    pattern = "arn:%s:kms:%s:%s:%s"
    return _resource_arn(alias_name, pattern, account_id=account_id, region_name=region_name)
1✔
420

421

422
#
423
# SSM
424
#
425

426

427
def ssm_parameter_arn(param_name: str, account_id: str, region_name: str) -> str:
    """
    Build the ARN of an SSM parameter.

    :param param_name: parameter name; leading "/" characters are stripped, and a value
        containing ":" is returned (stripped) without further formatting
    :param account_id: account ID to embed in the ARN
    :param region_name: region to embed in the ARN; also determines the partition
    :returns: the parameter ARN
    """
    # the template already supplies the "/" after "parameter"
    relative_name = param_name.lstrip("/")
    return _resource_arn(
        relative_name,
        "arn:%s:ssm:%s:%s:parameter/%s",
        account_id=account_id,
        region_name=region_name,
    )
×
431

432

433
#
434
# S3
435
#
436

437

438
def s3_bucket_arn(bucket_name_or_arn: str, region="us-east-1") -> str:
    """
    Build the ARN of an S3 bucket; the ARN has empty region and account segments.

    :param bucket_name_or_arn: bucket name or bucket ARN (reduced to the name first)
    :param region: region used only to determine the partition
    :returns: the bucket ARN
    """
    partition = get_partition(region)
    name = s3_bucket_name(bucket_name_or_arn)
    return f"arn:{partition}:s3:::{name}"
1✔
441

442

443
#
444
# SQS
445
#
446

447

448
def sqs_queue_arn(queue_name: str, account_id: str, region_name: str) -> str:
    """
    Build the ARN of an SQS queue.

    :param queue_name: queue name, or a "/"-separated string (such as a URL) whose last path
        element is used as the name
    :param account_id: account ID to embed in the ARN
    :param region_name: region to embed in the ARN; also determines the partition
    :returns: the queue ARN
    """
    partition = get_partition(region_name)
    bare_name = queue_name.rpartition("/")[2]
    return f"arn:{partition}:sqs:{region_name}:{account_id}:{bare_name}"
1✔
451

452

453
#
454
# APIGW
455
#
456

457

458
def apigateway_restapi_arn(api_id: str, account_id: str, region_name: str) -> str:
    """
    Build the ARN of an API Gateway REST API (``...:/restapis/<api_id>``).

    :param api_id: REST API ID
    :param account_id: account ID to embed in the ARN
    :param region_name: region to embed in the ARN; also determines the partition
    :returns: the REST API ARN
    """
    partition = get_partition(region_name)
    return f"arn:{partition}:apigateway:{region_name}:{account_id}:/restapis/{api_id}"
462

463

464
def apigateway_invocations_arn(lambda_uri: str, region_name: str) -> str:
    """
    Build the API Gateway invocation ARN that wraps a Lambda URI.

    :param lambda_uri: value embedded between ``functions/`` and ``/invocations``
    :param region_name: region to embed in the ARN; also determines the partition
    :returns: the invocations ARN
    """
    partition = get_partition(region_name)
    prefix = f"arn:{partition}:apigateway:{region_name}:lambda:path/2015-03-31"
    return f"{prefix}/functions/{lambda_uri}/invocations"
1✔
466

467

468
#
469
# SNS
470
#
471

472

473
def sns_topic_arn(topic_name: str, account_id: str, region_name: str) -> str:
    """
    Build the ARN of an SNS topic.

    :param topic_name: topic name, embedded as-is
    :param account_id: account ID to embed in the ARN
    :param region_name: region to embed in the ARN; also determines the partition
    :returns: the topic ARN
    """
    partition = get_partition(region_name)
    return f"arn:{partition}:sns:{region_name}:{account_id}:{topic_name}"
1✔
475

476

477
#
478
# ECR
479
#
480

481

482
def ecr_repository_arn(name: str, account_id: str, region_name: str) -> str:
    """
    Build the ARN of an ECR repository.

    :param name: repository name; a value containing ":" is returned unchanged
    :param account_id: account ID to embed in the ARN
    :param region_name: region to embed in the ARN; also determines the partition
    :returns: the repository ARN
    """
    return _resource_arn(
        name,
        "arn:%s:ecr:%s:%s:repository/%s",
        account_id=account_id,
        region_name=region_name,
    )
1✔
485

486

487
#
488
# Route53
489
#
490

491

492
def route53_resolver_firewall_rule_group_arn(id: str, account_id: str, region_name: str) -> str:
    """
    Build the ARN of a Route 53 Resolver firewall rule group.

    :param id: firewall rule group ID; a value containing ":" is returned unchanged
    :param account_id: account ID to embed in the ARN
    :param region_name: region to embed in the ARN; also determines the partition
    :returns: the firewall rule group ARN
    """
    return _resource_arn(
        id,
        "arn:%s:route53resolver:%s:%s:firewall-rule-group/%s",
        account_id=account_id,
        region_name=region_name,
    )
1✔
495

496

497
def route53_resolver_firewall_domain_list_arn(id: str, account_id: str, region_name: str) -> str:
    """
    Build the ARN of a Route 53 Resolver firewall domain list.

    :param id: firewall domain list ID; a value containing ":" is returned unchanged
    :param account_id: account ID to embed in the ARN
    :param region_name: region to embed in the ARN; also determines the partition
    :returns: the firewall domain list ARN
    """
    return _resource_arn(
        id,
        "arn:%s:route53resolver:%s:%s:firewall-domain-list/%s",
        account_id=account_id,
        region_name=region_name,
    )
1✔
500

501

502
def route53_resolver_firewall_rule_group_associations_arn(
    id: str, account_id: str, region_name: str
) -> str:
    """
    Build the ARN of a Route 53 Resolver firewall rule group association.

    :param id: association ID; a value containing ":" is returned unchanged
    :param account_id: account ID to embed in the ARN
    :param region_name: region to embed in the ARN; also determines the partition
    :returns: the firewall rule group association ARN
    """
    return _resource_arn(
        id,
        "arn:%s:route53resolver:%s:%s:firewall-rule-group-association/%s",
        account_id=account_id,
        region_name=region_name,
    )
×
507

508

509
def route53_resolver_query_log_config_arn(id: str, account_id: str, region_name: str) -> str:
    """
    Build the ARN of a Route 53 Resolver query log config.

    :param id: query log config ID; a value containing ":" is returned unchanged
    :param account_id: account ID to embed in the ARN
    :param region_name: region to embed in the ARN; also determines the partition
    :returns: the query log config ARN
    """
    return _resource_arn(
        id,
        "arn:%s:route53resolver:%s:%s:resolver-query-log-config/%s",
        account_id=account_id,
        region_name=region_name,
    )
1✔
512

513

514
#
515
# SES
516
#
517

518

519
def ses_identity_arn(email: str, account_id: str, region_name: str) -> str:
    """
    Build the ARN of an SES identity.

    :param email: identity value; a value containing ":" is returned unchanged
    :param account_id: account ID to embed in the ARN
    :param region_name: region to embed in the ARN; also determines the partition
    :returns: the identity ARN
    """
    return _resource_arn(
        email,
        "arn:%s:ses:%s:%s:identity/%s",
        account_id=account_id,
        region_name=region_name,
    )
×
522

523

524
#
525
# Other ARN related helpers
526
#
527

528

529
def opensearch_domain_name(domain_arn: str) -> str:
    """
    Return the part of a domain ARN after its last "/".

    :param domain_arn: domain ARN (or plain name)
    :returns: the text after the last "/", or the whole input if it contains no "/"
    """
    _head, _sep, domain_name = domain_arn.rpartition("/")
    return domain_name
×
531

532

533
def firehose_name(firehose_arn: str) -> str:
    """
    Return the part of a Firehose ARN after its last "/".

    :param firehose_arn: delivery stream ARN (or plain name)
    :returns: the text after the last "/", or the whole input if it contains no "/"
    """
    _head, _sep, stream_name = firehose_arn.rpartition("/")
    return stream_name
1✔
535

536

537
def kinesis_stream_name(kinesis_arn: str) -> str:
    """
    Return the part of a Kinesis ARN after its last ``:stream/`` marker.

    :param kinesis_arn: stream ARN (or plain name)
    :returns: the text after the last ``:stream/``, or the whole input if the marker is absent
    """
    _head, _sep, stream_name = kinesis_arn.rpartition(":stream/")
    return stream_name
1✔
539

540

541
def lambda_function_name(name_or_arn: str) -> str:
    """
    Return the function name for a Lambda function name or ARN.

    :param name_or_arn: plain function name (no ":"), or a Lambda ARN
    :returns: the input itself if it contains no ":", otherwise the second ":"-separated
        segment of the ARN's resource component
    :raises ValueError: if the input is an ARN of a service other than lambda
    :raises InvalidArnException: if the input contains ":" but is not a parseable ARN
    """
    if ":" not in name_or_arn:
        return name_or_arn

    # parse once and reuse the result for both the service check and the name lookup
    arn = parse_arn(name_or_arn)
    if arn["service"] != "lambda":
        raise ValueError(f"arn is not a lambda arn {name_or_arn}")

    # resource is e.g. "function:<name>[:<qualifier>]"; the name is the second segment
    return arn["resource"].split(":")[1]
1✔
550

551

552
@cache
def sqs_queue_url_for_arn(queue_arn: str) -> str:
    """
    Return the SQS queue URL for the given queue ARN.

    Results are memoized per input string for the lifetime of the process.

    :param queue_arn: queue ARN; a value containing "://" is treated as a URL and returned
        unchanged, and a value that is not a valid ARN is treated as a queue name
    :returns: the ``QueueUrl`` reported by the SQS ``get_queue_url`` call
    """
    if "://" in queue_arn:
        return queue_arn

    try:
        parsed = parse_arn(queue_arn)
    except InvalidArnException:
        # not an ARN: look the value up as a queue name in the default account, without a region
        account_id, region_name, queue_name = DEFAULT_AWS_ACCOUNT_ID, None, queue_arn
    else:
        account_id = parsed["account"]
        region_name = parsed["region"]
        queue_name = parsed["resource"]

    response = connect_to(region_name=region_name).sqs.get_queue_url(
        QueueName=queue_name, QueueOwnerAWSAccountId=account_id
    )
    return response["QueueUrl"]
1✔
575

576

577
def sqs_queue_name(queue_arn: str) -> str:
    """
    Return the queue name for an SQS queue ARN or name.

    :param queue_arn: queue ARN, or a plain queue name (no ":")
    :returns: the ARN's resource component, or the input itself if it contains no ":"
    :raises InvalidArnException: if the input contains ":" but is not a parseable ARN
    """
    if ":" not in queue_arn:
        return queue_arn
    return parse_arn(queue_arn)["resource"]
×
582

583

584
def s3_bucket_name(bucket_name_or_arn: str) -> str:
    """
    Return the bucket name for an S3 bucket name or ARN.

    :param bucket_name_or_arn: bucket name, or a bucket ARN of the form ``arn:<partition>:s3:::<name>``
    :returns: the last piece of the input when split on ``:::``, or the whole input if the
        separator is absent
    """
    *_prefix, bucket_name = bucket_name_or_arn.split(":::")
    return bucket_name
1✔
586

587

588
def is_arn(possible_arn: str) -> bool:
    """
    Tell whether a string can be parsed as an ARN.

    :param possible_arn: the string to check
    :returns: True if ``parse_arn`` accepts the string, False if it raises InvalidArnException
    """
    try:
        parse_arn(possible_arn)
    except InvalidArnException:
        return False
    else:
        return True
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc