• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 21575916503

30 Jan 2026 10:27AM UTC coverage: 86.969% (+0.007%) from 86.962%
21575916503

push

github

web-flow
Admin: Add typehints to utils/strings and utils/threads (#13658)

39 of 42 new or added lines in 3 files covered. (92.86%)

27 existing lines in 1 file now uncovered.

70391 of 80938 relevant lines covered (86.97%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.95
/localstack-core/localstack/services/sqs/models.py
1
import copy
1✔
2
import hashlib
1✔
3
import heapq
1✔
4
import inspect
1✔
5
import json
1✔
6
import logging
1✔
7
import re
1✔
8
import threading
1✔
9
import time
1✔
10
from datetime import datetime
1✔
11
from queue import Empty
1✔
12

13
from localstack import config
1✔
14
from localstack.aws.api import RequestContext
1✔
15
from localstack.aws.api.sqs import (
1✔
16
    AttributeNameList,
17
    InvalidAttributeName,
18
    Message,
19
    MessageAttributeNameList,
20
    MessageSystemAttributeName,
21
    QueueAttributeMap,
22
    QueueAttributeName,
23
    ReceiptHandleIsInvalid,
24
    TagMap,
25
)
26
from localstack.services.sqs import constants as sqs_constants
1✔
27
from localstack.services.sqs.constants import DYNAMIC_ATTRIBUTES
1✔
28
from localstack.services.sqs.exceptions import (
1✔
29
    InvalidAttributeValue,
30
    InvalidParameterValueException,
31
    MissingRequiredParameterException,
32
)
33
from localstack.services.sqs.queue import InterruptiblePriorityQueue, InterruptibleQueue
1✔
34
from localstack.services.sqs.utils import (
1✔
35
    create_message_attribute_hash,
36
    encode_move_task_handle,
37
    encode_receipt_handle,
38
    extract_receipt_handle_info,
39
    global_message_sequence,
40
    guess_endpoint_strategy_and_host,
41
    is_message_deduplication_id_required,
42
    message_filter_attributes,
43
    message_filter_message_attributes,
44
)
45
from localstack.services.stores import AccountRegionBundle, BaseStore, LocalAttribute
1✔
46
from localstack.utils.aws.arns import get_partition
1✔
47
from localstack.utils.strings import long_uid
1✔
48
from localstack.utils.time import now
1✔
49
from localstack.utils.urls import localstack_host
1✔
50

51
LOG = logging.getLogger(__name__)
1✔
52

53
ReceiptHandle = str
1✔
54

55

56
class SqsMessage:
1✔
57
    message: Message
1✔
58
    created: float
1✔
59
    visibility_timeout: int
1✔
60
    receive_count: int
1✔
61
    delay_seconds: int | None
1✔
62
    receipt_handles: set[str]
1✔
63
    last_received: float | None
1✔
64
    first_received: float | None
1✔
65
    visibility_deadline: float | None
1✔
66
    deleted: bool
1✔
67
    priority: float
1✔
68
    message_deduplication_id: str
1✔
69
    message_group_id: str
1✔
70
    sequence_number: str
1✔
71

72
    def __init__(
1✔
73
        self,
74
        priority: float,
75
        message: Message,
76
        message_deduplication_id: str = None,
77
        message_group_id: str = None,
78
        sequence_number: str = None,
79
    ) -> None:
80
        self.created = time.time()
1✔
81
        self.message = message
1✔
82
        self.receive_count = 0
1✔
83
        self.receipt_handles = set()
1✔
84

85
        self.delay_seconds = None
1✔
86
        self.last_received = None
1✔
87
        self.first_received = None
1✔
88
        self.visibility_deadline = None
1✔
89
        self.deleted = False
1✔
90
        self.priority = priority
1✔
91
        self.sequence_number = sequence_number
1✔
92

93
        attributes = {}
1✔
94
        if message_group_id is not None:
1✔
95
            attributes["MessageGroupId"] = message_group_id
1✔
96
        if message_deduplication_id is not None:
1✔
97
            attributes["MessageDeduplicationId"] = message_deduplication_id
1✔
98
        if sequence_number is not None:
1✔
99
            attributes["SequenceNumber"] = sequence_number
1✔
100

101
        if self.message.get("Attributes"):
1✔
102
            self.message["Attributes"].update(attributes)
1✔
103
        else:
UNCOV
104
            self.message["Attributes"] = attributes
×
105

106
        # set attribute default values if not set
107
        self.message["Attributes"].setdefault(
1✔
108
            MessageSystemAttributeName.ApproximateReceiveCount, "0"
109
        )
110

111
    @property
1✔
112
    def message_group_id(self) -> str | None:
1✔
113
        return self.message["Attributes"].get(MessageSystemAttributeName.MessageGroupId)
1✔
114

115
    @property
1✔
116
    def message_deduplication_id(self) -> str | None:
1✔
117
        return self.message["Attributes"].get(MessageSystemAttributeName.MessageDeduplicationId)
1✔
118

119
    @property
1✔
120
    def dead_letter_queue_source_arn(self) -> str | None:
1✔
121
        return self.message["Attributes"].get(MessageSystemAttributeName.DeadLetterQueueSourceArn)
1✔
122

123
    @property
1✔
124
    def message_id(self):
1✔
125
        return self.message["MessageId"]
1✔
126

127
    def increment_approximate_receive_count(self):
1✔
128
        """
129
        Increment the message system attribute ``ApproximateReceiveCount``.
130
        """
131
        # TODO: need better handling of system attributes
132
        cnt = int(
1✔
133
            self.message["Attributes"].get(MessageSystemAttributeName.ApproximateReceiveCount, "0")
134
        )
135
        cnt += 1
1✔
136
        self.message["Attributes"][MessageSystemAttributeName.ApproximateReceiveCount] = str(cnt)
1✔
137

138
    def set_last_received(self, timestamp: float):
1✔
139
        """
140
        Sets the last received timestamp of the message to the given value, and updates the visibility deadline
141
        accordingly.
142

143
        :param timestamp: the last time the message was received
144
        """
145
        self.last_received = timestamp
1✔
146
        self.visibility_deadline = timestamp + self.visibility_timeout
1✔
147

148
    def update_visibility_timeout(self, timeout: int):
1✔
149
        """
150
        Sets the visibility timeout of the message to the given value, and updates the visibility deadline accordingly.
151

152
        :param timeout: the timeout value in seconds
153
        """
154
        self.visibility_timeout = timeout
1✔
155
        self.visibility_deadline = time.time() + timeout
1✔
156

157
    @property
1✔
158
    def is_visible(self) -> bool:
1✔
159
        """
160
        Returns false if the message has a visibility deadline that is in the future.
161

162
        :return: whether the message is visible or not.
163
        """
164
        if self.visibility_deadline is None:
1✔
165
            return True
1✔
166
        if time.time() >= self.visibility_deadline:
1✔
167
            return True
1✔
168

169
        return False
1✔
170

171
    @property
1✔
172
    def is_delayed(self) -> bool:
1✔
173
        if self.delay_seconds is None:
1✔
UNCOV
174
            return False
×
175
        return time.time() <= self.created + self.delay_seconds
1✔
176

177
    def __gt__(self, other):
1✔
UNCOV
178
        return self.priority > other.priority
×
179

180
    def __ge__(self, other):
1✔
UNCOV
181
        return self.priority >= other.priority
×
182

183
    def __lt__(self, other):
1✔
184
        return self.priority < other.priority
1✔
185

186
    def __le__(self, other):
1✔
UNCOV
187
        return self.priority <= other.priority
×
188

189
    def __eq__(self, other):
1✔
UNCOV
190
        return self.message_id == other.message_id
×
191

192
    def __hash__(self):
1✔
193
        return self.message_id.__hash__()
1✔
194

195
    def __repr__(self):
196
        return f"SqsMessage(id={self.message_id},group={self.message_group_id})"
197

198

199
def to_sqs_api_message(
1✔
200
    standard_message: SqsMessage,
201
    attribute_names: AttributeNameList = None,
202
    message_attribute_names: MessageAttributeNameList = None,
203
) -> Message:
204
    """
205
    Utility function to convert an SQS message from LocalStack's internal representation to the AWS API
206
    concept 'Message', which is the format returned by the ``ReceiveMessage`` operation.
207

208
    :param standard_message: A LocalStack SQS message
209
    :param attribute_names: the attribute name list to filter
210
    :param message_attribute_names: the message attribute names to filter
211
    :return: a copy of the original Message with updated message attributes and MD5 attribute hash sums
212
    """
213
    # prepare message for receiver
214
    message = copy.deepcopy(standard_message.message)
1✔
215

216
    # update system attributes of the message copy
217
    message["Attributes"][MessageSystemAttributeName.ApproximateFirstReceiveTimestamp] = str(
1✔
218
        int((standard_message.first_received or 0) * 1000)
219
    )
220

221
    # filter attributes for receiver
222
    message_filter_attributes(message, attribute_names)
1✔
223
    message_filter_message_attributes(message, message_attribute_names)
1✔
224
    if message.get("MessageAttributes"):
1✔
225
        message["MD5OfMessageAttributes"] = create_message_attribute_hash(
1✔
226
            message["MessageAttributes"]
227
        )
228
    else:
229
        # delete the value that was computed when creating the message
230
        message.pop("MD5OfMessageAttributes", None)
1✔
231
    return message
1✔
232

233

234
class ReceiveMessageResult:
1✔
235
    """
236
    Object to communicate the result of a "receive messages" operation between the SqsProvider and
237
    the underlying datastructure holding the messages.
238
    """
239

240
    successful: list[SqsMessage]
1✔
241
    """The messages that were successfully received from the queue"""
1✔
242

243
    receipt_handles: list[str]
1✔
244
    """The array index position in ``successful`` and ``receipt_handles`` need to be the same (this
1✔
245
    assumption is needed when assembling the result in `SqsProvider.receive_message`)"""
246

247
    dead_letter_messages: list[SqsMessage]
1✔
248
    """All messages that were received more than maxReceiveCount in the redrive policy (if any)"""
1✔
249

250
    def __init__(self):
1✔
251
        self.successful = []
1✔
252
        self.receipt_handles = []
1✔
253
        self.dead_letter_messages = []
1✔
254

255

256
class MessageMoveTaskStatus(str):
1✔
257
    CREATED = "CREATED"  # not validated, for internal use
1✔
258
    RUNNING = "RUNNING"
1✔
259
    COMPLETED = "COMPLETED"
1✔
260
    CANCELLING = "CANCELLING"
1✔
261
    CANCELLED = "CANCELLED"
1✔
262
    FAILED = "FAILED"
1✔
263

264

265
class MessageMoveTask:
1✔
266
    """
267
    A task created by the ``StartMessageMoveTask`` operation.
268
    """
269

270
    # configurable fields
271
    source_arn: str
1✔
272
    """The arn of the DLQ the messages are currently in."""
1✔
273
    destination_arn: str | None = None
1✔
274
    """If the DestinationArn is not specified, the original source arn will be used as target."""
1✔
275
    max_number_of_messages_per_second: int | None = None
1✔
276

277
    # dynamic fields
278
    task_id: str
1✔
279
    status: str = MessageMoveTaskStatus.CREATED
1✔
280
    started_timestamp: datetime | None = None
1✔
281
    approximate_number_of_messages_moved: int | None = None
1✔
282
    approximate_number_of_messages_to_move: int | None = None
1✔
283
    failure_reason: str | None = None
1✔
284

285
    cancel_event: threading.Event
1✔
286

287
    def __init__(
1✔
288
        self, source_arn: str, destination_arn: str, max_number_of_messages_per_second: int = None
289
    ):
290
        self.task_id = long_uid()
1✔
291
        self.source_arn = source_arn
1✔
292
        self.destination_arn = destination_arn
1✔
293
        self.max_number_of_messages_per_second = max_number_of_messages_per_second
1✔
294
        self.cancel_event = threading.Event()
1✔
295

296
    def mark_started(self):
1✔
297
        self.started_timestamp = datetime.utcnow()
1✔
298
        self.status = MessageMoveTaskStatus.RUNNING
1✔
299
        self.cancel_event.clear()
1✔
300

301
    @property
1✔
302
    def task_handle(self) -> str:
1✔
303
        return encode_move_task_handle(self.task_id, self.source_arn)
1✔
304

305

306
class SqsQueue:
1✔
307
    name: str
1✔
308
    region: str
1✔
309
    account_id: str
1✔
310

311
    attributes: QueueAttributeMap
1✔
312
    tags: TagMap
1✔
313

314
    purge_in_progress: bool
1✔
315
    purge_timestamp: float | None
1✔
316

317
    delayed: set[SqsMessage]
1✔
318
    # Simulating an ordered set in python. Only the keys are used and of interest.
319
    inflight: dict[SqsMessage, None]
1✔
320
    receipts: dict[str, SqsMessage]
1✔
321

322
    def __init__(self, name: str, region: str, account_id: str, attributes=None, tags=None) -> None:
1✔
323
        self.name = name
1✔
324
        self.region = region
1✔
325
        self.account_id = account_id
1✔
326

327
        self._assert_queue_name(name)
1✔
328
        self.tags = tags or {}
1✔
329

330
        self.delayed = set()
1✔
331
        self.inflight = {}
1✔
332
        self.receipts = {}
1✔
333

334
        self.attributes = self.default_attributes()
1✔
335
        if attributes:
1✔
336
            self.validate_queue_attributes(attributes)
1✔
337
            self.attributes.update(attributes)
1✔
338

339
        self.purge_in_progress = False
1✔
340
        self.purge_timestamp = None
1✔
341

342
        self.permissions = set()
1✔
343
        self.mutex = threading.RLock()
1✔
344

345
    def shutdown(self):
1✔
UNCOV
346
        pass
×
347

348
    def default_attributes(self) -> QueueAttributeMap:
1✔
349
        return {
1✔
350
            QueueAttributeName.CreatedTimestamp: str(now()),
351
            QueueAttributeName.DelaySeconds: "0",
352
            QueueAttributeName.LastModifiedTimestamp: str(now()),
353
            QueueAttributeName.MaximumMessageSize: str(sqs_constants.DEFAULT_MAXIMUM_MESSAGE_SIZE),
354
            QueueAttributeName.MessageRetentionPeriod: "345600",
355
            QueueAttributeName.QueueArn: self.arn,
356
            QueueAttributeName.ReceiveMessageWaitTimeSeconds: "0",
357
            QueueAttributeName.VisibilityTimeout: "30",
358
            QueueAttributeName.SqsManagedSseEnabled: "true",
359
        }
360

361
    def update_delay_seconds(self, value: int):
1✔
362
        """
363
        For standard queues, the per-queue delay setting is not retroactive—changing the setting doesn't affect the
364
        delay of messages already in the queue. For FIFO queues, the per-queue delay setting is retroactive—changing
365
        the setting affects the delay of messages already in the queue.
366

367
        https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-delay-queues.html
368

369
        :param value: the number of seconds
370
        """
UNCOV
371
        self.attributes[QueueAttributeName.DelaySeconds] = str(value)
×
372

373
    def update_last_modified(self, timestamp: int = None):
1✔
UNCOV
374
        if timestamp is None:
×
UNCOV
375
            timestamp = now()
×
376

UNCOV
377
        self.attributes[QueueAttributeName.LastModifiedTimestamp] = str(timestamp)
×
378

379
    @property
1✔
380
    def arn(self) -> str:
1✔
381
        return f"arn:{get_partition(self.region)}:sqs:{self.region}:{self.account_id}:{self.name}"
1✔
382

383
    def url(self, context: RequestContext) -> str:
1✔
384
        """Return queue URL which depending on the endpoint strategy returns e.g.:
385
        * (standard) http://sqs.eu-west-1.localhost.localstack.cloud:4566/000000000000/myqueue
386
        * (domain) http://eu-west-1.queue.localhost.localstack.cloud:4566/000000000000/myqueue
387
        * (path) http://localhost.localstack.cloud:4566/queue/eu-central-1/000000000000/myqueue
388
        * otherwise: http://localhost.localstack.cloud:4566/000000000000/myqueue
389
        """
390

391
        scheme = config.get_protocol()  # TODO: should probably change to context.request.scheme
1✔
392
        host_definition = localstack_host()
1✔
393
        host_and_port = host_definition.host_and_port()
1✔
394

395
        endpoint_strategy = config.SQS_ENDPOINT_STRATEGY
1✔
396

397
        if endpoint_strategy == "dynamic":
1✔
UNCOV
398
            scheme = context.request.scheme
×
399
            # determine the endpoint strategy that should be used, and determine the host dynamically
UNCOV
400
            endpoint_strategy, host_and_port = guess_endpoint_strategy_and_host(
×
401
                context.request.host
402
            )
403

404
        if endpoint_strategy == "standard":
1✔
405
            # Region is always part of the queue URL
406
            # sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/my-queue
407
            scheme = context.request.scheme
1✔
408
            host_url = f"{scheme}://sqs.{self.region}.{host_and_port}"
1✔
409
        elif endpoint_strategy == "domain":
1✔
410
            # Legacy style
411
            # queue.localhost.localstack.cloud:4566/000000000000/my-queue (us-east-1)
412
            # or us-east-2.queue.localhost.localstack.cloud:4566/000000000000/my-queue
413
            region = "" if self.region == "us-east-1" else self.region + "."
1✔
414
            host_url = f"{scheme}://{region}queue.{host_and_port}"
1✔
415
        elif endpoint_strategy == "path":
1✔
416
            # https?://localhost:4566/queue/us-east-1/00000000000/my-queue (us-east-1)
417
            host_url = f"{scheme}://{host_and_port}/queue/{self.region}"
1✔
418
        else:
419
            host_url = f"{scheme}://{host_and_port}"
1✔
420

421
        return "{host}/{account_id}/{name}".format(
1✔
422
            host=host_url.rstrip("/"),
423
            account_id=self.account_id,
424
            name=self.name,
425
        )
426

427
    @property
1✔
428
    def redrive_policy(self) -> dict | None:
1✔
429
        if policy_document := self.attributes.get(QueueAttributeName.RedrivePolicy):
1✔
430
            return json.loads(policy_document)
1✔
431
        return None
1✔
432

433
    @property
1✔
434
    def max_receive_count(self) -> int | None:
1✔
435
        """
436
        Returns the maxReceiveCount attribute of the redrive policy. If no redrive policy is set, then it
437
        returns None.
438
        """
439
        if redrive_policy := self.redrive_policy:
1✔
440
            return int(redrive_policy["maxReceiveCount"])
1✔
441
        return None
1✔
442

443
    @property
1✔
444
    def visibility_timeout(self) -> int:
1✔
445
        return int(self.attributes[QueueAttributeName.VisibilityTimeout])
1✔
446

447
    @property
1✔
448
    def delay_seconds(self) -> int:
1✔
449
        return int(self.attributes[QueueAttributeName.DelaySeconds])
1✔
450

451
    @property
1✔
452
    def wait_time_seconds(self) -> int:
1✔
453
        return int(self.attributes[QueueAttributeName.ReceiveMessageWaitTimeSeconds])
1✔
454

455
    @property
1✔
456
    def message_retention_period(self) -> int:
1✔
457
        """
458
        ``MessageRetentionPeriod`` -- the length of time, in seconds, for which Amazon SQS retains a message. Valid
459
        values: An integer representing seconds, from 60 (1 minute) to 1,209,600 (14 days). Default: 345,600 (4 days).
460
        """
461
        return int(self.attributes[QueueAttributeName.MessageRetentionPeriod])
1✔
462

463
    @property
1✔
464
    def maximum_message_size(self) -> int:
1✔
465
        return int(self.attributes[QueueAttributeName.MaximumMessageSize])
1✔
466

467
    @property
1✔
468
    def approximate_number_of_messages(self) -> int:
1✔
469
        raise NotImplementedError
470

471
    @property
1✔
472
    def approximate_number_of_messages_not_visible(self) -> int:
1✔
473
        return len(self.inflight)
1✔
474

475
    @property
1✔
476
    def approximate_number_of_messages_delayed(self) -> int:
1✔
477
        return len(self.delayed)
1✔
478

479
    def validate_receipt_handle(self, receipt_handle: str):
1✔
480
        if self.arn != extract_receipt_handle_info(receipt_handle).queue_arn:
1✔
UNCOV
481
            raise ReceiptHandleIsInvalid(
×
482
                f'The input receipt handle "{receipt_handle}" is not a valid receipt handle.'
483
            )
484

485
    def update_visibility_timeout(self, receipt_handle: str, visibility_timeout: int):
1✔
486
        with self.mutex:
1✔
487
            self.validate_receipt_handle(receipt_handle)
1✔
488

489
            if receipt_handle not in self.receipts:
1✔
490
                raise InvalidParameterValueException(
1✔
491
                    f"Value {receipt_handle} for parameter ReceiptHandle is invalid. Reason: Message does not exist "
492
                    f"or is not available for visibility timeout change."
493
                )
494

495
            standard_message = self.receipts[receipt_handle]
1✔
496

497
            if standard_message not in self.inflight:
1✔
498
                return
1✔
499

500
            standard_message.update_visibility_timeout(visibility_timeout)
1✔
501

502
            if visibility_timeout == 0:
1✔
503
                LOG.info(
1✔
504
                    "terminating the visibility timeout of %s",
505
                    standard_message.message["MessageId"],
506
                )
507
                # Terminating the visibility timeout for a message
508
                # https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html#terminating-message-visibility-timeout
509
                del self.inflight[standard_message]
1✔
510
                self._put_message(standard_message)
1✔
511

512
    def remove(self, receipt_handle: str):
1✔
513
        with self.mutex:
1✔
514
            self.validate_receipt_handle(receipt_handle)
1✔
515

516
            if receipt_handle not in self.receipts:
1✔
517
                LOG.debug(
1✔
518
                    "no in-flight message found for receipt handle %s in queue %s",
519
                    receipt_handle,
520
                    self.arn,
521
                )
522
                return
1✔
523

524
            standard_message = self.receipts[receipt_handle]
1✔
525
            self._pre_delete_checks(standard_message, receipt_handle)
1✔
526
            standard_message.deleted = True
1✔
527
            LOG.debug(
1✔
528
                "deleting message %s from queue %s",
529
                standard_message.message["MessageId"],
530
                self.arn,
531
            )
532

533
            # remove all handles associated with this message
534
            for handle in standard_message.receipt_handles:
1✔
535
                del self.receipts[handle]
1✔
536
            standard_message.receipt_handles.clear()
1✔
537

538
            self._on_remove_message(standard_message)
1✔
539

540
    def _on_remove_message(self, message: SqsMessage):
1✔
541
        """Hook for queue-specific logic executed when a message is removed."""
UNCOV
542
        pass
×
543

544
    def put(
1✔
545
        self,
546
        message: Message,
547
        visibility_timeout: int = None,
548
        message_deduplication_id: str = None,
549
        message_group_id: str = None,
550
        delay_seconds: int = None,
551
    ) -> SqsMessage:
552
        raise NotImplementedError
553

554
    def receive(
1✔
555
        self,
556
        num_messages: int = 1,
557
        wait_time_seconds: int = None,
558
        visibility_timeout: int = None,
559
        *,
560
        poll_empty_queue: bool = False,
561
    ) -> ReceiveMessageResult:
562
        """
563
        Receive ``num_messages`` from the queue, and wait at max ``wait_time_seconds``. If a visibility
564
        timeout is given, also change the visibility timeout of all received messages accordingly.
565

566
        :param num_messages: the number of messages you want to get from the underlying queue
567
        :param wait_time_seconds: the number of seconds you want to wait
568
        :param visibility_timeout: an optional new visibility timeout
569
        :param poll_empty_queue: whether to keep polling an empty queue until the duration ``wait_time_seconds`` has elapsed
570
        :return: a ReceiveMessageResult object that contains the result of the operation
571
        """
572
        raise NotImplementedError
573

574
    def clear(self):
1✔
575
        """
576
        Calls clear on all internal datastructures that hold messages and data related to them.
577
        """
578
        with self.mutex:
1✔
579
            self.inflight.clear()
1✔
580
            self.delayed.clear()
1✔
581
            self.receipts.clear()
1✔
582

583
    def _put_message(self, message: SqsMessage):
1✔
584
        """Low-level put operation to put messages into a queue and modify visibilities accordingly."""
585
        raise NotImplementedError
586

587
    def create_receipt_handle(self, message: SqsMessage) -> str:
1✔
588
        return encode_receipt_handle(self.arn, message)
1✔
589

590
    def requeue_inflight_messages(self):
1✔
591
        if not self.inflight:
1✔
592
            return
1✔
593

594
        with self.mutex:
1✔
595
            messages = [message for message in self.inflight if message.is_visible]
1✔
596
            for standard_message in messages:
1✔
597
                LOG.debug(
1✔
598
                    "re-queueing inflight messages %s into queue %s",
599
                    standard_message,
600
                    self.arn,
601
                )
602
                del self.inflight[standard_message]
1✔
603
                self._put_message(standard_message)
1✔
604

605
    def add_inflight_message(self, message: SqsMessage):
1✔
606
        """
607
        We are simulating an ordered set with a dict. When a value is added, it is added as key to the dict, which
608
        is all we need. Hence all "values" in this ordered set are None
609
        :param message: The message to put in flight
610
        """
611
        self.inflight[message] = None
1✔
612

613
    def enqueue_delayed_messages(self):
1✔
614
        if not self.delayed:
1✔
615
            return
1✔
616

617
        with self.mutex:
1✔
618
            messages = [message for message in self.delayed if not message.is_delayed]
1✔
619
            for standard_message in messages:
1✔
620
                LOG.debug(
1✔
621
                    "enqueueing delayed messages %s into queue %s",
622
                    standard_message.message["MessageId"],
623
                    self.arn,
624
                )
625
                self.delayed.remove(standard_message)
1✔
626
                self._put_message(standard_message)
1✔
627

628
    def remove_expired_messages(self):
1✔
629
        """
630
        Removes messages from the queue whose retention period has expired.
631
        """
632
        raise NotImplementedError
633

634
    def _assert_queue_name(self, name):
1✔
635
        if not re.match(r"^[a-zA-Z0-9_-]{1,80}$", name):
1✔
636
            raise InvalidParameterValueException(
1✔
637
                "Can only include alphanumeric characters, hyphens, or underscores. 1 to 80 in length"
638
            )
639

640
    def validate_queue_attributes(self, attributes):
1✔
641
        pass
1✔
642

643
    def add_permission(self, label: str, actions: list[str], account_ids: list[str]) -> None:
1✔
644
        """
645
        Create / append to a policy for usage with the add_permission api call
646

647
        :param actions: List of actions to be included in the policy, without the SQS: prefix
648
        :param account_ids: List of account ids to be included in the policy
649
        :param label: Permission label
650
        """
651
        statement = {
1✔
652
            "Sid": label,
653
            "Effect": "Allow",
654
            "Principal": {
655
                "AWS": [
656
                    f"arn:{get_partition(self.region)}:iam::{account_id}:root"
657
                    for account_id in account_ids
658
                ]
659
                if len(account_ids) > 1
660
                else f"arn:{get_partition(self.region)}:iam::{account_ids[0]}:root"
661
            },
662
            "Action": [f"SQS:{action}" for action in actions]
663
            if len(actions) > 1
664
            else f"SQS:{actions[0]}",
665
            "Resource": self.arn,
666
        }
667
        if policy := self.attributes.get(QueueAttributeName.Policy):
1✔
668
            policy = json.loads(policy)
1✔
669
            policy.setdefault("Statement", [])
1✔
670
        else:
671
            policy = {
1✔
672
                "Version": "2008-10-17",
673
                "Id": f"{self.arn}/SQSDefaultPolicy",
674
                "Statement": [],
675
            }
676
        policy.setdefault("Statement", [])
1✔
677
        existing_statement_ids = [statement.get("Sid") for statement in policy["Statement"]]
1✔
678
        if label in existing_statement_ids:
1✔
679
            raise InvalidParameterValueException(
1✔
680
                f"Value {label} for parameter Label is invalid. Reason: Already exists."
681
            )
682
        policy["Statement"].append(statement)
1✔
683
        self.attributes[QueueAttributeName.Policy] = json.dumps(policy)
1✔
684

685
    def remove_permission(self, label: str) -> None:
1✔
686
        """
687
        Delete a policy statement for usage of the remove_permission call
688

689
        :param label: Permission label
690
        """
691
        if policy := self.attributes.get(QueueAttributeName.Policy):
1✔
692
            policy = json.loads(policy)
1✔
693
            # this should not be necessary, but we can upload custom policies, so it's better to be safe
694
            policy.setdefault("Statement", [])
1✔
695
        else:
696
            policy = {
1✔
697
                "Version": "2008-10-17",
698
                "Id": f"{self.arn}/SQSDefaultPolicy",
699
                "Statement": [],
700
            }
701
        existing_statement_ids = [statement.get("Sid") for statement in policy["Statement"]]
1✔
702
        if label not in existing_statement_ids:
1✔
703
            raise InvalidParameterValueException(
1✔
704
                f"Value {label} for parameter Label is invalid. Reason: can't find label."
705
            )
706
        policy["Statement"] = [
1✔
707
            statement for statement in policy["Statement"] if statement.get("Sid") != label
708
        ]
709
        if policy["Statement"]:
1✔
710
            self.attributes[QueueAttributeName.Policy] = json.dumps(policy)
1✔
711
        else:
712
            del self.attributes[QueueAttributeName.Policy]
1✔
713

714
    def get_queue_attributes(self, attribute_names: AttributeNameList = None) -> dict[str, str]:
1✔
715
        if not attribute_names:
1✔
716
            return {}
1✔
717

718
        if QueueAttributeName.All in attribute_names:
1✔
719
            attribute_names = list(self.attributes.keys()) + DYNAMIC_ATTRIBUTES
1✔
720

721
        result: dict[QueueAttributeName, str] = {}
1✔
722

723
        for attr in attribute_names:
1✔
724
            try:
1✔
725
                getattr(QueueAttributeName, attr)
1✔
726
            except AttributeError:
1✔
727
                raise InvalidAttributeName(f"Unknown Attribute {attr}.")
1✔
728

729
            # The approximate_* attributes are calculated on the spot when accessed.
730
            # We have a @property for each of those which calculates the value.
731
            match attr:
1✔
732
                case QueueAttributeName.ApproximateNumberOfMessages:
1✔
733
                    value = str(self.approximate_number_of_messages)
1✔
734
                case QueueAttributeName.ApproximateNumberOfMessagesDelayed:
1✔
735
                    value = str(self.approximate_number_of_messages_delayed)
1✔
736
                case QueueAttributeName.ApproximateNumberOfMessagesNotVisible:
1✔
737
                    value = str(self.approximate_number_of_messages_not_visible)
1✔
738
                case _:
1✔
739
                    value = self.attributes.get(attr)
1✔
740
            if value == "False" or value == "True":
1✔
741
                result[attr] = value.lower()
1✔
742
            elif value is not None:
1✔
743
                result[attr] = value
1✔
744
        return result
1✔
745

746
    @staticmethod
1✔
747
    def remove_expired_messages_from_heap(
1✔
748
        heap: list[SqsMessage], message_retention_period: int
749
    ) -> list[SqsMessage]:
750
        """
751
        Removes from the given heap of SqsMessages all messages that have expired in the context of the current time
752
        and the given message retention period. The method manipulates the heap but retains the heap property.
753

754
        :param heap: an array satisfying the heap property
755
        :param message_retention_period: the message retention period to use in relation to the current time
756
        :return: a list of expired messages that have been removed from the heap
757
        """
758
        th = time.time() - message_retention_period
1✔
759

760
        expired = []
1✔
761
        while heap:
1✔
762
            # here we're leveraging the heap property "that a[0] is always its smallest element"
763
            # and the assumption that message.created == message.priority
764
            message = heap[0]
1✔
765
            if th < message.created:
1✔
766
                break
1✔
767
            # remove the expired element
768
            expired.append(message)
1✔
769
            heapq.heappop(heap)
1✔
770

771
        return expired
1✔
772

773
    def _pre_delete_checks(self, standard_message: SqsMessage, receipt_handle: str) -> None:
1✔
774
        """
775
        Runs any potential checks if a message that has been successfully identified via a receipt handle
776
        is indeed supposed to be deleted.
777
        For example, a receipt handle that has expired might not lead to deletion.
778

779
        :param standard_message: The message to be deleted
780
        :param receipt_handle: The handle associated with the message
781
        :return: None. Potential violations raise errors.
782
        """
783
        pass
1✔
784

785

786
class StandardQueue(SqsQueue):
1✔
787
    visible: InterruptiblePriorityQueue[SqsMessage]
1✔
788

789
    def __init__(self, name: str, region: str, account_id: str, attributes=None, tags=None) -> None:
1✔
790
        super().__init__(name, region, account_id, attributes, tags)
1✔
791
        self.visible = InterruptiblePriorityQueue()
1✔
792

793
    def clear(self):
1✔
794
        with self.mutex:
1✔
795
            super().clear()
1✔
796
            self.visible.queue.clear()
1✔
797

798
    @property
1✔
799
    def approximate_number_of_messages(self):
1✔
800
        return self.visible.qsize()
1✔
801

802
    def shutdown(self):
1✔
803
        self.visible.shutdown()
1✔
804

805
    def put(
1✔
806
        self,
807
        message: Message,
808
        visibility_timeout: int = None,
809
        message_deduplication_id: str = None,
810
        message_group_id: str = None,
811
        delay_seconds: int = None,
812
    ):
813
        if message_deduplication_id:
1✔
UNCOV
814
            raise InvalidParameterValueException(
×
815
                f"Value {message_deduplication_id} for parameter MessageDeduplicationId is invalid. Reason: The "
816
                f"request includes a parameter that is not valid for this queue type."
817
            )
818

819
        standard_message = SqsMessage(
1✔
820
            time.time(),
821
            message,
822
            message_group_id=message_group_id,
823
        )
824

825
        if visibility_timeout is not None:
1✔
UNCOV
826
            standard_message.visibility_timeout = visibility_timeout
×
827
        else:
828
            # use the attribute from the queue
829
            standard_message.visibility_timeout = self.visibility_timeout
1✔
830

831
        if delay_seconds is not None:
1✔
832
            standard_message.delay_seconds = delay_seconds
1✔
833
        else:
834
            standard_message.delay_seconds = self.delay_seconds
1✔
835

836
        if standard_message.is_delayed:
1✔
837
            self.delayed.add(standard_message)
1✔
838
        else:
839
            self._put_message(standard_message)
1✔
840

841
        return standard_message
1✔
842

843
    def _put_message(self, message: SqsMessage):
1✔
844
        self.visible.put_nowait(message)
1✔
845

846
    def remove_expired_messages(self):
1✔
847
        with self.mutex:
1✔
848
            messages = self.remove_expired_messages_from_heap(
1✔
849
                self.visible.queue, self.message_retention_period
850
            )
851

852
        for message in messages:
1✔
853
            LOG.debug("Removed expired message %s from queue %s", message.message_id, self.arn)
1✔
854

855
    def receive(
1✔
856
        self,
857
        num_messages: int = 1,
858
        wait_time_seconds: int = None,
859
        visibility_timeout: int = None,
860
        *,
861
        poll_empty_queue: bool = False,
862
    ) -> ReceiveMessageResult:
863
        result = ReceiveMessageResult()
1✔
864

865
        max_receive_count = self.max_receive_count
1✔
866
        visibility_timeout = (
1✔
867
            self.visibility_timeout if visibility_timeout is None else visibility_timeout
868
        )
869

870
        block = True if wait_time_seconds else False
1✔
871
        timeout = wait_time_seconds or 0
1✔
872
        start = time.time()
1✔
873

874
        # collect messages
875
        while True:
1✔
876
            try:
1✔
877
                message = self.visible.get(block=block, timeout=timeout)
1✔
878
            except Empty:
1✔
879
                break
1✔
880
            # setting block to false guarantees that, if we've already waited before, we don't wait the
881
            # full time again in the next iteration if max_number_of_messages is set but there are no more
882
            # messages in the queue. see https://github.com/localstack/localstack/issues/5824
883
            if not poll_empty_queue:
1✔
884
                block = False
1✔
885

886
            timeout -= time.time() - start
1✔
887
            if timeout < 0:
1✔
888
                timeout = 0
1✔
889

890
            if message.deleted:
1✔
891
                # filter messages that were deleted with an expired receipt handle after they have been
892
                # re-queued. this can only happen due to a race with `remove`.
UNCOV
893
                continue
×
894

895
            # update message attributes
896
            message.receive_count += 1
1✔
897
            message.update_visibility_timeout(visibility_timeout)
1✔
898
            message.set_last_received(time.time())
1✔
899
            if message.first_received is None:
1✔
900
                message.first_received = message.last_received
1✔
901

902
            LOG.debug("de-queued message %s from %s", message, self.arn)
1✔
903
            if max_receive_count and message.receive_count > max_receive_count:
1✔
904
                # the message needs to move to the DLQ
905
                LOG.debug(
1✔
906
                    "message %s has been received %d times, marking it for DLQ",
907
                    message,
908
                    message.receive_count,
909
                )
910
                result.dead_letter_messages.append(message)
1✔
911
            else:
912
                result.successful.append(message)
1✔
913
                message.increment_approximate_receive_count()
1✔
914

915
                # now we can return
916
                if len(result.successful) == num_messages:
1✔
917
                    break
1✔
918

919
        # now process the successful result messages: create receipt handles and manage visibility.
920
        for message in result.successful:
1✔
921
            # manage receipt handle
922
            receipt_handle = self.create_receipt_handle(message)
1✔
923
            message.receipt_handles.add(receipt_handle)
1✔
924
            self.receipts[receipt_handle] = message
1✔
925
            result.receipt_handles.append(receipt_handle)
1✔
926

927
            # manage message visibility
928
            if message.visibility_timeout == 0:
1✔
929
                self.visible.put_nowait(message)
1✔
930
            else:
931
                self.add_inflight_message(message)
1✔
932

933
        return result
1✔
934

935
    def _on_remove_message(self, message: SqsMessage):
1✔
936
        try:
1✔
937
            del self.inflight[message]
1✔
938
        except KeyError:
1✔
939
            # this likely means the message was removed with an expired receipt handle unfortunately this
940
            # means we need to scan the queue for the element and remove it from there, and then re-heapify
941
            # the queue
942
            try:
1✔
943
                self.visible.queue.remove(message)
1✔
944
                heapq.heapify(self.visible.queue)
1✔
945
            except ValueError:
1✔
946
                # this may happen if the message no longer exists because it was removed earlier
947
                pass
1✔
948

949
    def validate_queue_attributes(self, attributes):
1✔
950
        valid = [
1✔
951
            k[1]
952
            for k in inspect.getmembers(
953
                QueueAttributeName, lambda x: isinstance(x, str) and not x.startswith("__")
954
            )
955
            if k[1] not in sqs_constants.INVALID_STANDARD_QUEUE_ATTRIBUTES
956
        ]
957

958
        for k in attributes.keys():
1✔
959
            if k in [QueueAttributeName.FifoThroughputLimit, QueueAttributeName.DeduplicationScope]:
1✔
960
                raise InvalidAttributeName(
1✔
961
                    f"You can specify the {k} only when FifoQueue is set to true."
962
                )
963
            if k not in valid:
1✔
964
                raise InvalidAttributeName(f"Unknown Attribute {k}.")
1✔
965

966

967
class MessageGroup:
1✔
968
    message_group_id: str
1✔
969
    messages: list[SqsMessage]
1✔
970

971
    def __init__(self, message_group_id: str):
1✔
972
        self.message_group_id = message_group_id
1✔
973
        self.messages = []
1✔
974

975
    def empty(self) -> bool:
1✔
976
        return not self.messages
1✔
977

978
    def size(self) -> int:
1✔
UNCOV
979
        return len(self.messages)
×
980

981
    def pop(self) -> SqsMessage:
1✔
982
        return heapq.heappop(self.messages)
1✔
983

984
    def push(self, message: SqsMessage):
1✔
985
        heapq.heappush(self.messages, message)
1✔
986

987
    def __eq__(self, other):
1✔
UNCOV
988
        return self.message_group_id == other.message_group_id
×
989

990
    def __hash__(self):
1✔
991
        return self.message_group_id.__hash__()
1✔
992

993
    def __repr__(self):
994
        return f"MessageGroup(id={self.message_group_id}, size={len(self.messages)})"
995

996

997
class FifoQueue(SqsQueue):
1✔
998
    """
999
    A FIFO queue behaves differently than a default queue. Most behavior has to be implemented separately.
1000

1001
    See https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html
1002

1003
    TODO: raise exceptions when trying to remove a message with an expired receipt handle
1004
    """
1005

1006
    deduplication: dict[str, SqsMessage]
1✔
1007
    message_groups: dict[str, MessageGroup]
1✔
1008
    inflight_groups: set[MessageGroup]
1✔
1009
    message_group_queue: InterruptibleQueue
1✔
1010
    deduplication_scope: str
1✔
1011

1012
    def __init__(self, name: str, region: str, account_id: str, attributes=None, tags=None) -> None:
1✔
1013
        super().__init__(name, region, account_id, attributes, tags)
1✔
1014
        self.deduplication = {}
1✔
1015

1016
        self.message_groups = {}
1✔
1017
        self.inflight_groups = set()
1✔
1018
        self.message_group_queue = InterruptibleQueue()
1✔
1019

1020
        # SQS does not seem to change the deduplication behaviour of fifo queues if you
1021
        # change to/from 'queue'/'messageGroup' scope after creation -> we need to set this on creation
1022
        self.deduplication_scope = self.attributes[QueueAttributeName.DeduplicationScope]
1✔
1023

1024
    @property
1✔
1025
    def approximate_number_of_messages(self):
1✔
1026
        n = 0
1✔
1027
        for message_group in self.message_groups.values():
1✔
1028
            n += len(message_group.messages)
1✔
1029
        return n
1✔
1030

1031
    def shutdown(self):
1✔
1032
        self.message_group_queue.shutdown()
1✔
1033

1034
    def get_message_group(self, message_group_id: str) -> MessageGroup:
1✔
1035
        """
1036
        Thread safe lazy factory for MessageGroup objects.
1037

1038
        :param message_group_id: the message group ID
1039
        :return: a new or existing MessageGroup object
1040
        """
1041
        with self.mutex:
1✔
1042
            if message_group_id not in self.message_groups:
1✔
1043
                self.message_groups[message_group_id] = MessageGroup(message_group_id)
1✔
1044

1045
            return self.message_groups.get(message_group_id)
1✔
1046

1047
    def default_attributes(self) -> QueueAttributeMap:
1✔
1048
        return {
1✔
1049
            **super().default_attributes(),
1050
            QueueAttributeName.ContentBasedDeduplication: "false",
1051
            QueueAttributeName.DeduplicationScope: "queue",
1052
            QueueAttributeName.FifoThroughputLimit: "perQueue",
1053
        }
1054

1055
    def update_delay_seconds(self, value: int):
1✔
UNCOV
1056
        super().update_delay_seconds(value)
×
UNCOV
1057
        for message in self.delayed:
×
UNCOV
1058
            message.delay_seconds = value
×
1059

1060
    def _pre_delete_checks(self, message: SqsMessage, receipt_handle: str) -> None:
1✔
1061
        if message.is_visible:
1✔
1062
            raise InvalidParameterValueException(
1✔
1063
                f"Value {receipt_handle} for parameter ReceiptHandle is invalid. Reason: The receipt handle has expired."
1064
            )
1065

1066
    def remove(self, receipt_handle: str):
1✔
1067
        self.validate_receipt_handle(receipt_handle)
1✔
1068

1069
        super().remove(receipt_handle)
1✔
1070

1071
    def put(
1✔
1072
        self,
1073
        message: Message,
1074
        visibility_timeout: int = None,
1075
        message_deduplication_id: str = None,
1076
        message_group_id: str = None,
1077
        delay_seconds: int = None,
1078
    ):
1079
        if delay_seconds:
1✔
1080
            # in fifo queues, delay is only applied on queue level. However, explicitly setting delay_seconds=0 is valid
1081
            raise InvalidParameterValueException(
1✔
1082
                f"Value {delay_seconds} for parameter DelaySeconds is invalid. Reason: The request include parameter "
1083
                f"that is not valid for this queue type."
1084
            )
1085

1086
        if not message_group_id:
1✔
1087
            raise MissingRequiredParameterException(
1✔
1088
                "The request must contain the parameter MessageGroupId."
1089
            )
1090
        dedup_id = message_deduplication_id
1✔
1091
        content_based_deduplication = not is_message_deduplication_id_required(self)
1✔
1092
        if not dedup_id and content_based_deduplication:
1✔
1093
            dedup_id = hashlib.sha256(message.get("Body").encode("utf-8")).hexdigest()
1✔
1094
        if not dedup_id:
1✔
1095
            raise InvalidParameterValueException(
1✔
1096
                "The queue should either have ContentBasedDeduplication enabled or MessageDeduplicationId provided explicitly"
1097
            )
1098

1099
        fifo_message = SqsMessage(
1✔
1100
            time.time(),
1101
            message,
1102
            message_deduplication_id=dedup_id,
1103
            message_group_id=message_group_id,
1104
            sequence_number=str(self.next_sequence_number()),
1105
        )
1106
        if visibility_timeout is not None:
1✔
UNCOV
1107
            fifo_message.visibility_timeout = visibility_timeout
×
1108
        else:
1109
            # use the attribute from the queue
1110
            fifo_message.visibility_timeout = self.visibility_timeout
1✔
1111

1112
        # FIFO queues always use the queue level setting for 'DelaySeconds'
1113
        fifo_message.delay_seconds = self.delay_seconds
1✔
1114

1115
        original_message = self.deduplication.get(dedup_id)
1✔
1116
        if (
1✔
1117
            original_message
1118
            and original_message.priority + sqs_constants.DEDUPLICATION_INTERVAL_IN_SEC
1119
            > fifo_message.priority
1120
            # account for deduplication scope required for (but not restricted to) high-throughput-mode
1121
            and (
1122
                not self.deduplication_scope == "messageGroup"
1123
                or fifo_message.message_group_id == original_message.message_group_id
1124
            )
1125
        ):
1126
            message["MessageId"] = original_message.message["MessageId"]
1✔
1127
        else:
1128
            if fifo_message.is_delayed:
1✔
1129
                self.delayed.add(fifo_message)
1✔
1130
            else:
1131
                self._put_message(fifo_message)
1✔
1132

1133
            self.deduplication[dedup_id] = fifo_message
1✔
1134

1135
        return fifo_message
1✔
1136

1137
    def _put_message(self, message: SqsMessage):
1✔
1138
        """Once a message becomes visible in a FIFO queue, its message group also becomes visible."""
1139
        message_group = self.get_message_group(message.message_group_id)
1✔
1140

1141
        with self.mutex:
1✔
1142
            previously_empty = message_group.empty()
1✔
1143
            # put the message into the group
1144
            message_group.push(message)
1✔
1145

1146
            # new messages should not make groups visible that are currently inflight
1147
            if message.receive_count < 1 and message_group in self.inflight_groups:
1✔
1148
                return
1✔
1149
            # if an older message becomes visible again in the queue, that message's group becomes visible also.
1150
            if message_group in self.inflight_groups:
1✔
1151
                self.inflight_groups.remove(message_group)
1✔
1152
                self.message_group_queue.put_nowait(message_group)
1✔
1153
            # if the group was previously empty, it was not yet added back to the queue
1154
            elif previously_empty:
1✔
1155
                self.message_group_queue.put_nowait(message_group)
1✔
1156

1157
    def requeue_inflight_messages(self):
1✔
1158
        if not self.inflight:
1✔
1159
            return
1✔
1160

1161
        with self.mutex:
1✔
1162
            messages = list(self.inflight)
1✔
1163
            for standard_message in messages:
1✔
1164
                # in fifo, an invisible message blocks potentially visible messages afterwards
1165
                # this can happen for example if multiple message of the same group are received at once, then one
1166
                # message of this batch has its visibility timeout extended
1167
                if not standard_message.is_visible:
1✔
1168
                    return
1✔
1169
                LOG.debug(
1✔
1170
                    "re-queueing inflight messages %s into queue %s",
1171
                    standard_message,
1172
                    self.arn,
1173
                )
1174
                del self.inflight[standard_message]
1✔
1175
                self._put_message(standard_message)
1✔
1176

1177
    def remove_expired_messages(self):
1✔
1178
        with self.mutex:
1✔
1179
            retention_period = self.message_retention_period
1✔
1180
            for message_group in self.message_groups.values():
1✔
1181
                messages = self.remove_expired_messages_from_heap(
1✔
1182
                    message_group.messages, retention_period
1183
                )
1184

1185
                for message in messages:
1✔
1186
                    LOG.debug(
1✔
1187
                        "Removed expired message %s from message group %s in queue %s",
1188
                        message.message_id,
1189
                        message.message_group_id,
1190
                        self.arn,
1191
                    )
1192

1193
    def receive(
1✔
1194
        self,
1195
        num_messages: int = 1,
1196
        wait_time_seconds: int = None,
1197
        visibility_timeout: int = None,
1198
        *,
1199
        poll_empty_queue: bool = False,
1200
    ) -> ReceiveMessageResult:
1201
        """
1202
        Receive logic for FIFO queues is different from standard queues. See
1203
        https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues-understanding-logic.html.
1204

1205
        When receiving messages from a FIFO queue with multiple message group IDs, SQS first attempts to
1206
        return as many messages with the same message group ID as possible. This allows other consumers to
1207
        process messages with a different message group ID. When you receive a message with a message group
1208
        ID, no more messages for the same message group ID are returned unless you delete the message, or it
1209
        becomes visible.
1210
        """
1211
        result = ReceiveMessageResult()
1✔
1212

1213
        max_receive_count = self.max_receive_count
1✔
1214
        visibility_timeout = (
1✔
1215
            self.visibility_timeout if visibility_timeout is None else visibility_timeout
1216
        )
1217

1218
        block = True if wait_time_seconds else False
1✔
1219
        timeout = wait_time_seconds or 0
1✔
1220
        start = time.time()
1✔
1221

1222
        received_groups: set[MessageGroup] = set()
1✔
1223

1224
        # collect messages over potentially multiple groups
1225
        while True:
1✔
1226
            try:
1✔
1227
                group: MessageGroup = self.message_group_queue.get(block=block, timeout=timeout)
1✔
1228
            except Empty:
1✔
1229
                break
1✔
1230

1231
            if group.empty():
1✔
1232
                # this can be the case if all messages in the group are still invisible or
1233
                # if all messages of a group have been processed.
1234
                # TODO: it should be blocking until at least one message is in the queue, but we don't
1235
                #  want to block the group
1236
                # TODO: check behavior in case it happens if all messages were removed from a group due to message
1237
                #  retention period.
1238
                timeout -= time.time() - start
1✔
1239
                if timeout < 0:
1✔
1240
                    timeout = 0
1✔
1241
                continue
1✔
1242

1243
            self.inflight_groups.add(group)
1✔
1244

1245
            received_groups.add(group)
1✔
1246

1247
            if not poll_empty_queue:
1✔
1248
                block = False
1✔
1249

1250
            # we lock the queue while accessing the groups to not get into races with re-queueing/deleting
1251
            with self.mutex:
1✔
1252
                # collect messages from the group until a continue/break condition is met
1253
                while True:
1✔
1254
                    try:
1✔
1255
                        message = group.pop()
1✔
1256
                    except IndexError:
1✔
1257
                        break
1✔
1258

1259
                    if message.deleted:
1✔
1260
                        # this means the message was deleted with a receipt handle after its visibility
1261
                        # timeout expired and the messages was re-queued in the meantime.
UNCOV
1262
                        continue
×
1263

1264
                    # update message attributes
1265
                    message.receive_count += 1
1✔
1266
                    message.update_visibility_timeout(visibility_timeout)
1✔
1267
                    message.set_last_received(time.time())
1✔
1268
                    if message.first_received is None:
1✔
1269
                        message.first_received = message.last_received
1✔
1270

1271
                    LOG.debug("de-queued message %s from fifo queue %s", message, self.arn)
1✔
1272
                    if max_receive_count and message.receive_count > max_receive_count:
1✔
1273
                        # the message needs to move to the DLQ
1274
                        LOG.debug(
1✔
1275
                            "message %s has been received %d times, marking it for DLQ",
1276
                            message,
1277
                            message.receive_count,
1278
                        )
1279
                        result.dead_letter_messages.append(message)
1✔
1280
                    else:
1281
                        result.successful.append(message)
1✔
1282
                        message.increment_approximate_receive_count()
1✔
1283

1284
                        # now we can break the inner loop
1285
                        if len(result.successful) == num_messages:
1✔
1286
                            break
1✔
1287

1288
                # but we also need to check the condition to return from the outer loop
1289
                if len(result.successful) == num_messages:
1✔
1290
                    break
1✔
1291

1292
        # now process the successful result messages: create receipt handles and manage visibility.
1293
        # we use the mutex again because we are modifying the group
1294
        with self.mutex:
1✔
1295
            for message in result.successful:
1✔
1296
                # manage receipt handle
1297
                receipt_handle = self.create_receipt_handle(message)
1✔
1298
                message.receipt_handles.add(receipt_handle)
1✔
1299
                self.receipts[receipt_handle] = message
1✔
1300
                result.receipt_handles.append(receipt_handle)
1✔
1301

1302
                # manage message visibility
1303
                if message.visibility_timeout == 0:
1✔
1304
                    self._put_message(message)
1✔
1305
                else:
1306
                    self.add_inflight_message(message)
1✔
1307
        return result
1✔
1308

1309
    def _on_remove_message(self, message: SqsMessage):
1✔
1310
        # if a message is deleted from the queue, the message's group can become visible again
1311
        message_group = self.get_message_group(message.message_group_id)
1✔
1312

1313
        with self.mutex:
1✔
1314
            try:
1✔
1315
                del self.inflight[message]
1✔
UNCOV
1316
            except KeyError:
×
1317
                # in FIFO queues, this should not happen, as expired receipt handles cannot be used to
1318
                # delete a message.
1319
                pass
×
1320
            self.update_message_group_visibility(message_group)
1✔
1321

1322
    def update_message_group_visibility(self, message_group: MessageGroup):
1✔
1323
        """
1324
        Check if the passed message group should be made visible again
1325
        """
1326

1327
        with self.mutex:
1✔
1328
            if message_group in self.inflight_groups:
1✔
1329
                # it becomes visible again only if there are no other in flight messages in that group
1330
                for message in self.inflight:
1✔
1331
                    if message.message_group_id == message_group.message_group_id:
1✔
1332
                        return
1✔
1333

1334
                self.inflight_groups.remove(message_group)
1✔
1335
                if not message_group.empty():
1✔
1336
                    self.message_group_queue.put_nowait(message_group)
1✔
1337

1338
    def _assert_queue_name(self, name):
1✔
1339
        if not name.endswith(".fifo"):
1✔
1340
            raise InvalidParameterValueException(
1✔
1341
                "The name of a FIFO queue can only include alphanumeric characters, hyphens, or underscores, "
1342
                "must end with .fifo suffix and be 1 to 80 in length"
1343
            )
1344
        # The .fifo suffix counts towards the 80-character queue name quota.
1345
        queue_name = name[:-5] + "_fifo"
1✔
1346
        super()._assert_queue_name(queue_name)
1✔
1347

1348
    def validate_queue_attributes(self, attributes):
1✔
1349
        valid = [
1✔
1350
            k[1]
1351
            for k in inspect.getmembers(QueueAttributeName)
1352
            if k not in sqs_constants.INTERNAL_QUEUE_ATTRIBUTES
1353
        ]
1354
        for k in attributes.keys():
1✔
1355
            if k not in valid:
1✔
UNCOV
1356
                raise InvalidAttributeName(f"Unknown Attribute {k}.")
×
1357
        # Special Cases
1358
        fifo = attributes.get(QueueAttributeName.FifoQueue)
1✔
1359
        if fifo and fifo.lower() != "true":
1✔
1360
            raise InvalidAttributeValue(
1✔
1361
                "Invalid value for the parameter FifoQueue. Reason: Modifying queue type is not supported."
1362
            )
1363

1364
    def next_sequence_number(self):
1✔
1365
        return next(global_message_sequence())
1✔
1366

1367
    def clear(self):
1✔
1368
        with self.mutex:
1✔
1369
            super().clear()
1✔
1370
            self.message_groups.clear()
1✔
1371
            self.inflight_groups.clear()
1✔
1372
            self.message_group_queue.queue.clear()
1✔
1373
            self.deduplication.clear()
1✔
1374

1375

1376
class SqsStore(BaseStore):
1✔
1377
    queues: dict[str, SqsQueue] = LocalAttribute(default=dict)
1✔
1378

1379
    deleted: dict[str, float] = LocalAttribute(default=dict)
1✔
1380

1381
    move_tasks: dict[str, MessageMoveTask] = LocalAttribute(default=dict)
1✔
1382
    """Maps task IDs to their ``MoveMessageTask`` object. Task IDs can be found by decoding a task handle."""
1✔
1383

1384
    def expire_deleted(self):
1✔
1385
        for k in list(self.deleted.keys()):
1✔
1386
            if self.deleted[k] <= (time.time() - sqs_constants.RECENTLY_DELETED_TIMEOUT):
1✔
1387
                del self.deleted[k]
1✔
1388

1389

1390
sqs_stores = AccountRegionBundle("sqs", SqsStore)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc