• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 21810945741

06 Feb 2026 05:38PM UTC coverage: 86.871% (-0.01%) from 86.883%
21810945741

push

github

web-flow
APIGW: fix model typing / update custom id logic (#13694)

5 of 5 new or added lines in 2 files covered. (100.0%)

70 existing lines in 4 files now uncovered.

69960 of 80533 relevant lines covered (86.87%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.69
/localstack-core/localstack/services/lambda_/invocation/internal_sqs_queue.py
1
import logging
1✔
2
import threading
1✔
3
from collections.abc import Iterable
1✔
4

5
from localstack import config
1✔
6
from localstack.aws.api.sqs import (
1✔
7
    AttributeNameList,
8
    CreateQueueResult,
9
    GetQueueAttributesResult,
10
    Message,
11
    MessageAttributeNameList,
12
    MessageBodyAttributeMap,
13
    MessageBodySystemAttributeMap,
14
    MessageSystemAttributeName,
15
    NullableInteger,
16
    QueueAttributeMap,
17
    ReceiveMessageResult,
18
    SendMessageResult,
19
    String,
20
    TagMap,
21
)
22
from localstack.services.sqs.models import SqsQueue, StandardQueue, to_sqs_api_message
1✔
23
from localstack.services.sqs.provider import QueueUpdateWorker
1✔
24
from localstack.services.sqs.utils import create_message_attribute_hash, generate_message_id
1✔
25
from localstack.utils.objects import singleton_factory
1✔
26
from localstack.utils.strings import md5
1✔
27
from localstack.utils.time import now
1✔
28

29
LOG = logging.getLogger(__name__)
1✔
30

31

32
class EventQueueUpdateWorker(QueueUpdateWorker):
    """
    Queue update worker operating on an explicitly registered list of queues.

    It regularly re-queues inflight and delayed messages whose visibility timeout has expired or whose delay
    deadline has been reached. The queues it visits are exactly those handed to ``add_queue`` and not yet
    handed to ``remove_queue``.
    """

    def __init__(self) -> None:
        super().__init__()
        # queues currently tracked by this worker, in registration order
        self.queues: list[SqsQueue] = []

    def add_queue(self, queue: SqsQueue):
        """
        Registers a queue with this worker.
        :param queue: the queue to track
        """
        self.queues.append(queue)

    def remove_queue(self, queue: SqsQueue):
        """
        Unregisters a queue from this worker.
        :param queue: the queue to stop tracking; ``list.remove`` raises ``ValueError`` if it is not registered
        """
        self.queues.remove(queue)

    def iter_queues(self) -> Iterable[SqsQueue]:
        """
        :return: an iterator over the registered queues (over the live list, not a copy)
        """
        return iter(self.queues)
50

51

52
class QueueManager:
    """
    Registry of internal queues, keyed by queue name.

    Each queue created through this manager is also registered with an ``EventQueueUpdateWorker`` and is
    unregistered from it again when the queue is deleted.
    """

    queues: dict[str, StandardQueue]
    queue_lock: threading.RLock
    queue_update_worker: EventQueueUpdateWorker

    def __init__(self):
        self.queues = {}
        # lock for handling queue lifecycle and avoiding duplicates
        self.queue_lock = threading.RLock()
        self.queue_update_worker = EventQueueUpdateWorker()

    def start(self):
        """Starts the queue update worker."""
        self.queue_update_worker.start()

    def stop(self):
        """Stops the queue update worker. Registered queues are left untouched."""
        self.queue_update_worker.stop()

    def get_queue(self, queue_name: str) -> StandardQueue:
        """
        Looks up a queue by name.
        :param queue_name: name the queue was created with
        :return: Queue Object
        :raises ValueError: if no queue with that name is registered
        """
        # Single lookup instead of "check, then index": the method does not hold queue_lock, so a
        # concurrent delete_queue between the two steps would otherwise leak a KeyError to the caller.
        try:
            return self.queues[queue_name]
        except KeyError:
            raise ValueError(f"Queue '{queue_name}' not available") from None

    def create_queue(self, queue_name: str) -> SqsQueue:
        """
        Creates a queue, or returns the existing one if the name is already taken.
        :param queue_name: Queue name, has to be unique
        :return: Queue Object
        """
        with self.queue_lock:
            existing = self.queues.get(queue_name)
            if existing is not None:
                return existing

            queue = StandardQueue(
                name=queue_name,
                region="us-east-1",
                account_id=config.INTERNAL_RESOURCE_ACCOUNT,
            )
            self.queues[queue_name] = queue
            self.queue_update_worker.add_queue(queue)
        return queue

    def delete_queue(self, queue_name: str) -> None:
        """
        Removes a queue from the registry and from the update worker.
        :param queue_name: name of the queue to delete
        :raises ValueError: if no queue with that name is registered
        """
        with self.queue_lock:
            if queue_name not in self.queues:
                raise ValueError(f"Queue '{queue_name}' not available")

            queue = self.queues.pop(queue_name)
            self.queue_update_worker.remove_queue(queue)
100

101

102
class FakeSqsClient:
    """
    In-process object offering a subset of SQS client operations, backed by a ``QueueManager``.

    Method and parameter names follow the SQS API shapes. The queue name doubles as the queue URL:
    ``create_queue`` returns the name as ``QueueUrl`` and every other operation uses ``QueueUrl`` directly as
    the lookup key in the queue manager.
    """

    def __init__(self, queue_manager: QueueManager):
        self.queue_manager = queue_manager

    def create_queue(
        self, QueueName: String, Attributes: QueueAttributeMap = None, tags: TagMap = None
    ) -> CreateQueueResult:
        """
        Creates the queue in the queue manager. ``Attributes`` and ``tags`` are accepted but not used.
        :return: result whose ``QueueUrl`` is the given queue name
        """
        self.queue_manager.create_queue(queue_name=QueueName)
        return {"QueueUrl": QueueName}

    def delete_queue(self, QueueUrl: String) -> None:
        """Deletes the queue registered under ``QueueUrl``."""
        self.queue_manager.delete_queue(queue_name=QueueUrl)

    def get_queue_attributes(
        self, QueueUrl: String, AttributeNames: AttributeNameList = None
    ) -> GetQueueAttributesResult:
        """
        :return: the queue's attributes for ``AttributeNames``, wrapped under the ``Attributes`` key
        """
        target = self.queue_manager.get_queue(queue_name=QueueUrl)
        return {"Attributes": target.get_queue_attributes(AttributeNames)}

    def purge_queue(self, QueueUrl: String) -> None:
        """Clears the queue registered under ``QueueUrl``."""
        self.queue_manager.get_queue(queue_name=QueueUrl).clear()

    def receive_message(
        self,
        QueueUrl: String,
        AttributeNames: AttributeNameList = None,
        MessageAttributeNames: MessageAttributeNameList = None,
        MaxNumberOfMessages: NullableInteger = None,
        VisibilityTimeout: NullableInteger = None,
        WaitTimeSeconds: NullableInteger = None,
        ReceiveRequestAttemptId: String = None,
    ) -> ReceiveMessageResult:
        """
        Receives messages from the queue and converts them to API messages carrying their receipt handle.

        ``MaxNumberOfMessages`` falls back to 1 when unset (or 0). ``ReceiveRequestAttemptId`` is accepted
        but not used.
        :return: result whose ``Messages`` is the list of received messages, or ``None`` if there were none
        """
        target = self.queue_manager.get_queue(queue_name=QueueUrl)
        received = target.receive(
            num_messages=MaxNumberOfMessages or 1,
            visibility_timeout=VisibilityTimeout,
            wait_time_seconds=WaitTimeSeconds,
        )

        api_messages = []
        for position, received_message in enumerate(received.successful):
            api_message = to_sqs_api_message(
                received_message, AttributeNames, MessageAttributeNames
            )
            # receipt handles are matched to the successful messages by position
            api_message["ReceiptHandle"] = received.receipt_handles[position]
            api_messages.append(api_message)

        return {"Messages": api_messages or None}

    def delete_message(self, QueueUrl: String, ReceiptHandle: String) -> None:
        """Removes the message identified by ``ReceiptHandle`` from the queue."""
        self.queue_manager.get_queue(queue_name=QueueUrl).remove(ReceiptHandle)

    def _create_message_attributes(
        self,
        message_system_attributes: MessageBodySystemAttributeMap = None,
    ) -> dict[str, str]:
        """
        Builds the ``Attributes`` map of a new message: sender id and sent timestamp (current time in
        milliseconds, as a string), plus the ``StringValue`` of every given system attribute.
        """
        attributes = {
            MessageSystemAttributeName.SenderId: config.INTERNAL_RESOURCE_ACCOUNT,  # not the account ID in AWS
            MessageSystemAttributeName.SentTimestamp: str(now(millis=True)),
        }

        if message_system_attributes is None:
            return attributes

        for name, value in message_system_attributes.items():
            attributes[name] = value["StringValue"]
        return attributes

    def send_message(
        self,
        QueueUrl: String,
        MessageBody: String,
        DelaySeconds: NullableInteger = None,
        MessageAttributes: MessageBodyAttributeMap = None,
        MessageSystemAttributes: MessageBodySystemAttributeMap = None,
        MessageDeduplicationId: String = None,
        MessageGroupId: String = None,
    ) -> SendMessageResult:
        """
        Builds a message from the given body and attributes and puts it on the queue.
        :return: message id, MD5 digests and sequence number of the queued message
        """
        target = self.queue_manager.get_queue(queue_name=QueueUrl)

        outgoing = Message(
            MessageId=generate_message_id(),
            MD5OfBody=md5(MessageBody),
            Body=MessageBody,
            Attributes=self._create_message_attributes(MessageSystemAttributes),
            MD5OfMessageAttributes=create_message_attribute_hash(MessageAttributes),
            MessageAttributes=MessageAttributes,
        )

        if DelaySeconds is None:
            delay = None
        else:
            delay = int(DelaySeconds)

        queue_item = target.put(
            message=outgoing,
            message_deduplication_id=MessageDeduplicationId,
            message_group_id=MessageGroupId,
            delay_seconds=delay,
        )

        # report the values from the message as stored on the queue item
        stored = queue_item.message
        return {
            "MessageId": stored["MessageId"],
            "MD5OfMessageBody": stored["MD5OfBody"],
            "MD5OfMessageAttributes": stored.get("MD5OfMessageAttributes"),
            "SequenceNumber": queue_item.sequence_number,
            "MD5OfMessageSystemAttributes": create_message_attribute_hash(MessageSystemAttributes),
        }
205

206

207
@singleton_factory
def get_fake_sqs_client():
    """
    Builds a ``FakeSqsClient`` on top of a freshly created and started ``QueueManager``.

    NOTE(review): ``singleton_factory`` presumably makes repeated calls return the same client instance --
    confirm against its implementation.
    """
    manager = QueueManager()
    manager.start()
    return FakeSqsClient(manager)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc