• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 17451507289

03 Sep 2025 05:30PM UTC coverage: 86.867% (+0.007%) from 86.86%
17451507289

push

github

web-flow
APIGW: fix UpdateResource on root method (#13093)

3 of 3 new or added lines in 1 file covered. (100.0%)

11 existing lines in 4 files now uncovered.

67095 of 77239 relevant lines covered (86.87%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.8
/localstack-core/localstack/services/sns/provider.py
1
import base64
1✔
2
import contextlib
1✔
3
import copy
1✔
4
import functools
1✔
5
import json
1✔
6
import logging
1✔
7
from uuid import uuid4
1✔
8

9
from botocore.utils import InvalidArnException
1✔
10
from moto.core.utils import camelcase_to_pascal, underscores_to_camelcase
1✔
11
from moto.sns import sns_backends
1✔
12
from moto.sns.models import MAXIMUM_MESSAGE_LENGTH, SNSBackend, Topic
1✔
13
from moto.sns.utils import is_e164
1✔
14

15
from localstack.aws.api import CommonServiceException, RequestContext
1✔
16
from localstack.aws.api.sns import (
1✔
17
    AmazonResourceName,
18
    BatchEntryIdsNotDistinctException,
19
    ConfirmSubscriptionResponse,
20
    CreateEndpointResponse,
21
    CreatePlatformApplicationResponse,
22
    CreateTopicResponse,
23
    EndpointDisabledException,
24
    GetSubscriptionAttributesResponse,
25
    GetTopicAttributesResponse,
26
    InvalidParameterException,
27
    InvalidParameterValueException,
28
    ListSubscriptionsByTopicResponse,
29
    ListSubscriptionsResponse,
30
    ListTagsForResourceResponse,
31
    MapStringToString,
32
    MessageAttributeMap,
33
    NotFoundException,
34
    PublishBatchRequestEntryList,
35
    PublishBatchResponse,
36
    PublishBatchResultEntry,
37
    PublishResponse,
38
    SnsApi,
39
    String,
40
    SubscribeResponse,
41
    Subscription,
42
    SubscriptionAttributesMap,
43
    TagKeyList,
44
    TagList,
45
    TagResourceResponse,
46
    TooManyEntriesInBatchRequestException,
47
    TopicAttributesMap,
48
    UntagResourceResponse,
49
    attributeName,
50
    attributeValue,
51
    authenticateOnUnsubscribe,
52
    boolean,
53
    messageStructure,
54
    nextToken,
55
    subscriptionARN,
56
    topicARN,
57
    topicName,
58
)
59
from localstack.constants import AWS_REGION_US_EAST_1, DEFAULT_AWS_ACCOUNT_ID
1✔
60
from localstack.http import Request, Response, Router, route
1✔
61
from localstack.services.edge import ROUTER
1✔
62
from localstack.services.moto import call_moto
1✔
63
from localstack.services.plugins import ServiceLifecycleHook
1✔
64
from localstack.services.sns import constants as sns_constants
1✔
65
from localstack.services.sns.certificate import SNS_SERVER_CERT
1✔
66
from localstack.services.sns.filter import FilterPolicyValidator
1✔
67
from localstack.services.sns.models import (
1✔
68
    SnsMessage,
69
    SnsMessageType,
70
    SnsStore,
71
    SnsSubscription,
72
    sns_stores,
73
)
74
from localstack.services.sns.publisher import (
1✔
75
    PublishDispatcher,
76
    SnsBatchPublishContext,
77
    SnsPublishContext,
78
)
79
from localstack.utils.aws.arns import (
1✔
80
    ArnData,
81
    extract_account_id_from_arn,
82
    extract_region_from_arn,
83
    get_partition,
84
    parse_arn,
85
)
86
from localstack.utils.collections import PaginatedList, select_from_typed_dict
1✔
87
from localstack.utils.strings import short_uid, to_bytes, to_str
1✔
88

89
from .analytics import internal_api_calls
1✔
90

91
# set up logger
92
LOG = logging.getLogger(__name__)
1✔
93

94

95
class SnsProvider(SnsApi, ServiceLifecycleHook):
1✔
96
    """
97
    Provider class for AWS Simple Notification Service.
98

99
    AWS supports the following operations in a cross-account setup:
100
    - GetTopicAttributes
101
    - SetTopicAttributes
102
    - AddPermission
103
    - RemovePermission
104
    - Publish
105
    - Subscribe
106
    - ListSubscriptionsByTopic
107
    - DeleteTopic
108
    """
109

110
    @route(sns_constants.SNS_CERT_ENDPOINT, methods=["GET"])
    def get_signature_cert_pem_file(self, request: Request):
        """
        HTTP handler returning the certificate held by the provider for SNS message signatures.

        :param request: the incoming HTTP request (not inspected)
        :return: a 200 Response whose body is the PEM certificate
        """
        # see http://sns-public-resources.s3.amazonaws.com/SNS_Message_Signing_Release_Note_Jan_25_2011.pdf
        # see https://docs.aws.amazon.com/sns/latest/dg/sns-verify-signature-of-message.html
        certificate_pem = self._signature_cert_pem
        return Response(certificate_pem, 200)
1✔
115

116
    def __init__(self) -> None:
        """Create the provider with its publish dispatcher and the signing certificate it serves."""
        super().__init__()
        # certificate returned by `get_signature_cert_pem_file`
        self._signature_cert_pem: str = SNS_SERVER_CERT
        # dispatcher used by all publish operations of this provider
        self._publisher = PublishDispatcher()
1✔
120

121
    def on_before_stop(self):
        """Lifecycle hook: shut down the publish dispatcher before the service is stopped."""
        dispatcher = self._publisher
        dispatcher.shutdown()
1✔
123

124
    def on_after_init(self):
        """Lifecycle hook: register the extra HTTP routes of the provider on the shared ROUTER."""
        router = ROUTER
        # Allow sent platform endpoint messages to be retrieved from the SNS endpoint
        register_sns_api_resource(router)
        # add the route to serve the certificate used to validate message signatures
        router.add(self.get_signature_cert_pem_file)
1✔
129

130
    @staticmethod
    def get_store(account_id: str, region_name: str) -> SnsStore:
        """
        Look up the SNS store of an account/region pair.

        :param account_id: the account owning the store
        :param region_name: the region of the store
        :return: the matching SnsStore
        """
        account_stores = sns_stores[account_id]
        return account_stores[region_name]
1✔
133

134
    @staticmethod
    def get_moto_backend(account_id: str, region_name: str) -> SNSBackend:
        """
        Look up the Moto SNS backend of an account/region pair.

        :param account_id: the account owning the backend
        :param region_name: the region of the backend
        :return: the matching Moto SNSBackend
        """
        account_backends = sns_backends[account_id]
        return account_backends[region_name]
1✔
137

138
    @staticmethod
    def _get_topic(arn: str, context: RequestContext) -> Topic:
        """
        Fetch the Moto topic model for a topic ARN. The topic must live in the region of the request (ex. Publish
        cannot publish to a topic in a different region than the request), but it is looked up in the account
        encoded in the ARN.

        :param arn: the Topic ARN
        :param context: the RequestContext of the request
        :return: the Moto model Topic
        :raises InvalidParameterException: if the region of the ARN is not the region of the request
        :raises NotFoundException: if the lookup of the topic in the Moto backend fails
        """
        arn_data = parse_and_validate_topic_arn(arn)
        if context.region != arn_data["region"]:
            raise InvalidParameterException("Invalid parameter: TopicArn")

        try:
            return sns_backends[arn_data["account"]][context.region].topics[arn]
        except KeyError:
            # the KeyError is a detail of the lookup, do not chain it to the API-level error
            raise NotFoundException("Topic does not exist") from None
1✔
155

156
    def get_topic_attributes(
        self, context: RequestContext, topic_arn: topicARN, **kwargs
    ) -> GetTopicAttributesResponse:
        """
        Return the attributes of a topic: the Moto response, completed with attributes read directly from the
        Moto topic model.

        :param context: the RequestContext of the request
        :param topic_arn: the ARN of the topic
        :return: the Moto response with the additional attributes set in its "Attributes" map
        """
        # resolve the Topic manually first, because Moto does not handle well the case where the ARN is malformed
        # (raises ValueError: not enough values to unpack (expected 6, got 1))
        topic_model = self._get_topic(topic_arn, context)
        response: GetTopicAttributesResponse = call_moto(context)
        # TODO: fix some attributes by moto, see snapshot
        # DeliveryPolicy
        # EffectiveDeliveryPolicy
        # Policy.Statement..Action -> SNS:Receive is added by moto but not returned in AWS
        # TODO: very hacky way to get the attributes we need instead of a moto patch
        # see the attributes we need: https://docs.aws.amazon.com/sns/latest/dg/sns-topic-attributes.html
        # would need more work to have the proper format out of moto, maybe extract the model to our store
        topic_attributes = response["Attributes"]
        for field_name in vars(topic_model):
            if "_feedback" in field_name:
                # snake_case model field -> PascalCase attribute name
                pascal_key = camelcase_to_pascal(underscores_to_camelcase(field_name))
                topic_attributes[pascal_key] = getattr(topic_model, field_name)
                continue

            if field_name == "signature_version":
                topic_attributes["SignatureVersion"] = topic_model.signature_version
            elif field_name == "archive_policy":
                topic_attributes["ArchivePolicy"] = topic_model.archive_policy

        return response
1✔
181

182
    def set_topic_attributes(
        self,
        context: RequestContext,
        topic_arn: topicARN,
        attribute_name: attributeName,
        attribute_value: attributeValue | None = None,
        **kwargs,
    ) -> None:
        """
        Set an attribute of a topic by delegating to Moto, once the topic has been validated.

        :param context: the RequestContext of the request
        :param topic_arn: the ARN of the topic
        :param attribute_name: the name of the attribute (handled by Moto)
        :param attribute_value: the new value of the attribute (handled by Moto)
        """
        # only called for its validation: raises if the ARN is invalid or if the topic cannot be found
        _ = self._get_topic(topic_arn, context)
        call_moto(context)
1✔
193

194
    def publish_batch(
        self,
        context: RequestContext,
        topic_arn: topicARN,
        publish_batch_request_entries: PublishBatchRequestEntryList,
        **kwargs,
    ) -> PublishBatchResponse:
        """
        Publish a batch of at most 10 messages to a topic.

        Every entry is validated before anything is dispatched: the first invalid entry fails the whole request.

        :param context: the RequestContext of the request
        :param topic_arn: the ARN of the topic to publish to
        :param publish_batch_request_entries: the entries of the batch; the "Message" of an entry using the "json"
        MessageStructure is replaced in place by its parsed form
        :return: a response listing every entry as "Successful" ("Failed" is always empty, see TODO below)
        :raises TooManyEntriesInBatchRequestException: if the batch has more than 10 entries
        :raises BatchEntryIdsNotDistinctException: if two entries share the same Id
        :raises InvalidParameterException: if a JSON message is malformed or has no "default" entry, or if the
        FIFO parameters required by the topic are missing
        :raises CommonServiceException: with code BatchRequestTooLong if the accumulated size of the entries is over
        MAXIMUM_MESSAGE_LENGTH
        """
        if len(publish_batch_request_entries) > 10:
            raise TooManyEntriesInBatchRequestException(
                "The batch request contains more entries than permissible."
            )

        parsed_arn = parse_and_validate_topic_arn(topic_arn)
        store = self.get_store(account_id=parsed_arn["account"], region_name=context.region)
        moto_topic = self._get_topic(topic_arn, context)

        ids = [entry["Id"] for entry in publish_batch_request_entries]
        if len(set(ids)) != len(publish_batch_request_entries):
            raise BatchEntryIdsNotDistinctException(
                "Two or more batch entries in the request have the same Id."
            )

        response: PublishBatchResponse = {"Successful": [], "Failed": []}

        # TODO: write AWS validated tests with FilterPolicy and batching
        # TODO: find a scenario where we can fail to send a message synchronously to be able to report it
        # right now, it seems that AWS fails the whole publish if something is wrong in the format of 1 message

        # the type of the topic is the same for every entry, compute it once
        is_fifo = ".fifo" in topic_arn
        total_batch_size = 0
        message_contexts = []
        for entry_index, entry in enumerate(publish_batch_request_entries, start=1):
            message_payload = entry.get("Message")
            message_attributes = entry.get("MessageAttributes", {})
            if message_attributes:
                # if a message contains non-valid message attributes
                # will fail for the first non-valid message encountered, and raise ParameterValueInvalid
                validate_message_attributes(message_attributes, position=entry_index)

            total_batch_size += get_total_publish_size(message_payload, message_attributes)

            # TODO: WRITE AWS VALIDATED
            if entry.get("MessageStructure") == "json":
                try:
                    message = json.loads(message_payload)
                except json.JSONDecodeError:
                    raise InvalidParameterException(
                        "Invalid parameter: Message Structure - JSON message body failed to parse"
                    )
                if not isinstance(message, dict):
                    # valid JSON that is not an object (list, string, number...) has no protocol keys: reject it
                    # instead of crashing on `.items()`
                    # NOTE(review): the exact AWS error message for this case is not validated - confirm
                    raise InvalidParameterException(
                        "Invalid parameter: Message Structure - JSON message body failed to parse"
                    )
                # Keys in the JSON object that correspond to supported transport protocols must have
                # simple JSON string values.
                # Non-string values will cause the key to be ignored.
                message = {key: field for key, field in message.items() if isinstance(field, str)}
                if "default" not in message:
                    raise InvalidParameterException(
                        "Invalid parameter: Message Structure - No default entry in JSON message body"
                    )
                entry["Message"] = message  # noqa

            if is_fifo:
                # the FIFO parameters are checked on the whole batch, not only on the current entry
                if not all(
                    "MessageGroupId" in batch_entry for batch_entry in publish_batch_request_entries
                ):
                    raise InvalidParameterException(
                        "Invalid parameter: The MessageGroupId parameter is required for FIFO topics"
                    )
                if moto_topic.content_based_deduplication == "false":
                    if not all(
                        "MessageDeduplicationId" in batch_entry
                        for batch_entry in publish_batch_request_entries
                    ):
                        raise InvalidParameterException(
                            "Invalid parameter: The topic should either have ContentBasedDeduplication enabled or MessageDeduplicationId provided explicitly",
                        )

            msg_ctx = SnsMessage.from_batch_entry(entry, is_fifo=is_fifo)
            message_contexts.append(msg_ctx)
            success = PublishBatchResultEntry(
                Id=entry["Id"],
                MessageId=msg_ctx.message_id,
            )
            if is_fifo:
                success["SequenceNumber"] = msg_ctx.sequencer_number
            response["Successful"].append(success)

        # the size limit applies to the sum of all the entries of the batch
        if total_batch_size > MAXIMUM_MESSAGE_LENGTH:
            raise CommonServiceException(
                code="BatchRequestTooLong",
                message="The length of all the messages put together is more than the limit.",
                sender_fault=True,
            )

        publish_ctx = SnsBatchPublishContext(
            messages=message_contexts,
            store=store,
            request_headers=context.request.headers,
            topic_attributes=vars(moto_topic),
        )
        self._publisher.publish_batch_to_topic(publish_ctx, topic_arn)

        return response
1✔
293

294
    def set_subscription_attributes(
        self,
        context: RequestContext,
        subscription_arn: subscriptionARN,
        attribute_name: attributeName,
        attribute_value: attributeValue = None,
        **kwargs,
    ) -> None:
        """
        Validate and store a single attribute on a subscription of the store of the request account and region.

        :param context: the RequestContext of the request
        :param subscription_arn: the ARN of the subscription to update
        :param attribute_name: the name of the attribute to set
        :param attribute_value: the new value; lower-cased for RawMessageDelivery, and additionally parsed as JSON
        and validated for FilterPolicy
        :raises NotFoundException: if the subscription is not in the store
        """
        store = self.get_store(account_id=context.account_id, region_name=context.region)
        subscription = store.subscriptions.get(subscription_arn)
        if not subscription:
            raise NotFoundException("Subscription does not exist")

        validate_subscription_attribute(
            attribute_name=attribute_name,
            attribute_value=attribute_value,
            topic_arn=subscription["TopicArn"],
            endpoint=subscription["Endpoint"],
        )

        if attribute_name == "RawMessageDelivery":
            attribute_value = attribute_value.lower()
        elif attribute_name == "FilterPolicy":
            # an empty or missing value clears the parsed filter policy
            parsed_policy = None
            if attribute_value:
                parsed_policy = json.loads(attribute_value)

            if parsed_policy:
                policy_scope = subscription.get("FilterPolicyScope", "MessageAttributes")
                FilterPolicyValidator(
                    scope=policy_scope,
                    is_subscribe_call=False,
                ).validate_filter_policy(parsed_policy)

            # the parsed policy is kept in the store next to the raw attribute value
            store.subscription_filter_policy[subscription_arn] = parsed_policy

        subscription[attribute_name] = attribute_value
1✔
328

329
    def confirm_subscription(
        self,
        context: RequestContext,
        topic_arn: topicARN,
        token: String,
        authenticate_on_unsubscribe: authenticateOnUnsubscribe = None,
        **kwargs,
    ) -> ConfirmSubscriptionResponse:
        """
        Confirm a pending subscription from the token that was issued for it.

        :param context: the RequestContext of the request
        :param topic_arn: the ARN of the topic, used to locate the store
        :param token: the subscription token
        :param authenticate_on_unsubscribe: not used
        :return: the response holding the ARN of the confirmed subscription
        :raises InvalidParameterException: if the topic ARN cannot be parsed, if its region does not match the
        region of the token, or if the token does not resolve to an existing subscription
        """
        # TODO: validate format on the token (seems to be 288 hex chars)
        # this request can come from any http client, it might not be signed (we would need to implement
        # `authenticate_on_unsubscribe` to force a signing client to do this request.
        # so, the region and account_id might not be in the request. Use the ones from the topic_arn
        try:
            arn_data = parse_arn(topic_arn)
        except InvalidArnException:
            raise InvalidParameterException("Invalid parameter: Topic")

        topic_region = arn_data["region"]
        store = self.get_store(account_id=arn_data["account"], region_name=topic_region)

        # it seems SNS is able to know what the region of the topic should be, even though a wrong topic is accepted
        if get_region_from_subscription_token(token) != topic_region:
            raise InvalidParameterException("Invalid parameter: Topic")

        subscription_arn = store.subscription_tokens.get(token)
        if not subscription_arn:
            raise InvalidParameterException("Invalid parameter: Token")

        subscription = store.subscriptions.get(subscription_arn)
        if not subscription:
            # subscription could have been deleted in the meantime
            raise InvalidParameterException("Invalid parameter: Token")

        # ConfirmSubscription is idempotent: an already confirmed subscription is returned untouched
        if subscription.get("PendingConfirmation") != "false":
            subscription["PendingConfirmation"] = "false"
            subscription["ConfirmationWasAuthenticated"] = "true"

        return ConfirmSubscriptionResponse(SubscriptionArn=subscription_arn)
1✔
369

370
    def untag_resource(
        self,
        context: RequestContext,
        resource_arn: AmazonResourceName,
        tag_keys: TagKeyList,
        **kwargs,
    ) -> UntagResourceResponse:
        """
        Remove tags from a resource, both in Moto and in the store of the request account and region.

        :param context: the RequestContext of the request
        :param resource_arn: the ARN of the tagged resource
        :param tag_keys: the keys of the tags to remove
        :return: an empty UntagResourceResponse
        """
        call_moto(context)
        # TODO: probably get the account_id and region from the `resource_arn`
        store = self.get_store(context.account_id, context.region)
        current_tags = store.sns_tags.setdefault(resource_arn, [])
        remaining_tags = []
        for tag in current_tags:
            if tag["Key"] not in tag_keys:
                remaining_tags.append(tag)
        store.sns_tags[resource_arn] = remaining_tags
        return UntagResourceResponse()
1✔
383

384
    def list_tags_for_resource(
        self, context: RequestContext, resource_arn: AmazonResourceName, **kwargs
    ) -> ListTagsForResourceResponse:
        """
        List the tags kept for a resource in the store of the request account and region.

        :param context: the RequestContext of the request
        :param resource_arn: the ARN of the tagged resource
        :return: the response holding the tags (an empty list is registered for an unknown resource)
        """
        # TODO: probably get the account_id and region from the `resource_arn`
        tags_by_resource = self.get_store(context.account_id, context.region).sns_tags
        resource_tags = tags_by_resource.setdefault(resource_arn, [])
        return ListTagsForResourceResponse(Tags=resource_tags)
1✔
391

392
    def create_platform_application(
        self,
        context: RequestContext,
        name: String,
        platform: String,
        attributes: MapStringToString,
        **kwargs,
    ) -> CreatePlatformApplicationResponse:
        """
        Create a platform application. The request is fully delegated to Moto.

        :param context: the RequestContext of the request
        :param name: the name of the application (handled by Moto)
        :param platform: the platform of the application (handled by Moto, not validated here)
        :param attributes: the attributes of the application (handled by Moto)
        :return: the Moto response
        """
        # TODO: validate platform
        # see https://docs.aws.amazon.com/cli/latest/reference/sns/create-platform-application.html
        # list of possible values: ADM, Baidu, APNS, APNS_SANDBOX, GCM, MPNS, WNS
        # each platform has a specific way to handle credentials
        # this can also be used for dispatching message to the right platform
        moto_response = call_moto(context)
        return moto_response
1✔
406

407
    def create_platform_endpoint(
        self,
        context: RequestContext,
        platform_application_arn: String,
        token: String,
        custom_user_data: String = None,
        attributes: MapStringToString = None,
        **kwargs,
    ) -> CreateEndpointResponse:
        """
        Create a platform endpoint. The request is fully delegated to Moto.

        :param context: the RequestContext of the request
        :param platform_application_arn: the ARN of the platform application (handled by Moto)
        :param token: the token of the endpoint (handled by Moto)
        :param custom_user_data: optional user data (handled by Moto)
        :param attributes: optional attributes of the endpoint (handled by Moto)
        :return: the Moto response
        """
        # TODO: support mobile app events
        # see https://docs.aws.amazon.com/sns/latest/dg/application-event-notifications.html
        moto_response = call_moto(context)
        return moto_response
1✔
419

420
    def unsubscribe(
        self, context: RequestContext, subscription_arn: subscriptionARN, **kwargs
    ) -> None:
        """
        Delete a subscription from Moto and from the store of the account and region encoded in its ARN.

        For "http" and "https" subscriptions, an UnsubscribeConfirmation message is dispatched to the subscriber
        before the subscription is removed from the store.

        :param context: the RequestContext of the request
        :param subscription_arn: the ARN of the subscription to delete
        :raises InvalidParameterException: if the ARN is missing or cannot be parsed, or if it has exactly 6 elements
        and is not a known subscription
        """
        if subscription_arn is None:
            raise InvalidParameterException(
                "Invalid parameter: SubscriptionArn Reason: no value for required parameter",
            )
        count = len(subscription_arn.split(":"))
        try:
            parsed_arn = parse_arn(subscription_arn)
        except InvalidArnException:
            # TODO: check for invalid SubscriptionGUID
            raise InvalidParameterException(
                f"Invalid parameter: SubscriptionArn Reason: An ARN must have at least 6 elements, not {count}"
            )

        account_id = parsed_arn["account"]
        region_name = parsed_arn["region"]

        store = self.get_store(account_id=account_id, region_name=region_name)
        if count == 6 and subscription_arn not in store.subscriptions:
            raise InvalidParameterException("Invalid parameter: SubscriptionId")

        moto_sns_backend = self.get_moto_backend(account_id, region_name)
        moto_sns_backend.unsubscribe(subscription_arn)

        # pop the subscription at the end, to avoid race condition by iterating over the topic subscriptions
        subscription = store.subscriptions.get(subscription_arn)

        if not subscription:
            # unsubscribe is idempotent, so unsubscribing from a non-existing subscription does nothing
            return

        if subscription["Protocol"] in ["http", "https"]:
            # TODO: actually validate this (re)subscribe behaviour somehow (localhost.run?)
            #  we might need to save the sub token in the store
            # TODO: AWS only sends the UnsubscribeConfirmation if the call is unauthenticated or the requester is not
            #  the owner
            subscription_token = encode_subscription_token_with_region(region=context.region)
            message_ctx = SnsMessage(
                type=SnsMessageType.UnsubscribeConfirmation,
                token=subscription_token,
                message=f"You have chosen to deactivate subscription {subscription_arn}.\nTo cancel this operation and restore the subscription, visit the SubscribeURL included in this message.",
            )
            # the lookup returns None if the topic is not in the backend anymore: `vars(None)` would raise a
            # TypeError, so fall back to empty topic attributes
            # NOTE(review): assumes empty topic attributes are acceptable for the dispatch - confirm
            moto_topic = moto_sns_backend.topics.get(subscription["TopicArn"])
            topic_attributes = vars(moto_topic) if moto_topic is not None else {}
            publish_ctx = SnsPublishContext(
                message=message_ctx,
                store=store,
                request_headers=context.request.headers,
                topic_attributes=topic_attributes,
            )
            self._publisher.publish_to_topic_subscriber(
                publish_ctx,
                topic_arn=subscription["TopicArn"],
                subscription_arn=subscription_arn,
            )

        # best-effort cleanup: neither a topic without entry in the store (KeyError) nor a subscription already
        # removed from the list of the topic (ValueError) should fail the request
        with contextlib.suppress(KeyError, ValueError):
            store.topic_subscriptions[subscription["TopicArn"]].remove(subscription_arn)
        store.subscription_filter_policy.pop(subscription_arn, None)
        store.subscriptions.pop(subscription_arn, None)
1✔
481

482
    def get_subscription_attributes(
        self, context: RequestContext, subscription_arn: subscriptionARN, **kwargs
    ) -> GetSubscriptionAttributesResponse:
        """
        Return the attributes of a subscription of the store of the request account and region.

        The internal "sqs_queue_url" entry is never returned, and the filter policy entries are hidden when a
        scope is present without a filter policy.

        :param context: the RequestContext of the request
        :param subscription_arn: the ARN of the subscription
        :return: the response holding the visible attributes
        :raises NotFoundException: if the subscription is not in the store
        """
        store = self.get_store(account_id=context.account_id, region_name=context.region)
        subscription = store.subscriptions.get(subscription_arn)
        if not subscription:
            raise NotFoundException("Subscription does not exist")

        hidden_keys = {"sqs_queue_url"}
        has_scope = "FilterPolicyScope" in subscription
        if has_scope and not subscription.get("FilterPolicy"):
            hidden_keys.update(("FilterPolicyScope", "FilterPolicy"))
        elif not has_scope and "FilterPolicy" in subscription:
            # a filter policy without scope gets the default scope, persisted on the stored subscription
            subscription["FilterPolicyScope"] = "MessageAttributes"

        visible_attributes = {}
        for name, value in subscription.items():
            if name not in hidden_keys:
                visible_attributes[name] = value
        return GetSubscriptionAttributesResponse(Attributes=visible_attributes)
1✔
498

499
    def list_subscriptions(
        self, context: RequestContext, next_token: nextToken = None, **kwargs
    ) -> ListSubscriptionsResponse:
        """
        List the subscriptions of the store of the request account and region, by pages of 100.

        :param context: the RequestContext of the request
        :param next_token: the pagination token returned by a previous call, if any
        :return: the page of subscriptions, with a "NextToken" if more pages are available
        """

        def _page_token(subscription) -> str:
            return get_next_page_token_from_arn(subscription["SubscriptionArn"])

        store = self.get_store(context.account_id, context.region)
        # iterate over a snapshot of the stored subscriptions
        stored_subscriptions = list(store.subscriptions.values())
        subscriptions = [
            select_from_typed_dict(Subscription, stored) for stored in stored_subscriptions
        ]

        page, following_token = PaginatedList(subscriptions).get_page(
            token_generator=_page_token,
            page_size=100,
            next_token=next_token,
        )

        response = ListSubscriptionsResponse(Subscriptions=page)
        if following_token:
            response["NextToken"] = following_token
        return response
1✔
517

518
    def list_subscriptions_by_topic(
        self, context: RequestContext, topic_arn: topicARN, next_token: nextToken = None, **kwargs
    ) -> ListSubscriptionsByTopicResponse:
        """
        List the subscriptions of a topic, by pages of 100.

        :param context: the RequestContext of the request
        :param topic_arn: the ARN of the topic; the store is the one of the account and region of this ARN
        :param next_token: the pagination token returned by a previous call, if any
        :return: the page of subscriptions, with a "NextToken" if more pages are available
        """
        # validate the topic first: raises if the ARN is invalid or if the topic cannot be found
        self._get_topic(topic_arn, context)
        parsed_topic_arn = parse_and_validate_topic_arn(topic_arn)
        store = self.get_store(parsed_topic_arn["account"], parsed_topic_arn["region"])
        sns_subscriptions = store.get_topic_subscriptions(topic_arn)
        subscriptions = [select_from_typed_dict(Subscription, sub) for sub in sns_subscriptions]

        paginated_subscriptions = PaginatedList(subscriptions)
        page, next_token = paginated_subscriptions.get_page(
            token_generator=lambda x: get_next_page_token_from_arn(x["SubscriptionArn"]),
            page_size=100,
            next_token=next_token,
        )

        # build the response type of this operation, not the one of ListSubscriptions
        response = ListSubscriptionsByTopicResponse(Subscriptions=page)
        if next_token:
            response["NextToken"] = next_token
        return response
1✔
538

539
    def publish(
        self,
        context: RequestContext,
        message: String,
        topic_arn: topicARN = None,
        target_arn: String = None,
        phone_number: String = None,
        subject: String = None,
        message_structure: messageStructure = None,
        message_attributes: MessageAttributeMap = None,
        message_deduplication_id: String = None,
        message_group_id: String = None,
        **kwargs,
    ) -> PublishResponse:
        """
        Publish a message to a topic, to a platform application endpoint or to a phone number.

        :param context: the RequestContext of the request
        :param message: the message; with the "json" message structure, it is parsed and only its string values
        are kept
        :param topic_arn: the ARN of the topic to publish to
        :param target_arn: the ARN of a topic or of a platform endpoint (contains ":endpoint/") to publish to
        :param phone_number: the phone number to publish to, must be valid according to `is_e164`
        :param subject: the subject of the message, cannot be an empty string
        :param message_structure: "json" if `message` is a JSON object with one entry per protocol
        :param message_attributes: the attributes of the message
        :param message_deduplication_id: the deduplication id, only valid for FIFO topics
        :param message_group_id: the message group id, required for FIFO topics
        :return: the response holding the MessageId, and the SequenceNumber for FIFO topics
        :raises InvalidParameterException: if one of the validations of the parameters fails
        :raises EndpointDisabledException: if the targeted platform endpoint is disabled
        """
        if subject == "":
            raise InvalidParameterException("Invalid parameter: Subject")
        if not message or all(not m for m in message):
            raise InvalidParameterException("Invalid parameter: Empty message")

        # TODO: check for topic + target + phone number at the same time?
        # TODO: more validation on phone, it might be opted out?
        if phone_number and not is_e164(phone_number):
            raise InvalidParameterException(
                f"Invalid parameter: PhoneNumber Reason: {phone_number} is not valid to publish to"
            )

        if message_attributes:
            validate_message_attributes(message_attributes)

        if get_total_publish_size(message, message_attributes) > MAXIMUM_MESSAGE_LENGTH:
            raise InvalidParameterException("Invalid parameter: Message too long")

        # for compatibility reasons, AWS allows users to use either TargetArn or TopicArn for publishing to a topic
        # use any of them for topic validation
        topic_or_target_arn = topic_arn or target_arn
        topic_model = None

        # always a real boolean, even when no ARN is given (ex. publishing to a phone number)
        is_fifo = bool(topic_or_target_arn) and ".fifo" in topic_or_target_arn
        if is_fifo:
            if not message_group_id:
                raise InvalidParameterException(
                    "Invalid parameter: The MessageGroupId parameter is required for FIFO topics",
                )
            topic_model = self._get_topic(topic_or_target_arn, context)
            if topic_model.content_based_deduplication == "false":
                if not message_deduplication_id:
                    raise InvalidParameterException(
                        "Invalid parameter: The topic should either have ContentBasedDeduplication enabled or MessageDeduplicationId provided explicitly",
                    )
        elif message_deduplication_id:
            # this is the first one to raise if both are set while the topic is not fifo
            raise InvalidParameterException(
                "Invalid parameter: MessageDeduplicationId Reason: The request includes MessageDeduplicationId parameter that is not valid for this topic type"
            )

        is_endpoint_publish = target_arn and ":endpoint/" in target_arn
        if message_structure == "json":
            try:
                message = json.loads(message)
            except json.JSONDecodeError:
                raise InvalidParameterException(
                    "Invalid parameter: Message Structure - JSON message body failed to parse"
                )
            if not isinstance(message, dict):
                # valid JSON that is not an object (list, string, number...) has no protocol keys: reject it
                # instead of crashing on `.items()`
                # NOTE(review): the exact AWS error message for this case is not validated - confirm
                raise InvalidParameterException(
                    "Invalid parameter: Message Structure - JSON message body failed to parse"
                )
            # Keys in the JSON object that correspond to supported transport protocols must have
            # simple JSON string values.
            # Non-string values will cause the key to be ignored.
            message = {key: field for key, field in message.items() if isinstance(field, str)}
            # TODO: check no default key for direct TargetArn endpoint publish, need credentials
            # see example: https://docs.aws.amazon.com/sns/latest/dg/sns-send-custom-platform-specific-payloads-mobile-devices.html
            if "default" not in message and not is_endpoint_publish:
                raise InvalidParameterException(
                    "Invalid parameter: Message Structure - No default entry in JSON message body"
                )

        if not phone_number:
            # use the account to get the store from the TopicArn (you can only publish in the same region as the topic)
            parsed_arn = parse_and_validate_topic_arn(topic_or_target_arn)
            store = self.get_store(account_id=parsed_arn["account"], region_name=context.region)
            moto_sns_backend = self.get_moto_backend(parsed_arn["account"], context.region)
            if is_endpoint_publish:
                if not (platform_endpoint := moto_sns_backend.platform_endpoints.get(target_arn)):
                    raise InvalidParameterException(
                        "Invalid parameter: TargetArn Reason: No endpoint found for the target arn specified"
                    )
                elif not platform_endpoint.enabled:
                    raise EndpointDisabledException("Endpoint is disabled")
            else:
                topic_model = self._get_topic(topic_or_target_arn, context)
        else:
            # use the store from the request context
            store = self.get_store(account_id=context.account_id, region_name=context.region)

        message_ctx = SnsMessage(
            type=SnsMessageType.Notification,
            message=message,
            message_attributes=message_attributes,
            message_deduplication_id=message_deduplication_id,
            message_group_id=message_group_id,
            message_structure=message_structure,
            subject=subject,
            is_fifo=is_fifo,
        )
        publish_ctx = SnsPublishContext(
            message=message_ctx, store=store, request_headers=context.request.headers
        )

        if is_endpoint_publish:
            self._publisher.publish_to_application_endpoint(
                ctx=publish_ctx, endpoint_arn=target_arn
            )
        elif phone_number:
            self._publisher.publish_to_phone_number(ctx=publish_ctx, phone_number=phone_number)
        else:
            # beware if the subscription is FIFO, the order might not be guaranteed.
            # 2 quick call to this method in succession might not be executed in order in the executor?
            # TODO: test how this behaves in a FIFO context with a lot of threads.
            publish_ctx.topic_attributes |= vars(topic_model)
            self._publisher.publish_to_topic(publish_ctx, topic_or_target_arn)

        if is_fifo:
            return PublishResponse(
                MessageId=message_ctx.message_id, SequenceNumber=message_ctx.sequencer_number
            )

        return PublishResponse(MessageId=message_ctx.message_id)
1✔
663

664
    def subscribe(
        self,
        context: RequestContext,
        topic_arn: topicARN,
        protocol: String,
        endpoint: String = None,
        attributes: SubscriptionAttributesMap = None,
        return_subscription_arn: boolean = None,
        **kwargs,
    ) -> SubscribeResponse:
        """
        Subscribe an endpoint to a topic, or return the already existing subscription of that endpoint.

        The request is validated (topic ARN and region, topic existence, endpoint, protocol, FIFO compatibility
        and subscription attributes) before anything is written to the store. If the endpoint is already
        subscribed to the topic, the existing subscription ARN is returned, unless the request carries
        attributes that differ from the stored ones.

        :param context: the request context, providing the region, account id and request headers
        :param topic_arn: the ARN of the topic to subscribe to; its region must match the request region
        :param protocol: the subscription protocol, must be one of `sns_constants.SNS_PROTOCOLS`
        :param endpoint: the endpoint receiving the notifications
        :param attributes: optional subscription attributes; the caller's map is deep-copied, never mutated
        :param return_subscription_arn: for http(s) subscriptions, return the subscription ARN instead of
            "pending confirmation"
        :raises InvalidParameterException: if the topic ARN, protocol, endpoint or attributes are invalid
        :raises NotFoundException: if the topic or endpoint does not exist, or no endpoint was given
        :return: a SubscribeResponse holding the subscription ARN, or "pending confirmation"
        """
        # TODO: check validation ordering
        parsed_topic_arn = parse_and_validate_topic_arn(topic_arn)
        if context.region != parsed_topic_arn["region"]:
            raise InvalidParameterException("Invalid parameter: TopicArn")

        store = self.get_store(account_id=parsed_topic_arn["account"], region_name=context.region)

        if topic_arn not in store.topic_subscriptions:
            raise NotFoundException("Topic does not exist")

        if not endpoint:
            # TODO: check AWS behaviour (because endpoint is optional)
            raise NotFoundException("Endpoint not specified in subscription")
        if protocol not in sns_constants.SNS_PROTOCOLS:
            raise InvalidParameterException(
                f"Invalid parameter: Amazon SNS does not support this protocol string: {protocol}"
            )
        elif protocol in ["http", "https"] and not endpoint.startswith(f"{protocol}://"):
            raise InvalidParameterException(
                "Invalid parameter: Endpoint must match the specified protocol"
            )
        elif protocol == "sms" and not is_e164(endpoint):
            raise InvalidParameterException(f"Invalid SMS endpoint: {endpoint}")

        elif protocol == "sqs":
            try:
                parse_arn(endpoint)
            except InvalidArnException:
                raise InvalidParameterException("Invalid parameter: SQS endpoint ARN")

        elif protocol == "application":
            # TODO: this is taken from moto, validate it
            moto_backend = self.get_moto_backend(
                account_id=parsed_topic_arn["account"], region_name=context.region
            )
            if endpoint not in moto_backend.platform_endpoints:
                raise NotFoundException("Endpoint does not exist")

        if ".fifo" in endpoint and ".fifo" not in topic_arn:
            raise InvalidParameterException(
                "Invalid parameter: Invalid parameter: Endpoint Reason: FIFO SQS Queues can not be subscribed to standard SNS topics"
            )

        sub_attributes = copy.deepcopy(attributes) if attributes else None
        if sub_attributes:
            for attr_name, attr_value in sub_attributes.items():
                validate_subscription_attribute(
                    attribute_name=attr_name,
                    attribute_value=attr_value,
                    topic_arn=topic_arn,
                    endpoint=endpoint,
                    is_subscribe_call=True,
                )
            # normalize only once every attribute has been validated: doing it inside the loop rewrote the
            # value before it was validated (depending on the dict ordering), so the validation error echoed
            # the lower-cased value instead of the one sent by the caller
            if raw_msg_delivery := sub_attributes.get("RawMessageDelivery"):
                sub_attributes["RawMessageDelivery"] = raw_msg_delivery.lower()

        # An endpoint may only be subscribed to a topic once. Subsequent
        # subscribe calls do nothing (subscribe is idempotent), except if its attributes are different.
        for existing_topic_subscription in store.topic_subscriptions.get(topic_arn, []):
            sub = store.subscriptions.get(existing_topic_subscription, {})
            if sub.get("Endpoint") == endpoint:
                if sub_attributes:
                    # validate the subscription attributes aren't different
                    for attr in sns_constants.VALID_SUBSCRIPTION_ATTR_NAME:
                        # if a new attribute is present and different from an existent one, raise
                        if (new_attr := sub_attributes.get(attr)) and sub.get(attr) != new_attr:
                            raise InvalidParameterException(
                                "Invalid parameter: Attributes Reason: Subscription already exists with different attributes"
                            )

                return SubscribeResponse(SubscriptionArn=sub["SubscriptionArn"])

        principal = sns_constants.DUMMY_SUBSCRIPTION_PRINCIPAL.format(
            partition=get_partition(context.region), account_id=context.account_id
        )
        subscription_arn = create_subscription_arn(topic_arn)
        subscription = SnsSubscription(
            # http://docs.aws.amazon.com/cli/latest/reference/sns/get-subscription-attributes.html
            TopicArn=topic_arn,
            Endpoint=endpoint,
            Protocol=protocol,
            SubscriptionArn=subscription_arn,
            PendingConfirmation="true",
            Owner=context.account_id,
            RawMessageDelivery="false",  # default value, will be overridden if set
            FilterPolicyScope="MessageAttributes",  # default value, will be overridden if set
            SubscriptionPrincipal=principal,  # dummy value, could be fetched with a call to STS?
        )
        if sub_attributes:
            subscription.update(sub_attributes)
            if "FilterPolicy" in sub_attributes:
                filter_policy = (
                    json.loads(sub_attributes["FilterPolicy"])
                    if sub_attributes["FilterPolicy"]
                    else None
                )
                if filter_policy:
                    # the scope is read from the subscription so that a FilterPolicyScope sent in the same
                    # request is taken into account
                    validator = FilterPolicyValidator(
                        scope=subscription.get("FilterPolicyScope", "MessageAttributes"),
                        is_subscribe_call=True,
                    )
                    validator.validate_filter_policy(filter_policy)

                store.subscription_filter_policy[subscription_arn] = filter_policy

        store.subscriptions[subscription_arn] = subscription

        topic_subscriptions = store.topic_subscriptions.setdefault(topic_arn, [])
        topic_subscriptions.append(subscription_arn)

        # store the token and subscription arn
        # TODO: the token is a 288 hex char string
        subscription_token = encode_subscription_token_with_region(region=context.region)
        store.subscription_tokens[subscription_token] = subscription_arn

        response_subscription_arn = subscription_arn
        # Send out confirmation message for HTTP(S), fix for https://github.com/localstack/localstack/issues/881
        if protocol in ["http", "https"]:
            message_ctx = SnsMessage(
                type=SnsMessageType.SubscriptionConfirmation,
                token=subscription_token,
                message=f"You have chosen to subscribe to the topic {topic_arn}.\nTo confirm the subscription, visit the SubscribeURL included in this message.",
            )
            publish_ctx = SnsPublishContext(
                message=message_ctx,
                store=store,
                request_headers=context.request.headers,
                topic_attributes=vars(self._get_topic(topic_arn, context)),
            )
            self._publisher.publish_to_topic_subscriber(
                ctx=publish_ctx,
                topic_arn=topic_arn,
                subscription_arn=subscription_arn,
            )
            if not return_subscription_arn:
                response_subscription_arn = "pending confirmation"

        elif protocol not in ["email", "email-json"]:
            # Only HTTP(S) and email subscriptions are not auto validated
            # Except if the endpoint and the topic are not in the same AWS account, then you'd need to manually confirm
            # the subscription with the token
            # TODO: revisit for multi-account
            # TODO: test with AWS for email & email-json confirmation message
            # we need to add the following check:
            # if parsed_topic_arn["account"] == endpoint account (depending on the type, SQS, lambda, parse the arn)
            subscription["PendingConfirmation"] = "false"
            subscription["ConfirmationWasAuthenticated"] = "true"

        return SubscribeResponse(SubscriptionArn=response_subscription_arn)
1✔
823

824
    def tag_resource(
        self, context: RequestContext, resource_arn: AmazonResourceName, tags: TagList, **kwargs
    ) -> TagResourceResponse:
        """
        Add the given tags to a resource, replacing the tags whose key is already set on it.

        :param context: the request context, forwarded to moto and used to select the store
        :param resource_arn: the ARN of the resource to tag
        :param tags: the tags to set, every `Key` must be unique within the request
        :raises InvalidParameterException: if the request contains the same tag key more than once
        :return: an empty TagResourceResponse
        """
        # each tag key must be unique
        # https://docs.aws.amazon.com/general/latest/gr/aws_tagging.html#tag-best-practices
        distinct_keys = {entry["Key"] for entry in tags}
        if len(distinct_keys) < len(tags):
            raise InvalidParameterException("Invalid parameter: Duplicated keys are not allowed.")

        call_moto(context)
        store = self.get_store(context.account_id, context.region)
        current_tags = store.sns_tags.get(resource_arn, [])

        for new_tag in tags:
            # position of the stored tag sharing the same key, if there is one
            position = next(
                (
                    idx
                    for idx, stored_tag in enumerate(current_tags)
                    if new_tag["Key"] == stored_tag["Key"]
                ),
                None,
            )
            if position is not None:
                current_tags[position] = new_tag
            else:
                current_tags.append(new_tag)

        store.sns_tags[resource_arn] = current_tags
        return TagResourceResponse()
1✔
852

853
    def delete_topic(self, context: RequestContext, topic_arn: topicARN, **kwargs) -> None:
        """
        Delete a topic through moto, then drop its subscriptions and tags from the store.

        :param context: the request context, its region must match the region of the topic ARN
        :param topic_arn: the ARN of the topic to delete
        :raises InvalidParameterException: if the ARN is malformed or belongs to another region
        """
        arn_data = parse_and_validate_topic_arn(topic_arn)
        if context.region != arn_data["region"]:
            raise InvalidParameterException("Invalid parameter: TopicArn")

        call_moto(context)
        store = self.get_store(account_id=arn_data["account"], region_name=context.region)

        # removing the topic entry returns the ARNs of the subscriptions to clean up
        for subscription_arn in store.topic_subscriptions.pop(topic_arn, []):
            store.subscriptions.pop(subscription_arn, None)

        store.sns_tags.pop(topic_arn, None)
1✔
865

866
    def create_topic(
        self,
        context: RequestContext,
        name: topicName,
        attributes: TopicAttributesMap = None,
        tags: TagList = None,
        data_protection_policy: attributeValue = None,
        **kwargs,
    ) -> CreateTopicResponse:
        """
        Create the topic through moto and register it, along with its tags, in the store.

        :param context: the request context, forwarded to moto and used to select the store
        :param name: the topic name (not read here, the request is forwarded as a whole)
        :param attributes: the topic attributes (not read here)
        :param tags: the tags to set on the topic
        :param data_protection_policy: the data protection policy (not read here)
        :raises InvalidParameterException: if the topic is already registered and a requested tag is not set on it
        :return: a CreateTopicResponse holding the topic ARN
        """
        response = call_moto(context)
        store = self.get_store(account_id=context.account_id, region_name=context.region)
        topic_arn = response["TopicArn"]

        if not extract_tags(topic_arn, tags, True, store):
            raise InvalidParameterException(
                "Invalid parameter: Tags Reason: Topic already exists with different tags"
            )

        if tags:
            self.tag_resource(context=context, resource_arn=topic_arn, tags=tags)

        # make the topic known to the store, keeping its subscriptions if it already exists
        store.topic_subscriptions.setdefault(topic_arn, [])
        return CreateTopicResponse(TopicArn=topic_arn)
1✔
887

888

889
def is_raw_message_delivery(susbcriber):
    """
    Tell whether raw message delivery is enabled for a subscription.

    :param susbcriber: the subscription, read through `.get("RawMessageDelivery")`
    :return: True if the attribute is "true", "True" or True, False otherwise
    """
    raw_delivery = susbcriber.get("RawMessageDelivery")
    return raw_delivery in ("true", True, "True")
1✔
891

892

893
def validate_subscription_attribute(
    attribute_name: str,
    attribute_value: str,
    topic_arn: str,
    endpoint: str,
    is_subscribe_call: bool = False,
) -> None:
    """
    Check that a subscription attribute may be set, raising if it may not. See:
    https://docs.aws.amazon.com/sns/latest/api/API_SetSubscriptionAttributes.html
    :param attribute_name: the subscription attribute name, must be in VALID_SUBSCRIPTION_ATTR_NAME
    :param attribute_value: the subscription attribute value
    :param topic_arn: the topic_arn of the subscription, needed to know if it is FIFO
    :param endpoint: the subscription endpoint (like an SQS queue ARN)
    :param is_subscribe_call: selects the error message prefix, which differs between Subscribe and
        SetSubscriptionAttributes
    :raises InvalidParameterException: if the name is unknown or the value is invalid for that attribute
    :return: None
    """
    if is_subscribe_call:
        error_prefix = "Invalid parameter: Attributes Reason: "
    else:
        error_prefix = "Invalid parameter: "

    if attribute_name not in sns_constants.VALID_SUBSCRIPTION_ATTR_NAME:
        raise InvalidParameterException(f"{error_prefix}AttributeName")

    if attribute_name == "FilterPolicy":
        # an empty policy is validated as an empty JSON object
        try:
            json.loads(attribute_value or "{}")
        except json.JSONDecodeError:
            raise InvalidParameterException(f"{error_prefix}FilterPolicy: failed to parse JSON.")
        return

    if attribute_name == "FilterPolicyScope":
        if attribute_value not in ("MessageAttributes", "MessageBody"):
            raise InvalidParameterException(
                f"{error_prefix}FilterPolicyScope: Invalid value [{attribute_value}]. "
                f"Please use either MessageBody or MessageAttributes"
            )
        return

    if attribute_name == "RawMessageDelivery":
        # TODO: only for SQS and https(s) subs, + firehose
        if attribute_value.lower() not in ("true", "false"):
            raise InvalidParameterException(
                f"{error_prefix}RawMessageDelivery: Invalid value [{attribute_value}]. "
                f"Must be true or false."
            )
        return

    if attribute_name != "RedrivePolicy":
        # the remaining valid attributes have no value validation
        return

    try:
        dlq_target_arn = json.loads(attribute_value).get("deadLetterTargetArn", "")
    except json.JSONDecodeError:
        raise InvalidParameterException(f"{error_prefix}RedrivePolicy: failed to parse JSON.")
    try:
        parsed_arn = parse_arn(dlq_target_arn)
    except InvalidArnException:
        raise InvalidParameterException(
            f"{error_prefix}RedrivePolicy: deadLetterTargetArn is an invalid arn"
        )

    # a FIFO endpoint subscribed to a FIFO topic only accepts a FIFO SQS queue as DLQ
    if (
        topic_arn.endswith(".fifo")
        and endpoint.endswith(".fifo")
        and not (parsed_arn["resource"].endswith(".fifo") and "sqs" in parsed_arn["service"])
    ):
        raise InvalidParameterException(
            f"{error_prefix}RedrivePolicy: must use a FIFO queue as DLQ for a FIFO Subscription to a FIFO Topic."
        )
955

956

957
def validate_message_attributes(
    message_attributes: MessageAttributeMap, position: int | None = None
) -> None:
    """
    Check every message attribute of a message, raising on the first one that breaks the AWS validation
    See: https://docs.aws.amazon.com/sns/latest/dg/sns-message-attributes.html
    Regex from: https://stackoverflow.com/questions/40718851/regex-that-does-not-allow-consecutive-dots
    :param message_attributes: the message attributes map for the message
    :param position: the Batch Entry position if coming from `publishBatch`, only used in the error message
        of a missing `DataType`
    :raises InvalidParameterValueException: if a name, data type or value is invalid
    :raises CommonServiceException: with code `ValidationError` if `DataType` is missing
    :return: None
    """
    for attr_name, attr in message_attributes.items():
        if len(attr_name) > 256:
            raise InvalidParameterValueException(
                "Length of message attribute name must be less than 256 bytes."
            )
        validate_message_attribute_name(attr_name)

        # `DataType` is a required field for MessageAttributeValue
        data_type = attr.get("DataType")
        if data_type is None:
            at = (
                f"publishBatchRequestEntries.{position}.member.messageAttributes.{attr_name}.member.dataType"
                if position
                else f"messageAttributes.{attr_name}.member.dataType"
            )
            raise CommonServiceException(
                code="ValidationError",
                message=f"1 validation error detected: Value null at '{at}' failed to satisfy constraint: Member must not be null",
                sender_fault=True,
            )

        # either one of the plain types, or something accepted by the attribute type regex
        is_supported_type = data_type in (
            "String",
            "Number",
            "Binary",
        ) or sns_constants.ATTR_TYPE_REGEX.match(data_type)
        if not is_supported_type:
            raise InvalidParameterValueException(
                f"The message attribute '{attr_name}' has an invalid message attribute type, the set of supported type prefixes is Binary, Number, and String."
            )

        # at least one `*Value` field has to be present
        if not any(field_name.endswith("Value") for field_name in attr):
            raise InvalidParameterValueException(
                f"The message attribute '{attr_name}' must contain non-empty message attribute value for message attribute type '{data_type}'."
            )

        # Binary types carry their value in `BinaryValue`, every other type in `StringValue`
        if data_type.startswith("Binary"):
            value_key_data_type = "Binary"
        else:
            value_key_data_type = "String"
        value_key = f"{value_key_data_type}Value"

        if value_key not in attr:
            raise InvalidParameterValueException(
                f"The message attribute '{attr_name}' with type '{data_type}' must use field '{value_key_data_type}'."
            )
        if not attr[value_key]:
            raise InvalidParameterValueException(
                f"The message attribute '{attr_name}' must contain non-empty message attribute value for message attribute type '{data_type}'.",
            )
1011

1012

1013
def validate_message_attribute_name(name: str) -> None:
    """
    Check a message attribute name against the AWS specification.
    The message attribute name can contain the following characters: A-Z, a-z, 0-9, underscore(_), hyphen(-), and period (.). The name must not start or end with a period, and it should not have successive periods.
    The regex decides whether the name is valid; the remaining checks only pick the matching error message.
    :param name: message attribute name
    :raises InvalidParameterValueException: if the name does not conform to the spec
    """
    if sns_constants.MSG_ATTR_NAME_REGEX.match(name):
        return

    # find the proper exception
    # NOTE(review): a name rejected by the regex is indexed directly here, presumably an empty name would
    # raise an IndexError - confirm whether that can reach this function
    if name[0] == ".":
        raise InvalidParameterValueException(
            "Invalid message attribute name starting with character '.' was found."
        )
    if name[-1] == ".":
        raise InvalidParameterValueException(
            "Invalid message attribute name ending with character '.' was found."
        )

    # the character preceding the first one wraps around to the last one, which is known not to be a dot
    previous_char = name[-1]
    for char in name:
        if char not in sns_constants.VALID_MSG_ATTR_NAME_CHARS:
            # upper-case hexadecimal code point, prefixed with #x
            hex_char = f"#x{ord(char):X}"
            raise InvalidParameterValueException(
                f"Invalid non-alphanumeric character '{hex_char}' was found in the message attribute name. Can only include alphanumeric characters, hyphens, underscores, or dots."
            )
        if char == "." and previous_char == ".":
            raise InvalidParameterValueException(
                "Message attribute name can not have successive '.' character."
            )
        previous_char = char
1043

1044

1045
def extract_tags(
    topic_arn: str, tags: TagList, is_create_topic_request: bool, store: SnsStore
) -> bool:
    """
    Check that the requested tags do not conflict with the tags of an already registered topic.

    :param topic_arn: the ARN of the topic
    :param tags: the requested tags, `None` is treated as no tags
    :param is_create_topic_request: the tags are only compared when this is set
    :param store: the store holding the topic subscriptions and tags
    :return: False if the topic is registered in `store.topic_subscriptions`, this is a create topic request
        and a requested tag is not among the stored tags of the topic, True otherwise
    """
    stored_tags = list(store.sns_tags.get(topic_arn, []))

    # a topic which is not registered yet has nothing to conflict with
    if topic_arn not in store.topic_subscriptions:
        return True

    requested_tags = [] if tags is None else tags
    # the topic already exists: creating it again with a tag it does not have should fail according to
    # the aws documentation
    return not any(
        is_create_topic_request and requested_tag not in stored_tags
        for requested_tag in requested_tags
    )
1✔
1059

1060

1061
def parse_and_validate_topic_arn(topic_arn: str | None) -> ArnData:
    """
    Parse a topic ARN, turning a parsing failure into an SNS parameter error.

    :param topic_arn: the topic ARN, `None` is handled like an empty string
    :raises InvalidParameterException: if the ARN cannot be parsed, reporting how many `:` separated
        elements it has
    :return: the parsed ARN
    """
    arn = topic_arn if topic_arn else ""
    try:
        parsed_arn = parse_arn(arn)
    except InvalidArnException:
        # n separators delimit n + 1 elements
        count = arn.count(":") + 1
        raise InvalidParameterException(
            f"Invalid parameter: TopicArn Reason: An ARN must have at least 6 elements, not {count}"
        )
    return parsed_arn
1070

1071

1072
def create_subscription_arn(topic_arn: str) -> str:
    """
    Build a new subscription ARN: the topic ARN followed by a random UUID.

    :param topic_arn: the ARN of the topic the subscription belongs to
    :return: the subscription ARN
    """
    # This is the format of a Subscription ARN
    # arn:aws:sns:us-west-2:123456789012:my-topic:8a21d249-4329-4871-acc6-7be709c6ea7f
    subscription_id = uuid4()
    return "{}:{}".format(topic_arn, subscription_id)
1✔
1076

1077

1078
def encode_subscription_token_with_region(region: str) -> str:
    """
    Create a 64 characters Subscription Token with the region encoded

    The token starts with the hex encoding of `<region>/` and is padded with random hex characters up to 64
    characters, so the region can be read back by decoding everything located before the first `2f` (the
    hex encoding of `/`).

    :param region: the region to encode in the token
    :return: a subscription token with the region encoded
    """
    # local import: keeps this function self-contained without touching the imports of the module
    import secrets

    region_prefix = (region.encode() + b"/").hex()
    # every padding character is drawn independently: repeating a single short random id would leave the
    # whole token with no more randomness than that one id, making collisions between tokens far likelier
    return (region_prefix + secrets.token_hex(32))[:64]
1✔
1085

1086

1087
def get_region_from_subscription_token(token: str) -> str:
    """
    Try to decode and return the region from a subscription token

    The region is the hex encoded prefix of the token, ending at the first `2f` (the hex encoding of `/`).

    :param token: the subscription token
    :return: the region if able to decode it
    :raises: InvalidParameterException if the token is invalid
    """
    try:
        region_hex, separator, _ = token.partition("2f")
        if not separator:
            # without the separator there is no region prefix: do not decode the whole token as a region
            raise ValueError("no region separator in the subscription token")
        return bytes.fromhex(region_hex).decode("utf-8")
    # AttributeError covers a token which is not a string, UnicodeDecodeError is a ValueError
    except (AttributeError, TypeError, ValueError):
        raise InvalidParameterException("Invalid parameter: Token")
1✔
1099

1100

1101
def get_next_page_token_from_arn(resource_arn: str) -> str:
    """
    Build a pagination token from a resource ARN by base64 encoding it.

    :param resource_arn: the ARN to encode
    :return: the base64 encoded ARN, converted with `to_str`
    """
    arn_bytes = to_bytes(resource_arn)
    encoded_arn = base64.b64encode(arn_bytes)
    return to_str(encoded_arn)
1✔
1103

1104

1105
def _get_byte_size(payload: str | bytes) -> int:
    """
    Return the size in bytes of a payload.

    :param payload: the payload to measure
    :return: the length of the payload once converted with `to_bytes`
    """
    # a string is measured through its byte representation, to get its real length in bytes
    payload_bytes = to_bytes(payload)
    return len(payload_bytes)
1✔
1108

1109

1110
def get_total_publish_size(
    message_body: str, message_attributes: MessageAttributeMap | None
) -> int:
    """
    Compute the size in bytes of a message to publish, message attributes included.

    :param message_body: the message body
    :param message_attributes: the message attributes map, may be empty or None
    :return: the byte size of the body, plus the byte size of every attribute name and attribute field value
    """
    total_size = _get_byte_size(message_body)
    if not message_attributes:
        return total_size

    # https://docs.aws.amazon.com/sns/latest/dg/sns-message-attributes.html
    # All parts of the message attribute, including name, type, and value, are included in the message size
    # restriction, which is 256 KB.
    for attr_name, attr in message_attributes.items():
        total_size += _get_byte_size(attr_name)
        # the field values of the attribute (DataType and StringValue or BinaryValue)
        for field_value in attr.values():
            total_size += _get_byte_size(field_value)

    return total_size
1✔
1126

1127

1128
def register_sns_api_resource(router: Router):
    """Register the retrospection endpoints as internal LocalStack endpoints."""
    resource_classes = (
        SNSServicePlatformEndpointMessagesApiResource,
        SNSServiceSMSMessagesApiResource,
        SNSServiceSubscriptionTokenApiResource,
    )
    # one instance of every resource is added to the router, in the order listed above
    for resource_class in resource_classes:
        router.add(resource_class())
1✔
1133

1134

1135
def _format_messages(sent_messages: list[dict[str, str]], validated_keys: list[str]):
1✔
1136
    """
1137
    This method format the messages to be more readable and undo the format change that was needed for Moto
1138
    Should be removed once we refactor SNS.
1139
    """
1140
    formatted_messages = []
1✔
1141
    for sent_message in sent_messages:
1✔
1142
        msg = {
1✔
1143
            key: json.dumps(value)
1144
            if key == "Message" and sent_message.get("MessageStructure") == "json"
1145
            else value
1146
            for key, value in sent_message.items()
1147
            if key in validated_keys
1148
        }
1149
        formatted_messages.append(msg)
1✔
1150

1151
    return formatted_messages
1✔
1152

1153

1154
class SNSInternalResource:
    """Base class with helper to properly track usage of internal endpoints"""

    # value of the `resource_type` label of the usage counter, set by every subclass
    resource_type: str

    def count_usage(self):
        """Increment the internal API calls counter labelled with the `resource_type` of this resource."""
        internal_api_calls.labels(resource_type=self.resource_type).increment()
1✔
1160

1161

1162
def count_usage(f):
    """
    Decorate a method so that every call first invokes `self.count_usage()`.

    :param f: the method to wrap
    :return: the wrapping function, carrying the metadata of `f`
    """

    def _counted_call(self, *args, **kwargs):
        self.count_usage()
        return f(self, *args, **kwargs)

    return functools.update_wrapper(_counted_call, f)
1✔
1169

1170

1171
class SNSServicePlatformEndpointMessagesApiResource(SNSInternalResource):
    """Provides a REST API for retrospective access to platform endpoint messages sent via SNS.

    This is registered as a LocalStack internal HTTP resource.

    This endpoint accepts:
    - GET param `accountId`: selector for AWS account. If not specified, return fallback `000000000000` test ID
    - GET param `region`: selector for AWS `region`. If not specified, return default "us-east-1"
    - GET param `endpointArn`: filter for `endpointArn` resource in SNS
    - DELETE param `accountId`: selector for AWS account
    - DELETE param `region`: will delete saved messages for `region`
    - DELETE param `endpointArn`: will delete saved messages for `endpointArn`
    """

    resource_type = "platform-endpoint-message"

    # the message fields exposed by the API, every other stored field is filtered out
    _PAYLOAD_FIELDS = [
        "TargetArn",
        "TopicArn",
        "Message",
        "MessageAttributes",
        "MessageStructure",
        "Subject",
        "MessageId",
    ]

    @staticmethod
    def _resolve_request_scope(request: Request) -> tuple[str | None, str, SnsStore]:
        """
        Resolve the endpoint ARN filter, the region and the store targeted by a request.

        When `endpointArn` is given, the account and region are extracted from it and the `accountId` and
        `region` params are ignored; otherwise those params are used, with their defaults.

        :param request: the incoming request
        :return: a tuple of the `endpointArn` filter (None if not set), the region and the store
        """
        filter_endpoint_arn = request.args.get("endpointArn")
        if filter_endpoint_arn:
            account_id = extract_account_id_from_arn(filter_endpoint_arn)
            region = extract_region_from_arn(filter_endpoint_arn)
        else:
            account_id = request.args.get("accountId", DEFAULT_AWS_ACCOUNT_ID)
            region = request.args.get("region", AWS_REGION_US_EAST_1)
        store: SnsStore = sns_stores[account_id][region]
        return filter_endpoint_arn, region, store

    @route(sns_constants.PLATFORM_ENDPOINT_MSGS_ENDPOINT, methods=["GET"])
    @count_usage
    def on_get(self, request: Request):
        """
        Return the saved platform endpoint messages, formatted and grouped by endpoint ARN.

        :param request: the incoming request, see the class documentation for the accepted params
        :return: a dict with the messages under `platform_endpoint_messages` and the resolved `region`
        """
        filter_endpoint_arn, region, store = self._resolve_request_scope(request)
        if filter_endpoint_arn:
            # only the requested endpoint is returned, with an empty list if it has no saved message
            messages = store.platform_endpoint_messages.get(filter_endpoint_arn, [])
            platform_endpoint_messages = {
                filter_endpoint_arn: _format_messages(messages, self._PAYLOAD_FIELDS)
            }
        else:
            platform_endpoint_messages = {
                endpoint_arn: _format_messages(messages, self._PAYLOAD_FIELDS)
                for endpoint_arn, messages in store.platform_endpoint_messages.items()
            }

        return {
            "platform_endpoint_messages": platform_endpoint_messages,
            "region": region,
        }

    @route(sns_constants.PLATFORM_ENDPOINT_MSGS_ENDPOINT, methods=["DELETE"])
    @count_usage
    def on_delete(self, request: Request) -> Response:
        """
        Delete the saved messages of one endpoint, or all the saved messages of the resolved store.

        :param request: the incoming request, see the class documentation for the accepted params
        :return: an empty response with status 204
        """
        filter_endpoint_arn, _, store = self._resolve_request_scope(request)
        if filter_endpoint_arn:
            store.platform_endpoint_messages.pop(filter_endpoint_arn, None)
        else:
            store.platform_endpoint_messages.clear()

        return Response("", status=204)
1✔
1249

1250

1251
class SNSServiceSMSMessagesApiResource(SNSInternalResource):
    """Provides a REST API for retrospective access to SMS messages sent via SNS.

    This is registered as a LocalStack internal HTTP resource.

    Both the GET and DELETE routes accept the following query parameters:
    - `accountId`: selector for the AWS account. Defaults to `DEFAULT_AWS_ACCOUNT_ID` if not specified
    - `region`: selector for the AWS region. Defaults to `AWS_REGION_US_EAST_1` if not specified
    - `phoneNumber`: restricts the operation to messages whose `PhoneNumber` equals this value
    """

    # NOTE: the docstring must stay the first statement of the class body, otherwise Python
    # does not attach it to the class as `__doc__`.
    resource_type = "sms-message"

    # fields passed to `_format_messages` when building the GET payload
    _PAYLOAD_FIELDS = [
        "PhoneNumber",
        "TopicArn",
        "SubscriptionArn",
        "MessageId",
        "Message",
        "MessageAttributes",
        "MessageStructure",
        "Subject",
    ]

    @route(sns_constants.SMS_MSGS_ENDPOINT, methods=["GET"])
    @count_usage
    def on_get(self, request: Request):
        """Return the stored SMS messages, grouped by phone number.

        :param request: incoming HTTP request carrying the query parameters
        :return: a dict with `sms_messages` (phone number -> list of formatted messages)
            and the `region` the messages were read from. With a `phoneNumber` filter the
            mapping contains only that number, possibly with an empty list.
        """
        account_id = request.args.get("accountId", DEFAULT_AWS_ACCOUNT_ID)
        region = request.args.get("region", AWS_REGION_US_EAST_1)
        filter_phone_number = request.args.get("phoneNumber")
        store: SnsStore = sns_stores[account_id][region]

        if filter_phone_number:
            messages = [
                m for m in store.sms_messages if m.get("PhoneNumber") == filter_phone_number
            ]
            messages = _format_messages(messages, self._PAYLOAD_FIELDS)
            return {
                "sms_messages": {filter_phone_number: messages},
                "region": region,
            }

        sms_messages = {}

        for m in _format_messages(store.sms_messages, self._PAYLOAD_FIELDS):
            sms_messages.setdefault(m.get("PhoneNumber"), []).append(m)

        return {
            "sms_messages": sms_messages,
            "region": region,
        }

    @route(sns_constants.SMS_MSGS_ENDPOINT, methods=["DELETE"])
    @count_usage
    def on_delete(self, request: Request) -> Response:
        """Delete stored SMS messages.

        With a `phoneNumber` filter only the messages for that number are removed,
        otherwise every SMS message of the selected account/region store is removed.

        :param request: incoming HTTP request carrying the query parameters
        :return: an empty response with status 204
        """
        account_id = request.args.get("accountId", DEFAULT_AWS_ACCOUNT_ID)
        region = request.args.get("region", AWS_REGION_US_EAST_1)
        filter_phone_number = request.args.get("phoneNumber")
        store: SnsStore = sns_stores[account_id][region]

        if filter_phone_number:
            # keep everything that does not belong to the filtered phone number
            store.sms_messages = [
                m for m in store.sms_messages if m.get("PhoneNumber") != filter_phone_number
            ]
            return Response("", status=204)

        store.sms_messages.clear()
        return Response("", status=204)
1316

1317

1318
class SNSServiceSubscriptionTokenApiResource(SNSInternalResource):
    """Provides a REST API for retrospective access to Subscription Confirmation Tokens to confirm subscriptions.

    Those are not sent for email, and sometimes inaccessible when working with external HTTPS endpoint which won't be
    able to reach your local host.

    This is registered as a LocalStack internal HTTP resource.

    This endpoint has the following parameter:
    - path parameter `subscription_arn`: ARN of the SNS subscription for which you want the SubscriptionToken
    """

    # NOTE: the docstring must stay the first statement of the class body, otherwise Python
    # does not attach it to the class as `__doc__`.
    resource_type = "subscription-token"

    @route(f"{sns_constants.SUBSCRIPTION_TOKENS_ENDPOINT}/<path:subscription_arn>", methods=["GET"])
    @count_usage
    def on_get(self, _request: Request, subscription_arn: str):
        """Look up the confirmation token stored for a subscription ARN.

        :param _request: incoming HTTP request (unused)
        :param subscription_arn: subscription ARN taken from the request path
        :return: a dict with `subscription_token` and `subscription_arn` on success;
            a 400 JSON response if the ARN cannot be parsed; a 404 JSON response if no
            token is stored for the ARN
        """
        try:
            parsed_arn = parse_arn(subscription_arn)
        except InvalidArnException:
            response = Response("", 400)
            response.set_json(
                {
                    "error": "The provided SubscriptionARN is invalid",
                    "subscription_arn": subscription_arn,
                }
            )
            return response

        # the store is selected by the account and region encoded in the ARN itself
        store: SnsStore = sns_stores[parsed_arn["account"]][parsed_arn["region"]]

        # tokens are the keys of the mapping, so the lookup by ARN is a reverse scan
        for token, sub_arn in store.subscription_tokens.items():
            if sub_arn == subscription_arn:
                return {
                    "subscription_token": token,
                    "subscription_arn": subscription_arn,
                }

        response = Response("", 404)
        response.set_json(
            {
                "error": "The provided SubscriptionARN is not found",
                "subscription_arn": subscription_arn,
            }
        )
        return response
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc