• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 16820655284

07 Aug 2025 05:03PM UTC coverage: 86.841% (-0.05%) from 86.892%
16820655284

push

github

web-flow
CFNV2: support CDK bootstrap and deployment (#12967)

32 of 38 new or added lines in 5 files covered. (84.21%)

2013 existing lines in 125 files now uncovered.

66606 of 76699 relevant lines covered (86.84%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.8
/localstack-core/localstack/services/sns/provider.py
1
import base64
1✔
2
import copy
1✔
3
import functools
1✔
4
import json
1✔
5
import logging
1✔
6
from uuid import uuid4
1✔
7

8
from botocore.utils import InvalidArnException
1✔
9
from moto.core.utils import camelcase_to_pascal, underscores_to_camelcase
1✔
10
from moto.sns import sns_backends
1✔
11
from moto.sns.models import MAXIMUM_MESSAGE_LENGTH, SNSBackend, Topic
1✔
12
from moto.sns.utils import is_e164
1✔
13

14
from localstack.aws.api import CommonServiceException, RequestContext
1✔
15
from localstack.aws.api.sns import (
1✔
16
    AmazonResourceName,
17
    BatchEntryIdsNotDistinctException,
18
    ConfirmSubscriptionResponse,
19
    CreateEndpointResponse,
20
    CreatePlatformApplicationResponse,
21
    CreateTopicResponse,
22
    EndpointDisabledException,
23
    GetSubscriptionAttributesResponse,
24
    GetTopicAttributesResponse,
25
    InvalidParameterException,
26
    InvalidParameterValueException,
27
    ListSubscriptionsByTopicResponse,
28
    ListSubscriptionsResponse,
29
    ListTagsForResourceResponse,
30
    MapStringToString,
31
    MessageAttributeMap,
32
    NotFoundException,
33
    PublishBatchRequestEntryList,
34
    PublishBatchResponse,
35
    PublishBatchResultEntry,
36
    PublishResponse,
37
    SnsApi,
38
    String,
39
    SubscribeResponse,
40
    Subscription,
41
    SubscriptionAttributesMap,
42
    TagKeyList,
43
    TagList,
44
    TagResourceResponse,
45
    TooManyEntriesInBatchRequestException,
46
    TopicAttributesMap,
47
    UntagResourceResponse,
48
    attributeName,
49
    attributeValue,
50
    authenticateOnUnsubscribe,
51
    boolean,
52
    messageStructure,
53
    nextToken,
54
    subscriptionARN,
55
    topicARN,
56
    topicName,
57
)
58
from localstack.constants import AWS_REGION_US_EAST_1, DEFAULT_AWS_ACCOUNT_ID
1✔
59
from localstack.http import Request, Response, Router, route
1✔
60
from localstack.services.edge import ROUTER
1✔
61
from localstack.services.moto import call_moto
1✔
62
from localstack.services.plugins import ServiceLifecycleHook
1✔
63
from localstack.services.sns import constants as sns_constants
1✔
64
from localstack.services.sns.certificate import SNS_SERVER_CERT
1✔
65
from localstack.services.sns.filter import FilterPolicyValidator
1✔
66
from localstack.services.sns.models import (
1✔
67
    SnsMessage,
68
    SnsMessageType,
69
    SnsStore,
70
    SnsSubscription,
71
    sns_stores,
72
)
73
from localstack.services.sns.publisher import (
1✔
74
    PublishDispatcher,
75
    SnsBatchPublishContext,
76
    SnsPublishContext,
77
)
78
from localstack.utils.aws.arns import (
1✔
79
    ArnData,
80
    extract_account_id_from_arn,
81
    extract_region_from_arn,
82
    get_partition,
83
    parse_arn,
84
)
85
from localstack.utils.collections import PaginatedList, select_from_typed_dict
1✔
86
from localstack.utils.strings import short_uid, to_bytes, to_str
1✔
87

88
from .analytics import internal_api_calls
1✔
89

90
# set up logger
91
LOG = logging.getLogger(__name__)
1✔
92

93

94
class SnsProvider(SnsApi, ServiceLifecycleHook):
1✔
95
    """
96
    Provider class for AWS Simple Notification Service.
97

98
    AWS supports the following operations in a cross-account setup:
99
    - GetTopicAttributes
100
    - SetTopicAttributes
101
    - AddPermission
102
    - RemovePermission
103
    - Publish
104
    - Subscribe
105
    - ListSubscriptionsByTopic
106
    - DeleteTopic
107
    """
108

109
    @route(sns_constants.SNS_CERT_ENDPOINT, methods=["GET"])
    def get_signature_cert_pem_file(self, request: Request):
        """
        HTTP GET handler that returns the PEM certificate held by this provider.

        see http://sns-public-resources.s3.amazonaws.com/SNS_Message_Signing_Release_Note_Jan_25_2011.pdf
        see https://docs.aws.amazon.com/sns/latest/dg/sns-verify-signature-of-message.html

        :param request: the incoming HTTP request (not inspected)
        :return: a Response with status 200 whose body is the certificate
        """
        pem_body = self._signature_cert_pem
        return Response(pem_body, 200)
1✔
114

115
    def __init__(self) -> None:
        """Initialise the base classes, then attach the signing certificate and a publish dispatcher."""
        super().__init__()
        # certificate body returned by get_signature_cert_pem_file
        self._signature_cert_pem: str = SNS_SERVER_CERT
        # dispatcher used by the publish/unsubscribe operations to deliver messages
        self._publisher = PublishDispatcher()
1✔
119

120
    def on_before_stop(self):
        """Lifecycle hook: shut down the publish dispatcher held by this provider."""
        dispatcher = self._publisher
        dispatcher.shutdown()
1✔
122

123
    def on_after_init(self):
        """Lifecycle hook: register the SNS-specific HTTP routes on the shared ROUTER."""
        # first the internal API resource, which allows sent platform endpoint messages to be
        # retrieved from the SNS endpoint
        register_sns_api_resource(ROUTER)
        # then the route serving the certificate used to validate message signatures
        cert_handler = self.get_signature_cert_pem_file
        ROUTER.add(cert_handler)
1✔
128

129
    @staticmethod
    def get_store(account_id: str, region_name: str) -> SnsStore:
        """
        Look up the LocalStack SNS store for an account and region.

        :param account_id: the account owning the store
        :param region_name: the region of the store
        :return: the SnsStore found under ``sns_stores[account_id][region_name]``
        """
        account_stores = sns_stores[account_id]
        return account_stores[region_name]
1✔
132

133
    @staticmethod
    def get_moto_backend(account_id: str, region_name: str) -> SNSBackend:
        """
        Look up the Moto SNS backend for an account and region.

        :param account_id: the account owning the backend
        :param region_name: the region of the backend
        :return: the SNSBackend found under ``sns_backends[account_id][region_name]``
        """
        account_backends = sns_backends[account_id]
        return account_backends[region_name]
1✔
136

137
    @staticmethod
    def _get_topic(arn: str, context: RequestContext) -> Topic:
        """
        Resolve a Topic ARN to its Moto model, restricted to the region of the request.

        :param arn: the Topic ARN
        :param context: the RequestContext of the request
        :raises InvalidParameterException: if the region in the ARN is not the region of the request
        :raises NotFoundException: if the lookup in the Moto backend fails with a KeyError
        :return: the Moto model Topic
        """
        parsed = parse_and_validate_topic_arn(arn)
        request_region = context.region
        if parsed["region"] != request_region:
            raise InvalidParameterException("Invalid parameter: TopicArn")

        try:
            backend = sns_backends[parsed["account"]][request_region]
            return backend.topics[arn]
        except KeyError:
            raise NotFoundException("Topic does not exist")
1✔
154

155
    def get_topic_attributes(
        self, context: RequestContext, topic_arn: topicARN, **kwargs
    ) -> GetTopicAttributesResponse:
        """
        Return the attributes of a topic, enriching the Moto response with fields read off the Moto model.

        :param context: the RequestContext of the request
        :param topic_arn: the ARN of the topic
        :return: the Moto response, with its "Attributes" map extended in place
        """
        # fetch the Topic manually first: Moto does not handle well the case where the ARN is malformed
        # (raises ValueError: not enough values to unpack (expected 6, got 1))
        topic = self._get_topic(topic_arn, context)
        moto_response: GetTopicAttributesResponse = call_moto(context)
        # TODO: fix some attributes by moto, see snapshot
        # DeliveryPolicy
        # EffectiveDeliveryPolicy
        # Policy.Statement..Action -> SNS:Receive is added by moto but not returned in AWS
        # TODO: very hacky way to get the attributes we need instead of a moto patch
        # see the attributes we need: https://docs.aws.amazon.com/sns/latest/dg/sns-topic-attributes.html
        # would need more work to have the proper format out of moto, maybe extract the model to our store
        attributes = moto_response["Attributes"]

        # model attributes that map to a fixed response key
        fixed_keys = {
            "signature_version": "SignatureVersion",
            "archive_policy": "ArchivePolicy",
        }
        for attr_name in vars(topic):
            if "_feedback" in attr_name:
                # feedback attributes: derive the PascalCase key from the snake_case attribute name
                response_key = camelcase_to_pascal(underscores_to_camelcase(attr_name))
            elif attr_name in fixed_keys:
                response_key = fixed_keys[attr_name]
            else:
                continue
            attributes[response_key] = getattr(topic, attr_name)

        return moto_response
1✔
180

181
    def set_topic_attributes(
        self,
        context: RequestContext,
        topic_arn: topicARN,
        attribute_name: attributeName,
        attribute_value: attributeValue | None = None,
        **kwargs,
    ) -> None:
        """
        Validate the topic, then let Moto apply the attribute change.

        :param context: the RequestContext of the request
        :param topic_arn: the ARN of the topic to update
        :param attribute_name: name of the attribute (consumed by Moto through the context)
        :param attribute_value: new value of the attribute (consumed by Moto through the context)
        """
        # resolving the topic is the validation step; the returned model is not needed here
        _ = self._get_topic(topic_arn, context)
        call_moto(context)
1✔
192

193
    def publish_batch(
        self,
        context: RequestContext,
        topic_arn: topicARN,
        publish_batch_request_entries: PublishBatchRequestEntryList,
        **kwargs,
    ) -> PublishBatchResponse:
        """
        Validate a batch of messages and hand them to the publish dispatcher for the given topic.

        Every entry is validated before anything is dispatched; the first invalid entry fails the whole
        request. Entries with ``MessageStructure == "json"`` have their ``Message`` replaced in place by
        the parsed mapping, restricted to string values.

        :param context: the RequestContext of the request
        :param topic_arn: the ARN of the topic to publish to
        :param publish_batch_request_entries: the batch entries (at most 10, with distinct Ids)
        :raises TooManyEntriesInBatchRequestException: if the batch has more than 10 entries
        :raises BatchEntryIdsNotDistinctException: if two entries share the same Id
        :raises InvalidParameterException: on an invalid JSON message body or missing FIFO parameters
        :raises CommonServiceException: ``BatchRequestTooLong`` if the summed size exceeds the limit
        :return: response listing every entry under "Successful" ("Failed" is always empty here)
        """
        if len(publish_batch_request_entries) > 10:
            raise TooManyEntriesInBatchRequestException(
                "The batch request contains more entries than permissible."
            )

        parsed_arn = parse_and_validate_topic_arn(topic_arn)
        store = self.get_store(account_id=parsed_arn["account"], region_name=context.region)
        moto_topic = self._get_topic(topic_arn, context)

        ids = [entry["Id"] for entry in publish_batch_request_entries]
        if len(set(ids)) != len(publish_batch_request_entries):
            raise BatchEntryIdsNotDistinctException(
                "Two or more batch entries in the request have the same Id."
            )

        response: PublishBatchResponse = {"Successful": [], "Failed": []}

        # TODO: write AWS validated tests with FilterPolicy and batching
        # TODO: find a scenario where we can fail to send a message synchronously to be able to report it
        # right now, it seems that AWS fails the whole publish if something is wrong in the format of 1 message

        # the topic type only depends on the ARN, so it is computed once for the whole batch
        is_fifo = ".fifo" in topic_arn

        total_batch_size = 0
        message_contexts = []
        for entry_index, entry in enumerate(publish_batch_request_entries, start=1):
            message_payload = entry.get("Message")
            message_attributes = entry.get("MessageAttributes", {})
            if message_attributes:
                # if a message contains non-valid message attributes
                # will fail for the first non-valid message encountered, and raise ParameterValueInvalid
                validate_message_attributes(message_attributes, position=entry_index)

            total_batch_size += get_total_publish_size(message_payload, message_attributes)

            # TODO: WRITE AWS VALIDATED
            if entry.get("MessageStructure") == "json":
                try:
                    message = json.loads(message_payload)
                except json.JSONDecodeError:
                    raise InvalidParameterException(
                        "Invalid parameter: Message Structure - JSON message body failed to parse"
                    )
                if not isinstance(message, dict):
                    # valid JSON that is not an object (list, string, number, null) has no keys to
                    # inspect; reject it instead of failing on `.items()` with an AttributeError
                    # NOTE(review): error message reuses the parse-failure wording - confirm against AWS
                    raise InvalidParameterException(
                        "Invalid parameter: Message Structure - JSON message body failed to parse"
                    )
                # Keys in the JSON object that correspond to supported transport protocols must have
                # simple JSON string values.
                # Non-string values will cause the key to be ignored.
                message = {key: field for key, field in message.items() if isinstance(field, str)}
                if "default" not in message:
                    raise InvalidParameterException(
                        "Invalid parameter: Message Structure - No default entry in JSON message body"
                    )
                entry["Message"] = message  # noqa

            if is_fifo:
                # these checks look at the whole batch; they stay inside the loop so that the order in
                # which validation errors are raised is unchanged
                if not all("MessageGroupId" in entry for entry in publish_batch_request_entries):
                    raise InvalidParameterException(
                        "Invalid parameter: The MessageGroupId parameter is required for FIFO topics"
                    )
                if moto_topic.content_based_deduplication == "false":
                    if not all(
                        "MessageDeduplicationId" in entry for entry in publish_batch_request_entries
                    ):
                        raise InvalidParameterException(
                            "Invalid parameter: The topic should either have ContentBasedDeduplication enabled or MessageDeduplicationId provided explicitly",
                        )

            msg_ctx = SnsMessage.from_batch_entry(entry, is_fifo=is_fifo)
            message_contexts.append(msg_ctx)
            success = PublishBatchResultEntry(
                Id=entry["Id"],
                MessageId=msg_ctx.message_id,
            )
            if is_fifo:
                success["SequenceNumber"] = msg_ctx.sequencer_number
            response["Successful"].append(success)

        # the size limit applies to the sum of all entries, so it can only be checked after the loop
        if total_batch_size > MAXIMUM_MESSAGE_LENGTH:
            raise CommonServiceException(
                code="BatchRequestTooLong",
                message="The length of all the messages put together is more than the limit.",
                sender_fault=True,
            )

        publish_ctx = SnsBatchPublishContext(
            messages=message_contexts,
            store=store,
            request_headers=context.request.headers,
            topic_attributes=vars(moto_topic),
        )
        self._publisher.publish_batch_to_topic(publish_ctx, topic_arn)

        return response
1✔
292

293
    def set_subscription_attributes(
1✔
294
        self,
295
        context: RequestContext,
296
        subscription_arn: subscriptionARN,
297
        attribute_name: attributeName,
298
        attribute_value: attributeValue = None,
299
        **kwargs,
300
    ) -> None:
301
        store = self.get_store(account_id=context.account_id, region_name=context.region)
1✔
302
        sub = store.subscriptions.get(subscription_arn)
1✔
303
        if not sub:
1✔
304
            raise NotFoundException("Subscription does not exist")
1✔
305

306
        validate_subscription_attribute(
1✔
307
            attribute_name=attribute_name,
308
            attribute_value=attribute_value,
309
            topic_arn=sub["TopicArn"],
310
            endpoint=sub["Endpoint"],
311
        )
312
        if attribute_name == "RawMessageDelivery":
1✔
313
            attribute_value = attribute_value.lower()
1✔
314

315
        elif attribute_name == "FilterPolicy":
1✔
316
            filter_policy = json.loads(attribute_value) if attribute_value else None
1✔
317
            if filter_policy:
1✔
318
                validator = FilterPolicyValidator(
1✔
319
                    scope=sub.get("FilterPolicyScope", "MessageAttributes"),
320
                    is_subscribe_call=False,
321
                )
322
                validator.validate_filter_policy(filter_policy)
1✔
323

324
            store.subscription_filter_policy[subscription_arn] = filter_policy
1✔
325

326
        sub[attribute_name] = attribute_value
1✔
327

328
    def confirm_subscription(
        self,
        context: RequestContext,
        topic_arn: topicARN,
        token: String,
        authenticate_on_unsubscribe: authenticateOnUnsubscribe = None,
        **kwargs,
    ) -> ConfirmSubscriptionResponse:
        """
        Confirm the pending subscription identified by ``token``.

        This request can come from any http client and might not be signed (we would need to implement
        `authenticate_on_unsubscribe` to force a signing client to do this request), so the region and
        account_id might not be in the request: the ones from ``topic_arn`` are used instead.

        :param context: the RequestContext of the request
        :param topic_arn: the ARN of the topic the subscription belongs to
        :param token: the confirmation token
        :param authenticate_on_unsubscribe: accepted but not used by this implementation
        :raises InvalidParameterException: "Topic" if the ARN cannot be parsed or its region differs from
            the region derived from the token; "Token" if the token or its subscription is unknown
        :return: response carrying the SubscriptionArn
        """
        # TODO: validate format on the token (seems to be 288 hex chars)
        try:
            topic_arn_data = parse_arn(topic_arn)
        except InvalidArnException:
            raise InvalidParameterException("Invalid parameter: Topic")

        topic_region = topic_arn_data["region"]
        store = self.get_store(account_id=topic_arn_data["account"], region_name=topic_region)

        # it seems SNS is able to know what the region of the topic should be, even though a wrong topic is accepted
        token_region = get_region_from_subscription_token(token)
        if topic_region != token_region:
            raise InvalidParameterException("Invalid parameter: Topic")

        subscription_arn = store.subscription_tokens.get(token)
        if not subscription_arn:
            raise InvalidParameterException("Invalid parameter: Token")

        subscription = store.subscriptions.get(subscription_arn)
        if not subscription:
            # subscription could have been deleted in the meantime
            raise InvalidParameterException("Invalid parameter: Token")

        # ConfirmSubscription is idempotent: the flags are only written while still pending
        if subscription.get("PendingConfirmation") != "false":
            subscription["PendingConfirmation"] = "false"
            subscription["ConfirmationWasAuthenticated"] = "true"

        return ConfirmSubscriptionResponse(SubscriptionArn=subscription_arn)
1✔
368

369
    def untag_resource(
        self,
        context: RequestContext,
        resource_arn: AmazonResourceName,
        tag_keys: TagKeyList,
        **kwargs,
    ) -> UntagResourceResponse:
        """
        Remove the given tag keys from a resource, in Moto and in the LocalStack store.

        :param context: the RequestContext of the request (selects the store)
        :param resource_arn: the ARN of the tagged resource
        :param tag_keys: keys of the tags to remove
        :return: an empty UntagResourceResponse
        """
        call_moto(context)
        # TODO: probably get the account_id and region from the `resource_arn`
        store = self.get_store(context.account_id, context.region)
        current_tags = store.sns_tags.setdefault(resource_arn, [])

        remaining_tags = []
        for tag in current_tags:
            if tag["Key"] in tag_keys:
                continue
            remaining_tags.append(tag)

        store.sns_tags[resource_arn] = remaining_tags
        return UntagResourceResponse()
1✔
382

383
    def list_tags_for_resource(
        self, context: RequestContext, resource_arn: AmazonResourceName, **kwargs
    ) -> ListTagsForResourceResponse:
        """
        Return the tags recorded in the LocalStack store for a resource.

        An empty tag list is created in the store if the resource has no entry yet.

        :param context: the RequestContext of the request (selects the store)
        :param resource_arn: the ARN of the resource
        :return: response with the tag list under "Tags"
        """
        # TODO: probably get the account_id and region from the `resource_arn`
        tag_store = self.get_store(context.account_id, context.region).sns_tags
        resource_tags = tag_store.setdefault(resource_arn, [])
        return ListTagsForResourceResponse(Tags=resource_tags)
1✔
390

391
    def create_platform_application(
        self,
        context: RequestContext,
        name: String,
        platform: String,
        attributes: MapStringToString,
        **kwargs,
    ) -> CreatePlatformApplicationResponse:
        """
        Create a platform application by delegating the request to Moto unchanged.

        :param context: the RequestContext of the request, forwarded to Moto
        :param name: application name (not inspected here)
        :param platform: platform identifier (not validated here, see TODO)
        :param attributes: application attributes (not inspected here)
        :return: the Moto response
        """
        # TODO: validate platform
        # see https://docs.aws.amazon.com/cli/latest/reference/sns/create-platform-application.html
        # list of possible values: ADM, Baidu, APNS, APNS_SANDBOX, GCM, MPNS, WNS
        # each platform has a specific way to handle credentials
        # this can also be used for dispatching message to the right platform
        moto_response = call_moto(context)
        return moto_response
1✔
405

406
    def create_platform_endpoint(
        self,
        context: RequestContext,
        platform_application_arn: String,
        token: String,
        custom_user_data: String = None,
        attributes: MapStringToString = None,
        **kwargs,
    ) -> CreateEndpointResponse:
        """
        Create a platform endpoint by delegating the request to Moto unchanged.

        :param context: the RequestContext of the request, forwarded to Moto
        :param platform_application_arn: ARN of the owning platform application (not inspected here)
        :param token: device token (not inspected here)
        :param custom_user_data: optional user data (not inspected here)
        :param attributes: optional endpoint attributes (not inspected here)
        :return: the Moto response
        """
        # TODO: support mobile app events
        # see https://docs.aws.amazon.com/sns/latest/dg/application-event-notifications.html
        moto_response = call_moto(context)
        return moto_response
1✔
418

419
    def unsubscribe(
        self, context: RequestContext, subscription_arn: subscriptionARN, **kwargs
    ) -> None:
        """
        Remove a subscription from Moto and from the LocalStack store.

        For ``http``/``https`` subscriptions an UnsubscribeConfirmation message is dispatched to the
        subscriber before the subscription is dropped from the store.

        :param context: the RequestContext of the request
        :param subscription_arn: the ARN of the subscription; account and region are taken from it
        :raises InvalidParameterException: if the ARN cannot be parsed, or if it has exactly 6
            ``:``-separated elements and is not a known subscription
        """
        # number of ":"-separated elements, used in the error message and in the existence check below
        count = len(subscription_arn.split(":"))
        try:
            parsed_arn = parse_arn(subscription_arn)
        except InvalidArnException:
            # TODO: check for invalid SubscriptionGUID
            raise InvalidParameterException(
                f"Invalid parameter: SubscriptionArn Reason: An ARN must have at least 6 elements, not {count}"
            )

        account_id = parsed_arn["account"]
        region_name = parsed_arn["region"]

        store = self.get_store(account_id=account_id, region_name=region_name)
        # only ARNs with exactly 6 elements are rejected when unknown; other shapes fall through
        if count == 6 and subscription_arn not in store.subscriptions:
            raise InvalidParameterException("Invalid parameter: SubscriptionId")

        moto_sns_backend = self.get_moto_backend(account_id, region_name)
        moto_sns_backend.unsubscribe(subscription_arn)

        # pop the subscription at the end, to avoid race condition by iterating over the topic subscriptions
        subscription = store.subscriptions.get(subscription_arn)

        if not subscription:
            # unsubscribe is idempotent, so unsubscribing a subscription that is not in the store does nothing
            return

        if subscription["Protocol"] in ["http", "https"]:
            # TODO: actually validate this (re)subscribe behaviour somehow (localhost.run?)
            #  we might need to save the sub token in the store
            # TODO: AWS only sends the UnsubscribeConfirmation if the call is unauthenticated or the requester is not
            #  the owner
            # NOTE(review): the token is encoded with the request region, while the store above is
            #  selected with the region from the ARN - confirm this is intended
            subscription_token = encode_subscription_token_with_region(region=context.region)
            message_ctx = SnsMessage(
                type=SnsMessageType.UnsubscribeConfirmation,
                token=subscription_token,
                message=f"You have chosen to deactivate subscription {subscription_arn}.\nTo cancel this operation and restore the subscription, visit the SubscribeURL included in this message.",
            )
            # NOTE(review): `.get` returns None for an unknown topic and `vars(None)` below would raise
            #  a TypeError - presumably the topic always exists while its subscriptions do; confirm
            moto_topic = moto_sns_backend.topics.get(subscription["TopicArn"])
            publish_ctx = SnsPublishContext(
                message=message_ctx,
                store=store,
                request_headers=context.request.headers,
                topic_attributes=vars(moto_topic),
            )
            self._publisher.publish_to_topic_subscriber(
                publish_ctx,
                topic_arn=subscription["TopicArn"],
                subscription_arn=subscription_arn,
            )

        # store cleanup: topic -> subscription link, filter policy, then the subscription itself
        store.topic_subscriptions[subscription["TopicArn"]].remove(subscription_arn)
        store.subscription_filter_policy.pop(subscription_arn, None)
        store.subscriptions.pop(subscription_arn, None)
1✔
475

476
    def get_subscription_attributes(
        self, context: RequestContext, subscription_arn: subscriptionARN, **kwargs
    ) -> GetSubscriptionAttributesResponse:
        """
        Return the stored attributes of a subscription, minus internal or irrelevant keys.

        ``sqs_queue_url`` is always hidden. The filter-policy keys are hidden when a scope is stored
        without a policy; a stored policy without scope gets the default scope written back to the
        subscription.

        :param context: the RequestContext of the request (selects the store)
        :param subscription_arn: the ARN of the subscription
        :raises NotFoundException: if the subscription is not in the store
        :return: response with the visible attributes under "Attributes"
        """
        store = self.get_store(account_id=context.account_id, region_name=context.region)
        subscription = store.subscriptions.get(subscription_arn)
        if not subscription:
            raise NotFoundException("Subscription does not exist")

        hidden_keys = {"sqs_queue_url"}
        has_scope = "FilterPolicyScope" in subscription
        if has_scope and not subscription.get("FilterPolicy"):
            hidden_keys |= {"FilterPolicyScope", "FilterPolicy"}
        elif not has_scope and "FilterPolicy" in subscription:
            subscription["FilterPolicyScope"] = "MessageAttributes"

        visible = {
            name: value for name, value in subscription.items() if name not in hidden_keys
        }
        return GetSubscriptionAttributesResponse(Attributes=visible)
1✔
492

493
    def list_subscriptions(
        self, context: RequestContext, next_token: nextToken = None, **kwargs
    ) -> ListSubscriptionsResponse:
        """
        Return one page (up to 100 items) of the subscriptions in the store of the request context.

        :param context: the RequestContext of the request (selects the store)
        :param next_token: pagination token from a previous call, if any
        :return: response with the page under "Subscriptions" and, if there is one, a "NextToken"
        """
        store = self.get_store(context.account_id, context.region)
        # copy the values into a list before projecting each one onto the Subscription shape
        stored_subscriptions = list(store.subscriptions.values())
        projected = [select_from_typed_dict(Subscription, item) for item in stored_subscriptions]

        def _page_token(item):
            return get_next_page_token_from_arn(item["SubscriptionArn"])

        page, following_token = PaginatedList(projected).get_page(
            token_generator=_page_token,
            page_size=100,
            next_token=next_token,
        )

        response = ListSubscriptionsResponse(Subscriptions=page)
        if following_token:
            response["NextToken"] = following_token
        return response
1✔
511

512
    def list_subscriptions_by_topic(
        self, context: RequestContext, topic_arn: topicARN, next_token: nextToken = None, **kwargs
    ) -> ListSubscriptionsByTopicResponse:
        """
        Return one page (up to 100 items) of the subscriptions attached to a topic.

        :param context: the RequestContext of the request
        :param topic_arn: the ARN of the topic; the store is selected from its account and region
        :param next_token: pagination token from a previous call, if any
        :return: response with the page under "Subscriptions" and, if there is one, a "NextToken"
        """
        # validates the ARN and the existence of the topic; the model itself is not needed
        self._get_topic(topic_arn, context)
        parsed_topic_arn = parse_and_validate_topic_arn(topic_arn)
        store = self.get_store(parsed_topic_arn["account"], parsed_topic_arn["region"])
        sns_subscriptions = store.get_topic_subscriptions(topic_arn)
        subscriptions = [select_from_typed_dict(Subscription, sub) for sub in sns_subscriptions]

        paginated_subscriptions = PaginatedList(subscriptions)
        page, next_token = paginated_subscriptions.get_page(
            token_generator=lambda x: get_next_page_token_from_arn(x["SubscriptionArn"]),
            page_size=100,
            next_token=next_token,
        )

        # build the response with the type this operation is declared to return
        response = ListSubscriptionsByTopicResponse(Subscriptions=page)
        if next_token:
            response["NextToken"] = next_token
        return response
1✔
532

533
    def publish(
        self,
        context: RequestContext,
        message: String,
        topic_arn: topicARN = None,
        target_arn: String = None,
        phone_number: String = None,
        subject: String = None,
        message_structure: messageStructure = None,
        message_attributes: MessageAttributeMap = None,
        message_deduplication_id: String = None,
        message_group_id: String = None,
        **kwargs,
    ) -> PublishResponse:
        """
        Validate a message and dispatch it to a topic, a platform application endpoint or a phone number.

        The destination is chosen as follows: a ``target_arn`` containing ``:endpoint/`` publishes to
        that platform endpoint; otherwise a ``phone_number`` publishes to that number; otherwise the
        message goes to the topic given by ``topic_arn`` (or ``target_arn``).

        :param context: the RequestContext of the request
        :param message: the message body; with ``message_structure == "json"`` it must be a JSON object
        :param topic_arn: ARN of the destination topic
        :param target_arn: alternative to ``topic_arn``, or the ARN of a platform endpoint
        :param phone_number: destination phone number, checked with ``is_e164``
        :param subject: optional subject; an empty string is rejected
        :param message_structure: ``"json"`` to send per-protocol payloads
        :param message_attributes: optional message attributes, validated when present
        :param message_deduplication_id: deduplication id, only valid for FIFO topics
        :param message_group_id: group id, required for FIFO topics and invalid otherwise
        :raises InvalidParameterException: on any failed validation of the parameters above
        :raises EndpointDisabledException: if the targeted platform endpoint is disabled
        :return: response with the MessageId, plus the SequenceNumber for FIFO topics
        """
        if subject == "":
            raise InvalidParameterException("Invalid parameter: Subject")
        if not message or all(not m for m in message):
            raise InvalidParameterException("Invalid parameter: Empty message")

        # TODO: check for topic + target + phone number at the same time?
        # TODO: more validation on phone, it might be opted out?
        if phone_number and not is_e164(phone_number):
            raise InvalidParameterException(
                f"Invalid parameter: PhoneNumber Reason: {phone_number} is not valid to publish to"
            )

        if message_attributes:
            validate_message_attributes(message_attributes)

        if get_total_publish_size(message, message_attributes) > MAXIMUM_MESSAGE_LENGTH:
            raise InvalidParameterException("Invalid parameter: Message too long")

        # for compatibility reasons, AWS allows users to use either TargetArn or TopicArn for publishing to a topic
        # use any of them for topic validation
        topic_or_target_arn = topic_arn or target_arn
        topic_model = None

        if is_fifo := (topic_or_target_arn and ".fifo" in topic_or_target_arn):
            if not message_group_id:
                raise InvalidParameterException(
                    "Invalid parameter: The MessageGroupId parameter is required for FIFO topics",
                )
            topic_model = self._get_topic(topic_or_target_arn, context)
            if topic_model.content_based_deduplication == "false":
                if not message_deduplication_id:
                    raise InvalidParameterException(
                        "Invalid parameter: The topic should either have ContentBasedDeduplication enabled or MessageDeduplicationId provided explicitly",
                    )
        elif message_deduplication_id:
            # this is the first one to raise if both are set while the topic is not fifo
            raise InvalidParameterException(
                "Invalid parameter: MessageDeduplicationId Reason: The request includes MessageDeduplicationId parameter that is not valid for this topic type"
            )
        elif message_group_id:
            raise InvalidParameterException(
                "Invalid parameter: MessageGroupId Reason: The request includes MessageGroupId parameter that is not valid for this topic type"
            )
        is_endpoint_publish = target_arn and ":endpoint/" in target_arn
        if message_structure == "json":
            try:
                message = json.loads(message)
            except json.JSONDecodeError:
                raise InvalidParameterException(
                    "Invalid parameter: Message Structure - JSON message body failed to parse"
                )
            if not isinstance(message, dict):
                # valid JSON that is not an object (list, string, number, null) has no keys to inspect;
                # reject it instead of failing on `.items()` with an AttributeError
                # NOTE(review): error message reuses the parse-failure wording - confirm against AWS
                raise InvalidParameterException(
                    "Invalid parameter: Message Structure - JSON message body failed to parse"
                )
            # Keys in the JSON object that correspond to supported transport protocols must have
            # simple JSON string values.
            # Non-string values will cause the key to be ignored.
            message = {key: field for key, field in message.items() if isinstance(field, str)}
            # TODO: check no default key for direct TargetArn endpoint publish, need credentials
            # see example: https://docs.aws.amazon.com/sns/latest/dg/sns-send-custom-platform-specific-payloads-mobile-devices.html
            if "default" not in message and not is_endpoint_publish:
                raise InvalidParameterException(
                    "Invalid parameter: Message Structure - No default entry in JSON message body"
                )

        if not phone_number:
            # use the account to get the store from the TopicArn (you can only publish in the same region as the topic)
            parsed_arn = parse_and_validate_topic_arn(topic_or_target_arn)
            store = self.get_store(account_id=parsed_arn["account"], region_name=context.region)
            moto_sns_backend = self.get_moto_backend(parsed_arn["account"], context.region)
            if is_endpoint_publish:
                if not (platform_endpoint := moto_sns_backend.platform_endpoints.get(target_arn)):
                    raise InvalidParameterException(
                        "Invalid parameter: TargetArn Reason: No endpoint found for the target arn specified"
                    )
                elif not platform_endpoint.enabled:
                    raise EndpointDisabledException("Endpoint is disabled")
            else:
                topic_model = self._get_topic(topic_or_target_arn, context)
        else:
            # use the store from the request context
            store = self.get_store(account_id=context.account_id, region_name=context.region)

        message_ctx = SnsMessage(
            type=SnsMessageType.Notification,
            message=message,
            message_attributes=message_attributes,
            message_deduplication_id=message_deduplication_id,
            message_group_id=message_group_id,
            message_structure=message_structure,
            subject=subject,
            is_fifo=is_fifo,
        )
        publish_ctx = SnsPublishContext(
            message=message_ctx, store=store, request_headers=context.request.headers
        )

        if is_endpoint_publish:
            self._publisher.publish_to_application_endpoint(
                ctx=publish_ctx, endpoint_arn=target_arn
            )
        elif phone_number:
            self._publisher.publish_to_phone_number(ctx=publish_ctx, phone_number=phone_number)
        else:
            # beware if the subscription is FIFO, the order might not be guaranteed.
            # 2 quick call to this method in succession might not be executed in order in the executor?
            # TODO: test how this behaves in a FIFO context with a lot of threads.
            publish_ctx.topic_attributes |= vars(topic_model)
            self._publisher.publish_to_topic(publish_ctx, topic_or_target_arn)

        if is_fifo:
            return PublishResponse(
                MessageId=message_ctx.message_id, SequenceNumber=message_ctx.sequencer_number
            )

        return PublishResponse(MessageId=message_ctx.message_id)
1✔
660

661
    def subscribe(
        self,
        context: RequestContext,
        topic_arn: topicARN,
        protocol: String,
        endpoint: String = None,
        attributes: SubscriptionAttributesMap = None,
        return_subscription_arn: boolean = None,
        **kwargs,
    ) -> SubscribeResponse:
        """
        Subscribe an endpoint to a topic and register the subscription in the store.

        The topic ARN, endpoint, protocol and subscription attributes are validated first. If the
        endpoint is already subscribed to the topic, the existing subscription ARN is returned
        (or an error is raised if the provided attributes differ from the stored ones).
        A confirmation message is published for http/https subscriptions; every protocol other
        than http, https, email and email-json is marked as confirmed right away.

        :param context: the request context, used for region, account id and request headers
        :param topic_arn: the ARN of the topic to subscribe to, must be in the request region
        :param protocol: the subscription protocol, must be in `sns_constants.SNS_PROTOCOLS`
        :param endpoint: the endpoint receiving the notifications
        :param attributes: optional subscription attributes, see `validate_subscription_attribute`
        :param return_subscription_arn: for http/https, return the ARN instead of
            "pending confirmation"
        :raises InvalidParameterException: if one of the parameters fails validation
        :raises NotFoundException: if the topic or the endpoint does not exist
        :return: the SubscribeResponse containing the subscription ARN
        """
        # TODO: check validation ordering
        parsed_topic_arn = parse_and_validate_topic_arn(topic_arn)
        if context.region != parsed_topic_arn["region"]:
            raise InvalidParameterException("Invalid parameter: TopicArn")

        store = self.get_store(account_id=parsed_topic_arn["account"], region_name=context.region)

        if topic_arn not in store.topic_subscriptions:
            raise NotFoundException("Topic does not exist")

        if not endpoint:
            # TODO: check AWS behaviour (because endpoint is optional)
            raise NotFoundException("Endpoint not specified in subscription")
        if protocol not in sns_constants.SNS_PROTOCOLS:
            raise InvalidParameterException(
                f"Invalid parameter: Amazon SNS does not support this protocol string: {protocol}"
            )
        elif protocol in ["http", "https"] and not endpoint.startswith(f"{protocol}://"):
            raise InvalidParameterException(
                "Invalid parameter: Endpoint must match the specified protocol"
            )
        elif protocol == "sms" and not is_e164(endpoint):
            raise InvalidParameterException(f"Invalid SMS endpoint: {endpoint}")

        elif protocol == "sqs":
            try:
                parse_arn(endpoint)
            except InvalidArnException:
                raise InvalidParameterException("Invalid parameter: SQS endpoint ARN")

        elif protocol == "application":
            # TODO: this is taken from moto, validate it
            moto_backend = self.get_moto_backend(
                account_id=parsed_topic_arn["account"], region_name=context.region
            )
            if endpoint not in moto_backend.platform_endpoints:
                raise NotFoundException("Endpoint does not exist")

        if ".fifo" in endpoint and ".fifo" not in topic_arn:
            raise InvalidParameterException(
                "Invalid parameter: Invalid parameter: Endpoint Reason: FIFO SQS Queues can not be subscribed to standard SNS topics"
            )

        # work on a copy so the caller's attribute map is never mutated
        sub_attributes = copy.deepcopy(attributes) if attributes else None
        if sub_attributes:
            for attr_name, attr_value in sub_attributes.items():
                validate_subscription_attribute(
                    attribute_name=attr_name,
                    attribute_value=attr_value,
                    topic_arn=topic_arn,
                    endpoint=endpoint,
                    is_subscribe_call=True,
                )
            # normalize only once every attribute has been validated, so that validation (and its
            # error messages) always sees the values exactly as they were provided
            if raw_msg_delivery := sub_attributes.get("RawMessageDelivery"):
                sub_attributes["RawMessageDelivery"] = raw_msg_delivery.lower()

        # An endpoint may only be subscribed to a topic once. Subsequent
        # subscribe calls do nothing (subscribe is idempotent), except if its attributes are different.
        for existing_topic_subscription in store.topic_subscriptions.get(topic_arn, []):
            sub = store.subscriptions.get(existing_topic_subscription, {})
            if sub.get("Endpoint") == endpoint:
                if sub_attributes:
                    # validate the subscription attributes aren't different
                    for attr in sns_constants.VALID_SUBSCRIPTION_ATTR_NAME:
                        # if a new attribute is present and different from an existent one, raise
                        if (new_attr := sub_attributes.get(attr)) and sub.get(attr) != new_attr:
                            raise InvalidParameterException(
                                "Invalid parameter: Attributes Reason: Subscription already exists with different attributes"
                            )

                return SubscribeResponse(SubscriptionArn=sub["SubscriptionArn"])

        principal = sns_constants.DUMMY_SUBSCRIPTION_PRINCIPAL.format(
            partition=get_partition(context.region), account_id=context.account_id
        )
        subscription_arn = create_subscription_arn(topic_arn)
        subscription = SnsSubscription(
            # http://docs.aws.amazon.com/cli/latest/reference/sns/get-subscription-attributes.html
            TopicArn=topic_arn,
            Endpoint=endpoint,
            Protocol=protocol,
            SubscriptionArn=subscription_arn,
            PendingConfirmation="true",
            Owner=context.account_id,
            RawMessageDelivery="false",  # default value, will be overridden if set
            FilterPolicyScope="MessageAttributes",  # default value, will be overridden if set
            SubscriptionPrincipal=principal,  # dummy value, could be fetched with a call to STS?
        )
        if sub_attributes:
            subscription.update(sub_attributes)
            if "FilterPolicy" in sub_attributes:
                # an empty FilterPolicy string is stored as `None`
                filter_policy = (
                    json.loads(sub_attributes["FilterPolicy"])
                    if sub_attributes["FilterPolicy"]
                    else None
                )
                if filter_policy:
                    validator = FilterPolicyValidator(
                        scope=subscription.get("FilterPolicyScope", "MessageAttributes"),
                        is_subscribe_call=True,
                    )
                    validator.validate_filter_policy(filter_policy)

                store.subscription_filter_policy[subscription_arn] = filter_policy

        store.subscriptions[subscription_arn] = subscription

        topic_subscriptions = store.topic_subscriptions.setdefault(topic_arn, [])
        topic_subscriptions.append(subscription_arn)

        # store the token and subscription arn
        # TODO: the token is a 288 hex char string
        subscription_token = encode_subscription_token_with_region(region=context.region)
        store.subscription_tokens[subscription_token] = subscription_arn

        response_subscription_arn = subscription_arn
        # Send out confirmation message for HTTP(S), fix for https://github.com/localstack/localstack/issues/881
        if protocol in ["http", "https"]:
            message_ctx = SnsMessage(
                type=SnsMessageType.SubscriptionConfirmation,
                token=subscription_token,
                message=f"You have chosen to subscribe to the topic {topic_arn}.\nTo confirm the subscription, visit the SubscribeURL included in this message.",
            )
            publish_ctx = SnsPublishContext(
                message=message_ctx,
                store=store,
                request_headers=context.request.headers,
                topic_attributes=vars(self._get_topic(topic_arn, context)),
            )
            self._publisher.publish_to_topic_subscriber(
                ctx=publish_ctx,
                topic_arn=topic_arn,
                subscription_arn=subscription_arn,
            )
            if not return_subscription_arn:
                response_subscription_arn = "pending confirmation"

        elif protocol not in ["email", "email-json"]:
            # Only HTTP(S) and email subscriptions are not auto validated
            # Except if the endpoint and the topic are not in the same AWS account, then you'd need to manually confirm
            # the subscription with the token
            # TODO: revisit for multi-account
            # TODO: test with AWS for email & email-json confirmation message
            # we need to add the following check:
            # if parsed_topic_arn["account"] == endpoint account (depending on the type, SQS, lambda, parse the arn)
            subscription["PendingConfirmation"] = "false"
            subscription["ConfirmationWasAuthenticated"] = "true"

        return SubscribeResponse(SubscriptionArn=response_subscription_arn)
820

821
    def tag_resource(
        self, context: RequestContext, resource_arn: AmazonResourceName, tags: TagList, **kwargs
    ) -> TagResourceResponse:
        """
        Add the given tags to a resource, replacing the stored tag when its key is already present.

        :param context: the request context, forwarded to moto and used to select the store
        :param resource_arn: the ARN of the resource to tag
        :param tags: the tags to set, every `Key` must appear only once
        :raises InvalidParameterException: if the same key is present more than once in `tags`
        :return: an empty TagResourceResponse
        """
        # each tag key must be unique
        # https://docs.aws.amazon.com/general/latest/gr/aws_tagging.html#tag-best-practices
        if len({entry["Key"] for entry in tags}) < len(tags):
            raise InvalidParameterException("Invalid parameter: Duplicated keys are not allowed.")

        call_moto(context)
        store = self.get_store(context.account_id, context.region)
        stored_tags = store.sns_tags.get(resource_arn, [])

        for incoming in tags:
            # index of the first stored tag sharing the incoming key, if there is one
            match_index = next(
                (
                    idx
                    for idx, current in enumerate(stored_tags)
                    if incoming["Key"] == current["Key"]
                ),
                None,
            )
            if match_index is not None:
                stored_tags[match_index] = incoming
            else:
                stored_tags.append(incoming)

        store.sns_tags[resource_arn] = stored_tags
        return TagResourceResponse()
849

850
    def delete_topic(self, context: RequestContext, topic_arn: topicARN, **kwargs) -> None:
        """
        Delete a topic together with the state kept for it in the store.

        The call is forwarded to moto, then the topic's subscriptions, their filter policies and
        the topic tags are removed from the store.

        :param context: the request context, its region must match the region of the topic ARN
        :param topic_arn: the ARN of the topic to delete
        :raises InvalidParameterException: if the ARN is invalid or belongs to another region
        """
        parsed_arn = parse_and_validate_topic_arn(topic_arn)
        if context.region != parsed_arn["region"]:
            raise InvalidParameterException("Invalid parameter: TopicArn")

        call_moto(context)
        store = self.get_store(account_id=parsed_arn["account"], region_name=context.region)
        # popping with a default keeps the cleanup safe if the topic is unknown to the store
        topic_subscriptions = store.topic_subscriptions.pop(topic_arn, [])
        for topic_sub in topic_subscriptions:
            store.subscriptions.pop(topic_sub, None)
            # filter policies are keyed by subscription ARN, drop them with their subscription
            # instead of leaving orphaned entries behind
            store.subscription_filter_policy.pop(topic_sub, None)

        store.sns_tags.pop(topic_arn, None)
862

863
    def create_topic(
        self,
        context: RequestContext,
        name: topicName,
        attributes: TopicAttributesMap = None,
        tags: TagList = None,
        data_protection_policy: attributeValue = None,
        **kwargs,
    ) -> CreateTopicResponse:
        """
        Create the topic through moto, then register its tags and subscription list in the store.

        :raises InvalidParameterException: if `extract_tags` reports that the topic already exists
            with different tags
        :return: the CreateTopicResponse containing the topic ARN returned by moto
        """
        moto_response = call_moto(context)
        store = self.get_store(account_id=context.account_id, region_name=context.region)
        topic_arn = moto_response["TopicArn"]

        tags_are_consistent = extract_tags(topic_arn, tags, True, store)
        if not tags_are_consistent:
            raise InvalidParameterException(
                "Invalid parameter: Tags Reason: Topic already exists with different tags"
            )

        if tags:
            self.tag_resource(context=context, resource_arn=topic_arn, tags=tags)

        # make sure the topic has a subscription list, keeping the existing one if present
        store.topic_subscriptions.setdefault(topic_arn, [])
        return CreateTopicResponse(TopicArn=topic_arn)
884

885

886
def is_raw_message_delivery(susbcriber):
    """
    Tell whether the subscriber has its `RawMessageDelivery` attribute enabled.

    :param susbcriber: the subscription mapping to inspect
    :return: True if the attribute is one of "true", True or "True", False otherwise
    """
    enabled_values = ("true", True, "True")
    raw_delivery = susbcriber.get("RawMessageDelivery")
    return raw_delivery in enabled_values
888

889

890
def validate_subscription_attribute(
    attribute_name: str,
    attribute_value: str,
    topic_arn: str,
    endpoint: str,
    is_subscribe_call: bool = False,
) -> None:
    """
    Validate the subscription attribute to be set. See:
    https://docs.aws.amazon.com/sns/latest/api/API_SetSubscriptionAttributes.html
    :param attribute_name: the subscription attribute name, must be in VALID_SUBSCRIPTION_ATTR_NAME
    :param attribute_value: the subscription attribute value
    :param topic_arn: the topic_arn of the subscription, needed to know if it is FIFO
    :param endpoint: the subscription endpoint (like an SQS queue ARN)
    :param is_subscribe_call: the error message is different if called from Subscribe or SetSubscriptionAttributes
    :raises InvalidParameterException
    :return:
    """
    error_prefix = (
        "Invalid parameter: Attributes Reason: " if is_subscribe_call else "Invalid parameter: "
    )
    if attribute_name not in sns_constants.VALID_SUBSCRIPTION_ATTR_NAME:
        raise InvalidParameterException(f"{error_prefix}AttributeName")

    if attribute_name == "FilterPolicy":
        try:
            # an empty value is validated as an empty JSON object
            json.loads(attribute_value or "{}")
        except json.JSONDecodeError:
            raise InvalidParameterException(f"{error_prefix}FilterPolicy: failed to parse JSON.")
    elif attribute_name == "FilterPolicyScope":
        if attribute_value not in ("MessageAttributes", "MessageBody"):
            raise InvalidParameterException(
                f"{error_prefix}FilterPolicyScope: Invalid value [{attribute_value}]. "
                f"Please use either MessageBody or MessageAttributes"
            )
    elif attribute_name == "RawMessageDelivery":
        # TODO: only for SQS and https(s) subs, + firehose
        if attribute_value.lower() not in ("true", "false"):
            raise InvalidParameterException(
                f"{error_prefix}RawMessageDelivery: Invalid value [{attribute_value}]. "
                f"Must be true or false."
            )

    elif attribute_name == "RedrivePolicy":
        try:
            redrive_policy = json.loads(attribute_value)
        except (json.JSONDecodeError, TypeError) as exc:
            # TypeError covers a value that is not a string at all (for instance `None`)
            raise InvalidParameterException(
                f"{error_prefix}RedrivePolicy: failed to parse JSON."
            ) from exc
        if not isinstance(redrive_policy, dict):
            # valid JSON that is not an object (a list, a string, a number...) has no
            # `deadLetterTargetArn` to read: report it as a parameter error instead of crashing
            raise InvalidParameterException(f"{error_prefix}RedrivePolicy: failed to parse JSON.")
        dlq_target_arn = redrive_policy.get("deadLetterTargetArn", "")

        try:
            parsed_arn = parse_arn(dlq_target_arn)
        except InvalidArnException as exc:
            raise InvalidParameterException(
                f"{error_prefix}RedrivePolicy: deadLetterTargetArn is an invalid arn"
            ) from exc

        if topic_arn.endswith(".fifo"):
            # a FIFO endpoint subscribed to a FIFO topic needs a FIFO SQS queue as DLQ
            if endpoint.endswith(".fifo") and (
                not parsed_arn["resource"].endswith(".fifo") or "sqs" not in parsed_arn["service"]
            ):
                raise InvalidParameterException(
                    f"{error_prefix}RedrivePolicy: must use a FIFO queue as DLQ for a FIFO Subscription to a FIFO Topic."
                )
952

953

954
def validate_message_attributes(
    message_attributes: MessageAttributeMap, position: int | None = None
) -> None:
    """
    Validate the message attributes, and raises an exception if those do not follow AWS validation
    See: https://docs.aws.amazon.com/sns/latest/dg/sns-message-attributes.html
    Regex from: https://stackoverflow.com/questions/40718851/regex-that-does-not-allow-consecutive-dots
    :param message_attributes: the message attributes map for the message
    :param position: given to give the Batch Entry position if coming from `publishBatch`
    :raises: InvalidParameterValueException
    :raises: CommonServiceException if the `DataType` of an attribute is missing
    :return: None
    """
    for attr_name, attr in message_attributes.items():
        if len(attr_name) > 256:
            raise InvalidParameterValueException(
                "Length of message attribute name must be less than 256 bytes."
            )
        validate_message_attribute_name(attr_name)
        # `DataType` is a required field for MessageAttributeValue
        if (data_type := attr.get("DataType")) is None:
            # compare against None explicitly: a position of 0 is still a batch position
            if position is not None:
                at = f"publishBatchRequestEntries.{position}.member.messageAttributes.{attr_name}.member.dataType"
            else:
                at = f"messageAttributes.{attr_name}.member.dataType"

            raise CommonServiceException(
                code="ValidationError",
                message=f"1 validation error detected: Value null at '{at}' failed to satisfy constraint: Member must not be null",
                sender_fault=True,
            )

        if data_type not in (
            "String",
            "Number",
            "Binary",
        ) and not sns_constants.ATTR_TYPE_REGEX.match(data_type):
            raise InvalidParameterValueException(
                f"The message attribute '{attr_name}' has an invalid message attribute type, the set of supported type prefixes is Binary, Number, and String."
            )
        # at least one field of the attribute must be a `...Value` field
        if not any(field_name.endswith("Value") for field_name in attr):
            raise InvalidParameterValueException(
                f"The message attribute '{attr_name}' must contain non-empty message attribute value for message attribute type '{data_type}'."
            )

        # Binary types carry their payload in `BinaryValue`, every other type in `StringValue`
        value_key_data_type = "Binary" if data_type.startswith("Binary") else "String"
        value_key = f"{value_key_data_type}Value"
        if value_key not in attr:
            raise InvalidParameterValueException(
                f"The message attribute '{attr_name}' with type '{data_type}' must use field '{value_key_data_type}'."
            )
        elif not attr[value_key]:
            raise InvalidParameterValueException(
                f"The message attribute '{attr_name}' must contain non-empty message attribute value for message attribute type '{data_type}'.",
            )
1008

1009

1010
def validate_message_attribute_name(name: str) -> None:
    """
    Check a message attribute name against the AWS naming rules.
    Allowed characters are A-Z, a-z, 0-9, underscore (_), hyphen (-) and period (.). A name can
    neither start nor end with a period, nor contain two periods in a row.
    :param name: message attribute name
    :raises InvalidParameterValueException: if the name does not conform to the spec
    """
    if sns_constants.MSG_ATTR_NAME_REGEX.match(name):
        return

    # the regex rejected the name: find out which rule is broken to raise the proper exception
    # NOTE(review): an empty name would fail on `name[0]` if the regex rejects it - confirm
    # callers never pass an empty string
    if name[0] == ".":
        raise InvalidParameterValueException(
            "Invalid message attribute name starting with character '.' was found."
        )
    if name[-1] == ".":
        raise InvalidParameterValueException(
            "Invalid message attribute name ending with character '.' was found."
        )

    for position, character in enumerate(name):
        if character not in sns_constants.VALID_MSG_ATTR_NAME_CHARS:
            # the code point is rendered in upper-case hexadecimal, prefixed by a lower-case `#x`
            hex_char = f"#x{ord(character):X}"
            raise InvalidParameterValueException(
                f"Invalid non-alphanumeric character '{hex_char}' was found in the message attribute name. Can only include alphanumeric characters, hyphens, underscores, or dots."
            )
        # `position - 1` wraps around to the last character for the first one, which is harmless:
        # names starting or ending with a dot were rejected above
        if character == "." and name[position - 1] == ".":
            raise InvalidParameterValueException(
                "Message attribute name can not have successive '.' character."
            )
1040

1041

1042
def extract_tags(
    topic_arn: str, tags: TagList, is_create_topic_request: bool, store: SnsStore
) -> bool:
    """
    Check that the tags of a CreateTopic request are compatible with an already existing topic.

    :param topic_arn: the ARN of the topic being created
    :param tags: the tags of the request, `None` is handled like an empty list
    :param is_create_topic_request: the tags are only compared when this is True
    :param store: the store holding the known topics (`topic_subscriptions`) and tags (`sns_tags`)
    :return: False if the topic is already in the store and the request carries a tag that is not
        part of its stored tags, True otherwise
    """
    known_tags = list(store.sns_tags.get(topic_arn, []))
    topic_exists = topic_arn in store.topic_subscriptions
    if not (topic_exists and is_create_topic_request):
        # a new topic, or a call that is not CreateTopic: there is nothing to compare
        return True

    requested_tags = [] if tags is None else tags
    # re-creating an existing topic is only accepted if every requested tag is already stored
    return all(requested in known_tags for requested in requested_tags)
1056

1057

1058
def parse_and_validate_topic_arn(topic_arn: str | None) -> ArnData:
    """
    Parse a topic ARN, turning a parsing failure into an SNS parameter error.

    :param topic_arn: the ARN to parse, `None` is handled like an empty string
    :raises InvalidParameterException: if the ARN cannot be parsed, the message contains the
        number of colon-separated elements found in the value
    :return: the parsed ARN
    """
    arn = topic_arn or ""
    try:
        parsed = parse_arn(arn)
    except InvalidArnException:
        # the number of colon-separated elements is the number of colons plus one
        count = arn.count(":") + 1
        raise InvalidParameterException(
            f"Invalid parameter: TopicArn Reason: An ARN must have at least 6 elements, not {count}"
        )
    return parsed
1067

1068

1069
def create_subscription_arn(topic_arn: str) -> str:
    """
    Build a new subscription ARN: the topic ARN followed by a colon and a random UUID.

    Example: arn:aws:sns:us-west-2:123456789012:my-topic:8a21d249-4329-4871-acc6-7be709c6ea7f
    :param topic_arn: the ARN of the topic the subscription belongs to
    :return: the subscription ARN
    """
    subscription_id = uuid4()
    return f"{topic_arn}:{subscription_id}"
1073

1074

1075
def encode_subscription_token_with_region(region: str) -> str:
    """
    Create a Subscription Token of at most 64 characters with the region encoded.

    The token starts with the hex encoding of `<region>/` and is padded with random characters,
    the whole being truncated to 64 characters.
    :param region: the region to encode in the token
    :return: a subscription token with the region encoded
    """
    region_prefix = f"{region}/".encode().hex()
    # draw a new uid for every chunk: repeating a single uid 8 times would leave the token with
    # the randomness of one uid only, making collisions between tokens more likely
    random_suffix = "".join(short_uid() for _ in range(8))
    return (region_prefix + random_suffix)[:64]
1082

1083

1084
def get_region_from_subscription_token(token: str) -> str:
    """
    Decode the region stored at the start of a subscription token.

    The region is the hex-encoded text preceding the first "2f" (the hex encoding of "/").
    :param token: the subscription token
    :return: the region if able to decode it
    :raises: InvalidParameterException if the token is invalid
    """
    try:
        # everything before the first separator (the whole token if there is none)
        hex_region, _separator, _remainder = token.partition("2f")
        decoded_region = bytes.fromhex(hex_region).decode("utf-8")
    except (IndexError, ValueError, TypeError, UnicodeDecodeError):
        raise InvalidParameterException("Invalid parameter: Token")
    return decoded_region
1096

1097

1098
def get_next_page_token_from_arn(resource_arn: str) -> str:
    """
    Return the pagination token of a resource ARN, which is its base64 encoding.

    :param resource_arn: the ARN to encode
    :return: the base64-encoded ARN, converted with `to_str`
    """
    encoded_arn = base64.b64encode(to_bytes(resource_arn))
    return to_str(encoded_arn)
1100

1101

1102
def _get_byte_size(payload: str | bytes) -> int:
    """
    Return the size of the payload in bytes.

    The payload goes through `to_bytes` first, so that a string is measured by its byte
    representation and not by its number of characters.
    """
    payload_bytes = to_bytes(payload)
    return len(payload_bytes)
1105

1106

1107
def get_total_publish_size(
    message_body: str, message_attributes: MessageAttributeMap | None
) -> int:
    """
    Compute the size in bytes of a message to publish, message attributes included.

    https://docs.aws.amazon.com/sns/latest/dg/sns-message-attributes.html
    All parts of the message attribute, including name, type, and value, are included in the
    message size restriction, which is 256 KB.
    :param message_body: the body of the message
    :param message_attributes: the message attributes, if any
    :return: the byte size of the body, plus the byte size of every attribute name and of every
        value of the attribute (DataType and StringValue or BinaryValue)
    """
    total_size = _get_byte_size(message_body)
    # nothing to add when the attributes are missing or empty
    for attribute_name, attribute in (message_attributes or {}).items():
        total_size += _get_byte_size(attribute_name)
        for attribute_part in attribute.values():
            total_size += _get_byte_size(attribute_part)

    return total_size
1123

1124

1125
def register_sns_api_resource(router: Router):
    """Register the retrospection endpoints as internal LocalStack endpoints."""
    internal_resources = (
        SNSServicePlatformEndpointMessagesApiResource,
        SNSServiceSMSMessagesApiResource,
        SNSServiceSubscriptionTokenApiResource,
    )
    # one instance of every resource class is added, in the order listed above
    for resource_class in internal_resources:
        router.add(resource_class())
1130

1131

1132
def _format_messages(sent_messages: list[dict[str, str]], validated_keys: list[str]):
    """
    Format the stored messages to be more readable, undoing the format change needed for Moto.
    Should be removed once we refactor SNS.

    :param sent_messages: the messages to format
    :param validated_keys: the only keys kept in the formatted messages
    :return: a new list with one formatted dict per message; the `Message` value is dumped to a
        JSON string when the `MessageStructure` of the message is "json"
    """

    def _render(record):
        dump_message = record.get("MessageStructure") == "json"
        rendered = {}
        for field, content in record.items():
            if field not in validated_keys:
                continue
            if dump_message and field == "Message":
                rendered[field] = json.dumps(content)
            else:
                rendered[field] = content
        return rendered

    return [_render(record) for record in sent_messages]
1149

1150

1151
class SNSInternalResource:
    """Base class with helper to properly track usage of internal endpoints"""

    # label value reported by `count_usage`, set by every subclass
    resource_type: str

    def count_usage(self):
        """Increment the `internal_api_calls` counter labelled with this resource type."""
        internal_api_calls.labels(resource_type=self.resource_type).increment()
1157

1158

1159
def count_usage(f):
    """
    Decorate a method so that every call first invokes `self.count_usage()`.

    :param f: the method to decorate, its metadata is kept through `functools.wraps`
    :return: the wrapping function, returning whatever `f` returns
    """

    @functools.wraps(f)
    def _counted_call(self, *args, **kwargs):
        # the usage is recorded before the call, so it is counted even if `f` raises
        self.count_usage()
        outcome = f(self, *args, **kwargs)
        return outcome

    return _counted_call
1166

1167

1168
class SNSServicePlatformEndpointMessagesApiResource(SNSInternalResource):
    """Provides a REST API for retrospective access to platform endpoint messages sent via SNS.

    This is registered as a LocalStack internal HTTP resource.

    This endpoint accepts:
    - GET param `accountId`: selector for AWS account. If not specified, return fallback `000000000000` test ID
    - GET param `region`: selector for AWS `region`. If not specified, return default "us-east-1"
    - GET param `endpointArn`: filter for `endpointArn` resource in SNS
    - DELETE param `accountId`: selector for AWS account
    - DELETE param `region`: will delete saved messages for `region`
    - DELETE param `endpointArn`: will delete saved messages for `endpointArn`

    When `endpointArn` is given, the account and the region are extracted from the ARN and the
    `accountId` and `region` params are not read.
    """

    resource_type = "platform-endpoint-message"

    # the only message fields returned by the GET endpoint
    _PAYLOAD_FIELDS = [
        "TargetArn",
        "TopicArn",
        "Message",
        "MessageAttributes",
        "MessageStructure",
        "Subject",
        "MessageId",
    ]

    @staticmethod
    def _select_store(request: Request) -> tuple[SnsStore, str, str | None]:
        """Return the store targeted by the request, with its region and the `endpointArn` filter."""
        filter_endpoint_arn = request.args.get("endpointArn")
        if filter_endpoint_arn:
            # the ARN takes precedence over the `accountId` and `region` params
            account_id = extract_account_id_from_arn(filter_endpoint_arn)
            region = extract_region_from_arn(filter_endpoint_arn)
        else:
            account_id = request.args.get("accountId", DEFAULT_AWS_ACCOUNT_ID)
            region = request.args.get("region", AWS_REGION_US_EAST_1)

        store: SnsStore = sns_stores[account_id][region]
        return store, region, filter_endpoint_arn

    @route(sns_constants.PLATFORM_ENDPOINT_MSGS_ENDPOINT, methods=["GET"])
    @count_usage
    def on_get(self, request: Request):
        """Return the saved messages, for one endpoint if `endpointArn` is given, for all otherwise."""
        store, region, filter_endpoint_arn = self._select_store(request)
        if filter_endpoint_arn:
            messages = store.platform_endpoint_messages.get(filter_endpoint_arn, [])
            messages = _format_messages(messages, self._PAYLOAD_FIELDS)
            return {
                "platform_endpoint_messages": {filter_endpoint_arn: messages},
                "region": region,
            }

        platform_endpoint_messages = {
            endpoint_arn: _format_messages(messages, self._PAYLOAD_FIELDS)
            for endpoint_arn, messages in store.platform_endpoint_messages.items()
        }
        return {
            "platform_endpoint_messages": platform_endpoint_messages,
            "region": region,
        }

    @route(sns_constants.PLATFORM_ENDPOINT_MSGS_ENDPOINT, methods=["DELETE"])
    @count_usage
    def on_delete(self, request: Request) -> Response:
        """Delete the saved messages, for one endpoint if `endpointArn` is given, for all otherwise."""
        store, _region, filter_endpoint_arn = self._select_store(request)
        if filter_endpoint_arn:
            store.platform_endpoint_messages.pop(filter_endpoint_arn, None)
            return Response("", status=204)

        store.platform_endpoint_messages.clear()
        return Response("", status=204)
1246

1247

1248
class SNSServiceSMSMessagesApiResource(SNSInternalResource):
    """Provides a REST API for retrospective access to SMS messages sent via SNS.

    This is registered as a LocalStack internal HTTP resource.

    This endpoint accepts the following query params (for both GET and DELETE):
    - `accountId`: selector for AWS account. If not specified, falls back to `DEFAULT_AWS_ACCOUNT_ID`
    - `region`: selector for AWS `region`. If not specified, falls back to `AWS_REGION_US_EAST_1`
    - `phoneNumber`: filter for `phoneNumber` resource in SNS
    """

    # NOTE: the docstring above must stay the first statement of the class body,
    # otherwise it is not picked up as the class `__doc__`.
    resource_type = "sms-message"

    # fields passed to `_format_messages` when building the GET payload
    _PAYLOAD_FIELDS = [
        "PhoneNumber",
        "TopicArn",
        "SubscriptionArn",
        "MessageId",
        "Message",
        "MessageAttributes",
        "MessageStructure",
        "Subject",
    ]

    @route(sns_constants.SMS_MSGS_ENDPOINT, methods=["GET"])
    @count_usage
    def on_get(self, request: Request):
        """Return the recorded SMS messages of the selected account/region store.

        :param request: incoming HTTP request carrying the query params
        :return: a dict with the key `sms_messages` (formatted messages keyed by
            phone number) and the key `region`. When `phoneNumber` is given, only
            that phone number is present in `sms_messages`.
        """
        account_id = request.args.get("accountId", DEFAULT_AWS_ACCOUNT_ID)
        region = request.args.get("region", AWS_REGION_US_EAST_1)
        filter_phone_number = request.args.get("phoneNumber")
        store: SnsStore = sns_stores[account_id][region]

        if filter_phone_number:
            # filter on the stored messages first, then format only the matches
            messages = [
                m for m in store.sms_messages if m.get("PhoneNumber") == filter_phone_number
            ]
            messages = _format_messages(messages, self._PAYLOAD_FIELDS)
            return {
                "sms_messages": {filter_phone_number: messages},
                "region": region,
            }

        # no filter: group every formatted message under its phone number
        sms_messages = {}
        for m in _format_messages(store.sms_messages, self._PAYLOAD_FIELDS):
            sms_messages.setdefault(m.get("PhoneNumber"), []).append(m)

        return {
            "sms_messages": sms_messages,
            "region": region,
        }

    @route(sns_constants.SMS_MSGS_ENDPOINT, methods=["DELETE"])
    @count_usage
    def on_delete(self, request: Request) -> Response:
        """Delete recorded SMS messages of the selected account/region store.

        When `phoneNumber` is given, only the messages for that phone number are
        removed; otherwise all recorded SMS messages of the store are cleared.

        :param request: incoming HTTP request carrying the query params
        :return: an empty response with status 204
        """
        account_id = request.args.get("accountId", DEFAULT_AWS_ACCOUNT_ID)
        region = request.args.get("region", AWS_REGION_US_EAST_1)
        filter_phone_number = request.args.get("phoneNumber")
        store: SnsStore = sns_stores[account_id][region]

        if filter_phone_number:
            # keep everything that was NOT sent to the given phone number
            store.sms_messages = [
                m for m in store.sms_messages if m.get("PhoneNumber") != filter_phone_number
            ]
            return Response("", status=204)

        store.sms_messages.clear()
        return Response("", status=204)
1✔
1313

1314

1315
class SNSServiceSubscriptionTokenApiResource(SNSInternalResource):
    """Provides a REST API for retrospective access to Subscription Confirmation Tokens to confirm subscriptions.
    Those are not sent for email, and sometimes inaccessible when working with external HTTPS endpoint which won't be
    able to reach your local host.

    This is registered as a LocalStack internal HTTP resource.

    This endpoint has the following parameter:
    - path param `subscription_arn`: ARN of the SNS subscription for which you want the SubscriptionToken
    """

    # NOTE: the docstring above must stay the first statement of the class body,
    # otherwise it is not picked up as the class `__doc__`.
    resource_type = "subscription-token"

    @route(f"{sns_constants.SUBSCRIPTION_TOKENS_ENDPOINT}/<path:subscription_arn>", methods=["GET"])
    @count_usage
    def on_get(self, _request: Request, subscription_arn: str):
        """Look up the confirmation token recorded for a subscription ARN.

        :param _request: incoming HTTP request (unused)
        :param subscription_arn: subscription ARN taken from the URL path
        :return: a dict with `subscription_token` and `subscription_arn` when a
            token is found; a 400 JSON response when the ARN cannot be parsed;
            a 404 JSON response when no token is stored for the ARN
        """
        try:
            parsed_arn = parse_arn(subscription_arn)
        except InvalidArnException:
            response = Response("", 400)
            response.set_json(
                {
                    "error": "The provided SubscriptionARN is invalid",
                    "subscription_arn": subscription_arn,
                }
            )
            return response

        # the store is selected by the account and region encoded in the ARN
        store: SnsStore = sns_stores[parsed_arn["account"]][parsed_arn["region"]]

        # `subscription_tokens` is iterated as token -> subscription ARN, so
        # finding the token for an ARN requires a scan over the items
        for token, sub_arn in store.subscription_tokens.items():
            if sub_arn == subscription_arn:
                return {
                    "subscription_token": token,
                    "subscription_arn": subscription_arn,
                }

        response = Response("", 404)
        response.set_json(
            {
                "error": "The provided SubscriptionARN is not found",
                "subscription_arn": subscription_arn,
            }
        )
        return response
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc