• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 19880423371

02 Dec 2025 08:25PM UTC coverage: 86.905% (-0.04%) from 86.945%
19880423371

push

github

web-flow
fix/external client CA bundle (#13451)

1 of 5 new or added lines in 1 file covered. (20.0%)

414 existing lines in 19 files now uncovered.

69738 of 80246 relevant lines covered (86.91%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.59
/localstack-core/localstack/services/s3control/provider.py
1
from localstack.aws.api import CommonServiceException, RequestContext
1✔
2
from localstack.aws.api.s3control import (
1✔
3
    AccountId,
4
    ListTagsForResourceResult,
5
    S3ControlApi,
6
    S3ResourceArn,
7
    TagKeyList,
8
    TagList,
9
    TagResourceResult,
10
    UntagResourceResult,
11
)
12
from localstack.aws.forwarder import NotImplementedAvoidFallbackError
1✔
13
from localstack.services.s3.models import s3_stores
1✔
14
from localstack.services.s3control.validation import validate_tags
1✔
15
from localstack.state import StateVisitor
1✔
16
from localstack.utils.tagging import TaggingService
1✔
17

18

19
class NoSuchResource(CommonServiceException):
1✔
20
    def __init__(self, message=None):
1✔
21
        super().__init__("NoSuchResource", status_code=404, message=message)
1✔
22

23

24
class S3ControlProvider(S3ControlApi):
1✔
25
    def accept_state_visitor(self, visitor: StateVisitor):
1✔
UNCOV
26
        from moto.s3control.models import s3control_backends
×
27

UNCOV
28
        visitor.visit(s3control_backends)
×
29

30
    """
1✔
31
    S3Control is a management interface for S3, and can access some of its internals with no public API
32
    This requires us to access the s3 stores directly
33
    """
34

35
    @staticmethod
1✔
36
    def _get_tagging_service_for_bucket(
1✔
37
        resource_arn: S3ResourceArn,
38
        partition: str,
39
        region: str,
40
        account_id: str,
41
    ) -> TaggingService:
42
        s3_prefix = f"arn:{partition}:s3:::"
1✔
43
        if not resource_arn.startswith(s3_prefix):
1✔
44
            # Moto does not support Tagging operations for S3 Control, so we should not forward those operations back
45
            # to it
46
            raise NotImplementedAvoidFallbackError(
47
                "LocalStack only support Bucket tagging operations for S3Control"
48
            )
49

50
        store = s3_stores[account_id][region]
1✔
51
        bucket_name = resource_arn.removeprefix(s3_prefix)
1✔
52
        if bucket_name not in store.global_bucket_map:
1✔
53
            raise NoSuchResource("The specified resource doesn't exist.")
1✔
54

55
        return store.TAGS
1✔
56

57
    def tag_resource(
1✔
58
        self,
59
        context: RequestContext,
60
        account_id: AccountId,
61
        resource_arn: S3ResourceArn,
62
        tags: TagList,
63
        **kwargs,
64
    ) -> TagResourceResult:
65
        # currently S3Control only supports tagging buckets
66
        tagging_service = self._get_tagging_service_for_bucket(
1✔
67
            resource_arn=resource_arn,
68
            partition=context.partition,
69
            region=context.region,
70
            account_id=account_id,
71
        )
72

73
        validate_tags(tags=tags)
1✔
74
        tagging_service.tag_resource(resource_arn, tags)
1✔
75

76
        return TagResourceResult()
1✔
77

78
    def untag_resource(
1✔
79
        self,
80
        context: RequestContext,
81
        account_id: AccountId,
82
        resource_arn: S3ResourceArn,
83
        tag_keys: TagKeyList,
84
        **kwargs,
85
    ) -> UntagResourceResult:
86
        # currently S3Control only supports tagging buckets
87
        tagging_service = self._get_tagging_service_for_bucket(
1✔
88
            resource_arn=resource_arn,
89
            partition=context.partition,
90
            region=context.region,
91
            account_id=account_id,
92
        )
93

94
        tagging_service.untag_resource(resource_arn, tag_keys)
1✔
95

96
        return TagResourceResult()
1✔
97

98
    def list_tags_for_resource(
1✔
99
        self, context: RequestContext, account_id: AccountId, resource_arn: S3ResourceArn, **kwargs
100
    ) -> ListTagsForResourceResult:
101
        # currently S3Control only supports tagging buckets
102
        tagging_service = self._get_tagging_service_for_bucket(
1✔
103
            resource_arn=resource_arn,
104
            partition=context.partition,
105
            region=context.region,
106
            account_id=account_id,
107
        )
108

109
        tags = tagging_service.list_tags_for_resource(resource_arn)
1✔
110
        return ListTagsForResourceResult(Tags=tags["Tags"])
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc