• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 6cde8511-6c47-4c3e-8cf5-bd915ce7e07e

26 Feb 2025 03:36PM UTC coverage: 86.862% (+0.02%) from 86.845%
6cde8511-6c47-4c3e-8cf5-bd915ce7e07e

push

circleci

web-flow
[ESM] Add backoff between poll events calls (#12304)

25 of 25 new or added lines in 4 files covered. (100.0%)

21 existing lines in 6 files now uncovered.

61725 of 71061 relevant lines covered (86.86%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.06
/localstack-core/localstack/services/lambda_/event_source_mapping/esm_worker.py
1
import logging
1✔
2
import threading
1✔
3
from enum import StrEnum
1✔
4

5
from localstack.aws.api.lambda_ import (
1✔
6
    EventSourceMappingConfiguration,
7
)
8
from localstack.services.lambda_.event_source_mapping.pollers.poller import (
1✔
9
    EmptyPollResultsException,
10
    Poller,
11
)
12
from localstack.services.lambda_.invocation.models import LambdaStore, lambda_stores
1✔
13
from localstack.services.lambda_.provider_utils import get_function_version_from_arn
1✔
14
from localstack.utils.backoff import ExponentialBackoff
1✔
15
from localstack.utils.threads import FuncThread
1✔
16

17
# Default wait (in seconds) between two consecutive poll_events calls of the poller loop.
POLL_INTERVAL_SEC: float = 1
# Upper bound (in seconds) for the backoff applied while the event source keeps returning no results.
MAX_BACKOFF_POLL_EMPTY_SEC: float = 10
# Upper bound (in seconds) for the backoff applied while polling keeps raising errors.
MAX_BACKOFF_POLL_ERROR_SEC: float = 60


# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
23

24

25
class EsmState(StrEnum):
1✔
26
    # https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-response-State
27
    CREATING = "Creating"
1✔
28
    ENABLING = "Enabling"
1✔
29
    ENABLED = "Enabled"
1✔
30
    DISABLING = "Disabling"
1✔
31
    DISABLED = "Disabled"
1✔
32
    UPDATING = "Updating"
1✔
33
    DELETING = "Deleting"
1✔
34

35

36
class EsmStateReason(StrEnum):
1✔
37
    # Used for Kinesis and DynamoDB
38
    USER_ACTION = "User action"
1✔
39
    # Used for SQS
40
    USER_INITIATED = "USER_INITIATED"
1✔
41
    NO_RECORDS_PROCESSED = "No records processed"
1✔
42
    # TODO: add others?
43

44

45
class EsmWorker:
    """Drive the poller of a single event source mapping (ESM) on a background thread.

    The worker tracks the lifecycle state of the mapping (``current_state`` and
    ``state_transition_reason``) and mirrors state changes into the Lambda store
    entry of the mapping, which is looked up by the ``UUID`` of ``esm_config``.
    """

    esm_config: EventSourceMappingConfiguration
    enabled: bool
    current_state: EsmState
    state_transition_reason: EsmStateReason
    # Either USER_ACTION or USER_INITIATED (SQS) depending on the event source
    user_state_reason: EsmStateReason
    # TODO: test
    last_processing_result: str

    poller: Poller

    _state: LambdaStore
    _state_lock: threading.RLock
    _shutdown_event: threading.Event
    _poller_thread: FuncThread | None

    def __init__(
        self,
        esm_config: EventSourceMappingConfiguration,
        poller: Poller,
        enabled: bool = True,
        user_state_reason: EsmStateReason = EsmStateReason.USER_ACTION,
    ):
        """Initialize the worker in the CREATING state without starting any thread.

        :param esm_config: configuration of the mapping; ``FunctionArn`` and ``UUID`` are read from it
        :param poller: poller whose ``poll_events`` is called repeatedly by ``poller_loop``
        :param enabled: whether ``create`` should start polling right away
        :param user_state_reason: reason recorded for user-triggered state transitions
        """
        self.esm_config = esm_config
        self.enabled = enabled
        self.current_state = EsmState.CREATING
        self.user_state_reason = user_state_reason
        self.state_transition_reason = self.user_state_reason

        self.poller = poller

        # TODO: implement lifecycle locking
        self._state_lock = threading.RLock()
        self._shutdown_event = threading.Event()
        self._poller_thread = None

        # The store is selected by the account and region of the target function version.
        function_version = get_function_version_from_arn(self.esm_config["FunctionArn"])
        self._state = lambda_stores[function_version.id.account][function_version.id.region]

        # HACK: Flag used to check if a graceful shutdown was triggered.
        self._graceful_shutdown_triggered = False

    @property
    def uuid(self) -> str:
        """Return the UUID of the event source mapping."""
        return self.esm_config["UUID"]

    def stop_for_shutdown(self):
        """Signal the poller loop to exit without changing the ESM state."""
        # Signal the worker's poller_loop thread to gracefully shutdown
        # TODO: Once ESM state is de-coupled from lambda store, re-think this approach.
        # The flag must be set BEFORE the event: the poller thread may wake up as soon as the event
        # is set, and it reads the flag right after leaving its loop. Setting the event first could
        # let the thread observe a stale flag and log a spurious "Invalid state" warning.
        self._graceful_shutdown_triggered = True
        self._shutdown_event.set()

    def create(self):
        """Start polling if the mapping is enabled, otherwise mark it as DISABLED in the store."""
        if self.enabled:
            with self._state_lock:
                self.current_state = EsmState.CREATING
                self.state_transition_reason = self.user_state_reason
            self.start()
        else:
            # TODO: validate with tests
            with self._state_lock:
                self.current_state = EsmState.DISABLED
                self.state_transition_reason = self.user_state_reason
            self.update_esm_state_in_store(EsmState.DISABLED)

    def start(self):
        """Enable the mapping and launch a new poller thread running ``poller_loop``."""
        with self._state_lock:
            self.enabled = True
            # CREATING state takes precedence over ENABLING
            if self.current_state != EsmState.CREATING:
                self.current_state = EsmState.ENABLING
                self.state_transition_reason = self.user_state_reason
        # Reset the shutdown event such that we don't stop immediately after a restart
        self._shutdown_event.clear()
        self._poller_thread = FuncThread(
            self.poller_loop,
            name=f"event-source-mapping-poller-{self.uuid}",
        )
        self._poller_thread.start()

    def stop(self):
        """Mark the mapping as DISABLING and signal the poller loop to exit.

        The transition to DISABLED happens at the end of ``poller_loop``.
        """
        with self._state_lock:
            self.enabled = False
            self.current_state = EsmState.DISABLING
            self.update_esm_state_in_store(EsmState.DISABLING)
            self.state_transition_reason = self.user_state_reason
        self._shutdown_event.set()

    def delete(self):
        """Mark the mapping as DELETING and signal the poller loop to exit.

        The removal from the store happens at the end of ``poller_loop``.
        """
        with self._state_lock:
            self.current_state = EsmState.DELETING
            self.update_esm_state_in_store(EsmState.DELETING)
            self.state_transition_reason = self.user_state_reason
        self._shutdown_event.set()

    def poller_loop(self, *args, **kwargs):
        """Poll the event source until shutdown is signalled, then finalize the ESM state.

        Positional and keyword arguments are accepted but not used by this method.
        """
        with self._state_lock:
            self.current_state = EsmState.ENABLED
            self.update_esm_state_in_store(EsmState.ENABLED)
            self.state_transition_reason = self.user_state_reason

        error_boff = ExponentialBackoff(initial_interval=2, max_interval=MAX_BACKOFF_POLL_ERROR_SEC)
        empty_boff = ExponentialBackoff(initial_interval=1, max_interval=MAX_BACKOFF_POLL_EMPTY_SEC)

        poll_interval_duration = POLL_INTERVAL_SEC

        while not self._shutdown_event.is_set():
            try:
                # TODO: update state transition reason?
                self.poller.poll_events()

                # If no exception encountered, reset the backoff
                error_boff.reset()
                empty_boff.reset()

                # Set the poll frequency back to the default
                poll_interval_duration = POLL_INTERVAL_SEC
            except EmptyPollResultsException as miss_ex:
                # If the event source is empty, backoff
                poll_interval_duration = empty_boff.next_backoff()
                LOG.debug(
                    "The event source %s is empty. Backing off for %s seconds until next request.",
                    miss_ex.source_arn,
                    poll_interval_duration,
                )
            except Exception as e:
                LOG.error(
                    "Error while polling messages for event source %s: %s",
                    self.esm_config.get("EventSourceArn")
                    or self.esm_config.get("SelfManagedEventSource"),
                    e,
                    exc_info=LOG.isEnabledFor(logging.DEBUG),
                )
                # Wait some time between retries to avoid running into the problem right again
                poll_interval_duration = error_boff.next_backoff()
            finally:
                # Waiting on the event (instead of sleeping) lets a shutdown signal end the wait early.
                self._shutdown_event.wait(poll_interval_duration)

        try:
            # Optionally closes internal components of Poller. This is a no-op for unimplemented pollers.
            self.poller.close()
        except Exception as e:
            # A failing close must not prevent the state update below, otherwise the mapping would
            # stay in DISABLING/DELETING in the store.
            LOG.warning(
                "Failed to close poller for event source mapping %s. Exception: %s",
                self.esm_config["UUID"],
                e,
                exc_info=LOG.isEnabledFor(logging.DEBUG),
            )

        try:
            # Update state in store after async stop or delete
            if self.current_state == EsmState.DELETING:
                # A deletion is honored regardless of `enabled`: stop() followed by delete() before
                # this thread exits leaves `enabled` False while the state is DELETING.
                # TODO: we also need to remove the ESM worker reference from the Lambda provider to esm_worker
                # TODO: proper locking for store updates
                self.delete_esm_in_store()
            elif not self.enabled and self.current_state == EsmState.DISABLING:
                with self._state_lock:
                    self.current_state = EsmState.DISABLED
                    self.state_transition_reason = self.user_state_reason
                self.update_esm_state_in_store(EsmState.DISABLED)
            elif not self._graceful_shutdown_triggered:
                # HACK: If we reach this state and a graceful shutdown was not triggered, log a warning to indicate
                # an unexpected state.
                LOG.warning(
                    "Invalid state %s for event source mapping %s.",
                    self.current_state,
                    self.esm_config["UUID"],
                )
        except Exception as e:
            LOG.warning(
                "Failed to update state %s for event source mapping %s. Exception: %s ",
                self.current_state,
                self.esm_config["UUID"],
                e,
                exc_info=LOG.isEnabledFor(logging.DEBUG),
            )

    def delete_esm_in_store(self):
        """Remove this mapping from the store; a missing entry is ignored."""
        self._state.event_source_mappings.pop(self.esm_config["UUID"], None)

    # TODO: how can we handle async state updates better? Async deletion or disabling needs to update the model state.
    def update_esm_state_in_store(self, new_state: EsmState):
        """Write ``new_state`` into the ``State`` field of this mapping's store entry.

        The entry is indexed directly by UUID, so a missing entry raises ``KeyError``
        (assuming ``event_source_mappings`` is a plain dict — TODO confirm).
        """
        esm_update = {"State": new_state}
        # TODO: add proper locking for store updates
        self._state.event_source_mappings[self.esm_config["UUID"]].update(esm_update)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc