• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 6523d139-4c8d-4daf-a514-97baaa202bd9

04 Jun 2025 04:30PM UTC coverage: 86.762% (-0.006%) from 86.768%
6523d139-4c8d-4daf-a514-97baaa202bd9

push

circleci

web-flow
test(esm/sqs): Skip flaky test_report_batch_item_failures test (#12713)

65076 of 75005 relevant lines covered (86.76%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.1
/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/sqs_poller.py
1
import functools
1✔
2
import json
1✔
3
import logging
1✔
4
from collections import defaultdict
1✔
5
from functools import cached_property
1✔
6

7
from botocore.client import BaseClient
1✔
8

9
from localstack.aws.api.pipes import PipeSourceSqsQueueParameters
1✔
10
from localstack.aws.api.sqs import MessageSystemAttributeName
1✔
11
from localstack.aws.connect import connect_to
1✔
12
from localstack.services.lambda_.event_source_mapping.event_processor import (
1✔
13
    EventProcessor,
14
    PartialBatchFailureError,
15
)
16
from localstack.services.lambda_.event_source_mapping.pollers.poller import (
1✔
17
    EmptyPollResultsException,
18
    Poller,
19
    parse_batch_item_failures,
20
)
21
from localstack.services.lambda_.event_source_mapping.senders.sender_utils import (
1✔
22
    batched,
23
)
24
from localstack.services.sqs.constants import (
1✔
25
    HEADER_LOCALSTACK_SQS_OVERRIDE_MESSAGE_COUNT,
26
    HEADER_LOCALSTACK_SQS_OVERRIDE_WAIT_TIME_SECONDS,
27
)
28
from localstack.utils.aws.arns import parse_arn
1✔
29
from localstack.utils.strings import first_char_to_lower
1✔
30

31
# Module-level logger for this poller.
LOG = logging.getLogger(__name__)

# Upper bound used for MaxNumberOfMessages per sqs:ReceiveMessage call; values above
# this threshold are smuggled past the API limit via LocalStack override headers.
DEFAULT_MAX_RECEIVE_COUNT = 10
# Upper bound for WaitTimeSeconds (long polling); same override mechanism applies.
# See https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-short-and-long-polling.html
DEFAULT_MAX_WAIT_TIME_SECONDS = 20
36

37

38
class SqsPoller(Poller):
    """Polls an SQS queue via long-polling and forwards message batches to the
    configured event processor (Lambda ESM or EventBridge Pipes).

    Handles FIFO-queue grouping by MessageGroupId, event filtering, partial batch
    failures reported by the processor, and deletion of successfully processed
    (or filtered-out) messages from the source queue.
    """

    # URL of the source queue, resolved once from the source ARN in __init__.
    queue_url: str

    # Number of messages to collect per invocation batch (source config "BatchSize").
    batch_size: int
    # Seconds to wait for a batch to fill ("MaximumBatchingWindowInSeconds"); 0 means short-polling.
    maximum_batching_window: int

    def __init__(
        self,
        source_arn: str,
        source_parameters: dict | None = None,
        source_client: BaseClient | None = None,
        processor: EventProcessor | None = None,
    ):
        """Resolve the queue URL and batching parameters, then install client hooks."""
        super().__init__(source_arn, source_parameters, source_client, processor)
        self.queue_url = get_queue_url(self.source_arn)

        self.batch_size = self.sqs_queue_parameters.get("BatchSize", DEFAULT_MAX_RECEIVE_COUNT)
        # HACK: When the MaximumBatchingWindowInSeconds is not set, just default to short-polling.
        # While set in ESM (via the config factory) setting this param as a default in Pipes causes
        # parity issues with a retrieved config since no default value is returned.
        self.maximum_batching_window = self.sqs_queue_parameters.get(
            "MaximumBatchingWindowInSeconds", 0
        )

        self._register_client_hooks()

    @property
    def sqs_queue_parameters(self) -> PipeSourceSqsQueueParameters:
        """SQS-specific source parameters from the poller configuration."""
        # TODO: De-couple Poller configuration params from ESM/Pipes specific config (i.e PipeSourceSqsQueueParameters)
        return self.source_parameters["SqsQueueParameters"]

    @cached_property
    def is_fifo_queue(self) -> bool:
        """Whether the source queue is a FIFO queue (cached after the first lookup)."""
        # Alternative heuristic: self.queue_url.endswith(".fifo"), but we need the call to get_queue_attributes for IAM
        return self.get_queue_attributes().get("FifoQueue", "false").lower() == "true"

    def _register_client_hooks(self):
        """Register botocore event hooks that carry LocalStack-internal overrides.

        The hooks strip the non-AWS parameters ``sqs_override_max_message_count`` /
        ``sqs_override_wait_time_seconds`` from outgoing requests and, when the
        requested values exceed the AWS caps (DEFAULT_MAX_RECEIVE_COUNT /
        DEFAULT_MAX_WAIT_TIME_SECONDS), forward them via LocalStack-specific
        HTTP headers instead.
        """
        event_system = self.source_client.meta.events

        def handle_message_count_override(params, context, **kwargs):
            # Always pop the fake parameter; only forward counts above the AWS cap.
            requested_count = params.pop("sqs_override_max_message_count", None)
            if not requested_count or requested_count <= DEFAULT_MAX_RECEIVE_COUNT:
                return

            context[HEADER_LOCALSTACK_SQS_OVERRIDE_MESSAGE_COUNT] = str(requested_count)

        def handle_message_wait_time_seconds_override(params, context, **kwargs):
            # Always pop the fake parameter; only forward waits above the AWS long-poll max.
            requested_wait = params.pop("sqs_override_wait_time_seconds", None)
            if not requested_wait or requested_wait <= DEFAULT_MAX_WAIT_TIME_SECONDS:
                return

            context[HEADER_LOCALSTACK_SQS_OVERRIDE_WAIT_TIME_SECONDS] = str(requested_wait)

        def handle_inject_headers(params, context, **kwargs):
            # Move override values stashed in the request context into the outgoing HTTP headers.
            if override_message_count := context.pop(
                HEADER_LOCALSTACK_SQS_OVERRIDE_MESSAGE_COUNT, None
            ):
                params["headers"][HEADER_LOCALSTACK_SQS_OVERRIDE_MESSAGE_COUNT] = (
                    override_message_count
                )

            if override_wait_time := context.pop(
                HEADER_LOCALSTACK_SQS_OVERRIDE_WAIT_TIME_SECONDS, None
            ):
                params["headers"][HEADER_LOCALSTACK_SQS_OVERRIDE_WAIT_TIME_SECONDS] = (
                    override_wait_time
                )

        event_system.register(
            "provide-client-params.sqs.ReceiveMessage", handle_message_count_override
        )
        event_system.register(
            "provide-client-params.sqs.ReceiveMessage", handle_message_wait_time_seconds_override
        )
        # Since we delete SQS messages after processing, this allows us to remove up to 10K entries at a time.
        event_system.register(
            "provide-client-params.sqs.DeleteMessageBatch", handle_message_count_override
        )

        event_system.register("before-call.sqs.ReceiveMessage", handle_inject_headers)
        event_system.register("before-call.sqs.DeleteMessageBatch", handle_inject_headers)

    def get_queue_attributes(self) -> dict:
        """Return the queue's FifoQueue attribute via sqs:GetQueueAttributes.

        The API call itself is deliberately kept because it is required for IAM
        policy purposes (see is_fifo_queue).
        """
        get_queue_attributes_response = self.source_client.get_queue_attributes(
            QueueUrl=self.queue_url,
            AttributeNames=["FifoQueue"],
        )
        return get_queue_attributes_response.get("Attributes", {})

    def event_source(self) -> str:
        """Event source identifier for this poller."""
        return "aws:sqs"

    def poll_events(self) -> None:
        """Receive one batch of messages from the queue and dispatch it in mini-batches.

        Raises:
            EmptyPollResultsException: if the ReceiveMessage call returned no messages.
        """
        # In order to improve performance, we've adopted long-polling for the SQS poll operation `ReceiveMessage` [1].
        # * Our LS-internal optimizations leverage custom boto-headers to set larger batch sizes and longer wait times than what the AWS API allows [2].
        # * Higher batch collection durations and no. of records retrieved per request mean fewer calls to the LocalStack gateway [3] when polling an event-source [4].
        # * LocalStack shutdown works because the LocalStack gateway shuts down and terminates the open connection.
        # * Provider lifecycle hooks have been added to ensure blocking long-poll calls are gracefully interrupted and returned.
        #
        # Pros (+) / Cons (-):
        # + Alleviates pressure on the gateway since each `ReceiveMessage` call only returns once we reach the desired `BatchSize` or the `WaitTimeSeconds` elapses.
        # + Matches the AWS behavior also using long-polling
        # - Blocks a LocalStack gateway thread (default 1k) for every open connection, which could lead to resource contention if used at scale.
        #
        # Refs / Notes:
        # [1] Amazon SQS short and long polling: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-short-and-long-polling.html
        # [2] PR (2025-02): https://github.com/localstack/localstack/pull/12002
        # [3] Note: Under high volumes of requests, the LocalStack gateway becomes a major performance bottleneck.
        # [4] ESM blog mentioning long-polling: https://aws.amazon.com/de/blogs/aws/aws-lambda-adds-amazon-simple-queue-service-to-supported-event-sources/

        # TODO: Handle exceptions differently i.e QueueNotExist or ConnectionFailed should retry with backoff
        response = self.source_client.receive_message(
            QueueUrl=self.queue_url,
            MaxNumberOfMessages=min(self.batch_size, DEFAULT_MAX_RECEIVE_COUNT),
            WaitTimeSeconds=min(self.maximum_batching_window, DEFAULT_MAX_WAIT_TIME_SECONDS),
            MessageAttributeNames=["All"],
            MessageSystemAttributeNames=[MessageSystemAttributeName.All],
            # Override how many messages we can receive per call
            sqs_override_max_message_count=self.batch_size,
            # Override how long to wait until batching conditions are met
            sqs_override_wait_time_seconds=self.maximum_batching_window,
        )

        messages = response.get("Messages", [])
        if not messages:
            raise EmptyPollResultsException(service="sqs", source_arn=self.source_arn)

        LOG.debug("Polled %d events from %s", len(messages), self.source_arn)
        # TODO: implement invocation payload size quota
        # NOTE: Split up a batch into mini-batches of up to 2.5K records each. This is to prevent exceeding the 6MB size-limit
        # imposed on payloads sent to a Lambda as well as LocalStack Lambdas failing to handle large payloads efficiently.
        # See https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventsourcemapping.html#invocation-eventsourcemapping-batching
        for message_batch in batched(messages, 2500):
            if len(message_batch) < len(messages):
                LOG.debug(
                    "Splitting events from %s into mini-batch (%d/%d)",
                    self.source_arn,
                    len(message_batch),
                    len(messages),
                )
            try:
                if self.is_fifo_queue:
                    # TODO: think about starvation behavior because once failing message could block other groups
                    fifo_groups = split_by_message_group_id(message_batch)
                    for fifo_group_messages in fifo_groups.values():
                        self.handle_messages(fifo_group_messages)
                else:
                    self.handle_messages(message_batch)

            # TODO: unify exception handling across pollers: should we catch and raise?
            except Exception as e:
                # TODO: improve error messages (produce same failure and design better error messages)
                LOG.warning(
                    "Polling or batch processing failed: %s",
                    e,
                    exc_info=LOG.isEnabledFor(logging.DEBUG),
                )

    def handle_messages(self, messages):
        """Filter, transform, and process `messages`, deleting handled ones from the queue."""
        polled_events = transform_into_events(messages)
        # Filtering: matching vs. discarded (i.e., not matching filter criteria)
        # TODO: implement format detection behavior (e.g., for JSON body):
        #  https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-event-filtering.html#pipes-filter-sqs
        #  Check whether we need poller-specific filter-preprocessing here without modifying the actual event!
        # convert to json for filtering (HACK for fixing parity with v1 and getting regression tests passing)
        for event in polled_events:
            try:
                event["body"] = json.loads(event["body"])
            except json.JSONDecodeError:
                LOG.debug(
                    "Unable to convert event body '%s' to json... Event might be dropped.",
                    event["body"],
                )
        matching_events = self.filter_events(polled_events)
        # convert them back (HACK for fixing parity with v1 and getting regression tests passing)
        for event in matching_events:
            event["body"] = (
                json.dumps(event["body"]) if not isinstance(event["body"], str) else event["body"]
            )

        all_message_ids = {message["MessageId"] for message in messages}
        matching_message_ids = {event["messageId"] for event in matching_events}
        discarded_message_ids = all_message_ids.difference(matching_message_ids)
        # Delete discarded events immediately:
        # https://lucvandonkersgoed.com/2022/01/20/the-9-ways-an-sqs-message-can-be-deleted/#7-event-source-mappings-with-filters
        self.delete_messages(messages, discarded_message_ids)

        # Don't trigger upon empty events
        if len(matching_events) == 0:
            return
        # Enrich events with metadata after filtering
        enriched_events = self.add_source_metadata(matching_events)

        # Invoke the processor (e.g., Pipe, ESM) and handle partial batch failures
        try:
            self.processor.process_events_batch(enriched_events)
            # NOTE(review): this uses all_message_ids (not matching_message_ids), so the
            # discarded messages deleted above are handed to delete_messages a second time
            # with their now-stale receipt handles — confirm this double-delete is intended.
            successful_message_ids = all_message_ids
        except PartialBatchFailureError as e:
            failed_message_ids = parse_batch_item_failures(
                e.partial_failure_payload, matching_message_ids
            )
            successful_message_ids = matching_message_ids.difference(failed_message_ids)

        # Only delete messages that are processed successfully as described here:
        # https://docs.aws.amazon.com/en_gb/lambda/latest/dg/with-sqs.html
        # When Lambda reads a batch, the messages stay in the queue but are hidden for the length of the queue's
        # visibility timeout. If your function successfully processes the batch, Lambda deletes the messages
        # from the queue. By default, if your function encounters an error while processing a batch,
        # all messages in that batch become visible in the queue again. For this reason, your function code must
        # be able to process the same message multiple times without unintended side effects.
        # Troubleshooting: https://repost.aws/knowledge-center/lambda-sqs-report-batch-item-failures
        # For FIFO queues, AWS also deletes successfully sent messages. Therefore, the AWS docs recommends:
        # "If you're using this feature with a FIFO queue, your function should stop processing messages after the first
        # failure and return all failed and unprocessed messages in batchItemFailures. This helps preserve the ordering
        # of messages in your queue."
        # Following this recommendation could result in the unsolved side effect that valid messages are continuously
        # placed in the same batch as failing messages:
        # * https://stackoverflow.com/questions/78694079/how-to-stop-fifo-sqs-messages-from-being-placed-in-a-batch-with-failing-messages
        # * https://stackoverflow.com/questions/76912394/can-i-report-only-messages-from-failing-group-id-in-reportbatchitemfailures-resp

        # TODO: Test blocking failure behavior for FIFO queues to guarantee strict ordering
        #  -> might require some checkpointing or retry control on the poller side?!
        # The poller should only proceed processing FIFO queues after having retried failing messages:
        # "If your pipe returns an error, the pipe attempts all retries on the affected messages before EventBridge
        # receives additional messages from the same group."
        # https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-sqs.html
        self.delete_messages(messages, successful_message_ids)

    def delete_messages(self, messages: list[dict], message_ids_to_delete: set):
        """Delete SQS `messages` from the source queue that match a MessageId within `message_ids_to_delete`"""
        # TODO: unclear how (partial) failures for deleting are handled, retry or fail batch? Hard to test against AWS
        if len(message_ids_to_delete) > 0:
            entries = [
                {"Id": str(count), "ReceiptHandle": message["ReceiptHandle"]}
                for count, message in enumerate(messages)
                if message["MessageId"] in message_ids_to_delete
            ]

            self.source_client.delete_message_batch(
                QueueUrl=self.queue_url,
                Entries=entries,
                # Override how many messages can be deleted at once
                sqs_override_max_message_count=self.batch_size,
            )

284

285
def split_by_message_group_id(messages) -> defaultdict[str, list[dict]]:
    """Group SQS messages by their MessageGroupId attribute.

    Preserves the original message order within each group, which is required
    for strict FIFO-queue ordering.
    """
    groups: defaultdict[str, list[dict]] = defaultdict(list)
    for msg in messages:
        groups[msg["Attributes"]["MessageGroupId"]].append(msg)
    return groups
292

293

294
def transform_into_events(messages: list[dict]) -> list[dict]:
    """Convert raw SQS messages into camelCase event records (aws:sqs event shape).

    Each record mirrors the original SQS message fields; md5OfMessageAttributes is
    only present when the source message carried MD5OfMessageAttributes.
    """
    # TODO: consolidate with SQS event source listener:
    #  localstack.services.lambda_.event_source_listeners.sqs_event_source_listener.SQSEventSourceListener._send_event_to_lambda
    events = []
    for msg in messages:
        record = {
            "messageId": msg["MessageId"],
            "receiptHandle": msg["ReceiptHandle"],
            # TODO: test with empty body
            # TODO: implement heuristic based on content type: https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-event-filtering.html#pipes-filter-sqs
            # NOTE(review): defaults a missing Body to the literal string "MessageBody" — confirm intent
            "body": msg.get("Body", "MessageBody"),
            "attributes": msg.get("Attributes", {}),
            "messageAttributes": message_attributes_to_lower(msg.get("MessageAttributes")),
            # TODO: test with empty body
            "md5OfBody": msg.get("MD5OfBody") or msg.get("MD5OfMessageBody"),
        }
        # TODO: test Pipe with message attributes (only covered by Lambda ESM SQS test so far)
        md5_attrs = msg.get("MD5OfMessageAttributes")
        if md5_attrs:
            record["md5OfMessageAttributes"] = md5_attrs
        events.append(record)
    return events
317

318

319
@functools.cache
def get_queue_url(queue_arn: str) -> str:
    """Resolve the queue URL for `queue_arn` via sqs:GetQueueUrl.

    Results are memoized per ARN for the lifetime of the process.
    """
    arn_parts = parse_arn(queue_arn)
    sqs_client = connect_to(region_name=arn_parts["region"]).sqs
    response = sqs_client.get_queue_url(
        QueueName=arn_parts["resource"],
        QueueOwnerAWSAccountId=arn_parts["account"],
    )
    return response["QueueUrl"]
332

333

334
def message_attributes_to_lower(message_attrs):
    """Convert message attribute detail keys (first character) to lower case.

    SQS returns attribute details with PascalCase keys (e.g., StringValue, DataType),
    while the Lambda event payload uses camelCase (stringValue, dataType).
    Mutates `message_attrs` in place and returns it; a None/empty input yields {}.
    Non-dict attribute values are left untouched.
    """
    message_attrs = message_attrs or {}
    # Iterate values directly (keys and the inner values are unused) instead of
    # unpacking .items() twice and copying each attr dict just to iterate it.
    for attr in message_attrs.values():
        if not isinstance(attr, dict):
            continue
        # Snapshot the keys since we mutate the dict while renaming.
        for key in list(attr):
            attr[first_char_to_lower(key)] = attr.pop(key)
    return message_attrs
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc