• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 22334798432

23 Feb 2026 06:42PM UTC coverage: 86.956% (-0.02%) from 86.973%
22334798432

push

github

web-flow
S3: regenerate test snapshots & parity fixes (#13824)

69831 of 80306 relevant lines covered (86.96%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.55
/localstack-core/localstack/services/sns/provider.py
1
import contextlib
1✔
2
import copy
1✔
3
import functools
1✔
4
import json
1✔
5
import logging
1✔
6
import re
1✔
7

8
from botocore.utils import InvalidArnException
1✔
9
from rolo import Request, Router, route
1✔
10

11
from localstack.aws.api import CommonServiceException, RequestContext
1✔
12
from localstack.aws.api.sns import (
1✔
13
    ActionsList,
14
    AmazonResourceName,
15
    BatchEntryIdsNotDistinctException,
16
    CheckIfPhoneNumberIsOptedOutResponse,
17
    ConfirmSubscriptionResponse,
18
    CreateEndpointResponse,
19
    CreatePlatformApplicationResponse,
20
    CreateTopicResponse,
21
    DelegatesList,
22
    Endpoint,
23
    EndpointDisabledException,
24
    GetDataProtectionPolicyResponse,
25
    GetEndpointAttributesResponse,
26
    GetPlatformApplicationAttributesResponse,
27
    GetSMSAttributesResponse,
28
    GetSubscriptionAttributesResponse,
29
    GetTopicAttributesResponse,
30
    InvalidBatchEntryIdException,
31
    InvalidParameterException,
32
    InvalidParameterValueException,
33
    ListEndpointsByPlatformApplicationResponse,
34
    ListPhoneNumbersOptedOutResponse,
35
    ListPlatformApplicationsResponse,
36
    ListString,
37
    ListSubscriptionsByTopicResponse,
38
    ListSubscriptionsResponse,
39
    ListTagsForResourceResponse,
40
    ListTopicsResponse,
41
    MapStringToString,
42
    MessageAttributeMap,
43
    NotFoundException,
44
    OptInPhoneNumberResponse,
45
    PhoneNumber,
46
    PlatformApplication,
47
    PublishBatchRequestEntryList,
48
    PublishBatchResponse,
49
    PublishBatchResultEntry,
50
    PublishResponse,
51
    SetSMSAttributesResponse,
52
    SnsApi,
53
    String,
54
    SubscribeResponse,
55
    Subscription,
56
    SubscriptionAttributesMap,
57
    Tag,
58
    TagKeyList,
59
    TagList,
60
    TagResourceResponse,
61
    TooManyEntriesInBatchRequestException,
62
    TopicAttributesMap,
63
    UntagResourceResponse,
64
    attributeName,
65
    attributeValue,
66
    authenticateOnUnsubscribe,
67
    endpoint,
68
    label,
69
    message,
70
    messageStructure,
71
    nextToken,
72
    protocol,
73
    string,
74
    subject,
75
    subscriptionARN,
76
    topicARN,
77
    topicName,
78
)
79
from localstack.constants import AWS_REGION_US_EAST_1, DEFAULT_AWS_ACCOUNT_ID
1✔
80
from localstack.http import Response
1✔
81
from localstack.services.edge import ROUTER
1✔
82
from localstack.services.plugins import ServiceLifecycleHook
1✔
83
from localstack.services.sns.analytics import internal_api_calls
1✔
84
from localstack.services.sns.certificate import SNS_SERVER_CERT
1✔
85
from localstack.services.sns.constants import (
1✔
86
    ATTR_TYPE_REGEX,
87
    BATCH_ENTRY_ID_REGEX,
88
    DUMMY_SUBSCRIPTION_PRINCIPAL,
89
    E164_REGEX,
90
    MAXIMUM_MESSAGE_LENGTH,
91
    MSG_ATTR_NAME_REGEX,
92
    PLATFORM_ENDPOINT_MSGS_ENDPOINT,
93
    SMS_MSGS_ENDPOINT,
94
    SMS_PHONE_NUMBER_OPT_OUT_ENDPOINT,
95
    SNS_CERT_ENDPOINT,
96
    SNS_PROTOCOLS,
97
    SUBSCRIPTION_TOKENS_ENDPOINT,
98
    VALID_APPLICATION_PLATFORMS,
99
    VALID_MSG_ATTR_NAME_CHARS,
100
    VALID_POLICY_ACTIONS,
101
    VALID_SUBSCRIPTION_ATTR_NAME,
102
)
103
from localstack.services.sns.filter import FilterPolicyValidator
1✔
104
from localstack.services.sns.models import (
1✔
105
    SMS_ATTRIBUTE_NAMES,
106
    SMS_DEFAULT_SENDER_REGEX,
107
    SMS_TYPES,
108
    EndpointAttributeNames,
109
    PlatformApplicationDetails,
110
    PlatformEndpoint,
111
    SnsMessage,
112
    SnsMessageType,
113
    SnsStore,
114
    SnsSubscription,
115
    Topic,
116
    sns_stores,
117
)
118
from localstack.services.sns.publisher import (
1✔
119
    PublishDispatcher,
120
    SnsBatchPublishContext,
121
    SnsPublishContext,
122
)
123
from localstack.services.sns.utils import (
1✔
124
    create_default_topic_policy,
125
    create_platform_endpoint_arn,
126
    create_subscription_arn,
127
    encode_subscription_token_with_region,
128
    get_next_page_token_from_arn,
129
    get_region_from_subscription_token,
130
    get_topic_subscriptions,
131
    is_valid_e164_number,
132
    parse_and_validate_platform_application_arn,
133
    parse_and_validate_topic_arn,
134
    validate_subscription_attribute,
135
)
136
from localstack.state import StateVisitor
1✔
137
from localstack.utils.aws.arns import (
1✔
138
    extract_account_id_from_arn,
139
    extract_region_from_arn,
140
    get_partition,
141
    parse_arn,
142
    sns_platform_application_arn,
143
    sns_topic_arn,
144
)
145
from localstack.utils.collections import PaginatedList, select_from_typed_dict
1✔
146
from localstack.utils.strings import to_bytes
1✔
147

148
# set up logger
149
LOG = logging.getLogger(__name__)
1✔
150

151
# Name pattern applied by create_topic when FifoTopic is "true":
# 1-256 characters from [a-zA-Z0-9_-], followed by the literal ".fifo" suffix.
SNS_TOPIC_NAME_PATTERN_FIFO = r"^[a-zA-Z0-9_-]{1,256}\.fifo$"
1✔
152
# Name pattern applied by create_topic for non-FIFO topics:
# 1-256 characters from [a-zA-Z0-9_-], no suffix allowed.
SNS_TOPIC_NAME_PATTERN = r"^[a-zA-Z0-9_-]{1,256}$"
1✔
153

154

155
class SnsProvider(SnsApi, ServiceLifecycleHook):
1✔
156
    def __init__(self) -> None:
        """Initialise the provider with its signing certificate and publish dispatcher."""
        super().__init__()
        # PEM text returned by the certificate route (see get_signature_cert_pem_file)
        self._signature_cert_pem: str = SNS_SERVER_CERT
        # dispatcher used by subscribe/unsubscribe to send confirmation messages
        self._publisher = PublishDispatcher()
160

161
    def accept_state_visitor(self, visitor: StateVisitor):
        """
        Pass the SNS stores to the given state visitor.
        :param visitor: The visitor that should visit ``sns_stores``
        """
        visitor.visit(sns_stores)
163

164
    def on_before_stop(self):
        """Lifecycle hook: shut down the publish dispatcher before the service stops."""
        self._publisher.shutdown()
166

167
    def on_after_init(self):
        """Lifecycle hook: register the SNS-specific HTTP routes on the shared ``ROUTER``."""
        # internal SNS API resources, e.g. so sent platform endpoint messages can be retrieved
        register_sns_api_resource(ROUTER)
        # route serving the certificate used to validate message signatures
        cert_route = self.get_signature_cert_pem_file
        ROUTER.add(cert_route)
172

173
    @route(SNS_CERT_ENDPOINT, methods=["GET"])
    def get_signature_cert_pem_file(self, request: Request):
        """
        Serve the PEM certificate used to validate SNS message signatures.

        see http://sns-public-resources.s3.amazonaws.com/SNS_Message_Signing_Release_Note_Jan_25_2011.pdf
        see https://docs.aws.amazon.com/sns/latest/dg/sns-verify-signature-of-message.html

        :param request: The incoming HTTP request (not inspected)
        :return: A 200 response whose body is the certificate held by this provider
        """
        pem_body = self._signature_cert_pem
        return Response(pem_body, 200)
178

179
    # Tag Utils
180

181
    def _check_matching_tags(
        self, context: RequestContext, topic_arn: str, tags: TagList | None
    ) -> bool:
        """
        Verify that re-creating an existing topic does not introduce tags it does not already have.
        :param context: The context of the original request
        :param topic_arn: Arn of the topic
        :param tags: Tags to be checked
        :return: False if a requested tag is missing from the existing topic's tags, True otherwise
            (including when the topic does not exist yet)
        """
        store = self.get_store(context.account_id, context.region)
        current_tags = self._list_resource_tags(context, resource_arn=topic_arn)

        # nothing to compare against when the topic has not been created yet
        if topic_arn not in store.topics:
            return True

        requested_tags = [] if tags is None else tags
        # a topic created with fewer/other tags must not be re-created with a new tag value
        has_mismatch = any(
            current_tags is not None and requested not in current_tags
            for requested in requested_tags
        )
        return not has_mismatch
203

204
    def _list_resource_tags(self, context: RequestContext, resource_arn: str) -> TagList:
        """
        Return the tags stored for a resource as a list of ``Tag`` entries.
        :param context: The request context, used to select the account/region store
        :param resource_arn: Arn of the tagged resource
        :return: One ``Tag(Key=..., Value=...)`` per stored key/value pair
        """
        store = self.get_store(context.account_id, context.region)
        stored_pairs = store.tags.get_tags(resource_arn).items()
        return [Tag(Key=tag_key, Value=tag_value) for tag_key, tag_value in stored_pairs]
208

209
    def _tag_resource(self, context: RequestContext, resource_arn: str, tags: TagList) -> None:
        """
        Add or update tags on a resource.
        :param context: The request context, used to select the account/region store
        :param resource_arn: Arn of the resource to tag
        :param tags: Tags to apply; each entry must carry ``Key`` and ``Value``
        """
        store = self.get_store(context.account_id, context.region)
        tag_map = {entry["Key"]: entry["Value"] for entry in tags}
        store.tags.update_tags(resource_arn, tag_map)
212

213
    def _untag_resource(
        self, context: RequestContext, resource_arn: str, tag_keys: TagKeyList
    ) -> None:
        """
        Remove the given tag keys from a resource.
        :param context: The request context, used to select the account/region store
        :param resource_arn: Arn of the tagged resource
        :param tag_keys: Keys of the tags to delete
        """
        tag_store = self.get_store(context.account_id, context.region).tags
        tag_store.delete_tags(resource_arn, tag_keys)
218

219
    def _remove_resource_tags(self, context: RequestContext, resource_arn: str) -> None:
        """
        Delete every tag stored for a resource.
        :param context: The request context, used to select the account/region store
        :param resource_arn: Arn of the resource whose tags are dropped
        """
        tag_store = self.get_store(context.account_id, context.region).tags
        tag_store.delete_all_tags(resource_arn)
222

223
    ## Topic Operations
224

225
    def create_topic(
        self,
        context: RequestContext,
        name: topicName,
        attributes: TopicAttributesMap | None = None,
        tags: TagList | None = None,
        data_protection_policy: attributeValue | None = None,
        **kwargs,
    ) -> CreateTopicResponse:
        """
        Create a topic, or return the ARN of an already existing, matching topic.

        :param context: The request context (provides account id and region)
        :param name: Topic name; must match the FIFO pattern when ``FifoTopic`` is "true",
            the standard pattern otherwise
        :param attributes: Optional topic attributes; the passed mapping is copied, not mutated
        :param tags: Optional tags to attach to a newly created topic
        :param data_protection_policy: Optional policy forwarded to topic creation
        :return: Response carrying the topic ARN
        :raises InvalidParameterException: if the name is invalid, or the topic already exists
            with different attributes or tags
        """
        store = self.get_store(context.account_id, context.region)
        topic_arn = sns_topic_arn(
            topic_name=name, region_name=context.region, account_id=context.account_id
        )
        attributes = dict(attributes) if attributes else {}
        fifo_flag = attributes.get("FifoTopic")
        if fifo_flag and fifo_flag.lower() == "true":
            pattern = SNS_TOPIC_NAME_PATTERN_FIFO
        else:
            # AWS does not seem to save explicit settings of fifo = false
            attributes.pop("FifoTopic", None)
            pattern = SNS_TOPIC_NAME_PATTERN

        if not re.match(pattern, name):
            raise InvalidParameterException("Invalid parameter: Topic Name")

        if existing_topic := store.topics.get(topic_arn):
            existing_attrs = existing_topic["attributes"]
            # TODO: validate attribute names
            # NOTE: no special case for FifoTopic == "false" is needed here, the key has already
            # been removed above unless its value is "true"
            for attr_name, requested_value in attributes.items():
                existing_value = existing_attrs.get(attr_name)
                if not existing_value or existing_value != requested_value:
                    raise InvalidParameterException(
                        "Invalid parameter: Attributes Reason: Topic already exists with different attributes"
                    )
            if not self._check_matching_tags(context, topic_arn, tags):
                raise InvalidParameterException(
                    "Invalid parameter: Tags Reason: Topic already exists with different tags"
                )
            return CreateTopicResponse(TopicArn=topic_arn)

        attributes["EffectiveDeliveryPolicy"] = _create_default_effective_delivery_policy()

        topic = _create_topic(
            name=name,
            attributes=attributes,
            data_protection_policy=data_protection_policy,
            context=context,
        )
        if tags:
            self._tag_resource(context, resource_arn=topic_arn, tags=tags)

        store.topics[topic_arn] = topic

        return CreateTopicResponse(TopicArn=topic_arn)
282

283
    def get_topic_attributes(
        self, context: RequestContext, topic_arn: topicARN, **kwargs
    ) -> GetTopicAttributesResponse:
        """
        Return the stored attributes of a topic.
        :param context: The request context
        :param topic_arn: Arn of the topic
        :return: Response wrapping the topic's ``attributes`` mapping
        :raises NotFoundException: if the topic lookup yields a falsy result
        """
        topic: Topic = self._get_topic(arn=topic_arn, context=context)
        if not topic:
            raise NotFoundException("Topic does not exist")
        return GetTopicAttributesResponse(Attributes=topic["attributes"])
292

293
    def delete_topic(self, context: RequestContext, topic_arn: topicARN, **kwargs) -> None:
        """
        Delete a topic and its tags; deleting a topic that does not exist is a no-op.

        Removing the topic entry also drops the list of subscription ARNs held on the topic.
        NOTE(review): entries in ``store.subscriptions`` are not removed here. The original
        comment on this method states that subscriptions are deleted with the topic (and that
        in AWS they linger detached for ~48h before being garbage collected) -- confirm which
        behaviour is intended.

        :param context: The request context
        :param topic_arn: Arn of the topic to delete
        :raises InvalidParameterException: if the ARN's region differs from the request region
        """
        arn_data = parse_and_validate_topic_arn(topic_arn)
        region_matches = context.region == arn_data["region"]
        if not region_matches:
            raise InvalidParameterException("Invalid parameter: TopicArn")

        store = self.get_store(context.account_id, context.region)
        self._remove_resource_tags(context, topic_arn)
        store.topics.pop(topic_arn, None)
303

304
    def list_topics(
        self, context: RequestContext, next_token: nextToken | None = None, **kwargs
    ) -> ListTopicsResponse:
        """
        List the topics of the request's account and region, 100 per page.
        :param context: The request context
        :param next_token: Token returned by a previous call, if any
        :return: Response with the page of ``{"TopicArn": ...}`` entries and the next token
        """
        store = self.get_store(context.account_id, context.region)
        # snapshot the values before building the listing
        stored_topics = list(store.topics.values())
        arn_entries = PaginatedList([{"TopicArn": stored["arn"]} for stored in stored_topics])

        def _token_for(entry):
            return get_next_page_token_from_arn(entry["TopicArn"])

        page, following_token = arn_entries.get_page(
            token_generator=_token_for,
            next_token=next_token,
            page_size=100,
        )
        return ListTopicsResponse(Topics=page, NextToken=following_token)
317

318
    def set_topic_attributes(
        self,
        context: RequestContext,
        topic_arn: topicARN,
        attribute_name: attributeName,
        attribute_value: attributeValue | None = None,
        **kwargs,
    ) -> None:
        """
        Set a single attribute on an existing topic.
        :param context: The request context
        :param topic_arn: Arn of the topic to modify
        :param attribute_name: Name of the attribute; ``FifoTopic`` is rejected
        :param attribute_value: New value, stored as given
        :raises InvalidParameterException: if ``attribute_name`` is ``FifoTopic``
        """
        # the topic is resolved first, so lookup errors take precedence over the name check
        topic: Topic = self._get_topic(arn=topic_arn, context=context)
        is_fifo_attr = attribute_name == "FifoTopic"
        if is_fifo_attr:
            raise InvalidParameterException("Invalid parameter: AttributeName")
        topic["attributes"][attribute_name] = attribute_value
330

331
    ## Subscribe operations
332

333
    def subscribe(
        self,
        context: RequestContext,
        topic_arn: topicARN,
        protocol: protocol,
        endpoint: endpoint | None = None,
        attributes: SubscriptionAttributesMap | None = None,
        return_subscription_arn: bool | None = None,
        **kwargs,
    ) -> SubscribeResponse:
        """
        Subscribe an endpoint to a topic.

        The call is idempotent per endpoint: subscribing an already subscribed endpoint returns
        the existing subscription ARN, unless the passed attributes differ from the stored ones.

        :param context: The request context
        :param topic_arn: Arn of the topic; its region must match the request region
        :param protocol: One of ``SNS_PROTOCOLS``
        :param endpoint: The endpoint to deliver to; required despite being optional in the API
        :param attributes: Optional subscription attributes (validated, then copied)
        :param return_subscription_arn: For http(s), return the real ARN instead of
            "pending confirmation"
        :return: Response carrying the subscription ARN (or "pending confirmation")
        :raises InvalidParameterException: on region mismatch, unsupported protocol, an endpoint
            not matching the protocol, or attributes conflicting with an existing subscription
        :raises NotFoundException: if no endpoint is given
        """
        parsed_topic_arn = parse_and_validate_topic_arn(topic_arn)
        if context.region != parsed_topic_arn["region"]:
            raise InvalidParameterException("Invalid parameter: TopicArn")

        store = self.get_store(account_id=parsed_topic_arn["account"], region=context.region)

        topic = self._get_topic(arn=topic_arn, context=context)
        topic_subscriptions = topic["subscriptions"]
        if not endpoint:
            # TODO: check AWS behaviour (because endpoint is optional)
            raise NotFoundException("Endpoint not specified in subscription")
        if protocol not in SNS_PROTOCOLS:
            raise InvalidParameterException(
                f"Invalid parameter: Amazon SNS does not support this protocol string: {protocol}"
            )
        elif protocol in ["http", "https"] and not endpoint.startswith(f"{protocol}://"):
            raise InvalidParameterException(
                "Invalid parameter: Endpoint must match the specified protocol"
            )
        elif protocol == "sms" and not is_valid_e164_number(endpoint):
            raise InvalidParameterException(f"Invalid SMS endpoint: {endpoint}")

        elif protocol == "sqs":
            try:
                parse_arn(endpoint)
            except InvalidArnException as e:
                raise InvalidParameterException("Invalid parameter: SQS endpoint ARN") from e

        elif protocol == "application":
            # TODO: Validate exact behaviour
            try:
                parse_arn(endpoint)
            except InvalidArnException as e:
                raise InvalidParameterException(
                    "Invalid parameter: ApplicationEndpoint ARN"
                ) from e

        if ".fifo" in endpoint and ".fifo" not in topic_arn:
            # TODO: move to sqs protocol block if possible
            raise InvalidParameterException(
                "Invalid parameter: Invalid parameter: Endpoint Reason: FIFO SQS Queues can not be subscribed to standard SNS topics"
            )

        # work on a copy so the caller's mapping is never modified
        sub_attributes = copy.deepcopy(attributes) if attributes else None
        if sub_attributes:
            for attr_name, attr_value in sub_attributes.items():
                validate_subscription_attribute(
                    attribute_name=attr_name,
                    attribute_value=attr_value,
                    topic_arn=topic_arn,
                    endpoint=endpoint,
                    is_subscribe_call=True,
                )
            # normalise once, after every attribute was validated with its original value
            # (doing this inside the loop made the validated value depend on dict order)
            if raw_msg_delivery := sub_attributes.get("RawMessageDelivery"):
                sub_attributes["RawMessageDelivery"] = raw_msg_delivery.lower()

        # An endpoint may only be subscribed to a topic once. Subsequent
        # subscribe calls do nothing (subscribe is idempotent), except if its attributes are different.
        for existing_topic_subscription in topic_subscriptions:
            sub = store.subscriptions.get(existing_topic_subscription, {})
            if sub.get("Endpoint") == endpoint:
                if sub_attributes:
                    # validate the subscription attributes aren't different
                    for attr in VALID_SUBSCRIPTION_ATTR_NAME:
                        # if a new attribute is present and different from an existent one, raise
                        if (new_attr := sub_attributes.get(attr)) and sub.get(attr) != new_attr:
                            raise InvalidParameterException(
                                "Invalid parameter: Attributes Reason: Subscription already exists with different attributes"
                            )

                return SubscribeResponse(SubscriptionArn=sub["SubscriptionArn"])
        principal = DUMMY_SUBSCRIPTION_PRINCIPAL.format(
            partition=get_partition(context.region), account_id=context.account_id
        )
        subscription_arn = create_subscription_arn(topic_arn)
        subscription = SnsSubscription(
            # http://docs.aws.amazon.com/cli/latest/reference/sns/get-subscription-attributes.html
            TopicArn=topic_arn,
            Endpoint=endpoint,
            Protocol=protocol,
            SubscriptionArn=subscription_arn,
            PendingConfirmation="true",
            Owner=context.account_id,
            RawMessageDelivery="false",  # default value, will be overridden if set
            FilterPolicyScope="MessageAttributes",  # default value, will be overridden if set
            SubscriptionPrincipal=principal,  # dummy value, could be fetched with a call to STS?
        )
        if sub_attributes:
            subscription.update(sub_attributes)
            if "FilterPolicy" in sub_attributes:
                # an empty FilterPolicy string is stored as "no policy"
                filter_policy = (
                    json.loads(sub_attributes["FilterPolicy"])
                    if sub_attributes["FilterPolicy"]
                    else None
                )
                if filter_policy:
                    validator = FilterPolicyValidator(
                        scope=subscription.get("FilterPolicyScope", "MessageAttributes"),
                        is_subscribe_call=True,
                    )
                    validator.validate_filter_policy(filter_policy)

                store.subscription_filter_policy[subscription_arn] = filter_policy

        store.subscriptions[subscription_arn] = subscription

        topic_subscriptions.append(subscription_arn)

        # store the token and subscription arn
        # TODO: the token is a 288 hex char string
        subscription_token = encode_subscription_token_with_region(region=context.region)
        store.subscription_tokens[subscription_token] = subscription_arn

        response_subscription_arn = subscription_arn
        # Send out confirmation message for HTTP(S), fix for https://github.com/localstack/localstack/issues/881
        if protocol in ["http", "https"]:
            message_ctx = SnsMessage(
                type=SnsMessageType.SubscriptionConfirmation,
                token=subscription_token,
                message=f"You have chosen to subscribe to the topic {topic_arn}.\nTo confirm the subscription, visit the SubscribeURL included in this message.",
            )
            publish_ctx = SnsPublishContext(
                message=message_ctx,
                store=store,
                request_headers=context.request.headers,
                topic_attributes=topic["attributes"],
            )
            self._publisher.publish_to_topic_subscriber(
                ctx=publish_ctx,
                topic_arn=topic_arn,
                subscription_arn=subscription_arn,
            )
            if not return_subscription_arn:
                response_subscription_arn = "pending confirmation"

        elif protocol not in ["email", "email-json"]:
            # Only HTTP(S) and email subscriptions are not auto validated
            # Except if the endpoint and the topic are not in the same AWS account, then you'd need to manually confirm
            # the subscription with the token
            # TODO: revisit for multi-account
            # TODO: test with AWS for email & email-json confirmation message
            # we need to add the following check:
            # if parsed_topic_arn["account"] == endpoint account (depending on the type, SQS, lambda, parse the arn)
            subscription["PendingConfirmation"] = "false"
            subscription["ConfirmationWasAuthenticated"] = "true"

        return SubscribeResponse(SubscriptionArn=response_subscription_arn)
488

489
    def unsubscribe(
        self, context: RequestContext, subscription_arn: subscriptionARN, **kwargs
    ) -> None:
        """
        Remove a subscription; unsubscribing an unknown subscription is a no-op.

        For http(s) subscriptions an UnsubscribeConfirmation message is published to the
        subscriber before the subscription is removed from the store.

        :param context: The request context
        :param subscription_arn: Arn of the subscription; account and region are taken from it
        :raises InvalidParameterException: if the ARN is missing, cannot be parsed, or has
            exactly 6 elements and is not a known subscription
        """
        if subscription_arn is None:
            raise InvalidParameterException(
                "Invalid parameter: SubscriptionArn Reason: no value for required parameter",
            )
        count = len(subscription_arn.split(":"))
        try:
            parsed_arn = parse_arn(subscription_arn)
        except InvalidArnException as e:
            # TODO: check for invalid SubscriptionGUID
            raise InvalidParameterException(
                f"Invalid parameter: SubscriptionArn Reason: An ARN must have at least 6 elements, not {count}"
            ) from e

        account_id = parsed_arn["account"]
        region_name = parsed_arn["region"]

        store = self.get_store(account_id=account_id, region=region_name)
        if count == 6 and subscription_arn not in store.subscriptions:
            raise InvalidParameterException("Invalid parameter: SubscriptionId")

        # TODO: here was a moto_backend.unsubscribe call, check correct functionality and remove this comment
        #  before switching to v2 for production

        # pop the subscription at the end, to avoid race condition by iterating over the topic subscriptions
        subscription = store.subscriptions.get(subscription_arn)

        if not subscription:
            # unsubscribe is idempotent, so unsubscribing from a non-existing topic does nothing
            return

        if subscription["Protocol"] in ["http", "https"]:
            # TODO: actually validate this (re)subscribe behaviour somehow (localhost.run?)
            #  we might need to save the sub token in the store
            # TODO: AWS only sends the UnsubscribeConfirmation if the call is unauthenticated or the requester is not
            #  the owner
            subscription_token = encode_subscription_token_with_region(region=context.region)
            message_ctx = SnsMessage(
                type=SnsMessageType.UnsubscribeConfirmation,
                token=subscription_token,
                message=f"You have chosen to deactivate subscription {subscription_arn}.\nTo cancel this operation and restore the subscription, visit the SubscribeURL included in this message.",
            )
            publish_ctx = SnsPublishContext(
                message=message_ctx,
                store=store,
                request_headers=context.request.headers,
                # TODO: add the topic attributes once we ported them from moto to LocalStack
                # topic_attributes=vars(moto_topic),
            )
            self._publisher.publish_to_topic_subscriber(
                publish_ctx,
                topic_arn=subscription["TopicArn"],
                subscription_arn=subscription_arn,
            )

        # KeyError: the topic (or its "subscriptions" key) is gone;
        # ValueError: list.remove() did not find the ARN in the topic's subscription list.
        # Either way the subscription itself must still be removed below.
        with contextlib.suppress(KeyError, ValueError):
            store.topics[subscription["TopicArn"]]["subscriptions"].remove(subscription_arn)
        store.subscription_filter_policy.pop(subscription_arn, None)
        store.subscriptions.pop(subscription_arn, None)
550

551
    def get_subscription_attributes(
        self, context: RequestContext, subscription_arn: subscriptionARN, **kwargs
    ) -> GetSubscriptionAttributesResponse:
        """
        Return the attributes of a subscription, minus internal or irrelevant keys.
        :param context: The request context (selects the account/region store)
        :param subscription_arn: Arn of the subscription
        :return: Response with the filtered attribute mapping
        :raises NotFoundException: if the subscription is not in the store
        """
        store = self.get_store(account_id=context.account_id, region=context.region)
        stored_sub = store.subscriptions.get(subscription_arn)
        if not stored_sub:
            raise NotFoundException("Subscription does not exist")

        hidden_keys = {"sqs_queue_url"}
        has_scope = "FilterPolicyScope" in stored_sub
        if has_scope and not stored_sub.get("FilterPolicy"):
            # without a filter policy, neither the policy nor its scope is reported
            hidden_keys.update(("FilterPolicyScope", "FilterPolicy"))
        elif "FilterPolicy" in stored_sub and not has_scope:
            # a policy without a scope gets the default scope written back to the stored entry
            stored_sub["FilterPolicyScope"] = "MessageAttributes"

        visible = {
            attr_key: attr_val
            for attr_key, attr_val in stored_sub.items()
            if attr_key not in hidden_keys
        }
        return GetSubscriptionAttributesResponse(Attributes=visible)
567

568
    def set_subscription_attributes(
        self,
        context: RequestContext,
        subscription_arn: subscriptionARN,
        attribute_name: attributeName,
        attribute_value: attributeValue = None,
        **kwargs,
    ) -> None:
        """
        Validate and store a single attribute on an existing subscription.
        :param context: The request context (selects the account/region store)
        :param subscription_arn: Arn of the subscription to modify
        :param attribute_name: Name of the attribute to set
        :param attribute_value: New value; ``RawMessageDelivery`` is stored lower-cased,
            ``FilterPolicy`` is additionally parsed and kept in the filter policy store
        :raises NotFoundException: if the subscription is not in the store
        """
        store = self.get_store(account_id=context.account_id, region=context.region)
        subscription = store.subscriptions.get(subscription_arn)
        if not subscription:
            raise NotFoundException("Subscription does not exist")

        validate_subscription_attribute(
            attribute_name=attribute_name,
            attribute_value=attribute_value,
            topic_arn=subscription["TopicArn"],
            endpoint=subscription["Endpoint"],
        )

        value_to_store = attribute_value
        if attribute_name == "RawMessageDelivery":
            value_to_store = attribute_value.lower()
        elif attribute_name == "FilterPolicy":
            # an empty value clears the parsed policy
            parsed_policy = None
            if attribute_value:
                parsed_policy = json.loads(attribute_value)
            if parsed_policy:
                policy_validator = FilterPolicyValidator(
                    scope=subscription.get("FilterPolicyScope", "MessageAttributes"),
                    is_subscribe_call=False,
                )
                policy_validator.validate_filter_policy(parsed_policy)

            store.subscription_filter_policy[subscription_arn] = parsed_policy

        subscription[attribute_name] = value_to_store
602

603
    def confirm_subscription(
        self,
        context: RequestContext,
        topic_arn: topicARN,
        token: String,
        authenticate_on_unsubscribe: authenticateOnUnsubscribe = None,
        **kwargs,
    ) -> ConfirmSubscriptionResponse:
        """
        Confirm a pending subscription using the token issued at subscribe time.

        The request may come from any (unsigned) HTTP client, so account and region are taken
        from ``topic_arn`` rather than from the request context. The call is idempotent.

        :param context: The request context (not used for store selection)
        :param topic_arn: Arn of the topic; only its account and region are used
        :param token: The subscription token
        :param authenticate_on_unsubscribe: Accepted but not implemented
        :return: Response carrying the confirmed subscription's ARN
        :raises InvalidParameterException: if the ARN cannot be parsed, its region does not match
            the token's region, or the token does not resolve to a stored subscription
        """
        # TODO: validate format on the token (seems to be 288 hex chars)
        try:
            parsed_arn = parse_arn(topic_arn)
        except InvalidArnException:
            raise InvalidParameterException("Invalid parameter: Topic")

        arn_region = parsed_arn["region"]
        store = self.get_store(account_id=parsed_arn["account"], region=arn_region)

        # it seems SNS is able to know what the region of the topic should be, even though a wrong topic is accepted
        if get_region_from_subscription_token(token) != arn_region:
            raise InvalidParameterException("Invalid parameter: Topic")

        subscription_arn = store.subscription_tokens.get(token)
        # an unknown token and a subscription deleted in the meantime are reported identically
        subscription = store.subscriptions.get(subscription_arn) if subscription_arn else None
        if not subscription:
            raise InvalidParameterException("Invalid parameter: Token")

        # ConfirmSubscription is idempotent: only flip the flags when still pending
        if subscription.get("PendingConfirmation") != "false":
            subscription["PendingConfirmation"] = "false"
            subscription["ConfirmationWasAuthenticated"] = "true"

        return ConfirmSubscriptionResponse(SubscriptionArn=subscription_arn)
643

644
    def list_subscriptions(
        self, context: RequestContext, next_token: nextToken = None, **kwargs
    ) -> ListSubscriptionsResponse:
        """
        List the subscriptions of the request's account and region, 100 per page.

        Subscriptions that are still pending confirmation are returned with their
        ``SubscriptionArn`` replaced by "PendingConfirmation".

        :param context: The request context
        :param next_token: Token returned by a previous call, if any
        :return: Response with the page of subscriptions, plus ``NextToken`` when more follow
        """
        store = self.get_store(context.account_id, context.region)

        # Paginate over the stored entries so page tokens are derived from the real ARN.
        # Deriving them from the masked value would give every pending subscription the
        # same token ("PendingConfirmation"), which cannot identify a page position.
        paginated_subscriptions = PaginatedList(list(store.subscriptions.values()))
        page, next_token = paginated_subscriptions.get_page(
            token_generator=lambda x: get_next_page_token_from_arn(x["SubscriptionArn"]),
            page_size=100,
            next_token=next_token,
        )

        subscriptions = []
        for stored_sub in page:
            sub = select_from_typed_dict(Subscription, stored_sub)
            if stored_sub["PendingConfirmation"] == "true":
                sub["SubscriptionArn"] = "PendingConfirmation"
            subscriptions.append(sub)

        response = ListSubscriptionsResponse(Subscriptions=subscriptions)
        if next_token:
            response["NextToken"] = next_token
        return response
666

667
    def list_subscriptions_by_topic(
        self, context: RequestContext, topic_arn: topicARN, next_token: nextToken = None, **kwargs
    ) -> ListSubscriptionsByTopicResponse:
        """
        List the subscriptions of a single topic, 100 per page.
        :param context: The request context
        :param topic_arn: Arn of the topic; account and region for the store are taken from it
        :param next_token: Token returned by a previous call, if any
        :return: Response with the page of subscriptions, plus ``NextToken`` when more follow
        """
        self._get_topic(topic_arn, context)  # for validation purposes only
        parsed_topic_arn = parse_and_validate_topic_arn(topic_arn)
        store = self.get_store(parsed_topic_arn["account"], parsed_topic_arn["region"])
        subscriptions = get_topic_subscriptions(store, topic_arn)

        paginated_subscriptions = PaginatedList(subscriptions)
        page, next_token = paginated_subscriptions.get_page(
            token_generator=lambda x: get_next_page_token_from_arn(x["SubscriptionArn"]),
            page_size=100,
            next_token=next_token,
        )

        # build the response type this operation is declared to return
        response = ListSubscriptionsByTopicResponse(Subscriptions=page)
        if next_token:
            response["NextToken"] = next_token
        return response
686

687
    #
688
    # Publish
689
    #
690

691
    def publish(
        self,
        context: RequestContext,
        message: message,
        topic_arn: topicARN | None = None,
        target_arn: String | None = None,
        phone_number: PhoneNumber | None = None,
        subject: subject | None = None,
        message_structure: messageStructure | None = None,
        message_attributes: MessageAttributeMap | None = None,
        message_deduplication_id: String | None = None,
        message_group_id: String | None = None,
        **kwargs,
    ) -> PublishResponse:
        """
        Validate a publish request and hand the message to the publisher for a topic, a platform
        application endpoint or a phone number.

        :param context: the RequestContext of the request
        :param message: the message body; a JSON document when ``message_structure`` is ``json``
        :param topic_arn: ARN of the destination topic
        :param target_arn: ARN of the destination topic or platform endpoint
        :param phone_number: destination phone number, must pass ``is_valid_e164_number``
        :param subject: optional subject, must not be the empty string
        :param message_structure: ``json`` to publish a per-protocol JSON payload
        :param message_attributes: optional message attributes
        :param message_deduplication_id: deduplication id, only accepted for FIFO topics
        :param message_group_id: message group id, required for FIFO topics
        :return: the message id, plus the sequence number for FIFO topics
        :raises InvalidParameterException: when one of the validations below fails
        :raises EndpointDisabledException: when the target endpoint is not enabled
        """
        if subject == "":
            raise InvalidParameterException("Invalid parameter: Subject")
        if not message or all(not m for m in message):
            raise InvalidParameterException("Invalid parameter: Empty message")

        # TODO: check for topic + target + phone number at the same time?
        # TODO: more validation on phone, it might be opted out?
        if phone_number and not is_valid_e164_number(phone_number):
            raise InvalidParameterException(
                f"Invalid parameter: PhoneNumber Reason: {phone_number} is not valid to publish to"
            )

        if message_attributes:
            _validate_message_attributes(message_attributes)

        if _get_total_publish_size(message, message_attributes) > MAXIMUM_MESSAGE_LENGTH:
            raise InvalidParameterException("Invalid parameter: Message too long")

        # for compatibility reasons, AWS allows users to use either TargetArn or TopicArn for publishing to a topic
        # use any of them for topic validation
        destination_arn = topic_arn or target_arn
        topic = None

        is_fifo = destination_arn and ".fifo" in destination_arn
        if is_fifo:
            if not message_group_id:
                raise InvalidParameterException(
                    "Invalid parameter: The MessageGroupId parameter is required for FIFO topics",
                )
            topic = self._get_topic(destination_arn, context)
            content_based_dedup_off = topic["attributes"]["ContentBasedDeduplication"] == "false"
            if content_based_dedup_off and not message_deduplication_id:
                raise InvalidParameterException(
                    "Invalid parameter: The topic should either have ContentBasedDeduplication enabled or MessageDeduplicationId provided explicitly",
                )
        elif message_deduplication_id:
            # this is the first one to raise if both are set while the topic is not fifo
            raise InvalidParameterException(
                "Invalid parameter: MessageDeduplicationId Reason: The request includes MessageDeduplicationId parameter that is not valid for this topic type"
            )

        is_endpoint_publish = target_arn and ":endpoint/" in target_arn
        if message_structure == "json":
            try:
                decoded_message = json.loads(message)
            except json.JSONDecodeError:
                raise InvalidParameterException(
                    "Invalid parameter: Message Structure - JSON message body failed to parse"
                )
            # Keys in the JSON object that correspond to supported transport protocols must have
            # simple JSON string values.
            # Non-string values will cause the key to be ignored.
            message = {
                protocol: payload
                for protocol, payload in decoded_message.items()
                if isinstance(payload, str)
            }
            # TODO: check no default key for direct TargetArn endpoint publish, need credentials
            # see example: https://docs.aws.amazon.com/sns/latest/dg/sns-send-custom-platform-specific-payloads-mobile-devices.html
            if "default" not in message and not is_endpoint_publish:
                raise InvalidParameterException(
                    "Invalid parameter: Message Structure - No default entry in JSON message body"
                )

        if phone_number:
            # use the store from the request context
            store = self.get_store(account_id=context.account_id, region=context.region)
        else:
            # use the account to get the store from the TopicArn (you can only publish in the same region as the topic)
            parsed_arn = parse_and_validate_topic_arn(destination_arn)
            store = self.get_store(account_id=parsed_arn["account"], region=context.region)
            if not is_endpoint_publish:
                topic = self._get_topic(destination_arn, context)
            else:
                platform_endpoint = store.platform_endpoints.get(target_arn)
                if not platform_endpoint:
                    raise InvalidParameterException(
                        "Invalid parameter: TargetArn Reason: No endpoint found for the target arn specified"
                    )
                enabled_flag = platform_endpoint.platform_endpoint["Attributes"].get(
                    "Enabled", "false"
                )
                if enabled_flag.lower() != "true":
                    raise EndpointDisabledException("Endpoint is disabled")

        message_ctx = SnsMessage(
            type=SnsMessageType.Notification,
            message=message,
            message_attributes=message_attributes,
            message_deduplication_id=message_deduplication_id,
            message_group_id=message_group_id,
            message_structure=message_structure,
            subject=subject,
            is_fifo=is_fifo,
        )
        publish_ctx = SnsPublishContext(
            message=message_ctx, store=store, request_headers=context.request.headers
        )

        if is_endpoint_publish:
            self._publisher.publish_to_application_endpoint(
                ctx=publish_ctx, endpoint_arn=target_arn
            )
        elif phone_number:
            self._publisher.publish_to_phone_number(ctx=publish_ctx, phone_number=phone_number)
        else:
            # beware if the subscription is FIFO, the order might not be guaranteed.
            # 2 quick call to this method in succession might not be executed in order in the executor?
            # TODO: test how this behaves in a FIFO context with a lot of threads.
            publish_ctx.topic_attributes |= topic["attributes"]
            self._publisher.publish_to_topic(publish_ctx, destination_arn)

        response = PublishResponse(MessageId=message_ctx.message_id)
        if is_fifo:
            response["SequenceNumber"] = message_ctx.sequencer_number
        return response
819

820
    def publish_batch(
        self,
        context: RequestContext,
        topic_arn: topicARN,
        publish_batch_request_entries: PublishBatchRequestEntryList,
        **kwargs,
    ) -> PublishBatchResponse:
        """
        Validate a batch of at most 10 entries and hand all of them to the publisher for the topic.

        Any validation failure raises and aborts the whole batch; ``Failed`` is never populated
        by this method.

        :param context: the RequestContext of the request
        :param topic_arn: ARN of the destination topic
        :param publish_batch_request_entries: the entries to publish
        :return: one ``Successful`` result per entry
        """
        if len(publish_batch_request_entries) > 10:
            raise TooManyEntriesInBatchRequestException(
                "The batch request contains more entries than permissible."
            )

        parsed_arn = parse_and_validate_topic_arn(topic_arn)
        store = self.get_store(account_id=parsed_arn["account"], region=context.region)
        topic = self._get_topic(topic_arn, context)

        entry_ids = [entry["Id"] for entry in publish_batch_request_entries]
        if len(set(entry_ids)) != len(publish_batch_request_entries):
            raise BatchEntryIdsNotDistinctException(
                "Two or more batch entries in the request have the same Id."
            )

        # Validate each entry ID
        for entry_id in entry_ids:
            if len(entry_id) > 80:
                raise InvalidBatchEntryIdException(
                    f"The Id of a batch entry in the batch request is too long: {entry_id}"
                )
            if not BATCH_ENTRY_ID_REGEX.match(entry_id):
                raise InvalidBatchEntryIdException(
                    f"The Id of a batch entry in the batch request contains an impermissible character: {entry_id}"
                )

        response: PublishBatchResponse = {"Successful": [], "Failed": []}

        # TODO: write AWS validated tests with FilterPolicy and batching
        # TODO: find a scenario where we can fail to send a message synchronously to be able to report it
        # right now, it seems that AWS fails the whole publish if something is wrong in the format of 1 message

        is_fifo = topic_arn.endswith(".fifo")
        total_batch_size = 0
        message_contexts = []
        for position, entry in enumerate(publish_batch_request_entries, start=1):
            raw_message = entry.get("Message")
            entry_attributes = entry.get("MessageAttributes", {})
            if entry_attributes:
                # if a message contains non-valid message attributes, it
                # will fail for the first non-valid message encountered, and raise ParameterValueInvalid
                _validate_message_attributes(entry_attributes, position=position)

            total_batch_size += _get_total_publish_size(raw_message, entry_attributes)

            # TODO: WRITE AWS VALIDATED
            if entry.get("MessageStructure") == "json":
                try:
                    decoded_message = json.loads(raw_message)
                except json.JSONDecodeError:
                    raise InvalidParameterException(
                        "Invalid parameter: Message Structure - JSON message body failed to parse"
                    )
                # Keys in the JSON object that correspond to supported transport protocols must have
                # simple JSON string values.
                # Non-string values will cause the key to be ignored.
                per_protocol = {
                    protocol: payload
                    for protocol, payload in decoded_message.items()
                    if isinstance(payload, str)
                }
                if "default" not in per_protocol:
                    raise InvalidParameterException(
                        "Invalid parameter: Message Structure - No default entry in JSON message body"
                    )
                # the entry is updated in place with the filtered payload
                entry["Message"] = per_protocol  # noqa

            if is_fifo:
                # these checks look at every entry of the batch, not only the current one
                if not all("MessageGroupId" in item for item in publish_batch_request_entries):
                    raise InvalidParameterException(
                        "Invalid parameter: The MessageGroupId parameter is required for FIFO topics"
                    )
                if topic["attributes"]["ContentBasedDeduplication"] == "false" and not all(
                    "MessageDeduplicationId" in item for item in publish_batch_request_entries
                ):
                    raise InvalidParameterException(
                        "Invalid parameter: The topic should either have ContentBasedDeduplication enabled or MessageDeduplicationId provided explicitly",
                    )

            msg_ctx = SnsMessage.from_batch_entry(entry, is_fifo=is_fifo)
            message_contexts.append(msg_ctx)

            result_entry = PublishBatchResultEntry(
                Id=entry["Id"],
                MessageId=msg_ctx.message_id,
            )
            if is_fifo:
                result_entry["SequenceNumber"] = msg_ctx.sequencer_number
            response["Successful"].append(result_entry)

        # the size limit applies to the sum of all entries
        if total_batch_size > MAXIMUM_MESSAGE_LENGTH:
            raise CommonServiceException(
                code="BatchRequestTooLong",
                message="The length of all the messages put together is more than the limit.",
                sender_fault=True,
            )

        publish_ctx = SnsBatchPublishContext(
            messages=message_contexts,
            store=store,
            request_headers=context.request.headers,
            topic_attributes=topic["attributes"],
        )
        self._publisher.publish_batch_to_topic(publish_ctx, topic_arn)

        return response
929

930
    #
931
    # PlatformApplications
932
    #
933
    def create_platform_application(
        self,
        context: RequestContext,
        name: String,
        platform: String,
        attributes: MapStringToString,
        **kwargs,
    ) -> CreatePlatformApplicationResponse:
        """
        Create a platform application and register it in the store of the requesting account and
        region.

        :param context: the RequestContext of the request
        :param name: name of the platform application
        :param platform: must be one of ``VALID_APPLICATION_PLATFORMS``
        :param attributes: application attributes; ``PlatformPrincipal`` and ``PlatformCredential``
            are checked for being supplied together but are not stored
        :return: the stored platform application (ARN and attributes)
        :raises InvalidParameterException: for an unsupported platform, or when only one of
            ``PlatformPrincipal`` / ``PlatformCredential`` is supplied
        """
        _validate_platform_application_name(name)
        if platform not in VALID_APPLICATION_PLATFORMS:
            raise InvalidParameterException(
                f"Invalid parameter: Platform Reason: {platform} is not supported"
            )

        _validate_platform_application_attributes(attributes)

        # attribute validation specific to create_platform_application
        if "PlatformCredential" in attributes and "PlatformPrincipal" not in attributes:
            raise InvalidParameterException(
                "Invalid parameter: Attributes Reason: PlatformCredential attribute provided without PlatformPrincipal"
            )

        elif "PlatformPrincipal" in attributes and "PlatformCredential" not in attributes:
            raise InvalidParameterException(
                "Invalid parameter: Attributes Reason: PlatformPrincipal attribute provided without PlatformCredential"
            )

        store = self.get_store(context.account_id, context.region)
        # We are not validating the access data here like AWS does (against ADM and the like)
        # The credential attributes are left out of the stored attributes. They are filtered
        # instead of popped so that a request carrying neither of them does not fail with a
        # KeyError, and so that the caller's dict is left untouched.
        # NOTE(review): AWS documents both attributes as required on creation - confirm whether
        # a dedicated validation error should be raised when both are missing.
        credential_keys = ("PlatformPrincipal", "PlatformCredential")
        _attributes = {"Enabled": "true"}
        _attributes.update(
            (key, value) for key, value in attributes.items() if key not in credential_keys
        )
        application_arn = sns_platform_application_arn(
            platform_application_name=name,
            platform=platform,
            account_id=context.account_id,
            region_name=context.region,
        )
        platform_application_details = PlatformApplicationDetails(
            platform_application=PlatformApplication(
                PlatformApplicationArn=application_arn,
                Attributes=_attributes,
            ),
            platform_endpoints={},
        )
        store.platform_applications[application_arn] = platform_application_details

        return platform_application_details.platform_application
982

983
    def delete_platform_application(
        self, context: RequestContext, platform_application_arn: String, **kwargs
    ) -> None:
        """
        Remove a platform application from the store of the requesting account and region.

        Deleting an ARN that is not registered is a silent no-op.

        :param context: the RequestContext of the request
        :param platform_application_arn: ARN of the application to remove
        """
        applications = self.get_store(context.account_id, context.region).platform_applications
        applications.pop(platform_application_arn, None)
        # TODO: if the platform had endpoints, should we remove them from the store? There is no way to list
        #   endpoints without an application, so this is impossible to check the state of AWS here
990

991
    def list_platform_applications(
        self, context: RequestContext, next_token: String | None = None, **kwargs
    ) -> ListPlatformApplicationsResponse:
        """
        Return one page (at most 100 entries) of the platform applications registered in the
        store of the requesting account and region.

        :param context: the RequestContext of the request
        :param next_token: pagination token returned by a previous call, if any
        :return: the page of platform applications, plus ``NextToken`` when more pages remain
        """
        store = self.get_store(context.account_id, context.region)
        # Paginate over the PlatformApplication entries themselves: the store values are wrapper
        # objects exposing the entry through their ``platform_application`` attribute, so the
        # token generator must read the ARN from the entry, not from the wrapper.
        platform_applications = [
            details.platform_application for details in store.platform_applications.values()
        ]
        page, token = PaginatedList(platform_applications).get_page(
            token_generator=lambda app: get_next_page_token_from_arn(
                app["PlatformApplicationArn"]
            ),
            page_size=100,
            next_token=next_token,
        )

        response = ListPlatformApplicationsResponse(PlatformApplications=list(page))
        if token:
            response["NextToken"] = token
        return response
1009

1010
    def get_platform_application_attributes(
        self, context: RequestContext, platform_application_arn: String, **kwargs
    ) -> GetPlatformApplicationAttributesResponse:
        """
        Return the stored attributes of a platform application.

        :param context: the RequestContext of the request
        :param platform_application_arn: ARN of the platform application
        :return: the application's ``Attributes`` mapping
        """
        application = self._get_platform_application(platform_application_arn, context)
        return GetPlatformApplicationAttributesResponse(Attributes=application["Attributes"])
1016

1017
    def set_platform_application_attributes(
        self,
        context: RequestContext,
        platform_application_arn: String,
        attributes: MapStringToString,
        **kwargs,
    ) -> None:
        """
        Merge the given attributes into the stored attributes of a platform application.

        :param context: the RequestContext of the request
        :param platform_application_arn: ARN of the platform application
        :param attributes: attributes to add or overwrite
        """
        # both validations run before the application is looked up
        parse_and_validate_platform_application_arn(platform_application_arn)
        _validate_platform_application_attributes(attributes)

        stored_attributes = self._get_platform_application(platform_application_arn, context)[
            "Attributes"
        ]
        stored_attributes.update(attributes)
1029

1030
    #
1031
    # Platform Endpoints
1032
    #
1033

1034
    def create_platform_endpoint(
        self,
        context: RequestContext,
        platform_application_arn: String,
        token: String,
        custom_user_data: String | None = None,
        attributes: MapStringToString | None = None,
        **kwargs,
    ) -> CreateEndpointResponse:
        """
        Create an endpoint for a platform application, or return the existing one when the same
        token was already registered with identical attributes.

        :param context: the RequestContext of the request
        :param platform_application_arn: ARN of the owning platform application
        :param token: device token; at most one endpoint per token and application
        :param custom_user_data: optional user data, ignored when ``attributes`` already has it
        :param attributes: optional endpoint attributes
        :return: the ARN of the created or already existing endpoint
        :raises NotFoundException: when the platform application is not in the store
        :raises InvalidParameterException: when the token is already registered with different
            attributes
        """
        store = self.get_store(context.account_id, context.region)
        application = store.platform_applications.get(platform_application_arn)
        if not application:
            raise NotFoundException("PlatformApplication does not exist")

        # work on a copy so the setdefault below does not modify the caller's mapping
        attributes = dict(attributes or {})
        _validate_endpoint_attributes(attributes, allow_empty=True)
        # CustomUserData can be specified both in attributes and as parameter. Attributes take precedence
        if custom_user_data:
            attributes.setdefault(EndpointAttributeNames.CUSTOM_USER_DATA, custom_user_data)
        _attributes = {"Enabled": "true", "Token": token, **attributes}

        existing_endpoint_arn = application.platform_endpoints.get(token)
        existing_endpoint = (
            store.platform_endpoints.get(existing_endpoint_arn) if existing_endpoint_arn else None
        )
        if existing_endpoint:
            # endpoint for that application with that particular token already exists
            if existing_endpoint.platform_endpoint["Attributes"] != _attributes:
                raise InvalidParameterException(
                    f"Invalid parameter: Token Reason: Endpoint {existing_endpoint_arn} already exists with the same Token, but different attributes."
                )
            return CreateEndpointResponse(EndpointArn=existing_endpoint_arn)

        endpoint_arn = create_platform_endpoint_arn(platform_application_arn)
        platform_endpoint = PlatformEndpoint(
            # back-reference to the owning application; it is the key used to look the
            # application up again in store.platform_applications
            platform_application_arn=platform_application_arn,
            platform_endpoint=Endpoint(
                Attributes=_attributes,
                EndpointArn=endpoint_arn,
            ),
        )
        store.platform_endpoints[endpoint_arn] = platform_endpoint
        application.platform_endpoints[token] = endpoint_arn

        return CreateEndpointResponse(EndpointArn=endpoint_arn)
1077

1078
    def delete_endpoint(self, context: RequestContext, endpoint_arn: String, **kwargs) -> None:
        """
        Remove an endpoint from the store and drop the token mapping that points to it.

        Deleting an ARN that is not registered is a silent no-op.

        :param context: the RequestContext of the request
        :param endpoint_arn: ARN of the endpoint to remove
        """
        store = self.get_store(context.account_id, context.region)
        platform_endpoint_details = store.platform_endpoints.pop(endpoint_arn, None)
        if not platform_endpoint_details:
            return

        owner = store.platform_applications.get(
            platform_endpoint_details.platform_application_arn
        )
        # When the back-reference does not resolve to a registered application, fall back to
        # checking every application so the mapping is not left behind.
        candidates = [owner] if owner else list(store.platform_applications.values())
        for application in candidates:
            # Match on the mapped ARN rather than on the endpoint's current Token attribute:
            # the attribute can be overwritten after creation, the mapping key cannot.
            stale_tokens = [
                token
                for token, mapped_arn in application.platform_endpoints.items()
                if mapped_arn == endpoint_arn
            ]
            for token in stale_tokens:
                del application.platform_endpoints[token]
1090

1091
    def list_endpoints_by_platform_application(
        self,
        context: RequestContext,
        platform_application_arn: String,
        next_token: String | None = None,
        **kwargs,
    ) -> ListEndpointsByPlatformApplicationResponse:
        """
        Return one page (at most 100 entries) of the endpoints registered for a platform
        application.

        :param context: the RequestContext of the request
        :param platform_application_arn: ARN of the platform application
        :param next_token: pagination token returned by a previous call, if any
        :return: the page of endpoints, plus ``NextToken`` when more pages remain
        :raises NotFoundException: when the platform application is not in the store
        """
        store = self.get_store(context.account_id, context.region)
        application = store.platform_applications.get(platform_application_arn)
        if not application:
            raise NotFoundException("PlatformApplication does not exist")

        registered_arns = PaginatedList(application.platform_endpoints.values())
        page, token = registered_arns.get_page(
            token_generator=get_next_page_token_from_arn,
            page_size=100,
            next_token=next_token,
        )

        endpoints = []
        for arn in page:
            # ARNs that no longer have an entry in the endpoint store are skipped
            if arn in store.platform_endpoints:
                endpoints.append(store.platform_endpoints[arn].platform_endpoint)

        response = ListEndpointsByPlatformApplicationResponse(Endpoints=endpoints)
        if token:
            response["NextToken"] = token
        return response
1120

1121
    def get_endpoint_attributes(
        self, context: RequestContext, endpoint_arn: String, **kwargs
    ) -> GetEndpointAttributesResponse:
        """
        Return the stored attributes of a platform endpoint.

        :param context: the RequestContext of the request
        :param endpoint_arn: ARN of the endpoint
        :return: the endpoint's ``Attributes`` mapping
        :raises NotFoundException: when the endpoint is not in the store
        """
        endpoints = self.get_store(context.account_id, context.region).platform_endpoints
        endpoint_details = endpoints.get(endpoint_arn)
        if not endpoint_details:
            raise NotFoundException("Endpoint does not exist")
        return GetEndpointAttributesResponse(
            Attributes=endpoint_details.platform_endpoint["Attributes"]
        )
1130

1131
    def set_endpoint_attributes(
        self, context: RequestContext, endpoint_arn: String, attributes: MapStringToString, **kwargs
    ) -> None:
        """
        Merge the given attributes into the stored attributes of a platform endpoint.

        :param context: the RequestContext of the request
        :param endpoint_arn: ARN of the endpoint
        :param attributes: attributes to add or overwrite
        :raises NotFoundException: when the endpoint is not in the store
        """
        endpoints = self.get_store(context.account_id, context.region).platform_endpoints
        endpoint_details = endpoints.get(endpoint_arn)
        if not endpoint_details:
            raise NotFoundException("Endpoint does not exist")
        # the existence check comes first, the attributes are validated afterwards
        _validate_endpoint_attributes(attributes)
        stored_attributes = endpoint_details.platform_endpoint["Attributes"]
        stored_attributes.update(attributes or {})
1141

1142
    #
1143
    # Sms operations
1144
    #
1145

1146
    def set_sms_attributes(
        self, context: RequestContext, attributes: MapStringToString, **kwargs
    ) -> SetSMSAttributesResponse:
        """
        Validate the given SMS attributes and merge them into the store of the requesting
        account and region.

        :param context: the RequestContext of the request
        :param attributes: SMS attributes to add or overwrite
        :return: an empty response
        """
        store = self.get_store(context.account_id, context.region)
        _validate_sms_attributes(attributes)
        # defaults are applied to the store before the requested values are merged in
        _set_sms_attribute_default(store)
        requested = attributes or {}
        store.sms_attributes.update(requested)
        return SetSMSAttributesResponse()
1154

1155
    def get_sms_attributes(
        self, context: RequestContext, attributes: ListString | None = None, **kwargs
    ) -> GetSMSAttributesResponse:
        """
        Return the SMS attributes of the requesting account and region.

        :param context: the RequestContext of the request
        :param attributes: names to return; every stored attribute is returned when empty
        :return: the selected attributes
        """
        store = self.get_store(context.account_id, context.region)
        _set_sms_attribute_default(store)
        selected = {
            name: value
            for name, value in store.sms_attributes.items()
            if not attributes or name in attributes
        }
        return GetSMSAttributesResponse(attributes=selected)
1167

1168
    #
1169
    # Phone number operations
1170
    #
1171

1172
    def check_if_phone_number_is_opted_out(
        self, context: RequestContext, phone_number: PhoneNumber, **kwargs
    ) -> CheckIfPhoneNumberIsOptedOutResponse:
        """
        Report whether a phone number is in the opted-out collection of the requesting account
        and region.

        :param context: the RequestContext of the request
        :param phone_number: the phone number to look up
        :return: ``isOptedOut`` set to the result of the membership test
        """
        # resolve the store through get_store, like the other operations of this provider
        store = self.get_store(context.account_id, context.region)
        return CheckIfPhoneNumberIsOptedOutResponse(
            isOptedOut=phone_number in store.PHONE_NUMBERS_OPTED_OUT
        )
1179

1180
    def list_phone_numbers_opted_out(
        self, context: RequestContext, next_token: string | None = None, **kwargs
    ) -> ListPhoneNumbersOptedOutResponse:
        """
        Return one page (at most 100 entries) of the opted-out phone numbers of the requesting
        account and region.

        :param context: the RequestContext of the request
        :param next_token: pagination token returned by a previous call, if any
        :return: the page of phone numbers and the next token as returned by the paginator
        """
        store = self.get_store(context.account_id, context.region)
        # the phone number itself serves as the pagination token
        page, following_token = PaginatedList(store.PHONE_NUMBERS_OPTED_OUT).get_page(
            token_generator=lambda number: number,
            next_token=next_token,
            page_size=100,
        )
        return ListPhoneNumbersOptedOutResponse(phoneNumbers=page, nextToken=following_token)
1192

1193
    def opt_in_phone_number(
        self, context: RequestContext, phone_number: PhoneNumber, **kwargs
    ) -> OptInPhoneNumberResponse:
        """
        Remove a phone number from the opted-out collection of the requesting account and region.

        Opting in a number that is not opted out is a no-op.

        :param context: the RequestContext of the request
        :param phone_number: the phone number to opt in, validated first
        :return: an empty response
        """
        _validate_phone_number(phone_number)
        opted_out = self.get_store(context.account_id, context.region).PHONE_NUMBERS_OPTED_OUT
        if phone_number in opted_out:
            opted_out.remove(phone_number)
        return OptInPhoneNumberResponse()
1201

1202
    #
1203
    # Permission operations
1204
    #
1205

1206
    def add_permission(
        self,
        context: RequestContext,
        topic_arn: topicARN,
        label: label,
        aws_account_id: DelegatesList,
        action_name: ActionsList,
        **kwargs,
    ) -> None:
        """
        Append an ``Allow`` statement to the topic policy for the given accounts and actions.

        :param context: the RequestContext of the request
        :param topic_arn: ARN of the topic whose policy is extended
        :param label: ``Sid`` of the new statement, must not already be used in the policy
        :param aws_account_id: account ids that become the statement's principals
        :param action_name: action names, each must be in ``VALID_POLICY_ACTIONS``
        :raises InvalidParameterException: when a statement with that ``Sid`` already exists or
            an action is not valid
        """
        topic: Topic = self._get_topic(topic_arn, context)
        policy = json.loads(topic["attributes"]["Policy"])

        # ``Sid`` is read with .get(): a stored policy statement that carries no Sid simply does
        # not match the label, instead of aborting the request with a KeyError
        if any(existing.get("Sid") == label for existing in policy["Statement"]):
            raise InvalidParameterException("Invalid parameter: Statement already exists")

        if any(action not in VALID_POLICY_ACTIONS for action in action_name):
            raise InvalidParameterException(
                "Invalid parameter: Policy statement action out of service scope!"
            )

        principals = [
            f"arn:{get_partition(context.region)}:iam::{account_id}:root"
            for account_id in aws_account_id
        ]
        actions = [f"SNS:{action}" for action in action_name]

        statement = {
            "Sid": label,
            "Effect": "Allow",
            # a single principal / action is stored as a plain string, several as a list
            "Principal": {"AWS": principals[0] if len(principals) == 1 else principals},
            "Action": actions[0] if len(actions) == 1 else actions,
            "Resource": topic_arn,
        }

        policy["Statement"].append(statement)
        topic["attributes"]["Policy"] = json.dumps(policy)
1246

1247
    def remove_permission(
        self, context: RequestContext, topic_arn: topicARN, label: label, **kwargs
    ) -> None:
        """
        Remove every statement whose ``Sid`` equals ``label`` from the topic policy.

        A label that matches no statement leaves the policy content unchanged.

        :param context: the RequestContext of the request
        :param topic_arn: ARN of the topic whose policy is modified
        :param label: ``Sid`` of the statement(s) to remove
        """
        topic = self._get_topic(topic_arn, context)
        policy = json.loads(topic["attributes"]["Policy"])
        # ``Sid`` is read with .get(): statements without a Sid never match the label and are
        # kept, instead of aborting the request with a KeyError
        policy["Statement"] = [
            statement for statement in policy["Statement"] if statement.get("Sid") != label
        ]
        topic["attributes"]["Policy"] = json.dumps(policy)
1256

1257
    #
1258
    # Data Protection Policy operations
1259
    #
1260

1261
    def get_data_protection_policy(
        self, context: RequestContext, resource_arn: topicARN, **kwargs
    ) -> GetDataProtectionPolicyResponse:
        """
        Return the data protection policy stored on a topic.

        :param context: the RequestContext of the request
        :param resource_arn: ARN of the topic
        :return: the stored policy, ``None`` when the topic has no such entry
        """
        stored_policy = self._get_topic(resource_arn, context).get("data_protection_policy")
        return GetDataProtectionPolicyResponse(DataProtectionPolicy=stored_policy)
1268

1269
    def put_data_protection_policy(
        self,
        context: RequestContext,
        resource_arn: topicARN,
        data_protection_policy: attributeValue,
        **kwargs,
    ) -> None:
        """
        Store a data protection policy on a topic, replacing any previous one.

        :param context: the RequestContext of the request
        :param resource_arn: ARN of the topic
        :param data_protection_policy: the policy value, stored as received
        """
        target_topic = self._get_topic(resource_arn, context)
        target_topic["data_protection_policy"] = data_protection_policy
1278

1279
    def list_tags_for_resource(
        self, context: RequestContext, resource_arn: AmazonResourceName, **kwargs
    ) -> ListTagsForResourceResponse:
        """
        Return the tags of a resource, as provided by ``_list_resource_tags``.

        :param context: the RequestContext of the request
        :param resource_arn: ARN of the tagged resource
        :return: the resource's tags
        """
        return ListTagsForResourceResponse(Tags=self._list_resource_tags(context, resource_arn))
1284

1285
    def tag_resource(
        self, context: RequestContext, resource_arn: AmazonResourceName, tags: TagList, **kwargs
    ) -> TagResourceResponse:
        """
        Tag a resource after rejecting requests that repeat a tag key.

        :param context: the RequestContext of the request
        :param resource_arn: ARN of the resource to tag
        :param tags: tags to apply, each with a ``Key`` entry
        :return: an empty response
        :raises InvalidParameterException: when two tags share the same key
        """
        distinct_key_count = len({tag["Key"] for tag in tags})
        # fewer distinct keys than tags means at least one key appears twice
        if distinct_key_count != len(tags):
            raise InvalidParameterException("Invalid parameter: Duplicated keys are not allowed.")
        self._tag_resource(context, resource_arn, tags)
        return TagResourceResponse()
1293

1294
    def untag_resource(
        self,
        context: RequestContext,
        resource_arn: AmazonResourceName,
        tag_keys: TagKeyList,
        **kwargs,
    ) -> UntagResourceResponse:
        """
        Remove tags from a resource by delegating to ``_untag_resource``.

        :param context: the RequestContext of the request
        :param resource_arn: ARN of the tagged resource
        :param tag_keys: keys of the tags to remove
        :return: an empty response
        """
        self._untag_resource(context, resource_arn, tag_keys)
        empty_response = UntagResourceResponse()
        return empty_response
1303

1304
    @staticmethod
    def get_store(account_id: str, region: str) -> SnsStore:
        """
        Look up the SNS store for an account and region in ``sns_stores``.

        :param account_id: the account id used as first-level key
        :param region: the region name used as second-level key
        :return: the matching SnsStore
        """
        account_stores = sns_stores[account_id]
        return account_stores[region]
1307

1308
    @staticmethod
    def _get_topic(arn: str, context: RequestContext, multi_region: bool = False) -> Topic:
        """
        Resolve a Topic ARN to the stored topic.

        :param arn: the Topic ARN
        :param context: the RequestContext of the request
        :param multi_region: when True, the ARN's region may differ from the request's region
        :return: the model Topic
        :raises InvalidParameterException: when the regions differ and ``multi_region`` is False
        :raises NotFoundException: when the lookup fails with a KeyError
        """
        arn_data = parse_and_validate_topic_arn(arn)
        same_region_required = not multi_region
        if same_region_required and context.region != arn_data["region"]:
            raise InvalidParameterException("Invalid parameter: TopicArn")
        try:
            # the store is selected from the ARN's account/region, not from the request context
            topics = SnsProvider.get_store(arn_data["account"], arn_data["region"]).topics
            return topics[arn]
        except KeyError:
            raise NotFoundException("Topic does not exist")
1323

1324
    @staticmethod
    def _get_platform_application(
        platform_application_arn: str, context: RequestContext
    ) -> PlatformApplication:
        """
        Resolve a platform application ARN to the stored PlatformApplication.

        :param platform_application_arn: the platform application ARN, validated first
        :param context: the RequestContext of the request, used to select the store
        :return: the model PlatformApplication
        :raises NotFoundException: when the lookup fails with a KeyError
        """
        parse_and_validate_platform_application_arn(platform_application_arn)
        try:
            applications = SnsProvider.get_store(
                context.account_id, context.region
            ).platform_applications
            details = applications[platform_application_arn]
            return details.platform_application
        except KeyError:
            raise NotFoundException("PlatformApplication does not exist")
1334

1335

1336
def _create_topic(
    name: str, attributes: dict, data_protection_policy: str, context: RequestContext
) -> Topic:
    """
    Build a new Topic model for the account and region of the request.

    :param name: the topic name
    :param attributes: attributes layered on top of the defaults; may be empty or None
    :param data_protection_policy: stored as-is under ``data_protection_policy``
    :param context: the RequestContext of the request
    :return: the Topic, with default attributes overridden by ``attributes``
    """
    arn = sns_topic_arn(
        topic_name=name, region_name=context.region, account_id=context.account_id
    )
    new_topic: Topic = {
        "name": name,
        "arn": arn,
        "attributes": {},
        "subscriptions": [],
        "data_protection_policy": data_protection_policy,
    }
    # the defaults are computed from the partially built topic, then overridden by the caller's values
    merged_attributes = _default_attributes(new_topic, context)
    if attributes:
        merged_attributes.update(attributes)
    new_topic["attributes"] = merged_attributes

    return new_topic
1✔
1354

1355

1356
def _default_attributes(topic: Topic, context: RequestContext) -> TopicAttributesMap:
    """
    Build the default attribute map of a topic.

    :param topic: the Topic; its ``arn`` and ``name`` are read
    :param context: the RequestContext of the request; its account id becomes the ``Owner``
    :return: the default attributes, with two extra entries when the topic name ends with ``.fifo``
    """
    defaults = dict(
        DisplayName="",
        Owner=context.account_id,
        Policy=create_default_topic_policy(topic["arn"]),
        SubscriptionsConfirmed="0",
        SubscriptionsDeleted="0",
        SubscriptionsPending="0",
        TopicArn=topic["arn"],
    )
    if topic["name"].endswith(".fifo"):
        defaults["ContentBasedDeduplication"] = "false"
        # NOTE(review): the default is "false" even for a `.fifo` name; presumably the
        # caller-supplied attributes override it - confirm against the callers
        defaults["FifoTopic"] = "false"
    return defaults
1✔
1374

1375

1376
def _create_default_effective_delivery_policy():
1✔
1377
    return json.dumps(
1✔
1378
        {
1379
            "http": {
1380
                "defaultHealthyRetryPolicy": {
1381
                    "minDelayTarget": 20,
1382
                    "maxDelayTarget": 20,
1383
                    "numRetries": 3,
1384
                    "numMaxDelayRetries": 0,
1385
                    "numNoDelayRetries": 0,
1386
                    "numMinDelayRetries": 0,
1387
                    "backoffFunction": "linear",
1388
                },
1389
                "disableSubscriptionOverrides": False,
1390
                "defaultRequestPolicy": {"headerContentType": "text/plain; charset=UTF-8"},
1391
            }
1392
        }
1393
    )
1394

1395

1396
def _validate_message_attributes(
    message_attributes: MessageAttributeMap, position: int | None = None
) -> None:
    """
    Check every message attribute and raise on the first one that is not valid.
    See: https://docs.aws.amazon.com/sns/latest/dg/sns-message-attributes.html
    Regex from: https://stackoverflow.com/questions/40718851/regex-that-does-not-allow-consecutive-dots
    :param message_attributes: the message attributes map for the message
    :param position: the Batch Entry position when coming from `publishBatch`; only used in the
        error path of a missing `DataType`
    :raises InvalidParameterValueException: for a bad name, data type or value
    :raises CommonServiceException: with code `ValidationError` when `DataType` is missing
    :return: None
    """
    for attr_name, attr in message_attributes.items():
        if len(attr_name) > 256:
            raise InvalidParameterValueException(
                "Length of message attribute name must be less than 256 bytes."
            )
        _validate_message_attribute_name(attr_name)

        # `DataType` is a required field for MessageAttributeValue
        data_type = attr.get("DataType")
        if data_type is None:
            # NOTE(review): truthiness check, so a position of 0 is reported like a non-batch call
            if position:
                at = f"publishBatchRequestEntries.{position}.member.messageAttributes.{attr_name}.member.dataType"
            else:
                at = f"messageAttributes.{attr_name}.member.dataType"

            raise CommonServiceException(
                code="ValidationError",
                message=f"1 validation error detected: Value null at '{at}' failed to satisfy constraint: Member must not be null",
                sender_fault=True,
            )

        # the plain base types are accepted directly, anything else has to match the type regex
        is_base_type = data_type in ("String", "Number", "Binary")
        if not (is_base_type or ATTR_TYPE_REGEX.match(data_type)):
            raise InvalidParameterValueException(
                f"The message attribute '{attr_name}' has an invalid message attribute type, the set of supported type prefixes is Binary, Number, and String."
            )

        has_value_field = any(field.endswith("Value") for field in attr)
        if not has_value_field:
            raise InvalidParameterValueException(
                f"The message attribute '{attr_name}' must contain non-empty message attribute value for message attribute type '{data_type}'."
            )

        # Binary types carry their payload in `BinaryValue`, every other type in `StringValue`
        value_key_data_type = "Binary" if data_type.startswith("Binary") else "String"
        value_key = f"{value_key_data_type}Value"
        if value_key not in attr:
            raise InvalidParameterValueException(
                f"The message attribute '{attr_name}' with type '{data_type}' must use field '{value_key_data_type}'."
            )
        if not attr[value_key]:
            raise InvalidParameterValueException(
                f"The message attribute '{attr_name}' must contain non-empty message attribute value for message attribute type '{data_type}'.",
            )
1450

1451

1452
def _validate_message_attribute_name(name: str) -> None:
    """
    Validate the message attribute name with the specification of AWS.
    The message attribute name can contain the following characters: A-Z, a-z, 0-9, underscore(_), hyphen(-), and period (.). The name must not start or end with a period, and it should not have successive periods.
    A name matching `MSG_ATTR_NAME_REGEX` is accepted right away; otherwise the name is inspected to
    pick the matching error message.
    :param name: message attribute name
    :raises InvalidParameterValueException: if the name does not conform to the spec
    """
    if MSG_ATTR_NAME_REGEX.match(name):
        return

    # find the proper exception
    if name[0] == ".":
        raise InvalidParameterValueException(
            "Invalid message attribute name starting with character '.' was found."
        )
    if name[-1] == ".":
        raise InvalidParameterValueException(
            "Invalid message attribute name ending with character '.' was found."
        )

    # seeding with the last character is safe: it is known not to be a dot at this point
    previous_char = name[-1]
    for char in name:
        if char not in VALID_MSG_ATTR_NAME_CHARS:
            # code point as upper-case hex, prefixed with #x
            hex_char = f"#x{ord(char):X}"
            raise InvalidParameterValueException(
                f"Invalid non-alphanumeric character '{hex_char}' was found in the message attribute name. Can only include alphanumeric characters, hyphens, underscores, or dots."
            )
        if char == "." and previous_char == ".":
            raise InvalidParameterValueException(
                "Message attribute name can not have successive '.' character."
            )
        previous_char = char
1482

1483

1484
def _validate_platform_application_name(name: str) -> None:
1✔
1485
    reason = ""
1✔
1486
    if not name:
1✔
1487
        reason = "cannot be empty"
1✔
1488
    elif not re.match(r"^.{0,256}$", name):
1✔
1489
        reason = "must be at most 256 characters long"
1✔
1490
    elif not re.match(r"^[A-Za-z0-9._-]+$", name):
1✔
1491
        reason = "must contain only characters 'a'-'z', 'A'-'Z', '0'-'9', '_', '-', and '.'"
1✔
1492

1493
    if reason:
1✔
1494
        LOG.debug("SNS Platform Application Name rejected due to reason: %s", reason)
1✔
1495
        raise InvalidParameterException(f"Invalid parameter: {name} Reason: {reason}")
1✔
1496

1497

1498
def _validate_platform_application_attributes(attributes: dict) -> None:
    """
    Validate the attributes of a platform application.

    The only check is the emptiness check of ``_check_empty_attributes``.

    :param attributes: the attributes to validate
    """
    _check_empty_attributes(attributes=attributes)
1✔
1500

1501

1502
def _check_empty_attributes(attributes: dict) -> None:
1✔
1503
    if not attributes:
1✔
1504
        raise CommonServiceException(
1✔
1505
            code="ValidationError",
1506
            message="1 validation error detected: Value null at 'attributes' failed to satisfy constraint: Member must not be null",
1507
            sender_fault=True,
1508
        )
1509

1510

1511
def _validate_endpoint_attributes(attributes: dict, allow_empty: bool = False) -> None:
    """
    Validate the attributes of a platform endpoint.

    :param attributes: the endpoint attributes
    :param allow_empty: when False, an empty attributes map is rejected
    :raises CommonServiceException: via ``_check_empty_attributes`` if empty attributes are not allowed
    :raises InvalidParameterException: on an attribute name outside ``EndpointAttributeNames``, or
        if ``CustomUserData`` is larger than 2048 bytes once UTF-8 encoded
    """
    if not allow_empty:
        _check_empty_attributes(attributes)
    for key in attributes:
        if key not in EndpointAttributeNames:
            raise InvalidParameterException(
                f"Invalid parameter: Attributes Reason: Invalid attribute name: {key}"
            )
    custom_user_data = attributes.get(EndpointAttributeNames.CUSTOM_USER_DATA, "")
    # the limit is stated in UTF-8 bytes, so measure the encoded size rather than the number of
    # characters (multi-byte characters would otherwise slip through)
    if _get_byte_size(custom_user_data) > 2048:
        raise InvalidParameterException(
            "Invalid parameter: Attributes Reason: Invalid value for attribute: CustomUserData: must be at most 2048 bytes long in UTF-8 encoding"
        )
1523

1524

1525
def _validate_sms_attributes(attributes: dict) -> None:
1✔
1526
    for k, v in attributes.items():
1✔
1527
        if k not in SMS_ATTRIBUTE_NAMES:
1✔
1528
            raise InvalidParameterException(f"{k} is not a valid attribute")
1✔
1529
    default_send_id = attributes.get("DefaultSendID")
1✔
1530
    if default_send_id and not re.match(SMS_DEFAULT_SENDER_REGEX, default_send_id):
1✔
1531
        raise InvalidParameterException("DefaultSendID is not a valid attribute")
×
1532
    sms_type = attributes.get("DefaultSMSType")
1✔
1533
    if sms_type and sms_type not in SMS_TYPES:
1✔
1534
        raise InvalidParameterException("DefaultSMSType is invalid")
1✔
1535

1536

1537
def _set_sms_attribute_default(store: SnsStore) -> None:
    """
    Make sure the store has a ``MonthlySpendLimit`` SMS attribute; an existing value is kept.

    :param store: the store whose ``sms_attributes`` are updated in place
    """
    # TODO: don't call this on every sms attribute crud api call
    sms_attributes = store.sms_attributes
    sms_attributes.setdefault("MonthlySpendLimit", "1")
1✔
1540

1541

1542
def _validate_phone_number(phone_number: str):
    """
    Check the phone number against ``E164_REGEX``.

    :param phone_number: the phone number to validate
    :raises InvalidParameterException: if the number does not match the regex
    """
    if re.match(E164_REGEX, phone_number):
        return

    raise InvalidParameterException(
        "Invalid parameter: PhoneNumber Reason: input incorrectly formatted"
    )
1547

1548

1549
def _get_total_publish_size(
    message_body: str, message_attributes: MessageAttributeMap | None
) -> int:
    """
    Compute the size in bytes of a message body together with its message attributes.

    :param message_body: the body of the message
    :param message_attributes: the message attributes map, may be empty or None
    :return: byte size of the body plus, per attribute, the size of its name and of each of its values
    """
    total = _get_byte_size(message_body)
    if not message_attributes:
        return total

    # https://docs.aws.amazon.com/sns/latest/dg/sns-message-attributes.html
    # All parts of the message attribute, including name, type, and value, are included in the message size
    # restriction, which is 256 KB.
    for attr_name, attr in message_attributes.items():
        total += _get_byte_size(attr_name)
        # the values of an attribute are its DataType and its StringValue or BinaryValue
        for attr_value in attr.values():
            total += _get_byte_size(attr_value)

    return total
1✔
1565

1566

1567
def _get_byte_size(payload: str | bytes) -> int:
    """
    Return the length of the payload after passing it through ``to_bytes``.

    :param payload: the string or bytes object to measure
    :return: length of the converted payload
    """
    # a string is measured by its real byte length, not by its number of characters
    as_bytes = to_bytes(payload)
    return len(as_bytes)
1✔
1570

1571

1572
def _register_sns_api_resource(router: Router):
    """Register the retrospection endpoints as internal LocalStack endpoints.

    Unlike `register_sns_api_resource`, this variant does not register
    `SNSServiceSMSPhoneOptOutResource`.
    """
    for resource_cls in (
        SNSServicePlatformEndpointMessagesApiResource,
        SNSServiceSMSMessagesApiResource,
        SNSServiceSubscriptionTokenApiResource,
    ):
        router.add(resource_cls())
×
1577

1578

1579
class SNSInternalResource:
    """Base class with helper to properly track usage of internal endpoints"""

    # label value used when counting calls; set by the subclasses
    resource_type: str

    def count_usage(self):
        """Increment the `internal_api_calls` counter labelled with this resource's `resource_type`."""
        internal_api_calls.labels(resource_type=self.resource_type).increment()
1✔
1585

1586

1587
def count_usage(f):
    """
    Decorator for methods of objects exposing ``count_usage()``.

    The wrapped method first calls ``self.count_usage()`` and then the original method.

    :param f: the method to wrap
    :return: the wrapping function, carrying the metadata of ``f``
    """

    def _counting_call(self, *args, **kwargs):
        self.count_usage()
        result = f(self, *args, **kwargs)
        return result

    return functools.update_wrapper(_counting_call, f)
1✔
1594

1595

1596
class SNSServicePlatformEndpointMessagesApiResource(SNSInternalResource):
    """Provides a REST API for retrospective access to platform endpoint messages sent via SNS.

    This is registered as a LocalStack internal HTTP resource.

    This endpoint accepts:
    - GET param `accountId`: selector for AWS account. If not specified, return fallback `000000000000` test ID
    - GET param `region`: selector for AWS `region`. If not specified, return default "us-east-1"
    - GET param `endpointArn`: filter for `endpointArn` resource in SNS
    - DELETE param `accountId`: selector for AWS account
    - DELETE param `region`: will delete saved messages for `region`
    - DELETE param `endpointArn`: will delete saved messages for `endpointArn`

    When `endpointArn` is given, account and region are taken from the ARN and the `accountId` and
    `region` params are ignored.
    """

    resource_type = "platform-endpoint-message"

    # only these keys of a stored message are returned
    _PAYLOAD_FIELDS = [
        "TargetArn",
        "TopicArn",
        "Message",
        "MessageAttributes",
        "MessageStructure",
        "Subject",
        "MessageId",
    ]

    @route(PLATFORM_ENDPOINT_MSGS_ENDPOINT, methods=["GET"])
    @count_usage
    def on_get(self, request: Request):
        """Return the stored messages of one endpoint (`endpointArn` given) or of all endpoints of the store."""
        filter_endpoint_arn = request.args.get("endpointArn")
        if filter_endpoint_arn:
            account_id = extract_account_id_from_arn(filter_endpoint_arn)
            region = extract_region_from_arn(filter_endpoint_arn)
        else:
            account_id = request.args.get("accountId", DEFAULT_AWS_ACCOUNT_ID)
            region = request.args.get("region", AWS_REGION_US_EAST_1)

        store: SnsStore = sns_stores[account_id][region]
        if filter_endpoint_arn:
            messages = store.platform_endpoint_messages.get(filter_endpoint_arn, [])
            messages = _format_messages(messages, self._PAYLOAD_FIELDS)
            return {
                "platform_endpoint_messages": {filter_endpoint_arn: messages},
                "region": region,
            }

        platform_endpoint_messages = {
            endpoint_arn: _format_messages(messages, self._PAYLOAD_FIELDS)
            for endpoint_arn, messages in store.platform_endpoint_messages.items()
        }
        return {
            "platform_endpoint_messages": platform_endpoint_messages,
            "region": region,
        }

    @route(PLATFORM_ENDPOINT_MSGS_ENDPOINT, methods=["DELETE"])
    @count_usage
    def on_delete(self, request: Request) -> Response:
        """Drop the stored messages of one endpoint (`endpointArn` given) or of the whole store; answers 204."""
        filter_endpoint_arn = request.args.get("endpointArn")
        if filter_endpoint_arn:
            account_id = extract_account_id_from_arn(filter_endpoint_arn)
            region = extract_region_from_arn(filter_endpoint_arn)
        else:
            account_id = request.args.get("accountId", DEFAULT_AWS_ACCOUNT_ID)
            region = request.args.get("region", AWS_REGION_US_EAST_1)

        store: SnsStore = sns_stores[account_id][region]
        if filter_endpoint_arn:
            # an endpoint without stored messages is not an error
            store.platform_endpoint_messages.pop(filter_endpoint_arn, None)
        else:
            store.platform_endpoint_messages.clear()
        return Response("", status=204)
1✔
1674

1675

1676
def register_sns_api_resource(router: Router):
    """Register the retrospection endpoints as internal LocalStack endpoints.

    One instance of each internal SNS resource class is added to the given router.
    """
    for resource_cls in (
        SNSServicePlatformEndpointMessagesApiResource,
        SNSServiceSMSMessagesApiResource,
        SNSServiceSubscriptionTokenApiResource,
        SNSServiceSMSPhoneOptOutResource,
    ):
        router.add(resource_cls())
1✔
1682

1683

1684
def _format_messages(sent_messages: list[dict[str, str]], validated_keys: list[str]):
1✔
1685
    """
1686
    This method format the messages to be more readable and undo the format change that was needed for Moto
1687
    Should be removed once we refactor SNS.
1688
    """
1689
    formatted_messages = []
1✔
1690
    for sent_message in sent_messages:
1✔
1691
        msg = {
1✔
1692
            key: json.dumps(value)
1693
            if key == "Message" and sent_message.get("MessageStructure") == "json"
1694
            else value
1695
            for key, value in sent_message.items()
1696
            if key in validated_keys
1697
        }
1698
        formatted_messages.append(msg)
1✔
1699

1700
    return formatted_messages
1✔
1701

1702

1703
class SNSServiceSMSMessagesApiResource(SNSInternalResource):
    """Provides a REST API for retrospective access to SMS messages sent via SNS.

    This is registered as a LocalStack internal HTTP resource.

    This endpoint accepts:
    - GET param `accountId`: selector for AWS account. If not specified, return fallback `000000000000` test ID
    - GET param `region`: selector for AWS `region`. If not specified, return default "us-east-1"
    - GET param `phoneNumber`: filter for `phoneNumber` resource in SNS

    The DELETE handler reads the same three params and removes the matching saved messages.
    """

    resource_type = "sms-message"

    # only these keys of a stored message are returned
    _PAYLOAD_FIELDS = [
        "PhoneNumber",
        "TopicArn",
        "SubscriptionArn",
        "MessageId",
        "Message",
        "MessageAttributes",
        "MessageStructure",
        "Subject",
    ]

    @route(SMS_MSGS_ENDPOINT, methods=["GET"])
    @count_usage
    def on_get(self, request: Request):
        """Return the stored SMS messages keyed by phone number, optionally limited to `phoneNumber`."""
        account_id = request.args.get("accountId", DEFAULT_AWS_ACCOUNT_ID)
        region = request.args.get("region", AWS_REGION_US_EAST_1)
        filter_phone_number = request.args.get("phoneNumber")
        store: SnsStore = sns_stores[account_id][region]

        if filter_phone_number:
            matching = [
                m for m in store.sms_messages if m.get("PhoneNumber") == filter_phone_number
            ]
            return {
                "sms_messages": {
                    filter_phone_number: _format_messages(matching, self._PAYLOAD_FIELDS)
                },
                "region": region,
            }

        # group the formatted messages by the phone number they were sent to
        sms_messages = {}
        for m in _format_messages(store.sms_messages, self._PAYLOAD_FIELDS):
            sms_messages.setdefault(m.get("PhoneNumber"), []).append(m)

        return {
            "sms_messages": sms_messages,
            "region": region,
        }

    @route(SMS_MSGS_ENDPOINT, methods=["DELETE"])
    @count_usage
    def on_delete(self, request: Request) -> Response:
        """Drop the stored SMS messages of `phoneNumber`, or all of them if no number is given; answers 204."""
        account_id = request.args.get("accountId", DEFAULT_AWS_ACCOUNT_ID)
        region = request.args.get("region", AWS_REGION_US_EAST_1)
        filter_phone_number = request.args.get("phoneNumber")
        store: SnsStore = sns_stores[account_id][region]

        if filter_phone_number:
            # keep everything that was sent to another number
            store.sms_messages = [
                m for m in store.sms_messages if m.get("PhoneNumber") != filter_phone_number
            ]
        else:
            store.sms_messages.clear()
        return Response("", status=204)
1✔
1768

1769

1770
class SNSServiceSubscriptionTokenApiResource(SNSInternalResource):
    """Provides a REST API for retrospective access to Subscription Confirmation Tokens to confirm subscriptions.
    Those are not sent for email, and sometimes inaccessible when working with external HTTPS endpoint which won't be
    able to reach your local host.

    This is registered as a LocalStack internal HTTP resource.

    This endpoint has the following parameter:
    - GET `subscription_arn`: `subscriptionArn` resource in SNS for which you want the SubscriptionToken
    """

    resource_type = "subscription-token"

    @route(f"{SUBSCRIPTION_TOKENS_ENDPOINT}/<path:subscription_arn>", methods=["GET"])
    @count_usage
    def on_get(self, _request: Request, subscription_arn: str):
        """
        Return the confirmation token stored for the subscription ARN.

        Answers 400 if the ARN cannot be parsed and 404 if no token is stored for it.
        """
        try:
            parsed_arn = parse_arn(subscription_arn)
        except InvalidArnException:
            response = Response("", 400)
            response.set_json(
                {
                    "error": "The provided SubscriptionARN is invalid",
                    "subscription_arn": subscription_arn,
                }
            )
            return response

        # account and region of the store come from the ARN itself
        store: SnsStore = sns_stores[parsed_arn["account"]][parsed_arn["region"]]

        # the mapping goes from token to subscription ARN, so it has to be searched by value
        for token, sub_arn in store.subscription_tokens.items():
            if sub_arn == subscription_arn:
                return {
                    "subscription_token": token,
                    "subscription_arn": subscription_arn,
                }

        response = Response("", 404)
        response.set_json(
            {
                "error": "The provided SubscriptionARN is not found",
                "subscription_arn": subscription_arn,
            }
        )
        return response
1✔
1814

1815

1816
class SNSServiceSMSPhoneOptOutResource(SNSInternalResource):
    """Provides a REST API for adding phone numbers to the opt out list for testing purposes.
    In AWS this seems to be handled by pin-point, which is scheduled for deprecation.

    This is registered as a LocalStack internal HTTP resource.

    This endpoint accepts:
    - POST data `phoneNumber`: phone number to be opted out in SNS
    - POST data `accountId`: account ID
    """

    resource_type = "phone-number-opt-out"

    @route(SMS_PHONE_NUMBER_OPT_OUT_ENDPOINT, methods=["POST"])
    @count_usage
    def on_post(self, request: Request):
        """Add the `phoneNumber` of the JSON request body to the opt-out set of the account's store."""
        # a JSON `null` body is treated like an empty object
        data = json.loads(request.data) or {}
        account_id = data.get("accountId", DEFAULT_AWS_ACCOUNT_ID)
        region = AWS_REGION_US_EAST_1  # opt-out list is account-wide
        opt_out_phone_number = data.get("phoneNumber")
        store: SnsStore = sns_stores[account_id][region]
        if opt_out_phone_number:
            store.PHONE_NUMBERS_OPTED_OUT.add(opt_out_phone_number)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc