• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 19880423371

02 Dec 2025 08:25PM UTC coverage: 86.905% (-0.04%) from 86.945%
19880423371

push

github

web-flow
fix/external client CA bundle (#13451)

1 of 5 new or added lines in 1 file covered. (20.0%)

414 existing lines in 19 files now uncovered.

69738 of 80246 relevant lines covered (86.91%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.47
/localstack-core/localstack/services/sns/provider.py
1
import base64
1✔
2
import contextlib
1✔
3
import copy
1✔
4
import functools
1✔
5
import json
1✔
6
import logging
1✔
7
from uuid import uuid4
1✔
8

9
from botocore.utils import InvalidArnException
1✔
10
from moto.core.utils import camelcase_to_pascal, underscores_to_camelcase
1✔
11
from moto.sns import sns_backends
1✔
12
from moto.sns.models import MAXIMUM_MESSAGE_LENGTH, SNSBackend, Topic
1✔
13
from moto.sns.utils import is_e164
1✔
14

15
from localstack.aws.api import CommonServiceException, RequestContext
1✔
16
from localstack.aws.api.sns import (
1✔
17
    AmazonResourceName,
18
    BatchEntryIdsNotDistinctException,
19
    ConfirmSubscriptionResponse,
20
    CreateEndpointResponse,
21
    CreatePlatformApplicationResponse,
22
    CreateTopicResponse,
23
    EndpointDisabledException,
24
    GetSubscriptionAttributesResponse,
25
    GetTopicAttributesResponse,
26
    InvalidParameterException,
27
    InvalidParameterValueException,
28
    ListSubscriptionsByTopicResponse,
29
    ListSubscriptionsResponse,
30
    ListTagsForResourceResponse,
31
    MapStringToString,
32
    MessageAttributeMap,
33
    NotFoundException,
34
    PublishBatchRequestEntryList,
35
    PublishBatchResponse,
36
    PublishBatchResultEntry,
37
    PublishResponse,
38
    SnsApi,
39
    String,
40
    SubscribeResponse,
41
    Subscription,
42
    SubscriptionAttributesMap,
43
    TagKeyList,
44
    TagList,
45
    TagResourceResponse,
46
    TooManyEntriesInBatchRequestException,
47
    TopicAttributesMap,
48
    UntagResourceResponse,
49
    attributeName,
50
    attributeValue,
51
    authenticateOnUnsubscribe,
52
    boolean,
53
    messageStructure,
54
    nextToken,
55
    subscriptionARN,
56
    topicARN,
57
    topicName,
58
)
59
from localstack.constants import AWS_REGION_US_EAST_1, DEFAULT_AWS_ACCOUNT_ID
1✔
60
from localstack.http import Request, Response, Router, route
1✔
61
from localstack.services.edge import ROUTER
1✔
62
from localstack.services.moto import call_moto
1✔
63
from localstack.services.plugins import ServiceLifecycleHook
1✔
64
from localstack.services.sns import constants as sns_constants
1✔
65
from localstack.services.sns.certificate import SNS_SERVER_CERT
1✔
66
from localstack.services.sns.filter import FilterPolicyValidator
1✔
67
from localstack.services.sns.models import (
1✔
68
    SnsMessage,
69
    SnsMessageType,
70
    SnsStore,
71
    SnsSubscription,
72
    sns_stores,
73
)
74
from localstack.services.sns.publisher import (
1✔
75
    PublishDispatcher,
76
    SnsBatchPublishContext,
77
    SnsPublishContext,
78
)
79
from localstack.utils.aws.arns import (
1✔
80
    ArnData,
81
    extract_account_id_from_arn,
82
    extract_region_from_arn,
83
    get_partition,
84
    parse_arn,
85
)
86
from localstack.utils.collections import PaginatedList, select_from_typed_dict
1✔
87
from localstack.utils.strings import short_uid, to_bytes, to_str
1✔
88

89
from ...state import StateVisitor
1✔
90
from .analytics import internal_api_calls
1✔
91

92
# set up logger
93
LOG = logging.getLogger(__name__)
1✔
94

95

96
class SnsProvider(SnsApi, ServiceLifecycleHook):
1✔
97
    """
98
    Provider class for AWS Simple Notification Service.
99

100
    AWS supports following operations in a cross-account setup:
101
    - GetTopicAttributes
102
    - SetTopicAttributes
103
    - AddPermission
104
    - RemovePermission
105
    - Publish
106
    - Subscribe
107
    - ListSubscriptionByTopic
108
    - DeleteTopic
109
    """
110

111
    @route(sns_constants.SNS_CERT_ENDPOINT, methods=["GET"])
1✔
112
    def get_signature_cert_pem_file(self, request: Request):
1✔
113
        # see http://sns-public-resources.s3.amazonaws.com/SNS_Message_Signing_Release_Note_Jan_25_2011.pdf
114
        # see https://docs.aws.amazon.com/sns/latest/dg/sns-verify-signature-of-message.html
115
        return Response(self._signature_cert_pem, 200)
1✔
116

117
    def __init__(self) -> None:
1✔
118
        super().__init__()
1✔
119
        self._publisher = PublishDispatcher()
1✔
120
        self._signature_cert_pem: str = SNS_SERVER_CERT
1✔
121

122
    def accept_state_visitor(self, visitor: StateVisitor):
1✔
UNCOV
123
        visitor.visit(sns_backends)
×
UNCOV
124
        visitor.visit(sns_stores)
×
125

126
    def on_before_stop(self):
1✔
127
        self._publisher.shutdown()
1✔
128

129
    def on_after_init(self):
1✔
130
        # Allow sent platform endpoint messages to be retrieved from the SNS endpoint
131
        register_sns_api_resource(ROUTER)
1✔
132
        # add the route to serve the certificate used to validate message signatures
133
        ROUTER.add(self.get_signature_cert_pem_file)
1✔
134

135
    @staticmethod
1✔
136
    def get_store(account_id: str, region_name: str) -> SnsStore:
1✔
137
        return sns_stores[account_id][region_name]
1✔
138

139
    @staticmethod
1✔
140
    def get_moto_backend(account_id: str, region_name: str) -> SNSBackend:
1✔
141
        return sns_backends[account_id][region_name]
1✔
142

143
    @staticmethod
1✔
144
    def _get_topic(arn: str, context: RequestContext) -> Topic:
1✔
145
        """
146
        :param arn: the Topic ARN
147
        :param context: the RequestContext of the request
148
        :param multiregion: if the request can fetch the topic across regions or not (ex. Publish cannot publish to a
149
        topic in a different region than the request)
150
        :return: the Moto model Topic
151
        """
152
        arn_data = parse_and_validate_topic_arn(arn)
1✔
153
        if context.region != arn_data["region"]:
1✔
154
            raise InvalidParameterException("Invalid parameter: TopicArn")
1✔
155

156
        try:
1✔
157
            return sns_backends[arn_data["account"]][context.region].topics[arn]
1✔
158
        except KeyError:
1✔
159
            raise NotFoundException("Topic does not exist")
1✔
160

161
    def get_topic_attributes(
1✔
162
        self, context: RequestContext, topic_arn: topicARN, **kwargs
163
    ) -> GetTopicAttributesResponse:
164
        # get the Topic from moto manually first, because Moto does not handle well the case where the ARN is malformed
165
        # (raises ValueError: not enough values to unpack (expected 6, got 1))
166
        moto_topic_model = self._get_topic(topic_arn, context)
1✔
167
        moto_response: GetTopicAttributesResponse = call_moto(context)
1✔
168
        # TODO: fix some attributes by moto, see snapshot
169
        # DeliveryPolicy
170
        # EffectiveDeliveryPolicy
171
        # Policy.Statement..Action -> SNS:Receive is added by moto but not returned in AWS
172
        # TODO: very hacky way to get the attributes we need instead of a moto patch
173
        # see the attributes we need: https://docs.aws.amazon.com/sns/latest/dg/sns-topic-attributes.html
174
        # would need more work to have the proper format out of moto, maybe extract the model to our store
175
        attributes = moto_response["Attributes"]
1✔
176
        for attr in vars(moto_topic_model):
1✔
177
            if "_feedback" in attr:
1✔
178
                key = camelcase_to_pascal(underscores_to_camelcase(attr))
1✔
179
                attributes[key] = getattr(moto_topic_model, attr)
1✔
180
            elif attr == "signature_version":
1✔
181
                attributes["SignatureVersion"] = moto_topic_model.signature_version
1✔
182
            elif attr == "archive_policy":
1✔
183
                attributes["ArchivePolicy"] = moto_topic_model.archive_policy
1✔
184

185
        return moto_response
1✔
186

187
    def set_topic_attributes(
1✔
188
        self,
189
        context: RequestContext,
190
        topic_arn: topicARN,
191
        attribute_name: attributeName,
192
        attribute_value: attributeValue | None = None,
193
        **kwargs,
194
    ) -> None:
195
        # validate the topic first
196
        self._get_topic(topic_arn, context)
1✔
197
        call_moto(context)
1✔
198

199
    def publish_batch(
1✔
200
        self,
201
        context: RequestContext,
202
        topic_arn: topicARN,
203
        publish_batch_request_entries: PublishBatchRequestEntryList,
204
        **kwargs,
205
    ) -> PublishBatchResponse:
206
        if len(publish_batch_request_entries) > 10:
1✔
207
            raise TooManyEntriesInBatchRequestException(
1✔
208
                "The batch request contains more entries than permissible."
209
            )
210

211
        parsed_arn = parse_and_validate_topic_arn(topic_arn)
1✔
212
        store = self.get_store(account_id=parsed_arn["account"], region_name=context.region)
1✔
213
        moto_topic = self._get_topic(topic_arn, context)
1✔
214

215
        ids = [entry["Id"] for entry in publish_batch_request_entries]
1✔
216
        if len(set(ids)) != len(publish_batch_request_entries):
1✔
217
            raise BatchEntryIdsNotDistinctException(
1✔
218
                "Two or more batch entries in the request have the same Id."
219
            )
220

221
        response: PublishBatchResponse = {"Successful": [], "Failed": []}
1✔
222

223
        # TODO: write AWS validated tests with FilterPolicy and batching
224
        # TODO: find a scenario where we can fail to send a message synchronously to be able to report it
225
        # right now, it seems that AWS fails the whole publish if something is wrong in the format of 1 message
226

227
        total_batch_size = 0
1✔
228
        message_contexts = []
1✔
229
        for entry_index, entry in enumerate(publish_batch_request_entries, start=1):
1✔
230
            message_payload = entry.get("Message")
1✔
231
            message_attributes = entry.get("MessageAttributes", {})
1✔
232
            if message_attributes:
1✔
233
                # if a message contains non-valid message attributes
234
                # will fail for the first non-valid message encountered, and raise ParameterValueInvalid
235
                validate_message_attributes(message_attributes, position=entry_index)
1✔
236

237
            total_batch_size += get_total_publish_size(message_payload, message_attributes)
1✔
238

239
            # TODO: WRITE AWS VALIDATED
240
            if entry.get("MessageStructure") == "json":
1✔
241
                try:
1✔
242
                    message = json.loads(message_payload)
1✔
243
                    # Keys in the JSON object that correspond to supported transport protocols must have
244
                    # simple JSON string values.
245
                    # Non-string values will cause the key to be ignored.
246
                    message = {
1✔
247
                        key: field for key, field in message.items() if isinstance(field, str)
248
                    }
249
                    if "default" not in message:
1✔
250
                        raise InvalidParameterException(
1✔
251
                            "Invalid parameter: Message Structure - No default entry in JSON message body"
252
                        )
253
                    entry["Message"] = message  # noqa
1✔
254
                except json.JSONDecodeError:
1✔
UNCOV
255
                    raise InvalidParameterException(
×
256
                        "Invalid parameter: Message Structure - JSON message body failed to parse"
257
                    )
258

259
            if is_fifo := (".fifo" in topic_arn):
1✔
260
                if not all("MessageGroupId" in entry for entry in publish_batch_request_entries):
1✔
261
                    raise InvalidParameterException(
1✔
262
                        "Invalid parameter: The MessageGroupId parameter is required for FIFO topics"
263
                    )
264
                if moto_topic.content_based_deduplication == "false":
1✔
265
                    if not all(
1✔
266
                        "MessageDeduplicationId" in entry for entry in publish_batch_request_entries
267
                    ):
268
                        raise InvalidParameterException(
1✔
269
                            "Invalid parameter: The topic should either have ContentBasedDeduplication enabled or MessageDeduplicationId provided explicitly",
270
                        )
271

272
            msg_ctx = SnsMessage.from_batch_entry(entry, is_fifo=is_fifo)
1✔
273
            message_contexts.append(msg_ctx)
1✔
274
            success = PublishBatchResultEntry(
1✔
275
                Id=entry["Id"],
276
                MessageId=msg_ctx.message_id,
277
            )
278
            if is_fifo:
1✔
279
                success["SequenceNumber"] = msg_ctx.sequencer_number
1✔
280
            response["Successful"].append(success)
1✔
281

282
        if total_batch_size > MAXIMUM_MESSAGE_LENGTH:
1✔
283
            raise CommonServiceException(
1✔
284
                code="BatchRequestTooLong",
285
                message="The length of all the messages put together is more than the limit.",
286
                sender_fault=True,
287
            )
288

289
        publish_ctx = SnsBatchPublishContext(
1✔
290
            messages=message_contexts,
291
            store=store,
292
            request_headers=context.request.headers,
293
            topic_attributes=vars(moto_topic),
294
        )
295
        self._publisher.publish_batch_to_topic(publish_ctx, topic_arn)
1✔
296

297
        return response
1✔
298

299
    def set_subscription_attributes(
1✔
300
        self,
301
        context: RequestContext,
302
        subscription_arn: subscriptionARN,
303
        attribute_name: attributeName,
304
        attribute_value: attributeValue = None,
305
        **kwargs,
306
    ) -> None:
307
        store = self.get_store(account_id=context.account_id, region_name=context.region)
1✔
308
        sub = store.subscriptions.get(subscription_arn)
1✔
309
        if not sub:
1✔
310
            raise NotFoundException("Subscription does not exist")
1✔
311

312
        validate_subscription_attribute(
1✔
313
            attribute_name=attribute_name,
314
            attribute_value=attribute_value,
315
            topic_arn=sub["TopicArn"],
316
            endpoint=sub["Endpoint"],
317
        )
318
        if attribute_name == "RawMessageDelivery":
1✔
319
            attribute_value = attribute_value.lower()
1✔
320

321
        elif attribute_name == "FilterPolicy":
1✔
322
            filter_policy = json.loads(attribute_value) if attribute_value else None
1✔
323
            if filter_policy:
1✔
324
                validator = FilterPolicyValidator(
1✔
325
                    scope=sub.get("FilterPolicyScope", "MessageAttributes"),
326
                    is_subscribe_call=False,
327
                )
328
                validator.validate_filter_policy(filter_policy)
1✔
329

330
            store.subscription_filter_policy[subscription_arn] = filter_policy
1✔
331

332
        sub[attribute_name] = attribute_value
1✔
333

334
    def confirm_subscription(
1✔
335
        self,
336
        context: RequestContext,
337
        topic_arn: topicARN,
338
        token: String,
339
        authenticate_on_unsubscribe: authenticateOnUnsubscribe = None,
340
        **kwargs,
341
    ) -> ConfirmSubscriptionResponse:
342
        # TODO: validate format on the token (seems to be 288 hex chars)
343
        # this request can come from any http client, it might not be signed (we would need to implement
344
        # `authenticate_on_unsubscribe` to force a signing client to do this request.
345
        # so, the region and account_id might not be in the request. Use the ones from the topic_arn
346
        try:
1✔
347
            parsed_arn = parse_arn(topic_arn)
1✔
348
        except InvalidArnException:
1✔
349
            raise InvalidParameterException("Invalid parameter: Topic")
1✔
350

351
        store = self.get_store(account_id=parsed_arn["account"], region_name=parsed_arn["region"])
1✔
352

353
        # it seems SNS is able to know what the region of the topic should be, even though a wrong topic is accepted
354
        if parsed_arn["region"] != get_region_from_subscription_token(token):
1✔
355
            raise InvalidParameterException("Invalid parameter: Topic")
1✔
356

357
        subscription_arn = store.subscription_tokens.get(token)
1✔
358
        if not subscription_arn:
1✔
359
            raise InvalidParameterException("Invalid parameter: Token")
×
360

361
        subscription = store.subscriptions.get(subscription_arn)
1✔
362
        if not subscription:
1✔
363
            # subscription could have been deleted in the meantime
UNCOV
364
            raise InvalidParameterException("Invalid parameter: Token")
×
365

366
        # ConfirmSubscription is idempotent
367
        if subscription.get("PendingConfirmation") == "false":
1✔
368
            return ConfirmSubscriptionResponse(SubscriptionArn=subscription_arn)
1✔
369

370
        subscription["PendingConfirmation"] = "false"
1✔
371
        subscription["ConfirmationWasAuthenticated"] = "true"
1✔
372

373
        return ConfirmSubscriptionResponse(SubscriptionArn=subscription_arn)
1✔
374

375
    def untag_resource(
1✔
376
        self,
377
        context: RequestContext,
378
        resource_arn: AmazonResourceName,
379
        tag_keys: TagKeyList,
380
        **kwargs,
381
    ) -> UntagResourceResponse:
382
        call_moto(context)
1✔
383
        # TODO: probably get the account_id and region from the `resource_arn`
384
        store = self.get_store(context.account_id, context.region)
1✔
385
        existing_tags = store.sns_tags.setdefault(resource_arn, [])
1✔
386
        store.sns_tags[resource_arn] = [t for t in existing_tags if t["Key"] not in tag_keys]
1✔
387
        return UntagResourceResponse()
1✔
388

389
    def list_tags_for_resource(
1✔
390
        self, context: RequestContext, resource_arn: AmazonResourceName, **kwargs
391
    ) -> ListTagsForResourceResponse:
392
        # TODO: probably get the account_id and region from the `resource_arn`
393
        store = self.get_store(context.account_id, context.region)
1✔
394
        tags = store.sns_tags.setdefault(resource_arn, [])
1✔
395
        return ListTagsForResourceResponse(Tags=tags)
1✔
396

397
    def create_platform_application(
1✔
398
        self,
399
        context: RequestContext,
400
        name: String,
401
        platform: String,
402
        attributes: MapStringToString,
403
        **kwargs,
404
    ) -> CreatePlatformApplicationResponse:
405
        # TODO: validate platform
406
        # see https://docs.aws.amazon.com/cli/latest/reference/sns/create-platform-application.html
407
        # list of possible values: ADM, Baidu, APNS, APNS_SANDBOX, GCM, MPNS, WNS
408
        # each platform has a specific way to handle credentials
409
        # this can also be used for dispatching message to the right platform
410
        return call_moto(context)
1✔
411

412
    def create_platform_endpoint(
1✔
413
        self,
414
        context: RequestContext,
415
        platform_application_arn: String,
416
        token: String,
417
        custom_user_data: String = None,
418
        attributes: MapStringToString = None,
419
        **kwargs,
420
    ) -> CreateEndpointResponse:
421
        # TODO: support mobile app events
422
        # see https://docs.aws.amazon.com/sns/latest/dg/application-event-notifications.html
423
        return call_moto(context)
1✔
424

425
    def unsubscribe(
1✔
426
        self, context: RequestContext, subscription_arn: subscriptionARN, **kwargs
427
    ) -> None:
428
        if subscription_arn is None:
1✔
429
            raise InvalidParameterException(
1✔
430
                "Invalid parameter: SubscriptionArn Reason: no value for required parameter",
431
            )
432
        count = len(subscription_arn.split(":"))
1✔
433
        try:
1✔
434
            parsed_arn = parse_arn(subscription_arn)
1✔
435
        except InvalidArnException:
1✔
436
            # TODO: check for invalid SubscriptionGUID
437
            raise InvalidParameterException(
1✔
438
                f"Invalid parameter: SubscriptionArn Reason: An ARN must have at least 6 elements, not {count}"
439
            )
440

441
        account_id = parsed_arn["account"]
1✔
442
        region_name = parsed_arn["region"]
1✔
443

444
        store = self.get_store(account_id=account_id, region_name=region_name)
1✔
445
        if count == 6 and subscription_arn not in store.subscriptions:
1✔
446
            raise InvalidParameterException("Invalid parameter: SubscriptionId")
1✔
447

448
        moto_sns_backend = self.get_moto_backend(account_id, region_name)
1✔
449
        moto_sns_backend.unsubscribe(subscription_arn)
1✔
450

451
        # pop the subscription at the end, to avoid race condition by iterating over the topic subscriptions
452
        subscription = store.subscriptions.get(subscription_arn)
1✔
453

454
        if not subscription:
1✔
455
            # unsubscribe is idempotent, so unsubscribing from a non-existing topic does nothing
456
            return
1✔
457

458
        if subscription["Protocol"] in ["http", "https"]:
1✔
459
            # TODO: actually validate this (re)subscribe behaviour somehow (localhost.run?)
460
            #  we might need to save the sub token in the store
461
            # TODO: AWS only sends the UnsubscribeConfirmation if the call is unauthenticated or the requester is not
462
            #  the owner
463
            subscription_token = encode_subscription_token_with_region(region=context.region)
1✔
464
            message_ctx = SnsMessage(
1✔
465
                type=SnsMessageType.UnsubscribeConfirmation,
466
                token=subscription_token,
467
                message=f"You have chosen to deactivate subscription {subscription_arn}.\nTo cancel this operation and restore the subscription, visit the SubscribeURL included in this message.",
468
            )
469
            moto_topic = moto_sns_backend.topics.get(subscription["TopicArn"])
1✔
470
            publish_ctx = SnsPublishContext(
1✔
471
                message=message_ctx,
472
                store=store,
473
                request_headers=context.request.headers,
474
                topic_attributes=vars(moto_topic),
475
            )
476
            self._publisher.publish_to_topic_subscriber(
1✔
477
                publish_ctx,
478
                topic_arn=subscription["TopicArn"],
479
                subscription_arn=subscription_arn,
480
            )
481

482
        with contextlib.suppress(ValueError):
1✔
483
            store.topic_subscriptions[subscription["TopicArn"]].remove(subscription_arn)
1✔
484
        store.subscription_filter_policy.pop(subscription_arn, None)
1✔
485
        store.subscriptions.pop(subscription_arn, None)
1✔
486

487
    def get_subscription_attributes(
1✔
488
        self, context: RequestContext, subscription_arn: subscriptionARN, **kwargs
489
    ) -> GetSubscriptionAttributesResponse:
490
        store = self.get_store(account_id=context.account_id, region_name=context.region)
1✔
491
        sub = store.subscriptions.get(subscription_arn)
1✔
492
        if not sub:
1✔
493
            raise NotFoundException("Subscription does not exist")
1✔
494
        removed_attrs = ["sqs_queue_url"]
1✔
495
        if "FilterPolicyScope" in sub and not sub.get("FilterPolicy"):
1✔
496
            removed_attrs.append("FilterPolicyScope")
1✔
497
            removed_attrs.append("FilterPolicy")
1✔
498
        elif "FilterPolicy" in sub and "FilterPolicyScope" not in sub:
1✔
UNCOV
499
            sub["FilterPolicyScope"] = "MessageAttributes"
×
500

501
        attributes = {k: v for k, v in sub.items() if k not in removed_attrs}
1✔
502
        return GetSubscriptionAttributesResponse(Attributes=attributes)
1✔
503

504
    def list_subscriptions(
1✔
505
        self, context: RequestContext, next_token: nextToken = None, **kwargs
506
    ) -> ListSubscriptionsResponse:
507
        store = self.get_store(context.account_id, context.region)
1✔
508
        subscriptions = [
1✔
509
            select_from_typed_dict(Subscription, sub) for sub in list(store.subscriptions.values())
510
        ]
511
        paginated_subscriptions = PaginatedList(subscriptions)
1✔
512
        page, next_token = paginated_subscriptions.get_page(
1✔
513
            token_generator=lambda x: get_next_page_token_from_arn(x["SubscriptionArn"]),
514
            page_size=100,
515
            next_token=next_token,
516
        )
517

518
        response = ListSubscriptionsResponse(Subscriptions=page)
1✔
519
        if next_token:
1✔
520
            response["NextToken"] = next_token
1✔
521
        return response
1✔
522

523
    def list_subscriptions_by_topic(
1✔
524
        self, context: RequestContext, topic_arn: topicARN, next_token: nextToken = None, **kwargs
525
    ) -> ListSubscriptionsByTopicResponse:
526
        self._get_topic(topic_arn, context)
1✔
527
        parsed_topic_arn = parse_and_validate_topic_arn(topic_arn)
1✔
528
        store = self.get_store(parsed_topic_arn["account"], parsed_topic_arn["region"])
1✔
529
        sns_subscriptions = store.get_topic_subscriptions(topic_arn)
1✔
530
        subscriptions = [select_from_typed_dict(Subscription, sub) for sub in sns_subscriptions]
1✔
531

532
        paginated_subscriptions = PaginatedList(subscriptions)
1✔
533
        page, next_token = paginated_subscriptions.get_page(
1✔
534
            token_generator=lambda x: get_next_page_token_from_arn(x["SubscriptionArn"]),
535
            page_size=100,
536
            next_token=next_token,
537
        )
538

539
        response = ListSubscriptionsResponse(Subscriptions=page)
1✔
540
        if next_token:
1✔
541
            response["NextToken"] = next_token
1✔
542
        return response
1✔
543

544
    def publish(
1✔
545
        self,
546
        context: RequestContext,
547
        message: String,
548
        topic_arn: topicARN = None,
549
        target_arn: String = None,
550
        phone_number: String = None,
551
        subject: String = None,
552
        message_structure: messageStructure = None,
553
        message_attributes: MessageAttributeMap = None,
554
        message_deduplication_id: String = None,
555
        message_group_id: String = None,
556
        **kwargs,
557
    ) -> PublishResponse:
558
        if subject == "":
1✔
559
            raise InvalidParameterException("Invalid parameter: Subject")
1✔
560
        if not message or all(not m for m in message):
1✔
561
            raise InvalidParameterException("Invalid parameter: Empty message")
1✔
562

563
        # TODO: check for topic + target + phone number at the same time?
564
        # TODO: more validation on phone, it might be opted out?
565
        if phone_number and not is_e164(phone_number):
1✔
566
            raise InvalidParameterException(
1✔
567
                f"Invalid parameter: PhoneNumber Reason: {phone_number} is not valid to publish to"
568
            )
569

570
        if message_attributes:
1✔
571
            validate_message_attributes(message_attributes)
1✔
572

573
        if get_total_publish_size(message, message_attributes) > MAXIMUM_MESSAGE_LENGTH:
1✔
574
            raise InvalidParameterException("Invalid parameter: Message too long")
1✔
575

576
        # for compatibility reasons, AWS allows users to use either TargetArn or TopicArn for publishing to a topic
577
        # use any of them for topic validation
578
        topic_or_target_arn = topic_arn or target_arn
1✔
579
        topic_model = None
1✔
580

581
        if is_fifo := (topic_or_target_arn and ".fifo" in topic_or_target_arn):
1✔
582
            if not message_group_id:
1✔
583
                raise InvalidParameterException(
1✔
584
                    "Invalid parameter: The MessageGroupId parameter is required for FIFO topics",
585
                )
586
            topic_model = self._get_topic(topic_or_target_arn, context)
1✔
587
            if topic_model.content_based_deduplication == "false":
1✔
588
                if not message_deduplication_id:
1✔
589
                    raise InvalidParameterException(
1✔
590
                        "Invalid parameter: The topic should either have ContentBasedDeduplication enabled or MessageDeduplicationId provided explicitly",
591
                    )
592
        elif message_deduplication_id:
1✔
593
            # this is the first one to raise if both are set while the topic is not fifo
594
            raise InvalidParameterException(
1✔
595
                "Invalid parameter: MessageDeduplicationId Reason: The request includes MessageDeduplicationId parameter that is not valid for this topic type"
596
            )
597

598
        is_endpoint_publish = target_arn and ":endpoint/" in target_arn
1✔
599
        if message_structure == "json":
1✔
600
            try:
1✔
601
                message = json.loads(message)
1✔
602
                # Keys in the JSON object that correspond to supported transport protocols must have
603
                # simple JSON string values.
604
                # Non-string values will cause the key to be ignored.
605
                message = {key: field for key, field in message.items() if isinstance(field, str)}
1✔
606
                # TODO: check no default key for direct TargetArn endpoint publish, need credentials
607
                # see example: https://docs.aws.amazon.com/sns/latest/dg/sns-send-custom-platform-specific-payloads-mobile-devices.html
608
                if "default" not in message and not is_endpoint_publish:
1✔
609
                    raise InvalidParameterException(
1✔
610
                        "Invalid parameter: Message Structure - No default entry in JSON message body"
611
                    )
612
            except json.JSONDecodeError:
1✔
613
                raise InvalidParameterException(
1✔
614
                    "Invalid parameter: Message Structure - JSON message body failed to parse"
615
                )
616

617
        if not phone_number:
1✔
618
            # use the account to get the store from the TopicArn (you can only publish in the same region as the topic)
619
            parsed_arn = parse_and_validate_topic_arn(topic_or_target_arn)
1✔
620
            store = self.get_store(account_id=parsed_arn["account"], region_name=context.region)
1✔
621
            moto_sns_backend = self.get_moto_backend(parsed_arn["account"], context.region)
1✔
622
            if is_endpoint_publish:
1✔
623
                if not (platform_endpoint := moto_sns_backend.platform_endpoints.get(target_arn)):
1✔
624
                    raise InvalidParameterException(
1✔
625
                        "Invalid parameter: TargetArn Reason: No endpoint found for the target arn specified"
626
                    )
627
                elif not platform_endpoint.enabled:
1✔
628
                    raise EndpointDisabledException("Endpoint is disabled")
1✔
629
            else:
630
                topic_model = self._get_topic(topic_or_target_arn, context)
1✔
631
        else:
632
            # use the store from the request context
633
            store = self.get_store(account_id=context.account_id, region_name=context.region)
1✔
634

635
        message_ctx = SnsMessage(
1✔
636
            type=SnsMessageType.Notification,
637
            message=message,
638
            message_attributes=message_attributes,
639
            message_deduplication_id=message_deduplication_id,
640
            message_group_id=message_group_id,
641
            message_structure=message_structure,
642
            subject=subject,
643
            is_fifo=is_fifo,
644
        )
645
        publish_ctx = SnsPublishContext(
1✔
646
            message=message_ctx, store=store, request_headers=context.request.headers
647
        )
648

649
        if is_endpoint_publish:
1✔
650
            self._publisher.publish_to_application_endpoint(
1✔
651
                ctx=publish_ctx, endpoint_arn=target_arn
652
            )
653
        elif phone_number:
1✔
654
            self._publisher.publish_to_phone_number(ctx=publish_ctx, phone_number=phone_number)
1✔
655
        else:
656
            # beware if the subscription is FIFO, the order might not be guaranteed.
657
            # 2 quick call to this method in succession might not be executed in order in the executor?
658
            # TODO: test how this behaves in a FIFO context with a lot of threads.
659
            publish_ctx.topic_attributes |= vars(topic_model)
1✔
660
            self._publisher.publish_to_topic(publish_ctx, topic_or_target_arn)
1✔
661

662
        if is_fifo:
1✔
663
            return PublishResponse(
1✔
664
                MessageId=message_ctx.message_id, SequenceNumber=message_ctx.sequencer_number
665
            )
666

667
        return PublishResponse(MessageId=message_ctx.message_id)
1✔
668

669
    def subscribe(
1✔
670
        self,
671
        context: RequestContext,
672
        topic_arn: topicARN,
673
        protocol: String,
674
        endpoint: String = None,
675
        attributes: SubscriptionAttributesMap = None,
676
        return_subscription_arn: boolean = None,
677
        **kwargs,
678
    ) -> SubscribeResponse:
679
        # TODO: check validation ordering
680
        parsed_topic_arn = parse_and_validate_topic_arn(topic_arn)
1✔
681
        if context.region != parsed_topic_arn["region"]:
1✔
682
            raise InvalidParameterException("Invalid parameter: TopicArn")
1✔
683

684
        store = self.get_store(account_id=parsed_topic_arn["account"], region_name=context.region)
1✔
685

686
        if topic_arn not in store.topic_subscriptions:
1✔
687
            raise NotFoundException("Topic does not exist")
1✔
688

689
        if not endpoint:
1✔
690
            # TODO: check AWS behaviour (because endpoint is optional)
UNCOV
691
            raise NotFoundException("Endpoint not specified in subscription")
×
692
        if protocol not in sns_constants.SNS_PROTOCOLS:
1✔
693
            raise InvalidParameterException(
1✔
694
                f"Invalid parameter: Amazon SNS does not support this protocol string: {protocol}"
695
            )
696
        elif protocol in ["http", "https"] and not endpoint.startswith(f"{protocol}://"):
1✔
UNCOV
697
            raise InvalidParameterException(
×
698
                "Invalid parameter: Endpoint must match the specified protocol"
699
            )
700
        elif protocol == "sms" and not is_e164(endpoint):
1✔
701
            raise InvalidParameterException(f"Invalid SMS endpoint: {endpoint}")
1✔
702

703
        elif protocol == "sqs":
1✔
704
            try:
1✔
705
                parse_arn(endpoint)
1✔
706
            except InvalidArnException:
1✔
707
                raise InvalidParameterException("Invalid parameter: SQS endpoint ARN")
1✔
708

709
        elif protocol == "application":
1✔
710
            # TODO: this is taken from moto, validate it
711
            moto_backend = self.get_moto_backend(
1✔
712
                account_id=parsed_topic_arn["account"], region_name=context.region
713
            )
714
            if endpoint not in moto_backend.platform_endpoints:
1✔
UNCOV
715
                raise NotFoundException("Endpoint does not exist")
×
716

717
        if ".fifo" in endpoint and ".fifo" not in topic_arn:
1✔
718
            raise InvalidParameterException(
1✔
719
                "Invalid parameter: Invalid parameter: Endpoint Reason: FIFO SQS Queues can not be subscribed to standard SNS topics"
720
            )
721

722
        sub_attributes = copy.deepcopy(attributes) if attributes else None
1✔
723
        if sub_attributes:
1✔
724
            for attr_name, attr_value in sub_attributes.items():
1✔
725
                validate_subscription_attribute(
1✔
726
                    attribute_name=attr_name,
727
                    attribute_value=attr_value,
728
                    topic_arn=topic_arn,
729
                    endpoint=endpoint,
730
                    is_subscribe_call=True,
731
                )
732
                if raw_msg_delivery := sub_attributes.get("RawMessageDelivery"):
1✔
733
                    sub_attributes["RawMessageDelivery"] = raw_msg_delivery.lower()
1✔
734

735
        # An endpoint may only be subscribed to a topic once. Subsequent
736
        # subscribe calls do nothing (subscribe is idempotent), except if its attributes are different.
737
        for existing_topic_subscription in store.topic_subscriptions.get(topic_arn, []):
1✔
738
            sub = store.subscriptions.get(existing_topic_subscription, {})
1✔
739
            if sub.get("Endpoint") == endpoint:
1✔
740
                if sub_attributes:
1✔
741
                    # validate the subscription attributes aren't different
742
                    for attr in sns_constants.VALID_SUBSCRIPTION_ATTR_NAME:
1✔
743
                        # if a new attribute is present and different from an existent one, raise
744
                        if (new_attr := sub_attributes.get(attr)) and sub.get(attr) != new_attr:
1✔
745
                            raise InvalidParameterException(
1✔
746
                                "Invalid parameter: Attributes Reason: Subscription already exists with different attributes"
747
                            )
748

749
                return SubscribeResponse(SubscriptionArn=sub["SubscriptionArn"])
1✔
750

751
        principal = sns_constants.DUMMY_SUBSCRIPTION_PRINCIPAL.format(
1✔
752
            partition=get_partition(context.region), account_id=context.account_id
753
        )
754
        subscription_arn = create_subscription_arn(topic_arn)
1✔
755
        subscription = SnsSubscription(
1✔
756
            # http://docs.aws.amazon.com/cli/latest/reference/sns/get-subscription-attributes.html
757
            TopicArn=topic_arn,
758
            Endpoint=endpoint,
759
            Protocol=protocol,
760
            SubscriptionArn=subscription_arn,
761
            PendingConfirmation="true",
762
            Owner=context.account_id,
763
            RawMessageDelivery="false",  # default value, will be overridden if set
764
            FilterPolicyScope="MessageAttributes",  # default value, will be overridden if set
765
            SubscriptionPrincipal=principal,  # dummy value, could be fetched with a call to STS?
766
        )
767
        if sub_attributes:
1✔
768
            subscription.update(sub_attributes)
1✔
769
            if "FilterPolicy" in sub_attributes:
1✔
770
                filter_policy = (
1✔
771
                    json.loads(sub_attributes["FilterPolicy"])
772
                    if sub_attributes["FilterPolicy"]
773
                    else None
774
                )
775
                if filter_policy:
1✔
776
                    validator = FilterPolicyValidator(
1✔
777
                        scope=subscription.get("FilterPolicyScope", "MessageAttributes"),
778
                        is_subscribe_call=True,
779
                    )
780
                    validator.validate_filter_policy(filter_policy)
1✔
781

782
                store.subscription_filter_policy[subscription_arn] = filter_policy
1✔
783

784
        store.subscriptions[subscription_arn] = subscription
1✔
785

786
        topic_subscriptions = store.topic_subscriptions.setdefault(topic_arn, [])
1✔
787
        topic_subscriptions.append(subscription_arn)
1✔
788

789
        # store the token and subscription arn
790
        # TODO: the token is a 288 hex char string
791
        subscription_token = encode_subscription_token_with_region(region=context.region)
1✔
792
        store.subscription_tokens[subscription_token] = subscription_arn
1✔
793

794
        response_subscription_arn = subscription_arn
1✔
795
        # Send out confirmation message for HTTP(S), fix for https://github.com/localstack/localstack/issues/881
796
        if protocol in ["http", "https"]:
1✔
797
            message_ctx = SnsMessage(
1✔
798
                type=SnsMessageType.SubscriptionConfirmation,
799
                token=subscription_token,
800
                message=f"You have chosen to subscribe to the topic {topic_arn}.\nTo confirm the subscription, visit the SubscribeURL included in this message.",
801
            )
802
            publish_ctx = SnsPublishContext(
1✔
803
                message=message_ctx,
804
                store=store,
805
                request_headers=context.request.headers,
806
                topic_attributes=vars(self._get_topic(topic_arn, context)),
807
            )
808
            self._publisher.publish_to_topic_subscriber(
1✔
809
                ctx=publish_ctx,
810
                topic_arn=topic_arn,
811
                subscription_arn=subscription_arn,
812
            )
813
            if not return_subscription_arn:
1✔
814
                response_subscription_arn = "pending confirmation"
1✔
815

816
        elif protocol not in ["email", "email-json"]:
1✔
817
            # Only HTTP(S) and email subscriptions are not auto validated
818
            # Except if the endpoint and the topic are not in the same AWS account, then you'd need to manually confirm
819
            # the subscription with the token
820
            # TODO: revisit for multi-account
821
            # TODO: test with AWS for email & email-json confirmation message
822
            # we need to add the following check:
823
            # if parsed_topic_arn["account"] == endpoint account (depending on the type, SQS, lambda, parse the arn)
824
            subscription["PendingConfirmation"] = "false"
1✔
825
            subscription["ConfirmationWasAuthenticated"] = "true"
1✔
826

827
        return SubscribeResponse(SubscriptionArn=response_subscription_arn)
1✔
828

829
    def tag_resource(
1✔
830
        self, context: RequestContext, resource_arn: AmazonResourceName, tags: TagList, **kwargs
831
    ) -> TagResourceResponse:
832
        # each tag key must be unique
833
        # https://docs.aws.amazon.com/general/latest/gr/aws_tagging.html#tag-best-practices
834
        unique_tag_keys = {tag["Key"] for tag in tags}
1✔
835
        if len(unique_tag_keys) < len(tags):
1✔
836
            raise InvalidParameterException("Invalid parameter: Duplicated keys are not allowed.")
1✔
837

838
        call_moto(context)
1✔
839
        store = self.get_store(context.account_id, context.region)
1✔
840
        existing_tags = store.sns_tags.get(resource_arn, [])
1✔
841

842
        def existing_tag_index(_item):
1✔
843
            for idx, tag in enumerate(existing_tags):
1✔
844
                if _item["Key"] == tag["Key"]:
1✔
845
                    return idx
1✔
846
            return None
1✔
847

848
        for item in tags:
1✔
849
            existing_index = existing_tag_index(item)
1✔
850
            if existing_index is None:
1✔
851
                existing_tags.append(item)
1✔
852
            else:
853
                existing_tags[existing_index] = item
1✔
854

855
        store.sns_tags[resource_arn] = existing_tags
1✔
856
        return TagResourceResponse()
1✔
857

858
    def delete_topic(self, context: RequestContext, topic_arn: topicARN, **kwargs) -> None:
1✔
859
        parsed_arn = parse_and_validate_topic_arn(topic_arn)
1✔
860
        if context.region != parsed_arn["region"]:
1✔
861
            raise InvalidParameterException("Invalid parameter: TopicArn")
1✔
862

863
        call_moto(context)
1✔
864
        store = self.get_store(account_id=parsed_arn["account"], region_name=context.region)
1✔
865
        topic_subscriptions = store.topic_subscriptions.pop(topic_arn, [])
1✔
866
        for topic_sub in topic_subscriptions:
1✔
867
            store.subscriptions.pop(topic_sub, None)
1✔
868

869
        store.sns_tags.pop(topic_arn, None)
1✔
870

871
    def create_topic(
1✔
872
        self,
873
        context: RequestContext,
874
        name: topicName,
875
        attributes: TopicAttributesMap = None,
876
        tags: TagList = None,
877
        data_protection_policy: attributeValue = None,
878
        **kwargs,
879
    ) -> CreateTopicResponse:
880
        moto_response = call_moto(context)
1✔
881
        store = self.get_store(account_id=context.account_id, region_name=context.region)
1✔
882
        topic_arn = moto_response["TopicArn"]
1✔
883
        tag_resource_success = extract_tags(topic_arn, tags, True, store)
1✔
884
        if not tag_resource_success:
1✔
885
            raise InvalidParameterException(
1✔
886
                "Invalid parameter: Tags Reason: Topic already exists with different tags"
887
            )
888
        if tags:
1✔
889
            self.tag_resource(context=context, resource_arn=topic_arn, tags=tags)
1✔
890
        store.topic_subscriptions.setdefault(topic_arn, [])
1✔
891
        return CreateTopicResponse(TopicArn=topic_arn)
1✔
892

893

894
def is_raw_message_delivery(susbcriber):
1✔
895
    return susbcriber.get("RawMessageDelivery") in ("true", True, "True")
1✔
896

897

898
def validate_subscription_attribute(
1✔
899
    attribute_name: str,
900
    attribute_value: str,
901
    topic_arn: str,
902
    endpoint: str,
903
    is_subscribe_call: bool = False,
904
) -> None:
905
    """
906
    Validate the subscription attribute to be set. See:
907
    https://docs.aws.amazon.com/sns/latest/api/API_SetSubscriptionAttributes.html
908
    :param attribute_name: the subscription attribute name, must be in VALID_SUBSCRIPTION_ATTR_NAME
909
    :param attribute_value: the subscription attribute value
910
    :param topic_arn: the topic_arn of the subscription, needed to know if it is FIFO
911
    :param endpoint: the subscription endpoint (like an SQS queue ARN)
912
    :param is_subscribe_call: the error message is different if called from Subscribe or SetSubscriptionAttributes
913
    :raises InvalidParameterException
914
    :return:
915
    """
916
    error_prefix = (
1✔
917
        "Invalid parameter: Attributes Reason: " if is_subscribe_call else "Invalid parameter: "
918
    )
919
    if attribute_name not in sns_constants.VALID_SUBSCRIPTION_ATTR_NAME:
1✔
920
        raise InvalidParameterException(f"{error_prefix}AttributeName")
1✔
921

922
    if attribute_name == "FilterPolicy":
1✔
923
        try:
1✔
924
            json.loads(attribute_value or "{}")
1✔
925
        except json.JSONDecodeError:
1✔
926
            raise InvalidParameterException(f"{error_prefix}FilterPolicy: failed to parse JSON.")
1✔
927
    elif attribute_name == "FilterPolicyScope":
1✔
928
        if attribute_value not in ("MessageAttributes", "MessageBody"):
1✔
929
            raise InvalidParameterException(
1✔
930
                f"{error_prefix}FilterPolicyScope: Invalid value [{attribute_value}]. "
931
                f"Please use either MessageBody or MessageAttributes"
932
            )
933
    elif attribute_name == "RawMessageDelivery":
1✔
934
        # TODO: only for SQS and https(s) subs, + firehose
935
        if attribute_value.lower() not in ("true", "false"):
1✔
936
            raise InvalidParameterException(
1✔
937
                f"{error_prefix}RawMessageDelivery: Invalid value [{attribute_value}]. "
938
                f"Must be true or false."
939
            )
940

941
    elif attribute_name == "RedrivePolicy":
1✔
942
        try:
1✔
943
            dlq_target_arn = json.loads(attribute_value).get("deadLetterTargetArn", "")
1✔
944
        except json.JSONDecodeError:
1✔
945
            raise InvalidParameterException(f"{error_prefix}RedrivePolicy: failed to parse JSON.")
1✔
946
        try:
1✔
947
            parsed_arn = parse_arn(dlq_target_arn)
1✔
948
        except InvalidArnException:
1✔
949
            raise InvalidParameterException(
1✔
950
                f"{error_prefix}RedrivePolicy: deadLetterTargetArn is an invalid arn"
951
            )
952

953
        if topic_arn.endswith(".fifo"):
1✔
954
            if endpoint.endswith(".fifo") and (
1✔
955
                not parsed_arn["resource"].endswith(".fifo") or "sqs" not in parsed_arn["service"]
956
            ):
957
                raise InvalidParameterException(
1✔
958
                    f"{error_prefix}RedrivePolicy: must use a FIFO queue as DLQ for a FIFO Subscription to a FIFO Topic."
959
                )
960

961

962
def validate_message_attributes(
1✔
963
    message_attributes: MessageAttributeMap, position: int | None = None
964
) -> None:
965
    """
966
    Validate the message attributes, and raises an exception if those do not follow AWS validation
967
    See: https://docs.aws.amazon.com/sns/latest/dg/sns-message-attributes.html
968
    Regex from: https://stackoverflow.com/questions/40718851/regex-that-does-not-allow-consecutive-dots
969
    :param message_attributes: the message attributes map for the message
970
    :param position: given to give the Batch Entry position if coming from `publishBatch`
971
    :raises: InvalidParameterValueException
972
    :return: None
973
    """
974
    for attr_name, attr in message_attributes.items():
1✔
975
        if len(attr_name) > 256:
1✔
976
            raise InvalidParameterValueException(
1✔
977
                "Length of message attribute name must be less than 256 bytes."
978
            )
979
        validate_message_attribute_name(attr_name)
1✔
980
        # `DataType` is a required field for MessageAttributeValue
981
        if (data_type := attr.get("DataType")) is None:
1✔
982
            if position:
1✔
983
                at = f"publishBatchRequestEntries.{position}.member.messageAttributes.{attr_name}.member.dataType"
1✔
984
            else:
985
                at = f"messageAttributes.{attr_name}.member.dataType"
1✔
986

987
            raise CommonServiceException(
1✔
988
                code="ValidationError",
989
                message=f"1 validation error detected: Value null at '{at}' failed to satisfy constraint: Member must not be null",
990
                sender_fault=True,
991
            )
992

993
        if data_type not in (
1✔
994
            "String",
995
            "Number",
996
            "Binary",
997
        ) and not sns_constants.ATTR_TYPE_REGEX.match(data_type):
998
            raise InvalidParameterValueException(
1✔
999
                f"The message attribute '{attr_name}' has an invalid message attribute type, the set of supported type prefixes is Binary, Number, and String."
1000
            )
1001
        if not any(attr_value.endswith("Value") for attr_value in attr):
1✔
1002
            raise InvalidParameterValueException(
1✔
1003
                f"The message attribute '{attr_name}' must contain non-empty message attribute value for message attribute type '{data_type}'."
1004
            )
1005

1006
        value_key_data_type = "Binary" if data_type.startswith("Binary") else "String"
1✔
1007
        value_key = f"{value_key_data_type}Value"
1✔
1008
        if value_key not in attr:
1✔
1009
            raise InvalidParameterValueException(
1✔
1010
                f"The message attribute '{attr_name}' with type '{data_type}' must use field '{value_key_data_type}'."
1011
            )
1012
        elif not attr[value_key]:
1✔
1013
            raise InvalidParameterValueException(
1✔
1014
                f"The message attribute '{attr_name}' must contain non-empty message attribute value for message attribute type '{data_type}'.",
1015
            )
1016

1017

1018
def validate_message_attribute_name(name: str) -> None:
1✔
1019
    """
1020
    Validate the message attribute name with the specification of AWS.
1021
    The message attribute name can contain the following characters: A-Z, a-z, 0-9, underscore(_), hyphen(-), and period (.). The name must not start or end with a period, and it should not have successive periods.
1022
    :param name: message attribute name
1023
    :raises InvalidParameterValueException: if the name does not conform to the spec
1024
    """
1025
    if not sns_constants.MSG_ATTR_NAME_REGEX.match(name):
1✔
1026
        # find the proper exception
1027
        if name[0] == ".":
1✔
1028
            raise InvalidParameterValueException(
1✔
1029
                "Invalid message attribute name starting with character '.' was found."
1030
            )
1031
        elif name[-1] == ".":
1✔
1032
            raise InvalidParameterValueException(
1✔
1033
                "Invalid message attribute name ending with character '.' was found."
1034
            )
1035

1036
        for idx, char in enumerate(name):
1✔
1037
            if char not in sns_constants.VALID_MSG_ATTR_NAME_CHARS:
1✔
1038
                # change prefix from 0x to #x, without capitalizing the x
1039
                hex_char = "#x" + hex(ord(char)).upper()[2:]
1✔
1040
                raise InvalidParameterValueException(
1✔
1041
                    f"Invalid non-alphanumeric character '{hex_char}' was found in the message attribute name. Can only include alphanumeric characters, hyphens, underscores, or dots."
1042
                )
1043
            # even if we go negative index, it will be covered by starting/ending with dot
1044
            if char == "." and name[idx - 1] == ".":
1✔
1045
                raise InvalidParameterValueException(
1✔
1046
                    "Message attribute name can not have successive '.' character."
1047
                )
1048

1049

1050
def extract_tags(
1✔
1051
    topic_arn: str, tags: TagList, is_create_topic_request: bool, store: SnsStore
1052
) -> bool:
1053
    existing_tags = list(store.sns_tags.get(topic_arn, []))
1✔
1054
    # if this is none there is nothing to check
1055
    if topic_arn in store.topic_subscriptions:
1✔
1056
        if tags is None:
1✔
1057
            tags = []
1✔
1058
        for tag in tags:
1✔
1059
            # this means topic already created with empty tags and when we try to create it
1060
            # again with other tag value then it should fail according to aws documentation.
1061
            if is_create_topic_request and existing_tags is not None and tag not in existing_tags:
1✔
1062
                return False
1✔
1063
    return True
1✔
1064

1065

1066
def parse_and_validate_topic_arn(topic_arn: str | None) -> ArnData:
1✔
1067
    topic_arn = topic_arn or ""
1✔
1068
    try:
1✔
1069
        return parse_arn(topic_arn)
1✔
1070
    except InvalidArnException:
1✔
1071
        count = len(topic_arn.split(":"))
1✔
1072
        raise InvalidParameterException(
1✔
1073
            f"Invalid parameter: TopicArn Reason: An ARN must have at least 6 elements, not {count}"
1074
        )
1075

1076

1077
def create_subscription_arn(topic_arn: str) -> str:
1✔
1078
    # This is the format of a Subscription ARN
1079
    # arn:aws:sns:us-west-2:123456789012:my-topic:8a21d249-4329-4871-acc6-7be709c6ea7f
1080
    return f"{topic_arn}:{uuid4()}"
1✔
1081

1082

1083
def encode_subscription_token_with_region(region: str) -> str:
1✔
1084
    """
1085
    Create a 64 characters Subscription Token with the region encoded
1086
    :param region:
1087
    :return: a subscription token with the region encoded
1088
    """
1089
    return ((region.encode() + b"/").hex() + short_uid() * 8)[:64]
1✔
1090

1091

1092
def get_region_from_subscription_token(token: str) -> str:
1✔
1093
    """
1094
    Try to decode and return the region from a subscription token
1095
    :param token:
1096
    :return: the region if able to decode it
1097
    :raises: InvalidParameterException if the token is invalid
1098
    """
1099
    try:
1✔
1100
        region = token.split("2f", maxsplit=1)[0]
1✔
1101
        return bytes.fromhex(region).decode("utf-8")
1✔
1102
    except (IndexError, ValueError, TypeError, UnicodeDecodeError):
1✔
1103
        raise InvalidParameterException("Invalid parameter: Token")
1✔
1104

1105

1106
def get_next_page_token_from_arn(resource_arn: str) -> str:
1✔
1107
    return to_str(base64.b64encode(to_bytes(resource_arn)))
1✔
1108

1109

1110
def _get_byte_size(payload: str | bytes) -> int:
1✔
1111
    # Calculate the real length of the byte object if the object is a string
1112
    return len(to_bytes(payload))
1✔
1113

1114

1115
def get_total_publish_size(
1✔
1116
    message_body: str, message_attributes: MessageAttributeMap | None
1117
) -> int:
1118
    size = _get_byte_size(message_body)
1✔
1119
    if message_attributes:
1✔
1120
        # https://docs.aws.amazon.com/sns/latest/dg/sns-message-attributes.html
1121
        # All parts of the message attribute, including name, type, and value, are included in the message size
1122
        # restriction, which is 256 KB.
1123
        # iterate over the Keys and Attributes, adding the length of the Key to the length of all Attributes values
1124
        # (DataType and StringValue or BinaryValue)
1125
        size += sum(
1✔
1126
            _get_byte_size(key) + sum(_get_byte_size(attr_value) for attr_value in attr.values())
1127
            for key, attr in message_attributes.items()
1128
        )
1129

1130
    return size
1✔
1131

1132

1133
def register_sns_api_resource(router: Router):
1✔
1134
    """Register the retrospection endpoints as internal LocalStack endpoints."""
1135
    router.add(SNSServicePlatformEndpointMessagesApiResource())
1✔
1136
    router.add(SNSServiceSMSMessagesApiResource())
1✔
1137
    router.add(SNSServiceSubscriptionTokenApiResource())
1✔
1138

1139

1140
def _format_messages(sent_messages: list[dict[str, str]], validated_keys: list[str]):
1✔
1141
    """
1142
    This method format the messages to be more readable and undo the format change that was needed for Moto
1143
    Should be removed once we refactor SNS.
1144
    """
1145
    formatted_messages = []
1✔
1146
    for sent_message in sent_messages:
1✔
1147
        msg = {
1✔
1148
            key: json.dumps(value)
1149
            if key == "Message" and sent_message.get("MessageStructure") == "json"
1150
            else value
1151
            for key, value in sent_message.items()
1152
            if key in validated_keys
1153
        }
1154
        formatted_messages.append(msg)
1✔
1155

1156
    return formatted_messages
1✔
1157

1158

1159
class SNSInternalResource:
1✔
1160
    resource_type: str
1✔
1161
    """Base class with helper to properly track usage of internal endpoints"""
1✔
1162

1163
    def count_usage(self):
1✔
1164
        internal_api_calls.labels(resource_type=self.resource_type).increment()
1✔
1165

1166

1167
def count_usage(f):
1✔
1168
    @functools.wraps(f)
1✔
1169
    def _wrapper(self, *args, **kwargs):
1✔
1170
        self.count_usage()
1✔
1171
        return f(self, *args, **kwargs)
1✔
1172

1173
    return _wrapper
1✔
1174

1175

1176
class SNSServicePlatformEndpointMessagesApiResource(SNSInternalResource):
1✔
1177
    resource_type = "platform-endpoint-message"
1✔
1178
    """Provides a REST API for retrospective access to platform endpoint messages sent via SNS.
1✔
1179

1180
    This is registered as a LocalStack internal HTTP resource.
1181

1182
    This endpoint accepts:
1183
    - GET param `accountId`: selector for AWS account. If not specified, return fallback `000000000000` test ID
1184
    - GET param `region`: selector for AWS `region`. If not specified, return default "us-east-1"
1185
    - GET param `endpointArn`: filter for `endpointArn` resource in SNS
1186
    - DELETE param `accountId`: selector for AWS account
1187
    - DELETE param `region`: will delete saved messages for `region`
1188
    - DELETE param `endpointArn`: will delete saved messages for `endpointArn`
1189
    """
1190

1191
    _PAYLOAD_FIELDS = [
1✔
1192
        "TargetArn",
1193
        "TopicArn",
1194
        "Message",
1195
        "MessageAttributes",
1196
        "MessageStructure",
1197
        "Subject",
1198
        "MessageId",
1199
    ]
1200

1201
    @route(sns_constants.PLATFORM_ENDPOINT_MSGS_ENDPOINT, methods=["GET"])
1✔
1202
    @count_usage
1✔
1203
    def on_get(self, request: Request):
1✔
1204
        filter_endpoint_arn = request.args.get("endpointArn")
1✔
1205
        account_id = (
1✔
1206
            request.args.get("accountId", DEFAULT_AWS_ACCOUNT_ID)
1207
            if not filter_endpoint_arn
1208
            else extract_account_id_from_arn(filter_endpoint_arn)
1209
        )
1210
        region = (
1✔
1211
            request.args.get("region", AWS_REGION_US_EAST_1)
1212
            if not filter_endpoint_arn
1213
            else extract_region_from_arn(filter_endpoint_arn)
1214
        )
1215
        store: SnsStore = sns_stores[account_id][region]
1✔
1216
        if filter_endpoint_arn:
1✔
1217
            messages = store.platform_endpoint_messages.get(filter_endpoint_arn, [])
1✔
1218
            messages = _format_messages(messages, self._PAYLOAD_FIELDS)
1✔
1219
            return {
1✔
1220
                "platform_endpoint_messages": {filter_endpoint_arn: messages},
1221
                "region": region,
1222
            }
1223

1224
        platform_endpoint_messages = {
1✔
1225
            endpoint_arn: _format_messages(messages, self._PAYLOAD_FIELDS)
1226
            for endpoint_arn, messages in store.platform_endpoint_messages.items()
1227
        }
1228
        return {
1✔
1229
            "platform_endpoint_messages": platform_endpoint_messages,
1230
            "region": region,
1231
        }
1232

1233
    @route(sns_constants.PLATFORM_ENDPOINT_MSGS_ENDPOINT, methods=["DELETE"])
1✔
1234
    @count_usage
1✔
1235
    def on_delete(self, request: Request) -> Response:
1✔
1236
        filter_endpoint_arn = request.args.get("endpointArn")
1✔
1237
        account_id = (
1✔
1238
            request.args.get("accountId", DEFAULT_AWS_ACCOUNT_ID)
1239
            if not filter_endpoint_arn
1240
            else extract_account_id_from_arn(filter_endpoint_arn)
1241
        )
1242
        region = (
1✔
1243
            request.args.get("region", AWS_REGION_US_EAST_1)
1244
            if not filter_endpoint_arn
1245
            else extract_region_from_arn(filter_endpoint_arn)
1246
        )
1247
        store: SnsStore = sns_stores[account_id][region]
1✔
1248
        if filter_endpoint_arn:
1✔
1249
            store.platform_endpoint_messages.pop(filter_endpoint_arn, None)
1✔
1250
            return Response("", status=204)
1✔
1251

1252
        store.platform_endpoint_messages.clear()
1✔
1253
        return Response("", status=204)
1✔
1254

1255

1256
class SNSServiceSMSMessagesApiResource(SNSInternalResource):
1✔
1257
    resource_type = "sms-message"
1✔
1258
    """Provides a REST API for retrospective access to SMS messages sent via SNS.
1✔
1259

1260
    This is registered as a LocalStack internal HTTP resource.
1261

1262
    This endpoint accepts:
1263
    - GET param `accountId`: selector for AWS account. If not specified, return fallback `000000000000` test ID
1264
    - GET param `region`: selector for AWS `region`. If not specified, return default "us-east-1"
1265
    - GET param `phoneNumber`: filter for `phoneNumber` resource in SNS
1266
    """
1267

1268
    _PAYLOAD_FIELDS = [
1✔
1269
        "PhoneNumber",
1270
        "TopicArn",
1271
        "SubscriptionArn",
1272
        "MessageId",
1273
        "Message",
1274
        "MessageAttributes",
1275
        "MessageStructure",
1276
        "Subject",
1277
    ]
1278

1279
    @route(sns_constants.SMS_MSGS_ENDPOINT, methods=["GET"])
1✔
1280
    @count_usage
1✔
1281
    def on_get(self, request: Request):
1✔
1282
        account_id = request.args.get("accountId", DEFAULT_AWS_ACCOUNT_ID)
1✔
1283
        region = request.args.get("region", AWS_REGION_US_EAST_1)
1✔
1284
        filter_phone_number = request.args.get("phoneNumber")
1✔
1285
        store: SnsStore = sns_stores[account_id][region]
1✔
1286
        if filter_phone_number:
1✔
1287
            messages = [
1✔
1288
                m for m in store.sms_messages if m.get("PhoneNumber") == filter_phone_number
1289
            ]
1290
            messages = _format_messages(messages, self._PAYLOAD_FIELDS)
1✔
1291
            return {
1✔
1292
                "sms_messages": {filter_phone_number: messages},
1293
                "region": region,
1294
            }
1295

1296
        sms_messages = {}
1✔
1297

1298
        for m in _format_messages(store.sms_messages, self._PAYLOAD_FIELDS):
1✔
1299
            sms_messages.setdefault(m.get("PhoneNumber"), []).append(m)
1✔
1300

1301
        return {
1✔
1302
            "sms_messages": sms_messages,
1303
            "region": region,
1304
        }
1305

1306
    @route(sns_constants.SMS_MSGS_ENDPOINT, methods=["DELETE"])
1✔
1307
    @count_usage
1✔
1308
    def on_delete(self, request: Request) -> Response:
1✔
1309
        account_id = request.args.get("accountId", DEFAULT_AWS_ACCOUNT_ID)
1✔
1310
        region = request.args.get("region", AWS_REGION_US_EAST_1)
1✔
1311
        filter_phone_number = request.args.get("phoneNumber")
1✔
1312
        store: SnsStore = sns_stores[account_id][region]
1✔
1313
        if filter_phone_number:
1✔
1314
            store.sms_messages = [
1✔
1315
                m for m in store.sms_messages if m.get("PhoneNumber") != filter_phone_number
1316
            ]
1317
            return Response("", status=204)
1✔
1318

1319
        store.sms_messages.clear()
1✔
1320
        return Response("", status=204)
1✔
1321

1322

1323
class SNSServiceSubscriptionTokenApiResource(SNSInternalResource):
1✔
1324
    resource_type = "subscription-token"
1✔
1325
    """Provides a REST API for retrospective access to Subscription Confirmation Tokens to confirm subscriptions.
1✔
1326
    Those are not sent for email, and sometimes inaccessible when working with external HTTPS endpoint which won't be
1327
    able to reach your local host.
1328

1329
    This is registered as a LocalStack internal HTTP resource.
1330

1331
    This endpoint has the following parameter:
1332
    - GET `subscription_arn`: `subscriptionArn`resource in SNS for which you want the SubscriptionToken
1333
    """
1334

1335
    @route(f"{sns_constants.SUBSCRIPTION_TOKENS_ENDPOINT}/<path:subscription_arn>", methods=["GET"])
1✔
1336
    @count_usage
1✔
1337
    def on_get(self, _request: Request, subscription_arn: str):
1✔
1338
        try:
1✔
1339
            parsed_arn = parse_arn(subscription_arn)
1✔
1340
        except InvalidArnException:
1✔
1341
            response = Response("", 400)
1✔
1342
            response.set_json(
1✔
1343
                {
1344
                    "error": "The provided SubscriptionARN is invalid",
1345
                    "subscription_arn": subscription_arn,
1346
                }
1347
            )
1348
            return response
1✔
1349

1350
        store: SnsStore = sns_stores[parsed_arn["account"]][parsed_arn["region"]]
1✔
1351

1352
        for token, sub_arn in store.subscription_tokens.items():
1✔
1353
            if sub_arn == subscription_arn:
1✔
1354
                return {
1✔
1355
                    "subscription_token": token,
1356
                    "subscription_arn": subscription_arn,
1357
                }
1358

1359
        response = Response("", 404)
1✔
1360
        response.set_json(
1✔
1361
            {
1362
                "error": "The provided SubscriptionARN is not found",
1363
                "subscription_arn": subscription_arn,
1364
            }
1365
        )
1366
        return response
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc