• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 18928049101

29 Oct 2025 12:05PM UTC coverage: 86.912% (+0.01%) from 86.9%
18928049101

push

github

web-flow
Sns:v2 platform application crud (#13312)

77 of 78 new or added lines in 5 files covered. (98.72%)

8 existing lines in 5 files now uncovered.

68432 of 78737 relevant lines covered (86.91%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.68
/localstack-core/localstack/services/sns/v2/provider.py
1
import contextlib
1✔
2
import copy
1✔
3
import json
1✔
4
import logging
1✔
5
import re
1✔
6

7
from botocore.utils import InvalidArnException
1✔
8

9
from localstack.aws.api import CommonServiceException, RequestContext
1✔
10
from localstack.aws.api.sns import (
1✔
11
    AmazonResourceName,
12
    ConfirmSubscriptionResponse,
13
    CreatePlatformApplicationResponse,
14
    CreateTopicResponse,
15
    GetPlatformApplicationAttributesResponse,
16
    GetSMSAttributesResponse,
17
    GetSubscriptionAttributesResponse,
18
    GetTopicAttributesResponse,
19
    InvalidParameterException,
20
    ListEndpointsByPlatformApplicationResponse,
21
    ListPlatformApplicationsResponse,
22
    ListString,
23
    ListSubscriptionsByTopicResponse,
24
    ListSubscriptionsResponse,
25
    ListTagsForResourceResponse,
26
    ListTopicsResponse,
27
    MapStringToString,
28
    NotFoundException,
29
    PlatformApplication,
30
    SetSMSAttributesResponse,
31
    SnsApi,
32
    String,
33
    SubscribeResponse,
34
    Subscription,
35
    SubscriptionAttributesMap,
36
    TagKeyList,
37
    TagList,
38
    TagResourceResponse,
39
    TopicAttributesMap,
40
    UntagResourceResponse,
41
    attributeName,
42
    attributeValue,
43
    authenticateOnUnsubscribe,
44
    endpoint,
45
    nextToken,
46
    protocol,
47
    subscriptionARN,
48
    topicARN,
49
    topicName,
50
)
51
from localstack.services.sns import constants as sns_constants
1✔
52
from localstack.services.sns.certificate import SNS_SERVER_CERT
1✔
53
from localstack.services.sns.constants import (
1✔
54
    DUMMY_SUBSCRIPTION_PRINCIPAL,
55
    VALID_APPLICATION_PLATFORMS,
56
)
57
from localstack.services.sns.filter import FilterPolicyValidator
1✔
58
from localstack.services.sns.publisher import PublishDispatcher, SnsPublishContext
1✔
59
from localstack.services.sns.v2.models import (
1✔
60
    SMS_ATTRIBUTE_NAMES,
61
    SMS_DEFAULT_SENDER_REGEX,
62
    SMS_TYPES,
63
    SnsMessage,
64
    SnsMessageType,
65
    SnsStore,
66
    SnsSubscription,
67
    Topic,
68
    sns_stores,
69
)
70
from localstack.services.sns.v2.utils import (
1✔
71
    create_subscription_arn,
72
    encode_subscription_token_with_region,
73
    get_next_page_token_from_arn,
74
    get_region_from_subscription_token,
75
    is_valid_e164_number,
76
    parse_and_validate_platform_application_arn,
77
    parse_and_validate_topic_arn,
78
    validate_subscription_attribute,
79
)
80
from localstack.utils.aws.arns import (
1✔
81
    get_partition,
82
    parse_arn,
83
    sns_platform_application_arn,
84
    sns_topic_arn,
85
)
86
from localstack.utils.collections import PaginatedList, select_from_typed_dict
1✔
87

88
# set up logger
89
LOG = logging.getLogger(__name__)
1✔
90

91
SNS_TOPIC_NAME_PATTERN_FIFO = r"^[a-zA-Z0-9_-]{1,256}\.fifo$"
1✔
92
SNS_TOPIC_NAME_PATTERN = r"^[a-zA-Z0-9_-]{1,256}$"
1✔
93

94

95
class SnsProvider(SnsApi):
1✔
96
    def __init__(self) -> None:
1✔
97
        super().__init__()
1✔
98
        self._publisher = PublishDispatcher()
1✔
99
        self._signature_cert_pem: str = SNS_SERVER_CERT
1✔
100

101
    ## Topic Operations
102

103
    def create_topic(
1✔
104
        self,
105
        context: RequestContext,
106
        name: topicName,
107
        attributes: TopicAttributesMap | None = None,
108
        tags: TagList | None = None,
109
        data_protection_policy: attributeValue | None = None,
110
        **kwargs,
111
    ) -> CreateTopicResponse:
112
        store = self.get_store(context.account_id, context.region)
1✔
113
        topic_arn = sns_topic_arn(
1✔
114
            topic_name=name, region_name=context.region, account_id=context.account_id
115
        )
116
        topic: Topic = store.topics.get(topic_arn)
1✔
117
        attributes = attributes or {}
1✔
118
        if topic:
1✔
119
            attrs = topic["attributes"]
1✔
120
            for k, v in attributes.values():
1✔
121
                if not attrs.get(k) or not attrs.get(k) == v:
×
122
                    # TODO:
123
                    raise InvalidParameterException("Fix this Exception message and type")
×
124
            tag_resource_success = _check_matching_tags(topic_arn, tags, store)
1✔
125
            if not tag_resource_success:
1✔
126
                raise InvalidParameterException(
1✔
127
                    "Invalid parameter: Tags Reason: Topic already exists with different tags"
128
                )
129
            return CreateTopicResponse(TopicArn=topic_arn)
1✔
130

131
        attributes = attributes or {}
1✔
132
        if attributes.get("FifoTopic") and attributes["FifoTopic"].lower() == "true":
1✔
133
            fifo_match = re.match(SNS_TOPIC_NAME_PATTERN_FIFO, name)
1✔
134
            if not fifo_match:
1✔
135
                # TODO: check this with a separate test
136
                raise InvalidParameterException(
×
137
                    "Fifo Topic names must end with .fifo and must be made up of only uppercase and lowercase ASCII letters, numbers, underscores, and hyphens, and must be between 1 and 256 characters long."
138
                )
139
        else:
140
            # AWS does not seem to save explicit settings of fifo = false
141

142
            attributes.pop("FifoTopic", None)
1✔
143
            name_match = re.match(SNS_TOPIC_NAME_PATTERN, name)
1✔
144
            if not name_match:
1✔
145
                raise InvalidParameterException("Invalid parameter: Topic Name")
1✔
146

147
        topic = _create_topic(name=name, attributes=attributes, context=context)
1✔
148
        if tags:
1✔
149
            self.tag_resource(context=context, resource_arn=topic_arn, tags=tags)
1✔
150

151
        store.topics[topic_arn] = topic
1✔
152

153
        return CreateTopicResponse(TopicArn=topic_arn)
1✔
154

155
    def get_topic_attributes(
1✔
156
        self, context: RequestContext, topic_arn: topicARN, **kwargs
157
    ) -> GetTopicAttributesResponse:
158
        topic: Topic = self._get_topic(arn=topic_arn, context=context)
1✔
159
        if topic:
1✔
160
            attributes = topic["attributes"]
1✔
161
            return GetTopicAttributesResponse(Attributes=attributes)
1✔
162
        else:
163
            raise NotFoundException("Topic does not exist")
×
164

165
    def delete_topic(self, context: RequestContext, topic_arn: topicARN, **kwargs) -> None:
1✔
166
        store = self.get_store(context.account_id, context.region)
1✔
167

168
        store.topics.pop(topic_arn, None)
1✔
169

170
    def list_topics(
1✔
171
        self, context: RequestContext, next_token: nextToken | None = None, **kwargs
172
    ) -> ListTopicsResponse:
173
        store = self.get_store(context.account_id, context.region)
1✔
174
        topics = [{"TopicArn": t["arn"]} for t in list(store.topics.values())]
1✔
175
        topics = PaginatedList(topics)
1✔
176
        page, nxt = topics.get_page(
1✔
177
            token_generator=lambda x: get_next_page_token_from_arn(x["TopicArn"]),
178
            next_token=next_token,
179
            page_size=100,
180
        )
181
        topics = {"Topics": page, "NextToken": nxt}
1✔
182
        return ListTopicsResponse(**topics)
1✔
183

184
    def set_topic_attributes(
1✔
185
        self,
186
        context: RequestContext,
187
        topic_arn: topicARN,
188
        attribute_name: attributeName,
189
        attribute_value: attributeValue | None = None,
190
        **kwargs,
191
    ) -> None:
192
        topic: Topic = self._get_topic(arn=topic_arn, context=context)
1✔
193
        if attribute_name == "FifoTopic":
1✔
194
            raise InvalidParameterException("Invalid parameter: AttributeName")
1✔
195
        topic["attributes"][attribute_name] = attribute_value
1✔
196

197
    ## Subscribe operations
198

199
    def subscribe(
1✔
200
        self,
201
        context: RequestContext,
202
        topic_arn: topicARN,
203
        protocol: protocol,
204
        endpoint: endpoint | None = None,
205
        attributes: SubscriptionAttributesMap | None = None,
206
        return_subscription_arn: bool | None = None,
207
        **kwargs,
208
    ) -> SubscribeResponse:
209
        parsed_topic_arn = parse_and_validate_topic_arn(topic_arn)
1✔
210
        if context.region != parsed_topic_arn["region"]:
1✔
211
            raise InvalidParameterException("Invalid parameter: TopicArn")
×
212

213
        store = self.get_store(account_id=parsed_topic_arn["account"], region=context.region)
1✔
214

215
        if topic_arn not in store.topics:
1✔
216
            raise NotFoundException("Topic does not exist")
1✔
217

218
        topic_subscriptions = store.topics[topic_arn]["subscriptions"]
1✔
219
        if not endpoint:
1✔
220
            # TODO: check AWS behaviour (because endpoint is optional)
221
            raise NotFoundException("Endpoint not specified in subscription")
×
222
        if protocol not in sns_constants.SNS_PROTOCOLS:
1✔
223
            raise InvalidParameterException(
1✔
224
                f"Invalid parameter: Amazon SNS does not support this protocol string: {protocol}"
225
            )
226
        elif protocol in ["http", "https"] and not endpoint.startswith(f"{protocol}://"):
1✔
227
            raise InvalidParameterException(
×
228
                "Invalid parameter: Endpoint must match the specified protocol"
229
            )
230
        elif protocol == "sms" and not is_valid_e164_number(endpoint):
1✔
231
            raise InvalidParameterException(f"Invalid SMS endpoint: {endpoint}")
1✔
232

233
        elif protocol == "sqs":
1✔
234
            try:
1✔
235
                parse_arn(endpoint)
1✔
236
            except InvalidArnException:
1✔
237
                raise InvalidParameterException("Invalid parameter: SQS endpoint ARN")
1✔
238

239
        elif protocol == "application":
1✔
240
            # TODO: This needs to be implemented once applications are ported from moto to the new provider
241
            raise NotImplementedError(
242
                "This functionality needs yet to be ported to the new SNS provider"
243
            )
244

245
        if ".fifo" in endpoint and ".fifo" not in topic_arn:
1✔
246
            # TODO: move to sqs protocol block if possible
247
            raise InvalidParameterException(
×
248
                "Invalid parameter: Invalid parameter: Endpoint Reason: FIFO SQS Queues can not be subscribed to standard SNS topics"
249
            )
250

251
        sub_attributes = copy.deepcopy(attributes) if attributes else None
1✔
252
        if sub_attributes:
1✔
253
            for attr_name, attr_value in sub_attributes.items():
1✔
254
                validate_subscription_attribute(
1✔
255
                    attribute_name=attr_name,
256
                    attribute_value=attr_value,
257
                    topic_arn=topic_arn,
258
                    endpoint=endpoint,
259
                    is_subscribe_call=True,
260
                )
261
                if raw_msg_delivery := sub_attributes.get("RawMessageDelivery"):
1✔
262
                    sub_attributes["RawMessageDelivery"] = raw_msg_delivery.lower()
1✔
263

264
        # An endpoint may only be subscribed to a topic once. Subsequent
265
        # subscribe calls do nothing (subscribe is idempotent), except if its attributes are different.
266
        for existing_topic_subscription in topic_subscriptions:
1✔
267
            sub = store.subscriptions.get(existing_topic_subscription, {})
1✔
268
            if sub.get("Endpoint") == endpoint:
1✔
269
                if sub_attributes:
1✔
270
                    # validate the subscription attributes aren't different
271
                    for attr in sns_constants.VALID_SUBSCRIPTION_ATTR_NAME:
1✔
272
                        # if a new attribute is present and different from an existent one, raise
273
                        if (new_attr := sub_attributes.get(attr)) and sub.get(attr) != new_attr:
1✔
274
                            raise InvalidParameterException(
1✔
275
                                "Invalid parameter: Attributes Reason: Subscription already exists with different attributes"
276
                            )
277

278
                return SubscribeResponse(SubscriptionArn=sub["SubscriptionArn"])
1✔
279
        principal = DUMMY_SUBSCRIPTION_PRINCIPAL.format(
1✔
280
            partition=get_partition(context.region), account_id=context.account_id
281
        )
282
        subscription_arn = create_subscription_arn(topic_arn)
1✔
283
        subscription = SnsSubscription(
1✔
284
            # http://docs.aws.amazon.com/cli/latest/reference/sns/get-subscription-attributes.html
285
            TopicArn=topic_arn,
286
            Endpoint=endpoint,
287
            Protocol=protocol,
288
            SubscriptionArn=subscription_arn,
289
            PendingConfirmation="true",
290
            Owner=context.account_id,
291
            RawMessageDelivery="false",  # default value, will be overridden if set
292
            FilterPolicyScope="MessageAttributes",  # default value, will be overridden if set
293
            SubscriptionPrincipal=principal,  # dummy value, could be fetched with a call to STS?
294
        )
295
        if sub_attributes:
1✔
296
            subscription.update(sub_attributes)
1✔
297
            if "FilterPolicy" in sub_attributes:
1✔
298
                filter_policy = (
1✔
299
                    json.loads(sub_attributes["FilterPolicy"])
300
                    if sub_attributes["FilterPolicy"]
301
                    else None
302
                )
303
                if filter_policy:
1✔
304
                    validator = FilterPolicyValidator(
×
305
                        scope=subscription.get("FilterPolicyScope", "MessageAttributes"),
306
                        is_subscribe_call=True,
307
                    )
308
                    validator.validate_filter_policy(filter_policy)
×
309

310
                store.subscription_filter_policy[subscription_arn] = filter_policy
1✔
311

312
        store.subscriptions[subscription_arn] = subscription
1✔
313

314
        topic_subscriptions.append(subscription_arn)
1✔
315

316
        # store the token and subscription arn
317
        # TODO: the token is a 288 hex char string
318
        subscription_token = encode_subscription_token_with_region(region=context.region)
1✔
319
        store.subscription_tokens[subscription_token] = subscription_arn
1✔
320

321
        response_subscription_arn = subscription_arn
1✔
322
        # Send out confirmation message for HTTP(S), fix for https://github.com/localstack/localstack/issues/881
323
        if protocol in ["http", "https"]:
1✔
324
            message_ctx = SnsMessage(
1✔
325
                type=SnsMessageType.SubscriptionConfirmation,
326
                token=subscription_token,
327
                message=f"You have chosen to subscribe to the topic {topic_arn}.\nTo confirm the subscription, visit the SubscribeURL included in this message.",
328
            )
329
            publish_ctx = SnsPublishContext(
1✔
330
                message=message_ctx,
331
                store=store,
332
                request_headers=context.request.headers,
333
                # TODO: add topic attributes once they are ported from moto to LocalStack
334
                # topic_attributes=vars(self._get_topic(topic_arn, context)),
335
            )
336
            self._publisher.publish_to_topic_subscriber(
1✔
337
                ctx=publish_ctx,
338
                topic_arn=topic_arn,
339
                subscription_arn=subscription_arn,
340
            )
341
            if not return_subscription_arn:
1✔
342
                response_subscription_arn = "pending confirmation"
1✔
343

344
        elif protocol not in ["email", "email-json"]:
1✔
345
            # Only HTTP(S) and email subscriptions are not auto validated
346
            # Except if the endpoint and the topic are not in the same AWS account, then you'd need to manually confirm
347
            # the subscription with the token
348
            # TODO: revisit for multi-account
349
            # TODO: test with AWS for email & email-json confirmation message
350
            # we need to add the following check:
351
            # if parsed_topic_arn["account"] == endpoint account (depending on the type, SQS, lambda, parse the arn)
352
            subscription["PendingConfirmation"] = "false"
1✔
353
            subscription["ConfirmationWasAuthenticated"] = "true"
1✔
354

355
        return SubscribeResponse(SubscriptionArn=response_subscription_arn)
1✔
356

357
    def unsubscribe(
1✔
358
        self, context: RequestContext, subscription_arn: subscriptionARN, **kwargs
359
    ) -> None:
360
        if subscription_arn is None:
1✔
361
            raise InvalidParameterException(
1✔
362
                "Invalid parameter: SubscriptionArn Reason: no value for required parameter",
363
            )
364
        count = len(subscription_arn.split(":"))
1✔
365
        try:
1✔
366
            parsed_arn = parse_arn(subscription_arn)
1✔
367
        except InvalidArnException:
1✔
368
            # TODO: check for invalid SubscriptionGUID
369
            raise InvalidParameterException(
1✔
370
                f"Invalid parameter: SubscriptionArn Reason: An ARN must have at least 6 elements, not {count}"
371
            )
372

373
        account_id = parsed_arn["account"]
1✔
374
        region_name = parsed_arn["region"]
1✔
375

376
        store = self.get_store(account_id=account_id, region=region_name)
1✔
377
        if count == 6 and subscription_arn not in store.subscriptions:
1✔
378
            raise InvalidParameterException("Invalid parameter: SubscriptionId")
1✔
379

380
        # TODO: here was a moto_backend.unsubscribe call, check correct functionality and remove this comment
381
        #  before switching to v2 for production
382

383
        # pop the subscription at the end, to avoid race condition by iterating over the topic subscriptions
384
        subscription = store.subscriptions.get(subscription_arn)
1✔
385

386
        if not subscription:
1✔
387
            # unsubscribe is idempotent, so unsubscribing from a non-existing topic does nothing
388
            return
1✔
389

390
        if subscription["Protocol"] in ["http", "https"]:
1✔
391
            # TODO: actually validate this (re)subscribe behaviour somehow (localhost.run?)
392
            #  we might need to save the sub token in the store
393
            # TODO: AWS only sends the UnsubscribeConfirmation if the call is unauthenticated or the requester is not
394
            #  the owner
395
            subscription_token = encode_subscription_token_with_region(region=context.region)
×
396
            message_ctx = SnsMessage(
×
397
                type=SnsMessageType.UnsubscribeConfirmation,
398
                token=subscription_token,
399
                message=f"You have chosen to deactivate subscription {subscription_arn}.\nTo cancel this operation and restore the subscription, visit the SubscribeURL included in this message.",
400
            )
401
            publish_ctx = SnsPublishContext(
×
402
                message=message_ctx,
403
                store=store,
404
                request_headers=context.request.headers,
405
                # TODO: add the topic attributes once we ported them from moto to LocalStack
406
                # topic_attributes=vars(moto_topic),
407
            )
408
            self._publisher.publish_to_topic_subscriber(
×
409
                publish_ctx,
410
                topic_arn=subscription["TopicArn"],
411
                subscription_arn=subscription_arn,
412
            )
413

414
        with contextlib.suppress(KeyError):
1✔
415
            store.topics[subscription["TopicArn"]]["subscriptions"].remove(subscription_arn)
1✔
416
        store.subscription_filter_policy.pop(subscription_arn, None)
1✔
417
        store.subscriptions.pop(subscription_arn, None)
1✔
418

419
    def get_subscription_attributes(
1✔
420
        self, context: RequestContext, subscription_arn: subscriptionARN, **kwargs
421
    ) -> GetSubscriptionAttributesResponse:
422
        store = self.get_store(account_id=context.account_id, region=context.region)
1✔
423
        sub = store.subscriptions.get(subscription_arn)
1✔
424
        if not sub:
1✔
425
            raise NotFoundException("Subscription does not exist")
1✔
426
        removed_attrs = ["sqs_queue_url"]
1✔
427
        if "FilterPolicyScope" in sub and not sub.get("FilterPolicy"):
1✔
428
            removed_attrs.append("FilterPolicyScope")
1✔
429
            removed_attrs.append("FilterPolicy")
1✔
430
        elif "FilterPolicy" in sub and "FilterPolicyScope" not in sub:
×
431
            sub["FilterPolicyScope"] = "MessageAttributes"
×
432

433
        attributes = {k: v for k, v in sub.items() if k not in removed_attrs}
1✔
434
        return GetSubscriptionAttributesResponse(Attributes=attributes)
1✔
435

436
    def set_subscription_attributes(
1✔
437
        self,
438
        context: RequestContext,
439
        subscription_arn: subscriptionARN,
440
        attribute_name: attributeName,
441
        attribute_value: attributeValue = None,
442
        **kwargs,
443
    ) -> None:
444
        store = self.get_store(account_id=context.account_id, region=context.region)
1✔
445
        sub = store.subscriptions.get(subscription_arn)
1✔
446
        if not sub:
1✔
447
            raise NotFoundException("Subscription does not exist")
1✔
448

449
        validate_subscription_attribute(
1✔
450
            attribute_name=attribute_name,
451
            attribute_value=attribute_value,
452
            topic_arn=sub["TopicArn"],
453
            endpoint=sub["Endpoint"],
454
        )
455
        if attribute_name == "RawMessageDelivery":
×
456
            attribute_value = attribute_value.lower()
×
457

458
        elif attribute_name == "FilterPolicy":
×
459
            filter_policy = json.loads(attribute_value) if attribute_value else None
×
460
            if filter_policy:
×
461
                validator = FilterPolicyValidator(
×
462
                    scope=sub.get("FilterPolicyScope", "MessageAttributes"),
463
                    is_subscribe_call=False,
464
                )
465
                validator.validate_filter_policy(filter_policy)
×
466

467
            store.subscription_filter_policy[subscription_arn] = filter_policy
×
468

469
        sub[attribute_name] = attribute_value
×
470

471
    def confirm_subscription(
1✔
472
        self,
473
        context: RequestContext,
474
        topic_arn: topicARN,
475
        token: String,
476
        authenticate_on_unsubscribe: authenticateOnUnsubscribe = None,
477
        **kwargs,
478
    ) -> ConfirmSubscriptionResponse:
479
        # TODO: validate format on the token (seems to be 288 hex chars)
480
        # this request can come from any http client, it might not be signed (we would need to implement
481
        # `authenticate_on_unsubscribe` to force a signing client to do this request.
482
        # so, the region and account_id might not be in the request. Use the ones from the topic_arn
483
        try:
1✔
484
            parsed_arn = parse_arn(topic_arn)
1✔
485
        except InvalidArnException:
×
486
            raise InvalidParameterException("Invalid parameter: Topic")
×
487

488
        store = self.get_store(account_id=parsed_arn["account"], region=parsed_arn["region"])
1✔
489

490
        # it seems SNS is able to know what the region of the topic should be, even though a wrong topic is accepted
491
        if parsed_arn["region"] != get_region_from_subscription_token(token):
1✔
492
            raise InvalidParameterException("Invalid parameter: Topic")
×
493

494
        subscription_arn = store.subscription_tokens.get(token)
×
495
        if not subscription_arn:
×
496
            raise InvalidParameterException("Invalid parameter: Token")
×
497

498
        subscription = store.subscriptions.get(subscription_arn)
×
499
        if not subscription:
×
500
            # subscription could have been deleted in the meantime
501
            raise InvalidParameterException("Invalid parameter: Token")
×
502

503
        # ConfirmSubscription is idempotent
504
        if subscription.get("PendingConfirmation") == "false":
×
505
            return ConfirmSubscriptionResponse(SubscriptionArn=subscription_arn)
×
506

507
        subscription["PendingConfirmation"] = "false"
×
508
        subscription["ConfirmationWasAuthenticated"] = "true"
×
509

510
        return ConfirmSubscriptionResponse(SubscriptionArn=subscription_arn)
×
511

512
    def list_subscriptions(
1✔
513
        self, context: RequestContext, next_token: nextToken = None, **kwargs
514
    ) -> ListSubscriptionsResponse:
515
        store = self.get_store(context.account_id, context.region)
1✔
516
        subscriptions = [
1✔
517
            select_from_typed_dict(Subscription, sub) for sub in list(store.subscriptions.values())
518
        ]
519
        paginated_subscriptions = PaginatedList(subscriptions)
1✔
520
        page, next_token = paginated_subscriptions.get_page(
1✔
521
            token_generator=lambda x: get_next_page_token_from_arn(x["SubscriptionArn"]),
522
            page_size=100,
523
            next_token=next_token,
524
        )
525

526
        response = ListSubscriptionsResponse(Subscriptions=page)
1✔
527
        if next_token:
1✔
528
            response["NextToken"] = next_token
1✔
529
        return response
1✔
530

531
    def list_subscriptions_by_topic(
1✔
532
        self, context: RequestContext, topic_arn: topicARN, next_token: nextToken = None, **kwargs
533
    ) -> ListSubscriptionsByTopicResponse:
534
        topic: Topic = self._get_topic(topic_arn, context)
1✔
535
        parsed_topic_arn = parse_and_validate_topic_arn(topic_arn)
1✔
536
        store = self.get_store(parsed_topic_arn["account"], parsed_topic_arn["region"])
1✔
537
        sub_arns: list[str] = topic.get("subscriptions", [])
1✔
538
        subscriptions = [store.subscriptions[k] for k in sub_arns if k in store.subscriptions]
1✔
539

540
        paginated_subscriptions = PaginatedList(subscriptions)
1✔
541
        page, next_token = paginated_subscriptions.get_page(
1✔
542
            token_generator=lambda x: get_next_page_token_from_arn(x["SubscriptionArn"]),
543
            page_size=100,
544
            next_token=next_token,
545
        )
546

547
        response = ListSubscriptionsResponse(Subscriptions=page)
1✔
548
        if next_token:
1✔
549
            response["NextToken"] = next_token
1✔
550
        return response
1✔
551

552
    #
553
    # PlatformApplications
554
    #
555
    def create_platform_application(
1✔
556
        self,
557
        context: RequestContext,
558
        name: String,
559
        platform: String,
560
        attributes: MapStringToString,
561
        **kwargs,
562
    ) -> CreatePlatformApplicationResponse:
563
        _validate_platform_application_name(name)
1✔
564
        if platform not in VALID_APPLICATION_PLATFORMS:
1✔
565
            raise InvalidParameterException(
1✔
566
                f"Invalid parameter: Platform Reason: {platform} is not supported"
567
            )
568

569
        _validate_platform_application_attributes(attributes)
1✔
570

571
        # attribute validation specific to create_platform_application
572
        if "PlatformCredential" in attributes and "PlatformPrincipal" not in attributes:
1✔
573
            raise InvalidParameterException(
1✔
574
                "Invalid parameter: Attributes Reason: PlatformCredential attribute provided without PlatformPrincipal"
575
            )
576

577
        elif "PlatformPrincipal" in attributes and "PlatformCredential" not in attributes:
1✔
578
            raise InvalidParameterException(
1✔
579
                "Invalid parameter: Attributes Reason: PlatformPrincipal attribute provided without PlatformCredential"
580
            )
581

582
        store = self.get_store(context.account_id, context.region)
1✔
583
        # We are not validating the access data here like AWS does (against ADM and the like)
584
        attributes.pop("PlatformPrincipal")
1✔
585
        attributes.pop("PlatformCredential")
1✔
586
        _attributes = {"Enabled": "true"}
1✔
587
        _attributes.update(attributes)
1✔
588
        application_arn = sns_platform_application_arn(
1✔
589
            platform_application_name=name,
590
            platform=platform,
591
            account_id=context.account_id,
592
            region_name=context.region,
593
        )
594
        platform_application = PlatformApplication(
1✔
595
            PlatformApplicationArn=application_arn, Attributes=_attributes
596
        )
597
        store.platform_applications[application_arn] = platform_application
1✔
598
        return CreatePlatformApplicationResponse(**platform_application)
1✔
599

600
    def delete_platform_application(
1✔
601
        self, context: RequestContext, platform_application_arn: String, **kwargs
602
    ) -> None:
603
        store = self.get_store(context.account_id, context.region)
1✔
604
        store.platform_applications.pop(platform_application_arn, None)
1✔
605

606
    def list_platform_applications(
1✔
607
        self, context: RequestContext, next_token: String | None = None, **kwargs
608
    ) -> ListPlatformApplicationsResponse:
609
        store = self.get_store(context.account_id, context.region)
1✔
610
        platform_applications = store.platform_applications.values()
1✔
611
        paginated_applications = PaginatedList(platform_applications)
1✔
612
        page, token = paginated_applications.get_page(
1✔
613
            token_generator=lambda x: get_next_page_token_from_arn(x["PlatformApplicationArn"]),
614
            page_size=100,
615
            next_token=next_token,
616
        )
617

618
        response = ListPlatformApplicationsResponse(PlatformApplications=page)
1✔
619
        if token:
1✔
NEW
620
            response["NextToken"] = token
×
621
        return response
1✔
622

623
    def get_platform_application_attributes(
1✔
624
        self, context: RequestContext, platform_application_arn: String, **kwargs
625
    ) -> GetPlatformApplicationAttributesResponse:
626
        platform_application = self._get_platform_application(platform_application_arn, context)
1✔
627
        attributes = platform_application["Attributes"]
1✔
628
        return GetPlatformApplicationAttributesResponse(Attributes=attributes)
1✔
629

630
    def set_platform_application_attributes(
1✔
631
        self,
632
        context: RequestContext,
633
        platform_application_arn: String,
634
        attributes: MapStringToString,
635
        **kwargs,
636
    ) -> None:
637
        parse_and_validate_platform_application_arn(platform_application_arn)
1✔
638
        _validate_platform_application_attributes(attributes)
1✔
639

640
        platform_application = self._get_platform_application(platform_application_arn, context)
1✔
641
        platform_application["Attributes"].update(attributes)
1✔
642

643
    #
644
    # Platform Endpoints
645
    #
646

647
    def list_endpoints_by_platform_application(
1✔
648
        self,
649
        context: RequestContext,
650
        platform_application_arn: String,
651
        next_token: String | None = None,
652
        **kwargs,
653
    ) -> ListEndpointsByPlatformApplicationResponse:
654
        # TODO: stub so cleanup fixture won't fail
655
        return ListEndpointsByPlatformApplicationResponse(Endpoints=[])
1✔
656

657
    #
658
    # Sms operations
659
    #
660

661
    def set_sms_attributes(
1✔
662
        self, context: RequestContext, attributes: MapStringToString, **kwargs
663
    ) -> SetSMSAttributesResponse:
664
        store = self.get_store(context.account_id, context.region)
1✔
665
        _validate_sms_attributes(attributes)
1✔
666
        _set_sms_attribute_default(store)
1✔
667
        store.sms_attributes.update(attributes or {})
1✔
668
        return SetSMSAttributesResponse()
1✔
669

670
    def get_sms_attributes(
1✔
671
        self, context: RequestContext, attributes: ListString | None = None, **kwargs
672
    ) -> GetSMSAttributesResponse:
673
        store = self.get_store(context.account_id, context.region)
1✔
674
        _set_sms_attribute_default(store)
1✔
675
        store_attributes = store.sms_attributes
1✔
676
        return_attributes = {}
1✔
677
        for k, v in store_attributes.items():
1✔
678
            if not attributes or k in attributes:
1✔
679
                return_attributes[k] = store_attributes[k]
1✔
680

681
        return GetSMSAttributesResponse(attributes=return_attributes)
1✔
682

683
    def list_tags_for_resource(
1✔
684
        self, context: RequestContext, resource_arn: AmazonResourceName, **kwargs
685
    ) -> ListTagsForResourceResponse:
686
        store = sns_stores[context.account_id][context.region]
1✔
687
        tags = store.TAGS.list_tags_for_resource(resource_arn)
1✔
688
        return ListTagsForResourceResponse(Tags=tags.get("Tags"))
1✔
689

690
    def tag_resource(
1✔
691
        self, context: RequestContext, resource_arn: AmazonResourceName, tags: TagList, **kwargs
692
    ) -> TagResourceResponse:
693
        unique_tag_keys = {tag["Key"] for tag in tags}
1✔
694
        if len(unique_tag_keys) < len(tags):
1✔
695
            raise InvalidParameterException("Invalid parameter: Duplicated keys are not allowed.")
1✔
696
        store = sns_stores[context.account_id][context.region]
1✔
697
        store.TAGS.tag_resource(resource_arn, tags)
1✔
698
        return TagResourceResponse()
1✔
699

700
    def untag_resource(
1✔
701
        self,
702
        context: RequestContext,
703
        resource_arn: AmazonResourceName,
704
        tag_keys: TagKeyList,
705
        **kwargs,
706
    ) -> UntagResourceResponse:
707
        store = sns_stores[context.account_id][context.region]
1✔
708
        store.TAGS.untag_resource(resource_arn, tag_keys)
1✔
709
        return UntagResourceResponse()
1✔
710

711
    @staticmethod
1✔
712
    def get_store(account_id: str, region: str) -> SnsStore:
1✔
713
        return sns_stores[account_id][region]
1✔
714

715
    # TODO: reintroduce multi-region parameter (latest before final migration from v1)
716
    @staticmethod
1✔
717
    def _get_topic(arn: str, context: RequestContext) -> Topic:
1✔
718
        """
719
        :param arn: the Topic ARN
720
        :param context: the RequestContext of the request
721
        :return: the model Topic
722
        """
723
        arn_data = parse_and_validate_topic_arn(arn)
1✔
724
        if context.region != arn_data["region"]:
1✔
725
            raise InvalidParameterException("Invalid parameter: TopicArn")
×
726
        try:
1✔
727
            store = SnsProvider.get_store(context.account_id, context.region)
1✔
728
            return store.topics[arn]
1✔
729
        except KeyError:
1✔
730
            raise NotFoundException("Topic does not exist")
1✔
731

732
    @staticmethod
1✔
733
    def _get_platform_application(
1✔
734
        platform_application_arn: str, context: RequestContext
735
    ) -> PlatformApplication:
736
        parse_and_validate_platform_application_arn(platform_application_arn)
1✔
737
        try:
1✔
738
            store = SnsProvider.get_store(context.account_id, context.region)
1✔
739
            return store.platform_applications[platform_application_arn]
1✔
740
        except KeyError:
1✔
741
            raise NotFoundException("PlatformApplication does not exist")
1✔
742

743

744
def _create_topic(name: str, attributes: dict, context: RequestContext) -> Topic:
1✔
745
    topic_arn = sns_topic_arn(
1✔
746
        topic_name=name, region_name=context.region, account_id=context.account_id
747
    )
748
    topic: Topic = {
1✔
749
        "name": name,
750
        "arn": topic_arn,
751
        "attributes": {},
752
        "subscriptions": [],
753
    }
754
    attrs = _default_attributes(topic, context)
1✔
755
    attrs.update(attributes or {})
1✔
756
    topic["attributes"] = attrs
1✔
757

758
    return topic
1✔
759

760

761
def _default_attributes(topic: Topic, context: RequestContext) -> TopicAttributesMap:
1✔
762
    default_attributes = {
1✔
763
        "DisplayName": "",
764
        "Owner": context.account_id,
765
        "Policy": _create_default_topic_policy(topic, context),
766
        "SubscriptionsConfirmed": "0",
767
        "SubscriptionsDeleted": "0",
768
        "SubscriptionsPending": "0",
769
        "TopicArn": topic["arn"],
770
    }
771
    if topic["name"].endswith(".fifo"):
1✔
772
        default_attributes.update(
1✔
773
            {
774
                "ContentBasedDeduplication": "false",
775
                "FifoTopic": "false",
776
                "SignatureVersion": "2",
777
            }
778
        )
779
    return default_attributes
1✔
780

781

782
def _create_default_topic_policy(topic: Topic, context: RequestContext) -> str:
1✔
783
    return json.dumps(
1✔
784
        {
785
            "Version": "2008-10-17",
786
            "Id": "__default_policy_ID",
787
            "Statement": [
788
                {
789
                    "Effect": "Allow",
790
                    "Sid": "__default_statement_ID",
791
                    "Principal": {"AWS": "*"},
792
                    "Action": [
793
                        "SNS:GetTopicAttributes",
794
                        "SNS:SetTopicAttributes",
795
                        "SNS:AddPermission",
796
                        "SNS:RemovePermission",
797
                        "SNS:DeleteTopic",
798
                        "SNS:Subscribe",
799
                        "SNS:ListSubscriptionsByTopic",
800
                        "SNS:Publish",
801
                    ],
802
                    "Resource": topic["arn"],
803
                    "Condition": {"StringEquals": {"AWS:SourceOwner": context.account_id}},
804
                }
805
            ],
806
        }
807
    )
808

809

810
def _validate_platform_application_name(name: str) -> None:
1✔
811
    reason = ""
1✔
812
    if not name:
1✔
813
        reason = "cannot be empty"
1✔
814
    elif not re.match(r"^.{0,256}$", name):
1✔
815
        reason = "must be at most 256 characters long"
1✔
816
    elif not re.match(r"^[A-Za-z0-9._-]+$", name):
1✔
817
        reason = "must contain only characters 'a'-'z', 'A'-'Z', '0'-'9', '_', '-', and '.'"
1✔
818

819
    if reason:
1✔
820
        raise InvalidParameterException(f"Invalid parameter: {name} Reason: {reason}")
1✔
821

822

823
def _validate_platform_application_attributes(attributes: dict) -> None:
1✔
824
    if not attributes:
1✔
825
        raise CommonServiceException(
1✔
826
            code="ValidationError",
827
            message="1 validation error detected: Value null at 'attributes' failed to satisfy constraint: Member must not be null",
828
            sender_fault=True,
829
        )
830

831

832
def _validate_sms_attributes(attributes: dict) -> None:
1✔
833
    for k, v in attributes.items():
1✔
834
        if k not in SMS_ATTRIBUTE_NAMES:
1✔
835
            raise InvalidParameterException(f"{k} is not a valid attribute")
1✔
836
    default_send_id = attributes.get("DefaultSendID")
1✔
837
    if default_send_id and not re.match(SMS_DEFAULT_SENDER_REGEX, default_send_id):
1✔
838
        raise InvalidParameterException("DefaultSendID is not a valid attribute")
×
839
    sms_type = attributes.get("DefaultSMSType")
1✔
840
    if sms_type and sms_type not in SMS_TYPES:
1✔
841
        raise InvalidParameterException("DefaultSMSType is invalid")
1✔
842

843

844
def _set_sms_attribute_default(store: SnsStore) -> None:
1✔
845
    # TODO: don't call this on every sms attribute crud api call
846
    store.sms_attributes.setdefault("MonthlySpendLimit", "1")
1✔
847

848

849
def _check_matching_tags(topic_arn: str, tags: TagList | None, store: SnsStore) -> bool:
1✔
850
    """
851
    Checks if a topic to be created doesn't already exist with different tags
852
    :param topic_arn: Arn of the topic
853
    :param tags: Tags to be checked
854
    :param store: Store object that holds the topics and tags
855
    :return: False if there is a mismatch in tags, True otherwise
856
    """
857
    existing_tags = store.TAGS.list_tags_for_resource(topic_arn)["Tags"]
1✔
858
    # if this is none there is nothing to check
859
    if topic_arn in store.topics:
1✔
860
        if tags is None:
1✔
861
            tags = []
1✔
862
        for tag in tags:
1✔
863
            # this means topic already created with empty tags and when we try to create it
864
            # again with other tag value then it should fail according to aws documentation.
865
            if existing_tags is not None and tag not in existing_tags:
1✔
866
                return False
1✔
867
    return True
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc