• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 19558051963

20 Nov 2025 05:48PM UTC coverage: 86.859% (-0.05%) from 86.907%
19558051963

push

github

web-flow
Sns:v2 publish (#13399)

199 of 279 new or added lines in 5 files covered. (71.33%)

168 existing lines in 9 files now uncovered.

68851 of 79268 relevant lines covered (86.86%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.92
/localstack-core/localstack/services/sqs/models.py
1
import copy
1✔
2
import hashlib
1✔
3
import heapq
1✔
4
import inspect
1✔
5
import json
1✔
6
import logging
1✔
7
import re
1✔
8
import threading
1✔
9
import time
1✔
10
from datetime import datetime
1✔
11
from queue import Empty
1✔
12

13
from localstack import config
1✔
14
from localstack.aws.api import RequestContext
1✔
15
from localstack.aws.api.sqs import (
1✔
16
    AttributeNameList,
17
    InvalidAttributeName,
18
    Message,
19
    MessageAttributeNameList,
20
    MessageSystemAttributeName,
21
    QueueAttributeMap,
22
    QueueAttributeName,
23
    ReceiptHandleIsInvalid,
24
    TagMap,
25
)
26
from localstack.services.sqs import constants as sqs_constants
1✔
27
from localstack.services.sqs.exceptions import (
1✔
28
    InvalidAttributeValue,
29
    InvalidParameterValueException,
30
    MissingRequiredParameterException,
31
)
32
from localstack.services.sqs.queue import InterruptiblePriorityQueue, InterruptibleQueue
1✔
33
from localstack.services.sqs.utils import (
1✔
34
    create_message_attribute_hash,
35
    encode_move_task_handle,
36
    encode_receipt_handle,
37
    extract_receipt_handle_info,
38
    global_message_sequence,
39
    guess_endpoint_strategy_and_host,
40
    is_message_deduplication_id_required,
41
    message_filter_attributes,
42
    message_filter_message_attributes,
43
)
44
from localstack.services.stores import AccountRegionBundle, BaseStore, LocalAttribute
1✔
45
from localstack.utils.aws.arns import get_partition
1✔
46
from localstack.utils.strings import long_uid
1✔
47
from localstack.utils.time import now
1✔
48
from localstack.utils.urls import localstack_host
1✔
49

50
LOG = logging.getLogger(__name__)
1✔
51

52
ReceiptHandle = str
1✔
53

54

55
class SqsMessage:
    """
    LocalStack's internal representation of a message held by an SQS queue.

    Wraps the API-level ``Message`` dict and keeps the bookkeeping state around it (receive counters,
    receipt handles, visibility and delay timing). Messages are ordered by ``priority`` and are considered
    equal (and hashed) by their ``MessageId``.
    """

    # the API-level message; its "Attributes" entry holds the system attributes
    message: Message
    # unix timestamp (``time.time()``) taken when this object was created
    created: float
    # visibility timeout in seconds; not set by ``__init__``, must be assigned before ``set_last_received``
    visibility_timeout: int
    receive_count: int
    delay_seconds: int | None
    receipt_handles: set[str]
    last_received: float | None
    first_received: float | None
    # unix timestamp until which the message is invisible; ``None`` means visible
    visibility_deadline: float | None
    deleted: bool
    # sort key used by the rich comparison methods
    priority: float
    # the following two are exposed through the read-only properties defined below
    message_deduplication_id: str
    message_group_id: str
    sequence_number: str | None

    def __init__(
        self,
        priority: float,
        message: Message,
        message_deduplication_id: str | None = None,
        message_group_id: str | None = None,
        sequence_number: str | None = None,
    ) -> None:
        """
        Create a new message wrapper. Note that the passed ``message`` dict is modified in place: the given
        ids are written into its ``Attributes`` entry, which is created if missing or empty.

        :param priority: the sort key of the message
        :param message: the API-level message to wrap
        :param message_deduplication_id: optional value for the ``MessageDeduplicationId`` system attribute
        :param message_group_id: optional value for the ``MessageGroupId`` system attribute
        :param sequence_number: optional value for the ``SequenceNumber`` system attribute
        """
        self.created = time.time()
        self.message = message
        self.receive_count = 0
        self.receipt_handles = set()

        self.delay_seconds = None
        self.last_received = None
        self.first_received = None
        self.visibility_deadline = None
        self.deleted = False
        self.priority = priority
        self.sequence_number = sequence_number

        attributes = {}
        if message_group_id is not None:
            attributes["MessageGroupId"] = message_group_id
        if message_deduplication_id is not None:
            attributes["MessageDeduplicationId"] = message_deduplication_id
        if sequence_number is not None:
            attributes["SequenceNumber"] = sequence_number

        if self.message.get("Attributes"):
            self.message["Attributes"].update(attributes)
        else:
            self.message["Attributes"] = attributes

        # set attribute default values if not set
        self.message["Attributes"].setdefault(
            MessageSystemAttributeName.ApproximateReceiveCount, "0"
        )

    @property
    def message_group_id(self) -> str | None:
        """The ``MessageGroupId`` system attribute, or ``None`` if it is not set."""
        return self.message["Attributes"].get(MessageSystemAttributeName.MessageGroupId)

    @property
    def message_deduplication_id(self) -> str | None:
        """The ``MessageDeduplicationId`` system attribute, or ``None`` if it is not set."""
        return self.message["Attributes"].get(MessageSystemAttributeName.MessageDeduplicationId)

    @property
    def dead_letter_queue_source_arn(self) -> str | None:
        """The ``DeadLetterQueueSourceArn`` system attribute, or ``None`` if it is not set."""
        return self.message["Attributes"].get(MessageSystemAttributeName.DeadLetterQueueSourceArn)

    @property
    def message_id(self):
        """The ``MessageId`` of the wrapped message."""
        return self.message["MessageId"]

    def increment_approximate_receive_count(self):
        """
        Increment the message system attribute ``ApproximateReceiveCount``.
        """
        # TODO: need better handling of system attributes
        cnt = int(
            self.message["Attributes"].get(MessageSystemAttributeName.ApproximateReceiveCount, "0")
        )
        cnt += 1
        self.message["Attributes"][MessageSystemAttributeName.ApproximateReceiveCount] = str(cnt)

    def set_last_received(self, timestamp: float):
        """
        Sets the last received timestamp of the message to the given value, and updates the visibility deadline
        accordingly.

        :param timestamp: the last time the message was received
        """
        self.last_received = timestamp
        self.visibility_deadline = timestamp + self.visibility_timeout

    def update_visibility_timeout(self, timeout: int):
        """
        Sets the visibility timeout of the message to the given value, and updates the visibility deadline accordingly.

        :param timeout: the timeout value in seconds
        """
        self.visibility_timeout = timeout
        self.visibility_deadline = time.time() + timeout

    @property
    def is_visible(self) -> bool:
        """
        Returns false if the message has a visibility deadline that is in the future.

        :return: whether the message is visible or not.
        """
        if self.visibility_deadline is None:
            return True
        if time.time() >= self.visibility_deadline:
            return True

        return False

    @property
    def is_delayed(self) -> bool:
        """
        Returns true while the delay (``created + delay_seconds``) has not yet elapsed. A message without a
        delay is never delayed.
        """
        if self.delay_seconds is None:
            return False
        return time.time() <= self.created + self.delay_seconds

    # Rich comparisons: ordering is by ``priority``, equality by ``MessageId``. For operands that are not
    # SqsMessages we return NotImplemented so Python can try the reflected operation or fall back to its
    # defaults, instead of failing with an AttributeError on the missing attribute.

    def __gt__(self, other):
        if not isinstance(other, SqsMessage):
            return NotImplemented
        return self.priority > other.priority

    def __ge__(self, other):
        if not isinstance(other, SqsMessage):
            return NotImplemented
        return self.priority >= other.priority

    def __lt__(self, other):
        if not isinstance(other, SqsMessage):
            return NotImplemented
        return self.priority < other.priority

    def __le__(self, other):
        if not isinstance(other, SqsMessage):
            return NotImplemented
        return self.priority <= other.priority

    def __eq__(self, other):
        if not isinstance(other, SqsMessage):
            return NotImplemented
        return self.message_id == other.message_id

    def __hash__(self):
        # must stay consistent with __eq__, which compares message ids
        return self.message_id.__hash__()

    def __repr__(self):
        return f"SqsMessage(id={self.message_id},group={self.message_group_id})"
196

197

198
def to_sqs_api_message(
    standard_message: SqsMessage,
    attribute_names: AttributeNameList = None,
    message_attribute_names: MessageAttributeNameList = None,
) -> Message:
    """
    Build the AWS API 'Message' (the shape returned by the ``ReceiveMessage`` operation) from LocalStack's
    internal SQS message representation. The internal message is left untouched; all changes are applied
    to a deep copy.

    :param standard_message: A LocalStack SQS message
    :param attribute_names: the attribute name list to filter
    :param message_attribute_names: the message attribute names to filter
    :return: a copy of the original Message with updated message attributes and MD5 attribute hash sums
    """
    # work on a copy so the receiver never sees (or mutates) the stored message
    api_message = copy.deepcopy(standard_message.message)

    # the first-receive timestamp is reported in milliseconds; 0 if the message was never received
    first_received_ms = int((standard_message.first_received or 0) * 1000)
    api_message["Attributes"][MessageSystemAttributeName.ApproximateFirstReceiveTimestamp] = str(
        first_received_ms
    )

    # narrow the copy down to what the receiver asked for
    message_filter_attributes(api_message, attribute_names)
    message_filter_message_attributes(api_message, message_attribute_names)

    remaining_message_attributes = api_message.get("MessageAttributes")
    if not remaining_message_attributes:
        # no message attributes left: drop the hash that was computed when the message was created
        api_message.pop("MD5OfMessageAttributes", None)
        return api_message

    # the hash has to reflect the attributes that are actually returned
    api_message["MD5OfMessageAttributes"] = create_message_attribute_hash(
        remaining_message_attributes
    )
    return api_message
1✔
231

232

233
class ReceiveMessageResult:
    """
    Carries the outcome of a "receive messages" operation from the underlying datastructure holding the
    messages back to the SqsProvider.
    """

    # The messages that were successfully received from the queue.
    successful: list[SqsMessage]

    # The array index position in ``successful`` and ``receipt_handles`` need to be the same (this
    # assumption is needed when assembling the result in `SqsProvider.receive_message`).
    receipt_handles: list[str]

    # All messages that were received more than maxReceiveCount in the redrive policy (if any).
    dead_letter_messages: list[SqsMessage]

    def __init__(self):
        # every result starts out with its own three empty lists
        self.successful, self.receipt_handles, self.dead_letter_messages = [], [], []
1✔
253

254

255
class MessageMoveTaskStatus(str):
    """
    Status values of a ``MessageMoveTask``. The members are plain string constants defined on a ``str``
    subclass.
    """

    CREATED = "CREATED"  # not validated, for internal use
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    CANCELLING = "CANCELLING"
    CANCELLED = "CANCELLED"
    FAILED = "FAILED"
1✔
262

263

264
class MessageMoveTask:
    """
    A task created by the ``StartMessageMoveTask`` operation.
    """

    # configurable fields

    # The arn of the DLQ the messages are currently in.
    source_arn: str
    # If the DestinationArn is not specified, the original source arn will be used as target.
    destination_arn: str | None = None
    max_number_of_messages_per_second: int | None = None

    # dynamic fields
    task_id: str
    status: str = MessageMoveTaskStatus.CREATED
    started_timestamp: datetime | None = None
    approximate_number_of_messages_moved: int | None = None
    approximate_number_of_messages_to_move: int | None = None
    failure_reason: str | None = None

    cancel_event: threading.Event

    def __init__(
        self,
        source_arn: str,
        destination_arn: str | None,
        max_number_of_messages_per_second: int | None = None,
    ):
        """
        :param source_arn: the arn of the DLQ the messages are currently in
        :param destination_arn: the arn to move the messages to, may be ``None``
        :param max_number_of_messages_per_second: optional rate limit for the move
        """
        self.task_id = long_uid()
        self.source_arn = source_arn
        self.destination_arn = destination_arn
        self.max_number_of_messages_per_second = max_number_of_messages_per_second
        self.cancel_event = threading.Event()

    def mark_started(self):
        """
        Record the start time, switch the status to ``RUNNING`` and reset the cancel event.
        """
        # local import: the module header only imports ``datetime`` from the datetime module
        from datetime import timezone

        # ``datetime.utcnow()`` is deprecated since Python 3.12. Strip the tzinfo again so that the stored
        # value stays the same naive UTC datetime as before.
        # NOTE(review): consider storing an aware datetime once all readers of started_timestamp are checked
        self.started_timestamp = datetime.now(timezone.utc).replace(tzinfo=None)
        self.status = MessageMoveTaskStatus.RUNNING
        self.cancel_event.clear()

    @property
    def task_handle(self) -> str:
        """The handle of this task, encoded from the task id and the source arn."""
        return encode_move_task_handle(self.task_id, self.source_arn)
1✔
303

304

305
class SqsQueue:
    """
    Base class of the SQS queue models. It holds the queue attributes, tags and permissions, and the
    bookkeeping for delayed and in-flight messages and their receipt handles. Storing and receiving
    messages (``put``, ``receive``, ``_put_message``, ``remove_expired_messages``,
    ``approx_number_of_messages``) is left to subclasses.
    """

    name: str
    region: str
    account_id: str

    attributes: QueueAttributeMap
    tags: TagMap

    purge_in_progress: bool
    purge_timestamp: float | None

    delayed: set[SqsMessage]
    # Simulating an ordered set in python. Only the keys are used and of interest.
    inflight: dict[SqsMessage, None]
    receipts: dict[str, SqsMessage]

    def __init__(self, name: str, region: str, account_id: str, attributes=None, tags=None) -> None:
        """
        :param name: the queue name, validated with ``_assert_queue_name``
        :param region: the region of the queue
        :param account_id: the account id owning the queue
        :param attributes: optional attributes that are validated and then override the defaults
        :param tags: optional queue tags
        """
        self.name = name
        self.region = region
        self.account_id = account_id

        self._assert_queue_name(name)
        self.tags = tags or {}

        self.delayed = set()
        self.inflight = {}
        self.receipts = {}

        self.attributes = self.default_attributes()
        if attributes:
            self.validate_queue_attributes(attributes)
            self.attributes.update(attributes)

        self.purge_in_progress = False
        self.purge_timestamp = None

        self.permissions = set()
        self.mutex = threading.RLock()

    def shutdown(self):
        """Hook for releasing queue resources; nothing to do in the base class."""
        pass

    def default_attributes(self) -> QueueAttributeMap:
        """
        Returns the default attribute map of a queue. The approximate message counters are stored as
        callables so that they are computed when the attributes are read (see ``get_queue_attributes``).
        """
        # take the timestamp once, so that a freshly created queue reports the same created and
        # last-modified time even if the clock ticks while the map is being built
        created_timestamp = str(now())
        return {
            QueueAttributeName.ApproximateNumberOfMessages: lambda: str(
                self.approx_number_of_messages
            ),
            QueueAttributeName.ApproximateNumberOfMessagesNotVisible: lambda: str(
                self.approx_number_of_messages_not_visible
            ),
            QueueAttributeName.ApproximateNumberOfMessagesDelayed: lambda: str(
                self.approx_number_of_messages_delayed
            ),
            QueueAttributeName.CreatedTimestamp: created_timestamp,
            QueueAttributeName.DelaySeconds: "0",
            QueueAttributeName.LastModifiedTimestamp: created_timestamp,
            QueueAttributeName.MaximumMessageSize: str(sqs_constants.DEFAULT_MAXIMUM_MESSAGE_SIZE),
            QueueAttributeName.MessageRetentionPeriod: "345600",
            QueueAttributeName.QueueArn: self.arn,
            QueueAttributeName.ReceiveMessageWaitTimeSeconds: "0",
            QueueAttributeName.VisibilityTimeout: "30",
            QueueAttributeName.SqsManagedSseEnabled: "true",
        }

    def update_delay_seconds(self, value: int):
        """
        For standard queues, the per-queue delay setting is not retroactive—changing the setting doesn't affect the
        delay of messages already in the queue. For FIFO queues, the per-queue delay setting is retroactive—changing
        the setting affects the delay of messages already in the queue.

        https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-delay-queues.html

        :param value: the number of seconds
        """
        self.attributes[QueueAttributeName.DelaySeconds] = str(value)

    def update_last_modified(self, timestamp: int | None = None):
        """
        Sets the ``LastModifiedTimestamp`` attribute.

        :param timestamp: the timestamp to store; defaults to ``now()``
        """
        if timestamp is None:
            timestamp = now()

        self.attributes[QueueAttributeName.LastModifiedTimestamp] = str(timestamp)

    @property
    def arn(self) -> str:
        """The ARN of the queue, built from partition, region, account id and queue name."""
        return f"arn:{get_partition(self.region)}:sqs:{self.region}:{self.account_id}:{self.name}"

    def url(self, context: RequestContext) -> str:
        """Return queue URL which depending on the endpoint strategy returns e.g.:
        * (standard) http://sqs.eu-west-1.localhost.localstack.cloud:4566/000000000000/myqueue
        * (domain) http://eu-west-1.queue.localhost.localstack.cloud:4566/000000000000/myqueue
        * (path) http://localhost.localstack.cloud:4566/queue/eu-central-1/000000000000/myqueue
        * otherwise: http://localhost.localstack.cloud:4566/000000000000/myqueue
        """

        scheme = config.get_protocol()  # TODO: should probably change to context.request.scheme
        host_definition = localstack_host()
        host_and_port = host_definition.host_and_port()

        endpoint_strategy = config.SQS_ENDPOINT_STRATEGY

        if endpoint_strategy == "dynamic":
            scheme = context.request.scheme
            # determine the endpoint strategy that should be used, and determine the host dynamically
            endpoint_strategy, host_and_port = guess_endpoint_strategy_and_host(
                context.request.host
            )

        if endpoint_strategy == "standard":
            # Region is always part of the queue URL
            # sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/my-queue
            scheme = context.request.scheme
            host_url = f"{scheme}://sqs.{self.region}.{host_and_port}"
        elif endpoint_strategy == "domain":
            # Legacy style
            # queue.localhost.localstack.cloud:4566/000000000000/my-queue (us-east-1)
            # or us-east-2.queue.localhost.localstack.cloud:4566/000000000000/my-queue
            region = "" if self.region == "us-east-1" else self.region + "."
            host_url = f"{scheme}://{region}queue.{host_and_port}"
        elif endpoint_strategy == "path":
            # https?://localhost:4566/queue/us-east-1/00000000000/my-queue (us-east-1)
            host_url = f"{scheme}://{host_and_port}/queue/{self.region}"
        else:
            host_url = f"{scheme}://{host_and_port}"

        return "{host}/{account_id}/{name}".format(
            host=host_url.rstrip("/"),
            account_id=self.account_id,
            name=self.name,
        )

    @property
    def redrive_policy(self) -> dict | None:
        """The parsed ``RedrivePolicy`` attribute, or ``None`` if the attribute is not set."""
        if policy_document := self.attributes.get(QueueAttributeName.RedrivePolicy):
            return json.loads(policy_document)
        return None

    @property
    def max_receive_count(self) -> int | None:
        """
        Returns the maxReceiveCount attribute of the redrive policy. If no redrive policy is set, then it
        returns None.
        """
        if redrive_policy := self.redrive_policy:
            return int(redrive_policy["maxReceiveCount"])
        return None

    @property
    def visibility_timeout(self) -> int:
        """The ``VisibilityTimeout`` queue attribute as an integer."""
        return int(self.attributes[QueueAttributeName.VisibilityTimeout])

    @property
    def delay_seconds(self) -> int:
        """The ``DelaySeconds`` queue attribute as an integer."""
        return int(self.attributes[QueueAttributeName.DelaySeconds])

    @property
    def wait_time_seconds(self) -> int:
        """The ``ReceiveMessageWaitTimeSeconds`` queue attribute as an integer."""
        return int(self.attributes[QueueAttributeName.ReceiveMessageWaitTimeSeconds])

    @property
    def message_retention_period(self) -> int:
        """
        ``MessageRetentionPeriod`` -- the length of time, in seconds, for which Amazon SQS retains a message. Valid
        values: An integer representing seconds, from 60 (1 minute) to 1,209,600 (14 days). Default: 345,600 (4 days).
        """
        return int(self.attributes[QueueAttributeName.MessageRetentionPeriod])

    @property
    def maximum_message_size(self) -> int:
        """The ``MaximumMessageSize`` queue attribute as an integer."""
        return int(self.attributes[QueueAttributeName.MaximumMessageSize])

    @property
    def approx_number_of_messages(self) -> int:
        """The number of messages available for retrieval; to be provided by subclasses."""
        raise NotImplementedError

    @property
    def approx_number_of_messages_not_visible(self) -> int:
        """The number of messages that are currently in flight."""
        return len(self.inflight)

    @property
    def approx_number_of_messages_delayed(self) -> int:
        """The number of messages that are currently held back in the delayed set."""
        return len(self.delayed)

    def validate_receipt_handle(self, receipt_handle: str):
        """
        Checks that the queue ARN encoded in the receipt handle is the ARN of this queue.

        :param receipt_handle: the receipt handle to check
        :raises ReceiptHandleIsInvalid: if the handle refers to a different queue ARN
        """
        if self.arn != extract_receipt_handle_info(receipt_handle).queue_arn:
            raise ReceiptHandleIsInvalid(
                f'The input receipt handle "{receipt_handle}" is not a valid receipt handle.'
            )

    def update_visibility_timeout(self, receipt_handle: str, visibility_timeout: int):
        """
        Changes the visibility timeout of the in-flight message identified by the receipt handle. A timeout
        of ``0`` ends the in-flight state and puts the message back into the queue.

        :param receipt_handle: the receipt handle identifying the message
        :param visibility_timeout: the new timeout in seconds
        :raises ReceiptHandleIsInvalid: if the handle does not belong to this queue
        :raises InvalidParameterValueException: if no message is known for the receipt handle
        """
        with self.mutex:
            self.validate_receipt_handle(receipt_handle)

            if receipt_handle not in self.receipts:
                raise InvalidParameterValueException(
                    f"Value {receipt_handle} for parameter ReceiptHandle is invalid. Reason: Message does not exist "
                    f"or is not available for visibility timeout change."
                )

            standard_message = self.receipts[receipt_handle]

            # a known handle whose message is no longer in flight is silently ignored
            if standard_message not in self.inflight:
                return

            standard_message.update_visibility_timeout(visibility_timeout)

            if visibility_timeout == 0:
                LOG.info(
                    "terminating the visibility timeout of %s",
                    standard_message.message["MessageId"],
                )
                # Terminating the visibility timeout for a message
                # https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html#terminating-message-visibility-timeout
                del self.inflight[standard_message]
                self._put_message(standard_message)

    def remove(self, receipt_handle: str):
        """
        Deletes the message identified by the receipt handle: marks it as deleted, drops all receipt handles
        associated with it and calls the ``_on_remove_message`` hook. An unknown receipt handle is only logged.

        :param receipt_handle: the receipt handle identifying the message
        :raises ReceiptHandleIsInvalid: if the handle does not belong to this queue
        """
        with self.mutex:
            self.validate_receipt_handle(receipt_handle)

            if receipt_handle not in self.receipts:
                LOG.debug(
                    "no in-flight message found for receipt handle %s in queue %s",
                    receipt_handle,
                    self.arn,
                )
                return

            standard_message = self.receipts[receipt_handle]
            self._pre_delete_checks(standard_message, receipt_handle)
            standard_message.deleted = True
            LOG.debug(
                "deleting message %s from queue %s",
                standard_message.message["MessageId"],
                self.arn,
            )

            # remove all handles associated with this message
            for handle in standard_message.receipt_handles:
                del self.receipts[handle]
            standard_message.receipt_handles.clear()

            self._on_remove_message(standard_message)

    def _on_remove_message(self, message: SqsMessage):
        """Hook for queue-specific logic executed when a message is removed."""
        pass

    def put(
        self,
        message: Message,
        visibility_timeout: int = None,
        message_deduplication_id: str = None,
        message_group_id: str = None,
        delay_seconds: int = None,
    ) -> SqsMessage:
        """Puts a message into the queue; to be provided by subclasses."""
        raise NotImplementedError

    def receive(
        self,
        num_messages: int = 1,
        wait_time_seconds: int = None,
        visibility_timeout: int = None,
        *,
        poll_empty_queue: bool = False,
    ) -> ReceiveMessageResult:
        """
        Receive ``num_messages`` from the queue, and wait at max ``wait_time_seconds``. If a visibility
        timeout is given, also change the visibility timeout of all received messages accordingly.

        :param num_messages: the number of messages you want to get from the underlying queue
        :param wait_time_seconds: the number of seconds you want to wait
        :param visibility_timeout: an optional new visibility timeout
        :param poll_empty_queue: whether to keep polling an empty queue until the duration ``wait_time_seconds`` has elapsed
        :return: a ReceiveMessageResult object that contains the result of the operation
        """
        raise NotImplementedError

    def clear(self):
        """
        Calls clear on all internal datastructures that hold messages and data related to them.
        """
        with self.mutex:
            self.inflight.clear()
            self.delayed.clear()
            self.receipts.clear()

    def _put_message(self, message: SqsMessage):
        """Low-level put operation to put messages into a queue and modify visibilities accordingly."""
        raise NotImplementedError

    def create_receipt_handle(self, message: SqsMessage) -> str:
        """Creates a receipt handle for the message, encoded together with the ARN of this queue."""
        return encode_receipt_handle(self.arn, message)

    def requeue_inflight_messages(self):
        """
        Moves every in-flight message that has become visible again back into the queue.
        """
        if not self.inflight:
            return

        with self.mutex:
            # collect first, since the inflight dict is modified in the loop below
            messages = [message for message in self.inflight if message.is_visible]
            for standard_message in messages:
                LOG.debug(
                    "re-queueing inflight messages %s into queue %s",
                    standard_message,
                    self.arn,
                )
                del self.inflight[standard_message]
                self._put_message(standard_message)

    def add_inflight_message(self, message: SqsMessage):
        """
        We are simulating an ordered set with a dict. When a value is added, it is added as key to the dict, which
        is all we need. Hence all "values" in this ordered set are None
        :param message: The message to put in flight
        """
        self.inflight[message] = None

    def enqueue_delayed_messages(self):
        """
        Moves every message whose delay has elapsed from the delayed set into the queue.
        """
        if not self.delayed:
            return

        with self.mutex:
            # collect first, since the delayed set is modified in the loop below
            messages = [message for message in self.delayed if not message.is_delayed]
            for standard_message in messages:
                LOG.debug(
                    "enqueueing delayed messages %s into queue %s",
                    standard_message.message["MessageId"],
                    self.arn,
                )
                self.delayed.remove(standard_message)
                self._put_message(standard_message)

    def remove_expired_messages(self):
        """
        Removes messages from the queue whose retention period has expired.
        """
        raise NotImplementedError

    def _assert_queue_name(self, name):
        """
        Validates the queue name: 1 to 80 characters, only alphanumeric characters, hyphens or underscores.

        :param name: the queue name to check
        :raises InvalidParameterValueException: if the name is not valid
        """
        # fullmatch instead of match + "$": "$" also matches right before a trailing newline, which
        # would let a name like "queue\n" pass the check
        if not re.fullmatch(r"[a-zA-Z0-9_-]{1,80}", name):
            raise InvalidParameterValueException(
                "Can only include alphanumeric characters, hyphens, or underscores. 1 to 80 in length"
            )

    def validate_queue_attributes(self, attributes):
        """Hook for validating attributes passed at creation; the base class accepts everything."""
        pass

    def add_permission(self, label: str, actions: list[str], account_ids: list[str]) -> None:
        """
        Create / append to a policy for usage with the add_permission api call

        :param actions: List of actions to be included in the policy, without the SQS: prefix
        :param account_ids: List of account ids to be included in the policy
        :param label: Permission label
        :raises InvalidParameterValueException: if a statement with the given label already exists
        """
        # a single principal / action is stored as a plain string, several of them as a list
        statement = {
            "Sid": label,
            "Effect": "Allow",
            "Principal": {
                "AWS": [
                    f"arn:{get_partition(self.region)}:iam::{account_id}:root"
                    for account_id in account_ids
                ]
                if len(account_ids) > 1
                else f"arn:{get_partition(self.region)}:iam::{account_ids[0]}:root"
            },
            "Action": [f"SQS:{action}" for action in actions]
            if len(actions) > 1
            else f"SQS:{actions[0]}",
            "Resource": self.arn,
        }
        if policy := self.attributes.get(QueueAttributeName.Policy):
            policy = json.loads(policy)
        else:
            policy = {
                "Version": "2008-10-17",
                "Id": f"{self.arn}/SQSDefaultPolicy",
                "Statement": [],
            }
        # custom policies can be uploaded, so an existing policy may lack the "Statement" key
        policy.setdefault("Statement", [])
        existing_statement_ids = [existing.get("Sid") for existing in policy["Statement"]]
        if label in existing_statement_ids:
            raise InvalidParameterValueException(
                f"Value {label} for parameter Label is invalid. Reason: Already exists."
            )
        policy["Statement"].append(statement)
        self.attributes[QueueAttributeName.Policy] = json.dumps(policy)

    def remove_permission(self, label: str) -> None:
        """
        Delete a policy statement for usage of the remove_permission call

        :param label: Permission label
        :raises InvalidParameterValueException: if no statement with the given label exists
        """
        if policy := self.attributes.get(QueueAttributeName.Policy):
            policy = json.loads(policy)
            # this should not be necessary, but we can upload custom policies, so it's better to be safe
            policy.setdefault("Statement", [])
        else:
            policy = {
                "Version": "2008-10-17",
                "Id": f"{self.arn}/SQSDefaultPolicy",
                "Statement": [],
            }
        existing_statement_ids = [statement.get("Sid") for statement in policy["Statement"]]
        if label not in existing_statement_ids:
            raise InvalidParameterValueException(
                f"Value {label} for parameter Label is invalid. Reason: can't find label."
            )
        policy["Statement"] = [
            statement for statement in policy["Statement"] if statement.get("Sid") != label
        ]
        if policy["Statement"]:
            self.attributes[QueueAttributeName.Policy] = json.dumps(policy)
        else:
            # the last statement was removed, so the policy attribute is dropped altogether
            del self.attributes[QueueAttributeName.Policy]

    def get_queue_attributes(self, attribute_names: AttributeNameList = None) -> dict[str, str]:
        """
        Returns the values of the requested queue attributes. Attributes stored as callables are evaluated,
        the values ``"True"`` / ``"False"`` are lower-cased, and attributes without a value are left out.

        :param attribute_names: the names to look up; ``All`` selects every attribute of the queue
        :return: a dict of attribute names to values, empty if no names were given
        :raises InvalidAttributeName: if a name is not a member of ``QueueAttributeName``
        """
        if not attribute_names:
            return {}

        if QueueAttributeName.All in attribute_names:
            attribute_names = self.attributes.keys()

        result: dict[QueueAttributeName, str] = {}

        for attr in attribute_names:
            try:
                getattr(QueueAttributeName, attr)
            except AttributeError:
                # the AttributeError is an implementation detail of the lookup, don't chain it
                raise InvalidAttributeName(f"Unknown Attribute {attr}.") from None

            value = self.attributes.get(attr)
            if callable(value):
                func = value
                value = func()
                if value is not None:
                    result[attr] = value
            elif value == "False" or value == "True":
                result[attr] = value.lower()
            elif value is not None:
                result[attr] = value
        return result

    @staticmethod
    def remove_expired_messages_from_heap(
        heap: list[SqsMessage], message_retention_period: int
    ) -> list[SqsMessage]:
        """
        Removes from the given heap of SqsMessages all messages that have expired in the context of the current time
        and the given message retention period. The method manipulates the heap but retains the heap property.

        :param heap: an array satisfying the heap property
        :param message_retention_period: the message retention period to use in relation to the current time
        :return: a list of expired messages that have been removed from the heap
        """
        th = time.time() - message_retention_period

        expired = []
        while heap:
            # here we're leveraging the heap property "that a[0] is always its smallest element"
            # and the assumption that message.created == message.priority
            message = heap[0]
            if th < message.created:
                break
            # remove the expired element
            expired.append(message)
            heapq.heappop(heap)

        return expired

    def _pre_delete_checks(self, standard_message: SqsMessage, receipt_handle: str) -> None:
        """
        Runs any potential checks if a message that has been successfully identified via a receipt handle
        is indeed supposed to be deleted.
        For example, a receipt handle that has expired might not lead to deletion.

        :param standard_message: The message to be deleted
        :param receipt_handle: The handle associated with the message
        :return: None. Potential violations raise errors.
        """
        pass
1✔
787

788

789
class StandardQueue(SqsQueue):
    """
    Queue implementation that keeps all visible messages in a single ``InterruptiblePriorityQueue``.
    """

    visible: InterruptiblePriorityQueue[SqsMessage]

    def __init__(self, name: str, region: str, account_id: str, attributes=None, tags=None) -> None:
        super().__init__(name, region, account_id, attributes, tags)
        self.visible = InterruptiblePriorityQueue()

    def clear(self):
        """Clears the state handled by the parent class and drops all visible messages."""
        with self.mutex:
            super().clear()
            self.visible.queue.clear()

    @property
    def approx_number_of_messages(self):
        """The number of messages currently in the visible queue."""
        return self.visible.qsize()

    def shutdown(self):
        self.visible.shutdown()

    def put(
        self,
        message: Message,
        visibility_timeout: int = None,
        message_deduplication_id: str = None,
        message_group_id: str = None,
        delay_seconds: int = None,
    ):
        """
        Wraps the given message into an ``SqsMessage`` and adds it to the queue.

        :param message: the message to enqueue
        :param visibility_timeout: per-message visibility timeout; the queue's value is used if None
        :param message_deduplication_id: must not be set for this queue type
        :param message_group_id: passed through to the created ``SqsMessage``
        :param delay_seconds: per-message delay; the queue's value is used if None
        :return: the created ``SqsMessage``
        :raises InvalidParameterValueException: if ``message_deduplication_id`` is set
        """
        if message_deduplication_id:
            raise InvalidParameterValueException(
                f"Value {message_deduplication_id} for parameter MessageDeduplicationId is invalid. Reason: The "
                f"request includes a parameter that is not valid for this queue type."
            )

        standard_message = SqsMessage(
            time.time(),
            message,
            message_group_id=message_group_id,
        )

        if visibility_timeout is not None:
            standard_message.visibility_timeout = visibility_timeout
        else:
            # use the attribute from the queue
            standard_message.visibility_timeout = self.visibility_timeout

        if delay_seconds is not None:
            standard_message.delay_seconds = delay_seconds
        else:
            standard_message.delay_seconds = self.delay_seconds

        if standard_message.is_delayed:
            self.delayed.add(standard_message)
        else:
            self._put_message(standard_message)

        return standard_message

    def _put_message(self, message: SqsMessage):
        self.visible.put_nowait(message)

    def remove_expired_messages(self):
        """Removes all messages from the visible queue that exceeded the message retention period."""
        with self.mutex:
            messages = self.remove_expired_messages_from_heap(
                self.visible.queue, self.message_retention_period
            )

        for message in messages:
            LOG.debug("Removed expired message %s from queue %s", message.message_id, self.arn)

    def receive(
        self,
        num_messages: int = 1,
        wait_time_seconds: int = None,
        visibility_timeout: int = None,
        *,
        poll_empty_queue: bool = False,
    ) -> ReceiveMessageResult:
        """
        De-queues up to ``num_messages`` messages from the visible queue.

        :param num_messages: collecting stops once this many messages have been received successfully
        :param wait_time_seconds: if truthy, the call blocks on the visible queue for up to this many
            seconds in total
        :param visibility_timeout: visibility timeout applied to the received messages; the queue's value
            is used if None
        :param poll_empty_queue: if False, blocking is disabled as soon as the first message was de-queued
        :return: a result that holds the successfully received messages with their receipt handles, and
            the messages that exceeded the max receive count and are marked for the DLQ
        """
        result = ReceiveMessageResult()

        max_receive_count = self.max_receive_count
        visibility_timeout = (
            self.visibility_timeout if visibility_timeout is None else visibility_timeout
        )

        wait_time = wait_time_seconds or 0
        block = bool(wait_time)
        # The wait budget is tracked against one fixed deadline. Repeatedly subtracting the total elapsed
        # time from an already reduced timeout would count the time spent in earlier iterations twice.
        deadline = time.time() + wait_time

        # collect messages
        while True:
            timeout = max(0, deadline - time.time())
            try:
                message = self.visible.get(block=block, timeout=timeout)
            except Empty:
                break
            # setting block to false guarantees that, if we've already waited before, we don't wait the
            # full time again in the next iteration if max_number_of_messages is set but there are no more
            # messages in the queue. see https://github.com/localstack/localstack/issues/5824
            if not poll_empty_queue:
                block = False

            if message.deleted:
                # filter messages that were deleted with an expired receipt handle after they have been
                # re-queued. this can only happen due to a race with `remove`.
                continue

            # update message attributes
            message.receive_count += 1
            message.update_visibility_timeout(visibility_timeout)
            message.set_last_received(time.time())
            if message.first_received is None:
                message.first_received = message.last_received

            LOG.debug("de-queued message %s from %s", message, self.arn)
            if max_receive_count and message.receive_count > max_receive_count:
                # the message needs to move to the DLQ
                LOG.debug(
                    "message %s has been received %d times, marking it for DLQ",
                    message,
                    message.receive_count,
                )
                result.dead_letter_messages.append(message)
            else:
                result.successful.append(message)
                message.increment_approximate_receive_count()

                # now we can return
                if len(result.successful) == num_messages:
                    break

        # now process the successful result messages: create receipt handles and manage visibility.
        for message in result.successful:
            # manage receipt handle
            receipt_handle = self.create_receipt_handle(message)
            message.receipt_handles.add(receipt_handle)
            self.receipts[receipt_handle] = message
            result.receipt_handles.append(receipt_handle)

            # manage message visibility
            if message.visibility_timeout == 0:
                # a zero visibility timeout makes the message immediately visible again
                self.visible.put_nowait(message)
            else:
                self.add_inflight_message(message)

        return result

    def _on_remove_message(self, message: SqsMessage):
        try:
            del self.inflight[message]
        except KeyError:
            # this likely means the message was removed with an expired receipt handle unfortunately this
            # means we need to scan the queue for the element and remove it from there, and then re-heapify
            # the queue
            try:
                self.visible.queue.remove(message)
                heapq.heapify(self.visible.queue)
            except ValueError:
                # this may happen if the message no longer exists because it was removed earlier
                pass

    def validate_queue_attributes(self, attributes):
        """
        Checks that all given attribute names can be used with this queue type.

        :param attributes: mapping whose keys are the attribute names to validate
        :raises InvalidAttributeName: if a FIFO-only or otherwise unknown/invalid attribute name is used
        """
        valid = [
            k[1]
            for k in inspect.getmembers(
                QueueAttributeName, lambda x: isinstance(x, str) and not x.startswith("__")
            )
            if k[1] not in sqs_constants.INVALID_STANDARD_QUEUE_ATTRIBUTES
        ]

        for k in attributes:
            if k in [QueueAttributeName.FifoThroughputLimit, QueueAttributeName.DeduplicationScope]:
                raise InvalidAttributeName(
                    f"You can specify the {k} only when FifoQueue is set to true."
                )
            if k not in valid:
                raise InvalidAttributeName(f"Unknown Attribute {k}.")
1✔
968

969

970
class MessageGroup:
    """
    A group of messages that share a message group ID. The messages are kept as a heap (via ``heapq``).

    Equality and hashing are based on the message group ID only.
    """

    message_group_id: str
    messages: "list[SqsMessage]"

    def __init__(self, message_group_id: str):
        self.message_group_id = message_group_id
        self.messages = []

    def empty(self) -> bool:
        """Returns True if the group currently holds no messages."""
        return not self.messages

    def size(self) -> int:
        """Returns the number of messages currently held by the group."""
        return len(self.messages)

    def pop(self) -> "SqsMessage":
        """
        Removes and returns the smallest message of the heap.

        :raises IndexError: if the group is empty
        """
        return heapq.heappop(self.messages)

    def push(self, message: "SqsMessage"):
        """Adds the message to the group while keeping the heap property."""
        heapq.heappush(self.messages, message)

    def __eq__(self, other):
        # let Python fall back to its default handling instead of failing with an AttributeError when
        # compared with an unrelated object
        if not isinstance(other, MessageGroup):
            return NotImplemented
        return self.message_group_id == other.message_group_id

    def __hash__(self):
        return hash(self.message_group_id)

    def __repr__(self):
        return f"MessageGroup(id={self.message_group_id}, size={len(self.messages)})"
998

999

1000
class FifoQueue(SqsQueue):
    """
    A FIFO queue behaves differently than a default queue. Most behavior has to be implemented separately.

    See https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html

    TODO: raise exceptions when trying to remove a message with an expired receipt handle
    """

    # the last accepted message per deduplication ID
    deduplication: dict[str, SqsMessage]
    # all message groups of the queue, by message group ID
    message_groups: dict[str, MessageGroup]
    # groups that were handed out by `receive` and are not yet visible again
    inflight_groups: set[MessageGroup]
    # groups that `receive` can currently pick messages from
    message_group_queue: InterruptibleQueue
    deduplication_scope: str

    def __init__(self, name: str, region: str, account_id: str, attributes=None, tags=None) -> None:
        super().__init__(name, region, account_id, attributes, tags)
        self.deduplication = {}

        self.message_groups = {}
        self.inflight_groups = set()
        self.message_group_queue = InterruptibleQueue()

        # SQS does not seem to change the deduplication behaviour of fifo queues if you
        # change to/from 'queue'/'messageGroup' scope after creation -> we need to set this on creation
        self.deduplication_scope = self.attributes[QueueAttributeName.DeduplicationScope]

    @property
    def approx_number_of_messages(self):
        """The number of messages summed up over all message groups."""
        return sum(len(message_group.messages) for message_group in self.message_groups.values())

    def shutdown(self):
        self.message_group_queue.shutdown()

    def get_message_group(self, message_group_id: str) -> MessageGroup:
        """
        Thread safe lazy factory for MessageGroup objects.

        :param message_group_id: the message group ID
        :return: a new or existing MessageGroup object
        """
        with self.mutex:
            if message_group_id not in self.message_groups:
                self.message_groups[message_group_id] = MessageGroup(message_group_id)

            return self.message_groups.get(message_group_id)

    def default_attributes(self) -> QueueAttributeMap:
        """Returns the parent's default attributes extended by the FIFO-specific defaults."""
        return {
            **super().default_attributes(),
            QueueAttributeName.ContentBasedDeduplication: "false",
            QueueAttributeName.DeduplicationScope: "queue",
            QueueAttributeName.FifoThroughputLimit: "perQueue",
        }

    def update_delay_seconds(self, value: int):
        """Updates the delay of the queue and of all messages that are currently delayed."""
        super().update_delay_seconds(value)
        for message in self.delayed:
            message.delay_seconds = value

    def _pre_delete_checks(self, message: SqsMessage, receipt_handle: str) -> None:
        """
        Rejects the deletion of a message that is already visible again.

        :raises InvalidParameterValueException: if the message is visible, which is reported as an
            expired receipt handle
        """
        if message.is_visible:
            raise InvalidParameterValueException(
                f"Value {receipt_handle} for parameter ReceiptHandle is invalid. Reason: The receipt handle has expired."
            )

    def remove(self, receipt_handle: str):
        """Validates the receipt handle and then delegates the removal to the parent class."""
        self.validate_receipt_handle(receipt_handle)

        super().remove(receipt_handle)

    def put(
        self,
        message: Message,
        visibility_timeout: int = None,
        message_deduplication_id: str = None,
        message_group_id: str = None,
        delay_seconds: int = None,
    ):
        """
        Wraps the given message into an ``SqsMessage`` and adds it to the queue, unless a message with
        the same deduplication ID was accepted within the deduplication interval.

        :param message: the message to enqueue
        :param visibility_timeout: per-message visibility timeout; the queue's value is used if None
        :param message_deduplication_id: the deduplication ID; if missing, the SHA-256 hex digest of the
            message body is used, given that content based deduplication is enabled
        :param message_group_id: the required message group ID
        :param delay_seconds: must be None or 0, the queue level delay is always used
        :return: the created ``SqsMessage``; it is also returned if the message was de-duplicated and
            therefore not enqueued
        :raises InvalidParameterValueException: if a non-zero ``delay_seconds`` is given or if no
            deduplication ID could be determined
        :raises MissingRequiredParameterException: if ``message_group_id`` is missing
        """
        if delay_seconds:
            # in fifo queues, delay is only applied on queue level. However, explicitly setting delay_seconds=0 is valid
            raise InvalidParameterValueException(
                f"Value {delay_seconds} for parameter DelaySeconds is invalid. Reason: The request include parameter "
                f"that is not valid for this queue type."
            )

        if not message_group_id:
            raise MissingRequiredParameterException(
                "The request must contain the parameter MessageGroupId."
            )
        dedup_id = message_deduplication_id
        content_based_deduplication = not is_message_deduplication_id_required(self)
        if not dedup_id and content_based_deduplication:
            dedup_id = hashlib.sha256(message.get("Body").encode("utf-8")).hexdigest()
        if not dedup_id:
            raise InvalidParameterValueException(
                "The queue should either have ContentBasedDeduplication enabled or MessageDeduplicationId provided explicitly"
            )

        fifo_message = SqsMessage(
            time.time(),
            message,
            message_deduplication_id=dedup_id,
            message_group_id=message_group_id,
            sequence_number=str(self.next_sequence_number()),
        )
        if visibility_timeout is not None:
            fifo_message.visibility_timeout = visibility_timeout
        else:
            # use the attribute from the queue
            fifo_message.visibility_timeout = self.visibility_timeout

        # FIFO queues always use the queue level setting for 'DelaySeconds'
        fifo_message.delay_seconds = self.delay_seconds

        original_message = self.deduplication.get(dedup_id)
        if (
            original_message
            and original_message.priority + sqs_constants.DEDUPLICATION_INTERVAL_IN_SEC
            > fifo_message.priority
            # account for deduplication scope required for (but not restricted to) high-throughput-mode
            and (
                self.deduplication_scope != "messageGroup"
                or fifo_message.message_group_id == original_message.message_group_id
            )
        ):
            # duplicate: do not enqueue, but report the ID of the originally accepted message
            message["MessageId"] = original_message.message["MessageId"]
        else:
            if fifo_message.is_delayed:
                self.delayed.add(fifo_message)
            else:
                self._put_message(fifo_message)

            self.deduplication[dedup_id] = fifo_message

        return fifo_message

    def _put_message(self, message: SqsMessage):
        """Once a message becomes visible in a FIFO queue, its message group also becomes visible."""
        message_group = self.get_message_group(message.message_group_id)

        with self.mutex:
            previously_empty = message_group.empty()
            # put the message into the group
            message_group.push(message)

            # new messages should not make groups visible that are currently inflight
            if message.receive_count < 1 and message_group in self.inflight_groups:
                return
            # if an older message becomes visible again in the queue, that message's group becomes visible also.
            if message_group in self.inflight_groups:
                self.inflight_groups.remove(message_group)
                self.message_group_queue.put_nowait(message_group)
            # if the group was previously empty, it was not yet added back to the queue
            elif previously_empty:
                self.message_group_queue.put_nowait(message_group)

    def requeue_inflight_messages(self):
        """
        Moves inflight messages that are visible again back into their message groups. Processing stops
        at the first inflight message that is not yet visible.
        """
        if not self.inflight:
            return

        with self.mutex:
            messages = list(self.inflight)
            for standard_message in messages:
                # in fifo, an invisible message blocks potentially visible messages afterwards
                # this can happen for example if multiple message of the same group are received at once, then one
                # message of this batch has its visibility timeout extended
                if not standard_message.is_visible:
                    return
                LOG.debug(
                    "re-queueing inflight messages %s into queue %s",
                    standard_message,
                    self.arn,
                )
                del self.inflight[standard_message]
                self._put_message(standard_message)

    def remove_expired_messages(self):
        """Removes all messages from the message groups that exceeded the message retention period."""
        with self.mutex:
            retention_period = self.message_retention_period
            for message_group in self.message_groups.values():
                messages = self.remove_expired_messages_from_heap(
                    message_group.messages, retention_period
                )

                for message in messages:
                    LOG.debug(
                        "Removed expired message %s from message group %s in queue %s",
                        message.message_id,
                        message.message_group_id,
                        self.arn,
                    )

    def receive(
        self,
        num_messages: int = 1,
        wait_time_seconds: int = None,
        visibility_timeout: int = None,
        *,
        poll_empty_queue: bool = False,
    ) -> ReceiveMessageResult:
        """
        Receive logic for FIFO queues is different from standard queues. See
        https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues-understanding-logic.html.

        When receiving messages from a FIFO queue with multiple message group IDs, SQS first attempts to
        return as many messages with the same message group ID as possible. This allows other consumers to
        process messages with a different message group ID. When you receive a message with a message group
        ID, no more messages for the same message group ID are returned unless you delete the message, or it
        becomes visible.

        :param num_messages: collecting stops once this many messages have been received successfully
        :param wait_time_seconds: if truthy, the call blocks on the message group queue for up to this
            many seconds in total
        :param visibility_timeout: visibility timeout applied to the received messages; the queue's value
            is used if None
        :param poll_empty_queue: if False, blocking is disabled as soon as the first non-empty group was
            de-queued
        :return: a result that holds the successfully received messages with their receipt handles, and
            the messages that exceeded the max receive count and are marked for the DLQ
        """
        result = ReceiveMessageResult()

        max_receive_count = self.max_receive_count
        visibility_timeout = (
            self.visibility_timeout if visibility_timeout is None else visibility_timeout
        )

        wait_time = wait_time_seconds or 0
        block = bool(wait_time)
        # The wait budget is tracked against one fixed deadline. Repeatedly subtracting the total elapsed
        # time from an already reduced timeout would count the time spent in earlier iterations twice.
        deadline = time.time() + wait_time

        # collect messages over potentially multiple groups
        while True:
            timeout = max(0, deadline - time.time())
            try:
                group: MessageGroup = self.message_group_queue.get(block=block, timeout=timeout)
            except Empty:
                break

            if group.empty():
                # this can be the case if all messages in the group are still invisible or
                # if all messages of a group have been processed.
                # TODO: it should be blocking until at least one message is in the queue, but we don't
                #  want to block the group
                # TODO: check behavior in case it happens if all messages were removed from a group due to message
                #  retention period.
                continue

            self.inflight_groups.add(group)

            if not poll_empty_queue:
                block = False

            # we lock the queue while accessing the groups to not get into races with re-queueing/deleting
            with self.mutex:
                # collect messages from the group until a continue/break condition is met
                while True:
                    try:
                        message = group.pop()
                    except IndexError:
                        break

                    if message.deleted:
                        # this means the message was deleted with a receipt handle after its visibility
                        # timeout expired and the messages was re-queued in the meantime.
                        continue

                    # update message attributes
                    message.receive_count += 1
                    message.update_visibility_timeout(visibility_timeout)
                    message.set_last_received(time.time())
                    if message.first_received is None:
                        message.first_received = message.last_received

                    LOG.debug("de-queued message %s from fifo queue %s", message, self.arn)
                    if max_receive_count and message.receive_count > max_receive_count:
                        # the message needs to move to the DLQ
                        LOG.debug(
                            "message %s has been received %d times, marking it for DLQ",
                            message,
                            message.receive_count,
                        )
                        result.dead_letter_messages.append(message)
                    else:
                        result.successful.append(message)
                        message.increment_approximate_receive_count()

                        # now we can break the inner loop
                        if len(result.successful) == num_messages:
                            break

                # but we also need to check the condition to return from the outer loop
                if len(result.successful) == num_messages:
                    break

        # now process the successful result messages: create receipt handles and manage visibility.
        # we use the mutex again because we are modifying the group
        with self.mutex:
            for message in result.successful:
                # manage receipt handle
                receipt_handle = self.create_receipt_handle(message)
                message.receipt_handles.add(receipt_handle)
                self.receipts[receipt_handle] = message
                result.receipt_handles.append(receipt_handle)

                # manage message visibility
                if message.visibility_timeout == 0:
                    self._put_message(message)
                else:
                    self.add_inflight_message(message)
        return result

    def _on_remove_message(self, message: SqsMessage):
        # if a message is deleted from the queue, the message's group can become visible again
        message_group = self.get_message_group(message.message_group_id)

        with self.mutex:
            try:
                del self.inflight[message]
            except KeyError:
                # in FIFO queues, this should not happen, as expired receipt handles cannot be used to
                # delete a message.
                pass
            self.update_message_group_visibility(message_group)

    def update_message_group_visibility(self, message_group: MessageGroup):
        """
        Check if the passed message group should be made visible again
        """

        with self.mutex:
            if message_group in self.inflight_groups:
                # it becomes visible again only if there are no other in flight messages in that group
                for message in self.inflight:
                    if message.message_group_id == message_group.message_group_id:
                        return

                self.inflight_groups.remove(message_group)
                if not message_group.empty():
                    self.message_group_queue.put_nowait(message_group)

    def _assert_queue_name(self, name):
        """
        Checks the ``.fifo`` suffix and then applies the parent's name check.

        :raises InvalidParameterValueException: if the name does not end with ``.fifo``
        """
        if not name.endswith(".fifo"):
            raise InvalidParameterValueException(
                "The name of a FIFO queue can only include alphanumeric characters, hyphens, or underscores, "
                "must end with .fifo suffix and be 1 to 80 in length"
            )
        # The .fifo suffix counts towards the 80-character queue name quota.
        # The suffix is swapped for the equally long "_fifo" before the name is handed to the parent check.
        queue_name = name[:-5] + "_fifo"
        super()._assert_queue_name(queue_name)

    def validate_queue_attributes(self, attributes):
        """
        Checks that all given attribute names can be used with this queue type.

        :param attributes: mapping of attribute names to their values
        :raises InvalidAttributeName: if an attribute name is unknown or internal
        :raises InvalidAttributeValue: if ``FifoQueue`` is set to anything other than "true"
        """
        # inspect.getmembers returns (name, value) pairs, so the attribute value k[1] has to be compared
        # against the internal attributes, not the pair itself
        valid = [
            k[1]
            for k in inspect.getmembers(
                QueueAttributeName, lambda x: isinstance(x, str) and not x.startswith("__")
            )
            if k[1] not in sqs_constants.INTERNAL_QUEUE_ATTRIBUTES
        ]
        for k in attributes:
            if k not in valid:
                raise InvalidAttributeName(f"Unknown Attribute {k}.")
        # Special Cases
        fifo = attributes.get(QueueAttributeName.FifoQueue)
        if fifo and fifo.lower() != "true":
            raise InvalidAttributeValue(
                "Invalid value for the parameter FifoQueue. Reason: Modifying queue type is not supported."
            )

    def next_sequence_number(self):
        """Returns the next value of the global message sequence."""
        return next(global_message_sequence())

    def clear(self):
        """Clears the state handled by the parent class as well as all groups and deduplication data."""
        with self.mutex:
            super().clear()
            self.message_groups.clear()
            self.inflight_groups.clear()
            self.message_group_queue.queue.clear()
            self.deduplication.clear()
1✔
1377

1378

1379
class SqsStore(BaseStore):
    # presumably maps queue names to their queue objects -- confirm against the callers
    queues: dict[str, SqsQueue] = LocalAttribute(default=dict)

    # maps a key to the timestamp that `expire_deleted` compares against the current time
    deleted: dict[str, float] = LocalAttribute(default=dict)

    move_tasks: dict[str, MessageMoveTask] = LocalAttribute(default=dict)
    """Maps task IDs to their ``MoveMessageTask`` object. Task IDs can be found by decoding a task handle."""

    def expire_deleted(self):
        """
        Drops every entry from ``deleted`` whose timestamp is at least
        ``sqs_constants.RECENTLY_DELETED_TIMEOUT`` seconds in the past.
        """
        max_age = sqs_constants.RECENTLY_DELETED_TIMEOUT
        # iterate over a snapshot, as entries are removed from the dict while looping
        for key, deleted_at in list(self.deleted.items()):
            if time.time() - max_age >= deleted_at:
                del self.deleted[key]
1✔
1391

1392

1393
# module-level bundle of SqsStore objects registered under the name "sqs"
# (presumably one store per account and region, going by the class name -- confirm in localstack.services.stores)
sqs_stores = AccountRegionBundle("sqs", SqsStore)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc