• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 17964544913

23 Sep 2025 07:08PM UTC coverage: 86.885% (+0.02%) from 86.864%
17964544913

push

github

web-flow
S3: fix DeleteObjectTagging on current object (#13174)

1 of 1 new or added line in 1 file covered. (100.0%)

129 existing lines in 7 files now uncovered.

67738 of 77963 relevant lines covered (86.88%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.69
/localstack-core/localstack/services/lambda_/invocation/internal_sqs_queue.py
1
import logging
1✔
2
import threading
1✔
3
from collections.abc import Iterable
1✔
4

5
from localstack import config
1✔
6
from localstack.aws.api.sqs import (
1✔
7
    AttributeNameList,
8
    CreateQueueResult,
9
    GetQueueAttributesResult,
10
    Message,
11
    MessageAttributeNameList,
12
    MessageBodyAttributeMap,
13
    MessageBodySystemAttributeMap,
14
    MessageSystemAttributeName,
15
    NullableInteger,
16
    QueueAttributeMap,
17
    ReceiveMessageResult,
18
    SendMessageResult,
19
    String,
20
    TagMap,
21
)
22
from localstack.services.sqs.models import SqsQueue, StandardQueue
1✔
23
from localstack.services.sqs.provider import (
1✔
24
    QueueUpdateWorker,
25
    _create_message_attribute_hash,
26
    to_sqs_api_message,
27
)
28
from localstack.services.sqs.utils import generate_message_id
1✔
29
from localstack.utils.objects import singleton_factory
1✔
30
from localstack.utils.strings import md5
1✔
31
from localstack.utils.time import now
1✔
32

33
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
1✔
34

35

36
class EventQueueUpdateWorker(QueueUpdateWorker):
    """
    Regularly re-queues inflight and delayed messages whose visibility timeout has expired or delay deadline has been
    reached.

    The queues to process are registered explicitly through ``add_queue`` and ``remove_queue``.
    """

    def __init__(self) -> None:
        super().__init__()
        self.queues: list[SqsQueue] = []

    def add_queue(self, queue: SqsQueue):
        """Register ``queue`` so that it is included in subsequent ``iter_queues`` results."""
        self.queues.append(queue)

    def remove_queue(self, queue: SqsQueue):
        """
        Unregister ``queue``.

        :raises ValueError: if the queue is not registered (propagated from ``list.remove``)
        """
        self.queues.remove(queue)

    def iter_queues(self) -> Iterable[SqsQueue]:
        """
        Returns an iterator over the queues that are registered at the time of the call.

        A snapshot is iterated instead of the live list: queues are added and removed while the worker is
        already started, and an iterator over a live list skips entries when an earlier element is removed
        mid-iteration.
        NOTE(review): presumably the base worker consumes this from its own update loop - confirm.
        """
        return iter(tuple(self.queues))
54

55

56
class QueueManager:
    """
    Registry of in-memory queues, keyed by queue name.

    Every queue created here is also registered with the ``EventQueueUpdateWorker`` owned by this manager,
    and unregistered again when the queue is deleted.
    """

    queues: dict[str, StandardQueue]
    queue_lock: threading.RLock
    queue_update_worker: EventQueueUpdateWorker

    def __init__(self):
        self.queues = {}
        # lock for handling queue lifecycle and avoiding duplicates
        self.queue_lock = threading.RLock()
        self.queue_update_worker = EventQueueUpdateWorker()

    def start(self):
        """Starts the queue update worker."""
        self.queue_update_worker.start()

    def stop(self):
        """Stops the queue update worker."""
        self.queue_update_worker.stop()

    def get_queue(self, queue_name: str) -> StandardQueue:
        """
        Looks up a registered queue.

        :param queue_name: name the queue was created with
        :return: Queue Object
        :raises ValueError: if no queue with that name is registered
        """
        # One lookup instead of check-then-read: this method does not hold ``queue_lock``, so a concurrent
        # ``delete_queue`` between the membership test and the read would otherwise leak a ``KeyError``.
        try:
            return self.queues[queue_name]
        except KeyError:
            raise ValueError("Queue not available") from None

    def create_queue(self, queue_name: str) -> SqsQueue:
        """
        Creates a queue, or returns the already registered queue if the name is taken.

        :param queue_name: Queue name, has to be unique
        :return: Queue Object
        """
        with self.queue_lock:
            if queue_name in self.queues:
                return self.queues[queue_name]

            queue = StandardQueue(
                name=queue_name,
                region="us-east-1",
                account_id=config.INTERNAL_RESOURCE_ACCOUNT,
            )
            self.queues[queue_name] = queue
            self.queue_update_worker.add_queue(queue)
        return queue

    def delete_queue(self, queue_name: str) -> None:
        """
        Removes a queue from the registry and from the update worker.

        :param queue_name: name of the queue to delete
        :raises ValueError: if no queue with that name is registered
        """
        with self.queue_lock:
            if queue_name not in self.queues:
                raise ValueError(f"Queue '{queue_name}' not available")

            queue = self.queues.pop(queue_name)
            self.queue_update_worker.remove_queue(queue)
104

105

106
class FakeSqsClient:
    """
    In-process stand-in for a subset of SQS client operations, backed by a ``QueueManager``.

    The queue name doubles as the queue URL: ``create_queue`` returns the name as ``QueueUrl`` and every
    other operation uses the given ``QueueUrl`` as the queue name for the lookup.
    """

    def __init__(self, queue_manager: QueueManager):
        self.queue_manager = queue_manager

    def create_queue(
        self, QueueName: String, Attributes: QueueAttributeMap = None, tags: TagMap = None
    ) -> CreateQueueResult:
        """Creates the queue through the manager; ``Attributes`` and ``tags`` are accepted but not used."""
        self.queue_manager.create_queue(queue_name=QueueName)
        return {"QueueUrl": QueueName}

    def delete_queue(self, QueueUrl: String) -> None:
        """Deletes the queue registered under ``QueueUrl``."""
        self.queue_manager.delete_queue(queue_name=QueueUrl)

    def get_queue_attributes(
        self, QueueUrl: String, AttributeNames: AttributeNameList = None
    ) -> GetQueueAttributesResult:
        """Returns the queue's attributes for ``AttributeNames`` under the ``Attributes`` key."""
        target = self.queue_manager.get_queue(queue_name=QueueUrl)
        return {"Attributes": target.get_queue_attributes(AttributeNames)}

    def purge_queue(self, QueueUrl: String) -> None:
        """Clears the queue registered under ``QueueUrl``."""
        self.queue_manager.get_queue(queue_name=QueueUrl).clear()

    def receive_message(
        self,
        QueueUrl: String,
        AttributeNames: AttributeNameList = None,
        MessageAttributeNames: MessageAttributeNameList = None,
        MaxNumberOfMessages: NullableInteger = None,
        VisibilityTimeout: NullableInteger = None,
        WaitTimeSeconds: NullableInteger = None,
        ReceiveRequestAttemptId: String = None,
    ) -> ReceiveMessageResult:
        """
        Receives messages from the queue and converts them to SQS API messages.

        One message is requested when ``MaxNumberOfMessages`` is not given (or falsy).
        ``ReceiveRequestAttemptId`` is accepted but not used.

        :return: dict whose ``Messages`` entry is the list of received messages, or ``None`` if there are none
        """
        target = self.queue_manager.get_queue(queue_name=QueueUrl)
        received = target.receive(
            num_messages=MaxNumberOfMessages or 1,
            visibility_timeout=VisibilityTimeout,
            wait_time_seconds=WaitTimeSeconds,
        )

        api_messages = []
        for index, received_message in enumerate(received.successful):
            api_message = to_sqs_api_message(
                received_message, AttributeNames, MessageAttributeNames
            )
            # receipt handles are looked up by the position of the message in the result
            api_message["ReceiptHandle"] = received.receipt_handles[index]
            api_messages.append(api_message)

        return {"Messages": api_messages or None}

    def delete_message(self, QueueUrl: String, ReceiptHandle: String) -> None:
        """Removes the message identified by ``ReceiptHandle`` from the queue."""
        self.queue_manager.get_queue(queue_name=QueueUrl).remove(ReceiptHandle)

    def _create_message_attributes(
        self,
        message_system_attributes: MessageBodySystemAttributeMap = None,
    ) -> dict[str, str]:
        """
        Builds the attribute map stored on a sent message.

        Starts from ``SenderId`` and ``SentTimestamp`` and then adds the ``StringValue`` of every given
        system attribute, so a given attribute with the same name replaces the initial value.
        """
        attributes = {
            MessageSystemAttributeName.SenderId: config.INTERNAL_RESOURCE_ACCOUNT,  # not the account ID in AWS
            MessageSystemAttributeName.SentTimestamp: str(now(millis=True)),
        }
        if message_system_attributes is None:
            return attributes

        for attribute_name, attribute_value in message_system_attributes.items():
            attributes[attribute_name] = attribute_value["StringValue"]
        return attributes

    def send_message(
        self,
        QueueUrl: String,
        MessageBody: String,
        DelaySeconds: NullableInteger = None,
        MessageAttributes: MessageBodyAttributeMap = None,
        MessageSystemAttributes: MessageBodySystemAttributeMap = None,
        MessageDeduplicationId: String = None,
        MessageGroupId: String = None,
    ) -> SendMessageResult:
        """
        Builds a message from the given body and attributes and puts it on the queue.

        :return: message id, MD5 values and sequence number of the enqueued message
        """
        target = self.queue_manager.get_queue(queue_name=QueueUrl)

        outgoing = Message(
            MessageId=generate_message_id(),
            MD5OfBody=md5(MessageBody),
            Body=MessageBody,
            Attributes=self._create_message_attributes(MessageSystemAttributes),
            MD5OfMessageAttributes=_create_message_attribute_hash(MessageAttributes),
            MessageAttributes=MessageAttributes,
        )
        delay = None if DelaySeconds is None else int(DelaySeconds)
        enqueued = target.put(
            message=outgoing,
            message_deduplication_id=MessageDeduplicationId,
            message_group_id=MessageGroupId,
            delay_seconds=delay,
        )

        # report the values of the message as stored by the queue
        stored = enqueued.message
        return {
            "MessageId": stored["MessageId"],
            "MD5OfMessageBody": stored["MD5OfBody"],
            "MD5OfMessageAttributes": stored.get("MD5OfMessageAttributes"),
            "SequenceNumber": enqueued.sequence_number,
            "MD5OfMessageSystemAttributes": _create_message_attribute_hash(MessageSystemAttributes),
        }
209

210

211
@singleton_factory
def get_fake_sqs_client():
    """
    Builds a ``FakeSqsClient`` on top of a freshly created and already started ``QueueManager``.

    NOTE(review): ``singleton_factory`` presumably makes repeated calls return the same client - confirm
    against its implementation.
    """
    manager = QueueManager()
    # start the manager's update worker before the client is handed out
    manager.start()
    return FakeSqsClient(manager)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc