• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 836de691-8bbb-4693-9ed3-7ab4d196cd89

20 Mar 2025 04:14PM UTC coverage: 86.818% (+0.009%) from 86.809%
836de691-8bbb-4693-9ed3-7ab4d196cd89

push

circleci

web-flow
S3: fix MA/MR test (#12417)

62787 of 72320 relevant lines covered (86.82%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.0
/localstack-core/localstack/services/lambda_/event_source_mapping/esm_worker.py
1
import logging
1✔
2
import threading
1✔
3
from enum import StrEnum
1✔
4

5
from localstack.aws.api.lambda_ import (
1✔
6
    EventSourceMappingConfiguration,
7
)
8
from localstack.config import (
1✔
9
    LAMBDA_EVENT_SOURCE_MAPPING_MAX_BACKOFF_ON_EMPTY_POLL_SEC,
10
    LAMBDA_EVENT_SOURCE_MAPPING_MAX_BACKOFF_ON_ERROR_SEC,
11
    LAMBDA_EVENT_SOURCE_MAPPING_POLL_INTERVAL_SEC,
12
)
13
from localstack.services.lambda_.event_source_mapping.pollers.poller import (
1✔
14
    EmptyPollResultsException,
15
    Poller,
16
)
17
from localstack.services.lambda_.invocation.models import LambdaStore, lambda_stores
1✔
18
from localstack.services.lambda_.provider_utils import get_function_version_from_arn
1✔
19
from localstack.utils.backoff import ExponentialBackoff
1✔
20
from localstack.utils.threads import FuncThread
1✔
21

22
LOG = logging.getLogger(__name__)
1✔
23

24

25
class EsmState(StrEnum):
1✔
26
    # https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-response-State
27
    CREATING = "Creating"
1✔
28
    ENABLING = "Enabling"
1✔
29
    ENABLED = "Enabled"
1✔
30
    DISABLING = "Disabling"
1✔
31
    DISABLED = "Disabled"
1✔
32
    UPDATING = "Updating"
1✔
33
    DELETING = "Deleting"
1✔
34

35

36
class EsmStateReason(StrEnum):
1✔
37
    # Used for Kinesis and DynamoDB
38
    USER_ACTION = "User action"
1✔
39
    # Used for SQS
40
    USER_INITIATED = "USER_INITIATED"
1✔
41
    NO_RECORDS_PROCESSED = "No records processed"
1✔
42
    # TODO: add others?
43

44

45
class EsmWorker:
    """Drive the poller of a single event source mapping (ESM) and track its lifecycle state.

    ``create()``/``start()`` spawn a ``FuncThread`` that runs ``poller_loop``. ``stop()``,
    ``delete()`` and ``stop_for_shutdown()`` signal that thread through ``_shutdown_event``;
    the thread then closes the poller and writes the final state to the Lambda store.
    """

    esm_config: EventSourceMappingConfiguration
    enabled: bool
    current_state: EsmState
    state_transition_reason: EsmStateReason
    # Either USER_ACTION or USER_INITIATED (SQS) depending on the event source
    user_state_reason: EsmStateReason
    # TODO: test
    last_processing_result: str

    poller: Poller

    _state: LambdaStore
    _state_lock: threading.RLock
    _shutdown_event: threading.Event
    _poller_thread: FuncThread | None

    def __init__(
        self,
        esm_config: EventSourceMappingConfiguration,
        poller: Poller,
        enabled: bool = True,
        user_state_reason: EsmStateReason = EsmStateReason.USER_ACTION,
    ):
        """Set up the worker; no thread is started until ``create()`` or ``start()`` is called.

        :param esm_config: mapping configuration; ``FunctionArn`` and ``UUID`` are read from it
        :param poller: poller whose ``poll_events()`` is invoked by the background loop
        :param enabled: whether ``create()`` should start polling right away
        :param user_state_reason: reason recorded on user-triggered state transitions
        """
        self.esm_config = esm_config
        self.enabled = enabled
        self.current_state = EsmState.CREATING
        self.user_state_reason = user_state_reason
        self.state_transition_reason = self.user_state_reason

        self.poller = poller

        # TODO: implement lifecycle locking
        self._state_lock = threading.RLock()
        self._shutdown_event = threading.Event()
        self._poller_thread = None

        # Resolve the account/region-scoped store that holds this mapping's persisted state
        function_version = get_function_version_from_arn(self.esm_config["FunctionArn"])
        self._state = lambda_stores[function_version.id.account][function_version.id.region]

        # HACK: Flag used to check if a graceful shutdown was triggered.
        self._graceful_shutdown_triggered = False

    @property
    def uuid(self) -> str:
        """The ``UUID`` entry of the mapping configuration."""
        return self.esm_config["UUID"]

    def stop_for_shutdown(self):
        """Signal the worker's poller_loop thread to gracefully shut down."""
        # TODO: Once ESM state is de-coupled from lambda store, re-think this approach.
        # The flag must be set BEFORE the event: the poller thread may wake up as soon as the event
        # is set, and it reads the flag to decide whether leaving the loop was expected. Setting the
        # event first leaves a window in which it would log a spurious "Invalid state" warning.
        self._graceful_shutdown_triggered = True
        self._shutdown_event.set()

    def create(self):
        """Start polling if the mapping is enabled, otherwise persist it as DISABLED."""
        if self.enabled:
            with self._state_lock:
                self.current_state = EsmState.CREATING
                self.state_transition_reason = self.user_state_reason
            self.start()
        else:
            # TODO: validate with tests
            with self._state_lock:
                self.current_state = EsmState.DISABLED
                self.state_transition_reason = self.user_state_reason
            self.update_esm_state_in_store(EsmState.DISABLED)

    def start(self):
        """Mark the worker enabled and launch a new poller thread."""
        with self._state_lock:
            self.enabled = True
            # CREATING state takes precedence over ENABLING
            if self.current_state != EsmState.CREATING:
                self.current_state = EsmState.ENABLING
                self.state_transition_reason = self.user_state_reason
        # Reset the shutdown event such that we don't stop immediately after a restart
        self._shutdown_event.clear()
        self._poller_thread = FuncThread(
            self.poller_loop,
            name=f"event-source-mapping-poller-{self.uuid}",
        )
        self._poller_thread.start()

    def stop(self):
        """Move to DISABLING and ask the poller thread to exit; it sets DISABLED when done."""
        with self._state_lock:
            self.enabled = False
            self.current_state = EsmState.DISABLING
            self.update_esm_state_in_store(EsmState.DISABLING)
            self.state_transition_reason = self.user_state_reason
        self._shutdown_event.set()

    def delete(self):
        """Move to DELETING and ask the poller thread to exit; it removes the store entry when done."""
        with self._state_lock:
            self.current_state = EsmState.DELETING
            self.update_esm_state_in_store(EsmState.DELETING)
            self.state_transition_reason = self.user_state_reason
        self._shutdown_event.set()

    def poller_loop(self, *args, **kwargs):
        """Thread target: poll the event source until shutdown is signalled, then finalize state.

        ``*args``/``**kwargs`` are accepted and ignored (whatever the thread wrapper passes in).
        """
        with self._state_lock:
            self.current_state = EsmState.ENABLED
            self.update_esm_state_in_store(EsmState.ENABLED)
            self.state_transition_reason = self.user_state_reason

        error_boff = ExponentialBackoff(
            initial_interval=2, max_interval=LAMBDA_EVENT_SOURCE_MAPPING_MAX_BACKOFF_ON_ERROR_SEC
        )
        empty_boff = ExponentialBackoff(
            initial_interval=1,
            max_interval=LAMBDA_EVENT_SOURCE_MAPPING_MAX_BACKOFF_ON_EMPTY_POLL_SEC,
        )

        poll_interval_duration = LAMBDA_EVENT_SOURCE_MAPPING_POLL_INTERVAL_SEC

        while not self._shutdown_event.is_set():
            try:
                # TODO: update state transition reason?
                self.poller.poll_events()

                # If no exception encountered, reset the backoff
                error_boff.reset()
                empty_boff.reset()

                # Set the poll frequency back to the default
                poll_interval_duration = LAMBDA_EVENT_SOURCE_MAPPING_POLL_INTERVAL_SEC
            except EmptyPollResultsException as miss_ex:
                # If the event source is empty, backoff
                poll_interval_duration = empty_boff.next_backoff()
                LOG.debug(
                    "The event source %s is empty. Backing off for %.2f seconds until next request.",
                    miss_ex.source_arn,
                    poll_interval_duration,
                )
            except Exception as e:
                LOG.error(
                    "Error while polling messages for event source %s: %s",
                    self.esm_config.get("EventSourceArn")
                    or self.esm_config.get("SelfManagedEventSource"),
                    e,
                    exc_info=LOG.isEnabledFor(logging.DEBUG),
                )
                # Wait some time between retries to avoid running into the problem right again
                poll_interval_duration = error_boff.next_backoff()
            finally:
                # Waiting on the event (instead of sleeping) lets stop/delete interrupt the pause
                self._shutdown_event.wait(poll_interval_duration)

        try:
            # Optionally closes internal components of Poller. This is a no-op for unimplemented pollers.
            self.poller.close()
        finally:
            # Finalize even if close() raised; otherwise the mapping would stay stuck in
            # DISABLING/DELETING in the store. The close() error itself still propagates.
            self._finalize_state_after_shutdown()

    def _finalize_state_after_shutdown(self):
        """Write the terminal state to the store once the poller loop has exited (best effort)."""
        try:
            # Update state in store after async stop or delete
            # NOTE(review): delete() issued right after stop() leaves enabled=False with state
            # DELETING, which matches neither branch below - confirm whether callers can do that.
            if self.enabled and self.current_state == EsmState.DELETING:
                # TODO: we also need to remove the ESM worker reference from the Lambda provider to esm_worker
                # TODO: proper locking for store updates
                self.delete_esm_in_store()
            elif not self.enabled and self.current_state == EsmState.DISABLING:
                with self._state_lock:
                    self.current_state = EsmState.DISABLED
                    self.state_transition_reason = self.user_state_reason
                self.update_esm_state_in_store(EsmState.DISABLED)
            elif not self._graceful_shutdown_triggered:
                # HACK: If we reach this state and a graceful shutdown was not triggered, log a warning to indicate
                # an unexpected state.
                LOG.warning(
                    "Invalid state %s for event source mapping %s.",
                    self.current_state,
                    self.esm_config["UUID"],
                )
        except Exception as e:
            LOG.warning(
                "Failed to update state %s for event source mapping %s. Exception: %s ",
                self.current_state,
                self.esm_config["UUID"],
                e,
                exc_info=LOG.isEnabledFor(logging.DEBUG),
            )

    def delete_esm_in_store(self):
        """Remove this mapping from the store; a missing entry is ignored."""
        self._state.event_source_mappings.pop(self.esm_config["UUID"], None)

    # TODO: how can we handle async state updates better? Async deletion or disabling needs to update the model state.
    def update_esm_state_in_store(self, new_state: EsmState):
        """Overwrite the ``State`` field of this mapping's entry in the store.

        :param new_state: state value to persist
        """
        esm_update = {"State": new_state}
        # TODO: add proper locking for store updates
        self._state.event_source_mappings[self.esm_config["UUID"]].update(esm_update)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc