
localstack / localstack / build 22334798432

23 Feb 2026 06:42PM UTC coverage: 86.956% (-0.02%) from 86.973%
Trigger: push via github (web-flow)
Commit: S3: regenerate test snapshots & parity fixes (#13824)

69831 of 80306 relevant lines covered (86.96%)
0.87 hits per line

Source File

/localstack-core/localstack/services/sqs/provider.py (92.14% covered)
1
import json
1✔
2
import logging
1✔
3
import re
1✔
4
import threading
1✔
5
import time
1✔
6
from collections.abc import Iterable
1✔
7
from concurrent.futures.thread import ThreadPoolExecutor
1✔
8
from itertools import islice
1✔
9

10
from botocore.utils import InvalidArnException
1✔
11
from werkzeug import Request as WerkzeugRequest
1✔
12

13
from localstack import config
1✔
14
from localstack.aws.api import RequestContext, ServiceException
1✔
15
from localstack.aws.api.sqs import (
1✔
16
    ActionNameList,
17
    AttributeNameList,
18
    AWSAccountIdList,
19
    BatchEntryIdsNotDistinct,
20
    BatchRequestTooLong,
21
    BatchResultErrorEntry,
22
    BoxedInteger,
23
    CancelMessageMoveTaskResult,
24
    ChangeMessageVisibilityBatchRequestEntryList,
25
    ChangeMessageVisibilityBatchResult,
26
    CreateQueueResult,
27
    DeleteMessageBatchRequestEntryList,
28
    DeleteMessageBatchResult,
29
    DeleteMessageBatchResultEntry,
30
    EmptyBatchRequest,
31
    GetQueueAttributesResult,
32
    GetQueueUrlResult,
33
    InvalidAttributeName,
34
    InvalidBatchEntryId,
35
    InvalidMessageContents,
36
    ListDeadLetterSourceQueuesResult,
37
    ListMessageMoveTasksResult,
38
    ListMessageMoveTasksResultEntry,
39
    ListQueuesResult,
40
    ListQueueTagsResult,
41
    Message,
42
    MessageAttributeNameList,
43
    MessageBodyAttributeMap,
44
    MessageBodySystemAttributeMap,
45
    MessageSystemAttributeList,
46
    MessageSystemAttributeName,
47
    NullableInteger,
48
    PurgeQueueInProgress,
49
    QueueAttributeMap,
50
    QueueAttributeName,
51
    QueueDeletedRecently,
52
    QueueDoesNotExist,
53
    QueueNameExists,
54
    ReceiveMessageResult,
55
    ResourceNotFoundException,
56
    SendMessageBatchRequestEntryList,
57
    SendMessageBatchResult,
58
    SendMessageBatchResultEntry,
59
    SendMessageResult,
60
    SqsApi,
61
    StartMessageMoveTaskResult,
62
    String,
63
    TagKeyList,
64
    TagMap,
65
    Token,
66
    TooManyEntriesInBatchRequest,
67
)
68
from localstack.aws.spec import load_service
1✔
69
from localstack.config import SQS_DISABLE_MAX_NUMBER_OF_MESSAGE_LIMIT
1✔
70
from localstack.services.edge import ROUTER
1✔
71
from localstack.services.plugins import ServiceLifecycleHook
1✔
72
from localstack.services.sqs import constants as sqs_constants
1✔
73
from localstack.services.sqs import query_api
1✔
74
from localstack.services.sqs.constants import (
1✔
75
    HEADER_LOCALSTACK_SQS_OVERRIDE_MESSAGE_COUNT,
76
    HEADER_LOCALSTACK_SQS_OVERRIDE_WAIT_TIME_SECONDS,
77
    MAX_RESULT_LIMIT,
78
)
79
from localstack.services.sqs.developer_api import SqsDeveloperApi
1✔
80
from localstack.services.sqs.exceptions import (
1✔
81
    InvalidParameterValueException,
82
    MissingRequiredParameterException,
83
)
84
from localstack.services.sqs.models import (
1✔
85
    FifoQueue,
86
    MessageMoveTask,
87
    MessageMoveTaskStatus,
88
    SqsMessage,
89
    SqsQueue,
90
    SqsStore,
91
    StandardQueue,
92
    sqs_stores,
93
    to_sqs_api_message,
94
)
95
from localstack.services.sqs.utils import (
1✔
96
    create_message_attribute_hash,
97
    decode_move_task_handle,
98
    generate_message_id,
99
    is_fifo_queue,
100
    is_message_deduplication_id_required,
101
    parse_queue_url,
102
)
103
from localstack.services.stores import AccountRegionBundle
1✔
104
from localstack.state import StateVisitor
1✔
105
from localstack.utils.aws.arns import parse_arn
1✔
106
from localstack.utils.bootstrap import is_api_enabled
1✔
107
from localstack.utils.cloudwatch.cloudwatch_util import (
1✔
108
    SqsMetricBatchData,
109
    publish_sqs_metric,
110
    publish_sqs_metric_batch,
111
)
112
from localstack.utils.collections import PaginatedList
1✔
113
from localstack.utils.run import FuncThread
1✔
114
from localstack.utils.scheduler import Scheduler
1✔
115
from localstack.utils.strings import md5, token_generator
1✔
116
from localstack.utils.threads import start_thread
1✔
117
from localstack.utils.time import now
1✔
118

119
LOG = logging.getLogger(__name__)
1✔
120

121
MAX_NUMBER_OF_MESSAGES = 10
1✔
122
_STORE_LOCK = threading.RLock()
1✔
123

124

125
def assert_queue_name(queue_name: str, fifo: bool = False):
1✔
126
    if queue_name.endswith(".fifo"):
×
127
        if not fifo:
×
128
            # Standard queues with .fifo suffix are not allowed
129
            raise InvalidParameterValueException(
×
130
                "Can only include alphanumeric characters, hyphens, or underscores. 1 to 80 in length"
131
            )
132
        # The .fifo suffix counts towards the 80-character queue name quota.
133
        queue_name = queue_name[:-5] + "_fifo"
×
134

135
    # slashes are actually not allowed, but we allow them explicitly in LocalStack
136
    if not re.match(r"^[a-zA-Z0-9/_-]{1,80}$", queue_name):
×
137
        raise InvalidParameterValueException(
×
138
            "Can only include alphanumeric characters, hyphens, or underscores. 1 to 80 in length"
139
        )
140

141
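As an illustration of the rule enforced by assert_queue_name above, the following standalone sketch (not part of provider.py) applies the same pattern; the example names are made up.

import re

QUEUE_NAME_PATTERN = r"^[a-zA-Z0-9/_-]{1,80}$"

print(bool(re.match(QUEUE_NAME_PATTERN, "my-queue_1")))   # True
print(bool(re.match(QUEUE_NAME_PATTERN, "orders_fifo")))  # True: what "orders.fifo" becomes after the suffix rewrite above
print(bool(re.match(QUEUE_NAME_PATTERN, "bad name!")))    # False: spaces and '!' are rejected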

142
def check_message_min_size(message_body: str):
1✔
143
    if _message_body_size(message_body) == 0:
1✔
144
        raise MissingRequiredParameterException(
1✔
145
            "The request must contain the parameter MessageBody."
146
        )
147

148

149
def check_message_max_size(
1✔
150
    message_body: str, message_attributes: MessageBodyAttributeMap, max_message_size: int
151
):
152
    # https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/quotas-messages.html
153
    error = "One or more parameters are invalid. "
1✔
154
    error += f"Reason: Message must be shorter than {max_message_size} bytes."
1✔
155
    if (
1✔
156
        _message_body_size(message_body) + _message_attributes_size(message_attributes)
157
        > max_message_size
158
    ):
159
        raise InvalidParameterValueException(error)
1✔
160

161

162
def _message_body_size(body: str):
1✔
163
    return _bytesize(body)
1✔
164

165

166
def _message_attributes_size(attributes: MessageBodyAttributeMap):
1✔
167
    if not attributes:
1✔
168
        return 0
1✔
169
    message_attributes_keys_size = sum(_bytesize(k) for k in attributes.keys())
1✔
170
    message_attributes_values_size = sum(
1✔
171
        sum(_bytesize(v) for v in attr.values()) for attr in attributes.values()
172
    )
173
    return message_attributes_keys_size + message_attributes_values_size
1✔
174

175

176
def _bytesize(value: str | bytes):
1✔
177
    # must encode as utf8 to get correct bytes with len
178
    return len(value.encode("utf8")) if isinstance(value, str) else len(value)
1✔
179

180
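A minimal, self-contained sketch (not part of this file) of how the helpers above add up a message's size: UTF-8 bytes of the body plus the bytes of every attribute name and every attribute field value. The sample message is hypothetical.

def bytesize(value):
    return len(value.encode("utf8")) if isinstance(value, str) else len(value)

body = "hello"  # 5 bytes
attributes = {"color": {"DataType": "String", "StringValue": "red"}}

total = (
    bytesize(body)
    + sum(bytesize(k) for k in attributes)  # attribute names: 5 bytes
    + sum(sum(bytesize(v) for v in attr.values()) for attr in attributes.values())  # "String" + "red": 9 bytes
)
print(total)  # 19, compared against the queue's max_message_size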

181
def check_message_content(message_body: str):
1✔
182
    error = "Invalid characters found. Valid unicode characters are #x9 | #xA | #xD | #x20 to #xD7FF | #xE000 to #xFFFD | #x10000 to #x10FFFF"
1✔
183

184
    if not re.match(sqs_constants.MSG_CONTENT_REGEX, message_body):
1✔
185
        raise InvalidMessageContents(error)
1✔
186

187

188
class CloudwatchDispatcher:
1✔
189
    """
190
    Dispatches SQS metrics for specific api-calls using a ThreadPool
191
    """
192

193
    def __init__(self, num_thread: int = 3):
1✔
194
        self.executor = ThreadPoolExecutor(
1✔
195
            num_thread, thread_name_prefix="sqs-metrics-cloudwatch-dispatcher"
196
        )
197
        self.running = True
1✔
198

199
    def shutdown(self):
1✔
200
        self.executor.shutdown(wait=False, cancel_futures=True)
1✔
201
        self.running = False
1✔
202

203
    def dispatch_sqs_metric(
1✔
204
        self,
205
        account_id: str,
206
        region: str,
207
        queue_name: str,
208
        metric: str,
209
        value: float = 1,
210
        unit: str = "Count",
211
    ):
212
        """
213
        Publishes a metric to Cloudwatch using a Threadpool
214
        :param account_id: The account id that should be used for Cloudwatch client
215
        :param region: The region that should be used for Cloudwatch client
216
        :param queue_name: The name of the queue that the metric belongs to
217
        :param metric: The name of the metric
218
        :param value: The value for that metric, default 1
219
        :param unit: The unit for the value, default "Count"
220
        """
221
        if not self.running:
1✔
222
            return
×
223

224
        self.executor.submit(
1✔
225
            publish_sqs_metric,
226
            account_id=account_id,
227
            region=region,
228
            queue_name=queue_name,
229
            metric=metric,
230
            value=value,
231
            unit=unit,
232
        )
233

234
    def dispatch_metric_message_sent(self, queue: SqsQueue, message_body_size: int):
1✔
235
        """
236
        Sends metric 'NumberOfMessagesSent' and 'SentMessageSize' to Cloudwatch
237
        :param queue: The Queue for which the metric will be sent
238
        :param message_body_size: the size of the message in bytes
239
        """
240
        self.dispatch_sqs_metric(
1✔
241
            account_id=queue.account_id,
242
            region=queue.region,
243
            queue_name=queue.name,
244
            metric="NumberOfMessagesSent",
245
        )
246
        self.dispatch_sqs_metric(
1✔
247
            account_id=queue.account_id,
248
            region=queue.region,
249
            queue_name=queue.name,
250
            metric="SentMessageSize",
251
            value=message_body_size,
252
            unit="Bytes",
253
        )
254

255
    def dispatch_metric_message_deleted(self, queue: SqsQueue, deleted: int = 1):
1✔
256
        """
257
        Sends metric 'NumberOfMessagesDeleted' to Cloudwatch
258
        :param queue: The Queue for which the metric will be sent
259
        :param deleted: The number of messages that were successfully deleted, default: 1
260
        """
261
        self.dispatch_sqs_metric(
1✔
262
            account_id=queue.account_id,
263
            region=queue.region,
264
            queue_name=queue.name,
265
            metric="NumberOfMessagesDeleted",
266
            value=deleted,
267
        )
268

269
    def dispatch_metric_received(self, queue: SqsQueue, received: int):
1✔
270
        """
271
        Sends metric 'NumberOfMessagesReceived' (if received > 0), or 'NumberOfEmptyReceives' to Cloudwatch
272
        :param queue: The Queue for which the metric will be sent
273
        :param received: The number of messages that have been received
274
        """
275
        if received > 0:
1✔
276
            self.dispatch_sqs_metric(
1✔
277
                account_id=queue.account_id,
278
                region=queue.region,
279
                queue_name=queue.name,
280
                metric="NumberOfMessagesReceived",
281
                value=received,
282
            )
283
        else:
284
            self.dispatch_sqs_metric(
1✔
285
                account_id=queue.account_id,
286
                region=queue.region,
287
                queue_name=queue.name,
288
                metric="NumberOfEmptyReceives",
289
            )
290

291
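The dispatcher above is essentially a fire-and-forget thread pool. A generic sketch of that pattern (not part of this file), with a hypothetical publish function standing in for publish_sqs_metric:

from concurrent.futures.thread import ThreadPoolExecutor

def publish(metric: str, value: float) -> None:
    ...  # stand-in for the actual CloudWatch call

executor = ThreadPoolExecutor(3, thread_name_prefix="sqs-metrics-cloudwatch-dispatcher")
executor.submit(publish, "NumberOfMessagesSent", 1)   # returns immediately, runs on a pool thread
executor.shutdown(wait=False, cancel_futures=True)    # same shutdown semantics as CloudwatchDispatcher.shutdown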

292
class CloudwatchPublishWorker:
1✔
293
    """
294
    Regularly publish metrics data about approximate messages to Cloudwatch.
295
    Includes: ApproximateNumberOfMessagesVisible, ApproximateNumberOfMessagesNotVisible
296
        and ApproximateNumberOfMessagesDelayed
297
    """
298

299
    def __init__(self) -> None:
1✔
300
        super().__init__()
1✔
301
        self.scheduler = Scheduler()
1✔
302
        self.thread: FuncThread | None = None
1✔
303

304
    def publish_approximate_cloudwatch_metrics(self):
1✔
305
        """Publishes the metrics for ApproximateNumberOfMessagesVisible, ApproximateNumberOfMessagesNotVisible
306
        and ApproximateNumberOfMessagesDelayed to CloudWatch"""
307
        # TODO ApproximateAgeOfOldestMessage is missing
308
        #  https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-available-cloudwatch-metrics.html
309

310
        for account_id, region, store in sqs_stores.iter_stores():
1✔
311
            start = 0
1✔
312
            # we can include up to 1000 metric queries for one put-metric-data call
313
            #  and we currently include 3 metrics per queue
314
            batch_size = 300
1✔
315

316
            while start < len(store.queues):
1✔
317
                batch_data = []
1✔
318
                # Process the current batch
319
                for queue in islice(store.queues.values(), start, start + batch_size):
1✔
320
                    batch_data.append(
1✔
321
                        SqsMetricBatchData(
322
                            QueueName=queue.name,
323
                            MetricName="ApproximateNumberOfMessagesVisible",
324
                            Value=queue.approximate_number_of_messages,
325
                        )
326
                    )
327
                    batch_data.append(
1✔
328
                        SqsMetricBatchData(
329
                            QueueName=queue.name,
330
                            MetricName="ApproximateNumberOfMessagesNotVisible",
331
                            Value=queue.approximate_number_of_messages_not_visible,
332
                        )
333
                    )
334
                    batch_data.append(
1✔
335
                        SqsMetricBatchData(
336
                            QueueName=queue.name,
337
                            MetricName="ApproximateNumberOfMessagesDelayed",
338
                            Value=queue.approximate_number_of_messages_delayed,
339
                        )
340
                    )
341

342
                publish_sqs_metric_batch(
1✔
343
                    account_id=account_id, region=region, sqs_metric_batch_data=batch_data
344
                )
345
                # Update for the next batch
346
                start += batch_size
1✔
347
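The loop above pages through the queue dict in fixed-size chunks with itertools.islice. A reduced sketch of the same batching scheme (queue names and batch size are made up):

from itertools import islice

queues = [f"queue-{i}" for i in range(7)]
batch_size = 3
start = 0
while start < len(queues):
    batch = list(islice(queues, start, start + batch_size))
    print(batch)  # ['queue-0'..'queue-2'], then 3..5, then just 'queue-6'
    start += batch_size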

348
    def start(self):
1✔
349
        if self.thread:
1✔
350
            return
×
351

352
        self.scheduler = Scheduler()
1✔
353
        self.scheduler.schedule(
1✔
354
            self.publish_approximate_cloudwatch_metrics,
355
            period=config.SQS_CLOUDWATCH_METRICS_REPORT_INTERVAL,
356
        )
357

358
        def _run(*_args):
1✔
359
            self.scheduler.run()
1✔
360

361
        self.thread = start_thread(_run, name="sqs-approx-metrics-cloudwatch-publisher")
1✔
362

363
    def stop(self):
1✔
364
        if self.scheduler:
1✔
365
            self.scheduler.close()
1✔
366

367
        if self.thread:
1✔
368
            self.thread.stop()
1✔
369

370
        self.thread = None
1✔
371
        self.scheduler = None
1✔
372

373

374
class QueueUpdateWorker:
1✔
375
    """
376
    Regularly re-queues inflight and delayed messages whose visibility timeout has expired or delay deadline has been
377
    reached.
378
    """
379

380
    def __init__(self) -> None:
1✔
381
        super().__init__()
1✔
382
        self.scheduler = Scheduler()
1✔
383
        self.thread: FuncThread | None = None
1✔
384
        self.mutex = threading.RLock()
1✔
385

386
    def iter_queues(self) -> Iterable[SqsQueue]:
1✔
387
        for account_id, region, store in sqs_stores.iter_stores():
1✔
388
            yield from store.queues.values()
1✔
389

390
    def do_update_all_queues(self):
1✔
391
        for queue in self.iter_queues():
1✔
392
            try:
1✔
393
                queue.requeue_inflight_messages()
1✔
394
            except Exception:
×
395
                LOG.error(
×
396
                    "error re-queueing inflight messages",
397
                    exc_info=LOG.isEnabledFor(logging.DEBUG),
398
                )
399

400
            try:
1✔
401
                queue.enqueue_delayed_messages()
1✔
402
            except Exception:
×
403
                LOG.error(
×
404
                    "error enqueueing delayed messages",
405
                    exc_info=LOG.isEnabledFor(logging.DEBUG),
406
                )
407

408
            if config.SQS_ENABLE_MESSAGE_RETENTION_PERIOD:
1✔
409
                try:
1✔
410
                    queue.remove_expired_messages()
1✔
411
                except Exception:
×
412
                    LOG.error(
×
413
                        "error removing expired messages",
414
                        exc_info=LOG.isEnabledFor(logging.DEBUG),
415
                    )
416

417
    def start(self):
1✔
418
        with self.mutex:
1✔
419
            if self.thread:
1✔
420
                return
×
421

422
            self.scheduler = Scheduler()
1✔
423
            self.scheduler.schedule(self.do_update_all_queues, period=1)
1✔
424

425
            def _run(*_args):
1✔
426
                self.scheduler.run()
1✔
427

428
            self.thread = start_thread(_run, name="sqs-queue-update-worker")
1✔
429

430
    def stop(self):
1✔
431
        with self.mutex:
1✔
432
            if self.scheduler:
1✔
433
                self.scheduler.close()
1✔
434

435
            if self.thread:
1✔
436
                self.thread.stop()
1✔
437

438
            self.thread = None
1✔
439
            self.scheduler = None
1✔
440

441

442
class MessageMoveTaskManager:
1✔
443
    """
444
    Manages and runs MessageMoveTasks.
445

446
    TODO: we should check how AWS really moves messages internally: do they use the API?
447
     it's hard to know how AWS really does moving of messages. there are a number of things we could do
448
     to understand it better, including creating a DLQ chain and letting move tasks fail to see whether
449
     move tasks cause message consuming and create receipt handles. for now, we're doing a middle-layer
450
     transactional move, foregoing the API layer but using receipt handles and transactions.
451

452
    TODO: restoring move tasks from persistence doesn't work, may be a fringe case though
453

454
    TODO: re-drive into multiple original source queues
455
    """
456

457
    def __init__(self, stores: AccountRegionBundle[SqsStore] = None) -> None:
1✔
458
        self.stores = stores or sqs_stores
1✔
459
        self.mutex = threading.RLock()
1✔
460
        self.move_tasks: dict[str, MessageMoveTask] = {}
1✔
461
        self.executor = ThreadPoolExecutor(max_workers=100, thread_name_prefix="sqs-move-message")
1✔
462

463
    def submit(self, move_task: MessageMoveTask):
1✔
464
        with self.mutex:
1✔
465
            try:
1✔
466
                source_queue = self._get_queue_by_arn(move_task.source_arn)
1✔
467
                move_task.approximate_number_of_messages_to_move = (
1✔
468
                    source_queue.approximate_number_of_messages
469
                )
470
                move_task.approximate_number_of_messages_moved = 0
1✔
471
                move_task.mark_started()
1✔
472
                self.move_tasks[move_task.task_id] = move_task
1✔
473
                self.executor.submit(self._run, move_task)
1✔
474
            except Exception as e:
×
475
                self._fail_task(move_task, e)
×
476
                raise
×
477

478
    def cancel(self, move_task: MessageMoveTask):
1✔
479
        with self.mutex:
1✔
480
            move_task.status = MessageMoveTaskStatus.CANCELLING
1✔
481
            move_task.cancel_event.set()
1✔
482

483
    def close(self):
1✔
484
        with self.mutex:
1✔
485
            for move_task in self.move_tasks.values():
1✔
486
                move_task.cancel_event.set()
1✔
487

488
            self.executor.shutdown(wait=False, cancel_futures=True)
1✔
489

490
    def _run(self, move_task: MessageMoveTask):
1✔
491
        try:
1✔
492
            if move_task.destination_arn:
1✔
493
                LOG.info(
1✔
494
                    "Move task started %s (%s -> %s)",
495
                    move_task.task_id,
496
                    move_task.source_arn,
497
                    move_task.destination_arn,
498
                )
499
            else:
500
                LOG.info(
1✔
501
                    "Move task started %s (%s -> original sources)",
502
                    move_task.task_id,
503
                    move_task.source_arn,
504
                )
505

506
            while not move_task.cancel_event.is_set():
1✔
507
                # look up queues for every message in case they are removed
508
                source_queue = self._get_queue_by_arn(move_task.source_arn)
1✔
509

510
                receive_result = source_queue.receive(num_messages=1, visibility_timeout=1)
1✔
511

512
                if receive_result.dead_letter_messages:
1✔
513
                    raise NotImplementedError("Cannot deal with DLQ chains in move tasks")
514

515
                if not receive_result.successful:
1✔
516
                    # queue empty, task done
517
                    break
1✔
518

519
                message = receive_result.successful[0]
1✔
520
                receipt_handle = receive_result.receipt_handles[0]
1✔
521

522
                if move_task.destination_arn:
1✔
523
                    target_queue = self._get_queue_by_arn(move_task.destination_arn)
1✔
524
                else:
525
                    # we assume that dead_letter_source_arn is set since the message comes from a DLQ
526
                    target_queue = self._get_queue_by_arn(message.dead_letter_queue_source_arn)
1✔
527

528
                target_queue.put(
1✔
529
                    message=message.message,
530
                    message_group_id=message.message_group_id,
531
                    message_deduplication_id=message.message_deduplication_id,
532
                )
533
                source_queue.remove(receipt_handle)
1✔
534
                move_task.approximate_number_of_messages_moved += 1
1✔
535

536
                if rate := move_task.max_number_of_messages_per_second:
1✔
537
                    move_task.cancel_event.wait(timeout=(1 / rate))
1✔
538

539
        except Exception as e:
1✔
540
            self._fail_task(move_task, e)
1✔
541
        else:
542
            if move_task.cancel_event.is_set():
1✔
543
                LOG.info("Move task cancelled %s", move_task.task_id)
1✔
544
                move_task.status = MessageMoveTaskStatus.CANCELLED
1✔
545
            else:
546
                LOG.info("Move task completed successfully %s", move_task.task_id)
1✔
547
                move_task.status = MessageMoveTaskStatus.COMPLETED
1✔
548

549
    def _get_queue_by_arn(self, queue_arn: str) -> SqsQueue:
1✔
550
        arn = parse_arn(queue_arn)
1✔
551
        return SqsProvider._require_queue(arn["account"], arn["region"], arn["resource"])
1✔
552

553
    def _fail_task(self, task: MessageMoveTask, reason: Exception):
1✔
554
        """
555
        Marks a given task as failed due to the given reason.
556

557
        :param task: the task to mark as failed
558
        :param reason: the failure reason
559
        """
560
        LOG.info(
1✔
561
            "Exception occurred during move task %s: %s",
562
            task.task_id,
563
            reason,
564
            exc_info=LOG.isEnabledFor(logging.DEBUG),
565
        )
566
        task.status = MessageMoveTaskStatus.FAILED
1✔
567
        if isinstance(reason, ServiceException):
1✔
568
            task.failure_reason = reason.code
1✔
569
        else:
570
            task.failure_reason = reason.__class__.__name__
×
571

572
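The move loop above throttles itself with Event.wait so that a cancellation wakes it immediately instead of sleeping out the full interval. A standalone sketch of that pattern (the rate is made up):

import threading

cancel = threading.Event()
rate = 5  # max messages per second (hypothetical)

for _ in range(3):
    ...  # move one message
    if rate:
        cancel.wait(timeout=1 / rate)  # sleeps ~0.2s, but returns at once if cancel.set() is called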

573
def check_attributes(message_attributes: MessageBodyAttributeMap):
1✔
574
    if not message_attributes:
1✔
575
        return
1✔
576
    for attribute_name in message_attributes:
1✔
577
        if len(attribute_name) >= 256:
1✔
578
            raise InvalidParameterValueException(
1✔
579
                "Message (user) attribute names must be shorter than 256 Bytes"
580
            )
581
        if not re.match(sqs_constants.ATTR_NAME_CHAR_REGEX, attribute_name.lower()):
1✔
582
            raise InvalidParameterValueException(
1✔
583
                "Message (user) attributes name can only contain upper and lower score characters, digits, periods, "
584
                "hyphens and underscores. "
585
            )
586
        if not re.match(sqs_constants.ATTR_NAME_PREFIX_SUFFIX_REGEX, attribute_name.lower()):
1✔
587
            raise InvalidParameterValueException(
1✔
588
                "You can't use message attribute names beginning with 'AWS.' or 'Amazon.'. "
589
                "These strings are reserved for internal use. Additionally, they cannot start or end with '.'."
590
            )
591

592
        attribute = message_attributes[attribute_name]
1✔
593
        attribute_type = attribute.get("DataType")
1✔
594
        if not attribute_type:
1✔
595
            raise InvalidParameterValueException("Missing required parameter DataType")
×
596
        if not re.match(sqs_constants.ATTR_TYPE_REGEX, attribute_type):
1✔
597
            raise InvalidParameterValueException(
1✔
598
                f"Type for parameter MessageAttributes.Attribute_name.DataType must be prefixed"
599
                f'with "String", "Binary", or "Number", but was: {attribute_type}'
600
            )
601
        if len(attribute_type) >= 256:
1✔
602
            raise InvalidParameterValueException(
1✔
603
                "Message (user) attribute types must be shorter than 256 Bytes"
604
            )
605

606
        if attribute_type == "String":
1✔
607
            try:
1✔
608
                attribute_value = attribute.get("StringValue")
1✔
609

610
                if not attribute_value:
1✔
611
                    raise InvalidParameterValueException(
1✔
612
                        f"Message (user) attribute '{attribute_name}' must contain a non-empty value of type 'String'."
613
                    )
614

615
                check_message_content(attribute_value)
1✔
616
            except InvalidMessageContents as e:
1✔
617
                # AWS throws a different exception here
618
                raise InvalidParameterValueException(e.args[0])
1✔
619

620
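A hypothetical MessageAttributes map that passes the checks above: names shorter than 256 bytes without an "AWS." or "Amazon." prefix, DataType prefixed with String, Binary, or Number, and a non-empty value for String attributes.

message_attributes = {
    "contentType": {"DataType": "String", "StringValue": "application/json"},
    "retryCount": {"DataType": "Number", "StringValue": "3"},
    "payload": {"DataType": "Binary", "BinaryValue": b"\x00\x01"},
}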

621
def check_fifo_id(fifo_id: str | None, parameter: str):
1✔
622
    if fifo_id is None:
1✔
623
        return
1✔
624
    if not re.match(sqs_constants.FIFO_MSG_REGEX, fifo_id):
1✔
625
        raise InvalidParameterValueException(
1✔
626
            f"Value {fifo_id} for parameter {parameter} is invalid. "
627
            f"Reason: {parameter} can only include alphanumeric and punctuation characters. 1 to 128 in length."
628
        )
629

630

631
class SqsProvider(SqsApi, ServiceLifecycleHook):
1✔
632
    """
633
    LocalStack SQS Provider.
634

635
    LIMITATIONS:
636
        - Pagination of results (NextToken)
637
        - Delivery guarantees
638
        - The region is not encoded in the queue URL
639

640
    CROSS-ACCOUNT ACCESS:
641
    LocalStack permits cross-account access for all operations. However, AWS
642
    disallows the same for the following operations:
643
        - AddPermission
644
        - CreateQueue
645
        - DeleteQueue
646
        - ListQueues
647
        - ListQueueTags
648
        - RemovePermission
649
        - SetQueueAttributes
650
        - TagQueue
651
        - UntagQueue
652
    """
653

654
    queues: dict[str, SqsQueue]
1✔
655

656
    def __init__(self) -> None:
1✔
657
        super().__init__()
1✔
658
        self._queue_update_worker = QueueUpdateWorker()
1✔
659
        self._message_move_task_manager = MessageMoveTaskManager()
1✔
660
        self._router_rules = []
1✔
661
        self._init_cloudwatch_metrics_reporting()
1✔
662

663
    def accept_state_visitor(self, visitor: StateVisitor):
1✔
664
        visitor.visit(sqs_stores)
×
665

666
    @staticmethod
1✔
667
    def get_store(account_id: str, region: str) -> SqsStore:
1✔
668
        return sqs_stores[account_id][region]
1✔
669

670
    def on_after_init(self):
1✔
671
        # this configuration increases the allowed form payload size for Query protocol requests, which are form-encoded and by
672
        # default are limited to 500kb payload size by Werkzeug. we make sure we only *increase* the limit if it's
673
        # already set, and if it's already set to unlimited we leave it.
674
        from rolo import Request as RoloRequest
1✔
675

676
        # needed for the webserver integration (webservers create Werkzeug request objects)
677
        if WerkzeugRequest.max_form_memory_size is not None:
1✔
678
            WerkzeugRequest.max_form_memory_size = max(
1✔
679
                WerkzeugRequest.max_form_memory_size, sqs_constants.DEFAULT_MAXIMUM_MESSAGE_SIZE * 2
680
            )
681
        # needed for internal/proxy requests (which create rolo request objects)
682
        if RoloRequest.max_form_memory_size is not None:
1✔
683
            RoloRequest.max_form_memory_size = max(
1✔
684
                RoloRequest.max_form_memory_size, sqs_constants.DEFAULT_MAXIMUM_MESSAGE_SIZE * 2
685
            )
686

687
    def on_before_start(self):
1✔
688
        query_api.register(ROUTER)
1✔
689
        self._router_rules = ROUTER.add(SqsDeveloperApi())
1✔
690
        self._queue_update_worker.start()
1✔
691
        self._start_cloudwatch_metrics_reporting()
1✔
692

693
    def on_before_stop(self):
1✔
694
        ROUTER.remove(self._router_rules)
1✔
695

696
        self._queue_update_worker.stop()
1✔
697
        self._message_move_task_manager.close()
1✔
698
        for _, _, store in sqs_stores.iter_stores():
1✔
699
            for queue in store.queues.values():
1✔
700
                queue.shutdown()
1✔
701

702
        self._stop_cloudwatch_metrics_reporting()
1✔
703

704
    @staticmethod
1✔
705
    def _require_queue(
1✔
706
        account_id: str, region_name: str, name: str, is_query: bool = False
707
    ) -> SqsQueue:
708
        """
709
        Returns the queue for the given name, or raises QueueDoesNotExist if it does not exist.
710

711
        :param account_id: the account id the queue belongs to
        :param region_name: the region the queue lives in
712
        :param name: the name to look for
713
        :param is_query: whether the request is using query protocol (error message is different)
714
        :returns: the queue
715
        :raises QueueDoesNotExist: if the queue does not exist
716
        """
717
        store = SqsProvider.get_store(account_id, region_name)
1✔
718
        with _STORE_LOCK:
1✔
719
            if name not in store.queues:
1✔
720
                if is_query:
1✔
721
                    message = "The specified queue does not exist for this wsdl version."
1✔
722
                else:
723
                    message = "The specified queue does not exist."
1✔
724
                raise QueueDoesNotExist(message)
1✔
725

726
            return store.queues[name]
1✔
727

728
    def _require_queue_by_arn(self, context: RequestContext, queue_arn: str) -> SqsQueue:
1✔
729
        arn = parse_arn(queue_arn)
1✔
730
        return self._require_queue(
1✔
731
            arn["account"],
732
            arn["region"],
733
            arn["resource"],
734
            is_query=context.service.protocol == "query",
735
        )
736

737
    def _resolve_queue(
1✔
738
        self,
739
        context: RequestContext,
740
        queue_name: str | None = None,
741
        queue_url: str | None = None,
742
    ) -> SqsQueue:
743
        """
744
        Determines the name of the queue from available information (request context, queue URL) to return the respective queue,
745
        or raises QueueDoesNotExist if it does not exist.
746

747
        :param context: the request context, used for getting region and account_id, and optionally the queue_url
748
        :param queue_name: the queue name (if this is set, then this will be used for the key)
749
        :param queue_url: the queue url (if name is not set, this will be used to determine the queue name)
750
        :returns: the queue
751
        :raises QueueDoesNotExist: if the queue does not exist
752
        """
753
        account_id, region_name, name = resolve_queue_location(context, queue_name, queue_url)
1✔
754
        is_query = context.service.protocol == "query"
1✔
755
        return self._require_queue(
1✔
756
            account_id, region_name or context.region, name, is_query=is_query
757
        )
758

759
    def create_queue(
1✔
760
        self,
761
        context: RequestContext,
762
        queue_name: String,
763
        attributes: QueueAttributeMap = None,
764
        tags: TagMap = None,
765
        **kwargs,
766
    ) -> CreateQueueResult:
767
        fifo = attributes and (
1✔
768
            attributes.get(QueueAttributeName.FifoQueue, "false").lower() == "true"
769
        )
770

771
        # Special Case TODO: why is an empty policy passed at all? same in set_queue_attributes
772
        if attributes and attributes.get(QueueAttributeName.Policy) == "":
1✔
773
            del attributes[QueueAttributeName.Policy]
×
774

775
        store = self.get_store(context.account_id, context.region)
1✔
776

777
        with _STORE_LOCK:
1✔
778
            if queue_name in store.queues:
1✔
779
                queue = store.queues[queue_name]
1✔
780

781
                if attributes:
1✔
782
                    # if attributes are set, then we check whether the existing attributes match the passed ones
783
                    queue.validate_queue_attributes(attributes)
1✔
784
                    for k, v in attributes.items():
1✔
785
                        if queue.attributes.get(k) != v:
1✔
786
                            LOG.debug(
1✔
787
                                "queue attribute values %s for queue %s do not match %s (existing) != %s (new)",
788
                                k,
789
                                queue_name,
790
                                queue.attributes.get(k),
791
                                v,
792
                            )
793
                            raise QueueNameExists(
1✔
794
                                f"A queue already exists with the same name and a different value for attribute {k}"
795
                            )
796

797
                return CreateQueueResult(QueueUrl=queue.url(context))
1✔
798

799
            if config.SQS_DELAY_RECENTLY_DELETED:
1✔
800
                deleted = store.deleted.get(queue_name)
1✔
801
                if deleted and deleted > (time.time() - sqs_constants.RECENTLY_DELETED_TIMEOUT):
1✔
802
                    raise QueueDeletedRecently(
1✔
803
                        "You must wait 60 seconds after deleting a queue before you can create "
804
                        "another with the same name."
805
                    )
806
            store.expire_deleted()
1✔
807

808
            # create the appropriate queue
809
            if fifo:
1✔
810
                queue = FifoQueue(queue_name, context.region, context.account_id, attributes, tags)
1✔
811
            else:
812
                queue = StandardQueue(
1✔
813
                    queue_name, context.region, context.account_id, attributes, tags
814
                )
815
            if tags:
1✔
816
                self._tag_queue(context.account_id, context.region, queue.arn, tags)
1✔
817

818
            LOG.debug("creating queue key=%s attributes=%s tags=%s", queue_name, attributes, tags)
1✔
819

820
            store.queues[queue_name] = queue
1✔
821

822
        return CreateQueueResult(QueueUrl=queue.url(context))
1✔
823
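A hypothetical boto3 call shape that exercises the handler above; the FifoQueue attribute must be the string "true" (and the name must end in .fifo) to get a FifoQueue instead of a StandardQueue. Endpoint, credentials, and names are illustrative assumptions.

import boto3

sqs = boto3.client(
    "sqs",
    endpoint_url="http://localhost:4566",  # assumed local LocalStack edge endpoint
    region_name="us-east-1",
    aws_access_key_id="test",
    aws_secret_access_key="test",
)
response = sqs.create_queue(
    QueueName="orders.fifo",
    Attributes={"FifoQueue": "true", "ContentBasedDeduplication": "true"},
    tags={"team": "payments"},
)
print(response["QueueUrl"])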

824
    def get_queue_url(
1✔
825
        self,
826
        context: RequestContext,
827
        queue_name: String,
828
        queue_owner_aws_account_id: String = None,
829
        **kwargs,
830
    ) -> GetQueueUrlResult:
831
        queue = self._require_queue(
1✔
832
            queue_owner_aws_account_id or context.account_id,
833
            context.region,
834
            queue_name,
835
            is_query=context.service.protocol == "query",
836
        )
837

838
        return GetQueueUrlResult(QueueUrl=queue.url(context))
1✔
839

840
    def list_queues(
1✔
841
        self,
842
        context: RequestContext,
843
        queue_name_prefix: String = None,
844
        next_token: Token = None,
845
        max_results: BoxedInteger = None,
846
        **kwargs,
847
    ) -> ListQueuesResult:
848
        store = self.get_store(context.account_id, context.region)
1✔
849

850
        if queue_name_prefix:
1✔
851
            urls = [
1✔
852
                queue.url(context)
853
                for queue in store.queues.values()
854
                if queue.name.startswith(queue_name_prefix)
855
            ]
856
        else:
857
            urls = [queue.url(context) for queue in store.queues.values()]
1✔
858

859
        paginated_list = PaginatedList(urls)
1✔
860

861
        page_size = max_results if max_results else MAX_RESULT_LIMIT
1✔
862
        paginated_urls, next_token = paginated_list.get_page(
1✔
863
            token_generator=token_generator, next_token=next_token, page_size=page_size
864
        )
865

866
        if len(urls) == 0:
1✔
867
            return ListQueuesResult()
1✔
868

869
        return ListQueuesResult(QueueUrls=paginated_urls, NextToken=next_token)
1✔
870

871
    def change_message_visibility(
1✔
872
        self,
873
        context: RequestContext,
874
        queue_url: String,
875
        receipt_handle: String,
876
        visibility_timeout: NullableInteger,
877
        **kwargs,
878
    ) -> None:
879
        queue = self._resolve_queue(context, queue_url=queue_url)
1✔
880
        queue.update_visibility_timeout(receipt_handle, visibility_timeout)
1✔
881

882
    def change_message_visibility_batch(
1✔
883
        self,
884
        context: RequestContext,
885
        queue_url: String,
886
        entries: ChangeMessageVisibilityBatchRequestEntryList,
887
        **kwargs,
888
    ) -> ChangeMessageVisibilityBatchResult:
889
        queue = self._resolve_queue(context, queue_url=queue_url)
1✔
890

891
        self._assert_batch(entries)
1✔
892

893
        successful = []
×
894
        failed = []
×
895

896
        with queue.mutex:
×
897
            for entry in entries:
×
898
                try:
×
899
                    queue.update_visibility_timeout(
×
900
                        entry["ReceiptHandle"], entry["VisibilityTimeout"]
901
                    )
902
                    successful.append({"Id": entry["Id"]})
×
903
                except Exception as e:
×
904
                    failed.append(
×
905
                        BatchResultErrorEntry(
906
                            Id=entry["Id"],
907
                            SenderFault=False,
908
                            Code=e.__class__.__name__,
909
                            Message=str(e),
910
                        )
911
                    )
912

913
        return ChangeMessageVisibilityBatchResult(
×
914
            Successful=successful,
915
            Failed=failed,
916
        )
917

918
    def delete_queue(self, context: RequestContext, queue_url: String, **kwargs) -> None:
1✔
919
        account_id, region, name = parse_queue_url(queue_url)
1✔
920
        if region is None:
1✔
921
            region = context.region
1✔
922

923
        if account_id != context.account_id:
1✔
924
            LOG.warning(
1✔
925
                "Attempting a cross-account DeleteQueue operation (account from context: %s, account from queue url: %s, which is not allowed in AWS",
926
                account_id,
927
                context.account_id,
928
            )
929

930
        with _STORE_LOCK:
1✔
931
            store = self.get_store(account_id, region)
1✔
932
            queue = self._resolve_queue(context, queue_url=queue_url)
1✔
933
            LOG.debug(
1✔
934
                "deleting queue name=%s, region=%s, account=%s",
935
                queue.name,
936
                queue.region,
937
                queue.account_id,
938
            )
939
            # Trigger a shutdown prior to removing the queue resource
940
            store.queues[queue.name].shutdown()
1✔
941
            del store.queues[queue.name]
1✔
942
            store.deleted[queue.name] = time.time()
1✔
943
            self._remove_all_queue_tags(context.account_id, context.region, queue.arn)
1✔
944

945
    def get_queue_attributes(
1✔
946
        self,
947
        context: RequestContext,
948
        queue_url: String,
949
        attribute_names: AttributeNameList = None,
950
        **kwargs,
951
    ) -> GetQueueAttributesResult:
952
        queue = self._resolve_queue(context, queue_url=queue_url)
1✔
953
        result = queue.get_queue_attributes(attribute_names=attribute_names)
1✔
954

955
        return GetQueueAttributesResult(Attributes=(result if result else None))
1✔
956

957
    def send_message(
1✔
958
        self,
959
        context: RequestContext,
960
        queue_url: String,
961
        message_body: String,
962
        delay_seconds: NullableInteger = None,
963
        message_attributes: MessageBodyAttributeMap = None,
964
        message_system_attributes: MessageBodySystemAttributeMap = None,
965
        message_deduplication_id: String = None,
966
        message_group_id: String = None,
967
        **kwargs,
968
    ) -> SendMessageResult:
969
        queue = self._resolve_queue(context, queue_url=queue_url)
1✔
970

971
        queue_item = self._put_message(
1✔
972
            queue,
973
            context,
974
            message_body,
975
            delay_seconds,
976
            message_attributes,
977
            message_system_attributes,
978
            message_deduplication_id,
979
            message_group_id,
980
        )
981
        message = queue_item.message
1✔
982
        return SendMessageResult(
1✔
983
            MessageId=message["MessageId"],
984
            MD5OfMessageBody=message["MD5OfBody"],
985
            MD5OfMessageAttributes=message.get("MD5OfMessageAttributes"),
986
            SequenceNumber=queue_item.sequence_number,
987
            MD5OfMessageSystemAttributes=create_message_attribute_hash(message_system_attributes),
988
        )
989

990
    def send_message_batch(
1✔
991
        self,
992
        context: RequestContext,
993
        queue_url: String,
994
        entries: SendMessageBatchRequestEntryList,
995
        **kwargs,
996
    ) -> SendMessageBatchResult:
997
        queue = self._resolve_queue(context, queue_url=queue_url)
1✔
998

999
        self._assert_batch(
1✔
1000
            entries,
1001
            require_fifo_queue_params=is_fifo_queue(queue),
1002
            require_message_deduplication_id=is_message_deduplication_id_required(queue),
1003
        )
1004
        # check the total batch size first and raise BatchRequestTooLong if it exceeds DEFAULT_MAXIMUM_MESSAGE_SIZE.
1005
        # This is checked before any messages in the batch are sent. Raising the exception here should
1006
        # cause an error response, rather than batching error results and returning
1007
        self._assert_valid_batch_size(entries, sqs_constants.DEFAULT_MAXIMUM_MESSAGE_SIZE)
1✔
1008

1009
        successful = []
1✔
1010
        failed = []
1✔
1011

1012
        with queue.mutex:
1✔
1013
            for entry in entries:
1✔
1014
                try:
1✔
1015
                    queue_item = self._put_message(
1✔
1016
                        queue,
1017
                        context,
1018
                        message_body=entry.get("MessageBody"),
1019
                        delay_seconds=entry.get("DelaySeconds"),
1020
                        message_attributes=entry.get("MessageAttributes"),
1021
                        message_system_attributes=entry.get("MessageSystemAttributes"),
1022
                        message_deduplication_id=entry.get("MessageDeduplicationId"),
1023
                        message_group_id=entry.get("MessageGroupId"),
1024
                    )
1025
                    message = queue_item.message
1✔
1026

1027
                    successful.append(
1✔
1028
                        SendMessageBatchResultEntry(
1029
                            Id=entry["Id"],
1030
                            MessageId=message.get("MessageId"),
1031
                            MD5OfMessageBody=message.get("MD5OfBody"),
1032
                            MD5OfMessageAttributes=message.get("MD5OfMessageAttributes"),
1033
                            MD5OfMessageSystemAttributes=create_message_attribute_hash(
1034
                                message.get("message_system_attributes")
1035
                            ),
1036
                            SequenceNumber=queue_item.sequence_number,
1037
                        )
1038
                    )
1039
                except ServiceException as e:
1✔
1040
                    failed.append(
1✔
1041
                        BatchResultErrorEntry(
1042
                            Id=entry["Id"],
1043
                            SenderFault=e.sender_fault,
1044
                            Code=e.code,
1045
                            Message=e.message,
1046
                        )
1047
                    )
1048
                except Exception as e:
×
1049
                    failed.append(
×
1050
                        BatchResultErrorEntry(
1051
                            Id=entry["Id"],
1052
                            SenderFault=False,
1053
                            Code=e.__class__.__name__,
1054
                            Message=str(e),
1055
                        )
1056
                    )
1057

1058
        return SendMessageBatchResult(
1✔
1059
            Successful=(successful if successful else None),
1060
            Failed=(failed if failed else None),
1061
        )
1062
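As the handler above shows, per-entry problems are reported under Failed rather than failing the whole call (only batch-level checks such as the total size raise outright). A hypothetical client-side view, reusing the sqs client and a queue_url from the earlier sketch:

entries = [
    {"Id": "ok", "MessageBody": "hello"},
    {"Id": "empty", "MessageBody": ""},  # rejected by check_message_min_size -> ends up in Failed
]
# response = sqs.send_message_batch(QueueUrl=queue_url, Entries=entries)
# response["Successful"] -> [{"Id": "ok", "MessageId": ..., "MD5OfMessageBody": ...}]
# response["Failed"]     -> [{"Id": "empty", "SenderFault": ..., "Code": ..., "Message": ...}]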

1063
    def _put_message(
1✔
1064
        self,
1065
        queue: SqsQueue,
1066
        context: RequestContext,
1067
        message_body: String,
1068
        delay_seconds: NullableInteger = None,
1069
        message_attributes: MessageBodyAttributeMap = None,
1070
        message_system_attributes: MessageBodySystemAttributeMap = None,
1071
        message_deduplication_id: String = None,
1072
        message_group_id: String = None,
1073
    ) -> SqsMessage:
1074
        check_message_min_size(message_body)
1✔
1075
        check_message_max_size(message_body, message_attributes, queue.maximum_message_size)
1✔
1076
        check_message_content(message_body)
1✔
1077
        check_attributes(message_attributes)
1✔
1078
        check_attributes(message_system_attributes)
1✔
1079
        check_fifo_id(message_deduplication_id, "MessageDeduplicationId")
1✔
1080
        check_fifo_id(message_group_id, "MessageGroupId")
1✔
1081

1082
        message = Message(
1✔
1083
            MessageId=generate_message_id(),
1084
            MD5OfBody=md5(message_body),
1085
            Body=message_body,
1086
            Attributes=self._create_message_attributes(context, message_system_attributes),
1087
            MD5OfMessageAttributes=create_message_attribute_hash(message_attributes),
1088
            MessageAttributes=message_attributes,
1089
        )
1090
        if self._cloudwatch_dispatcher:
1✔
1091
            self._cloudwatch_dispatcher.dispatch_metric_message_sent(
1✔
1092
                queue=queue, message_body_size=len(message_body.encode("utf-8"))
1093
            )
1094

1095
        return queue.put(
1✔
1096
            message=message,
1097
            message_deduplication_id=message_deduplication_id,
1098
            message_group_id=message_group_id,
1099
            delay_seconds=int(delay_seconds) if delay_seconds is not None else None,
1100
        )
1101

1102
    def receive_message(
1✔
1103
        self,
1104
        context: RequestContext,
1105
        queue_url: String,
1106
        attribute_names: AttributeNameList = None,
1107
        message_system_attribute_names: MessageSystemAttributeList = None,
1108
        message_attribute_names: MessageAttributeNameList = None,
1109
        max_number_of_messages: NullableInteger = None,
1110
        visibility_timeout: NullableInteger = None,
1111
        wait_time_seconds: NullableInteger = None,
1112
        receive_request_attempt_id: String = None,
1113
        **kwargs,
1114
    ) -> ReceiveMessageResult:
1115
        # TODO add support for message_system_attribute_names
1116
        queue = self._resolve_queue(context, queue_url=queue_url)
1✔
1117

1118
        poll_empty_queue = False
1✔
1119
        if override := extract_wait_time_seconds_from_headers(context):
1✔
1120
            wait_time_seconds = override
1✔
1121
            poll_empty_queue = True
1✔
1122
        elif wait_time_seconds is None:
1✔
1123
            wait_time_seconds = queue.wait_time_seconds
1✔
1124
        elif wait_time_seconds < 0 or wait_time_seconds > 20:
1✔
1125
            raise InvalidParameterValueException(
1✔
1126
                f"Value {wait_time_seconds} for parameter WaitTimeSeconds is invalid. "
1127
                f"Reason: Must be >= 0 and <= 20, if provided."
1128
            )
1129
        num = max_number_of_messages or 1
1✔
1130

1131
        # override receive count with value from custom header
1132
        if override := extract_message_count_from_headers(context):
1✔
1133
            num = override
1✔
1134
        elif num == -1:
1✔
1135
            # backdoor to get all messages
1136
            num = queue.approximate_number_of_messages
×
1137
        elif (
1✔
1138
            num < 1 or num > MAX_NUMBER_OF_MESSAGES
1139
        ) and not SQS_DISABLE_MAX_NUMBER_OF_MESSAGE_LIMIT:
1140
            raise InvalidParameterValueException(
1✔
1141
                f"Value {num} for parameter MaxNumberOfMessages is invalid. "
1142
                f"Reason: Must be between 1 and 10, if provided."
1143
            )
1144

1145
        # we chose to always return the maximum possible number of messages, even though AWS will typically return
1146
        # fewer messages than requested on small queues. at some point we could maybe change this to randomly sample
1147
        # between 1 and max_number_of_messages.
1148
        # see https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_ReceiveMessage.html
1149
        result = queue.receive(
1✔
1150
            num, wait_time_seconds, visibility_timeout, poll_empty_queue=poll_empty_queue
1151
        )
1152

1153
        # process dead letter messages
1154
        if result.dead_letter_messages:
1✔
1155
            dead_letter_target_arn = queue.redrive_policy["deadLetterTargetArn"]
1✔
1156
            dl_queue = self._require_queue_by_arn(context, dead_letter_target_arn)
1✔
1157
            # TODO: does this need to be atomic?
1158
            for standard_message in result.dead_letter_messages:
1✔
1159
                message = standard_message.message
1✔
1160
                message["Attributes"][MessageSystemAttributeName.DeadLetterQueueSourceArn] = (
1✔
1161
                    queue.arn
1162
                )
1163
                dl_queue.put(
1✔
1164
                    message=message,
1165
                    message_deduplication_id=standard_message.message_deduplication_id,
1166
                    message_group_id=standard_message.message_group_id,
1167
                )
1168

1169
                if isinstance(queue, FifoQueue):
1✔
1170
                    message_group = queue.get_message_group(standard_message.message_group_id)
1✔
1171
                    queue.update_message_group_visibility(message_group)
1✔
1172

1173
        # prepare result
1174
        messages = []
1✔
1175
        message_system_attribute_names = message_system_attribute_names or attribute_names
1✔
1176
        for i, standard_message in enumerate(result.successful):
1✔
1177
            message = to_sqs_api_message(
1✔
1178
                standard_message, message_system_attribute_names, message_attribute_names
1179
            )
1180
            message["ReceiptHandle"] = result.receipt_handles[i]
1✔
1181
            messages.append(message)
1✔
1182

1183
        if self._cloudwatch_dispatcher:
1✔
1184
            self._cloudwatch_dispatcher.dispatch_metric_received(queue, received=len(messages))
1✔
1185

1186
        # TODO: how does receiving behave if the queue was deleted in the meantime?
1187
        return ReceiveMessageResult(Messages=(messages if messages else None))
1✔
1188
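A hedged sketch of the limits the handler above enforces on a normal request: WaitTimeSeconds between 0 and 20, MaxNumberOfMessages between 1 and 10 (unless SQS_DISABLE_MAX_NUMBER_OF_MESSAGE_LIMIT is set). Parameter values are illustrative; sqs and queue_url are as in the earlier create_queue sketch.

# response = sqs.receive_message(
#     QueueUrl=queue_url,
#     MaxNumberOfMessages=10,   # 1..10 unless the LocalStack limit override is enabled
#     WaitTimeSeconds=20,       # 0..20; long polling
#     VisibilityTimeout=30,
#     MessageAttributeNames=["All"],
# )
# for message in response.get("Messages", []):
#     sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=message["ReceiptHandle"])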

1189
    def list_dead_letter_source_queues(
1✔
1190
        self,
1191
        context: RequestContext,
1192
        queue_url: String,
1193
        next_token: Token = None,
1194
        max_results: BoxedInteger = None,
1195
        **kwargs,
1196
    ) -> ListDeadLetterSourceQueuesResult:
1197
        urls = []
1✔
1198
        store = self.get_store(context.account_id, context.region)
1✔
1199
        dead_letter_queue = self._resolve_queue(context, queue_url=queue_url)
1✔
1200
        for queue in store.queues.values():
1✔
1201
            if policy := queue.redrive_policy:
1✔
1202
                if policy.get("deadLetterTargetArn") == dead_letter_queue.arn:
1✔
1203
                    urls.append(queue.url(context))
1✔
1204
        return ListDeadLetterSourceQueuesResult(queueUrls=urls)
1✔
1205

1206
    def delete_message(
1✔
1207
        self, context: RequestContext, queue_url: String, receipt_handle: String, **kwargs
1208
    ) -> None:
1209
        queue = self._resolve_queue(context, queue_url=queue_url)
1✔
1210
        queue.remove(receipt_handle)
1✔
1211
        if self._cloudwatch_dispatcher:
1✔
1212
            self._cloudwatch_dispatcher.dispatch_metric_message_deleted(queue)
1✔
1213

1214
    def delete_message_batch(
1✔
1215
        self,
1216
        context: RequestContext,
1217
        queue_url: String,
1218
        entries: DeleteMessageBatchRequestEntryList,
1219
        **kwargs,
1220
    ) -> DeleteMessageBatchResult:
1221
        queue = self._resolve_queue(context, queue_url=queue_url)
1✔
1222
        override = extract_message_count_from_headers(context)
1✔
1223
        self._assert_batch(entries, max_messages_override=override)
1✔
1224
        self._assert_valid_message_ids(entries)
1✔
1225

1226
        successful = []
1✔
1227
        failed = []
1✔
1228

1229
        with queue.mutex:
1✔
1230
            for entry in entries:
1✔
1231
                try:
1✔
1232
                    queue.remove(entry["ReceiptHandle"])
1✔
1233
                    successful.append(DeleteMessageBatchResultEntry(Id=entry["Id"]))
1✔
1234
                except Exception as e:
×
1235
                    failed.append(
×
1236
                        BatchResultErrorEntry(
1237
                            Id=entry["Id"],
1238
                            SenderFault=False,
1239
                            Code=e.__class__.__name__,
1240
                            Message=str(e),
1241
                        )
1242
                    )
1243
        if self._cloudwatch_dispatcher:
1✔
1244
            self._cloudwatch_dispatcher.dispatch_metric_message_deleted(
1✔
1245
                queue, deleted=len(successful)
1246
            )
1247

1248
        return DeleteMessageBatchResult(
1✔
1249
            Successful=successful,
1250
            Failed=failed,
1251
        )
1252

1253
    def purge_queue(self, context: RequestContext, queue_url: String, **kwargs) -> None:
1✔
1254
        queue = self._resolve_queue(context, queue_url=queue_url)
1✔
1255

1256
        with queue.mutex:
1✔
1257
            if config.SQS_DELAY_PURGE_RETRY:
1✔
1258
                if queue.purge_timestamp and (queue.purge_timestamp + 60) > time.time():
1✔
1259
                    raise PurgeQueueInProgress(
1✔
1260
                        f"Only one PurgeQueue operation on {queue.name} is allowed every 60 seconds.",
1261
                        status_code=403,
1262
                    )
1263
            queue.purge_timestamp = time.time()
1✔
1264
            queue.clear()
1✔
1265

1266
    def set_queue_attributes(
1✔
1267
        self, context: RequestContext, queue_url: String, attributes: QueueAttributeMap, **kwargs
1268
    ) -> None:
1269
        queue = self._resolve_queue(context, queue_url=queue_url)
1✔
1270

1271
        if not attributes:
1✔
1272
            return
×
1273

1274
        queue.validate_queue_attributes(attributes)
1✔
1275

1276
        for k, v in attributes.items():
1✔
1277
            if k in sqs_constants.INTERNAL_QUEUE_ATTRIBUTES:
1✔
1278
                raise InvalidAttributeName(f"Unknown Attribute {k}.")
×
1279
            if k in sqs_constants.DELETE_IF_DEFAULT and v == sqs_constants.DELETE_IF_DEFAULT[k]:
1✔
1280
                if k in queue.attributes:
1✔
1281
                    del queue.attributes[k]
1✔
1282
            else:
1283
                queue.attributes[k] = v
1✔
1284

1285
        # Special cases
1286
        if queue.attributes.get(QueueAttributeName.Policy) == "":
1✔
1287
            del queue.attributes[QueueAttributeName.Policy]
1✔
1288

1289
        redrive_policy = queue.attributes.get(QueueAttributeName.RedrivePolicy)
1✔
1290
        if redrive_policy == "":
1✔
1291
            del queue.attributes[QueueAttributeName.RedrivePolicy]
1✔
1292
            return
1✔
1293

1294
        if redrive_policy:
1✔
1295
            _redrive_policy = json.loads(redrive_policy)
1✔
1296
            dl_target_arn = _redrive_policy.get("deadLetterTargetArn")
1✔
1297
            max_receive_count = _redrive_policy.get("maxReceiveCount")
1✔
1298
            # TODO: use the actual AWS responses
1299
            if not dl_target_arn:
1✔
1300
                raise InvalidParameterValueException(
×
1301
                    "The required parameter 'deadLetterTargetArn' is missing"
1302
                )
1303
            if max_receive_count is None:
1✔
1304
                raise InvalidParameterValueException(
×
1305
                    "The required parameter 'maxReceiveCount' is missing"
1306
                )
1307
            try:
1✔
1308
                max_receive_count = int(max_receive_count)
1✔
1309
                valid_count = 1 <= max_receive_count <= 1000
1✔
1310
            except ValueError:
×
1311
                valid_count = False
×
1312
            if not valid_count:
1✔
1313
                raise InvalidParameterValueException(
×
1314
                    f"Value {redrive_policy} for parameter RedrivePolicy is invalid. Reason: Invalid value for "
1315
                    f"maxReceiveCount: {max_receive_count}, valid values are from 1 to 1000 both inclusive."
1316
                )
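
# --- illustrative usage sketch, not part of provider.py ---
# Setting a RedrivePolicy through SetQueueAttributes as validated above: the
# value must be a JSON string carrying "deadLetterTargetArn" and a
# "maxReceiveCount" between 1 and 1000; an empty string removes the policy
# again. Client, endpoint and queue names are assumptions.
import json
import boto3

sqs = boto3.client("sqs", endpoint_url="http://localhost:4566", region_name="us-east-1",
                   aws_access_key_id="test", aws_secret_access_key="test")
dlq_url = sqs.create_queue(QueueName="orders-dlq")["QueueUrl"]
dlq_arn = sqs.get_queue_attributes(QueueUrl=dlq_url, AttributeNames=["QueueArn"])["Attributes"]["QueueArn"]
queue_url = sqs.create_queue(QueueName="orders")["QueueUrl"]
sqs.set_queue_attributes(
    QueueUrl=queue_url,
    Attributes={"RedrivePolicy": json.dumps({"deadLetterTargetArn": dlq_arn, "maxReceiveCount": 5})},
)
# an empty value deletes the attribute again (see the special case above)
sqs.set_queue_attributes(QueueUrl=queue_url, Attributes={"RedrivePolicy": ""})
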
1317

1318
    def list_message_move_tasks(
1✔
1319
        self,
1320
        context: RequestContext,
1321
        source_arn: String,
1322
        max_results: NullableInteger = None,
1323
        **kwargs,
1324
    ) -> ListMessageMoveTasksResult:
1325
        try:
1✔
1326
            self._require_queue_by_arn(context, source_arn)
1✔
1327
        except InvalidArnException:
×
1328
            raise InvalidParameterValueException(
×
1329
                "You must use this format to specify the SourceArn: arn:<partition>:<service>:<region>:<account-id>:<resource-id>"
1330
            )
1331
        except QueueDoesNotExist:
×
1332
            raise ResourceNotFoundException(
×
1333
                "The resource that you specified for the SourceArn parameter doesn't exist."
1334
            )
1335

1336
        # get the move tasks for the queue and sort them, most recent first
1337
        store = self.get_store(context.account_id, context.region)
1✔
1338
        tasks = [
1✔
1339
            move_task
1340
            for move_task in store.move_tasks.values()
1341
            if move_task.source_arn == source_arn
1342
            and move_task.status != MessageMoveTaskStatus.CREATED
1343
        ]
1344
        tasks.sort(key=lambda t: t.started_timestamp, reverse=True)
1✔
1345

1346
        # convert to result list
1347
        n = max_results or 1
1✔
1348
        return ListMessageMoveTasksResult(
1✔
1349
            Results=[self._to_message_move_task_entry(task) for task in tasks[:n]]
1350
        )
1351

1352
    def _to_message_move_task_entry(
1✔
1353
        self, entity: MessageMoveTask
1354
    ) -> ListMessageMoveTasksResultEntry:
1355
        """
1356
        Converts a ``MessageMoveTask`` entity into a ``ListMessageMoveTasksResultEntry`` API concept.
1357

1358
        :param entity: the entity to convert
1359
        :return: the typed dict for use in the AWS API
1360
        """
1361
        entry = ListMessageMoveTasksResultEntry(
1✔
1362
            SourceArn=entity.source_arn,
1363
            DestinationArn=entity.destination_arn,
1364
            Status=entity.status,
1365
        )
1366

1367
        if entity.status == "RUNNING":
1✔
1368
            entry["TaskHandle"] = entity.task_handle
1✔
1369
        if entity.started_timestamp is not None:
1✔
1370
            entry["StartedTimestamp"] = int(entity.started_timestamp.timestamp() * 1000)
1✔
1371
        if entity.max_number_of_messages_per_second is not None:
1✔
1372
            entry["MaxNumberOfMessagesPerSecond"] = entity.max_number_of_messages_per_second
1✔
1373
        if entity.approximate_number_of_messages_to_move is not None:
1✔
1374
            entry["ApproximateNumberOfMessagesToMove"] = (
1✔
1375
                entity.approximate_number_of_messages_to_move
1376
            )
1377
        if entity.approximate_number_of_messages_moved is not None:
1✔
1378
            entry["ApproximateNumberOfMessagesMoved"] = entity.approximate_number_of_messages_moved
1✔
1379
        if entity.failure_reason is not None:
1✔
1380
            entry["FailureReason"] = entity.failure_reason
1✔
1381

1382
        return entry
1✔
1383

1384
    def start_message_move_task(
1✔
1385
        self,
1386
        context: RequestContext,
1387
        source_arn: String,
1388
        destination_arn: String = None,
1389
        max_number_of_messages_per_second: NullableInteger = None,
1390
        **kwargs,
1391
    ) -> StartMessageMoveTaskResult:
1392
        try:
1✔
1393
            self._require_queue_by_arn(context, source_arn)
1✔
1394
        except QueueDoesNotExist as e:
×
1395
            raise ResourceNotFoundException(
×
1396
                "The resource that you specified for the SourceArn parameter doesn't exist.",
1397
                status_code=404,
1398
            ) from e
1399

1400
        # check that the source queue is the dlq of some other queue
1401
        is_dlq = False
1✔
1402
        for _, _, store in sqs_stores.iter_stores():
1✔
1403
            for queue in store.queues.values():
1✔
1404
                if not queue.redrive_policy:
1✔
1405
                    continue
1✔
1406
                if queue.redrive_policy.get("deadLetterTargetArn") == source_arn:
1✔
1407
                    is_dlq = True
1✔
1408
                    break
1✔
1409
            if is_dlq:
1✔
1410
                break
1✔
1411
        if not is_dlq:
1✔
1412
            raise InvalidParameterValueException(
1✔
1413
                "Source queue must be configured as a Dead Letter Queue."
1414
            )
1415

1416
        # If destination_arn is left blank, the messages will be redriven back to their respective original
1417
        # source queues.
1418
        if destination_arn:
1✔
1419
            try:
1✔
1420
                self._require_queue_by_arn(context, destination_arn)
1✔
1421
            except QueueDoesNotExist as e:
1✔
1422
                raise ResourceNotFoundException(
1✔
1423
                    "The resource that you specified for the DestinationArn parameter doesn't exist.",
1424
                    status_code=404,
1425
                ) from e
1426

1427
        # check that only one active task exists
1428
        with self._message_move_task_manager.mutex:
1✔
1429
            store = self.get_store(context.account_id, context.region)
1✔
1430
            tasks = [
1✔
1431
                task
1432
                for task in store.move_tasks.values()
1433
                if task.source_arn == source_arn
1434
                and task.status
1435
                in [
1436
                    MessageMoveTaskStatus.CREATED,
1437
                    MessageMoveTaskStatus.RUNNING,
1438
                    MessageMoveTaskStatus.CANCELLING,
1439
                ]
1440
            ]
1441
            if len(tasks) > 0:
1✔
1442
                raise InvalidParameterValueException(
1✔
1443
                    "There is already a task running. Only one active task is allowed for a source queue "
1444
                    "arn at a given time."
1445
                )
1446

1447
            task = MessageMoveTask(
1✔
1448
                source_arn,
1449
                destination_arn,
1450
                max_number_of_messages_per_second,
1451
            )
1452
            store.move_tasks[task.task_id] = task
1✔
1453

1454
        self._message_move_task_manager.submit(task)
1✔
1455

1456
        return StartMessageMoveTaskResult(TaskHandle=task.task_handle)
1✔
1457

1458
    def cancel_message_move_task(
1✔
1459
        self, context: RequestContext, task_handle: String, **kwargs
1460
    ) -> CancelMessageMoveTaskResult:
1461
        try:
1✔
1462
            task_id, source_arn = decode_move_task_handle(task_handle)
1✔
1463
        except ValueError as e:
1✔
1464
            raise InvalidParameterValueException(
1✔
1465
                "Value for parameter TaskHandle is invalid."
1466
            ) from e
1467

1468
        try:
1✔
1469
            self._require_queue_by_arn(context, source_arn)
1✔
1470
        except QueueDoesNotExist as e:
1✔
1471
            raise ResourceNotFoundException(
1✔
1472
                "The resource that you specified for the SourceArn parameter doesn't exist.",
1473
                status_code=404,
1474
            ) from e
1475

1476
        store = self.get_store(context.account_id, context.region)
1✔
1477
        try:
1✔
1478
            move_task = store.move_tasks[task_id]
1✔
1479
        except KeyError:
1✔
1480
            raise ResourceNotFoundException("Task does not exist.", status_code=404)
1✔
1481

1482
        # TODO: what happens if move tasks are already cancelled?
1483

1484
        self._message_move_task_manager.cancel(move_task)
1✔
1485

1486
        return CancelMessageMoveTaskResult(
1✔
1487
            ApproximateNumberOfMessagesMoved=move_task.approximate_number_of_messages_moved,
1488
        )
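
# --- illustrative usage sketch, not part of provider.py ---
# Client-side view of the message-move-task (DLQ redrive) handlers above:
# StartMessageMoveTask requires the SourceArn to belong to a queue that is
# configured as the dead-letter queue of some other queue, allows at most one
# active task per source queue, and redrives back to the original source
# queues when DestinationArn is omitted. Endpoint, credentials and queue names
# are assumptions.
import json
import boto3

sqs = boto3.client("sqs", endpoint_url="http://localhost:4566", region_name="us-east-1",
                   aws_access_key_id="test", aws_secret_access_key="test")
dlq_url = sqs.create_queue(QueueName="work-dlq")["QueueUrl"]
dlq_arn = sqs.get_queue_attributes(QueueUrl=dlq_url, AttributeNames=["QueueArn"])["Attributes"]["QueueArn"]
sqs.create_queue(
    QueueName="work",
    Attributes={"RedrivePolicy": json.dumps({"deadLetterTargetArn": dlq_arn, "maxReceiveCount": 1})},
)
task_handle = sqs.start_message_move_task(SourceArn=dlq_arn)["TaskHandle"]
print(sqs.list_message_move_tasks(SourceArn=dlq_arn, MaxResults=10)["Results"])
# a task can be cancelled with the handle returned by StartMessageMoveTask
sqs.cancel_message_move_task(TaskHandle=task_handle)
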
1489

1490
    def tag_queue(self, context: RequestContext, queue_url: String, tags: TagMap, **kwargs) -> None:
1✔
1491
        if not tags:
1✔
1492
            return
1✔
1493

1494
        queue = self._resolve_queue(context, queue_url=queue_url)
1✔
1495
        self._tag_queue(context.account_id, context.region, queue.arn, tags)
1✔
1496

1497
    def list_queue_tags(
1✔
1498
        self, context: RequestContext, queue_url: String, **kwargs
1499
    ) -> ListQueueTagsResult:
1500
        queue = self._resolve_queue(context, queue_url=queue_url)
1✔
1501
        tags = self._get_queue_tags(context.account_id, context.region, queue.arn)
1✔
1502
        return ListQueueTagsResult(Tags=tags if tags else None)
1✔
1503

1504
    def untag_queue(
1✔
1505
        self, context: RequestContext, queue_url: String, tag_keys: TagKeyList, **kwargs
1506
    ) -> None:
1507
        queue = self._resolve_queue(context, queue_url=queue_url)
1✔
1508
        self._untag_queue(context.account_id, context.region, queue.arn, tag_keys)
1✔
1509

1510
    def add_permission(
1✔
1511
        self,
1512
        context: RequestContext,
1513
        queue_url: String,
1514
        label: String,
1515
        aws_account_ids: AWSAccountIdList,
1516
        actions: ActionNameList,
1517
        **kwargs,
1518
    ) -> None:
1519
        queue = self._resolve_queue(context, queue_url=queue_url)
1✔
1520

1521
        self._validate_actions(actions)
1✔
1522

1523
        queue.add_permission(label=label, actions=actions, account_ids=aws_account_ids)
1✔
1524

1525
    def remove_permission(
1✔
1526
        self, context: RequestContext, queue_url: String, label: String, **kwargs
1527
    ) -> None:
1528
        queue = self._resolve_queue(context, queue_url=queue_url)
1✔
1529

1530
        queue.remove_permission(label=label)
1✔
1531

1532
    def _create_message_attributes(
1✔
1533
        self,
1534
        context: RequestContext,
1535
        message_system_attributes: MessageBodySystemAttributeMap = None,
1536
    ) -> dict[MessageSystemAttributeName, str]:
1537
        result: dict[MessageSystemAttributeName, str] = {
1✔
1538
            MessageSystemAttributeName.SenderId: context.account_id,  # in AWS this is the IAM user/role ID, not the account ID
1539
            MessageSystemAttributeName.SentTimestamp: str(now(millis=True)),
1540
        }
1541
        # we do not use `context.trace_context` here because it is populated automatically;
1542
        # AWS only adds the `AWSTraceHeader` attribute if the header is explicitly present in the request
1543
        # TODO: maybe also check the behavior with X-Ray Active tracing mode?
1544
        if "X-Amzn-Trace-Id" in context.request.headers:
1✔
1545
            result[MessageSystemAttributeName.AWSTraceHeader] = str(
1✔
1546
                context.request.headers["X-Amzn-Trace-Id"]
1547
            )
1548

1549
        if message_system_attributes is not None:
1✔
1550
            for attr in message_system_attributes:
1✔
1551
                result[attr] = message_system_attributes[attr]["StringValue"]
1✔
1552

1553
        return result
1✔
1554

1555
    def _validate_actions(self, actions: ActionNameList):
1✔
1556
        service = load_service(service=self.service, version=self.version)
1✔
1557
        # FIXME: this is a bit of a heuristic, as it also includes actions like "ListQueues" that are
1558
        #  not associated with an individual queue
1559
        valid = list(service.operation_names)
1✔
1560
        valid.append("*")
1✔
1561

1562
        for action in actions:
1✔
1563
            if action not in valid:
1✔
1564
                raise InvalidParameterValueException(
×
1565
                    f"Value SQS:{action} for parameter ActionName is invalid. Reason: Please refer to the appropriate "
1566
                    "WSDL for a list of valid actions. "
1567
                )
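
# --- illustrative usage sketch, not part of provider.py ---
# AddPermission as checked by _validate_actions above: action names must be
# SQS operation names (or "*"); anything else is rejected with
# InvalidParameterValueException. Endpoint, credentials, queue name and the
# grantee account id are assumptions.
import boto3

sqs = boto3.client("sqs", endpoint_url="http://localhost:4566", region_name="us-east-1",
                   aws_access_key_id="test", aws_secret_access_key="test")
queue_url = sqs.create_queue(QueueName="shared-queue")["QueueUrl"]
sqs.add_permission(
    QueueUrl=queue_url,
    Label="cross-account-send",
    AWSAccountIds=["111111111111"],
    Actions=["SendMessage", "GetQueueUrl"],
)
sqs.remove_permission(QueueUrl=queue_url, Label="cross-account-send")
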
1568

1569
    def _assert_batch(
1✔
1570
        self,
1571
        batch: list,
1572
        *,
1573
        require_fifo_queue_params: bool = False,
1574
        require_message_deduplication_id: bool = False,
1575
        max_messages_override: int | None = None,
1576
    ) -> None:
1577
        if not batch:
1✔
1578
            raise EmptyBatchRequest
1✔
1579

1580
        max_messages_per_batch = max_messages_override or MAX_NUMBER_OF_MESSAGES
1✔
1581
        if batch and (no_entries := len(batch)) > max_messages_per_batch:
1✔
1582
            raise TooManyEntriesInBatchRequest(
1✔
1583
                f"Maximum number of entries per request are {max_messages_per_batch}. You have sent {no_entries}."
1584
            )
1585
        visited = set()
1✔
1586
        for entry in batch:
1✔
1587
            entry_id = entry["Id"]
1✔
1588
            if not re.search(r"^[\w-]+$", entry_id) or len(entry_id) > 80:
1✔
1589
                raise InvalidBatchEntryId(
1✔
1590
                    "A batch entry id can only contain alphanumeric characters, hyphens and underscores. "
1591
                    "It can be at most 80 letters long."
1592
                )
1593
            if require_message_deduplication_id and not entry.get("MessageDeduplicationId"):
1✔
1594
                raise InvalidParameterValueException(
1✔
1595
                    "The queue should either have ContentBasedDeduplication enabled or "
1596
                    "MessageDeduplicationId provided explicitly"
1597
                )
1598
            if require_fifo_queue_params and not entry.get("MessageGroupId"):
1✔
1599
                raise InvalidParameterValueException(
1✔
1600
                    "The request must contain the parameter MessageGroupId."
1601
                )
1602
            if entry_id in visited:
1✔
1603
                raise BatchEntryIdsNotDistinct()
×
1604
            else:
1605
                visited.add(entry_id)
1✔
1606

1607
    def _assert_valid_batch_size(self, batch: list, max_message_size: int):
1✔
1608
        batch_message_size = sum(
1✔
1609
            _message_body_size(entry.get("MessageBody"))
1610
            + _message_attributes_size(entry.get("MessageAttributes"))
1611
            for entry in batch
1612
        )
1613
        if batch_message_size > max_message_size:
1✔
1614
            error = f"Batch requests cannot be longer than {max_message_size} bytes."
1✔
1615
            error += f" You have sent {batch_message_size} bytes."
1✔
1616
            raise BatchRequestTooLong(error)
1✔
1617

1618
    def _assert_valid_message_ids(self, batch: list):
1✔
1619
        batch_id_regex = r"^[\w-]{1,80}$"
1✔
1620
        for message in batch:
1✔
1621
            if not re.match(batch_id_regex, message.get("Id", "")):
1✔
1622
                raise InvalidBatchEntryId(
×
1623
                    "A batch entry id can only contain alphanumeric characters, "
1624
                    "hyphens and underscores. It can be at most 80 letters long."
1625
                )
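
# --- illustrative sketch, not part of provider.py ---
# The batch-entry id rules enforced by _assert_batch and
# _assert_valid_message_ids above: ids may contain alphanumeric characters,
# hyphens and underscores, up to 80 characters; duplicate ids within one batch
# raise BatchEntryIdsNotDistinct.
import re

BATCH_ID_PATTERN = re.compile(r"^[\w-]{1,80}$")

assert BATCH_ID_PATTERN.match("msg-1_retry")       # letters, digits, '-' and '_' are allowed
assert not BATCH_ID_PATTERN.match("bad id!")       # spaces and punctuation are rejected
assert not BATCH_ID_PATTERN.match("x" * 81)        # more than 80 characters is rejected
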
1626

1627
    def _init_cloudwatch_metrics_reporting(self):
1✔
1628
        self.cloudwatch_disabled: bool = (
1✔
1629
            config.SQS_DISABLE_CLOUDWATCH_METRICS or not is_api_enabled("cloudwatch")
1630
        )
1631

1632
        self._cloudwatch_publish_worker = (
1✔
1633
            None if self.cloudwatch_disabled else CloudwatchPublishWorker()
1634
        )
1635
        self._cloudwatch_dispatcher = None if self.cloudwatch_disabled else CloudwatchDispatcher()
1✔
1636

1637
    def _start_cloudwatch_metrics_reporting(self):
1✔
1638
        if not self.cloudwatch_disabled:
1✔
1639
            self._cloudwatch_publish_worker.start()
1✔
1640

1641
    def _stop_cloudwatch_metrics_reporting(self):
1✔
1642
        if not self.cloudwatch_disabled:
1✔
1643
            self._cloudwatch_publish_worker.stop()
1✔
1644
            self._cloudwatch_dispatcher.shutdown()
1✔
1645

1646
    def _get_queue_tags(self, account_id: str, region: str, resource_arn: str) -> TagMap:
1✔
1647
        store = self.get_store(account_id, region)
1✔
1648
        return store.tags.get_tags(resource_arn)
1✔
1649

1650
    def _tag_queue(self, account_id: str, region: str, resource_arn: str, tags: TagMap) -> None:
1✔
1651
        store = self.get_store(account_id, region)
1✔
1652
        store.tags.update_tags(resource_arn, tags)
1✔
1653

1654
    def _untag_queue(
1✔
1655
        self, account_id: str, region: str, resource_arn: str, tag_keys: TagKeyList
1656
    ) -> None:
1657
        store = self.get_store(account_id, region)
1✔
1658
        store.tags.delete_tags(resource_arn, tag_keys)
1✔
1659

1660
    def _remove_all_queue_tags(self, account_id: str, region: str, resource_arn: str) -> None:
1✔
1661
        store = self.get_store(account_id, region)
1✔
1662
        store.tags.delete_all_tags(resource_arn)
1✔
1663

1664

1665
def resolve_queue_location(
1✔
1666
    context: RequestContext, queue_name: str | None = None, queue_url: str | None = None
1667
) -> tuple[str, str | None, str]:
1668
    """
1669
    Resolves a queue location from the given information.
1670

1671
    :param context: the request context, used for getting region and account_id, and optionally the queue_url
1672
    :param queue_name: the queue name (if this is set, then this will be used for the key)
1673
    :param queue_url: the queue url (if name is not set, this will be used to determine the queue name)
1674
    :return: tuple of account id, region and queue_name
1675
    """
1676
    if not queue_name:
1✔
1677
        try:
1✔
1678
            if queue_url:
1✔
1679
                return parse_queue_url(queue_url)
1✔
1680
            else:
1681
                return parse_queue_url(context.request.base_url)
×
1682
        except ValueError:
1✔
1683
            # fall back to treating the QueueUrl value as a plain queue name
1684
            return context.account_id, context.region, queue_url
1✔
1685

1686
    return context.account_id, context.region, queue_name
×
1687

1688

1689
def extract_message_count_from_headers(context: RequestContext) -> int | None:
1✔
1690
    if override := context.request.headers.get(
1✔
1691
        HEADER_LOCALSTACK_SQS_OVERRIDE_MESSAGE_COUNT, default=None, type=int
1692
    ):
1693
        return override
1✔
1694

1695
    return None
1✔
1696

1697

1698
def extract_wait_time_seconds_from_headers(context: RequestContext) -> int | None:
1✔
1699
    if override := context.request.headers.get(
1✔
1700
        HEADER_LOCALSTACK_SQS_OVERRIDE_WAIT_TIME_SECONDS, default=None, type=int
1701
    ):
1702
        return override
1✔
1703

1704
    return None
1✔
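
# --- illustrative sketch, not part of provider.py ---
# The two helpers above read LocalStack-only HTTP headers
# (HEADER_LOCALSTACK_SQS_OVERRIDE_MESSAGE_COUNT and
# HEADER_LOCALSTACK_SQS_OVERRIDE_WAIT_TIME_SECONDS) that let test clients
# override the enforced batch size and long-poll wait time. One plausible way
# to set such a header from a boto3 client is a botocore "before-send" hook;
# the endpoint, credentials and the chosen override value are assumptions.
import boto3

from localstack.services.sqs.constants import HEADER_LOCALSTACK_SQS_OVERRIDE_MESSAGE_COUNT

sqs = boto3.client("sqs", endpoint_url="http://localhost:4566", region_name="us-east-1",
                   aws_access_key_id="test", aws_secret_access_key="test")

def _add_override_header(request, **kwargs):
    # raise the limit read via extract_message_count_from_headers above
    request.headers[HEADER_LOCALSTACK_SQS_OVERRIDE_MESSAGE_COUNT] = "20"

# subsequent DeleteMessageBatch calls may then carry more than the default 10 entries
sqs.meta.events.register("before-send.sqs.DeleteMessageBatch", _add_override_header)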