• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 19523508838

19 Nov 2025 04:57PM UTC coverage: 86.907% (-0.02%) from 86.922%
19523508838

push

github

web-flow
APIGW: fix errors when importing invalid json/yaml (#13394)

15 of 15 new or added lines in 2 files covered. (100.0%)

184 existing lines in 5 files now uncovered.

68655 of 78998 relevant lines covered (86.91%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.85
/localstack-core/localstack/services/sqs/models.py
1
import copy
1✔
2
import hashlib
1✔
3
import heapq
1✔
4
import inspect
1✔
5
import json
1✔
6
import logging
1✔
7
import re
1✔
8
import threading
1✔
9
import time
1✔
10
from datetime import datetime
1✔
11
from queue import Empty
1✔
12

13
from localstack import config
1✔
14
from localstack.aws.api import RequestContext
1✔
15
from localstack.aws.api.sqs import (
1✔
16
    AttributeNameList,
17
    InvalidAttributeName,
18
    Message,
19
    MessageAttributeNameList,
20
    MessageSystemAttributeName,
21
    QueueAttributeMap,
22
    QueueAttributeName,
23
    ReceiptHandleIsInvalid,
24
    TagMap,
25
)
26
from localstack.services.sqs import constants as sqs_constants
1✔
27
from localstack.services.sqs.exceptions import (
1✔
28
    InvalidAttributeValue,
29
    InvalidParameterValueException,
30
    MissingRequiredParameterException,
31
)
32
from localstack.services.sqs.queue import InterruptiblePriorityQueue, InterruptibleQueue
1✔
33
from localstack.services.sqs.utils import (
1✔
34
    create_message_attribute_hash,
35
    encode_move_task_handle,
36
    encode_receipt_handle,
37
    extract_receipt_handle_info,
38
    global_message_sequence,
39
    guess_endpoint_strategy_and_host,
40
    is_message_deduplication_id_required,
41
    message_filter_attributes,
42
    message_filter_message_attributes,
43
)
44
from localstack.services.stores import AccountRegionBundle, BaseStore, LocalAttribute
1✔
45
from localstack.utils.aws.arns import get_partition
1✔
46
from localstack.utils.strings import long_uid
1✔
47
from localstack.utils.time import now
1✔
48
from localstack.utils.urls import localstack_host
1✔
49

50
LOG = logging.getLogger(__name__)
1✔
51

52
ReceiptHandle = str
1✔
53

54

55
class SqsMessage:
    """
    LocalStack's internal wrapper around an SQS API ``Message``.

    Besides the API message itself, it carries the bookkeeping a queue needs: creation and receive
    timestamps, visibility and delay state, the receipt handles issued for the message, and the
    priority used for ordering. Ordering comparisons use ``priority``; equality and hashing use the
    ``MessageId`` of the wrapped message.
    """

    message: Message
    created: float
    # NOTE: not assigned by ``__init__``; it must be set on the instance before
    # ``set_last_received`` is called, since that method reads it.
    visibility_timeout: int
    receive_count: int
    delay_seconds: int | None
    receipt_handles: set[str]
    last_received: float | None
    first_received: float | None
    visibility_deadline: float | None
    deleted: bool
    priority: float
    # the two names below are served by the read-only properties defined further down
    message_deduplication_id: str
    message_group_id: str
    sequence_number: str

    def __init__(
        self,
        priority: float,
        message: Message,
        message_deduplication_id: str | None = None,
        message_group_id: str | None = None,
        sequence_number: str | None = None,
    ) -> None:
        """
        :param priority: the value used to order this message relative to others
        :param message: the API message to wrap; its ``Attributes`` entry is updated in place
        :param message_deduplication_id: stored as the ``MessageDeduplicationId`` attribute if given
        :param message_group_id: stored as the ``MessageGroupId`` attribute if given
        :param sequence_number: stored as the ``SequenceNumber`` attribute if given
        """
        self.created = time.time()
        self.message = message
        self.receive_count = 0
        self.receipt_handles = set()

        self.delay_seconds = None
        self.last_received = None
        self.first_received = None
        self.visibility_deadline = None
        self.deleted = False
        self.priority = priority
        self.sequence_number = sequence_number

        # only attributes that were actually provided are written into the message
        attributes = {}
        if message_group_id is not None:
            attributes["MessageGroupId"] = message_group_id
        if message_deduplication_id is not None:
            attributes["MessageDeduplicationId"] = message_deduplication_id
        if sequence_number is not None:
            attributes["SequenceNumber"] = sequence_number

        if self.message.get("Attributes"):
            self.message["Attributes"].update(attributes)
        else:
            # missing or empty attribute map: start from the attributes collected above
            self.message["Attributes"] = attributes

        # set attribute default values if not set
        self.message["Attributes"].setdefault(
            MessageSystemAttributeName.ApproximateReceiveCount, "0"
        )

    @property
    def message_group_id(self) -> str | None:
        """The ``MessageGroupId`` system attribute, or None if the message has none."""
        return self.message["Attributes"].get(MessageSystemAttributeName.MessageGroupId)

    @property
    def message_deduplication_id(self) -> str | None:
        """The ``MessageDeduplicationId`` system attribute, or None if the message has none."""
        return self.message["Attributes"].get(MessageSystemAttributeName.MessageDeduplicationId)

    @property
    def dead_letter_queue_source_arn(self) -> str | None:
        """The ``DeadLetterQueueSourceArn`` system attribute, or None if the message has none."""
        return self.message["Attributes"].get(MessageSystemAttributeName.DeadLetterQueueSourceArn)

    @property
    def message_id(self):
        """The ``MessageId`` of the wrapped message."""
        return self.message["MessageId"]

    def increment_approximate_receive_count(self):
        """
        Increment the message system attribute ``ApproximateReceiveCount``.

        The attribute is stored as a string, so it is parsed, incremented and written back as a
        string. The ``receive_count`` field of this object is not touched here.
        """
        # TODO: need better handling of system attributes
        attributes = self.message["Attributes"]
        count = int(attributes.get(MessageSystemAttributeName.ApproximateReceiveCount, "0"))
        attributes[MessageSystemAttributeName.ApproximateReceiveCount] = str(count + 1)

    def set_last_received(self, timestamp: float):
        """
        Sets the last received timestamp of the message to the given value, and updates the visibility deadline
        accordingly.

        :param timestamp: the last time the message was received
        """
        self.last_received = timestamp
        self.visibility_deadline = timestamp + self.visibility_timeout

    def update_visibility_timeout(self, timeout: int):
        """
        Sets the visibility timeout of the message to the given value, and updates the visibility deadline accordingly.

        :param timeout: the timeout value in seconds
        """
        self.visibility_timeout = timeout
        # the new deadline is measured from now, not from the time the message was last received
        self.visibility_deadline = time.time() + timeout

    @property
    def is_visible(self) -> bool:
        """
        Returns false if the message has a visibility deadline that is in the future.

        :return: whether the message is visible or not.
        """
        deadline = self.visibility_deadline
        return deadline is None or time.time() >= deadline

    @property
    def is_delayed(self) -> bool:
        """
        Returns true while the delay of the message, counted from its creation time, has not elapsed.
        A message without a delay (``delay_seconds`` is None) is never delayed.
        """
        if self.delay_seconds is None:
            return False
        return time.time() <= self.created + self.delay_seconds

    # Rich comparisons return NotImplemented for foreign types, so that e.g. ``msg == None`` is
    # simply False and ``msg < 1`` raises the usual TypeError rather than an AttributeError.

    def __gt__(self, other):
        if not isinstance(other, SqsMessage):
            return NotImplemented
        return self.priority > other.priority

    def __ge__(self, other):
        if not isinstance(other, SqsMessage):
            return NotImplemented
        return self.priority >= other.priority

    def __lt__(self, other):
        if not isinstance(other, SqsMessage):
            return NotImplemented
        return self.priority < other.priority

    def __le__(self, other):
        if not isinstance(other, SqsMessage):
            return NotImplemented
        return self.priority <= other.priority

    def __eq__(self, other):
        if not isinstance(other, SqsMessage):
            return NotImplemented
        return self.message_id == other.message_id

    def __hash__(self):
        # consistent with __eq__: messages with the same id hash alike
        return hash(self.message_id)

    def __repr__(self):
        return f"SqsMessage(id={self.message_id},group={self.message_group_id})"
196

197

198
def to_sqs_api_message(
    standard_message: SqsMessage,
    attribute_names: AttributeNameList = None,
    message_attribute_names: MessageAttributeNameList = None,
) -> Message:
    """
    Build the AWS API ``Message`` (the shape returned by the ``ReceiveMessage`` operation) from
    LocalStack's internal message representation.

    The internal message is never modified; all changes are made on a deep copy.

    :param standard_message: A LocalStack SQS message
    :param attribute_names: the attribute name list to filter
    :param message_attribute_names: the message attribute names to filter
    :return: a copy of the original Message with updated message attributes and MD5 attribute hash sums
    """
    # work on a copy so the receiver-specific changes do not leak into the stored message
    api_message = copy.deepcopy(standard_message.message)

    # the first-receive timestamp is reported in milliseconds; 0 if the message was never received
    first_received_millis = int((standard_message.first_received or 0) * 1000)
    api_message["Attributes"][MessageSystemAttributeName.ApproximateFirstReceiveTimestamp] = str(
        first_received_millis
    )

    # reduce system attributes and message attributes to what the receiver asked for
    message_filter_attributes(api_message, attribute_names)
    message_filter_message_attributes(api_message, message_attribute_names)

    remaining_message_attributes = api_message.get("MessageAttributes")
    if not remaining_message_attributes:
        # nothing left to hash: drop the value that was computed when creating the message
        api_message.pop("MD5OfMessageAttributes", None)
        return api_message

    # recompute the hash over the message attributes that survived the filter
    api_message["MD5OfMessageAttributes"] = create_message_attribute_hash(
        remaining_message_attributes
    )
    return api_message
1✔
231

232

233
class ReceiveMessageResult:
    """
    Carrier for the outcome of a "receive messages" operation, passed between the SqsProvider and
    the underlying datastructure holding the messages.
    """

    # The messages that were successfully received from the queue
    successful: list[SqsMessage]

    # The array index position in ``successful`` and ``receipt_handles`` need to be the same (this
    # assumption is needed when assembling the result in `SqsProvider.receive_message`)
    receipt_handles: list[str]

    # All messages that were received more than maxReceiveCount in the redrive policy (if any)
    dead_letter_messages: list[SqsMessage]

    def __init__(self):
        # every result starts out empty and owns its own lists
        self.successful = list()
        self.receipt_handles = list()
        self.dead_letter_messages = list()
1✔
253

254

255
class MessageMoveTaskStatus(str):
    """String constants naming the states a message move task can be in."""

    # not validated, for internal use
    CREATED = "CREATED"

    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    CANCELLING = "CANCELLING"
    CANCELLED = "CANCELLED"
    FAILED = "FAILED"
1✔
262

263

264
class MessageMoveTask:
    """
    A task created by the ``StartMessageMoveTask`` operation.

    The class-level values below act as defaults for the dynamic fields; ``__init__`` assigns the
    configurable fields, a fresh task id, and the cancel event.
    """

    # configurable fields
    source_arn: str
    """The arn of the DLQ the messages are currently in."""
    destination_arn: str | None = None
    """If the DestinationArn is not specified, the original source arn will be used as target."""
    max_number_of_messages_per_second: int | None = None

    # dynamic fields
    task_id: str
    status: str = MessageMoveTaskStatus.CREATED
    started_timestamp: datetime | None = None
    approximate_number_of_messages_moved: int | None = None
    approximate_number_of_messages_to_move: int | None = None
    failure_reason: str | None = None

    cancel_event: threading.Event

    def __init__(
        self,
        source_arn: str,
        destination_arn: str | None,
        max_number_of_messages_per_second: int | None = None,
    ):
        """
        :param source_arn: the arn of the queue the messages are moved out of
        :param destination_arn: the arn of the target queue, or None
        :param max_number_of_messages_per_second: optional rate limit for the move
        """
        self.task_id = long_uid()
        self.source_arn = source_arn
        self.destination_arn = destination_arn
        self.max_number_of_messages_per_second = max_number_of_messages_per_second
        self.cancel_event = threading.Event()

    def mark_started(self):
        """Record the start time, switch the status to ``RUNNING`` and reset the cancel event."""
        # function-scope import: only ``datetime`` itself is imported at module level
        from datetime import timezone

        # ``datetime.utcnow()`` is deprecated since Python 3.12. This yields the same naive UTC
        # value, so code that reads ``started_timestamp`` sees no change.
        self.started_timestamp = datetime.now(timezone.utc).replace(tzinfo=None)
        self.status = MessageMoveTaskStatus.RUNNING
        self.cancel_event.clear()

    @property
    def task_handle(self) -> str:
        """The handle of this task, encoded from its task id and source arn."""
        return encode_move_task_handle(self.task_id, self.source_arn)
1✔
303

304

305
class SqsQueue:
    """
    Base class for SQS queues.

    It holds the state shared by all queue types (attributes, tags, delayed and in-flight messages,
    issued receipt handles) and the operations on it. The storage of visible messages is left to
    subclasses, which have to implement ``put``, ``receive``, ``_put_message``,
    ``remove_expired_messages`` and ``approx_number_of_messages``.
    """

    name: str
    region: str
    account_id: str

    attributes: QueueAttributeMap
    tags: TagMap

    purge_in_progress: bool
    purge_timestamp: float | None

    delayed: set[SqsMessage]
    inflight: set[SqsMessage]
    receipts: dict[str, SqsMessage]

    def __init__(self, name: str, region: str, account_id: str, attributes=None, tags=None) -> None:
        """
        :param name: the queue name; validated via ``_assert_queue_name``
        :param region: the region of the queue
        :param account_id: the account the queue belongs to
        :param attributes: optional attributes that are validated and then override the defaults
        :param tags: optional queue tags
        :raises InvalidParameterValueException: if the queue name is not valid
        """
        self.name = name
        self.region = region
        self.account_id = account_id

        self._assert_queue_name(name)
        self.tags = tags or {}

        self.delayed = set()
        self.inflight = set()
        self.receipts = {}

        self.attributes = self.default_attributes()
        if attributes:
            self.validate_queue_attributes(attributes)
            self.attributes.update(attributes)

        self.purge_in_progress = False
        self.purge_timestamp = None

        self.permissions = set()
        # re-entrant, because methods holding the lock call other methods that may take it again
        self.mutex = threading.RLock()

    def shutdown(self):
        """Hook for releasing queue resources; the base implementation does nothing."""
        pass

    def default_attributes(self) -> QueueAttributeMap:
        """
        Returns the attribute map a new queue starts with. The three ``ApproximateNumberOf...``
        entries are callables that are evaluated when the attributes are read.
        """
        # take the timestamp once so that CreatedTimestamp and LastModifiedTimestamp of a new
        # queue can never differ
        created = str(now())
        return {
            QueueAttributeName.ApproximateNumberOfMessages: lambda: str(
                self.approx_number_of_messages
            ),
            QueueAttributeName.ApproximateNumberOfMessagesNotVisible: lambda: str(
                self.approx_number_of_messages_not_visible
            ),
            QueueAttributeName.ApproximateNumberOfMessagesDelayed: lambda: str(
                self.approx_number_of_messages_delayed
            ),
            QueueAttributeName.CreatedTimestamp: created,
            QueueAttributeName.DelaySeconds: "0",
            QueueAttributeName.LastModifiedTimestamp: created,
            QueueAttributeName.MaximumMessageSize: str(sqs_constants.DEFAULT_MAXIMUM_MESSAGE_SIZE),
            QueueAttributeName.MessageRetentionPeriod: "345600",
            QueueAttributeName.QueueArn: self.arn,
            QueueAttributeName.ReceiveMessageWaitTimeSeconds: "0",
            QueueAttributeName.VisibilityTimeout: "30",
            QueueAttributeName.SqsManagedSseEnabled: "true",
        }

    def update_delay_seconds(self, value: int):
        """
        For standard queues, the per-queue delay setting is not retroactive—changing the setting doesn't affect the
        delay of messages already in the queue. For FIFO queues, the per-queue delay setting is retroactive—changing
        the setting affects the delay of messages already in the queue.

        https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-delay-queues.html

        :param value: the number of seconds
        """
        self.attributes[QueueAttributeName.DelaySeconds] = str(value)

    def update_last_modified(self, timestamp: int | None = None):
        """
        Sets the ``LastModifiedTimestamp`` attribute.

        :param timestamp: the timestamp to store; defaults to the current time
        """
        if timestamp is None:
            timestamp = now()

        self.attributes[QueueAttributeName.LastModifiedTimestamp] = str(timestamp)

    @property
    def arn(self) -> str:
        """The ARN of the queue, built from partition, region, account id and queue name."""
        return f"arn:{get_partition(self.region)}:sqs:{self.region}:{self.account_id}:{self.name}"

    def url(self, context: RequestContext) -> str:
        """Return queue URL which depending on the endpoint strategy returns e.g.:
        * (standard) http://sqs.eu-west-1.localhost.localstack.cloud:4566/000000000000/myqueue
        * (domain) http://eu-west-1.queue.localhost.localstack.cloud:4566/000000000000/myqueue
        * (path) http://localhost.localstack.cloud:4566/queue/eu-central-1/000000000000/myqueue
        * otherwise: http://localhost.localstack.cloud:4566/000000000000/myqueue
        """

        scheme = config.get_protocol()  # TODO: should probably change to context.request.scheme
        host_definition = localstack_host()
        host_and_port = host_definition.host_and_port()

        endpoint_strategy = config.SQS_ENDPOINT_STRATEGY

        if endpoint_strategy == "dynamic":
            scheme = context.request.scheme
            # determine the endpoint strategy that should be used, and determine the host dynamically
            endpoint_strategy, host_and_port = guess_endpoint_strategy_and_host(
                context.request.host
            )

        if endpoint_strategy == "standard":
            # Region is always part of the queue URL
            # sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/my-queue
            scheme = context.request.scheme
            host_url = f"{scheme}://sqs.{self.region}.{host_and_port}"
        elif endpoint_strategy == "domain":
            # Legacy style
            # queue.localhost.localstack.cloud:4566/000000000000/my-queue (us-east-1)
            # or us-east-2.queue.localhost.localstack.cloud:4566/000000000000/my-queue
            region = "" if self.region == "us-east-1" else self.region + "."
            host_url = f"{scheme}://{region}queue.{host_and_port}"
        elif endpoint_strategy == "path":
            # https?://localhost:4566/queue/us-east-1/00000000000/my-queue (us-east-1)
            host_url = f"{scheme}://{host_and_port}/queue/{self.region}"
        else:
            host_url = f"{scheme}://{host_and_port}"

        return f"{host_url.rstrip('/')}/{self.account_id}/{self.name}"

    @property
    def redrive_policy(self) -> dict | None:
        """The parsed ``RedrivePolicy`` attribute, or None if the attribute is not set."""
        if policy_document := self.attributes.get(QueueAttributeName.RedrivePolicy):
            return json.loads(policy_document)
        return None

    @property
    def max_receive_count(self) -> int | None:
        """
        Returns the maxReceiveCount attribute of the redrive policy. If no redrive policy is set, then it
        returns None.
        """
        if redrive_policy := self.redrive_policy:
            return int(redrive_policy["maxReceiveCount"])
        return None

    @property
    def visibility_timeout(self) -> int:
        """The ``VisibilityTimeout`` attribute as an integer."""
        return int(self.attributes[QueueAttributeName.VisibilityTimeout])

    @property
    def delay_seconds(self) -> int:
        """The ``DelaySeconds`` attribute as an integer."""
        return int(self.attributes[QueueAttributeName.DelaySeconds])

    @property
    def wait_time_seconds(self) -> int:
        """The ``ReceiveMessageWaitTimeSeconds`` attribute as an integer."""
        return int(self.attributes[QueueAttributeName.ReceiveMessageWaitTimeSeconds])

    @property
    def message_retention_period(self) -> int:
        """
        ``MessageRetentionPeriod`` -- the length of time, in seconds, for which Amazon SQS retains a message. Valid
        values: An integer representing seconds, from 60 (1 minute) to 1,209,600 (14 days). Default: 345,600 (4 days).
        """
        return int(self.attributes[QueueAttributeName.MessageRetentionPeriod])

    @property
    def maximum_message_size(self) -> int:
        """The ``MaximumMessageSize`` attribute as an integer."""
        return int(self.attributes[QueueAttributeName.MaximumMessageSize])

    @property
    def approx_number_of_messages(self) -> int:
        """The number of visible messages; has to be provided by subclasses."""
        raise NotImplementedError

    @property
    def approx_number_of_messages_not_visible(self) -> int:
        """The number of messages currently in flight."""
        return len(self.inflight)

    @property
    def approx_number_of_messages_delayed(self) -> int:
        """The number of messages currently held back in the delayed set."""
        return len(self.delayed)

    def validate_receipt_handle(self, receipt_handle: str):
        """
        Checks that the queue ARN encoded in the receipt handle is the ARN of this queue.

        :param receipt_handle: the receipt handle to check
        :raises ReceiptHandleIsInvalid: if the handle refers to a different queue ARN
        """
        if self.arn != extract_receipt_handle_info(receipt_handle).queue_arn:
            raise ReceiptHandleIsInvalid(
                f'The input receipt handle "{receipt_handle}" is not a valid receipt handle.'
            )

    def update_visibility_timeout(self, receipt_handle: str, visibility_timeout: int):
        """
        Changes the visibility timeout of the in-flight message identified by the receipt handle.
        A timeout of 0 ends the visibility timeout: the message leaves the in-flight set and is
        put back into the queue. Handles of messages that are no longer in flight are ignored.

        :param receipt_handle: a receipt handle issued by this queue
        :param visibility_timeout: the new timeout in seconds
        :raises ReceiptHandleIsInvalid: if the handle belongs to a different queue
        :raises InvalidParameterValueException: if the handle is not known to this queue
        """
        with self.mutex:
            self.validate_receipt_handle(receipt_handle)

            if receipt_handle not in self.receipts:
                raise InvalidParameterValueException(
                    f"Value {receipt_handle} for parameter ReceiptHandle is invalid. Reason: Message does not exist "
                    f"or is not available for visibility timeout change."
                )

            standard_message = self.receipts[receipt_handle]

            if standard_message not in self.inflight:
                return

            standard_message.update_visibility_timeout(visibility_timeout)

            if visibility_timeout == 0:
                LOG.info(
                    "terminating the visibility timeout of %s",
                    standard_message.message["MessageId"],
                )
                # Terminating the visibility timeout for a message
                # https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html#terminating-message-visibility-timeout
                self.inflight.remove(standard_message)
                self._put_message(standard_message)

    def remove(self, receipt_handle: str):
        """
        Deletes the message identified by the receipt handle. The message is flagged as deleted and
        all receipt handles issued for it are forgotten. Unknown handles are logged and ignored.

        :param receipt_handle: a receipt handle issued by this queue
        :raises ReceiptHandleIsInvalid: if the handle belongs to a different queue
        """
        with self.mutex:
            self.validate_receipt_handle(receipt_handle)

            if receipt_handle not in self.receipts:
                LOG.debug(
                    "no in-flight message found for receipt handle %s in queue %s",
                    receipt_handle,
                    self.arn,
                )
                return

            standard_message = self.receipts[receipt_handle]
            self._pre_delete_checks(standard_message, receipt_handle)
            standard_message.deleted = True
            LOG.debug(
                "deleting message %s from queue %s",
                standard_message.message["MessageId"],
                self.arn,
            )

            # remove all handles associated with this message
            for handle in standard_message.receipt_handles:
                del self.receipts[handle]
            standard_message.receipt_handles.clear()

            self._on_remove_message(standard_message)

    def _on_remove_message(self, message: SqsMessage):
        """Hook for queue-specific logic executed when a message is removed."""
        pass

    def put(
        self,
        message: Message,
        visibility_timeout: int = None,
        message_deduplication_id: str = None,
        message_group_id: str = None,
        delay_seconds: int = None,
    ) -> SqsMessage:
        """Adds the given API message to the queue; has to be implemented by subclasses."""
        raise NotImplementedError

    def receive(
        self,
        num_messages: int = 1,
        wait_time_seconds: int = None,
        visibility_timeout: int = None,
        *,
        poll_empty_queue: bool = False,
    ) -> ReceiveMessageResult:
        """
        Receive ``num_messages`` from the queue, and wait at max ``wait_time_seconds``. If a visibility
        timeout is given, also change the visibility timeout of all received messages accordingly.

        :param num_messages: the number of messages you want to get from the underlying queue
        :param wait_time_seconds: the number of seconds you want to wait
        :param visibility_timeout: an optional new visibility timeout
        :param poll_empty_queue: whether to keep polling an empty queue until the duration ``wait_time_seconds`` has elapsed
        :return: a ReceiveMessageResult object that contains the result of the operation
        """
        raise NotImplementedError

    def clear(self):
        """
        Calls clear on all internal datastructures that hold messages and data related to them.
        """
        with self.mutex:
            self.inflight.clear()
            self.delayed.clear()
            self.receipts.clear()

    def _put_message(self, message: SqsMessage):
        """Low-level put operation to put messages into a queue and modify visibilities accordingly."""
        raise NotImplementedError

    def create_receipt_handle(self, message: SqsMessage) -> str:
        """Returns a receipt handle encoded from the ARN of this queue and the given message."""
        return encode_receipt_handle(self.arn, message)

    def requeue_inflight_messages(self):
        """Moves every in-flight message that has become visible again back into the queue."""
        # skip taking the lock when there is nothing in flight
        if not self.inflight:
            return

        with self.mutex:
            # collect first: the set must not change while it is being iterated
            messages = [message for message in self.inflight if message.is_visible]
            for standard_message in messages:
                LOG.debug(
                    "re-queueing inflight messages %s into queue %s",
                    standard_message,
                    self.arn,
                )
                self.inflight.remove(standard_message)
                self._put_message(standard_message)

    def enqueue_delayed_messages(self):
        """Moves every message whose delay has elapsed from the delayed set into the queue."""
        # skip taking the lock when nothing is delayed
        if not self.delayed:
            return

        with self.mutex:
            # collect first: the set must not change while it is being iterated
            messages = [message for message in self.delayed if not message.is_delayed]
            for standard_message in messages:
                LOG.debug(
                    "enqueueing delayed messages %s into queue %s",
                    standard_message.message["MessageId"],
                    self.arn,
                )
                self.delayed.remove(standard_message)
                self._put_message(standard_message)

    def remove_expired_messages(self):
        """
        Removes messages from the queue whose retention period has expired.
        """
        raise NotImplementedError

    def _assert_queue_name(self, name):
        """
        Validates the queue name: 1 to 80 alphanumeric characters, hyphens or underscores.

        :param name: the queue name to check
        :raises InvalidParameterValueException: if the name does not match
        """
        # fullmatch instead of match with "^...$": "$" also matches right before a trailing
        # newline, which would let a name such as "queue\n" pass
        if not re.fullmatch(r"[a-zA-Z0-9_-]{1,80}", name):
            raise InvalidParameterValueException(
                "Can only include alphanumeric characters, hyphens, or underscores. 1 to 80 in length"
            )

    def validate_queue_attributes(self, attributes):
        """Hook for validating attributes passed at creation; the base implementation accepts all."""
        pass

    def add_permission(self, label: str, actions: list[str], account_ids: list[str]) -> None:
        """
        Create / append to a policy for usage with the add_permission api call

        :param actions: List of actions to be included in the policy, without the SQS: prefix
        :param account_ids: List of account ids to be included in the policy
        :param label: Permission label
        :raises InvalidParameterValueException: if a statement with the given label already exists
        """
        partition = get_partition(self.region)
        principals = [f"arn:{partition}:iam::{account_id}:root" for account_id in account_ids]
        sqs_actions = [f"SQS:{action}" for action in actions]

        new_statement = {
            "Sid": label,
            "Effect": "Allow",
            # a single principal / action is stored as a plain string rather than a list
            "Principal": {"AWS": principals if len(principals) > 1 else principals[0]},
            "Action": sqs_actions if len(sqs_actions) > 1 else sqs_actions[0],
            "Resource": self.arn,
        }

        if policy := self.attributes.get(QueueAttributeName.Policy):
            policy = json.loads(policy)
            # a custom policy may have been stored without any statements
            policy.setdefault("Statement", [])
        else:
            policy = {
                "Version": "2008-10-17",
                "Id": f"{self.arn}/SQSDefaultPolicy",
                "Statement": [],
            }

        existing_statement_ids = [existing.get("Sid") for existing in policy["Statement"]]
        if label in existing_statement_ids:
            raise InvalidParameterValueException(
                f"Value {label} for parameter Label is invalid. Reason: Already exists."
            )
        policy["Statement"].append(new_statement)
        self.attributes[QueueAttributeName.Policy] = json.dumps(policy)

    def remove_permission(self, label: str) -> None:
        """
        Delete a policy statement for usage of the remove_permission call

        The ``Policy`` attribute is removed altogether once its last statement is deleted.

        :param label: Permission label
        :raises InvalidParameterValueException: if no statement with the given label exists
        """
        if policy := self.attributes.get(QueueAttributeName.Policy):
            policy = json.loads(policy)
            # this should not be necessary, but we can upload custom policies, so it's better to be safe
            policy.setdefault("Statement", [])
        else:
            policy = {
                "Version": "2008-10-17",
                "Id": f"{self.arn}/SQSDefaultPolicy",
                "Statement": [],
            }
        existing_statement_ids = [existing.get("Sid") for existing in policy["Statement"]]
        if label not in existing_statement_ids:
            raise InvalidParameterValueException(
                f"Value {label} for parameter Label is invalid. Reason: can't find label."
            )
        policy["Statement"] = [
            existing for existing in policy["Statement"] if existing.get("Sid") != label
        ]
        if policy["Statement"]:
            self.attributes[QueueAttributeName.Policy] = json.dumps(policy)
        else:
            del self.attributes[QueueAttributeName.Policy]

    def get_queue_attributes(self, attribute_names: AttributeNameList = None) -> dict[str, str]:
        """
        Returns the values of the requested queue attributes.

        Attributes stored as callables are evaluated, the strings ``"True"``/``"False"`` are
        returned in lower case, and attributes without a value are left out of the result.

        :param attribute_names: the attributes to return; ``All`` selects every stored attribute
        :return: a dict of attribute name to value; empty if no names were requested
        :raises InvalidAttributeName: if a requested name is not a known queue attribute
        """
        if not attribute_names:
            return {}

        if QueueAttributeName.All in attribute_names:
            attribute_names = list(self.attributes)

        result: dict[QueueAttributeName, str] = {}

        for attr in attribute_names:
            if not hasattr(QueueAttributeName, attr):
                raise InvalidAttributeName(f"Unknown Attribute {attr}.")

            value = self.attributes.get(attr)
            if callable(value):
                # computed attribute, e.g. the approximate message counters
                value = value()
                if value is not None:
                    result[attr] = value
            elif value in ("False", "True"):
                result[attr] = value.lower()
            elif value is not None:
                result[attr] = value
        return result

    @staticmethod
    def remove_expired_messages_from_heap(
        heap: list[SqsMessage], message_retention_period: int
    ) -> list[SqsMessage]:
        """
        Removes from the given heap of SqsMessages all messages that have expired in the context of the current time
        and the given message retention period. The method manipulates the heap but retains the heap property.

        :param heap: an array satisfying the heap property
        :param message_retention_period: the message retention period to use in relation to the current time
        :return: a list of expired messages that have been removed from the heap
        """
        # messages created at or before this point in time have expired
        th = time.time() - message_retention_period

        expired = []
        while heap:
            # here we're leveraging the heap property "that a[0] is always its smallest element"
            # and the assumption that message.created == message.priority
            message = heap[0]
            if th < message.created:
                break
            # remove the expired element
            expired.append(message)
            heapq.heappop(heap)

        return expired

    def _pre_delete_checks(self, standard_message: SqsMessage, receipt_handle: str) -> None:
        """
        Runs any potential checks if a message that has been successfully identified via a receipt handle
        is indeed supposed to be deleted.
        For example, a receipt handle that has expired might not lead to deletion.

        :param standard_message: The message to be deleted
        :param receipt_handle: The handle associated with the message
        :return: None. Potential violations raise errors.
        """
        pass
1✔
778

779

780
class StandardQueue(SqsQueue):
    """
    Queue implementation for standard (non-FIFO) queues.

    Visible messages live in a single priority queue (``visible``). Messages that were received with a
    non-zero visibility timeout are tracked in ``inflight``.
    """

    visible: InterruptiblePriorityQueue[SqsMessage]
    inflight: set[SqsMessage]

    def __init__(self, name: str, region: str, account_id: str, attributes=None, tags=None) -> None:
        super().__init__(name, region, account_id, attributes, tags)
        self.visible = InterruptiblePriorityQueue()

    def clear(self):
        """Clears the state held by the parent class and drops all currently visible messages."""
        with self.mutex:
            super().clear()
            self.visible.queue.clear()

    @property
    def approx_number_of_messages(self):
        """The number of messages currently in the ``visible`` queue."""
        return self.visible.qsize()

    def shutdown(self):
        """Shuts down the underlying ``visible`` queue."""
        self.visible.shutdown()

    def put(
        self,
        message: Message,
        visibility_timeout: int = None,
        message_deduplication_id: str = None,
        message_group_id: str = None,
        delay_seconds: int = None,
    ):
        """
        Wraps the given message into an ``SqsMessage`` and adds it to the queue.

        :param message: the message to enqueue
        :param visibility_timeout: per-message visibility timeout; the queue attribute is used if ``None``
        :param message_deduplication_id: not valid for standard queues, must be empty
        :param message_group_id: the message group ID stored on the created message
        :param delay_seconds: per-message delay; the queue attribute is used if ``None``
        :return: the created ``SqsMessage``
        :raises InvalidParameterValueException: if a ``message_deduplication_id`` is passed
        """
        if message_deduplication_id:
            raise InvalidParameterValueException(
                f"Value {message_deduplication_id} for parameter MessageDeduplicationId is invalid. Reason: The "
                f"request includes a parameter that is not valid for this queue type."
            )

        standard_message = SqsMessage(
            time.time(),
            message,
            message_group_id=message_group_id,
        )

        # explicit per-message values take precedence over the attributes of the queue
        standard_message.visibility_timeout = (
            self.visibility_timeout if visibility_timeout is None else visibility_timeout
        )
        standard_message.delay_seconds = (
            self.delay_seconds if delay_seconds is None else delay_seconds
        )

        if standard_message.is_delayed:
            self.delayed.add(standard_message)
        else:
            self._put_message(standard_message)

        return standard_message

    def _put_message(self, message: SqsMessage):
        """Makes the message visible by adding it to the ``visible`` queue without blocking."""
        self.visible.put_nowait(message)

    def remove_expired_messages(self):
        """Removes all visible messages that exceeded the message retention period and logs each removal."""
        with self.mutex:
            messages = self.remove_expired_messages_from_heap(
                self.visible.queue, self.message_retention_period
            )

        for message in messages:
            LOG.debug("Removed expired message %s from queue %s", message.message_id, self.arn)

    def receive(
        self,
        num_messages: int = 1,
        wait_time_seconds: int = None,
        visibility_timeout: int = None,
        *,
        poll_empty_queue: bool = False,
    ) -> ReceiveMessageResult:
        """
        Receives up to ``num_messages`` messages from the queue.

        :param num_messages: the maximum number of messages to return as successful
        :param wait_time_seconds: how long to wait for messages in total; falsy values disable blocking
        :param visibility_timeout: visibility timeout applied to the received messages; the queue attribute
            is used if ``None``
        :param poll_empty_queue: if set, the call keeps blocking (within the wait time) after the first
            message was received instead of returning as soon as the queue is drained
        :return: the result holding successful messages, their receipt handles, and messages marked for
            the DLQ
        """
        result = ReceiveMessageResult()

        max_receive_count = self.max_receive_count
        visibility_timeout = (
            self.visibility_timeout if visibility_timeout is None else visibility_timeout
        )

        block = bool(wait_time_seconds)
        timeout = wait_time_seconds or 0
        # Fixed point in time at which the wait budget is used up. The remaining timeout is always derived
        # from it, so time that has already elapsed is never subtracted more than once.
        deadline = time.time() + timeout

        # collect messages
        while True:
            try:
                message = self.visible.get(block=block, timeout=timeout)
            except Empty:
                break
            # setting block to false guarantees that, if we've already waited before, we don't wait the
            # full time again in the next iteration if max_number_of_messages is set but there are no more
            # messages in the queue. see https://github.com/localstack/localstack/issues/5824
            if not poll_empty_queue:
                block = False

            # only the time that is left until the deadline may be spent in the next iteration
            timeout = max(0, deadline - time.time())

            if message.deleted:
                # filter messages that were deleted with an expired receipt handle after they have been
                # re-queued. this can only happen due to a race with `remove`.
                continue

            # update message attributes
            message.receive_count += 1
            message.update_visibility_timeout(visibility_timeout)
            message.set_last_received(time.time())
            if message.first_received is None:
                message.first_received = message.last_received

            LOG.debug("de-queued message %s from %s", message, self.arn)
            if max_receive_count and message.receive_count > max_receive_count:
                # the message needs to move to the DLQ
                LOG.debug(
                    "message %s has been received %d times, marking it for DLQ",
                    message,
                    message.receive_count,
                )
                result.dead_letter_messages.append(message)
            else:
                result.successful.append(message)
                message.increment_approximate_receive_count()

                # now we can return
                if len(result.successful) == num_messages:
                    break

        # now process the successful result messages: create receipt handles and manage visibility.
        for message in result.successful:
            # manage receipt handle
            receipt_handle = self.create_receipt_handle(message)
            message.receipt_handles.add(receipt_handle)
            self.receipts[receipt_handle] = message
            result.receipt_handles.append(receipt_handle)

            # manage message visibility: a timeout of 0 makes the message immediately visible again
            if message.visibility_timeout == 0:
                self.visible.put_nowait(message)
            else:
                self.inflight.add(message)

        return result

    def _on_remove_message(self, message: SqsMessage):
        """Drops the message from ``inflight`` or, if it is not in flight, from the ``visible`` queue."""
        try:
            self.inflight.remove(message)
        except KeyError:
            # this likely means the message was removed with an expired receipt handle unfortunately this
            # means we need to scan the queue for the element and remove it from there, and then re-heapify
            # the queue
            try:
                self.visible.queue.remove(message)
                heapq.heapify(self.visible.queue)
            except ValueError:
                # this may happen if the message no longer exists because it was removed earlier
                pass

    def validate_queue_attributes(self, attributes):
        """
        Validates the names of the given queue attributes.

        :param attributes: mapping of attribute names to values
        :raises InvalidAttributeName: if an attribute is FIFO-only or not a known attribute name
        """
        # all string members of QueueAttributeName, minus those that are not valid for standard queues
        valid = [
            k[1]
            for k in inspect.getmembers(
                QueueAttributeName, lambda x: isinstance(x, str) and not x.startswith("__")
            )
            if k[1] not in sqs_constants.INVALID_STANDARD_QUEUE_ATTRIBUTES
        ]

        for k in attributes:
            if k in (QueueAttributeName.FifoThroughputLimit, QueueAttributeName.DeduplicationScope):
                raise InvalidAttributeName(
                    f"You can specify the {k} only when FifoQueue is set to true."
                )
            if k not in valid:
                raise InvalidAttributeName(f"Unknown Attribute {k}.")
1✔
960

961

962
class MessageGroup:
    """
    A group of messages that share the same message group ID.

    The messages are kept as a heap (see ``heapq``), so ``pop`` always returns the smallest message.
    Two groups are considered equal, and hash equally, if their message group IDs are equal.
    """

    message_group_id: str
    messages: list[SqsMessage]

    def __init__(self, message_group_id: str):
        self.message_group_id = message_group_id
        self.messages = []

    def empty(self) -> bool:
        """Returns True if the group currently holds no messages."""
        return not self.messages

    def size(self) -> int:
        """Returns the number of messages currently held by the group."""
        return len(self.messages)

    def pop(self) -> SqsMessage:
        """
        Removes and returns the smallest message of the group.

        :raises IndexError: if the group is empty
        """
        return heapq.heappop(self.messages)

    def push(self, message: SqsMessage):
        """Adds the message to the group while maintaining the heap property."""
        heapq.heappush(self.messages, message)

    def __eq__(self, other):
        # comparing with a foreign type must not fail with an AttributeError; let Python fall back to its
        # default handling instead
        if not isinstance(other, MessageGroup):
            return NotImplemented
        return self.message_group_id == other.message_group_id

    def __hash__(self):
        return hash(self.message_group_id)

    def __repr__(self):
        return f"MessageGroup(id={self.message_group_id}, size={len(self.messages)})"
990

991

992
class FifoQueue(SqsQueue):
    """
    A FIFO queue behaves differently than a default queue. Most behavior has to be implemented separately.

    See https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html

    Messages are organized in message groups (``message_groups``). Groups that can currently be received
    from are queued in ``message_group_queue``, groups that have been handed out by ``receive`` are tracked
    in ``inflight_groups``. ``deduplication`` maps deduplication IDs to the message that was last accepted
    for that ID.

    TODO: raise exceptions when trying to remove a message with an expired receipt handle
    """

    deduplication: dict[str, SqsMessage]
    message_groups: dict[str, MessageGroup]
    inflight_groups: set[MessageGroup]
    message_group_queue: InterruptibleQueue
    deduplication_scope: str

    def __init__(self, name: str, region: str, account_id: str, attributes=None, tags=None) -> None:
        super().__init__(name, region, account_id, attributes, tags)
        self.deduplication = {}

        self.message_groups = {}
        self.inflight_groups = set()
        self.message_group_queue = InterruptibleQueue()

        # SQS does not seem to change the deduplication behaviour of fifo queues if you
        # change to/from 'queue'/'messageGroup' scope after creation -> we need to set this on creation
        self.deduplication_scope = self.attributes[QueueAttributeName.DeduplicationScope]

    @property
    def approx_number_of_messages(self):
        """The total number of messages currently held by all message groups."""
        return sum(len(message_group.messages) for message_group in self.message_groups.values())

    def shutdown(self):
        """Shuts down the underlying message group queue."""
        self.message_group_queue.shutdown()

    def get_message_group(self, message_group_id: str) -> MessageGroup:
        """
        Thread safe lazy factory for MessageGroup objects.

        :param message_group_id: the message group ID
        :return: a new or existing MessageGroup object
        """
        with self.mutex:
            if message_group_id not in self.message_groups:
                self.message_groups[message_group_id] = MessageGroup(message_group_id)

            return self.message_groups.get(message_group_id)

    def default_attributes(self) -> QueueAttributeMap:
        """Returns the default attributes of the parent class extended by the FIFO-specific defaults."""
        return {
            **super().default_attributes(),
            QueueAttributeName.ContentBasedDeduplication: "false",
            QueueAttributeName.DeduplicationScope: "queue",
            QueueAttributeName.FifoThroughputLimit: "perQueue",
        }

    def update_delay_seconds(self, value: int):
        """Updates the delay of the queue and applies the new value to all currently delayed messages."""
        super().update_delay_seconds(value)
        for message in self.delayed:
            message.delay_seconds = value

    def _pre_delete_checks(self, message: SqsMessage, receipt_handle: str) -> None:
        """
        Rejects the deletion of a message that is visible (again), i.e., whose receipt handle has expired.

        :param message: the message to be deleted
        :param receipt_handle: the handle associated with the message
        :raises InvalidParameterValueException: if the message is currently visible
        """
        if message.is_visible:
            raise InvalidParameterValueException(
                f"Value {receipt_handle} for parameter ReceiptHandle is invalid. Reason: The receipt handle has expired."
            )

    def remove(self, receipt_handle: str):
        """Validates the receipt handle and then delegates the removal to the parent class."""
        self.validate_receipt_handle(receipt_handle)

        super().remove(receipt_handle)

    def put(
        self,
        message: Message,
        visibility_timeout: int = None,
        message_deduplication_id: str = None,
        message_group_id: str = None,
        delay_seconds: int = None,
    ):
        """
        Wraps the given message into an ``SqsMessage`` and adds it to the queue, unless a message with the
        same deduplication ID was accepted within the deduplication interval. In that case the message is
        not enqueued and the passed ``message`` gets the ``MessageId`` of the original message.

        :param message: the message to enqueue
        :param visibility_timeout: per-message visibility timeout; the queue attribute is used if ``None``
        :param message_deduplication_id: explicit deduplication ID; if empty, a SHA-256 hash of the message
            body is used when the queue does not require an explicit deduplication ID
        :param message_group_id: the (required) message group ID
        :param delay_seconds: must be empty or 0, as FIFO queues only support queue-level delays
        :return: the created ``SqsMessage`` (also if the message was deduplicated)
        :raises InvalidParameterValueException: if ``delay_seconds`` is set or no deduplication ID can be
            determined
        :raises MissingRequiredParameterException: if no ``message_group_id`` is passed
        """
        if delay_seconds:
            # in fifo queues, delay is only applied on queue level. However, explicitly setting delay_seconds=0 is valid
            raise InvalidParameterValueException(
                f"Value {delay_seconds} for parameter DelaySeconds is invalid. Reason: The request include parameter "
                f"that is not valid for this queue type."
            )

        if not message_group_id:
            raise MissingRequiredParameterException(
                "The request must contain the parameter MessageGroupId."
            )
        dedup_id = message_deduplication_id
        content_based_deduplication = not is_message_deduplication_id_required(self)
        if not dedup_id and content_based_deduplication:
            dedup_id = hashlib.sha256(message.get("Body").encode("utf-8")).hexdigest()
        if not dedup_id:
            raise InvalidParameterValueException(
                "The queue should either have ContentBasedDeduplication enabled or MessageDeduplicationId provided explicitly"
            )

        fifo_message = SqsMessage(
            time.time(),
            message,
            message_deduplication_id=dedup_id,
            message_group_id=message_group_id,
            sequence_number=str(self.next_sequence_number()),
        )
        # an explicit per-message value takes precedence over the attribute of the queue
        fifo_message.visibility_timeout = (
            self.visibility_timeout if visibility_timeout is None else visibility_timeout
        )

        # FIFO queues always use the queue level setting for 'DelaySeconds'
        fifo_message.delay_seconds = self.delay_seconds

        original_message = self.deduplication.get(dedup_id)
        if (
            original_message
            and original_message.priority + sqs_constants.DEDUPLICATION_INTERVAL_IN_SEC
            > fifo_message.priority
            # account for deduplication scope required for (but not restricted to) high-throughput-mode
            and (
                self.deduplication_scope != "messageGroup"
                or fifo_message.message_group_id == original_message.message_group_id
            )
        ):
            # duplicate within the deduplication interval: do not enqueue, report the original message ID
            message["MessageId"] = original_message.message["MessageId"]
        else:
            if fifo_message.is_delayed:
                self.delayed.add(fifo_message)
            else:
                self._put_message(fifo_message)

            self.deduplication[dedup_id] = fifo_message

        return fifo_message

    def _put_message(self, message: SqsMessage):
        """Once a message becomes visible in a FIFO queue, its message group also becomes visible."""
        message_group = self.get_message_group(message.message_group_id)

        with self.mutex:
            previously_empty = message_group.empty()
            # put the message into the group
            message_group.push(message)

            # new messages should not make groups visible that are currently inflight
            if message.receive_count < 1 and message_group in self.inflight_groups:
                return
            # if an older message becomes visible again in the queue, that message's group becomes visible also.
            if message_group in self.inflight_groups:
                self.inflight_groups.remove(message_group)
                self.message_group_queue.put_nowait(message_group)
            # if the group was previously empty, it was not yet added back to the queue
            elif previously_empty:
                self.message_group_queue.put_nowait(message_group)

    def remove_expired_messages(self):
        """Removes the messages that exceeded the retention period from every message group and logs them."""
        with self.mutex:
            retention_period = self.message_retention_period
            for message_group in self.message_groups.values():
                messages = self.remove_expired_messages_from_heap(
                    message_group.messages, retention_period
                )

                for message in messages:
                    LOG.debug(
                        "Removed expired message %s from message group %s in queue %s",
                        message.message_id,
                        message.message_group_id,
                        self.arn,
                    )

    def receive(
        self,
        num_messages: int = 1,
        wait_time_seconds: int = None,
        visibility_timeout: int = None,
        *,
        poll_empty_queue: bool = False,
    ) -> ReceiveMessageResult:
        """
        Receive logic for FIFO queues is different from standard queues. See
        https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues-understanding-logic.html.

        When receiving messages from a FIFO queue with multiple message group IDs, SQS first attempts to
        return as many messages with the same message group ID as possible. This allows other consumers to
        process messages with a different message group ID. When you receive a message with a message group
        ID, no more messages for the same message group ID are returned unless you delete the message, or it
        becomes visible.

        :param num_messages: the maximum number of messages to return as successful
        :param wait_time_seconds: how long to wait for message groups in total; falsy values disable
            blocking
        :param visibility_timeout: visibility timeout applied to the received messages; the queue attribute
            is used if ``None``
        :param poll_empty_queue: if set, the call keeps blocking (within the wait time) after the first
            group was received
        :return: the result holding successful messages, their receipt handles, and messages marked for
            the DLQ
        """
        result = ReceiveMessageResult()

        max_receive_count = self.max_receive_count
        visibility_timeout = (
            self.visibility_timeout if visibility_timeout is None else visibility_timeout
        )

        block = bool(wait_time_seconds)
        timeout = wait_time_seconds or 0
        # Fixed point in time at which the wait budget is used up. The remaining timeout is always derived
        # from it, so elapsed time is accounted for exactly once, no matter which path an iteration takes.
        deadline = time.time() + timeout

        # collect messages over potentially multiple groups
        while True:
            try:
                group: MessageGroup = self.message_group_queue.get(block=block, timeout=timeout)
            except Empty:
                break

            # only the time that is left until the deadline may be spent in the next iteration
            timeout = max(0, deadline - time.time())

            if group.empty():
                # this can be the case if all messages in the group are still invisible or
                # if all messages of a group have been processed.
                # TODO: it should be blocking until at least one message is in the queue, but we don't
                #  want to block the group
                # TODO: check behavior in case it happens if all messages were removed from a group due to message
                #  retention period.
                continue

            self.inflight_groups.add(group)

            if not poll_empty_queue:
                block = False

            # we lock the queue while accessing the groups to not get into races with re-queueing/deleting
            with self.mutex:
                # collect messages from the group until a continue/break condition is met
                while True:
                    try:
                        message = group.pop()
                    except IndexError:
                        break

                    if message.deleted:
                        # this means the message was deleted with a receipt handle after its visibility
                        # timeout expired and the messages was re-queued in the meantime.
                        continue

                    # update message attributes
                    message.receive_count += 1
                    message.update_visibility_timeout(visibility_timeout)
                    message.set_last_received(time.time())
                    if message.first_received is None:
                        message.first_received = message.last_received

                    LOG.debug("de-queued message %s from fifo queue %s", message, self.arn)
                    if max_receive_count and message.receive_count > max_receive_count:
                        # the message needs to move to the DLQ
                        LOG.debug(
                            "message %s has been received %d times, marking it for DLQ",
                            message,
                            message.receive_count,
                        )
                        result.dead_letter_messages.append(message)
                    else:
                        result.successful.append(message)
                        message.increment_approximate_receive_count()

                        # now we can break the inner loop
                        if len(result.successful) == num_messages:
                            break

                # but we also need to check the condition to return from the outer loop
                if len(result.successful) == num_messages:
                    break

        # now process the successful result messages: create receipt handles and manage visibility.
        # we use the mutex again because we are modifying the group
        with self.mutex:
            for message in result.successful:
                # manage receipt handle
                receipt_handle = self.create_receipt_handle(message)
                message.receipt_handles.add(receipt_handle)
                self.receipts[receipt_handle] = message
                result.receipt_handles.append(receipt_handle)

                # manage message visibility: a timeout of 0 makes the message immediately visible again
                if message.visibility_timeout == 0:
                    self._put_message(message)
                else:
                    self.inflight.add(message)

        return result

    def _on_remove_message(self, message: SqsMessage):
        """Drops the message from ``inflight`` and re-evaluates the visibility of its message group."""
        # if a message is deleted from the queue, the message's group can become visible again
        message_group = self.get_message_group(message.message_group_id)

        with self.mutex:
            try:
                self.inflight.remove(message)
            except KeyError:
                # in FIFO queues, this should not happen, as expired receipt handles cannot be used to
                # delete a message.
                pass
            self.update_message_group_visibility(message_group)

    def update_message_group_visibility(self, message_group: MessageGroup):
        """
        Check if the passed message group should be made visible again

        :param message_group: the group to check
        """

        with self.mutex:
            if message_group in self.inflight_groups:
                # it becomes visible again only if there are no other in flight messages in that group
                for message in self.inflight:
                    if message.message_group_id == message_group.message_group_id:
                        return

                self.inflight_groups.remove(message_group)
                if not message_group.empty():
                    self.message_group_queue.put_nowait(message_group)

    def _assert_queue_name(self, name):
        """
        Validates the name of a FIFO queue.

        :param name: the queue name to check
        :raises InvalidParameterValueException: if the name does not end with ``.fifo``
        """
        if not name.endswith(".fifo"):
            raise InvalidParameterValueException(
                "The name of a FIFO queue can only include alphanumeric characters, hyphens, or underscores, "
                "must end with .fifo suffix and be 1 to 80 in length"
            )
        # The .fifo suffix counts towards the 80-character queue name quota.
        # The suffix is swapped for "_fifo" (same length) before delegating to the parent check.
        queue_name = name[:-5] + "_fifo"
        super()._assert_queue_name(queue_name)

    def validate_queue_attributes(self, attributes):
        """
        Validates the names of the given queue attributes and the value of ``FifoQueue``.

        :param attributes: mapping of attribute names to values
        :raises InvalidAttributeName: if an attribute name is not among the collected valid values
        :raises InvalidAttributeValue: if ``FifoQueue`` is set to anything but "true" (case-insensitive)
        """
        # NOTE(review): `k` is the (name, value) tuple returned by inspect.getmembers, so the membership
        # test against INTERNAL_QUEUE_ATTRIBUTES looks like it never filters anything (`k[1]` was
        # presumably intended). Left unchanged to not alter which attributes are accepted - confirm.
        valid = [
            k[1]
            for k in inspect.getmembers(QueueAttributeName)
            if k not in sqs_constants.INTERNAL_QUEUE_ATTRIBUTES
        ]
        for k in attributes:
            if k not in valid:
                raise InvalidAttributeName(f"Unknown Attribute {k}.")
        # Special Cases
        fifo = attributes.get(QueueAttributeName.FifoQueue)
        if fifo and fifo.lower() != "true":
            raise InvalidAttributeValue(
                "Invalid value for the parameter FifoQueue. Reason: Modifying queue type is not supported."
            )

    def next_sequence_number(self):
        """Returns the next value of the global message sequence."""
        return next(global_message_sequence())

    def clear(self):
        """Clears the state held by the parent class as well as all group and deduplication state."""
        with self.mutex:
            super().clear()
            self.message_groups.clear()
            self.inflight_groups.clear()
            self.message_group_queue.queue.clear()
            self.deduplication.clear()
1✔
1350

1351

1352
class SqsStore(BaseStore):
    """Store holding the SQS queues, the record of deleted entries, and the message move tasks."""

    queues: dict[str, SqsQueue] = LocalAttribute(default=dict)

    deleted: dict[str, float] = LocalAttribute(default=dict)

    move_tasks: dict[str, MessageMoveTask] = LocalAttribute(default=dict)
    """Maps task IDs to their ``MoveMessageTask`` object. Task IDs can be found by decoding a task handle."""

    def expire_deleted(self):
        """
        Drops every entry from ``deleted`` whose recorded timestamp lies at least
        ``RECENTLY_DELETED_TIMEOUT`` in the past.
        """
        # iterate over a snapshot, since entries are deleted from the dict while looping
        for key, deleted_at in list(self.deleted.items()):
            expired = deleted_at <= (time.time() - sqs_constants.RECENTLY_DELETED_TIMEOUT)
            if expired:
                del self.deleted[key]
1✔
1364

1365

1366
# Module-level bundle of SqsStore objects registered under the service name "sqs".
# NOTE(review): presumably hands out one SqsStore per account and region, going by the
# AccountRegionBundle name - confirm against localstack.services.stores.
sqs_stores = AccountRegionBundle("sqs", SqsStore)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc