• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In
Build has been canceled!

localstack / localstack / dca9ca20-d04b-4556-9d5e-54a876ecdd65

18 Feb 2025 04:38PM UTC coverage: 86.857% (-0.03%) from 86.888%
dca9ca20-d04b-4556-9d5e-54a876ecdd65

push

circleci

web-flow
[Utils] Create exponential backoff utility class (#12264)

36 of 39 new or added lines in 1 file covered. (92.31%)

23 existing lines in 14 files now uncovered.

61541 of 70853 relevant lines covered (86.86%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.6
/localstack-core/localstack/services/lambda_/invocation/event_manager.py
1
import base64
1✔
2
import dataclasses
1✔
3
import json
1✔
4
import logging
1✔
5
import threading
1✔
6
import time
1✔
7
from concurrent.futures import ThreadPoolExecutor
1✔
8
from datetime import datetime
1✔
9
from math import ceil
1✔
10

11
from botocore.config import Config
1✔
12

13
from localstack import config
1✔
14
from localstack.aws.api.lambda_ import TooManyRequestsException
1✔
15
from localstack.services.lambda_.invocation.internal_sqs_queue import get_fake_sqs_client
1✔
16
from localstack.services.lambda_.invocation.lambda_models import (
1✔
17
    EventInvokeConfig,
18
    FunctionVersion,
19
    Invocation,
20
    InvocationResult,
21
)
22
from localstack.services.lambda_.invocation.version_manager import LambdaVersionManager
1✔
23
from localstack.utils.aws import dead_letter_queue
1✔
24
from localstack.utils.aws.message_forwarding import send_event_to_target
1✔
25
from localstack.utils.strings import md5, to_str
1✔
26
from localstack.utils.threads import FuncThread
1✔
27
from localstack.utils.time import timestamp_millis
1✔
28
from localstack.utils.xray.trace_header import TraceHeader
1✔
29

30
# Module-level logger for the Lambda event manager (async invocation handling).
LOG = logging.getLogger(__name__)
31

32

33
def get_sqs_client(function_version: FunctionVersion, client_config=None):
    """Return the SQS client used for the internal Lambda event queue.

    Always returns the in-memory fake SQS client; both ``function_version``
    and ``client_config`` are currently ignored (kept for interface
    compatibility with callers that pass them).
    """
    return get_fake_sqs_client()
35

36

37
# TODO: remove once DLQ handling is refactored following the removal of the legacy lambda provider
class LegacyInvocationException(Exception):
    """Exception carrying the log output and result payload of a failed invocation.

    Only used to feed the legacy dead-letter-queue helper, which expects an
    exception object with ``log_output`` and ``result`` attributes.
    """

    def __init__(self, message, log_output=None, result=None):
        super().__init__(message)
        self.log_output = log_output
        self.result = result
43

44

45
@dataclasses.dataclass
class SQSInvocation:
    """Wrapper around an async Lambda ``Invocation`` plus its retry counters,
    (de)serializable to a JSON string for transport over the internal SQS queue.
    """

    invocation: Invocation
    # Retries caused by an invocation that returned an error result
    retries: int = 0
    # Retries caused by throttling (429) or system errors (5xx)
    exception_retries: int = 0

    def encode(self) -> str:
        """Serialize this invocation into a JSON string.

        NOTE: mutates ``invocation.trace_context`` in place by replacing the
        ``aws_trace_header`` TraceHeader object with its header-string form,
        since TraceHeader itself is not JSON-serializable.
        """
        aws_trace_header = self.invocation.trace_context.get("aws_trace_header")
        aws_trace_header_str = aws_trace_header.to_header_str()
        self.invocation.trace_context["aws_trace_header"] = aws_trace_header_str
        return json.dumps(
            {
                "payload": to_str(base64.b64encode(self.invocation.payload)),
                "invoked_arn": self.invocation.invoked_arn,
                "client_context": self.invocation.client_context,
                "invocation_type": self.invocation.invocation_type,
                "invoke_time": self.invocation.invoke_time.isoformat(),
                # = invocation_id
                "request_id": self.invocation.request_id,
                "retries": self.retries,
                "exception_retries": self.exception_retries,
                "trace_context": self.invocation.trace_context,
            }
        )

    @classmethod
    def decode(cls, message: str) -> "SQSInvocation":
        """Deserialize a JSON string produced by ``encode`` into an ``SQSInvocation``."""
        invocation_dict = json.loads(message)
        # Decode the TraceHeader string back into a TraceHeader object.
        # Normalize into a single dict used for both reading and writing: the
        # previous code read via `.get("trace_context", {})` but then wrote to
        # `invocation_dict["trace_context"]`, which raised a KeyError whenever
        # the key was missing or None.
        trace_context = invocation_dict.get("trace_context") or {}
        aws_trace_header_str = trace_context.get("aws_trace_header")
        trace_context["aws_trace_header"] = TraceHeader.from_header_str(aws_trace_header_str)
        invocation = Invocation(
            payload=base64.b64decode(invocation_dict["payload"]),
            invoked_arn=invocation_dict["invoked_arn"],
            client_context=invocation_dict["client_context"],
            invocation_type=invocation_dict["invocation_type"],
            invoke_time=datetime.fromisoformat(invocation_dict["invoke_time"]),
            request_id=invocation_dict["request_id"],
            trace_context=trace_context,
        )
        return cls(
            invocation=invocation,
            retries=invocation_dict["retries"],
            exception_retries=invocation_dict["exception_retries"],
        )
93

94

95
def has_enough_time_for_retry(
    sqs_invocation: SQSInvocation, event_invoke_config: EventInvokeConfig
) -> bool:
    """Return True if the next scheduled retry would still run within the
    maximum event age of this invocation.

    Looks ahead by adding the delay of the *next* retry (linear backoff based
    on the retry count) to the time already elapsed since the original invoke.
    """
    time_passed = datetime.now() - sqs_invocation.invocation.invoke_time
    delay_queue_invoke_seconds = (
        sqs_invocation.retries + 1
    ) * config.LAMBDA_RETRY_BASE_DELAY_SECONDS
    # 6 hours is the default based on these AWS sources:
    # https://repost.aws/questions/QUd214DdOQRkKWr7D8IuSMIw/why-is-aws-lambda-eventinvokeconfig-s-limit-for-maximumretryattempts-2
    # https://aws.amazon.com/blogs/compute/introducing-new-asynchronous-invocation-metrics-for-aws-lambda/
    # https://aws.amazon.com/about-aws/whats-new/2019/11/aws-lambda-supports-max-retry-attempts-event-age-asynchronous-invocations/
    maximum_event_age_in_seconds = 6 * 60 * 60
    if event_invoke_config and event_invoke_config.maximum_event_age_in_seconds is not None:
        maximum_event_age_in_seconds = event_invoke_config.maximum_event_age_in_seconds
    # bool(...) guarantees the annotated return type: the bare `and` chain
    # would leak a falsy non-bool (e.g. 0) if the configured max event age
    # were ever falsy.
    return bool(
        maximum_event_age_in_seconds
        and ceil(time_passed.total_seconds()) + delay_queue_invoke_seconds
        <= maximum_event_age_in_seconds
    )
114

115

116
# TODO: optimize this client configuration. Do we need to consider client caching here?
# Shared botocore client config: short connect/read timeouts and no automatic
# retries — the polling and cleanup code paths handle failures themselves.
CLIENT_CONFIG = Config(
    connect_timeout=5,
    read_timeout=10,
    retries={"max_attempts": 0},
)
122

123

124
class Poller:
    """Polls the internal SQS event queue of one function version and dispatches
    received messages to a thread pool for asynchronous invocation handling.
    """

    version_manager: LambdaVersionManager
    event_queue_url: str
    # Signals the polling loop (and in-flight error handling) to stop
    _shutdown_event: threading.Event
    # Executes handle_message for each received SQS message
    invoker_pool: ThreadPoolExecutor

    def __init__(self, version_manager: LambdaVersionManager, event_queue_url: str):
        """Create a poller for the given function version and its event queue URL."""
        self.version_manager = version_manager
        self.event_queue_url = event_queue_url
        self._shutdown_event = threading.Event()
        function_id = self.version_manager.function_version.id
        # TODO: think about scaling, test it, make it configurable?!
        self.invoker_pool = ThreadPoolExecutor(
            thread_name_prefix=f"lambda-invoker-{function_id.function_name}:{function_id.qualifier}"
        )

    def run(self, *args, **kwargs):
        """Long-poll the event queue until shutdown, submitting each received
        message to the invoker thread pool.

        Intended to run in a dedicated thread (see LambdaEventManager.start);
        *args/**kwargs are accepted for the FuncThread calling convention and ignored.
        """
        sqs_client = get_sqs_client(
            self.version_manager.function_version, client_config=CLIENT_CONFIG
        )
        function_timeout = self.version_manager.function_version.config.timeout
        while not self._shutdown_event.is_set():
            try:
                response = sqs_client.receive_message(
                    QueueUrl=self.event_queue_url,
                    # TODO: consider replacing with short polling instead of long polling to prevent keeping connections open
                    # however, we had some serious performance issues when tried out, so those have to be investigated first
                    WaitTimeSeconds=2,
                    # Related: SQS event source mapping batches up to 10 messages:
                    # https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html
                    MaxNumberOfMessages=10,
                    # Keep messages invisible while the handler (bounded by the
                    # function timeout) is working, plus a safety margin.
                    VisibilityTimeout=function_timeout + 60,
                )
                if not response.get("Messages"):
                    continue
                LOG.debug("[%s] Got %d messages", self.event_queue_url, len(response["Messages"]))
                # Guard against shutdown event arriving while polling SQS for messages
                if not self._shutdown_event.is_set():
                    for message in response["Messages"]:
                        # NOTE: queueing within the thread pool executor could lead to double executions
                        #  due to the visibility timeout
                        self.invoker_pool.submit(self.handle_message, message)

            except Exception as e:
                # TODO: if the gateway shuts down before the shutdown event even is set,
                #  we might still get an error message
                # after shutdown of LS, we might expectedly get errors, if other components shut down.
                # In any case, after the event manager is shut down, we do not need to spam error logs in case
                # some resource is already missing
                if self._shutdown_event.is_set():
                    return
                LOG.error(
                    "Error while polling lambda events for function %s: %s",
                    self.version_manager.function_version.qualified_arn,
                    e,
                    exc_info=LOG.isEnabledFor(logging.DEBUG),
                )
                # some time between retries to avoid running into the problem right again
                time.sleep(1)

    def stop(self):
        """Signal the polling loop to stop and shut down the invoker pool
        without waiting for in-flight handlers."""
        LOG.debug(
            "Stopping event poller %s %s",
            self.version_manager.function_version.qualified_arn,
            id(self),
        )
        self._shutdown_event.set()
        self.invoker_pool.shutdown(cancel_futures=True, wait=False)

    def handle_message(self, message: dict) -> None:
        """Process one SQS message: invoke the function and handle retries,
        success/failure destinations, and the dead letter queue.

        The raw SQS message is always deleted from the queue after the invoke
        attempt (see the ``finally`` block); retries are re-enqueued as new
        messages with an SQS delay.
        """
        failure_cause = None
        qualifier = self.version_manager.function_version.id.qualifier
        event_invoke_config = self.version_manager.function.event_invoke_configs.get(qualifier)
        try:
            sqs_invocation = SQSInvocation.decode(message["Body"])
            invocation = sqs_invocation.invocation
            try:
                invocation_result = self.version_manager.invoke(invocation=invocation)
            except Exception as e:
                # The invoke itself raised (e.g. throttling or system error)
                # rather than returning an error result.
                # Reserved concurrency == 0
                if self.version_manager.function.reserved_concurrent_executions == 0:
                    failure_cause = "ZeroReservedConcurrency"
                # Maximum event age expired (lookahead for next retry)
                elif not has_enough_time_for_retry(sqs_invocation, event_invoke_config):
                    failure_cause = "EventAgeExceeded"
                if failure_cause:
                    invocation_result = InvocationResult(
                        is_error=True, request_id=invocation.request_id, payload=None, logs=None
                    )
                    self.process_failure_destination(
                        sqs_invocation, invocation_result, event_invoke_config, failure_cause
                    )
                    self.process_dead_letter_queue(sqs_invocation, invocation_result)
                    return
                # 3) Otherwise, retry without increasing counter
                self.process_throttles_and_system_errors(sqs_invocation, e)
                return
            finally:
                # Always remove the original message; retry paths re-enqueue a
                # fresh (delayed) message instead of relying on visibility timeout.
                sqs_client = get_sqs_client(self.version_manager.function_version)
                sqs_client.delete_message(
                    QueueUrl=self.event_queue_url, ReceiptHandle=message["ReceiptHandle"]
                )

            # Good summary blogpost: https://haithai91.medium.com/aws-lambdas-retry-behaviors-edff90e1cf1b
            # Asynchronous invocation handling: https://docs.aws.amazon.com/lambda/latest/dg/invocation-async.html
            # https://aws.amazon.com/blogs/compute/introducing-new-asynchronous-invocation-metrics-for-aws-lambda/
            max_retry_attempts = 2
            if event_invoke_config and event_invoke_config.maximum_retry_attempts is not None:
                max_retry_attempts = event_invoke_config.maximum_retry_attempts

            # An invocation error either leads to a terminal failure or to a scheduled retry
            if invocation_result.is_error:  # invocation error
                failure_cause = None
                # Reserved concurrency == 0
                if self.version_manager.function.reserved_concurrent_executions == 0:
                    failure_cause = "ZeroReservedConcurrency"
                # Maximum retries exhausted
                elif sqs_invocation.retries >= max_retry_attempts:
                    failure_cause = "RetriesExhausted"
                # TODO: test what happens if max event age expired before it gets scheduled the first time?!
                # Maximum event age expired (lookahead for next retry)
                elif not has_enough_time_for_retry(sqs_invocation, event_invoke_config):
                    failure_cause = "EventAgeExceeded"

                if failure_cause:  # handle failure destination and DLQ
                    self.process_failure_destination(
                        sqs_invocation, invocation_result, event_invoke_config, failure_cause
                    )
                    self.process_dead_letter_queue(sqs_invocation, invocation_result)
                    return
                else:  # schedule retry
                    sqs_invocation.retries += 1
                    # Assumption: We assume that the internal exception retries counter is reset after
                    #  an invocation that does not throw an exception
                    sqs_invocation.exception_retries = 0
                    # LAMBDA_RETRY_BASE_DELAY_SECONDS has a limit of 300s because the maximum SQS DelaySeconds
                    # is 15 minutes (900s) and the maximum retry count is 3. SQS quota for "Message timer":
                    # https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/quotas-messages.html
                    delay_seconds = sqs_invocation.retries * config.LAMBDA_RETRY_BASE_DELAY_SECONDS
                    # TODO: max SQS message size limit could break parity with AWS because
                    #  our SQSInvocation contains additional fields! 256kb is max for both Lambda payload + SQS
                    # TODO: write test with max SQS message size
                    sqs_client.send_message(
                        QueueUrl=self.event_queue_url,
                        MessageBody=sqs_invocation.encode(),
                        DelaySeconds=delay_seconds,
                    )
                    return
            else:  # invocation success
                self.process_success_destination(
                    sqs_invocation, invocation_result, event_invoke_config
                )
        except Exception as e:
            # Top-level guard: message handling must never kill the invoker thread.
            LOG.error(
                "Error handling lambda invoke %s", e, exc_info=LOG.isEnabledFor(logging.DEBUG)
            )

    def process_throttles_and_system_errors(self, sqs_invocation: SQSInvocation, error: Exception):
        """Re-enqueue the invocation with exponential backoff after a throttle
        (429) or system error (5xx), without increasing the regular retry counter.
        """
        # If the function doesn't have enough concurrency available to process all events, additional
        # requests are throttled. For throttling errors (429) and system errors (500-series), Lambda returns
        # the event to the queue and attempts to run the function again for up to 6 hours. The retry interval
        # increases exponentially from 1 second after the first attempt to a maximum of 5 minutes. If the
        # queue contains many entries, Lambda increases the retry interval and reduces the rate at which it
        # reads events from the queue. Source:
        # https://docs.aws.amazon.com/lambda/latest/dg/invocation-async.html
        # Difference depending on error cause:
        # https://aws.amazon.com/blogs/compute/introducing-new-asynchronous-invocation-metrics-for-aws-lambda/
        # Troubleshooting 500 errors:
        # https://repost.aws/knowledge-center/lambda-troubleshoot-invoke-error-502-500
        if isinstance(error, TooManyRequestsException):  # Throttles 429
            LOG.debug("Throttled lambda %s: %s", self.version_manager.function_arn, error)
        else:  # System errors 5xx
            LOG.debug(
                "Service exception in lambda %s: %s", self.version_manager.function_arn, error
            )
        maximum_exception_retry_delay_seconds = 5 * 60
        delay_seconds = min(
            2**sqs_invocation.exception_retries, maximum_exception_retry_delay_seconds
        )
        # TODO: calculate delay seconds into max event age handling
        sqs_client = get_sqs_client(self.version_manager.function_version)
        sqs_client.send_message(
            QueueUrl=self.event_queue_url,
            MessageBody=sqs_invocation.encode(),
            DelaySeconds=delay_seconds,
        )

    def process_success_destination(
        self,
        sqs_invocation: SQSInvocation,
        invocation_result: InvocationResult,
        event_invoke_config: EventInvokeConfig | None,
    ) -> None:
        """Send the invocation result to the configured OnSuccess destination,
        if any. Send errors are logged as warnings, never raised.
        """
        if event_invoke_config is None:
            return
        success_destination = event_invoke_config.destination_config.get("OnSuccess", {}).get(
            "Destination"
        )
        if success_destination is None:
            return
        LOG.debug("Handling success destination for %s", self.version_manager.function_arn)

        original_payload = sqs_invocation.invocation.payload
        # Destination event shape follows the AWS async invocation destination format.
        destination_payload = {
            "version": "1.0",
            "timestamp": timestamp_millis(),
            "requestContext": {
                "requestId": invocation_result.request_id,
                "functionArn": self.version_manager.function_version.qualified_arn,
                "condition": "Success",
                "approximateInvokeCount": sqs_invocation.retries + 1,
            },
            "requestPayload": json.loads(to_str(original_payload)),
            "responseContext": {
                "statusCode": 200,
                "executedVersion": self.version_manager.function_version.id.qualifier,
            },
            "responsePayload": json.loads(to_str(invocation_result.payload or {})),
        }

        target_arn = event_invoke_config.destination_config["OnSuccess"]["Destination"]
        try:
            send_event_to_target(
                target_arn=target_arn,
                event=destination_payload,
                role=self.version_manager.function_version.config.role,
                source_arn=self.version_manager.function_version.id.unqualified_arn(),
                source_service="lambda",
                events_source="lambda",
                events_detail_type="Lambda Function Invocation Result - Success",
            )
        except Exception as e:
            LOG.warning("Error sending invocation result to %s: %s", target_arn, e)

    def process_failure_destination(
        self,
        sqs_invocation: SQSInvocation,
        invocation_result: InvocationResult,
        event_invoke_config: EventInvokeConfig | None,
        failure_cause: str,
    ):
        """Send the failed invocation to the configured OnFailure destination,
        if any. Send errors are logged as warnings, never raised.
        """
        if event_invoke_config is None:
            return
        failure_destination = event_invoke_config.destination_config.get("OnFailure", {}).get(
            "Destination"
        )
        if failure_destination is None:
            return
        LOG.debug("Handling failure destination for %s", self.version_manager.function_arn)

        original_payload = sqs_invocation.invocation.payload
        # With zero reserved concurrency the function was never actually
        # invoked, so the attempt count excludes the current (skipped) try.
        if failure_cause == "ZeroReservedConcurrency":
            approximate_invoke_count = sqs_invocation.retries
        else:
            approximate_invoke_count = sqs_invocation.retries + 1
        destination_payload = {
            "version": "1.0",
            "timestamp": timestamp_millis(),
            "requestContext": {
                "requestId": invocation_result.request_id,
                "functionArn": self.version_manager.function_version.qualified_arn,
                "condition": failure_cause,
                "approximateInvokeCount": approximate_invoke_count,
            },
            "requestPayload": json.loads(to_str(original_payload)),
        }
        if failure_cause != "ZeroReservedConcurrency":
            destination_payload["responseContext"] = {
                "statusCode": 200,
                "executedVersion": self.version_manager.function_version.id.qualifier,
                "functionError": "Unhandled",
            }
            destination_payload["responsePayload"] = json.loads(to_str(invocation_result.payload))

        target_arn = event_invoke_config.destination_config["OnFailure"]["Destination"]
        try:
            send_event_to_target(
                target_arn=target_arn,
                event=destination_payload,
                role=self.version_manager.function_version.config.role,
                source_arn=self.version_manager.function_version.id.unqualified_arn(),
                source_service="lambda",
                events_source="lambda",
                events_detail_type="Lambda Function Invocation Result - Failure",
            )
        except Exception as e:
            LOG.warning("Error sending invocation result to %s: %s", target_arn, e)

    def process_dead_letter_queue(
        self,
        sqs_invocation: SQSInvocation,
        invocation_result: InvocationResult,
    ):
        """Forward the failed invocation payload to the function's configured
        dead letter queue. Send errors are logged as warnings, never raised.
        """
        LOG.debug("Handling dead letter queue for %s", self.version_manager.function_arn)
        try:
            dead_letter_queue._send_to_dead_letter_queue(
                source_arn=self.version_manager.function_arn,
                dlq_arn=self.version_manager.function_version.config.dead_letter_arn,
                event=json.loads(to_str(sqs_invocation.invocation.payload)),
                # TODO: Refactor DLQ handling by removing the invocation exception from the legacy lambda provider
                # TODO: Check message. Possibly remove because it is not used in the DLQ message?!
                error=LegacyInvocationException(
                    message="hi", result=to_str(invocation_result.payload)
                ),
                role=self.version_manager.function_version.config.role,
            )
        except Exception as e:
            LOG.warning(
                "Error sending invocation result to DLQ %s: %s",
                self.version_manager.function_version.config.dead_letter_arn,
                e,
            )
436

437

438
class LambdaEventManager:
    """Manages asynchronous ("Event") invocations for one function version.

    Buffers events in an internal SQS queue and runs a ``Poller`` in a
    background thread that consumes and dispatches them.
    """

    version_manager: LambdaVersionManager
    poller: Poller | None
    poller_thread: FuncThread | None
    event_queue_url: str | None
    # Serializes start/stop lifecycle transitions
    lifecycle_lock: threading.RLock
    # Once set, the manager can neither be (re)started nor stopped again
    stopped: threading.Event

    def __init__(self, version_manager: LambdaVersionManager):
        self.version_manager = version_manager
        self.poller = None
        self.poller_thread = None
        self.event_queue_url = None
        self.lifecycle_lock = threading.RLock()
        self.stopped = threading.Event()

    def enqueue_event(self, invocation: Invocation) -> None:
        """Serialize the invocation and enqueue it into the internal event queue.

        Re-raises any send failure after logging it, so callers can surface the error.
        """
        message_body = SQSInvocation(invocation).encode()
        sqs_client = get_sqs_client(self.version_manager.function_version)
        try:
            sqs_client.send_message(QueueUrl=self.event_queue_url, MessageBody=message_body)
        except Exception:
            LOG.error(
                "Failed to enqueue Lambda event into queue %s. Invocation: request_id=%s, invoked_arn=%s",
                self.event_queue_url,
                invocation.request_id,
                invocation.invoked_arn,
            )
            raise

    def start(self) -> None:
        """Create the per-version event queue and start the poller thread.

        No-op if the manager was already stopped before being started.
        """
        LOG.debug(
            "Starting event manager %s id %s",
            self.version_manager.function_version.id.qualified_arn(),
            id(self),
        )
        with self.lifecycle_lock:
            if self.stopped.is_set():
                LOG.debug("Event manager already stopped before started.")
                return
            sqs_client = get_sqs_client(self.version_manager.function_version)
            function_id = self.version_manager.function_version.id
            # Truncate function name to ensure queue name limit of max 80 characters
            function_name_short = function_id.function_name[:47]
            # The instance id MUST be unique to the function and a given LocalStack instance
            queue_namespace = (
                f"{function_id.qualified_arn()}-{self.version_manager.function.instance_id}"
            )
            queue_name = f"{function_name_short}-{md5(queue_namespace)}"
            create_queue_response = sqs_client.create_queue(QueueName=queue_name)
            self.event_queue_url = create_queue_response["QueueUrl"]
            # We don't need to purge the queue for persistence or cloud pods because the instance id MUST be unique

            self.poller = Poller(self.version_manager, self.event_queue_url)
            self.poller_thread = FuncThread(
                self.poller.run,
                name=f"lambda-poller-{function_id.function_name}:{function_id.qualifier}",
            )
            self.poller_thread.start()

    def _shutdown_poller(self) -> None:
        """Stop the poller and wait briefly for its thread to terminate.

        Shared shutdown sequence for ``stop`` and ``stop_for_update``; callers
        must hold ``lifecycle_lock`` and have checked ``self.poller`` is set.
        """
        self.poller.stop()
        self.poller_thread.join(timeout=3)
        LOG.debug("Waited for poller thread %s", self.poller_thread)
        if self.poller_thread.is_alive():
            LOG.error("Poller did not shutdown %s", self.poller_thread)
        self.poller = None

    def stop_for_update(self) -> None:
        """Stop the poller but keep the event queue (used while the function is updated)."""
        LOG.debug(
            "Stopping event manager but keep queue %s id %s",
            self.version_manager.function_version.qualified_arn,
            id(self),
        )
        with self.lifecycle_lock:
            if self.stopped.is_set():
                LOG.debug("Event manager already stopped!")
                return
            self.stopped.set()
            if self.poller:
                self._shutdown_poller()

    def stop(self) -> None:
        """Stop the poller and delete the event queue."""
        LOG.debug(
            "Stopping event manager %s: %s id %s",
            self.version_manager.function_version.qualified_arn,
            self.poller,
            id(self),
        )
        with self.lifecycle_lock:
            if self.stopped.is_set():
                LOG.debug("Event manager already stopped!")
                return
            self.stopped.set()
            if self.poller:
                self._shutdown_poller()
            if self.event_queue_url:
                sqs_client = get_sqs_client(
                    self.version_manager.function_version, client_config=CLIENT_CONFIG
                )
                sqs_client.delete_queue(QueueUrl=self.event_queue_url)
                self.event_queue_url = None
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc