• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 20420150350

19 Dec 2025 10:27AM UTC coverage: 86.92% (+0.007%) from 86.913%
20420150350

push

github

web-flow
Fix Lambda CI log pollution issues (#13546)

2 of 4 new or added lines in 1 file covered. (50.0%)

75 existing lines in 6 files now uncovered.

70016 of 80552 relevant lines covered (86.92%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.25
/localstack-core/localstack/services/lambda_/event_source_mapping/esm_worker.py
1
import logging
1✔
2
import threading
1✔
3
from enum import StrEnum
1✔
4

5
from localstack.aws.api.lambda_ import (
1✔
6
    EventSourceMappingConfiguration,
7
)
8
from localstack.config import (
1✔
9
    LAMBDA_EVENT_SOURCE_MAPPING_MAX_BACKOFF_ON_EMPTY_POLL_SEC,
10
    LAMBDA_EVENT_SOURCE_MAPPING_MAX_BACKOFF_ON_ERROR_SEC,
11
    LAMBDA_EVENT_SOURCE_MAPPING_POLL_INTERVAL_SEC,
12
)
13
from localstack.services.lambda_.analytics import EsmExecutionStatus, esm_counter
1✔
14
from localstack.services.lambda_.event_source_mapping.pollers.poller import (
1✔
15
    EmptyPollResultsException,
16
    Poller,
17
)
18
from localstack.services.lambda_.invocation.models import LambdaStore, lambda_stores
1✔
19
from localstack.services.lambda_.provider_utils import get_function_version_from_arn
1✔
20
from localstack.utils.aws.arns import parse_arn
1✔
21
from localstack.utils.backoff import ExponentialBackoff
1✔
22
from localstack.utils.threads import FuncThread
1✔
23

24
LOG = logging.getLogger(__name__)
1✔
25

26

27
class EsmState(StrEnum):
1✔
28
    # https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-response-State
29
    CREATING = "Creating"
1✔
30
    ENABLING = "Enabling"
1✔
31
    ENABLED = "Enabled"
1✔
32
    DISABLING = "Disabling"
1✔
33
    DISABLED = "Disabled"
1✔
34
    UPDATING = "Updating"
1✔
35
    DELETING = "Deleting"
1✔
36

37

38
class EsmStateReason(StrEnum):
1✔
39
    # Used for Kinesis and DynamoDB
40
    USER_ACTION = "User action"
1✔
41
    # Used for SQS
42
    USER_INITIATED = "USER_INITIATED"
1✔
43
    NO_RECORDS_PROCESSED = "No records processed"
1✔
44
    # TODO: add others?
45

46

47
class EsmWorker:
1✔
48
    esm_config: EventSourceMappingConfiguration
1✔
49
    enabled: bool
1✔
50
    current_state: EsmState
1✔
51
    state_transition_reason: EsmStateReason
1✔
52
    # Either USER_ACTION or USER_INITIATED (SQS) depending on the event source
53
    user_state_reason: EsmStateReason
1✔
54
    # TODO: test
55
    last_processing_result: str
1✔
56

57
    poller: Poller
1✔
58

59
    _state: LambdaStore
1✔
60
    _state_lock: threading.RLock
1✔
61
    _shutdown_event: threading.Event
1✔
62
    _poller_thread: FuncThread | None
1✔
63

64
    def __init__(
1✔
65
        self,
66
        esm_config: EventSourceMappingConfiguration,
67
        poller: Poller,
68
        enabled: bool = True,
69
        user_state_reason: EsmStateReason = EsmStateReason.USER_ACTION,
70
    ):
71
        self.esm_config = esm_config
1✔
72
        self.enabled = enabled
1✔
73
        self.current_state = EsmState.CREATING
1✔
74
        self.user_state_reason = user_state_reason
1✔
75
        self.state_transition_reason = self.user_state_reason
1✔
76

77
        self.poller = poller
1✔
78

79
        # TODO: implement lifecycle locking
80
        self._state_lock = threading.RLock()
1✔
81
        self._shutdown_event = threading.Event()
1✔
82
        self._poller_thread = None
1✔
83

84
        function_version = get_function_version_from_arn(self.esm_config["FunctionArn"])
1✔
85
        self._state = lambda_stores[function_version.id.account][function_version.id.region]
1✔
86

87
        # HACK: Flag used to check if a graceful shutdown was triggered.
88
        self._graceful_shutdown_triggered = False
1✔
89

90
    @property
1✔
91
    def uuid(self) -> str:
1✔
92
        return self.esm_config["UUID"]
1✔
93

94
    def stop_for_shutdown(self):
1✔
95
        # Signal the worker's poller_loop thread to gracefully shutdown
96
        # TODO: Once ESM state is de-coupled from lambda store, re-think this approach.
97
        self._shutdown_event.set()
1✔
98
        self._graceful_shutdown_triggered = True
1✔
99

100
    def create(self):
1✔
101
        if self.enabled:
1✔
102
            with self._state_lock:
1✔
103
                self.current_state = EsmState.CREATING
1✔
104
                self.state_transition_reason = self.user_state_reason
1✔
105
            self.start()
1✔
106
        else:
107
            # TODO: validate with tests
108
            with self._state_lock:
1✔
109
                self.current_state = EsmState.DISABLED
1✔
110
                self.state_transition_reason = self.user_state_reason
1✔
111
            self.update_esm_state_in_store(EsmState.DISABLED)
1✔
112

113
    def start(self):
1✔
114
        with self._state_lock:
1✔
115
            self.enabled = True
1✔
116
            # CREATING state takes precedence over ENABLING
117
            if self.current_state != EsmState.CREATING:
1✔
118
                self.current_state = EsmState.ENABLING
×
119
                self.state_transition_reason = self.user_state_reason
×
120
        # Reset the shutdown event such that we don't stop immediately after a restart
121
        self._shutdown_event.clear()
1✔
122
        self._poller_thread = FuncThread(
1✔
123
            self.poller_loop,
124
            name=f"event-source-mapping-poller-{self.uuid}",
125
        )
126
        self._poller_thread.start()
1✔
127

128
    def stop(self):
1✔
129
        with self._state_lock:
1✔
130
            self.enabled = False
1✔
131
            self.current_state = EsmState.DISABLING
1✔
132
            self.update_esm_state_in_store(EsmState.DISABLING)
1✔
133
            self.state_transition_reason = self.user_state_reason
1✔
134
        self._shutdown_event.set()
1✔
135

136
    def delete(self):
1✔
137
        with self._state_lock:
1✔
138
            self.current_state = EsmState.DELETING
1✔
139
            self.update_esm_state_in_store(EsmState.DELETING)
1✔
140
            self.state_transition_reason = self.user_state_reason
1✔
141
        self._shutdown_event.set()
1✔
142

143
    def poller_loop(self, *args, **kwargs):
1✔
144
        with self._state_lock:
1✔
145
            self.current_state = EsmState.ENABLED
1✔
146
            self.update_esm_state_in_store(EsmState.ENABLED)
1✔
147
            self.state_transition_reason = self.user_state_reason
1✔
148

149
        error_boff = ExponentialBackoff(
1✔
150
            initial_interval=2, max_interval=LAMBDA_EVENT_SOURCE_MAPPING_MAX_BACKOFF_ON_ERROR_SEC
151
        )
152
        empty_boff = ExponentialBackoff(
1✔
153
            initial_interval=1,
154
            max_interval=LAMBDA_EVENT_SOURCE_MAPPING_MAX_BACKOFF_ON_EMPTY_POLL_SEC,
155
        )
156

157
        poll_interval_duration = LAMBDA_EVENT_SOURCE_MAPPING_POLL_INTERVAL_SEC
1✔
158

159
        while not self._shutdown_event.is_set():
1✔
160
            try:
1✔
161
                # TODO: update state transition reason?
162
                self.poller.poll_events()
1✔
163

164
                # If no exception encountered, reset the backoff
165
                error_boff.reset()
1✔
166
                empty_boff.reset()
1✔
167

168
                # Set the poll frequency back to the default
169
                poll_interval_duration = LAMBDA_EVENT_SOURCE_MAPPING_POLL_INTERVAL_SEC
1✔
170
            except EmptyPollResultsException as miss_ex:
1✔
171
                # If the event source is empty, backoff
172
                poll_interval_duration = empty_boff.next_backoff()
1✔
173
                LOG.debug(
1✔
174
                    "The event source %s is empty. Backing off for %.2f seconds until next request.",
175
                    miss_ex.source_arn,
176
                    poll_interval_duration,
177
                )
UNCOV
178
            except Exception as e:
×
UNCOV
179
                LOG.error(
×
180
                    "Error while polling messages for event source %s: %s",
181
                    self.esm_config.get("EventSourceArn")
182
                    or self.esm_config.get("SelfManagedEventSource"),
183
                    e,
184
                    exc_info=LOG.isEnabledFor(logging.DEBUG),
185
                )
UNCOV
186
                event_source = parse_arn(self.esm_config.get("EventSourceArn")).get("service")
×
UNCOV
187
                esm_counter.labels(
×
188
                    source=event_source, status=EsmExecutionStatus.source_poller_error
189
                ).increment()
190
                # Wait some time between retries to avoid running into the problem right again
UNCOV
191
                poll_interval_duration = error_boff.next_backoff()
×
192
            finally:
193
                self._shutdown_event.wait(poll_interval_duration)
1✔
194

195
        # Optionally closes internal components of Poller. This is a no-op for unimplemented pollers.
196
        self.poller.close()
1✔
197

198
        try:
1✔
199
            # Update state in store after async stop or delete
200
            if self.enabled and self.current_state == EsmState.DELETING:
1✔
201
                # TODO: we also need to remove the ESM worker reference from the Lambda provider to esm_worker
202
                # TODO: proper locking for store updates
203
                self.delete_esm_in_store()
1✔
204
            elif not self.enabled and self.current_state == EsmState.DISABLING:
1✔
205
                with self._state_lock:
1✔
206
                    self.current_state = EsmState.DISABLED
1✔
207
                    self.state_transition_reason = self.user_state_reason
1✔
208
                self.update_esm_state_in_store(EsmState.DISABLED)
1✔
209
            elif not self._graceful_shutdown_triggered:
1✔
210
                # HACK: If we reach this state and a graceful shutdown was not triggered, log a warning to indicate
211
                # an unexpected state.
212
                LOG.warning(
×
213
                    "Invalid state %s for event source mapping %s.",
214
                    self.current_state,
215
                    self.esm_config["UUID"],
216
                )
217
        except Exception as e:
×
218
            LOG.warning(
×
219
                "Failed to update state %s for event source mapping %s. Exception: %s ",
220
                self.current_state,
221
                self.esm_config["UUID"],
222
                e,
223
                exc_info=LOG.isEnabledFor(logging.DEBUG),
224
            )
225

226
    def delete_esm_in_store(self):
1✔
227
        self._state.event_source_mappings.pop(self.esm_config["UUID"], None)
1✔
228

229
    # TODO: how can we handle async state updates better? Async deletion or disabling needs to update the model state.
230
    def update_esm_state_in_store(self, new_state: EsmState):
1✔
231
        esm_update = {"State": new_state}
1✔
232
        # TODO: add proper locking for store updates
233
        self._state.event_source_mappings[self.esm_config["UUID"]].update(esm_update)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc