• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 18928049101

29 Oct 2025 12:05PM UTC coverage: 86.912% (+0.01%) from 86.9%
18928049101

push

github

web-flow
Sns:v2 platform application crud (#13312)

77 of 78 new or added lines in 5 files covered. (98.72%)

8 existing lines in 5 files now uncovered.

68432 of 78737 relevant lines covered (86.91%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.69
/localstack-core/localstack/services/lambda_/invocation/event_manager.py
1
import base64
1✔
2
import dataclasses
1✔
3
import json
1✔
4
import logging
1✔
5
import threading
1✔
6
import time
1✔
7
from concurrent.futures import ThreadPoolExecutor
1✔
8
from datetime import datetime
1✔
9
from math import ceil
1✔
10

11
from botocore.config import Config
1✔
12

13
from localstack import config
1✔
14
from localstack.aws.api.lambda_ import InvocationType, TooManyRequestsException
1✔
15
from localstack.services.lambda_.analytics import (
1✔
16
    FunctionOperation,
17
    FunctionStatus,
18
    function_counter,
19
)
20
from localstack.services.lambda_.invocation.internal_sqs_queue import get_fake_sqs_client
1✔
21
from localstack.services.lambda_.invocation.lambda_models import (
1✔
22
    EventInvokeConfig,
23
    FunctionVersion,
24
    Invocation,
25
    InvocationResult,
26
)
27
from localstack.services.lambda_.invocation.version_manager import LambdaVersionManager
1✔
28
from localstack.utils.aws import dead_letter_queue
1✔
29
from localstack.utils.aws.message_forwarding import send_event_to_target
1✔
30
from localstack.utils.strings import md5, to_str
1✔
31
from localstack.utils.threads import FuncThread
1✔
32
from localstack.utils.time import timestamp_millis
1✔
33
from localstack.utils.xray.trace_header import TraceHeader
1✔
34

35
# Module-level logger, named after this module per the standard logging convention.
LOG = logging.getLogger(__name__)
1✔
36

37

38
def get_sqs_client(function_version: FunctionVersion, client_config=None):
    """Return the SQS client used for the internal Lambda event queue.

    Both parameters are currently ignored: this always returns the in-memory
    fake SQS client (see internal_sqs_queue). The signature is kept so callers
    can pass a function version and a per-call botocore client config in case
    a real SQS client is ever used here again.
    """
    return get_fake_sqs_client()
40

41

42
# TODO: remove once DLQ handling is refactored following the removal of the legacy lambda provider
class LegacyInvocationException(Exception):
    """Exception shim carrying the invocation log output and result payload.

    Only exists so the legacy dead-letter-queue helper can extract the failed
    invocation's result; the message itself behaves like any Exception message.
    """

    def __init__(self, message, log_output=None, result=None):
        # Attribute assignments are independent of Exception.__init__,
        # so store the extra invocation context first.
        self.result = result
        self.log_output = log_output
        super().__init__(message)
48

49

50
@dataclasses.dataclass
class SQSInvocation:
    """Envelope for an async Invocation while it travels through the internal
    SQS queue, carrying the retry counters needed for redelivery decisions."""

    # The Lambda invocation being delivered asynchronously
    invocation: Invocation
    # Count of invocation-error retries (compared against maximum_retry_attempts)
    retries: int = 0
    # Count of throttle (429) / system error (5xx) retries; drives the
    # exponential backoff in Poller.process_throttles_and_system_errors
    exception_retries: int = 0

    def encode(self) -> str:
        """Serialize this SQSInvocation to a JSON string for an SQS message body.

        NOTE: mutates self.invocation.trace_context in place — the
        aws_trace_header object is replaced by its header-string form so the
        trace context becomes JSON-serializable.
        """
        # Encode TraceHeader as string
        aws_trace_header = self.invocation.trace_context.get("aws_trace_header")
        aws_trace_header_str = aws_trace_header.to_header_str()
        self.invocation.trace_context["aws_trace_header"] = aws_trace_header_str
        return json.dumps(
            {
                # Payload is raw bytes, hence the base64 round-trip
                "payload": to_str(base64.b64encode(self.invocation.payload)),
                "invoked_arn": self.invocation.invoked_arn,
                "client_context": self.invocation.client_context,
                "invocation_type": self.invocation.invocation_type,
                "invoke_time": self.invocation.invoke_time.isoformat(),
                # = invocation_id
                "request_id": self.invocation.request_id,
                "retries": self.retries,
                "exception_retries": self.exception_retries,
                "trace_context": self.invocation.trace_context,
            }
        )

    @classmethod
    def decode(cls, message: str) -> "SQSInvocation":
        """Reconstruct an SQSInvocation from a JSON message body produced by encode()."""
        invocation_dict = json.loads(message)
        invocation = Invocation(
            payload=base64.b64decode(invocation_dict["payload"]),
            invoked_arn=invocation_dict["invoked_arn"],
            client_context=invocation_dict["client_context"],
            invocation_type=invocation_dict["invocation_type"],
            invoke_time=datetime.fromisoformat(invocation_dict["invoke_time"]),
            request_id=invocation_dict["request_id"],
            trace_context=invocation_dict.get("trace_context"),
        )
        # Decode TraceHeader
        # NOTE: invocation.trace_context aliases invocation_dict["trace_context"],
        # so this in-place update also replaces the header string inside the
        # already-constructed invocation above.
        # NOTE(review): the defensive .get(...) here is inconsistent with the
        # direct indexing on the next line, which raises KeyError if
        # "trace_context" is absent — confirm whether messages can lack it.
        aws_trace_header_str = invocation_dict.get("trace_context", {}).get("aws_trace_header")
        invocation_dict["trace_context"]["aws_trace_header"] = TraceHeader.from_header_str(
            aws_trace_header_str
        )
        return cls(
            invocation=invocation,
            retries=invocation_dict["retries"],
            exception_retries=invocation_dict["exception_retries"],
        )
98

99

100
def has_enough_time_for_retry(
    sqs_invocation: SQSInvocation, event_invoke_config: EventInvokeConfig
) -> bool:
    """Check whether a retry scheduled now would still run within the maximum event age.

    :param sqs_invocation: the pending invocation, including its retry count
        and original invoke time
    :param event_invoke_config: optional per-function config that may override
        the default maximum event age
    :return: True if the next retry (including its queue delay) would complete
        within the maximum event age, False otherwise
    """
    time_passed = datetime.now() - sqs_invocation.invocation.invoke_time
    # Queue delay grows linearly with the retry count (matches the DelaySeconds
    # scheduling in Poller.handle_message)
    delay_queue_invoke_seconds = (
        sqs_invocation.retries + 1
    ) * config.LAMBDA_RETRY_BASE_DELAY_SECONDS
    # 6 hours is the default based on these AWS sources:
    # https://repost.aws/questions/QUd214DdOQRkKWr7D8IuSMIw/why-is-aws-lambda-eventinvokeconfig-s-limit-for-maximumretryattempts-2
    # https://aws.amazon.com/blogs/compute/introducing-new-asynchronous-invocation-metrics-for-aws-lambda/
    # https://aws.amazon.com/about-aws/whats-new/2019/11/aws-lambda-supports-max-retry-attempts-event-age-asynchronous-invocations/
    maximum_event_age_in_seconds = 6 * 60 * 60
    if event_invoke_config and event_invoke_config.maximum_event_age_in_seconds is not None:
        maximum_event_age_in_seconds = event_invoke_config.maximum_event_age_in_seconds
    # bool() ensures the annotated return type holds: without it, a falsy
    # configured maximum event age (e.g. 0) would make this return the int 0
    # instead of False.
    return bool(
        maximum_event_age_in_seconds
        and ceil(time_passed.total_seconds()) + delay_queue_invoke_seconds
        <= maximum_event_age_in_seconds
    )
119

120

121
# TODO: optimize this client configuration. Do we need to consider client caching here?
# Fail-fast client config: short connect/read timeouts and no botocore retries.
# NOTE(review): get_sqs_client currently returns the in-memory fake SQS client,
# so these settings only matter if a real SQS client is ever used here again.
CLIENT_CONFIG = Config(
    connect_timeout=5,
    read_timeout=10,
    retries={"max_attempts": 0},
)
127

128

129
class Poller:
    """Polls the internal per-function SQS event queue and dispatches each
    received message as an asynchronous Lambda invocation via a thread pool."""

    # Manager used to actually invoke the function version
    version_manager: LambdaVersionManager
    # URL of the internal SQS queue holding pending async events
    event_queue_url: str
    # Set to stop the polling loop and suppress error logging during shutdown
    _shutdown_event: threading.Event
    # Thread pool executing handle_message for each received SQS message
    invoker_pool: ThreadPoolExecutor

    def __init__(self, version_manager: LambdaVersionManager, event_queue_url: str):
        self.version_manager = version_manager
        self.event_queue_url = event_queue_url
        self._shutdown_event = threading.Event()
        function_id = self.version_manager.function_version.id
        # TODO: think about scaling, test it, make it configurable?!
        self.invoker_pool = ThreadPoolExecutor(
            thread_name_prefix=f"lambda-invoker-{function_id.function_name}:{function_id.qualifier}"
        )

    def run(self, *args, **kwargs):
        """Poll the event queue in a loop until shutdown, submitting every
        received message to the invoker pool. Intended to run in a FuncThread."""
        sqs_client = get_sqs_client(
            self.version_manager.function_version, client_config=CLIENT_CONFIG
        )
        function_timeout = self.version_manager.function_version.config.timeout
        while not self._shutdown_event.is_set():
            try:
                response = sqs_client.receive_message(
                    QueueUrl=self.event_queue_url,
                    # TODO: consider replacing with short polling instead of long polling to prevent keeping connections open
                    # however, we had some serious performance issues when tried out, so those have to be investigated first
                    WaitTimeSeconds=2,
                    # Related: SQS event source mapping batches up to 10 messages:
                    # https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html
                    MaxNumberOfMessages=10,
                    # Keep messages invisible for the full function timeout plus a buffer
                    VisibilityTimeout=function_timeout + 60,
                )
                if not response.get("Messages"):
                    continue
                LOG.debug("[%s] Got %d messages", self.event_queue_url, len(response["Messages"]))
                # Guard against shutdown event arriving while polling SQS for messages
                if not self._shutdown_event.is_set():
                    for message in response["Messages"]:
                        # NOTE: queueing within the thread pool executor could lead to double executions
                        #  due to the visibility timeout
                        self.invoker_pool.submit(self.handle_message, message)

            except Exception as e:
                # TODO: if the gateway shuts down before the shutdown event even is set,
                #  we might still get an error message
                # after shutdown of LS, we might expectedly get errors, if other components shut down.
                # In any case, after the event manager is shut down, we do not need to spam error logs in case
                # some resource is already missing
                if self._shutdown_event.is_set():
                    return
                LOG.error(
                    "Error while polling lambda events for function %s: %s",
                    self.version_manager.function_version.qualified_arn,
                    e,
                    exc_info=LOG.isEnabledFor(logging.DEBUG),
                )
                # some time between retries to avoid running into the problem right again
                time.sleep(1)

    def stop(self):
        """Signal the polling loop to exit and shut down the invoker pool
        without waiting, cancelling any not-yet-started invocations."""
        LOG.debug(
            "Stopping event poller %s %s",
            self.version_manager.function_version.qualified_arn,
            id(self),
        )
        self._shutdown_event.set()
        self.invoker_pool.shutdown(cancel_futures=True, wait=False)

    def handle_message(self, message: dict) -> None:
        """Process a single SQS message: invoke the function and handle the
        success/failure destinations, DLQ delivery, and retry scheduling.

        Never raises; any unexpected error is logged at the outermost level.
        """
        failure_cause = None
        qualifier = self.version_manager.function_version.id.qualifier
        event_invoke_config = self.version_manager.function.event_invoke_configs.get(qualifier)
        # runtime is never reassigned below, so the metrics emitted in the
        # finally block always report "n/a" for the runtime label
        runtime = None
        status = None
        try:
            sqs_invocation = SQSInvocation.decode(message["Body"])
            invocation = sqs_invocation.invocation
            try:
                invocation_result = self.version_manager.invoke(invocation=invocation)
                function_config = self.version_manager.function_version.config
                function_counter.labels(
                    operation=FunctionOperation.invoke,
                    runtime=function_config.runtime or "n/a",
                    status=FunctionStatus.success,
                    invocation_type=InvocationType.Event,
                    package_type=function_config.package_type,
                ).increment()
            except Exception as e:
                # Terminal failure causes checked before scheduling a retry:
                # Reserved concurrency == 0
                if self.version_manager.function.reserved_concurrent_executions == 0:
                    failure_cause = "ZeroReservedConcurrency"
                    status = FunctionStatus.zero_reserved_concurrency_error
                # Maximum event age expired (lookahead for next retry)
                elif not has_enough_time_for_retry(sqs_invocation, event_invoke_config):
                    failure_cause = "EventAgeExceeded"
                    status = FunctionStatus.event_age_exceeded_error
                if failure_cause:
                    # Terminal failure: notify the failure destination and DLQ
                    invocation_result = InvocationResult(
                        is_error=True, request_id=invocation.request_id, payload=None, logs=None
                    )
                    self.process_failure_destination(
                        sqs_invocation, invocation_result, event_invoke_config, failure_cause
                    )
                    self.process_dead_letter_queue(sqs_invocation, invocation_result)
                    return
                # 3) Otherwise, retry without increasing counter
                status = self.process_throttles_and_system_errors(sqs_invocation, e)
                return
            finally:
                # Always delete the message: retries are re-enqueued as new
                # messages rather than relying on visibility timeout redelivery.
                sqs_client = get_sqs_client(self.version_manager.function_version)
                sqs_client.delete_message(
                    QueueUrl=self.event_queue_url, ReceiptHandle=message["ReceiptHandle"]
                )
                # status MUST be set before returning
                # NOTE(review): this finally block also runs on the successful
                # invoke path above, where status is still None — the success
                # metric is then emitted a second time with status=None;
                # confirm whether that double emission is intended.
                package_type = self.version_manager.function_version.config.package_type
                function_counter.labels(
                    operation=FunctionOperation.invoke,
                    runtime=runtime or "n/a",
                    status=status,
                    invocation_type=InvocationType.Event,
                    package_type=package_type,
                ).increment()

            # Good summary blogpost: https://haithai91.medium.com/aws-lambdas-retry-behaviors-edff90e1cf1b
            # Asynchronous invocation handling: https://docs.aws.amazon.com/lambda/latest/dg/invocation-async.html
            # https://aws.amazon.com/blogs/compute/introducing-new-asynchronous-invocation-metrics-for-aws-lambda/
            max_retry_attempts = 2
            if event_invoke_config and event_invoke_config.maximum_retry_attempts is not None:
                max_retry_attempts = event_invoke_config.maximum_retry_attempts

            # An invocation error either leads to a terminal failure or to a scheduled retry
            if invocation_result.is_error:  # invocation error
                failure_cause = None
                # Reserved concurrency == 0
                if self.version_manager.function.reserved_concurrent_executions == 0:
                    failure_cause = "ZeroReservedConcurrency"
                # Maximum retries exhausted
                elif sqs_invocation.retries >= max_retry_attempts:
                    failure_cause = "RetriesExhausted"
                # TODO: test what happens if max event age expired before it gets scheduled the first time?!
                # Maximum event age expired (lookahead for next retry)
                elif not has_enough_time_for_retry(sqs_invocation, event_invoke_config):
                    failure_cause = "EventAgeExceeded"

                if failure_cause:  # handle failure destination and DLQ
                    self.process_failure_destination(
                        sqs_invocation, invocation_result, event_invoke_config, failure_cause
                    )
                    self.process_dead_letter_queue(sqs_invocation, invocation_result)
                    return
                else:  # schedule retry
                    sqs_invocation.retries += 1
                    # Assumption: We assume that the internal exception retries counter is reset after
                    #  an invocation that does not throw an exception
                    sqs_invocation.exception_retries = 0
                    # LAMBDA_RETRY_BASE_DELAY_SECONDS has a limit of 300s because the maximum SQS DelaySeconds
                    # is 15 minutes (900s) and the maximum retry count is 3. SQS quota for "Message timer":
                    # https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/quotas-messages.html
                    delay_seconds = sqs_invocation.retries * config.LAMBDA_RETRY_BASE_DELAY_SECONDS
                    # TODO: max SQS message size limit could break parity with AWS because
                    #  our SQSInvocation contains additional fields! 256kb is max for both Lambda payload + SQS
                    # TODO: write test with max SQS message size
                    sqs_client.send_message(
                        QueueUrl=self.event_queue_url,
                        MessageBody=sqs_invocation.encode(),
                        DelaySeconds=delay_seconds,
                    )
                    return
            else:  # invocation success
                self.process_success_destination(
                    sqs_invocation, invocation_result, event_invoke_config
                )
        except Exception as e:
            LOG.error(
                "Error handling lambda invoke %s", e, exc_info=LOG.isEnabledFor(logging.DEBUG)
            )

    def process_throttles_and_system_errors(
        self, sqs_invocation: SQSInvocation, error: Exception
    ) -> str:
        """Re-enqueue the invocation with exponential backoff after a throttle
        (429) or system error (5xx) and return the corresponding status label."""
        # If the function doesn't have enough concurrency available to process all events, additional
        # requests are throttled. For throttling errors (429) and system errors (500-series), Lambda returns
        # the event to the queue and attempts to run the function again for up to 6 hours. The retry interval
        # increases exponentially from 1 second after the first attempt to a maximum of 5 minutes. If the
        # queue contains many entries, Lambda increases the retry interval and reduces the rate at which it
        # reads events from the queue. Source:
        # https://docs.aws.amazon.com/lambda/latest/dg/invocation-async.html
        # Difference depending on error cause:
        # https://aws.amazon.com/blogs/compute/introducing-new-asynchronous-invocation-metrics-for-aws-lambda/
        # Troubleshooting 500 errors:
        # https://repost.aws/knowledge-center/lambda-troubleshoot-invoke-error-502-500
        if isinstance(error, TooManyRequestsException):  # Throttles 429
            LOG.debug("Throttled lambda %s: %s", self.version_manager.function_arn, error)
            status = FunctionStatus.throttle_error
        else:  # System errors 5xx
            LOG.debug(
                "Service exception in lambda %s: %s", self.version_manager.function_arn, error
            )
            status = FunctionStatus.system_error
        # Exponential backoff (2^n seconds), capped at 5 minutes
        maximum_exception_retry_delay_seconds = 5 * 60
        delay_seconds = min(
            2**sqs_invocation.exception_retries, maximum_exception_retry_delay_seconds
        )
        # TODO: calculate delay seconds into max event age handling
        sqs_client = get_sqs_client(self.version_manager.function_version)
        sqs_client.send_message(
            QueueUrl=self.event_queue_url,
            MessageBody=sqs_invocation.encode(),
            DelaySeconds=delay_seconds,
        )
        return status

    def process_success_destination(
        self,
        sqs_invocation: SQSInvocation,
        invocation_result: InvocationResult,
        event_invoke_config: EventInvokeConfig | None,
    ) -> None:
        """Send the invocation result to the configured OnSuccess destination,
        if any. Delivery errors are logged, never raised."""
        if event_invoke_config is None:
            return
        success_destination = event_invoke_config.destination_config.get("OnSuccess", {}).get(
            "Destination"
        )
        if success_destination is None:
            return
        LOG.debug("Handling success destination for %s", self.version_manager.function_arn)

        original_payload = sqs_invocation.invocation.payload
        destination_payload = {
            "version": "1.0",
            "timestamp": timestamp_millis(),
            "requestContext": {
                "requestId": invocation_result.request_id,
                "functionArn": self.version_manager.function_version.qualified_arn,
                "condition": "Success",
                "approximateInvokeCount": sqs_invocation.retries + 1,
            },
            "requestPayload": json.loads(to_str(original_payload)),
            "responseContext": {
                "statusCode": 200,
                "executedVersion": self.version_manager.function_version.id.qualifier,
            },
            "responsePayload": json.loads(to_str(invocation_result.payload or {})),
        }

        target_arn = event_invoke_config.destination_config["OnSuccess"]["Destination"]
        try:
            send_event_to_target(
                target_arn=target_arn,
                event=destination_payload,
                role=self.version_manager.function_version.config.role,
                source_arn=self.version_manager.function_version.id.unqualified_arn(),
                source_service="lambda",
                events_source="lambda",
                events_detail_type="Lambda Function Invocation Result - Success",
            )
        except Exception as e:
            LOG.warning("Error sending invocation result to %s: %s", target_arn, e)

    def process_failure_destination(
        self,
        sqs_invocation: SQSInvocation,
        invocation_result: InvocationResult,
        event_invoke_config: EventInvokeConfig | None,
        failure_cause: str,
    ):
        """Send a failure record to the configured OnFailure destination, if any.

        :param failure_cause: terminal condition string (e.g. "RetriesExhausted",
            "EventAgeExceeded", "ZeroReservedConcurrency"); controls the payload
            shape and approximate invoke count. Delivery errors are logged.
        """
        if event_invoke_config is None:
            return
        failure_destination = event_invoke_config.destination_config.get("OnFailure", {}).get(
            "Destination"
        )
        if failure_destination is None:
            return
        LOG.debug("Handling failure destination for %s", self.version_manager.function_arn)

        original_payload = sqs_invocation.invocation.payload
        # With zero reserved concurrency the function was never invoked, so the
        # current attempt is not counted
        if failure_cause == "ZeroReservedConcurrency":
            approximate_invoke_count = sqs_invocation.retries
        else:
            approximate_invoke_count = sqs_invocation.retries + 1
        destination_payload = {
            "version": "1.0",
            "timestamp": timestamp_millis(),
            "requestContext": {
                "requestId": invocation_result.request_id,
                "functionArn": self.version_manager.function_version.qualified_arn,
                "condition": failure_cause,
                "approximateInvokeCount": approximate_invoke_count,
            },
            "requestPayload": json.loads(to_str(original_payload)),
        }
        # No response context/payload exists when the function never ran
        if failure_cause != "ZeroReservedConcurrency":
            destination_payload["responseContext"] = {
                "statusCode": 200,
                "executedVersion": self.version_manager.function_version.id.qualifier,
                "functionError": "Unhandled",
            }
            destination_payload["responsePayload"] = json.loads(to_str(invocation_result.payload))

        target_arn = event_invoke_config.destination_config["OnFailure"]["Destination"]
        try:
            send_event_to_target(
                target_arn=target_arn,
                event=destination_payload,
                role=self.version_manager.function_version.config.role,
                source_arn=self.version_manager.function_version.id.unqualified_arn(),
                source_service="lambda",
                events_source="lambda",
                events_detail_type="Lambda Function Invocation Result - Failure",
            )
        except Exception as e:
            LOG.warning("Error sending invocation result to %s: %s", target_arn, e)

    def process_dead_letter_queue(
        self,
        sqs_invocation: SQSInvocation,
        invocation_result: InvocationResult,
    ):
        """Forward the original event payload to the function's dead-letter
        queue (if configured). Delivery errors are logged, never raised."""
        LOG.debug("Handling dead letter queue for %s", self.version_manager.function_arn)
        try:
            dead_letter_queue._send_to_dead_letter_queue(
                source_arn=self.version_manager.function_arn,
                dlq_arn=self.version_manager.function_version.config.dead_letter_arn,
                event=json.loads(to_str(sqs_invocation.invocation.payload)),
                # TODO: Refactor DLQ handling by removing the invocation exception from the legacy lambda provider
                # TODO: Check message. Possibly remove because it is not used in the DLQ message?!
                error=LegacyInvocationException(
                    message="hi", result=to_str(invocation_result.payload)
                ),
                role=self.version_manager.function_version.config.role,
            )
        except Exception as e:
            LOG.warning(
                "Error sending invocation result to DLQ %s: %s",
                self.version_manager.function_version.config.dead_letter_arn,
                e,
            )
467

468

469
class LambdaEventManager:
    """Manages the lifecycle of the internal SQS event queue and its Poller
    for asynchronous (event) invocations of a single function version."""

    version_manager: LambdaVersionManager
    poller: Poller | None
    poller_thread: FuncThread | None
    event_queue_url: str | None
    # Guards start/stop transitions against concurrent lifecycle calls
    lifecycle_lock: threading.RLock
    # Set once the manager is stopped; start() becomes a no-op afterwards
    stopped: threading.Event

    def __init__(self, version_manager: LambdaVersionManager):
        self.version_manager = version_manager
        self.poller = None
        self.poller_thread = None
        self.event_queue_url = None
        self.lifecycle_lock = threading.RLock()
        self.stopped = threading.Event()

    def enqueue_event(self, invocation: Invocation) -> None:
        """Serialize the invocation and enqueue it into the internal event queue.

        :raises Exception: re-raises the client error after logging if the
            message cannot be sent.
        """
        message_body = SQSInvocation(invocation).encode()
        sqs_client = get_sqs_client(self.version_manager.function_version)
        try:
            sqs_client.send_message(QueueUrl=self.event_queue_url, MessageBody=message_body)
        except Exception:
            LOG.error(
                "Failed to enqueue Lambda event into queue %s. Invocation: request_id=%s, invoked_arn=%s",
                self.event_queue_url,
                invocation.request_id,
                invocation.invoked_arn,
            )
            raise

    def start(self) -> None:
        """Create the per-function event queue and start the poller thread.

        No-op if the manager was already stopped before being started.
        """
        LOG.debug(
            "Starting event manager %s id %s",
            self.version_manager.function_version.id.qualified_arn(),
            id(self),
        )
        with self.lifecycle_lock:
            if self.stopped.is_set():
                LOG.debug("Event manager already stopped before started.")
                return
            sqs_client = get_sqs_client(self.version_manager.function_version)
            function_id = self.version_manager.function_version.id
            # Truncate function name to ensure queue name limit of max 80 characters
            function_name_short = function_id.function_name[:47]
            # The instance id MUST be unique to the function and a given LocalStack instance
            queue_namespace = (
                f"{function_id.qualified_arn()}-{self.version_manager.function.instance_id}"
            )
            queue_name = f"{function_name_short}-{md5(queue_namespace)}"
            create_queue_response = sqs_client.create_queue(QueueName=queue_name)
            self.event_queue_url = create_queue_response["QueueUrl"]
            # We don't need to purge the queue for persistence or cloud pods because the instance id MUST be unique

            self.poller = Poller(self.version_manager, self.event_queue_url)
            self.poller_thread = FuncThread(
                self.poller.run,
                name=f"lambda-poller-{function_id.function_name}:{function_id.qualifier}",
            )
            self.poller_thread.start()

    def _stop_poller(self) -> None:
        """Stop the poller and join its thread (shared by stop/stop_for_update).

        Must be called while holding lifecycle_lock. Logs an error if the
        poller thread does not terminate within the join timeout.
        """
        if self.poller:
            self.poller.stop()
            self.poller_thread.join(timeout=3)
            LOG.debug("Waited for poller thread %s", self.poller_thread)
            if self.poller_thread.is_alive():
                LOG.error("Poller did not shutdown %s", self.poller_thread)
            self.poller = None

    def stop_for_update(self) -> None:
        """Stop the poller but keep the event queue (and its pending messages)
        so a subsequent manager instance can resume after a function update."""
        LOG.debug(
            "Stopping event manager but keep queue %s id %s",
            self.version_manager.function_version.qualified_arn,
            id(self),
        )
        with self.lifecycle_lock:
            if self.stopped.is_set():
                LOG.debug("Event manager already stopped!")
                return
            self.stopped.set()
            self._stop_poller()

    def stop(self) -> None:
        """Stop the poller and delete the event queue (full teardown)."""
        LOG.debug(
            "Stopping event manager %s: %s id %s",
            self.version_manager.function_version.qualified_arn,
            self.poller,
            id(self),
        )
        with self.lifecycle_lock:
            if self.stopped.is_set():
                LOG.debug("Event manager already stopped!")
                return
            self.stopped.set()
            self._stop_poller()
            if self.event_queue_url:
                sqs_client = get_sqs_client(
                    self.version_manager.function_version, client_config=CLIENT_CONFIG
                )
                sqs_client.delete_queue(QueueUrl=self.event_queue_url)
                self.event_queue_url = None
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc