• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 21656696000

03 Feb 2026 04:15PM UTC coverage: 86.966% (-0.01%) from 86.976%
21656696000

push

github

web-flow
Improve types for the SQS store (#13684)

25 of 25 new or added lines in 3 files covered. (100.0%)

158 existing lines in 7 files now uncovered.

70552 of 81126 relevant lines covered (86.97%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.46
/localstack-core/localstack/services/sns/provider.py
1
import base64
1✔
2
import contextlib
1✔
3
import copy
1✔
4
import functools
1✔
5
import json
1✔
6
import logging
1✔
7
from uuid import uuid4
1✔
8

9
from botocore.utils import InvalidArnException
1✔
10
from moto.core.utils import camelcase_to_pascal, underscores_to_camelcase
1✔
11
from moto.sns import sns_backends
1✔
12
from moto.sns.models import MAXIMUM_MESSAGE_LENGTH, SNSBackend, Topic
1✔
13
from moto.sns.utils import is_e164
1✔
14

15
from localstack.aws.api import CommonServiceException, RequestContext
1✔
16
from localstack.aws.api.sns import (
1✔
17
    AmazonResourceName,
18
    BatchEntryIdsNotDistinctException,
19
    ConfirmSubscriptionResponse,
20
    CreateEndpointResponse,
21
    CreatePlatformApplicationResponse,
22
    CreateTopicResponse,
23
    EndpointDisabledException,
24
    GetSubscriptionAttributesResponse,
25
    GetTopicAttributesResponse,
26
    InvalidBatchEntryIdException,
27
    InvalidParameterException,
28
    InvalidParameterValueException,
29
    ListSubscriptionsByTopicResponse,
30
    ListSubscriptionsResponse,
31
    ListTagsForResourceResponse,
32
    MapStringToString,
33
    MessageAttributeMap,
34
    NotFoundException,
35
    PublishBatchRequestEntryList,
36
    PublishBatchResponse,
37
    PublishBatchResultEntry,
38
    PublishResponse,
39
    SnsApi,
40
    String,
41
    SubscribeResponse,
42
    Subscription,
43
    SubscriptionAttributesMap,
44
    TagKeyList,
45
    TagList,
46
    TagResourceResponse,
47
    TooManyEntriesInBatchRequestException,
48
    TopicAttributesMap,
49
    UntagResourceResponse,
50
    attributeName,
51
    attributeValue,
52
    authenticateOnUnsubscribe,
53
    boolean,
54
    messageStructure,
55
    nextToken,
56
    subscriptionARN,
57
    topicARN,
58
    topicName,
59
)
60
from localstack.constants import AWS_REGION_US_EAST_1, DEFAULT_AWS_ACCOUNT_ID
1✔
61
from localstack.http import Request, Response, Router, route
1✔
62
from localstack.services.edge import ROUTER
1✔
63
from localstack.services.moto import call_moto
1✔
64
from localstack.services.plugins import ServiceLifecycleHook
1✔
65
from localstack.services.sns import constants as sns_constants
1✔
66
from localstack.services.sns.certificate import SNS_SERVER_CERT
1✔
67
from localstack.services.sns.filter import FilterPolicyValidator
1✔
68
from localstack.services.sns.models import (
1✔
69
    SnsMessage,
70
    SnsMessageType,
71
    SnsStore,
72
    SnsSubscription,
73
    sns_stores,
74
)
75
from localstack.services.sns.publisher import (
1✔
76
    PublishDispatcher,
77
    SnsBatchPublishContext,
78
    SnsPublishContext,
79
)
80
from localstack.utils.aws.arns import (
1✔
81
    ArnData,
82
    extract_account_id_from_arn,
83
    extract_region_from_arn,
84
    get_partition,
85
    parse_arn,
86
)
87
from localstack.utils.collections import PaginatedList, select_from_typed_dict
1✔
88
from localstack.utils.strings import short_uid, to_bytes, to_str
1✔
89

90
from ...state import StateVisitor
1✔
91
from .analytics import internal_api_calls
1✔
92

93
# set up logger
94
LOG = logging.getLogger(__name__)
1✔
95

96

97
class SnsProvider(SnsApi, ServiceLifecycleHook):
1✔
98
    """
99
    Provider class for AWS Simple Notification Service.
100

101
    AWS supports following operations in a cross-account setup:
102
    - GetTopicAttributes
103
    - SetTopicAttributes
104
    - AddPermission
105
    - RemovePermission
106
    - Publish
107
    - Subscribe
108
    - ListSubscriptionByTopic
109
    - DeleteTopic
110
    """
111

112
    @route(sns_constants.SNS_CERT_ENDPOINT, methods=["GET"])
1✔
113
    def get_signature_cert_pem_file(self, request: Request):
1✔
114
        # see http://sns-public-resources.s3.amazonaws.com/SNS_Message_Signing_Release_Note_Jan_25_2011.pdf
115
        # see https://docs.aws.amazon.com/sns/latest/dg/sns-verify-signature-of-message.html
116
        return Response(self._signature_cert_pem, 200)
1✔
117

118
    def __init__(self) -> None:
1✔
119
        super().__init__()
1✔
120
        self._publisher = PublishDispatcher()
1✔
121
        self._signature_cert_pem: str = SNS_SERVER_CERT
1✔
122

123
    def accept_state_visitor(self, visitor: StateVisitor):
1✔
124
        visitor.visit(sns_backends)
×
125
        visitor.visit(sns_stores)
×
126

127
    def on_before_stop(self):
1✔
128
        self._publisher.shutdown()
1✔
129

130
    def on_after_init(self):
1✔
131
        # Allow sent platform endpoint messages to be retrieved from the SNS endpoint
132
        register_sns_api_resource(ROUTER)
1✔
133
        # add the route to serve the certificate used to validate message signatures
134
        ROUTER.add(self.get_signature_cert_pem_file)
1✔
135

136
    @staticmethod
1✔
137
    def get_store(account_id: str, region_name: str) -> SnsStore:
1✔
138
        return sns_stores[account_id][region_name]
1✔
139

140
    @staticmethod
1✔
141
    def get_moto_backend(account_id: str, region_name: str) -> SNSBackend:
1✔
142
        return sns_backends[account_id][region_name]
1✔
143

144
    @staticmethod
1✔
145
    def _get_topic(arn: str, context: RequestContext) -> Topic:
1✔
146
        """
147
        :param arn: the Topic ARN
148
        :param context: the RequestContext of the request
149
        :param multiregion: if the request can fetch the topic across regions or not (ex. Publish cannot publish to a
150
        topic in a different region than the request)
151
        :return: the Moto model Topic
152
        """
153
        arn_data = parse_and_validate_topic_arn(arn)
1✔
154
        if context.region != arn_data["region"]:
1✔
155
            raise InvalidParameterException("Invalid parameter: TopicArn")
1✔
156

157
        try:
1✔
158
            return sns_backends[arn_data["account"]][context.region].topics[arn]
1✔
159
        except KeyError:
1✔
160
            raise NotFoundException("Topic does not exist")
1✔
161

162
    def get_topic_attributes(
1✔
163
        self, context: RequestContext, topic_arn: topicARN, **kwargs
164
    ) -> GetTopicAttributesResponse:
165
        # get the Topic from moto manually first, because Moto does not handle well the case where the ARN is malformed
166
        # (raises ValueError: not enough values to unpack (expected 6, got 1))
167
        moto_topic_model = self._get_topic(topic_arn, context)
1✔
168
        moto_response: GetTopicAttributesResponse = call_moto(context)
1✔
169
        # TODO: fix some attributes by moto, see snapshot
170
        # DeliveryPolicy
171
        # EffectiveDeliveryPolicy
172
        # Policy.Statement..Action -> SNS:Receive is added by moto but not returned in AWS
173
        # TODO: very hacky way to get the attributes we need instead of a moto patch
174
        # see the attributes we need: https://docs.aws.amazon.com/sns/latest/dg/sns-topic-attributes.html
175
        # would need more work to have the proper format out of moto, maybe extract the model to our store
176
        attributes = moto_response["Attributes"]
1✔
177
        for attr in vars(moto_topic_model):
1✔
178
            if "_feedback" in attr:
1✔
179
                key = camelcase_to_pascal(underscores_to_camelcase(attr))
1✔
180
                attributes[key] = getattr(moto_topic_model, attr)
1✔
181
            elif attr == "signature_version":
1✔
182
                attributes["SignatureVersion"] = moto_topic_model.signature_version
1✔
183
            elif attr == "archive_policy":
1✔
184
                attributes["ArchivePolicy"] = moto_topic_model.archive_policy
1✔
185

186
        return moto_response
1✔
187

188
    def set_topic_attributes(
1✔
189
        self,
190
        context: RequestContext,
191
        topic_arn: topicARN,
192
        attribute_name: attributeName,
193
        attribute_value: attributeValue | None = None,
194
        **kwargs,
195
    ) -> None:
196
        # validate the topic first
197
        self._get_topic(topic_arn, context)
1✔
198
        call_moto(context)
1✔
199

200
    def publish_batch(
1✔
201
        self,
202
        context: RequestContext,
203
        topic_arn: topicARN,
204
        publish_batch_request_entries: PublishBatchRequestEntryList,
205
        **kwargs,
206
    ) -> PublishBatchResponse:
207
        if len(publish_batch_request_entries) > 10:
1✔
UNCOV
208
            raise TooManyEntriesInBatchRequestException(
×
209
                "The batch request contains more entries than permissible."
210
            )
211

212
        parsed_arn = parse_and_validate_topic_arn(topic_arn)
1✔
213
        store = self.get_store(account_id=parsed_arn["account"], region_name=context.region)
1✔
214
        moto_topic = self._get_topic(topic_arn, context)
1✔
215

216
        ids = [entry["Id"] for entry in publish_batch_request_entries]
1✔
217
        if len(set(ids)) != len(publish_batch_request_entries):
1✔
UNCOV
218
            raise BatchEntryIdsNotDistinctException(
×
219
                "Two or more batch entries in the request have the same Id."
220
            )
221

222
        # Validate each batch entry ID according to AWS specifications
223
        for entry_id in ids:
1✔
224
            validate_batch_entry_id(entry_id)
1✔
225

226
        response: PublishBatchResponse = {"Successful": [], "Failed": []}
1✔
227

228
        # TODO: write AWS validated tests with FilterPolicy and batching
229
        # TODO: find a scenario where we can fail to send a message synchronously to be able to report it
230
        # right now, it seems that AWS fails the whole publish if something is wrong in the format of 1 message
231

232
        total_batch_size = 0
1✔
233
        message_contexts = []
1✔
234
        for entry_index, entry in enumerate(publish_batch_request_entries, start=1):
1✔
235
            message_payload = entry.get("Message")
1✔
236
            message_attributes = entry.get("MessageAttributes", {})
1✔
237
            if message_attributes:
1✔
238
                # if a message contains non-valid message attributes
239
                # will fail for the first non-valid message encountered, and raise ParameterValueInvalid
240
                validate_message_attributes(message_attributes, position=entry_index)
1✔
241

242
            total_batch_size += get_total_publish_size(message_payload, message_attributes)
1✔
243

244
            # TODO: WRITE AWS VALIDATED
245
            if entry.get("MessageStructure") == "json":
1✔
246
                try:
1✔
247
                    message = json.loads(message_payload)
1✔
248
                    # Keys in the JSON object that correspond to supported transport protocols must have
249
                    # simple JSON string values.
250
                    # Non-string values will cause the key to be ignored.
251
                    message = {
1✔
252
                        key: field for key, field in message.items() if isinstance(field, str)
253
                    }
254
                    if "default" not in message:
1✔
UNCOV
255
                        raise InvalidParameterException(
×
256
                            "Invalid parameter: Message Structure - No default entry in JSON message body"
257
                        )
258
                    entry["Message"] = message  # noqa
1✔
UNCOV
259
                except json.JSONDecodeError:
×
260
                    raise InvalidParameterException(
×
261
                        "Invalid parameter: Message Structure - JSON message body failed to parse"
262
                    )
263

264
            if is_fifo := (".fifo" in topic_arn):
1✔
265
                if not all("MessageGroupId" in entry for entry in publish_batch_request_entries):
1✔
UNCOV
266
                    raise InvalidParameterException(
×
267
                        "Invalid parameter: The MessageGroupId parameter is required for FIFO topics"
268
                    )
269
                if moto_topic.content_based_deduplication is False:
1✔
UNCOV
270
                    if not all(
×
271
                        "MessageDeduplicationId" in entry for entry in publish_batch_request_entries
272
                    ):
UNCOV
273
                        raise InvalidParameterException(
×
274
                            "Invalid parameter: The topic should either have ContentBasedDeduplication enabled or MessageDeduplicationId provided explicitly",
275
                        )
276

277
            msg_ctx = SnsMessage.from_batch_entry(entry, is_fifo=is_fifo)
1✔
278
            message_contexts.append(msg_ctx)
1✔
279
            success = PublishBatchResultEntry(
1✔
280
                Id=entry["Id"],
281
                MessageId=msg_ctx.message_id,
282
            )
283
            if is_fifo:
1✔
284
                success["SequenceNumber"] = msg_ctx.sequencer_number
1✔
285
            response["Successful"].append(success)
1✔
286

287
        if total_batch_size > MAXIMUM_MESSAGE_LENGTH:
1✔
288
            raise CommonServiceException(
1✔
289
                code="BatchRequestTooLong",
290
                message="The length of all the messages put together is more than the limit.",
291
                sender_fault=True,
292
            )
293

294
        publish_ctx = SnsBatchPublishContext(
1✔
295
            messages=message_contexts,
296
            store=store,
297
            request_headers=context.request.headers,
298
            topic_attributes=vars(moto_topic),
299
        )
300
        self._publisher.publish_batch_to_topic(publish_ctx, topic_arn)
1✔
301

302
        return response
1✔
303

304
    def set_subscription_attributes(
1✔
305
        self,
306
        context: RequestContext,
307
        subscription_arn: subscriptionARN,
308
        attribute_name: attributeName,
309
        attribute_value: attributeValue = None,
310
        **kwargs,
311
    ) -> None:
312
        store = self.get_store(account_id=context.account_id, region_name=context.region)
1✔
313
        sub = store.subscriptions.get(subscription_arn)
1✔
314
        if not sub:
1✔
315
            raise NotFoundException("Subscription does not exist")
1✔
316

317
        validate_subscription_attribute(
1✔
318
            attribute_name=attribute_name,
319
            attribute_value=attribute_value,
320
            topic_arn=sub["TopicArn"],
321
            endpoint=sub["Endpoint"],
322
        )
323
        if attribute_name == "RawMessageDelivery":
1✔
324
            attribute_value = attribute_value.lower()
1✔
325

326
        elif attribute_name == "FilterPolicy":
1✔
327
            filter_policy = json.loads(attribute_value) if attribute_value else None
1✔
328
            if filter_policy:
1✔
329
                validator = FilterPolicyValidator(
1✔
330
                    scope=sub.get("FilterPolicyScope", "MessageAttributes"),
331
                    is_subscribe_call=False,
332
                )
333
                validator.validate_filter_policy(filter_policy)
1✔
334

335
            store.subscription_filter_policy[subscription_arn] = filter_policy
1✔
336

337
        sub[attribute_name] = attribute_value
1✔
338

339
    def confirm_subscription(
1✔
340
        self,
341
        context: RequestContext,
342
        topic_arn: topicARN,
343
        token: String,
344
        authenticate_on_unsubscribe: authenticateOnUnsubscribe = None,
345
        **kwargs,
346
    ) -> ConfirmSubscriptionResponse:
347
        # TODO: validate format on the token (seems to be 288 hex chars)
348
        # this request can come from any http client, it might not be signed (we would need to implement
349
        # `authenticate_on_unsubscribe` to force a signing client to do this request.
350
        # so, the region and account_id might not be in the request. Use the ones from the topic_arn
351
        try:
1✔
352
            parsed_arn = parse_arn(topic_arn)
1✔
353
        except InvalidArnException:
1✔
354
            raise InvalidParameterException("Invalid parameter: Topic")
1✔
355

356
        store = self.get_store(account_id=parsed_arn["account"], region_name=parsed_arn["region"])
1✔
357

358
        # it seems SNS is able to know what the region of the topic should be, even though a wrong topic is accepted
359
        if parsed_arn["region"] != get_region_from_subscription_token(token):
1✔
360
            raise InvalidParameterException("Invalid parameter: Topic")
1✔
361

362
        subscription_arn = store.subscription_tokens.get(token)
1✔
363
        if not subscription_arn:
1✔
364
            raise InvalidParameterException("Invalid parameter: Token")
×
365

366
        subscription = store.subscriptions.get(subscription_arn)
1✔
367
        if not subscription:
1✔
368
            # subscription could have been deleted in the meantime
369
            raise InvalidParameterException("Invalid parameter: Token")
×
370

371
        # ConfirmSubscription is idempotent
372
        if subscription.get("PendingConfirmation") == "false":
1✔
373
            return ConfirmSubscriptionResponse(SubscriptionArn=subscription_arn)
1✔
374

375
        subscription["PendingConfirmation"] = "false"
1✔
376
        subscription["ConfirmationWasAuthenticated"] = "true"
1✔
377

378
        return ConfirmSubscriptionResponse(SubscriptionArn=subscription_arn)
1✔
379

380
    def untag_resource(
1✔
381
        self,
382
        context: RequestContext,
383
        resource_arn: AmazonResourceName,
384
        tag_keys: TagKeyList,
385
        **kwargs,
386
    ) -> UntagResourceResponse:
387
        call_moto(context)
1✔
388
        # TODO: probably get the account_id and region from the `resource_arn`
389
        store = self.get_store(context.account_id, context.region)
1✔
390
        existing_tags = store.sns_tags.setdefault(resource_arn, [])
1✔
391
        store.sns_tags[resource_arn] = [t for t in existing_tags if t["Key"] not in tag_keys]
1✔
392
        return UntagResourceResponse()
1✔
393

394
    def list_tags_for_resource(
1✔
395
        self, context: RequestContext, resource_arn: AmazonResourceName, **kwargs
396
    ) -> ListTagsForResourceResponse:
397
        # TODO: probably get the account_id and region from the `resource_arn`
398
        store = self.get_store(context.account_id, context.region)
1✔
399
        tags = store.sns_tags.setdefault(resource_arn, [])
1✔
400
        return ListTagsForResourceResponse(Tags=tags)
1✔
401

402
    def create_platform_application(
1✔
403
        self,
404
        context: RequestContext,
405
        name: String,
406
        platform: String,
407
        attributes: MapStringToString,
408
        **kwargs,
409
    ) -> CreatePlatformApplicationResponse:
410
        # TODO: validate platform
411
        # see https://docs.aws.amazon.com/cli/latest/reference/sns/create-platform-application.html
412
        # list of possible values: ADM, Baidu, APNS, APNS_SANDBOX, GCM, MPNS, WNS
413
        # each platform has a specific way to handle credentials
414
        # this can also be used for dispatching message to the right platform
415
        return call_moto(context)
1✔
416

417
    def create_platform_endpoint(
1✔
418
        self,
419
        context: RequestContext,
420
        platform_application_arn: String,
421
        token: String,
422
        custom_user_data: String = None,
423
        attributes: MapStringToString = None,
424
        **kwargs,
425
    ) -> CreateEndpointResponse:
426
        # TODO: support mobile app events
427
        # see https://docs.aws.amazon.com/sns/latest/dg/application-event-notifications.html
428
        return call_moto(context)
1✔
429

430
    def unsubscribe(
1✔
431
        self, context: RequestContext, subscription_arn: subscriptionARN, **kwargs
432
    ) -> None:
433
        if subscription_arn is None:
1✔
434
            raise InvalidParameterException(
1✔
435
                "Invalid parameter: SubscriptionArn Reason: no value for required parameter",
436
            )
437
        count = len(subscription_arn.split(":"))
1✔
438
        try:
1✔
439
            parsed_arn = parse_arn(subscription_arn)
1✔
440
        except InvalidArnException:
1✔
441
            # TODO: check for invalid SubscriptionGUID
442
            raise InvalidParameterException(
1✔
443
                f"Invalid parameter: SubscriptionArn Reason: An ARN must have at least 6 elements, not {count}"
444
            )
445

446
        account_id = parsed_arn["account"]
1✔
447
        region_name = parsed_arn["region"]
1✔
448

449
        store = self.get_store(account_id=account_id, region_name=region_name)
1✔
450
        if count == 6 and subscription_arn not in store.subscriptions:
1✔
451
            raise InvalidParameterException("Invalid parameter: SubscriptionId")
1✔
452

453
        moto_sns_backend = self.get_moto_backend(account_id, region_name)
1✔
454
        moto_sns_backend.unsubscribe(subscription_arn)
1✔
455

456
        # pop the subscription at the end, to avoid race condition by iterating over the topic subscriptions
457
        subscription = store.subscriptions.get(subscription_arn)
1✔
458

459
        if not subscription:
1✔
460
            # unsubscribe is idempotent, so unsubscribing from a non-existing topic does nothing
461
            return
1✔
462

463
        if subscription["Protocol"] in ["http", "https"]:
1✔
464
            # TODO: actually validate this (re)subscribe behaviour somehow (localhost.run?)
465
            #  we might need to save the sub token in the store
466
            # TODO: AWS only sends the UnsubscribeConfirmation if the call is unauthenticated or the requester is not
467
            #  the owner
468
            subscription_token = encode_subscription_token_with_region(region=context.region)
1✔
469
            message_ctx = SnsMessage(
1✔
470
                type=SnsMessageType.UnsubscribeConfirmation,
471
                token=subscription_token,
472
                message=f"You have chosen to deactivate subscription {subscription_arn}.\nTo cancel this operation and restore the subscription, visit the SubscribeURL included in this message.",
473
            )
474
            moto_topic = moto_sns_backend.topics.get(subscription["TopicArn"])
1✔
475
            publish_ctx = SnsPublishContext(
1✔
476
                message=message_ctx,
477
                store=store,
478
                request_headers=context.request.headers,
479
                topic_attributes=vars(moto_topic),
480
            )
481
            self._publisher.publish_to_topic_subscriber(
1✔
482
                publish_ctx,
483
                topic_arn=subscription["TopicArn"],
484
                subscription_arn=subscription_arn,
485
            )
486

487
        with contextlib.suppress(ValueError):
1✔
488
            store.topic_subscriptions[subscription["TopicArn"]].remove(subscription_arn)
1✔
489
        store.subscription_filter_policy.pop(subscription_arn, None)
1✔
490
        store.subscriptions.pop(subscription_arn, None)
1✔
491

492
    def get_subscription_attributes(
1✔
493
        self, context: RequestContext, subscription_arn: subscriptionARN, **kwargs
494
    ) -> GetSubscriptionAttributesResponse:
495
        store = self.get_store(account_id=context.account_id, region_name=context.region)
1✔
496
        sub = store.subscriptions.get(subscription_arn)
1✔
497
        if not sub:
1✔
498
            raise NotFoundException("Subscription does not exist")
1✔
499
        removed_attrs = ["sqs_queue_url"]
1✔
500
        if "FilterPolicyScope" in sub and not sub.get("FilterPolicy"):
1✔
501
            removed_attrs.append("FilterPolicyScope")
1✔
502
            removed_attrs.append("FilterPolicy")
1✔
503
        elif "FilterPolicy" in sub and "FilterPolicyScope" not in sub:
1✔
504
            sub["FilterPolicyScope"] = "MessageAttributes"
×
505

506
        attributes = {k: v for k, v in sub.items() if k not in removed_attrs}
1✔
507
        return GetSubscriptionAttributesResponse(Attributes=attributes)
1✔
508

509
    def list_subscriptions(
1✔
510
        self, context: RequestContext, next_token: nextToken = None, **kwargs
511
    ) -> ListSubscriptionsResponse:
512
        store = self.get_store(context.account_id, context.region)
1✔
513
        subscriptions = [
1✔
514
            select_from_typed_dict(Subscription, sub) for sub in list(store.subscriptions.values())
515
        ]
516
        paginated_subscriptions = PaginatedList(subscriptions)
1✔
517
        page, next_token = paginated_subscriptions.get_page(
1✔
518
            token_generator=lambda x: get_next_page_token_from_arn(x["SubscriptionArn"]),
519
            page_size=100,
520
            next_token=next_token,
521
        )
522

523
        response = ListSubscriptionsResponse(Subscriptions=page)
1✔
524
        if next_token:
1✔
525
            response["NextToken"] = next_token
1✔
526
        return response
1✔
527

528
    def list_subscriptions_by_topic(
1✔
529
        self, context: RequestContext, topic_arn: topicARN, next_token: nextToken = None, **kwargs
530
    ) -> ListSubscriptionsByTopicResponse:
531
        self._get_topic(topic_arn, context)
1✔
532
        parsed_topic_arn = parse_and_validate_topic_arn(topic_arn)
1✔
533
        store = self.get_store(parsed_topic_arn["account"], parsed_topic_arn["region"])
1✔
534
        sns_subscriptions = store.get_topic_subscriptions(topic_arn)
1✔
535
        subscriptions = [select_from_typed_dict(Subscription, sub) for sub in sns_subscriptions]
1✔
536

537
        paginated_subscriptions = PaginatedList(subscriptions)
1✔
538
        page, next_token = paginated_subscriptions.get_page(
1✔
539
            token_generator=lambda x: get_next_page_token_from_arn(x["SubscriptionArn"]),
540
            page_size=100,
541
            next_token=next_token,
542
        )
543

544
        response = ListSubscriptionsResponse(Subscriptions=page)
1✔
545
        if next_token:
1✔
546
            response["NextToken"] = next_token
1✔
547
        return response
1✔
548

549
    def publish(
1✔
550
        self,
551
        context: RequestContext,
552
        message: String,
553
        topic_arn: topicARN = None,
554
        target_arn: String = None,
555
        phone_number: String = None,
556
        subject: String = None,
557
        message_structure: messageStructure = None,
558
        message_attributes: MessageAttributeMap = None,
559
        message_deduplication_id: String = None,
560
        message_group_id: String = None,
561
        **kwargs,
562
    ) -> PublishResponse:
563
        if subject == "":
1✔
564
            raise InvalidParameterException("Invalid parameter: Subject")
1✔
565
        if not message or all(not m for m in message):
1✔
566
            raise InvalidParameterException("Invalid parameter: Empty message")
1✔
567

568
        # TODO: check for topic + target + phone number at the same time?
569
        # TODO: more validation on phone, it might be opted out?
570
        if phone_number and not is_e164(phone_number):
1✔
571
            raise InvalidParameterException(
1✔
572
                f"Invalid parameter: PhoneNumber Reason: {phone_number} is not valid to publish to"
573
            )
574

575
        if message_attributes:
1✔
576
            validate_message_attributes(message_attributes)
1✔
577

578
        if get_total_publish_size(message, message_attributes) > MAXIMUM_MESSAGE_LENGTH:
1✔
579
            raise InvalidParameterException("Invalid parameter: Message too long")
1✔
580

581
        # for compatibility reasons, AWS allows users to use either TargetArn or TopicArn for publishing to a topic
582
        # use any of them for topic validation
583
        topic_or_target_arn = topic_arn or target_arn
1✔
584
        topic_model = None
1✔
585

586
        if is_fifo := (topic_or_target_arn and ".fifo" in topic_or_target_arn):
1✔
587
            if not message_group_id:
1✔
588
                raise InvalidParameterException(
1✔
589
                    "Invalid parameter: The MessageGroupId parameter is required for FIFO topics",
590
                )
591
            topic_model = self._get_topic(topic_or_target_arn, context)
1✔
592
            if topic_model.content_based_deduplication is False:
1✔
UNCOV
593
                if not message_deduplication_id:
×
UNCOV
594
                    raise InvalidParameterException(
×
595
                        "Invalid parameter: The topic should either have ContentBasedDeduplication enabled or MessageDeduplicationId provided explicitly",
596
                    )
597
        elif message_deduplication_id:
1✔
598
            # this is the first one to raise if both are set while the topic is not fifo
UNCOV
599
            raise InvalidParameterException(
×
600
                "Invalid parameter: MessageDeduplicationId Reason: The request includes MessageDeduplicationId parameter that is not valid for this topic type"
601
            )
602

603
        is_endpoint_publish = target_arn and ":endpoint/" in target_arn
1✔
604
        if message_structure == "json":
1✔
605
            try:
1✔
606
                message = json.loads(message)
1✔
607
                # Keys in the JSON object that correspond to supported transport protocols must have
608
                # simple JSON string values.
609
                # Non-string values will cause the key to be ignored.
610
                message = {key: field for key, field in message.items() if isinstance(field, str)}
1✔
611
                # TODO: check no default key for direct TargetArn endpoint publish, need credentials
612
                # see example: https://docs.aws.amazon.com/sns/latest/dg/sns-send-custom-platform-specific-payloads-mobile-devices.html
613
                if "default" not in message and not is_endpoint_publish:
1✔
614
                    raise InvalidParameterException(
1✔
615
                        "Invalid parameter: Message Structure - No default entry in JSON message body"
616
                    )
617
            except json.JSONDecodeError:
1✔
618
                raise InvalidParameterException(
1✔
619
                    "Invalid parameter: Message Structure - JSON message body failed to parse"
620
                )
621

622
        if not phone_number:
1✔
623
            # use the account to get the store from the TopicArn (you can only publish in the same region as the topic)
624
            parsed_arn = parse_and_validate_topic_arn(topic_or_target_arn)
1✔
625
            store = self.get_store(account_id=parsed_arn["account"], region_name=context.region)
1✔
626
            moto_sns_backend = self.get_moto_backend(parsed_arn["account"], context.region)
1✔
627
            if is_endpoint_publish:
1✔
628
                if not (platform_endpoint := moto_sns_backend.platform_endpoints.get(target_arn)):
1✔
629
                    raise InvalidParameterException(
1✔
630
                        "Invalid parameter: TargetArn Reason: No endpoint found for the target arn specified"
631
                    )
632
                elif not platform_endpoint.enabled:
1✔
633
                    raise EndpointDisabledException("Endpoint is disabled")
1✔
634
            else:
635
                topic_model = self._get_topic(topic_or_target_arn, context)
1✔
636
        else:
637
            # use the store from the request context
638
            store = self.get_store(account_id=context.account_id, region_name=context.region)
1✔
639

640
        message_ctx = SnsMessage(
1✔
641
            type=SnsMessageType.Notification,
642
            message=message,
643
            message_attributes=message_attributes,
644
            message_deduplication_id=message_deduplication_id,
645
            message_group_id=message_group_id,
646
            message_structure=message_structure,
647
            subject=subject,
648
            is_fifo=is_fifo,
649
        )
650
        publish_ctx = SnsPublishContext(
1✔
651
            message=message_ctx, store=store, request_headers=context.request.headers
652
        )
653

654
        if is_endpoint_publish:
1✔
655
            self._publisher.publish_to_application_endpoint(
1✔
656
                ctx=publish_ctx, endpoint_arn=target_arn
657
            )
658
        elif phone_number:
1✔
659
            self._publisher.publish_to_phone_number(ctx=publish_ctx, phone_number=phone_number)
1✔
660
        else:
661
            # beware if the subscription is FIFO, the order might not be guaranteed.
662
            # 2 quick call to this method in succession might not be executed in order in the executor?
663
            # TODO: test how this behaves in a FIFO context with a lot of threads.
664
            publish_ctx.topic_attributes |= vars(topic_model)
1✔
665
            self._publisher.publish_to_topic(publish_ctx, topic_or_target_arn)
1✔
666

667
        if is_fifo:
1✔
668
            return PublishResponse(
1✔
669
                MessageId=message_ctx.message_id, SequenceNumber=message_ctx.sequencer_number
670
            )
671

672
        return PublishResponse(MessageId=message_ctx.message_id)
1✔
673

674
    def subscribe(
1✔
675
        self,
676
        context: RequestContext,
677
        topic_arn: topicARN,
678
        protocol: String,
679
        endpoint: String = None,
680
        attributes: SubscriptionAttributesMap = None,
681
        return_subscription_arn: boolean = None,
682
        **kwargs,
683
    ) -> SubscribeResponse:
684
        # TODO: check validation ordering
685
        parsed_topic_arn = parse_and_validate_topic_arn(topic_arn)
1✔
686
        if context.region != parsed_topic_arn["region"]:
1✔
687
            raise InvalidParameterException("Invalid parameter: TopicArn")
1✔
688

689
        store = self.get_store(account_id=parsed_topic_arn["account"], region_name=context.region)
1✔
690

691
        if topic_arn not in store.topic_subscriptions:
1✔
692
            raise NotFoundException("Topic does not exist")
1✔
693

694
        if not endpoint:
1✔
695
            # TODO: check AWS behaviour (because endpoint is optional)
696
            raise NotFoundException("Endpoint not specified in subscription")
×
697
        if protocol not in sns_constants.SNS_PROTOCOLS:
1✔
698
            raise InvalidParameterException(
1✔
699
                f"Invalid parameter: Amazon SNS does not support this protocol string: {protocol}"
700
            )
701
        elif protocol in ["http", "https"] and not endpoint.startswith(f"{protocol}://"):
1✔
702
            raise InvalidParameterException(
×
703
                "Invalid parameter: Endpoint must match the specified protocol"
704
            )
705
        elif protocol == "sms" and not is_e164(endpoint):
1✔
706
            raise InvalidParameterException(f"Invalid SMS endpoint: {endpoint}")
1✔
707

708
        elif protocol == "sqs":
1✔
709
            try:
1✔
710
                parse_arn(endpoint)
1✔
711
            except InvalidArnException:
1✔
712
                raise InvalidParameterException("Invalid parameter: SQS endpoint ARN")
1✔
713

714
        elif protocol == "application":
1✔
715
            # TODO: this is taken from moto, validate it
716
            moto_backend = self.get_moto_backend(
1✔
717
                account_id=parsed_topic_arn["account"], region_name=context.region
718
            )
719
            if endpoint not in moto_backend.platform_endpoints:
1✔
720
                raise NotFoundException("Endpoint does not exist")
×
721

722
        if ".fifo" in endpoint and ".fifo" not in topic_arn:
1✔
UNCOV
723
            raise InvalidParameterException(
×
724
                "Invalid parameter: Invalid parameter: Endpoint Reason: FIFO SQS Queues can not be subscribed to standard SNS topics"
725
            )
726

727
        sub_attributes = copy.deepcopy(attributes) if attributes else None
1✔
728
        if sub_attributes:
1✔
729
            for attr_name, attr_value in sub_attributes.items():
1✔
730
                validate_subscription_attribute(
1✔
731
                    attribute_name=attr_name,
732
                    attribute_value=attr_value,
733
                    topic_arn=topic_arn,
734
                    endpoint=endpoint,
735
                    is_subscribe_call=True,
736
                )
737
                if raw_msg_delivery := sub_attributes.get("RawMessageDelivery"):
1✔
738
                    sub_attributes["RawMessageDelivery"] = raw_msg_delivery.lower()
1✔
739

740
        # An endpoint may only be subscribed to a topic once. Subsequent
741
        # subscribe calls do nothing (subscribe is idempotent), except if its attributes are different.
742
        for existing_topic_subscription in store.topic_subscriptions.get(topic_arn, []):
1✔
743
            sub = store.subscriptions.get(existing_topic_subscription, {})
1✔
744
            if sub.get("Endpoint") == endpoint:
1✔
745
                if sub_attributes:
1✔
746
                    # validate the subscription attributes aren't different
747
                    for attr in sns_constants.VALID_SUBSCRIPTION_ATTR_NAME:
1✔
748
                        # if a new attribute is present and different from an existent one, raise
749
                        if (new_attr := sub_attributes.get(attr)) and sub.get(attr) != new_attr:
1✔
750
                            raise InvalidParameterException(
1✔
751
                                "Invalid parameter: Attributes Reason: Subscription already exists with different attributes"
752
                            )
753

754
                return SubscribeResponse(SubscriptionArn=sub["SubscriptionArn"])
1✔
755

756
        principal = sns_constants.DUMMY_SUBSCRIPTION_PRINCIPAL.format(
1✔
757
            partition=get_partition(context.region), account_id=context.account_id
758
        )
759
        subscription_arn = create_subscription_arn(topic_arn)
1✔
760
        subscription = SnsSubscription(
1✔
761
            # http://docs.aws.amazon.com/cli/latest/reference/sns/get-subscription-attributes.html
762
            TopicArn=topic_arn,
763
            Endpoint=endpoint,
764
            Protocol=protocol,
765
            SubscriptionArn=subscription_arn,
766
            PendingConfirmation="true",
767
            Owner=context.account_id,
768
            RawMessageDelivery="false",  # default value, will be overridden if set
769
            FilterPolicyScope="MessageAttributes",  # default value, will be overridden if set
770
            SubscriptionPrincipal=principal,  # dummy value, could be fetched with a call to STS?
771
        )
772
        if sub_attributes:
1✔
773
            subscription.update(sub_attributes)
1✔
774
            if "FilterPolicy" in sub_attributes:
1✔
775
                filter_policy = (
1✔
776
                    json.loads(sub_attributes["FilterPolicy"])
777
                    if sub_attributes["FilterPolicy"]
778
                    else None
779
                )
780
                if filter_policy:
1✔
781
                    validator = FilterPolicyValidator(
1✔
782
                        scope=subscription.get("FilterPolicyScope", "MessageAttributes"),
783
                        is_subscribe_call=True,
784
                    )
785
                    validator.validate_filter_policy(filter_policy)
1✔
786

787
                store.subscription_filter_policy[subscription_arn] = filter_policy
1✔
788

789
        store.subscriptions[subscription_arn] = subscription
1✔
790

791
        topic_subscriptions = store.topic_subscriptions.setdefault(topic_arn, [])
1✔
792
        topic_subscriptions.append(subscription_arn)
1✔
793

794
        # store the token and subscription arn
795
        # TODO: the token is a 288 hex char string
796
        subscription_token = encode_subscription_token_with_region(region=context.region)
1✔
797
        store.subscription_tokens[subscription_token] = subscription_arn
1✔
798

799
        response_subscription_arn = subscription_arn
1✔
800
        # Send out confirmation message for HTTP(S), fix for https://github.com/localstack/localstack/issues/881
801
        if protocol in ["http", "https"]:
1✔
802
            message_ctx = SnsMessage(
1✔
803
                type=SnsMessageType.SubscriptionConfirmation,
804
                token=subscription_token,
805
                message=f"You have chosen to subscribe to the topic {topic_arn}.\nTo confirm the subscription, visit the SubscribeURL included in this message.",
806
            )
807
            publish_ctx = SnsPublishContext(
1✔
808
                message=message_ctx,
809
                store=store,
810
                request_headers=context.request.headers,
811
                topic_attributes=vars(self._get_topic(topic_arn, context)),
812
            )
813
            self._publisher.publish_to_topic_subscriber(
1✔
814
                ctx=publish_ctx,
815
                topic_arn=topic_arn,
816
                subscription_arn=subscription_arn,
817
            )
818
            if not return_subscription_arn:
1✔
819
                response_subscription_arn = "pending confirmation"
1✔
820

821
        elif protocol not in ["email", "email-json"]:
1✔
822
            # Only HTTP(S) and email subscriptions are not auto validated
823
            # Except if the endpoint and the topic are not in the same AWS account, then you'd need to manually confirm
824
            # the subscription with the token
825
            # TODO: revisit for multi-account
826
            # TODO: test with AWS for email & email-json confirmation message
827
            # we need to add the following check:
828
            # if parsed_topic_arn["account"] == endpoint account (depending on the type, SQS, lambda, parse the arn)
829
            subscription["PendingConfirmation"] = "false"
1✔
830
            subscription["ConfirmationWasAuthenticated"] = "true"
1✔
831

832
        return SubscribeResponse(SubscriptionArn=response_subscription_arn)
1✔
833

834
    def tag_resource(
1✔
835
        self, context: RequestContext, resource_arn: AmazonResourceName, tags: TagList, **kwargs
836
    ) -> TagResourceResponse:
837
        # each tag key must be unique
838
        # https://docs.aws.amazon.com/general/latest/gr/aws_tagging.html#tag-best-practices
839
        unique_tag_keys = {tag["Key"] for tag in tags}
1✔
840
        if len(unique_tag_keys) < len(tags):
1✔
841
            raise InvalidParameterException("Invalid parameter: Duplicated keys are not allowed.")
1✔
842

843
        call_moto(context)
1✔
844
        store = self.get_store(context.account_id, context.region)
1✔
845
        existing_tags = store.sns_tags.get(resource_arn, [])
1✔
846

847
        def existing_tag_index(_item):
1✔
848
            for idx, tag in enumerate(existing_tags):
1✔
849
                if _item["Key"] == tag["Key"]:
1✔
850
                    return idx
1✔
851
            return None
1✔
852

853
        for item in tags:
1✔
854
            existing_index = existing_tag_index(item)
1✔
855
            if existing_index is None:
1✔
856
                existing_tags.append(item)
1✔
857
            else:
858
                existing_tags[existing_index] = item
1✔
859

860
        store.sns_tags[resource_arn] = existing_tags
1✔
861
        return TagResourceResponse()
1✔
862

863
    def delete_topic(self, context: RequestContext, topic_arn: topicARN, **kwargs) -> None:
1✔
864
        parsed_arn = parse_and_validate_topic_arn(topic_arn)
1✔
865
        if context.region != parsed_arn["region"]:
1✔
866
            raise InvalidParameterException("Invalid parameter: TopicArn")
1✔
867

868
        call_moto(context)
1✔
869
        store = self.get_store(account_id=parsed_arn["account"], region_name=context.region)
1✔
870
        topic_subscriptions = store.topic_subscriptions.pop(topic_arn, [])
1✔
871
        for topic_sub in topic_subscriptions:
1✔
872
            store.subscriptions.pop(topic_sub, None)
1✔
873

874
        store.sns_tags.pop(topic_arn, None)
1✔
875

876
    def create_topic(
1✔
877
        self,
878
        context: RequestContext,
879
        name: topicName,
880
        attributes: TopicAttributesMap = None,
881
        tags: TagList = None,
882
        data_protection_policy: attributeValue = None,
883
        **kwargs,
884
    ) -> CreateTopicResponse:
885
        moto_response = call_moto(context)
1✔
886
        store = self.get_store(account_id=context.account_id, region_name=context.region)
1✔
887
        topic_arn = moto_response["TopicArn"]
1✔
888
        tag_resource_success = extract_tags(topic_arn, tags, True, store)
1✔
889
        if not tag_resource_success:
1✔
890
            raise InvalidParameterException(
1✔
891
                "Invalid parameter: Tags Reason: Topic already exists with different tags"
892
            )
893
        if tags:
1✔
894
            self.tag_resource(context=context, resource_arn=topic_arn, tags=tags)
1✔
895
        store.topic_subscriptions.setdefault(topic_arn, [])
1✔
896
        return CreateTopicResponse(TopicArn=topic_arn)
1✔
897

898

899
def is_raw_message_delivery(susbcriber):
1✔
900
    return susbcriber.get("RawMessageDelivery") in ("true", True, "True")
1✔
901

902

903
def validate_subscription_attribute(
1✔
904
    attribute_name: str,
905
    attribute_value: str,
906
    topic_arn: str,
907
    endpoint: str,
908
    is_subscribe_call: bool = False,
909
) -> None:
910
    """
911
    Validate the subscription attribute to be set. See:
912
    https://docs.aws.amazon.com/sns/latest/api/API_SetSubscriptionAttributes.html
913
    :param attribute_name: the subscription attribute name, must be in VALID_SUBSCRIPTION_ATTR_NAME
914
    :param attribute_value: the subscription attribute value
915
    :param topic_arn: the topic_arn of the subscription, needed to know if it is FIFO
916
    :param endpoint: the subscription endpoint (like an SQS queue ARN)
917
    :param is_subscribe_call: the error message is different if called from Subscribe or SetSubscriptionAttributes
918
    :raises InvalidParameterException
919
    :return:
920
    """
921
    error_prefix = (
1✔
922
        "Invalid parameter: Attributes Reason: " if is_subscribe_call else "Invalid parameter: "
923
    )
924
    if attribute_name not in sns_constants.VALID_SUBSCRIPTION_ATTR_NAME:
1✔
925
        raise InvalidParameterException(f"{error_prefix}AttributeName")
1✔
926

927
    if attribute_name == "FilterPolicy":
1✔
928
        try:
1✔
929
            json.loads(attribute_value or "{}")
1✔
930
        except json.JSONDecodeError:
1✔
931
            raise InvalidParameterException(f"{error_prefix}FilterPolicy: failed to parse JSON.")
1✔
932
    elif attribute_name == "FilterPolicyScope":
1✔
933
        if attribute_value not in ("MessageAttributes", "MessageBody"):
1✔
934
            raise InvalidParameterException(
1✔
935
                f"{error_prefix}FilterPolicyScope: Invalid value [{attribute_value}]. "
936
                f"Please use either MessageBody or MessageAttributes"
937
            )
938
    elif attribute_name == "RawMessageDelivery":
1✔
939
        # TODO: only for SQS and https(s) subs, + firehose
940
        if attribute_value.lower() not in ("true", "false"):
1✔
941
            raise InvalidParameterException(
1✔
942
                f"{error_prefix}RawMessageDelivery: Invalid value [{attribute_value}]. "
943
                f"Must be true or false."
944
            )
945

946
    elif attribute_name == "RedrivePolicy":
1✔
947
        try:
1✔
948
            dlq_target_arn = json.loads(attribute_value).get("deadLetterTargetArn", "")
1✔
949
        except json.JSONDecodeError:
1✔
950
            raise InvalidParameterException(f"{error_prefix}RedrivePolicy: failed to parse JSON.")
1✔
951
        try:
1✔
952
            parsed_arn = parse_arn(dlq_target_arn)
1✔
953
        except InvalidArnException:
1✔
954
            raise InvalidParameterException(
1✔
955
                f"{error_prefix}RedrivePolicy: deadLetterTargetArn is an invalid arn"
956
            )
957

958
        if topic_arn.endswith(".fifo"):
1✔
959
            if endpoint.endswith(".fifo") and (
1✔
960
                not parsed_arn["resource"].endswith(".fifo") or "sqs" not in parsed_arn["service"]
961
            ):
UNCOV
962
                raise InvalidParameterException(
×
963
                    f"{error_prefix}RedrivePolicy: must use a FIFO queue as DLQ for a FIFO Subscription to a FIFO Topic."
964
                )
965

966

967
def validate_message_attributes(
1✔
968
    message_attributes: MessageAttributeMap, position: int | None = None
969
) -> None:
970
    """
971
    Validate the message attributes, and raises an exception if those do not follow AWS validation
972
    See: https://docs.aws.amazon.com/sns/latest/dg/sns-message-attributes.html
973
    Regex from: https://stackoverflow.com/questions/40718851/regex-that-does-not-allow-consecutive-dots
974
    :param message_attributes: the message attributes map for the message
975
    :param position: given to give the Batch Entry position if coming from `publishBatch`
976
    :raises: InvalidParameterValueException
977
    :return: None
978
    """
979
    for attr_name, attr in message_attributes.items():
1✔
980
        if len(attr_name) > 256:
1✔
981
            raise InvalidParameterValueException(
1✔
982
                "Length of message attribute name must be less than 256 bytes."
983
            )
984
        validate_message_attribute_name(attr_name)
1✔
985
        # `DataType` is a required field for MessageAttributeValue
986
        if (data_type := attr.get("DataType")) is None:
1✔
987
            if position:
1✔
988
                at = f"publishBatchRequestEntries.{position}.member.messageAttributes.{attr_name}.member.dataType"
1✔
989
            else:
990
                at = f"messageAttributes.{attr_name}.member.dataType"
1✔
991

992
            raise CommonServiceException(
1✔
993
                code="ValidationError",
994
                message=f"1 validation error detected: Value null at '{at}' failed to satisfy constraint: Member must not be null",
995
                sender_fault=True,
996
            )
997

998
        if data_type not in (
1✔
999
            "String",
1000
            "Number",
1001
            "Binary",
1002
        ) and not sns_constants.ATTR_TYPE_REGEX.match(data_type):
1003
            raise InvalidParameterValueException(
1✔
1004
                f"The message attribute '{attr_name}' has an invalid message attribute type, the set of supported type prefixes is Binary, Number, and String."
1005
            )
1006
        if not any(attr_value.endswith("Value") for attr_value in attr):
1✔
1007
            raise InvalidParameterValueException(
1✔
1008
                f"The message attribute '{attr_name}' must contain non-empty message attribute value for message attribute type '{data_type}'."
1009
            )
1010

1011
        value_key_data_type = "Binary" if data_type.startswith("Binary") else "String"
1✔
1012
        value_key = f"{value_key_data_type}Value"
1✔
1013
        if value_key not in attr:
1✔
1014
            raise InvalidParameterValueException(
1✔
1015
                f"The message attribute '{attr_name}' with type '{data_type}' must use field '{value_key_data_type}'."
1016
            )
1017
        elif not attr[value_key]:
1✔
1018
            raise InvalidParameterValueException(
1✔
1019
                f"The message attribute '{attr_name}' must contain non-empty message attribute value for message attribute type '{data_type}'.",
1020
            )
1021

1022

1023
def validate_message_attribute_name(name: str) -> None:
1✔
1024
    """
1025
    Validate the message attribute name with the specification of AWS.
1026
    The message attribute name can contain the following characters: A-Z, a-z, 0-9, underscore(_), hyphen(-), and period (.). The name must not start or end with a period, and it should not have successive periods.
1027
    :param name: message attribute name
1028
    :raises InvalidParameterValueException: if the name does not conform to the spec
1029
    """
1030
    if not sns_constants.MSG_ATTR_NAME_REGEX.match(name):
1✔
1031
        # find the proper exception
1032
        if name[0] == ".":
1✔
1033
            raise InvalidParameterValueException(
1✔
1034
                "Invalid message attribute name starting with character '.' was found."
1035
            )
1036
        elif name[-1] == ".":
1✔
1037
            raise InvalidParameterValueException(
1✔
1038
                "Invalid message attribute name ending with character '.' was found."
1039
            )
1040

1041
        for idx, char in enumerate(name):
1✔
1042
            if char not in sns_constants.VALID_MSG_ATTR_NAME_CHARS:
1✔
1043
                # change prefix from 0x to #x, without capitalizing the x
1044
                hex_char = "#x" + hex(ord(char)).upper()[2:]
1✔
1045
                raise InvalidParameterValueException(
1✔
1046
                    f"Invalid non-alphanumeric character '{hex_char}' was found in the message attribute name. Can only include alphanumeric characters, hyphens, underscores, or dots."
1047
                )
1048
            # even if we go negative index, it will be covered by starting/ending with dot
1049
            if char == "." and name[idx - 1] == ".":
1✔
1050
                raise InvalidParameterValueException(
1✔
1051
                    "Message attribute name can not have successive '.' character."
1052
                )
1053

1054

1055
def validate_batch_entry_id(entry_id: str) -> None:
1✔
1056
    """
1057
    Validate the batch entry ID according to AWS specifications.
1058
    The ID can only contain alphanumeric characters, hyphens, and underscores,
1059
    and must be between 1 and 80 characters in length.
1060
    See: https://docs.aws.amazon.com/sns/latest/api/API_PublishBatchRequestEntry.html
1061

1062
    :param entry_id: the batch entry ID to validate
1063
    :raises InvalidBatchEntryIdException: if the ID does not conform to the spec
1064
    """
1065
    # Check length constraint (1-80 characters)
1066
    if len(entry_id) > 80:
1✔
1067
        raise InvalidBatchEntryIdException(
1✔
1068
            f"The Id of a batch entry in the batch request is too long: {entry_id}"
1069
        )
1070

1071
    # Check character pattern (alphanumeric, hyphen, underscore only)
1072
    if not sns_constants.BATCH_ENTRY_ID_REGEX.match(entry_id):
1✔
1073
        raise InvalidBatchEntryIdException(
1✔
1074
            f"The Id of a batch entry in the batch request contains an impermissible character: {entry_id}"
1075
        )
1076

1077

1078
def extract_tags(
1✔
1079
    topic_arn: str, tags: TagList, is_create_topic_request: bool, store: SnsStore
1080
) -> bool:
1081
    existing_tags = list(store.sns_tags.get(topic_arn, []))
1✔
1082
    # if this is none there is nothing to check
1083
    if topic_arn in store.topic_subscriptions:
1✔
1084
        if tags is None:
1✔
1085
            tags = []
1✔
1086
        for tag in tags:
1✔
1087
            # this means topic already created with empty tags and when we try to create it
1088
            # again with other tag value then it should fail according to aws documentation.
1089
            if is_create_topic_request and existing_tags is not None and tag not in existing_tags:
1✔
1090
                return False
1✔
1091
    return True
1✔
1092

1093

1094
def parse_and_validate_topic_arn(topic_arn: str | None) -> ArnData:
1✔
1095
    topic_arn = topic_arn or ""
1✔
1096
    try:
1✔
1097
        return parse_arn(topic_arn)
1✔
1098
    except InvalidArnException:
1✔
1099
        count = len(topic_arn.split(":"))
1✔
1100
        raise InvalidParameterException(
1✔
1101
            f"Invalid parameter: TopicArn Reason: An ARN must have at least 6 elements, not {count}"
1102
        )
1103

1104

1105
def create_subscription_arn(topic_arn: str) -> str:
1✔
1106
    # This is the format of a Subscription ARN
1107
    # arn:aws:sns:us-west-2:123456789012:my-topic:8a21d249-4329-4871-acc6-7be709c6ea7f
1108
    return f"{topic_arn}:{uuid4()}"
1✔
1109

1110

1111
def encode_subscription_token_with_region(region: str) -> str:
1✔
1112
    """
1113
    Create a 64 characters Subscription Token with the region encoded
1114
    :param region:
1115
    :return: a subscription token with the region encoded
1116
    """
1117
    return ((region.encode() + b"/").hex() + short_uid() * 8)[:64]
1✔
1118

1119

1120
def get_region_from_subscription_token(token: str) -> str:
1✔
1121
    """
1122
    Try to decode and return the region from a subscription token
1123
    :param token:
1124
    :return: the region if able to decode it
1125
    :raises: InvalidParameterException if the token is invalid
1126
    """
1127
    try:
1✔
1128
        region = token.split("2f", maxsplit=1)[0]
1✔
1129
        return bytes.fromhex(region).decode("utf-8")
1✔
1130
    except (IndexError, ValueError, TypeError, UnicodeDecodeError):
1✔
1131
        raise InvalidParameterException("Invalid parameter: Token")
1✔
1132

1133

1134
def get_next_page_token_from_arn(resource_arn: str) -> str:
1✔
1135
    return to_str(base64.b64encode(to_bytes(resource_arn)))
1✔
1136

1137

1138
def _get_byte_size(payload: str | bytes) -> int:
1✔
1139
    # Calculate the real length of the byte object if the object is a string
1140
    return len(to_bytes(payload))
1✔
1141

1142

1143
def get_total_publish_size(
1✔
1144
    message_body: str, message_attributes: MessageAttributeMap | None
1145
) -> int:
1146
    size = _get_byte_size(message_body)
1✔
1147
    if message_attributes:
1✔
1148
        # https://docs.aws.amazon.com/sns/latest/dg/sns-message-attributes.html
1149
        # All parts of the message attribute, including name, type, and value, are included in the message size
1150
        # restriction, which is 256 KB.
1151
        # iterate over the Keys and Attributes, adding the length of the Key to the length of all Attributes values
1152
        # (DataType and StringValue or BinaryValue)
1153
        size += sum(
1✔
1154
            _get_byte_size(key) + sum(_get_byte_size(attr_value) for attr_value in attr.values())
1155
            for key, attr in message_attributes.items()
1156
        )
1157

1158
    return size
1✔
1159

1160

1161
def register_sns_api_resource(router: Router):
1✔
1162
    """Register the retrospection endpoints as internal LocalStack endpoints."""
1163
    router.add(SNSServicePlatformEndpointMessagesApiResource())
1✔
1164
    router.add(SNSServiceSMSMessagesApiResource())
1✔
1165
    router.add(SNSServiceSubscriptionTokenApiResource())
1✔
1166

1167

1168
def _format_messages(sent_messages: list[dict[str, str]], validated_keys: list[str]):
1✔
1169
    """
1170
    This method format the messages to be more readable and undo the format change that was needed for Moto
1171
    Should be removed once we refactor SNS.
1172
    """
1173
    formatted_messages = []
1✔
1174
    for sent_message in sent_messages:
1✔
1175
        msg = {
1✔
1176
            key: json.dumps(value)
1177
            if key == "Message" and sent_message.get("MessageStructure") == "json"
1178
            else value
1179
            for key, value in sent_message.items()
1180
            if key in validated_keys
1181
        }
1182
        formatted_messages.append(msg)
1✔
1183

1184
    return formatted_messages
1✔
1185

1186

1187
class SNSInternalResource:
1✔
1188
    resource_type: str
1✔
1189
    """Base class with helper to properly track usage of internal endpoints"""
1✔
1190

1191
    def count_usage(self):
1✔
1192
        internal_api_calls.labels(resource_type=self.resource_type).increment()
1✔
1193

1194

1195
def count_usage(f):
1✔
1196
    @functools.wraps(f)
1✔
1197
    def _wrapper(self, *args, **kwargs):
1✔
1198
        self.count_usage()
1✔
1199
        return f(self, *args, **kwargs)
1✔
1200

1201
    return _wrapper
1✔
1202

1203

1204
class SNSServicePlatformEndpointMessagesApiResource(SNSInternalResource):
1✔
1205
    resource_type = "platform-endpoint-message"
1✔
1206
    """Provides a REST API for retrospective access to platform endpoint messages sent via SNS.
1✔
1207

1208
    This is registered as a LocalStack internal HTTP resource.
1209

1210
    This endpoint accepts:
1211
    - GET param `accountId`: selector for AWS account. If not specified, return fallback `000000000000` test ID
1212
    - GET param `region`: selector for AWS `region`. If not specified, return default "us-east-1"
1213
    - GET param `endpointArn`: filter for `endpointArn` resource in SNS
1214
    - DELETE param `accountId`: selector for AWS account
1215
    - DELETE param `region`: will delete saved messages for `region`
1216
    - DELETE param `endpointArn`: will delete saved messages for `endpointArn`
1217
    """
1218

1219
    _PAYLOAD_FIELDS = [
1✔
1220
        "TargetArn",
1221
        "TopicArn",
1222
        "Message",
1223
        "MessageAttributes",
1224
        "MessageStructure",
1225
        "Subject",
1226
        "MessageId",
1227
    ]
1228

1229
    @route(sns_constants.PLATFORM_ENDPOINT_MSGS_ENDPOINT, methods=["GET"])
1✔
1230
    @count_usage
1✔
1231
    def on_get(self, request: Request):
1✔
1232
        filter_endpoint_arn = request.args.get("endpointArn")
1✔
1233
        account_id = (
1✔
1234
            request.args.get("accountId", DEFAULT_AWS_ACCOUNT_ID)
1235
            if not filter_endpoint_arn
1236
            else extract_account_id_from_arn(filter_endpoint_arn)
1237
        )
1238
        region = (
1✔
1239
            request.args.get("region", AWS_REGION_US_EAST_1)
1240
            if not filter_endpoint_arn
1241
            else extract_region_from_arn(filter_endpoint_arn)
1242
        )
1243
        store: SnsStore = sns_stores[account_id][region]
1✔
1244
        if filter_endpoint_arn:
1✔
1245
            messages = store.platform_endpoint_messages.get(filter_endpoint_arn, [])
1✔
1246
            messages = _format_messages(messages, self._PAYLOAD_FIELDS)
1✔
1247
            return {
1✔
1248
                "platform_endpoint_messages": {filter_endpoint_arn: messages},
1249
                "region": region,
1250
            }
1251

1252
        platform_endpoint_messages = {
1✔
1253
            endpoint_arn: _format_messages(messages, self._PAYLOAD_FIELDS)
1254
            for endpoint_arn, messages in store.platform_endpoint_messages.items()
1255
        }
1256
        return {
1✔
1257
            "platform_endpoint_messages": platform_endpoint_messages,
1258
            "region": region,
1259
        }
1260

1261
    @route(sns_constants.PLATFORM_ENDPOINT_MSGS_ENDPOINT, methods=["DELETE"])
1✔
1262
    @count_usage
1✔
1263
    def on_delete(self, request: Request) -> Response:
1✔
1264
        filter_endpoint_arn = request.args.get("endpointArn")
1✔
1265
        account_id = (
1✔
1266
            request.args.get("accountId", DEFAULT_AWS_ACCOUNT_ID)
1267
            if not filter_endpoint_arn
1268
            else extract_account_id_from_arn(filter_endpoint_arn)
1269
        )
1270
        region = (
1✔
1271
            request.args.get("region", AWS_REGION_US_EAST_1)
1272
            if not filter_endpoint_arn
1273
            else extract_region_from_arn(filter_endpoint_arn)
1274
        )
1275
        store: SnsStore = sns_stores[account_id][region]
1✔
1276
        if filter_endpoint_arn:
1✔
1277
            store.platform_endpoint_messages.pop(filter_endpoint_arn, None)
1✔
1278
            return Response("", status=204)
1✔
1279

1280
        store.platform_endpoint_messages.clear()
1✔
1281
        return Response("", status=204)
1✔
1282

1283

1284
class SNSServiceSMSMessagesApiResource(SNSInternalResource):
1✔
1285
    resource_type = "sms-message"
1✔
1286
    """Provides a REST API for retrospective access to SMS messages sent via SNS.
1✔
1287

1288
    This is registered as a LocalStack internal HTTP resource.
1289

1290
    This endpoint accepts:
1291
    - GET param `accountId`: selector for AWS account. If not specified, return fallback `000000000000` test ID
1292
    - GET param `region`: selector for AWS `region`. If not specified, return default "us-east-1"
1293
    - GET param `phoneNumber`: filter for `phoneNumber` resource in SNS
1294
    """
1295

1296
    _PAYLOAD_FIELDS = [
1✔
1297
        "PhoneNumber",
1298
        "TopicArn",
1299
        "SubscriptionArn",
1300
        "MessageId",
1301
        "Message",
1302
        "MessageAttributes",
1303
        "MessageStructure",
1304
        "Subject",
1305
    ]
1306

1307
    @route(sns_constants.SMS_MSGS_ENDPOINT, methods=["GET"])
1✔
1308
    @count_usage
1✔
1309
    def on_get(self, request: Request):
1✔
1310
        account_id = request.args.get("accountId", DEFAULT_AWS_ACCOUNT_ID)
1✔
1311
        region = request.args.get("region", AWS_REGION_US_EAST_1)
1✔
1312
        filter_phone_number = request.args.get("phoneNumber")
1✔
1313
        store: SnsStore = sns_stores[account_id][region]
1✔
1314
        if filter_phone_number:
1✔
1315
            messages = [
1✔
1316
                m for m in store.sms_messages if m.get("PhoneNumber") == filter_phone_number
1317
            ]
1318
            messages = _format_messages(messages, self._PAYLOAD_FIELDS)
1✔
1319
            return {
1✔
1320
                "sms_messages": {filter_phone_number: messages},
1321
                "region": region,
1322
            }
1323

1324
        sms_messages = {}
1✔
1325

1326
        for m in _format_messages(store.sms_messages, self._PAYLOAD_FIELDS):
1✔
1327
            sms_messages.setdefault(m.get("PhoneNumber"), []).append(m)
1✔
1328

1329
        return {
1✔
1330
            "sms_messages": sms_messages,
1331
            "region": region,
1332
        }
1333

1334
    @route(sns_constants.SMS_MSGS_ENDPOINT, methods=["DELETE"])
1✔
1335
    @count_usage
1✔
1336
    def on_delete(self, request: Request) -> Response:
1✔
1337
        account_id = request.args.get("accountId", DEFAULT_AWS_ACCOUNT_ID)
1✔
1338
        region = request.args.get("region", AWS_REGION_US_EAST_1)
1✔
1339
        filter_phone_number = request.args.get("phoneNumber")
1✔
1340
        store: SnsStore = sns_stores[account_id][region]
1✔
1341
        if filter_phone_number:
1✔
1342
            store.sms_messages = [
1✔
1343
                m for m in store.sms_messages if m.get("PhoneNumber") != filter_phone_number
1344
            ]
1345
            return Response("", status=204)
1✔
1346

1347
        store.sms_messages.clear()
1✔
1348
        return Response("", status=204)
1✔
1349

1350

1351
class SNSServiceSubscriptionTokenApiResource(SNSInternalResource):
1✔
1352
    resource_type = "subscription-token"
1✔
1353
    """Provides a REST API for retrospective access to Subscription Confirmation Tokens to confirm subscriptions.
1✔
1354
    Those are not sent for email, and sometimes inaccessible when working with external HTTPS endpoint which won't be
1355
    able to reach your local host.
1356

1357
    This is registered as a LocalStack internal HTTP resource.
1358

1359
    This endpoint has the following parameter:
1360
    - GET `subscription_arn`: `subscriptionArn`resource in SNS for which you want the SubscriptionToken
1361
    """
1362

1363
    @route(f"{sns_constants.SUBSCRIPTION_TOKENS_ENDPOINT}/<path:subscription_arn>", methods=["GET"])
1✔
1364
    @count_usage
1✔
1365
    def on_get(self, _request: Request, subscription_arn: str):
1✔
1366
        try:
1✔
1367
            parsed_arn = parse_arn(subscription_arn)
1✔
1368
        except InvalidArnException:
1✔
1369
            response = Response("", 400)
1✔
1370
            response.set_json(
1✔
1371
                {
1372
                    "error": "The provided SubscriptionARN is invalid",
1373
                    "subscription_arn": subscription_arn,
1374
                }
1375
            )
1376
            return response
1✔
1377

1378
        store: SnsStore = sns_stores[parsed_arn["account"]][parsed_arn["region"]]
1✔
1379

1380
        for token, sub_arn in store.subscription_tokens.items():
1✔
1381
            if sub_arn == subscription_arn:
1✔
1382
                return {
1✔
1383
                    "subscription_token": token,
1384
                    "subscription_arn": subscription_arn,
1385
                }
1386

1387
        response = Response("", 404)
1✔
1388
        response.set_json(
1✔
1389
            {
1390
                "error": "The provided SubscriptionARN is not found",
1391
                "subscription_arn": subscription_arn,
1392
            }
1393
        )
1394
        return response
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc