localstack / localstack, build 20565403496

29 Dec 2025 05:11AM UTC coverage: 84.103% (-2.8%) from 86.921%

Pull Request #13567: Update ASF APIs (github / web-flow)
Merge 4816837a5 into 2417384aa

67166 of 79862 relevant lines covered (84.1%)
0.84 hits per line

Source File: /localstack-core/localstack/services/lambda_/invocation/event_manager.py
File coverage: 89.77%
import base64
import dataclasses
import json
import logging
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from math import ceil

from botocore.config import Config

from localstack import config
from localstack.aws.api.lambda_ import InvocationType, TooManyRequestsException
from localstack.services.lambda_.analytics import (
    FunctionInitializationType,
    FunctionOperation,
    FunctionStatus,
    function_counter,
)
from localstack.services.lambda_.invocation.internal_sqs_queue import get_fake_sqs_client
from localstack.services.lambda_.invocation.lambda_models import (
    EventInvokeConfig,
    FunctionVersion,
    Invocation,
    InvocationResult,
)
from localstack.services.lambda_.invocation.version_manager import LambdaVersionManager
from localstack.utils.aws import dead_letter_queue
from localstack.utils.aws.message_forwarding import send_event_to_target
from localstack.utils.strings import md5, to_str
from localstack.utils.threads import FuncThread
from localstack.utils.time import timestamp_millis
from localstack.utils.xray.trace_header import TraceHeader

LOG = logging.getLogger(__name__)


def get_sqs_client(function_version: FunctionVersion, client_config=None):
    return get_fake_sqs_client()


# TODO: remove once DLQ handling is refactored following the removal of the legacy lambda provider
class LegacyInvocationException(Exception):
    def __init__(self, message, log_output=None, result=None):
        super().__init__(message)
        self.log_output = log_output
        self.result = result


@dataclasses.dataclass
class SQSInvocation:
    invocation: Invocation
    retries: int = 0
    exception_retries: int = 0

    def encode(self) -> str:
        # Encode TraceHeader as string
        aws_trace_header = self.invocation.trace_context.get("aws_trace_header")
        aws_trace_header_str = aws_trace_header.to_header_str()
        self.invocation.trace_context["aws_trace_header"] = aws_trace_header_str
        return json.dumps(
            {
                "payload": to_str(base64.b64encode(self.invocation.payload)),
                "invoked_arn": self.invocation.invoked_arn,
                "client_context": self.invocation.client_context,
                "invocation_type": self.invocation.invocation_type,
                "invoke_time": self.invocation.invoke_time.isoformat(),
                # = invocation_id
                "request_id": self.invocation.request_id,
                "retries": self.retries,
                "exception_retries": self.exception_retries,
                "trace_context": self.invocation.trace_context,
            }
        )

    @classmethod
    def decode(cls, message: str) -> "SQSInvocation":
        invocation_dict = json.loads(message)
        invocation = Invocation(
            payload=base64.b64decode(invocation_dict["payload"]),
            invoked_arn=invocation_dict["invoked_arn"],
            client_context=invocation_dict["client_context"],
            invocation_type=invocation_dict["invocation_type"],
            invoke_time=datetime.fromisoformat(invocation_dict["invoke_time"]),
            request_id=invocation_dict["request_id"],
            trace_context=invocation_dict.get("trace_context"),
        )
        # Decode TraceHeader
        aws_trace_header_str = invocation_dict.get("trace_context", {}).get("aws_trace_header")
        invocation_dict["trace_context"]["aws_trace_header"] = TraceHeader.from_header_str(
            aws_trace_header_str
        )
        return cls(
            invocation=invocation,
            retries=invocation_dict["retries"],
            exception_retries=invocation_dict["exception_retries"],
        )


def has_enough_time_for_retry(
    sqs_invocation: SQSInvocation, event_invoke_config: EventInvokeConfig
) -> bool:
    time_passed = datetime.now() - sqs_invocation.invocation.invoke_time
    delay_queue_invoke_seconds = (
        sqs_invocation.retries + 1
    ) * config.LAMBDA_RETRY_BASE_DELAY_SECONDS
    # 6 hours is the default based on these AWS sources:
    # https://repost.aws/questions/QUd214DdOQRkKWr7D8IuSMIw/why-is-aws-lambda-eventinvokeconfig-s-limit-for-maximumretryattempts-2
    # https://aws.amazon.com/blogs/compute/introducing-new-asynchronous-invocation-metrics-for-aws-lambda/
    # https://aws.amazon.com/about-aws/whats-new/2019/11/aws-lambda-supports-max-retry-attempts-event-age-asynchronous-invocations/
    maximum_event_age_in_seconds = 6 * 60 * 60
    if event_invoke_config and event_invoke_config.maximum_event_age_in_seconds is not None:
        maximum_event_age_in_seconds = event_invoke_config.maximum_event_age_in_seconds
    return (
        maximum_event_age_in_seconds
        and ceil(time_passed.total_seconds()) + delay_queue_invoke_seconds
        <= maximum_event_age_in_seconds
    )


# TODO: optimize this client configuration. Do we need to consider client caching here?
CLIENT_CONFIG = Config(
    connect_timeout=5,
    read_timeout=10,
    retries={"max_attempts": 0},
)


class Poller:
    version_manager: LambdaVersionManager
    event_queue_url: str
    _shutdown_event: threading.Event
    invoker_pool: ThreadPoolExecutor

    def __init__(self, version_manager: LambdaVersionManager, event_queue_url: str):
        self.version_manager = version_manager
        self.event_queue_url = event_queue_url
        self._shutdown_event = threading.Event()
        function_id = self.version_manager.function_version.id
        # TODO: think about scaling, test it, make it configurable?!
        self.invoker_pool = ThreadPoolExecutor(
            thread_name_prefix=f"lambda-invoker-{function_id.function_name}:{function_id.qualifier}"
        )

    def run(self, *args, **kwargs):
        sqs_client = get_sqs_client(
            self.version_manager.function_version, client_config=CLIENT_CONFIG
        )
        function_timeout = self.version_manager.function_version.config.timeout
        while not self._shutdown_event.is_set():
            try:
                response = sqs_client.receive_message(
                    QueueUrl=self.event_queue_url,
                    # TODO: consider replacing with short polling instead of long polling to prevent keeping connections open
                    # however, we had some serious performance issues when we tried it out, so those have to be investigated first
                    WaitTimeSeconds=2,
                    # Related: SQS event source mapping batches up to 10 messages:
                    # https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html
                    MaxNumberOfMessages=10,
                    VisibilityTimeout=function_timeout + 60,
                )
                if not response.get("Messages"):
                    continue
                LOG.debug("[%s] Got %d messages", self.event_queue_url, len(response["Messages"]))
                # Guard against shutdown event arriving while polling SQS for messages
                if not self._shutdown_event.is_set():
                    for message in response["Messages"]:
                        # NOTE: queueing within the thread pool executor could lead to double executions
                        #  due to the visibility timeout
                        self.invoker_pool.submit(self.handle_message, message)

            except Exception as e:
                # TODO: if the gateway shuts down before the shutdown event is even set,
                #  we might still get an error message
                # after shutdown of LS, we might expectedly get errors, if other components shut down.
                # In any case, after the event manager is shut down, we do not need to spam error logs in case
                # some resource is already missing
                if self._shutdown_event.is_set():
                    return
                LOG.error(
                    "Error while polling lambda events for function %s: %s",
                    self.version_manager.function_version.qualified_arn,
                    e,
                    exc_info=LOG.isEnabledFor(logging.DEBUG),
                )
                # wait some time between retries to avoid running into the same problem right away
                time.sleep(1)

    def stop(self):
        LOG.debug(
            "Stopping event poller %s %s",
            self.version_manager.function_version.qualified_arn,
            id(self),
        )
        self._shutdown_event.set()
        self.invoker_pool.shutdown(cancel_futures=True, wait=False)

    def handle_message(self, message: dict) -> None:
1✔
200
        failure_cause = None
1✔
201
        qualifier = self.version_manager.function_version.id.qualifier
1✔
202
        function_config = self.version_manager.function_version.config
1✔
203
        event_invoke_config = self.version_manager.function.event_invoke_configs.get(qualifier)
1✔
204
        runtime = None
1✔
205
        status = None
1✔
206
        # TODO: handle initialization_type provisioned-concurrency, which requires enriching invocation_result
207
        initialization_type = (
1✔
208
            FunctionInitializationType.lambda_managed_instances
209
            if function_config.CapacityProviderConfig
210
            else FunctionInitializationType.on_demand
211
        )
212
        try:
1✔
213
            sqs_invocation = SQSInvocation.decode(message["Body"])
1✔
214
            invocation = sqs_invocation.invocation
1✔
215
            try:
1✔
216
                invocation_result = self.version_manager.invoke(invocation=invocation)
1✔
217
                status = FunctionStatus.success
1✔
218
            except Exception as e:
1✔
219
                # Reserved concurrency == 0
220
                if self.version_manager.function.reserved_concurrent_executions == 0:
1✔
221
                    failure_cause = "ZeroReservedConcurrency"
1✔
222
                    status = FunctionStatus.zero_reserved_concurrency_error
1✔
223
                # Maximum event age expired (lookahead for next retry)
224
                elif not has_enough_time_for_retry(sqs_invocation, event_invoke_config):
1✔
225
                    failure_cause = "EventAgeExceeded"
×
226
                    status = FunctionStatus.event_age_exceeded_error
×
227

228
                if failure_cause:
1✔
229
                    invocation_result = InvocationResult(
1✔
230
                        is_error=True, request_id=invocation.request_id, payload=None, logs=None
231
                    )
232
                    self.process_failure_destination(
1✔
233
                        sqs_invocation, invocation_result, event_invoke_config, failure_cause
234
                    )
235
                    self.process_dead_letter_queue(sqs_invocation, invocation_result)
1✔
236
                    return
1✔
237
                # 3) Otherwise, retry without increasing counter
238
                status = self.process_throttles_and_system_errors(sqs_invocation, e)
1✔
239
                return
1✔
240
            finally:
241
                sqs_client = get_sqs_client(self.version_manager.function_version)
1✔
242
                sqs_client.delete_message(
1✔
243
                    QueueUrl=self.event_queue_url, ReceiptHandle=message["ReceiptHandle"]
244
                )
245
                assert status, "status MUST be set before returning"
1✔
246
                function_counter.labels(
1✔
247
                    operation=FunctionOperation.invoke,
248
                    runtime=runtime or "n/a",
249
                    status=status,
250
                    invocation_type=InvocationType.Event,
251
                    package_type=function_config.package_type,
252
                    initialization_type=initialization_type,
253
                ).increment()
254

255
            # Good summary blogpost: https://haithai91.medium.com/aws-lambdas-retry-behaviors-edff90e1cf1b
256
            # Asynchronous invocation handling: https://docs.aws.amazon.com/lambda/latest/dg/invocation-async.html
257
            # https://aws.amazon.com/blogs/compute/introducing-new-asynchronous-invocation-metrics-for-aws-lambda/
258
            max_retry_attempts = 2
1✔
259
            if event_invoke_config and event_invoke_config.maximum_retry_attempts is not None:
1✔
260
                max_retry_attempts = event_invoke_config.maximum_retry_attempts
1✔
261

262
            assert invocation_result, "Invocation result MUST exist if we are not returning before"
1✔
263

264
            # An invocation error either leads to a terminal failure or to a scheduled retry
265
            if invocation_result.is_error:  # invocation error
1✔
266
                failure_cause = None
1✔
267
                # Reserved concurrency == 0
268
                if self.version_manager.function.reserved_concurrent_executions == 0:
1✔
269
                    failure_cause = "ZeroReservedConcurrency"
×
270
                # Maximum retries exhausted
271
                elif sqs_invocation.retries >= max_retry_attempts:
1✔
272
                    failure_cause = "RetriesExhausted"
1✔
273
                # TODO: test what happens if max event age expired before it gets scheduled the first time?!
274
                # Maximum event age expired (lookahead for next retry)
275
                elif not has_enough_time_for_retry(sqs_invocation, event_invoke_config):
1✔
276
                    failure_cause = "EventAgeExceeded"
1✔
277

278
                if failure_cause:  # handle failure destination and DLQ
1✔
279
                    self.process_failure_destination(
1✔
280
                        sqs_invocation, invocation_result, event_invoke_config, failure_cause
281
                    )
282
                    self.process_dead_letter_queue(sqs_invocation, invocation_result)
1✔
283
                    return
1✔
284
                else:  # schedule retry
285
                    sqs_invocation.retries += 1
1✔
286
                    # Assumption: We assume that the internal exception retries counter is reset after
287
                    #  an invocation that does not throw an exception
288
                    sqs_invocation.exception_retries = 0
1✔
289
                    # LAMBDA_RETRY_BASE_DELAY_SECONDS has a limit of 300s because the maximum SQS DelaySeconds
290
                    # is 15 minutes (900s) and the maximum retry count is 3. SQS quota for "Message timer":
291
                    # https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/quotas-messages.html
292
                    delay_seconds = sqs_invocation.retries * config.LAMBDA_RETRY_BASE_DELAY_SECONDS
1✔
293
                    # TODO: max SQS message size limit could break parity with AWS because
294
                    #  our SQSInvocation contains additional fields! 256kb is max for both Lambda payload + SQS
295
                    # TODO: write test with max SQS message size
296
                    sqs_client.send_message(
1✔
297
                        QueueUrl=self.event_queue_url,
298
                        MessageBody=sqs_invocation.encode(),
299
                        DelaySeconds=delay_seconds,
300
                    )
301
                    return
1✔
302
            else:  # invocation success
303
                self.process_success_destination(
1✔
304
                    sqs_invocation, invocation_result, event_invoke_config
305
                )
306
        except Exception as e:
×
307
            LOG.error(
×
308
                "Error handling lambda invoke %s", e, exc_info=LOG.isEnabledFor(logging.DEBUG)
309
            )
310

    def process_throttles_and_system_errors(
        self, sqs_invocation: SQSInvocation, error: Exception
    ) -> str:
        # If the function doesn't have enough concurrency available to process all events, additional
        # requests are throttled. For throttling errors (429) and system errors (500-series), Lambda returns
        # the event to the queue and attempts to run the function again for up to 6 hours. The retry interval
        # increases exponentially from 1 second after the first attempt to a maximum of 5 minutes. If the
        # queue contains many entries, Lambda increases the retry interval and reduces the rate at which it
        # reads events from the queue. Source:
        # https://docs.aws.amazon.com/lambda/latest/dg/invocation-async.html
        # Difference depending on error cause:
        # https://aws.amazon.com/blogs/compute/introducing-new-asynchronous-invocation-metrics-for-aws-lambda/
        # Troubleshooting 500 errors:
        # https://repost.aws/knowledge-center/lambda-troubleshoot-invoke-error-502-500
        if isinstance(error, TooManyRequestsException):  # Throttles 429
            LOG.debug("Throttled lambda %s: %s", self.version_manager.function_arn, error)
            status = FunctionStatus.throttle_error
        else:  # System errors 5xx
            LOG.debug(
                "Service exception in lambda %s: %s", self.version_manager.function_arn, error
            )
            status = FunctionStatus.system_error
        maximum_exception_retry_delay_seconds = 5 * 60
        delay_seconds = min(
            2**sqs_invocation.exception_retries, maximum_exception_retry_delay_seconds
        )
        # TODO: calculate delay seconds into max event age handling
        sqs_client = get_sqs_client(self.version_manager.function_version)
        sqs_client.send_message(
            QueueUrl=self.event_queue_url,
            MessageBody=sqs_invocation.encode(),
            DelaySeconds=delay_seconds,
        )
        return status

    def process_success_destination(
        self,
        sqs_invocation: SQSInvocation,
        invocation_result: InvocationResult,
        event_invoke_config: EventInvokeConfig | None,
    ) -> None:
        if event_invoke_config is None:
            return
        success_destination = event_invoke_config.destination_config.get("OnSuccess", {}).get(
            "Destination"
        )
        if success_destination is None:
            return
        LOG.debug("Handling success destination for %s", self.version_manager.function_arn)

        original_payload = sqs_invocation.invocation.payload
        destination_payload = {
            "version": "1.0",
            "timestamp": timestamp_millis(),
            "requestContext": {
                "requestId": invocation_result.request_id,
                "functionArn": self.version_manager.function_version.qualified_arn,
                "condition": "Success",
                "approximateInvokeCount": sqs_invocation.retries + 1,
            },
            "requestPayload": json.loads(to_str(original_payload)),
            "responseContext": {
                "statusCode": 200,
                "executedVersion": self.version_manager.function_version.id.qualifier,
            },
            "responsePayload": json.loads(to_str(invocation_result.payload or {})),
        }

        target_arn = event_invoke_config.destination_config["OnSuccess"]["Destination"]
        try:
            send_event_to_target(
                target_arn=target_arn,
                event=destination_payload,
                role=self.version_manager.function_version.config.role,
                source_arn=self.version_manager.function_version.id.unqualified_arn(),
                source_service="lambda",
                events_source="lambda",
                events_detail_type="Lambda Function Invocation Result - Success",
            )
        except Exception as e:
            LOG.warning("Error sending invocation result to %s: %s", target_arn, e)

    def process_failure_destination(
        self,
        sqs_invocation: SQSInvocation,
        invocation_result: InvocationResult,
        event_invoke_config: EventInvokeConfig | None,
        failure_cause: str,
    ):
        if event_invoke_config is None:
            return
        failure_destination = event_invoke_config.destination_config.get("OnFailure", {}).get(
            "Destination"
        )
        if failure_destination is None:
            return
        LOG.debug("Handling failure destination for %s", self.version_manager.function_arn)

        original_payload = sqs_invocation.invocation.payload
        if failure_cause == "ZeroReservedConcurrency":
            approximate_invoke_count = sqs_invocation.retries
        else:
            approximate_invoke_count = sqs_invocation.retries + 1
        destination_payload = {
            "version": "1.0",
            "timestamp": timestamp_millis(),
            "requestContext": {
                "requestId": invocation_result.request_id,
                "functionArn": self.version_manager.function_version.qualified_arn,
                "condition": failure_cause,
                "approximateInvokeCount": approximate_invoke_count,
            },
            "requestPayload": json.loads(to_str(original_payload)),
        }
        if failure_cause != "ZeroReservedConcurrency":
            destination_payload["responseContext"] = {
                "statusCode": 200,
                "executedVersion": self.version_manager.function_version.id.qualifier,
                "functionError": "Unhandled",
            }
            destination_payload["responsePayload"] = json.loads(to_str(invocation_result.payload))

        target_arn = event_invoke_config.destination_config["OnFailure"]["Destination"]
        try:
            send_event_to_target(
                target_arn=target_arn,
                event=destination_payload,
                role=self.version_manager.function_version.config.role,
                source_arn=self.version_manager.function_version.id.unqualified_arn(),
                source_service="lambda",
                events_source="lambda",
                events_detail_type="Lambda Function Invocation Result - Failure",
            )
        except Exception as e:
            LOG.warning("Error sending invocation result to %s: %s", target_arn, e)

    def process_dead_letter_queue(
        self,
        sqs_invocation: SQSInvocation,
        invocation_result: InvocationResult,
    ):
        LOG.debug("Handling dead letter queue for %s", self.version_manager.function_arn)
        try:
            dead_letter_queue._send_to_dead_letter_queue(
                source_arn=self.version_manager.function_arn,
                dlq_arn=self.version_manager.function_version.config.dead_letter_arn,
                event=json.loads(to_str(sqs_invocation.invocation.payload)),
                # TODO: Refactor DLQ handling by removing the invocation exception from the legacy lambda provider
                # TODO: Check message. Possibly remove because it is not used in the DLQ message?!
                error=LegacyInvocationException(
                    message="hi", result=to_str(invocation_result.payload)
                ),
                role=self.version_manager.function_version.config.role,
            )
        except Exception as e:
            LOG.warning(
                "Error sending invocation result to DLQ %s: %s",
                self.version_manager.function_version.config.dead_letter_arn,
                e,
            )


class LambdaEventManager:
    version_manager: LambdaVersionManager
    poller: Poller | None
    poller_thread: FuncThread | None
    event_queue_url: str | None
    lifecycle_lock: threading.RLock
    stopped: threading.Event

    def __init__(self, version_manager: LambdaVersionManager):
        self.version_manager = version_manager
        self.poller = None
        self.poller_thread = None
        self.event_queue_url = None
        self.lifecycle_lock = threading.RLock()
        self.stopped = threading.Event()

    def enqueue_event(self, invocation: Invocation) -> None:
        message_body = SQSInvocation(invocation).encode()
        sqs_client = get_sqs_client(self.version_manager.function_version)
        try:
            sqs_client.send_message(QueueUrl=self.event_queue_url, MessageBody=message_body)
        except Exception:
            LOG.error(
                "Failed to enqueue Lambda event into queue %s. Invocation: request_id=%s, invoked_arn=%s",
                self.event_queue_url,
                invocation.request_id,
                invocation.invoked_arn,
            )
            raise

    def start(self) -> None:
        LOG.debug(
            "Starting event manager %s id %s",
            self.version_manager.function_version.id.qualified_arn(),
            id(self),
        )
        with self.lifecycle_lock:
            if self.stopped.is_set():
                LOG.debug("Event manager already stopped before started.")
                return
            sqs_client = get_sqs_client(self.version_manager.function_version)
            function_id = self.version_manager.function_version.id
            # Truncate function name to ensure queue name limit of max 80 characters
            function_name_short = function_id.function_name[:47]
            # The instance id MUST be unique to the function and a given LocalStack instance
            queue_namespace = (
                f"{function_id.qualified_arn()}-{self.version_manager.function.instance_id}"
            )
            queue_name = f"{function_name_short}-{md5(queue_namespace)}"
            create_queue_response = sqs_client.create_queue(QueueName=queue_name)
            self.event_queue_url = create_queue_response["QueueUrl"]
            # We don't need to purge the queue for persistence or cloud pods because the instance id MUST be unique

            self.poller = Poller(self.version_manager, self.event_queue_url)
            self.poller_thread = FuncThread(
                self.poller.run,
                name=f"lambda-poller-{function_id.function_name}:{function_id.qualifier}",
            )
            self.poller_thread.start()

    def stop_for_update(self) -> None:
        LOG.debug(
            "Stopping event manager but keep queue %s id %s",
            self.version_manager.function_version.qualified_arn,
            id(self),
        )
        with self.lifecycle_lock:
            if self.stopped.is_set():
                LOG.debug("Event manager already stopped!")
                return
            self.stopped.set()
            if self.poller:
                self.poller.stop()
                self.poller_thread.join(timeout=3)
                LOG.debug("Waited for poller thread %s", self.poller_thread)
                if self.poller_thread.is_alive():
                    LOG.error("Poller did not shutdown %s", self.poller_thread)
                self.poller = None

    def stop(self) -> None:
        LOG.debug(
            "Stopping event manager %s: %s id %s",
            self.version_manager.function_version.qualified_arn,
            self.poller,
            id(self),
        )
        with self.lifecycle_lock:
            if self.stopped.is_set():
                LOG.debug("Event manager already stopped!")
                return
            self.stopped.set()
            if self.poller:
                self.poller.stop()
                self.poller_thread.join(timeout=3)
                LOG.debug("Waited for poller thread %s", self.poller_thread)
                if self.poller_thread.is_alive():
                    LOG.error("Poller did not shutdown %s", self.poller_thread)
                self.poller = None
            if self.event_queue_url:
                sqs_client = get_sqs_client(
                    self.version_manager.function_version, client_config=CLIENT_CONFIG
                )
                sqs_client.delete_queue(QueueUrl=self.event_queue_url)
                self.event_queue_url = None
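
A note on the retry timing used by has_enough_time_for_retry and the retry scheduling in handle_message: each retry is delayed by retries * LAMBDA_RETRY_BASE_DELAY_SECONDS, and a retry is only scheduled if the event age plus that upcoming delay still fits within the maximum event age (6 hours by default, or maximum_event_age_in_seconds from the event invoke config). A minimal standalone sketch of that arithmetic, assuming a base delay of 60 seconds (the real value comes from LocalStack's config.LAMBDA_RETRY_BASE_DELAY_SECONDS):

from datetime import datetime, timedelta
from math import ceil

# Assumed base delay for illustration; LocalStack reads this from config.LAMBDA_RETRY_BASE_DELAY_SECONDS.
BASE_DELAY_SECONDS = 60
MAX_EVENT_AGE_SECONDS = 6 * 60 * 60  # default when no EventInvokeConfig overrides it


def enough_time_for_retry(invoke_time: datetime, retries: int) -> bool:
    # Mirrors has_enough_time_for_retry: event age plus the next delay must stay within the window.
    time_passed = datetime.now() - invoke_time
    next_delay = (retries + 1) * BASE_DELAY_SECONDS
    return ceil(time_passed.total_seconds()) + next_delay <= MAX_EVENT_AGE_SECONDS


# An event pending for 5 hours can still be retried ...
print(enough_time_for_retry(datetime.now() - timedelta(hours=5), retries=1))  # True
# ... but one pending for 5h59m cannot, because the 2-minute delay would push it past the 6-hour window.
print(enough_time_for_retry(datetime.now() - timedelta(hours=5, minutes=59), retries=1))  # False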
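
For throttles (429) and system errors (5xx), process_throttles_and_system_errors re-queues the message with an exponential delay of min(2 ** exception_retries, 300) seconds, mirroring the 1-second-to-5-minute backoff described in the linked AWS documentation. A small sketch of the resulting delay schedule (illustration only; how exception_retries advances between attempts is outside this snippet):

MAX_EXCEPTION_RETRY_DELAY_SECONDS = 5 * 60  # 300s cap, as in the module above


def exception_retry_delay(exception_retries: int) -> int:
    # Exponential backoff capped at 5 minutes, as used when re-queueing throttled or failed events.
    return min(2 ** exception_retries, MAX_EXCEPTION_RETRY_DELAY_SECONDS)


# Delays grow 1, 2, 4, ... seconds and are capped at 300 seconds once 2**n exceeds the cap.
print([exception_retry_delay(n) for n in range(11)])
# [1, 2, 4, 8, 16, 32, 64, 128, 256, 300, 300]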
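
LambdaEventManager.start builds the per-function queue name from the function name truncated to 47 characters plus an MD5 hash of the qualified ARN and instance id. Since an MD5 hex digest is 32 characters, the result is at most 47 + 1 + 32 = 80 characters, which matches the SQS queue-name limit mentioned in the comment above. A quick standalone check of that bound, using hashlib in place of LocalStack's md5 helper (assumed here to return the hex digest):

import hashlib


def queue_name_for(function_name: str, qualified_arn: str, instance_id: str) -> str:
    # Hypothetical standalone version of the naming scheme in LambdaEventManager.start.
    function_name_short = function_name[:47]
    digest = hashlib.md5(f"{qualified_arn}-{instance_id}".encode()).hexdigest()  # 32 hex chars
    return f"{function_name_short}-{digest}"


name = queue_name_for(
    "a-very-long-function-name-that-would-otherwise-overflow-the-sqs-limit",
    "arn:aws:lambda:us-east-1:000000000000:function:my-fn:$LATEST",
    "instance-1",
)
print(len(name) <= 80)  # True: at most 47 + 1 + 32 = 80 characters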