• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 22334798432

23 Feb 2026 06:42PM UTC coverage: 86.956% (-0.02%) from 86.973%
22334798432

push

github

web-flow
S3: regenerate test snapshots & parity fixes (#13824)

69831 of 80306 relevant lines covered (86.96%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.99
/localstack-core/localstack/services/sqs/models.py
1
import copy
1✔
2
import hashlib
1✔
3
import heapq
1✔
4
import inspect
1✔
5
import json
1✔
6
import logging
1✔
7
import re
1✔
8
import threading
1✔
9
import time
1✔
10
from datetime import datetime
1✔
11
from queue import Empty
1✔
12

13
from localstack import config
1✔
14
from localstack.aws.api import RequestContext
1✔
15
from localstack.aws.api.sqs import (
1✔
16
    AttributeNameList,
17
    InvalidAttributeName,
18
    Message,
19
    MessageAttributeNameList,
20
    MessageSystemAttributeName,
21
    QueueAttributeMap,
22
    QueueAttributeName,
23
    ReceiptHandleIsInvalid,
24
    TagMap,
25
)
26
from localstack.services.sqs import constants as sqs_constants
1✔
27
from localstack.services.sqs.constants import DYNAMIC_ATTRIBUTES
1✔
28
from localstack.services.sqs.exceptions import (
1✔
29
    InvalidAttributeValue,
30
    InvalidParameterValueException,
31
    MissingRequiredParameterException,
32
)
33
from localstack.services.sqs.queue import InterruptiblePriorityQueue, InterruptibleQueue
1✔
34
from localstack.services.sqs.utils import (
1✔
35
    create_message_attribute_hash,
36
    encode_move_task_handle,
37
    encode_receipt_handle,
38
    extract_receipt_handle_info,
39
    global_message_sequence,
40
    guess_endpoint_strategy_and_host,
41
    is_message_deduplication_id_required,
42
    message_filter_attributes,
43
    message_filter_message_attributes,
44
)
45
from localstack.services.stores import AccountRegionBundle, BaseStore, LocalAttribute
1✔
46
from localstack.utils.aws.arns import get_partition
1✔
47
from localstack.utils.strings import long_uid
1✔
48
from localstack.utils.tagging import Tags
1✔
49
from localstack.utils.time import now
1✔
50
from localstack.utils.urls import localstack_host
1✔
51

52
LOG = logging.getLogger(__name__)
1✔
53

54
ReceiptHandle = str
1✔
55

56

57
class SqsMessage:
1✔
58
    message: Message
1✔
59
    created: float
1✔
60
    visibility_timeout: int | None
1✔
61
    receive_count: int
1✔
62
    delay_seconds: int | None
1✔
63
    receipt_handles: set[str]
1✔
64
    last_received: float | None
1✔
65
    first_received: float | None
1✔
66
    visibility_deadline: float | None
1✔
67
    deleted: bool
1✔
68
    priority: float
1✔
69
    sequence_number: str | None
1✔
70

71
    def __init__(
1✔
72
        self,
73
        priority: float,
74
        message: Message,
75
        message_deduplication_id: str = None,
76
        message_group_id: str = None,
77
        sequence_number: str = None,
78
    ) -> None:
79
        self.created = time.time()
1✔
80
        self.message = message
1✔
81
        self.receive_count = 0
1✔
82
        self.receipt_handles = set()
1✔
83

84
        self.delay_seconds = None
1✔
85
        self.last_received = None
1✔
86
        self.first_received = None
1✔
87
        self.visibility_timeout = None
1✔
88
        self.visibility_deadline = None
1✔
89
        self.deleted = False
1✔
90
        self.priority = priority
1✔
91
        self.sequence_number = sequence_number
1✔
92

93
        attributes = {}
1✔
94
        if message_group_id is not None:
1✔
95
            attributes["MessageGroupId"] = message_group_id
1✔
96
        if message_deduplication_id is not None:
1✔
97
            attributes["MessageDeduplicationId"] = message_deduplication_id
1✔
98
        if sequence_number is not None:
1✔
99
            attributes["SequenceNumber"] = sequence_number
1✔
100

101
        if self.message.get("Attributes"):
1✔
102
            self.message["Attributes"].update(attributes)
1✔
103
        else:
104
            self.message["Attributes"] = attributes
×
105

106
        # set attribute default values if not set
107
        self.message["Attributes"].setdefault(
1✔
108
            MessageSystemAttributeName.ApproximateReceiveCount, "0"
109
        )
110

111
    @property
1✔
112
    def message_group_id(self) -> str | None:
1✔
113
        return self.message["Attributes"].get(MessageSystemAttributeName.MessageGroupId)
1✔
114

115
    @property
1✔
116
    def message_deduplication_id(self) -> str | None:
1✔
117
        return self.message["Attributes"].get(MessageSystemAttributeName.MessageDeduplicationId)
1✔
118

119
    @property
1✔
120
    def dead_letter_queue_source_arn(self) -> str | None:
1✔
121
        return self.message["Attributes"].get(MessageSystemAttributeName.DeadLetterQueueSourceArn)
1✔
122

123
    @property
1✔
124
    def message_id(self):
1✔
125
        return self.message["MessageId"]
1✔
126

127
    def increment_approximate_receive_count(self):
1✔
128
        """
129
        Increment the message system attribute ``ApproximateReceiveCount``.
130
        """
131
        # TODO: need better handling of system attributes
132
        cnt = int(
1✔
133
            self.message["Attributes"].get(MessageSystemAttributeName.ApproximateReceiveCount, "0")
134
        )
135
        cnt += 1
1✔
136
        self.message["Attributes"][MessageSystemAttributeName.ApproximateReceiveCount] = str(cnt)
1✔
137

138
    def set_last_received(self, timestamp: float):
1✔
139
        """
140
        Sets the last received timestamp of the message to the given value, and updates the visibility deadline
141
        accordingly.
142

143
        :param timestamp: the last time the message was received
144
        """
145
        self.last_received = timestamp
1✔
146
        self.visibility_deadline = timestamp + self.visibility_timeout
1✔
147

148
    def update_visibility_timeout(self, timeout: int):
1✔
149
        """
150
        Sets the visibility timeout of the message to the given value, and updates the visibility deadline accordingly.
151

152
        :param timeout: the timeout value in seconds
153
        """
154
        self.visibility_timeout = timeout
1✔
155
        self.visibility_deadline = time.time() + timeout
1✔
156

157
    @property
1✔
158
    def is_visible(self) -> bool:
1✔
159
        """
160
        Returns false if the message has a visibility deadline that is in the future.
161

162
        :return: whether the message is visible or not.
163
        """
164
        if self.visibility_deadline is None:
1✔
165
            return True
1✔
166
        if time.time() >= self.visibility_deadline:
1✔
167
            return True
1✔
168

169
        return False
1✔
170

171
    @property
1✔
172
    def is_delayed(self) -> bool:
1✔
173
        if self.delay_seconds is None:
1✔
174
            return False
×
175
        return time.time() <= self.created + self.delay_seconds
1✔
176

177
    def __gt__(self, other):
1✔
178
        return self.priority > other.priority
×
179

180
    def __ge__(self, other):
1✔
181
        return self.priority >= other.priority
×
182

183
    def __lt__(self, other):
1✔
184
        return self.priority < other.priority
1✔
185

186
    def __le__(self, other):
1✔
187
        return self.priority <= other.priority
×
188

189
    def __eq__(self, other):
1✔
190
        return self.message_id == other.message_id
×
191

192
    def __hash__(self):
1✔
193
        return self.message_id.__hash__()
1✔
194

195
    def __repr__(self):
196
        return f"SqsMessage(id={self.message_id},group={self.message_group_id})"
197

198

199
def to_sqs_api_message(
1✔
200
    standard_message: SqsMessage,
201
    attribute_names: AttributeNameList = None,
202
    message_attribute_names: MessageAttributeNameList = None,
203
) -> Message:
204
    """
205
    Utility function to convert an SQS message from LocalStack's internal representation to the AWS API
206
    concept 'Message', which is the format returned by the ``ReceiveMessage`` operation.
207

208
    :param standard_message: A LocalStack SQS message
209
    :param attribute_names: the attribute name list to filter
210
    :param message_attribute_names: the message attribute names to filter
211
    :return: a copy of the original Message with updated message attributes and MD5 attribute hash sums
212
    """
213
    # prepare message for receiver
214
    message = copy.deepcopy(standard_message.message)
1✔
215

216
    # update system attributes of the message copy
217
    message["Attributes"][MessageSystemAttributeName.ApproximateFirstReceiveTimestamp] = str(
1✔
218
        int((standard_message.first_received or 0) * 1000)
219
    )
220

221
    # filter attributes for receiver
222
    message_filter_attributes(message, attribute_names)
1✔
223
    message_filter_message_attributes(message, message_attribute_names)
1✔
224
    if message.get("MessageAttributes"):
1✔
225
        message["MD5OfMessageAttributes"] = create_message_attribute_hash(
1✔
226
            message["MessageAttributes"]
227
        )
228
    else:
229
        # delete the value that was computed when creating the message
230
        message.pop("MD5OfMessageAttributes", None)
1✔
231
    return message
1✔
232

233

234
class ReceiveMessageResult:
1✔
235
    """
236
    Object to communicate the result of a "receive messages" operation between the SqsProvider and
237
    the underlying datastructure holding the messages.
238
    """
239

240
    successful: list[SqsMessage]
1✔
241
    """The messages that were successfully received from the queue"""
1✔
242

243
    receipt_handles: list[str]
1✔
244
    """The array index position in ``successful`` and ``receipt_handles`` need to be the same (this
1✔
245
    assumption is needed when assembling the result in `SqsProvider.receive_message`)"""
246

247
    dead_letter_messages: list[SqsMessage]
1✔
248
    """All messages that were received more than maxReceiveCount in the redrive policy (if any)"""
1✔
249

250
    def __init__(self):
1✔
251
        self.successful = []
1✔
252
        self.receipt_handles = []
1✔
253
        self.dead_letter_messages = []
1✔
254

255

256
class MessageMoveTaskStatus(str):
1✔
257
    CREATED = "CREATED"  # not validated, for internal use
1✔
258
    RUNNING = "RUNNING"
1✔
259
    COMPLETED = "COMPLETED"
1✔
260
    CANCELLING = "CANCELLING"
1✔
261
    CANCELLED = "CANCELLED"
1✔
262
    FAILED = "FAILED"
1✔
263

264

265
class MessageMoveTask:
1✔
266
    """
267
    A task created by the ``StartMessageMoveTask`` operation.
268
    """
269

270
    # configurable fields
271
    source_arn: str
1✔
272
    """The arn of the DLQ the messages are currently in."""
1✔
273
    destination_arn: str | None
1✔
274
    """If the DestinationArn is not specified, the original source arn will be used as target."""
1✔
275
    max_number_of_messages_per_second: int | None
1✔
276

277
    # dynamic fields
278
    task_id: str
1✔
279
    status: str
1✔
280
    started_timestamp: datetime | None
1✔
281
    approximate_number_of_messages_moved: int | None
1✔
282
    approximate_number_of_messages_to_move: int | None
1✔
283
    failure_reason: str | None
1✔
284

285
    cancel_event: threading.Event
1✔
286

287
    def __init__(
1✔
288
        self,
289
        source_arn: str,
290
        destination_arn: str,
291
        max_number_of_messages_per_second: int | None = None,
292
    ):
293
        self.task_id = long_uid()
1✔
294
        self.source_arn = source_arn
1✔
295
        self.destination_arn = destination_arn
1✔
296
        self.max_number_of_messages_per_second = max_number_of_messages_per_second
1✔
297
        self.cancel_event = threading.Event()
1✔
298
        self.status = MessageMoveTaskStatus.CREATED
1✔
299
        self.started_timestamp = None
1✔
300
        self.approximate_number_of_messages_moved = None
1✔
301
        self.approximate_number_of_messages_to_move = None
1✔
302
        self.failure_reason = None
1✔
303

304
    def mark_started(self):
1✔
305
        self.started_timestamp = datetime.utcnow()
1✔
306
        self.status = MessageMoveTaskStatus.RUNNING
1✔
307
        self.cancel_event.clear()
1✔
308

309
    @property
1✔
310
    def task_handle(self) -> str:
1✔
311
        return encode_move_task_handle(self.task_id, self.source_arn)
1✔
312

313

314
class SqsQueue:
1✔
315
    name: str
1✔
316
    region: str
1✔
317
    account_id: str
1✔
318

319
    attributes: QueueAttributeMap
1✔
320
    tags: TagMap
1✔
321

322
    purge_in_progress: bool
1✔
323
    purge_timestamp: float | None
1✔
324

325
    delayed: set[SqsMessage]
1✔
326
    # Simulating an ordered set in python. Only the keys are used and of interest.
327
    inflight: dict[SqsMessage, None]
1✔
328
    receipts: dict[str, SqsMessage]
1✔
329
    mutex: threading.RLock
1✔
330

331
    def __init__(self, name: str, region: str, account_id: str, attributes=None, tags=None) -> None:
1✔
332
        self.name = name
1✔
333
        self.region = region
1✔
334
        self.account_id = account_id
1✔
335

336
        self._assert_queue_name(name)
1✔
337
        self.tags = tags or {}
1✔
338

339
        self.delayed = set()
1✔
340
        self.inflight = {}
1✔
341
        self.receipts = {}
1✔
342

343
        self.attributes = self.default_attributes()
1✔
344
        if attributes:
1✔
345
            self.validate_queue_attributes(attributes)
1✔
346
            self.attributes.update(attributes)
1✔
347

348
        self.purge_in_progress = False
1✔
349
        self.purge_timestamp = None
1✔
350

351
        self.permissions = set()
1✔
352
        self.mutex = threading.RLock()
1✔
353

354
    def shutdown(self):
1✔
355
        pass
×
356

357
    def default_attributes(self) -> QueueAttributeMap:
1✔
358
        return {
1✔
359
            QueueAttributeName.CreatedTimestamp: str(now()),
360
            QueueAttributeName.DelaySeconds: "0",
361
            QueueAttributeName.LastModifiedTimestamp: str(now()),
362
            QueueAttributeName.MaximumMessageSize: str(sqs_constants.DEFAULT_MAXIMUM_MESSAGE_SIZE),
363
            QueueAttributeName.MessageRetentionPeriod: "345600",
364
            QueueAttributeName.QueueArn: self.arn,
365
            QueueAttributeName.ReceiveMessageWaitTimeSeconds: "0",
366
            QueueAttributeName.VisibilityTimeout: "30",
367
            QueueAttributeName.SqsManagedSseEnabled: "true",
368
        }
369

370
    def update_delay_seconds(self, value: int):
1✔
371
        """
372
        For standard queues, the per-queue delay setting is not retroactive—changing the setting doesn't affect the
373
        delay of messages already in the queue. For FIFO queues, the per-queue delay setting is retroactive—changing
374
        the setting affects the delay of messages already in the queue.
375

376
        https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-delay-queues.html
377

378
        :param value: the number of seconds
379
        """
380
        self.attributes[QueueAttributeName.DelaySeconds] = str(value)
×
381

382
    def update_last_modified(self, timestamp: int = None):
1✔
383
        if timestamp is None:
×
384
            timestamp = now()
×
385

386
        self.attributes[QueueAttributeName.LastModifiedTimestamp] = str(timestamp)
×
387

388
    @property
1✔
389
    def arn(self) -> str:
1✔
390
        return f"arn:{get_partition(self.region)}:sqs:{self.region}:{self.account_id}:{self.name}"
1✔
391

392
    def url(self, context: RequestContext) -> str:
1✔
393
        """Return queue URL which depending on the endpoint strategy returns e.g.:
394
        * (standard) http://sqs.eu-west-1.localhost.localstack.cloud:4566/000000000000/myqueue
395
        * (domain) http://eu-west-1.queue.localhost.localstack.cloud:4566/000000000000/myqueue
396
        * (path) http://localhost.localstack.cloud:4566/queue/eu-central-1/000000000000/myqueue
397
        * otherwise: http://localhost.localstack.cloud:4566/000000000000/myqueue
398
        """
399

400
        scheme = config.get_protocol()  # TODO: should probably change to context.request.scheme
1✔
401
        host_definition = localstack_host()
1✔
402
        host_and_port = host_definition.host_and_port()
1✔
403

404
        endpoint_strategy = config.SQS_ENDPOINT_STRATEGY
1✔
405

406
        if endpoint_strategy == "dynamic":
1✔
407
            scheme = context.request.scheme
×
408
            # determine the endpoint strategy that should be used, and determine the host dynamically
409
            endpoint_strategy, host_and_port = guess_endpoint_strategy_and_host(
×
410
                context.request.host
411
            )
412

413
        if endpoint_strategy == "standard":
1✔
414
            # Region is always part of the queue URL
415
            # sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/my-queue
416
            scheme = context.request.scheme
1✔
417
            host_url = f"{scheme}://sqs.{self.region}.{host_and_port}"
1✔
418
        elif endpoint_strategy == "domain":
1✔
419
            # Legacy style
420
            # queue.localhost.localstack.cloud:4566/000000000000/my-queue (us-east-1)
421
            # or us-east-2.queue.localhost.localstack.cloud:4566/000000000000/my-queue
422
            region = "" if self.region == "us-east-1" else self.region + "."
1✔
423
            host_url = f"{scheme}://{region}queue.{host_and_port}"
1✔
424
        elif endpoint_strategy == "path":
1✔
425
            # https?://localhost:4566/queue/us-east-1/00000000000/my-queue (us-east-1)
426
            host_url = f"{scheme}://{host_and_port}/queue/{self.region}"
1✔
427
        else:
428
            host_url = f"{scheme}://{host_and_port}"
1✔
429

430
        return "{host}/{account_id}/{name}".format(
1✔
431
            host=host_url.rstrip("/"),
432
            account_id=self.account_id,
433
            name=self.name,
434
        )
435

436
    @property
1✔
437
    def redrive_policy(self) -> dict | None:
1✔
438
        if policy_document := self.attributes.get(QueueAttributeName.RedrivePolicy):
1✔
439
            return json.loads(policy_document)
1✔
440
        return None
1✔
441

442
    @property
1✔
443
    def max_receive_count(self) -> int | None:
1✔
444
        """
445
        Returns the maxReceiveCount attribute of the redrive policy. If no redrive policy is set, then it
446
        returns None.
447
        """
448
        if redrive_policy := self.redrive_policy:
1✔
449
            return int(redrive_policy["maxReceiveCount"])
1✔
450
        return None
1✔
451

452
    @property
1✔
453
    def visibility_timeout(self) -> int:
1✔
454
        return int(self.attributes[QueueAttributeName.VisibilityTimeout])
1✔
455

456
    @property
1✔
457
    def delay_seconds(self) -> int:
1✔
458
        return int(self.attributes[QueueAttributeName.DelaySeconds])
1✔
459

460
    @property
1✔
461
    def wait_time_seconds(self) -> int:
1✔
462
        return int(self.attributes[QueueAttributeName.ReceiveMessageWaitTimeSeconds])
1✔
463

464
    @property
1✔
465
    def message_retention_period(self) -> int:
1✔
466
        """
467
        ``MessageRetentionPeriod`` -- the length of time, in seconds, for which Amazon SQS retains a message. Valid
468
        values: An integer representing seconds, from 60 (1 minute) to 1,209,600 (14 days). Default: 345,600 (4 days).
469
        """
470
        return int(self.attributes[QueueAttributeName.MessageRetentionPeriod])
1✔
471

472
    @property
1✔
473
    def maximum_message_size(self) -> int:
1✔
474
        return int(self.attributes[QueueAttributeName.MaximumMessageSize])
1✔
475

476
    @property
1✔
477
    def approximate_number_of_messages(self) -> int:
1✔
478
        raise NotImplementedError
479

480
    @property
1✔
481
    def approximate_number_of_messages_not_visible(self) -> int:
1✔
482
        return len(self.inflight)
1✔
483

484
    @property
1✔
485
    def approximate_number_of_messages_delayed(self) -> int:
1✔
486
        return len(self.delayed)
1✔
487

488
    def validate_receipt_handle(self, receipt_handle: str):
1✔
489
        if self.arn != extract_receipt_handle_info(receipt_handle).queue_arn:
1✔
490
            raise ReceiptHandleIsInvalid(
×
491
                f'The input receipt handle "{receipt_handle}" is not a valid receipt handle.'
492
            )
493

494
    def update_visibility_timeout(self, receipt_handle: str, visibility_timeout: int):
1✔
495
        with self.mutex:
1✔
496
            self.validate_receipt_handle(receipt_handle)
1✔
497

498
            if receipt_handle not in self.receipts:
1✔
499
                raise InvalidParameterValueException(
1✔
500
                    f"Value {receipt_handle} for parameter ReceiptHandle is invalid. Reason: Message does not exist "
501
                    f"or is not available for visibility timeout change."
502
                )
503

504
            standard_message = self.receipts[receipt_handle]
1✔
505

506
            if standard_message not in self.inflight:
1✔
507
                return
1✔
508

509
            standard_message.update_visibility_timeout(visibility_timeout)
1✔
510

511
            if visibility_timeout == 0:
1✔
512
                LOG.info(
1✔
513
                    "terminating the visibility timeout of %s",
514
                    standard_message.message["MessageId"],
515
                )
516
                # Terminating the visibility timeout for a message
517
                # https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html#terminating-message-visibility-timeout
518
                del self.inflight[standard_message]
1✔
519
                self._put_message(standard_message)
1✔
520

521
    def remove(self, receipt_handle: str):
1✔
522
        with self.mutex:
1✔
523
            self.validate_receipt_handle(receipt_handle)
1✔
524

525
            if receipt_handle not in self.receipts:
1✔
526
                LOG.debug(
1✔
527
                    "no in-flight message found for receipt handle %s in queue %s",
528
                    receipt_handle,
529
                    self.arn,
530
                )
531
                return
1✔
532

533
            standard_message = self.receipts[receipt_handle]
1✔
534
            self._pre_delete_checks(standard_message, receipt_handle)
1✔
535
            standard_message.deleted = True
1✔
536
            LOG.debug(
1✔
537
                "deleting message %s from queue %s",
538
                standard_message.message["MessageId"],
539
                self.arn,
540
            )
541

542
            # remove all handles associated with this message
543
            for handle in standard_message.receipt_handles:
1✔
544
                del self.receipts[handle]
1✔
545
            standard_message.receipt_handles.clear()
1✔
546

547
            self._on_remove_message(standard_message)
1✔
548

549
    def _on_remove_message(self, message: SqsMessage):
1✔
550
        """Hook for queue-specific logic executed when a message is removed."""
551
        pass
×
552

553
    def put(
1✔
554
        self,
555
        message: Message,
556
        visibility_timeout: int = None,
557
        message_deduplication_id: str = None,
558
        message_group_id: str = None,
559
        delay_seconds: int = None,
560
    ) -> SqsMessage:
561
        raise NotImplementedError
562

563
    def receive(
1✔
564
        self,
565
        num_messages: int = 1,
566
        wait_time_seconds: int = None,
567
        visibility_timeout: int = None,
568
        *,
569
        poll_empty_queue: bool = False,
570
    ) -> ReceiveMessageResult:
571
        """
572
        Receive ``num_messages`` from the queue, and wait at max ``wait_time_seconds``. If a visibility
573
        timeout is given, also change the visibility timeout of all received messages accordingly.
574

575
        :param num_messages: the number of messages you want to get from the underlying queue
576
        :param wait_time_seconds: the number of seconds you want to wait
577
        :param visibility_timeout: an optional new visibility timeout
578
        :param poll_empty_queue: whether to keep polling an empty queue until the duration ``wait_time_seconds`` has elapsed
579
        :return: a ReceiveMessageResult object that contains the result of the operation
580
        """
581
        raise NotImplementedError
582

583
    def clear(self):
1✔
584
        """
585
        Calls clear on all internal datastructures that hold messages and data related to them.
586
        """
587
        with self.mutex:
1✔
588
            self.inflight.clear()
1✔
589
            self.delayed.clear()
1✔
590
            self.receipts.clear()
1✔
591

592
    def _put_message(self, message: SqsMessage):
1✔
593
        """Low-level put operation to put messages into a queue and modify visibilities accordingly."""
594
        raise NotImplementedError
595

596
    def create_receipt_handle(self, message: SqsMessage) -> str:
1✔
597
        return encode_receipt_handle(self.arn, message)
1✔
598

599
    def requeue_inflight_messages(self):
1✔
600
        if not self.inflight:
1✔
601
            return
1✔
602

603
        with self.mutex:
1✔
604
            messages = [message for message in self.inflight if message.is_visible]
1✔
605
            for standard_message in messages:
1✔
606
                LOG.debug(
1✔
607
                    "re-queueing inflight messages %s into queue %s",
608
                    standard_message,
609
                    self.arn,
610
                )
611
                del self.inflight[standard_message]
1✔
612
                self._put_message(standard_message)
1✔
613

614
    def add_inflight_message(self, message: SqsMessage):
1✔
615
        """
616
        We are simulating an ordered set with a dict. When a value is added, it is added as key to the dict, which
617
        is all we need. Hence all "values" in this ordered set are None
618
        :param message: The message to put in flight
619
        """
620
        self.inflight[message] = None
1✔
621

622
    def enqueue_delayed_messages(self):
1✔
623
        if not self.delayed:
1✔
624
            return
1✔
625

626
        with self.mutex:
1✔
627
            messages = [message for message in self.delayed if not message.is_delayed]
1✔
628
            for standard_message in messages:
1✔
629
                LOG.debug(
1✔
630
                    "enqueueing delayed messages %s into queue %s",
631
                    standard_message.message["MessageId"],
632
                    self.arn,
633
                )
634
                self.delayed.remove(standard_message)
1✔
635
                self._put_message(standard_message)
1✔
636

637
    def remove_expired_messages(self):
1✔
638
        """
639
        Removes messages from the queue whose retention period has expired.
640
        """
641
        raise NotImplementedError
642

643
    def _assert_queue_name(self, name):
1✔
644
        if not re.match(r"^[a-zA-Z0-9_-]{1,80}$", name):
1✔
645
            raise InvalidParameterValueException(
1✔
646
                "Can only include alphanumeric characters, hyphens, or underscores. 1 to 80 in length"
647
            )
648

649
    def validate_queue_attributes(self, attributes):
1✔
650
        pass
1✔
651

652
    def add_permission(self, label: str, actions: list[str], account_ids: list[str]) -> None:
1✔
653
        """
654
        Create / append to a policy for usage with the add_permission api call
655

656
        :param actions: List of actions to be included in the policy, without the SQS: prefix
657
        :param account_ids: List of account ids to be included in the policy
658
        :param label: Permission label
659
        """
660
        statement = {
1✔
661
            "Sid": label,
662
            "Effect": "Allow",
663
            "Principal": {
664
                "AWS": [
665
                    f"arn:{get_partition(self.region)}:iam::{account_id}:root"
666
                    for account_id in account_ids
667
                ]
668
                if len(account_ids) > 1
669
                else f"arn:{get_partition(self.region)}:iam::{account_ids[0]}:root"
670
            },
671
            "Action": [f"SQS:{action}" for action in actions]
672
            if len(actions) > 1
673
            else f"SQS:{actions[0]}",
674
            "Resource": self.arn,
675
        }
676
        if policy := self.attributes.get(QueueAttributeName.Policy):
1✔
677
            policy = json.loads(policy)
1✔
678
            policy.setdefault("Statement", [])
1✔
679
        else:
680
            policy = {
1✔
681
                "Version": "2008-10-17",
682
                "Id": f"{self.arn}/SQSDefaultPolicy",
683
                "Statement": [],
684
            }
685
        policy.setdefault("Statement", [])
1✔
686
        existing_statement_ids = [statement.get("Sid") for statement in policy["Statement"]]
1✔
687
        if label in existing_statement_ids:
1✔
688
            raise InvalidParameterValueException(
1✔
689
                f"Value {label} for parameter Label is invalid. Reason: Already exists."
690
            )
691
        policy["Statement"].append(statement)
1✔
692
        self.attributes[QueueAttributeName.Policy] = json.dumps(policy)
1✔
693

694
    def remove_permission(self, label: str) -> None:
1✔
695
        """
696
        Delete a policy statement for usage of the remove_permission call
697

698
        :param label: Permission label
699
        """
700
        if policy := self.attributes.get(QueueAttributeName.Policy):
1✔
701
            policy = json.loads(policy)
1✔
702
            # this should not be necessary, but we can upload custom policies, so it's better to be safe
703
            policy.setdefault("Statement", [])
1✔
704
        else:
705
            policy = {
1✔
706
                "Version": "2008-10-17",
707
                "Id": f"{self.arn}/SQSDefaultPolicy",
708
                "Statement": [],
709
            }
710
        existing_statement_ids = [statement.get("Sid") for statement in policy["Statement"]]
1✔
711
        if label not in existing_statement_ids:
1✔
712
            raise InvalidParameterValueException(
1✔
713
                f"Value {label} for parameter Label is invalid. Reason: can't find label."
714
            )
715
        policy["Statement"] = [
1✔
716
            statement for statement in policy["Statement"] if statement.get("Sid") != label
717
        ]
718
        if policy["Statement"]:
1✔
719
            self.attributes[QueueAttributeName.Policy] = json.dumps(policy)
1✔
720
        else:
721
            del self.attributes[QueueAttributeName.Policy]
1✔
722

723
    def get_queue_attributes(self, attribute_names: AttributeNameList = None) -> dict[str, str]:
1✔
724
        if not attribute_names:
1✔
725
            return {}
1✔
726

727
        if QueueAttributeName.All in attribute_names:
1✔
728
            attribute_names = list(self.attributes.keys()) + DYNAMIC_ATTRIBUTES
1✔
729

730
        result: dict[QueueAttributeName, str] = {}
1✔
731

732
        for attr in attribute_names:
1✔
733
            try:
1✔
734
                getattr(QueueAttributeName, attr)
1✔
735
            except AttributeError:
1✔
736
                raise InvalidAttributeName(f"Unknown Attribute {attr}.")
1✔
737

738
            # The approximate_* attributes are calculated on the spot when accessed.
739
            # We have a @property for each of those which calculates the value.
740
            match attr:
1✔
741
                case QueueAttributeName.ApproximateNumberOfMessages:
1✔
742
                    value = str(self.approximate_number_of_messages)
1✔
743
                case QueueAttributeName.ApproximateNumberOfMessagesDelayed:
1✔
744
                    value = str(self.approximate_number_of_messages_delayed)
1✔
745
                case QueueAttributeName.ApproximateNumberOfMessagesNotVisible:
1✔
746
                    value = str(self.approximate_number_of_messages_not_visible)
1✔
747
                case _:
1✔
748
                    value = self.attributes.get(attr)
1✔
749
            if value == "False" or value == "True":
1✔
750
                result[attr] = value.lower()
1✔
751
            elif value is not None:
1✔
752
                result[attr] = value
1✔
753
        return result
1✔
754

755
    @staticmethod
1✔
756
    def remove_expired_messages_from_heap(
1✔
757
        heap: list[SqsMessage], message_retention_period: int
758
    ) -> list[SqsMessage]:
759
        """
760
        Removes from the given heap of SqsMessages all messages that have expired in the context of the current time
761
        and the given message retention period. The method manipulates the heap but retains the heap property.
762

763
        :param heap: an array satisfying the heap property
764
        :param message_retention_period: the message retention period to use in relation to the current time
765
        :return: a list of expired messages that have been removed from the heap
766
        """
767
        th = time.time() - message_retention_period
1✔
768

769
        expired = []
1✔
770
        while heap:
1✔
771
            # here we're leveraging the heap property "that a[0] is always its smallest element"
772
            # and the assumption that message.created == message.priority
773
            message = heap[0]
1✔
774
            if th < message.created:
1✔
775
                break
1✔
776
            # remove the expired element
777
            expired.append(message)
1✔
778
            heapq.heappop(heap)
1✔
779

780
        return expired
1✔
781

782
    def _pre_delete_checks(self, standard_message: SqsMessage, receipt_handle: str) -> None:
1✔
783
        """
784
        Runs any potential checks if a message that has been successfully identified via a receipt handle
785
        is indeed supposed to be deleted.
786
        For example, a receipt handle that has expired might not lead to deletion.
787

788
        :param standard_message: The message to be deleted
789
        :param receipt_handle: The handle associated with the message
790
        :return: None. Potential violations raise errors.
791
        """
792
        pass
1✔
793

794

795
class StandardQueue(SqsQueue):
1✔
796
    visible: InterruptiblePriorityQueue[SqsMessage]
1✔
797

798
    def __init__(self, name: str, region: str, account_id: str, attributes=None, tags=None) -> None:
1✔
799
        super().__init__(name, region, account_id, attributes, tags)
1✔
800
        self.visible = InterruptiblePriorityQueue()
1✔
801

802
    def clear(self):
1✔
803
        with self.mutex:
1✔
804
            super().clear()
1✔
805
            self.visible.queue.clear()
1✔
806

807
    @property
1✔
808
    def approximate_number_of_messages(self):
1✔
809
        return self.visible.qsize()
1✔
810

811
    def shutdown(self):
1✔
812
        self.visible.shutdown()
1✔
813

814
    def put(
1✔
815
        self,
816
        message: Message,
817
        visibility_timeout: int = None,
818
        message_deduplication_id: str = None,
819
        message_group_id: str = None,
820
        delay_seconds: int = None,
821
    ):
822
        if message_deduplication_id:
1✔
823
            raise InvalidParameterValueException(
×
824
                f"Value {message_deduplication_id} for parameter MessageDeduplicationId is invalid. Reason: The "
825
                f"request includes a parameter that is not valid for this queue type."
826
            )
827

828
        standard_message = SqsMessage(
1✔
829
            time.time(),
830
            message,
831
            message_group_id=message_group_id,
832
        )
833

834
        if visibility_timeout is not None:
1✔
835
            standard_message.visibility_timeout = visibility_timeout
×
836
        else:
837
            # use the attribute from the queue
838
            standard_message.visibility_timeout = self.visibility_timeout
1✔
839

840
        if delay_seconds is not None:
1✔
841
            standard_message.delay_seconds = delay_seconds
1✔
842
        else:
843
            standard_message.delay_seconds = self.delay_seconds
1✔
844

845
        if standard_message.is_delayed:
1✔
846
            self.delayed.add(standard_message)
1✔
847
        else:
848
            self._put_message(standard_message)
1✔
849

850
        return standard_message
1✔
851

852
    def _put_message(self, message: SqsMessage):
1✔
853
        self.visible.put_nowait(message)
1✔
854

855
    def remove_expired_messages(self):
1✔
856
        with self.mutex:
1✔
857
            messages = self.remove_expired_messages_from_heap(
1✔
858
                self.visible.queue, self.message_retention_period
859
            )
860

861
        for message in messages:
1✔
862
            LOG.debug("Removed expired message %s from queue %s", message.message_id, self.arn)
1✔
863

864
    def receive(
1✔
865
        self,
866
        num_messages: int = 1,
867
        wait_time_seconds: int = None,
868
        visibility_timeout: int = None,
869
        *,
870
        poll_empty_queue: bool = False,
871
    ) -> ReceiveMessageResult:
872
        result = ReceiveMessageResult()
1✔
873

874
        max_receive_count = self.max_receive_count
1✔
875
        visibility_timeout = (
1✔
876
            self.visibility_timeout if visibility_timeout is None else visibility_timeout
877
        )
878

879
        block = True if wait_time_seconds else False
1✔
880
        timeout = wait_time_seconds or 0
1✔
881
        start = time.time()
1✔
882

883
        # collect messages
884
        while True:
1✔
885
            try:
1✔
886
                message = self.visible.get(block=block, timeout=timeout)
1✔
887
            except Empty:
1✔
888
                break
1✔
889
            # setting block to false guarantees that, if we've already waited before, we don't wait the
890
            # full time again in the next iteration if max_number_of_messages is set but there are no more
891
            # messages in the queue. see https://github.com/localstack/localstack/issues/5824
892
            if not poll_empty_queue:
1✔
893
                block = False
1✔
894

895
            timeout -= time.time() - start
1✔
896
            if timeout < 0:
1✔
897
                timeout = 0
1✔
898

899
            if message.deleted:
1✔
900
                # filter messages that were deleted with an expired receipt handle after they have been
901
                # re-queued. this can only happen due to a race with `remove`.
902
                continue
×
903

904
            # update message attributes
905
            message.receive_count += 1
1✔
906
            message.update_visibility_timeout(visibility_timeout)
1✔
907
            message.set_last_received(time.time())
1✔
908
            if message.first_received is None:
1✔
909
                message.first_received = message.last_received
1✔
910

911
            LOG.debug("de-queued message %s from %s", message, self.arn)
1✔
912
            if max_receive_count and message.receive_count > max_receive_count:
1✔
913
                # the message needs to move to the DLQ
914
                LOG.debug(
1✔
915
                    "message %s has been received %d times, marking it for DLQ",
916
                    message,
917
                    message.receive_count,
918
                )
919
                result.dead_letter_messages.append(message)
1✔
920
            else:
921
                result.successful.append(message)
1✔
922
                message.increment_approximate_receive_count()
1✔
923

924
                # now we can return
925
                if len(result.successful) == num_messages:
1✔
926
                    break
1✔
927

928
        # now process the successful result messages: create receipt handles and manage visibility.
929
        for message in result.successful:
1✔
930
            # manage receipt handle
931
            receipt_handle = self.create_receipt_handle(message)
1✔
932
            message.receipt_handles.add(receipt_handle)
1✔
933
            self.receipts[receipt_handle] = message
1✔
934
            result.receipt_handles.append(receipt_handle)
1✔
935

936
            # manage message visibility
937
            if message.visibility_timeout == 0:
1✔
938
                self.visible.put_nowait(message)
1✔
939
            else:
940
                self.add_inflight_message(message)
1✔
941

942
        return result
1✔
943

944
    def _on_remove_message(self, message: SqsMessage):
1✔
945
        try:
1✔
946
            del self.inflight[message]
1✔
947
        except KeyError:
1✔
948
            # this likely means the message was removed with an expired receipt handle unfortunately this
949
            # means we need to scan the queue for the element and remove it from there, and then re-heapify
950
            # the queue
951
            try:
1✔
952
                self.visible.queue.remove(message)
1✔
953
                heapq.heapify(self.visible.queue)
1✔
954
            except ValueError:
1✔
955
                # this may happen if the message no longer exists because it was removed earlier
956
                pass
1✔
957

958
    def validate_queue_attributes(self, attributes):
1✔
959
        valid = [
1✔
960
            k[1]
961
            for k in inspect.getmembers(
962
                QueueAttributeName, lambda x: isinstance(x, str) and not x.startswith("__")
963
            )
964
            if k[1] not in sqs_constants.INVALID_STANDARD_QUEUE_ATTRIBUTES
965
        ]
966

967
        for k in attributes.keys():
1✔
968
            if k in [QueueAttributeName.FifoThroughputLimit, QueueAttributeName.DeduplicationScope]:
1✔
969
                raise InvalidAttributeName(
1✔
970
                    f"You can specify the {k} only when FifoQueue is set to true."
971
                )
972
            if k not in valid:
1✔
973
                raise InvalidAttributeName(f"Unknown Attribute {k}.")
1✔
974

975

976
class MessageGroup:
1✔
977
    message_group_id: str
1✔
978
    messages: list[SqsMessage]
1✔
979

980
    def __init__(self, message_group_id: str):
1✔
981
        self.message_group_id = message_group_id
1✔
982
        self.messages = []
1✔
983

984
    def empty(self) -> bool:
1✔
985
        return not self.messages
1✔
986

987
    def size(self) -> int:
1✔
988
        return len(self.messages)
×
989

990
    def pop(self) -> SqsMessage:
1✔
991
        return heapq.heappop(self.messages)
1✔
992

993
    def push(self, message: SqsMessage):
1✔
994
        heapq.heappush(self.messages, message)
1✔
995

996
    def __eq__(self, other):
1✔
997
        return self.message_group_id == other.message_group_id
×
998

999
    def __hash__(self):
1✔
1000
        return self.message_group_id.__hash__()
1✔
1001

1002
    def __repr__(self):
1003
        return f"MessageGroup(id={self.message_group_id}, size={len(self.messages)})"
1004

1005

1006
class FifoQueue(SqsQueue):
1✔
1007
    """
1008
    A FIFO queue behaves differently than a default queue. Most behavior has to be implemented separately.
1009

1010
    See https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html
1011

1012
    TODO: raise exceptions when trying to remove a message with an expired receipt handle
1013
    """
1014

1015
    deduplication: dict[str, SqsMessage]
1✔
1016
    message_groups: dict[str, MessageGroup]
1✔
1017
    inflight_groups: set[MessageGroup]
1✔
1018
    message_group_queue: InterruptibleQueue
1✔
1019
    deduplication_scope: str
1✔
1020

1021
    def __init__(self, name: str, region: str, account_id: str, attributes=None, tags=None) -> None:
1✔
1022
        super().__init__(name, region, account_id, attributes, tags)
1✔
1023
        self.deduplication = {}
1✔
1024

1025
        self.message_groups = {}
1✔
1026
        self.inflight_groups = set()
1✔
1027
        self.message_group_queue = InterruptibleQueue()
1✔
1028

1029
        # SQS does not seem to change the deduplication behaviour of fifo queues if you
1030
        # change to/from 'queue'/'messageGroup' scope after creation -> we need to set this on creation
1031
        self.deduplication_scope = self.attributes[QueueAttributeName.DeduplicationScope]
1✔
1032

1033
    @property
1✔
1034
    def approximate_number_of_messages(self):
1✔
1035
        n = 0
1✔
1036
        for message_group in self.message_groups.values():
1✔
1037
            n += len(message_group.messages)
1✔
1038
        return n
1✔
1039

1040
    def shutdown(self):
1✔
1041
        self.message_group_queue.shutdown()
1✔
1042

1043
    def get_message_group(self, message_group_id: str) -> MessageGroup:
1✔
1044
        """
1045
        Thread safe lazy factory for MessageGroup objects.
1046

1047
        :param message_group_id: the message group ID
1048
        :return: a new or existing MessageGroup object
1049
        """
1050
        with self.mutex:
1✔
1051
            if message_group_id not in self.message_groups:
1✔
1052
                self.message_groups[message_group_id] = MessageGroup(message_group_id)
1✔
1053

1054
            return self.message_groups.get(message_group_id)
1✔
1055

1056
    def default_attributes(self) -> QueueAttributeMap:
1✔
1057
        return {
1✔
1058
            **super().default_attributes(),
1059
            QueueAttributeName.ContentBasedDeduplication: "false",
1060
            QueueAttributeName.DeduplicationScope: "queue",
1061
            QueueAttributeName.FifoThroughputLimit: "perQueue",
1062
        }
1063

1064
    def update_delay_seconds(self, value: int):
1✔
1065
        super().update_delay_seconds(value)
×
1066
        for message in self.delayed:
×
1067
            message.delay_seconds = value
×
1068

1069
    def _pre_delete_checks(self, message: SqsMessage, receipt_handle: str) -> None:
1✔
1070
        if message.is_visible:
1✔
1071
            raise InvalidParameterValueException(
1✔
1072
                f"Value {receipt_handle} for parameter ReceiptHandle is invalid. Reason: The receipt handle has expired."
1073
            )
1074

1075
    def remove(self, receipt_handle: str):
1✔
1076
        self.validate_receipt_handle(receipt_handle)
1✔
1077

1078
        super().remove(receipt_handle)
1✔
1079

1080
    def put(
1✔
1081
        self,
1082
        message: Message,
1083
        visibility_timeout: int = None,
1084
        message_deduplication_id: str = None,
1085
        message_group_id: str = None,
1086
        delay_seconds: int = None,
1087
    ):
1088
        if delay_seconds:
1✔
1089
            # in fifo queues, delay is only applied on queue level. However, explicitly setting delay_seconds=0 is valid
1090
            raise InvalidParameterValueException(
1✔
1091
                f"Value {delay_seconds} for parameter DelaySeconds is invalid. Reason: The request include parameter "
1092
                f"that is not valid for this queue type."
1093
            )
1094

1095
        if not message_group_id:
1✔
1096
            raise MissingRequiredParameterException(
1✔
1097
                "The request must contain the parameter MessageGroupId."
1098
            )
1099
        dedup_id = message_deduplication_id
1✔
1100
        content_based_deduplication = not is_message_deduplication_id_required(self)
1✔
1101
        if not dedup_id and content_based_deduplication:
1✔
1102
            dedup_id = hashlib.sha256(message.get("Body").encode("utf-8")).hexdigest()
1✔
1103
        if not dedup_id:
1✔
1104
            raise InvalidParameterValueException(
1✔
1105
                "The queue should either have ContentBasedDeduplication enabled or MessageDeduplicationId provided explicitly"
1106
            )
1107

1108
        fifo_message = SqsMessage(
1✔
1109
            time.time(),
1110
            message,
1111
            message_deduplication_id=dedup_id,
1112
            message_group_id=message_group_id,
1113
            sequence_number=str(self.next_sequence_number()),
1114
        )
1115
        if visibility_timeout is not None:
1✔
1116
            fifo_message.visibility_timeout = visibility_timeout
×
1117
        else:
1118
            # use the attribute from the queue
1119
            fifo_message.visibility_timeout = self.visibility_timeout
1✔
1120

1121
        # FIFO queues always use the queue level setting for 'DelaySeconds'
1122
        fifo_message.delay_seconds = self.delay_seconds
1✔
1123

1124
        original_message = self.deduplication.get(dedup_id)
1✔
1125
        if (
1✔
1126
            original_message
1127
            and original_message.priority + sqs_constants.DEDUPLICATION_INTERVAL_IN_SEC
1128
            > fifo_message.priority
1129
            # account for deduplication scope required for (but not restricted to) high-throughput-mode
1130
            and (
1131
                not self.deduplication_scope == "messageGroup"
1132
                or fifo_message.message_group_id == original_message.message_group_id
1133
            )
1134
        ):
1135
            message["MessageId"] = original_message.message["MessageId"]
1✔
1136
        else:
1137
            if fifo_message.is_delayed:
1✔
1138
                self.delayed.add(fifo_message)
1✔
1139
            else:
1140
                self._put_message(fifo_message)
1✔
1141

1142
            self.deduplication[dedup_id] = fifo_message
1✔
1143

1144
        return fifo_message
1✔
1145

1146
    def _put_message(self, message: SqsMessage):
1✔
1147
        """Once a message becomes visible in a FIFO queue, its message group also becomes visible."""
1148
        message_group = self.get_message_group(message.message_group_id)
1✔
1149

1150
        with self.mutex:
1✔
1151
            previously_empty = message_group.empty()
1✔
1152
            # put the message into the group
1153
            message_group.push(message)
1✔
1154

1155
            # new messages should not make groups visible that are currently inflight
1156
            if message.receive_count < 1 and message_group in self.inflight_groups:
1✔
1157
                return
1✔
1158
            # if an older message becomes visible again in the queue, that message's group becomes visible also.
1159
            if message_group in self.inflight_groups:
1✔
1160
                self.inflight_groups.remove(message_group)
1✔
1161
                self.message_group_queue.put_nowait(message_group)
1✔
1162
            # if the group was previously empty, it was not yet added back to the queue
1163
            elif previously_empty:
1✔
1164
                self.message_group_queue.put_nowait(message_group)
1✔
1165

1166
    def requeue_inflight_messages(self):
1✔
1167
        if not self.inflight:
1✔
1168
            return
1✔
1169

1170
        with self.mutex:
1✔
1171
            messages = list(self.inflight)
1✔
1172
            for standard_message in messages:
1✔
1173
                # in fifo, an invisible message blocks potentially visible messages afterwards
1174
                # this can happen for example if multiple message of the same group are received at once, then one
1175
                # message of this batch has its visibility timeout extended
1176
                if not standard_message.is_visible:
1✔
1177
                    return
1✔
1178
                LOG.debug(
1✔
1179
                    "re-queueing inflight messages %s into queue %s",
1180
                    standard_message,
1181
                    self.arn,
1182
                )
1183
                del self.inflight[standard_message]
1✔
1184
                self._put_message(standard_message)
1✔
1185

1186
    def remove_expired_messages(self):
1✔
1187
        with self.mutex:
1✔
1188
            retention_period = self.message_retention_period
1✔
1189
            for message_group in self.message_groups.values():
1✔
1190
                messages = self.remove_expired_messages_from_heap(
1✔
1191
                    message_group.messages, retention_period
1192
                )
1193

1194
                for message in messages:
1✔
1195
                    LOG.debug(
1✔
1196
                        "Removed expired message %s from message group %s in queue %s",
1197
                        message.message_id,
1198
                        message.message_group_id,
1199
                        self.arn,
1200
                    )
1201

1202
    def receive(
1✔
1203
        self,
1204
        num_messages: int = 1,
1205
        wait_time_seconds: int = None,
1206
        visibility_timeout: int = None,
1207
        *,
1208
        poll_empty_queue: bool = False,
1209
    ) -> ReceiveMessageResult:
1210
        """
1211
        Receive logic for FIFO queues is different from standard queues. See
1212
        https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues-understanding-logic.html.
1213

1214
        When receiving messages from a FIFO queue with multiple message group IDs, SQS first attempts to
1215
        return as many messages with the same message group ID as possible. This allows other consumers to
1216
        process messages with a different message group ID. When you receive a message with a message group
1217
        ID, no more messages for the same message group ID are returned unless you delete the message, or it
1218
        becomes visible.
1219
        """
1220
        result = ReceiveMessageResult()
1✔
1221

1222
        max_receive_count = self.max_receive_count
1✔
1223
        visibility_timeout = (
1✔
1224
            self.visibility_timeout if visibility_timeout is None else visibility_timeout
1225
        )
1226

1227
        block = True if wait_time_seconds else False
1✔
1228
        timeout = wait_time_seconds or 0
1✔
1229
        start = time.time()
1✔
1230

1231
        received_groups: set[MessageGroup] = set()
1✔
1232

1233
        # collect messages over potentially multiple groups
1234
        while True:
1✔
1235
            try:
1✔
1236
                group: MessageGroup = self.message_group_queue.get(block=block, timeout=timeout)
1✔
1237
            except Empty:
1✔
1238
                break
1✔
1239

1240
            if group.empty():
1✔
1241
                # this can be the case if all messages in the group are still invisible or
1242
                # if all messages of a group have been processed.
1243
                # TODO: it should be blocking until at least one message is in the queue, but we don't
1244
                #  want to block the group
1245
                # TODO: check behavior in case it happens if all messages were removed from a group due to message
1246
                #  retention period.
1247
                timeout -= time.time() - start
1✔
1248
                if timeout < 0:
1✔
1249
                    timeout = 0
1✔
1250
                continue
1✔
1251

1252
            self.inflight_groups.add(group)
1✔
1253

1254
            received_groups.add(group)
1✔
1255

1256
            if not poll_empty_queue:
1✔
1257
                block = False
1✔
1258

1259
            # we lock the queue while accessing the groups to not get into races with re-queueing/deleting
1260
            with self.mutex:
1✔
1261
                # collect messages from the group until a continue/break condition is met
1262
                while True:
1✔
1263
                    try:
1✔
1264
                        message = group.pop()
1✔
1265
                    except IndexError:
1✔
1266
                        break
1✔
1267

1268
                    if message.deleted:
1✔
1269
                        # this means the message was deleted with a receipt handle after its visibility
1270
                        # timeout expired and the messages was re-queued in the meantime.
1271
                        continue
×
1272

1273
                    # update message attributes
1274
                    message.receive_count += 1
1✔
1275
                    message.update_visibility_timeout(visibility_timeout)
1✔
1276
                    message.set_last_received(time.time())
1✔
1277
                    if message.first_received is None:
1✔
1278
                        message.first_received = message.last_received
1✔
1279

1280
                    LOG.debug("de-queued message %s from fifo queue %s", message, self.arn)
1✔
1281
                    if max_receive_count and message.receive_count > max_receive_count:
1✔
1282
                        # the message needs to move to the DLQ
1283
                        LOG.debug(
1✔
1284
                            "message %s has been received %d times, marking it for DLQ",
1285
                            message,
1286
                            message.receive_count,
1287
                        )
1288
                        result.dead_letter_messages.append(message)
1✔
1289
                    else:
1290
                        result.successful.append(message)
1✔
1291
                        message.increment_approximate_receive_count()
1✔
1292

1293
                        # now we can break the inner loop
1294
                        if len(result.successful) == num_messages:
1✔
1295
                            break
1✔
1296

1297
                # but we also need to check the condition to return from the outer loop
1298
                if len(result.successful) == num_messages:
1✔
1299
                    break
1✔
1300

1301
        # now process the successful result messages: create receipt handles and manage visibility.
1302
        # we use the mutex again because we are modifying the group
1303
        with self.mutex:
1✔
1304
            for message in result.successful:
1✔
1305
                # manage receipt handle
1306
                receipt_handle = self.create_receipt_handle(message)
1✔
1307
                message.receipt_handles.add(receipt_handle)
1✔
1308
                self.receipts[receipt_handle] = message
1✔
1309
                result.receipt_handles.append(receipt_handle)
1✔
1310

1311
                # manage message visibility
1312
                if message.visibility_timeout == 0:
1✔
1313
                    self._put_message(message)
1✔
1314
                else:
1315
                    self.add_inflight_message(message)
1✔
1316
        return result
1✔
1317

1318
    def _on_remove_message(self, message: SqsMessage):
1✔
1319
        # if a message is deleted from the queue, the message's group can become visible again
1320
        message_group = self.get_message_group(message.message_group_id)
1✔
1321

1322
        with self.mutex:
1✔
1323
            try:
1✔
1324
                del self.inflight[message]
1✔
1325
            except KeyError:
×
1326
                # in FIFO queues, this should not happen, as expired receipt handles cannot be used to
1327
                # delete a message.
1328
                pass
×
1329
            self.update_message_group_visibility(message_group)
1✔
1330

1331
    def update_message_group_visibility(self, message_group: MessageGroup):
1✔
1332
        """
1333
        Check if the passed message group should be made visible again
1334
        """
1335

1336
        with self.mutex:
1✔
1337
            if message_group in self.inflight_groups:
1✔
1338
                # it becomes visible again only if there are no other in flight messages in that group
1339
                for message in self.inflight:
1✔
1340
                    if message.message_group_id == message_group.message_group_id:
1✔
1341
                        return
1✔
1342

1343
                self.inflight_groups.remove(message_group)
1✔
1344
                if not message_group.empty():
1✔
1345
                    self.message_group_queue.put_nowait(message_group)
1✔
1346

1347
    def _assert_queue_name(self, name):
1✔
1348
        if not name.endswith(".fifo"):
1✔
1349
            raise InvalidParameterValueException(
1✔
1350
                "The name of a FIFO queue can only include alphanumeric characters, hyphens, or underscores, "
1351
                "must end with .fifo suffix and be 1 to 80 in length"
1352
            )
1353
        # The .fifo suffix counts towards the 80-character queue name quota.
1354
        queue_name = name[:-5] + "_fifo"
1✔
1355
        super()._assert_queue_name(queue_name)
1✔
1356

1357
    def validate_queue_attributes(self, attributes):
1✔
1358
        valid = [
1✔
1359
            k[1]
1360
            for k in inspect.getmembers(QueueAttributeName)
1361
            if k not in sqs_constants.INTERNAL_QUEUE_ATTRIBUTES
1362
        ]
1363
        for k in attributes.keys():
1✔
1364
            if k not in valid:
1✔
1365
                raise InvalidAttributeName(f"Unknown Attribute {k}.")
×
1366
        # Special Cases
1367
        fifo = attributes.get(QueueAttributeName.FifoQueue)
1✔
1368
        if fifo and fifo.lower() != "true":
1✔
1369
            raise InvalidAttributeValue(
1✔
1370
                "Invalid value for the parameter FifoQueue. Reason: Modifying queue type is not supported."
1371
            )
1372

1373
    def next_sequence_number(self):
1✔
1374
        return next(global_message_sequence())
1✔
1375

1376
    def clear(self):
1✔
1377
        with self.mutex:
1✔
1378
            super().clear()
1✔
1379
            self.message_groups.clear()
1✔
1380
            self.inflight_groups.clear()
1✔
1381
            self.message_group_queue.queue.clear()
1✔
1382
            self.deduplication.clear()
1✔
1383

1384

1385
class SqsStore(BaseStore):
1✔
1386
    queues: dict[str, FifoQueue | StandardQueue] = LocalAttribute(default=dict)
1✔
1387

1388
    deleted: dict[str, float] = LocalAttribute(default=dict)
1✔
1389

1390
    move_tasks: dict[str, MessageMoveTask] = LocalAttribute(default=dict)
1✔
1391
    """Maps task IDs to their ``MoveMessageTask`` object. Task IDs can be found by decoding a task handle."""
1✔
1392

1393
    tags: Tags = LocalAttribute(default=Tags)
1✔
1394

1395
    def expire_deleted(self):
1✔
1396
        for k in list(self.deleted.keys()):
1✔
1397
            if self.deleted[k] <= (time.time() - sqs_constants.RECENTLY_DELETED_TIMEOUT):
1✔
1398
                del self.deleted[k]
1✔
1399

1400

1401
sqs_stores = AccountRegionBundle("sqs", SqsStore)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc