• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 274ae585-9ad2-4b5f-8087-866ef08d3d6e

24 Apr 2025 05:15PM UTC coverage: 85.262% (-1.0%) from 86.266%
274ae585-9ad2-4b5f-8087-866ef08d3d6e

push

circleci

web-flow
CFn v2: support outputs (#12536)

10 of 29 new or added lines in 3 files covered. (34.48%)

1105 existing lines in 26 files now uncovered.

63256 of 74190 relevant lines covered (85.26%)

0.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.75
/localstack-core/localstack/services/lambda_/invocation/executor_endpoint.py
1
import abc
1✔
2
import logging
1✔
3
import time
1✔
4
from concurrent.futures import CancelledError, Future
1✔
5
from http import HTTPStatus
1✔
6
from typing import Any, Dict, Optional
1✔
7

8
import requests
1✔
9
from werkzeug import Request
1✔
10

11
from localstack.http import Response, route
1✔
12
from localstack.services.edge import ROUTER
1✔
13
from localstack.services.lambda_.invocation.lambda_models import InvocationResult
1✔
14
from localstack.utils.backoff import ExponentialBackoff
1✔
15
from localstack.utils.lambda_debug_mode.lambda_debug_mode import (
1✔
16
    DEFAULT_LAMBDA_DEBUG_MODE_TIMEOUT_SECONDS,
17
    is_lambda_debug_mode,
18
)
19
from localstack.utils.objects import singleton_factory
1✔
20
from localstack.utils.strings import to_str
1✔
21

22
LOG = logging.getLogger(__name__)
1✔
23
INVOCATION_PORT = 9563
1✔
24

25
NAMESPACE = "/_localstack_lambda"
1✔
26

27

28
class InvokeSendError(Exception):
1✔
29
    def __init__(self, message):
1✔
UNCOV
30
        super().__init__(message)
×
31

32

33
class StatusErrorException(Exception):
1✔
34
    payload: bytes
1✔
35

36
    def __init__(self, message, payload: bytes):
1✔
37
        super().__init__(message)
1✔
38
        self.payload = payload
1✔
39

40

41
class ShutdownDuringStartup(Exception):
1✔
42
    def __init__(self, message):
1✔
43
        super().__init__(message)
1✔
44

45

46
class Endpoint(abc.ABC):
1✔
47
    @abc.abstractmethod
1✔
48
    def invocation_response(self, request: Request, req_id: str) -> Response:
1✔
UNCOV
49
        pass
×
50

51
    @abc.abstractmethod
1✔
52
    def invocation_error(self, request: Request, req_id: str) -> Response:
1✔
UNCOV
53
        pass
×
54

55
    @abc.abstractmethod
1✔
56
    def invocation_logs(self, request: Request, invoke_id: str) -> Response:
1✔
UNCOV
57
        pass
×
58

59
    @abc.abstractmethod
1✔
60
    def status_ready(self, request: Request, executor_id: str) -> Response:
1✔
UNCOV
61
        pass
×
62

63
    @abc.abstractmethod
1✔
64
    def status_error(self, request: Request, executor_id: str) -> Response:
1✔
UNCOV
65
        pass
×
66

67

68
class ExecutorRouter:
1✔
69
    endpoints: dict[str, Endpoint]
1✔
70

71
    def __init__(self):
1✔
72
        self.endpoints = {}
1✔
73

74
    def register_endpoint(self, executor_id: str, endpoint: Endpoint):
1✔
75
        self.endpoints[executor_id] = endpoint
1✔
76

77
    def unregister_endpoint(self, executor_id: str):
1✔
78
        self.endpoints.pop(executor_id)
1✔
79

80
    @route(f"{NAMESPACE}/<executor_id>/invocations/<req_id>/response", methods=["POST"])
1✔
81
    def invocation_response(self, request: Request, executor_id: str, req_id: str) -> Response:
1✔
82
        endpoint = self.endpoints[executor_id]
1✔
83
        return endpoint.invocation_response(request, req_id)
1✔
84

85
    @route(f"{NAMESPACE}/<executor_id>/invocations/<req_id>/error", methods=["POST"])
1✔
86
    def invocation_error(self, request: Request, executor_id: str, req_id: str) -> Response:
1✔
87
        endpoint = self.endpoints[executor_id]
1✔
88
        return endpoint.invocation_error(request, req_id)
1✔
89

90
    @route(f"{NAMESPACE}/<executor_id>/invocations/<invoke_id>/logs", methods=["POST"])
1✔
91
    def invocation_logs(self, request: Request, executor_id: str, invoke_id: str) -> Response:
1✔
92
        endpoint = self.endpoints[executor_id]
1✔
93
        return endpoint.invocation_logs(request, invoke_id)
1✔
94

95
    @route(f"{NAMESPACE}/<env_id>/status/<executor_id>/ready", methods=["POST"])
1✔
96
    def status_ready(self, request: Request, env_id: str, executor_id: str) -> Response:
1✔
97
        endpoint = self.endpoints[executor_id]
1✔
98
        return endpoint.status_ready(request, executor_id)
1✔
99

100
    @route(f"{NAMESPACE}/<env_id>/status/<executor_id>/error", methods=["POST"])
1✔
101
    def status_error(self, request: Request, env_id: str, executor_id: str) -> Response:
1✔
102
        endpoint = self.endpoints[executor_id]
1✔
103
        return endpoint.status_error(request, executor_id)
1✔
104

105

106
@singleton_factory
1✔
107
def executor_router():
1✔
108
    router = ExecutorRouter()
1✔
109
    ROUTER.add(router)
1✔
110
    return router
1✔
111

112

113
class ExecutorEndpoint(Endpoint):
1✔
114
    container_address: str
1✔
115
    container_port: int
1✔
116
    executor_id: str
1✔
117
    startup_future: Future[bool] | None
1✔
118
    invocation_future: Future[InvocationResult] | None
1✔
119
    logs: str | None
1✔
120

121
    def __init__(
1✔
122
        self,
123
        executor_id: str,
124
        container_address: Optional[str] = None,
125
        container_port: Optional[int] = INVOCATION_PORT,
126
    ) -> None:
127
        self.container_address = container_address
1✔
128
        self.container_port = container_port
1✔
129
        self.executor_id = executor_id
1✔
130
        self.startup_future = None
1✔
131
        self.invocation_future = None
1✔
132
        self.logs = None
1✔
133

134
    def invocation_response(self, request: Request, req_id: str) -> Response:
1✔
135
        result = InvocationResult(req_id, request.data, is_error=False, logs=self.logs)
1✔
136
        self.invocation_future.set_result(result)
1✔
137
        return Response(status=HTTPStatus.ACCEPTED)
1✔
138

139
    def invocation_error(self, request: Request, req_id: str) -> Response:
1✔
140
        result = InvocationResult(req_id, request.data, is_error=True, logs=self.logs)
1✔
141
        self.invocation_future.set_result(result)
1✔
142
        return Response(status=HTTPStatus.ACCEPTED)
1✔
143

144
    def invocation_logs(self, request: Request, invoke_id: str) -> Response:
1✔
145
        logs = request.json
1✔
146
        if isinstance(logs, Dict):
1✔
147
            self.logs = logs["logs"]
1✔
148
        else:
UNCOV
149
            LOG.error("Invalid logs from init! Logs: %s", logs)
×
150
        return Response(status=HTTPStatus.ACCEPTED)
1✔
151

152
    def status_ready(self, request: Request, executor_id: str) -> Response:
1✔
153
        self.startup_future.set_result(True)
1✔
154
        return Response(status=HTTPStatus.ACCEPTED)
1✔
155

156
    def status_error(self, request: Request, executor_id: str) -> Response:
1✔
157
        LOG.warning("Execution environment startup failed: %s", to_str(request.data))
1✔
158
        # TODO: debug Lambda runtime init to not send `runtime/init/error` twice
159
        if self.startup_future.done():
1✔
UNCOV
160
            return Response(status=HTTPStatus.BAD_REQUEST)
×
161
        self.startup_future.set_exception(
1✔
162
            StatusErrorException("Environment startup failed", payload=request.data)
163
        )
164
        return Response(status=HTTPStatus.ACCEPTED)
1✔
165

166
    def start(self) -> None:
1✔
167
        executor_router().register_endpoint(self.executor_id, self)
1✔
168
        self.startup_future = Future()
1✔
169

170
    def wait_for_startup(self):
1✔
171
        try:
1✔
172
            self.startup_future.result()
1✔
173
        except CancelledError as e:
1✔
174
            # Only happens if we shutdown the container during execution environment startup
175
            # Daniel: potential problem if we have a shutdown while we start the container (e.g., timeout) but wait_for_startup is not yet called
176
            raise ShutdownDuringStartup(
1✔
177
                "Executor environment shutdown during container startup"
178
            ) from e
179

180
    def get_endpoint_prefix(self):
1✔
181
        return f"{NAMESPACE}/{self.executor_id}"
1✔
182

183
    def shutdown(self) -> None:
1✔
184
        executor_router().unregister_endpoint(self.executor_id)
1✔
185
        self.startup_future.cancel()
1✔
186
        if self.invocation_future:
1✔
187
            self.invocation_future.cancel()
1✔
188

189
    def invoke(self, payload: Dict[str, str]) -> InvocationResult:
1✔
190
        self.invocation_future = Future()
1✔
191
        self.logs = None
1✔
192
        if not self.container_address:
1✔
UNCOV
193
            raise ValueError("Container address not set, but got an invoke.")
×
194
        invocation_url = f"http://{self.container_address}:{self.container_port}/invoke"
1✔
195
        # disable proxies for internal requests
196
        proxies = {"http": "", "https": ""}
1✔
197
        response = self._perform_invoke(
1✔
198
            invocation_url=invocation_url, proxies=proxies, payload=payload
199
        )
200
        if not response.ok:
1✔
UNCOV
201
            raise InvokeSendError(
×
202
                f"Error while sending invocation {payload} to {invocation_url}. Error Code: {response.status_code}"
203
            )
204

205
        # Set a reference future awaiting limit to ensure this process eventually ends,
206
        # with timeout errors being handled by the lambda evaluator.
207
        # The following logic selects which maximum waiting time to consider depending
208
        # on whether the application is being debugged or not.
209
        # Note that if timeouts are enforced for the lambda function invoked at this endpoint
210
        # (this is needs to be configured in the Lambda Debug Mode Config file), the lambda
211
        # function will continue to enforce the expected timeouts.
212
        if is_lambda_debug_mode():
1✔
213
            # The value is set to a default high value to ensure eventual termination.
UNCOV
214
            timeout_seconds = DEFAULT_LAMBDA_DEBUG_MODE_TIMEOUT_SECONDS
×
215
        else:
216
            # Do not wait longer for an invoke than the maximum lambda timeout plus a buffer
217
            lambda_max_timeout_seconds = 900
1✔
218
            invoke_timeout_buffer_seconds = 5
1✔
219
            timeout_seconds = lambda_max_timeout_seconds + invoke_timeout_buffer_seconds
1✔
220
        return self.invocation_future.result(timeout=timeout_seconds)
1✔
221

222
    @staticmethod
1✔
223
    def _perform_invoke(
1✔
224
        invocation_url: str,
225
        proxies: dict[str, str],
226
        payload: dict[str, Any],
227
    ) -> requests.Response:
228
        """
229
        Dispatches a Lambda invocation request to the specified container endpoint, with automatic
230
        retries in case of connection errors, using exponential backoff.
231

232
        The first attempt is made immediately. If it fails, exponential backoff is applied with
233
        retry intervals starting at 100ms, doubling each time for up to 5 total retries.
234

235
        Parameters:
236
            invocation_url (str): The full URL of the container's invocation endpoint.
237
            proxies (dict[str, str]): Proxy settings to be used for the HTTP request.
238
            payload (dict[str, Any]): The JSON payload to send to the container.
239

240
        Returns:
241
            Response: The successful HTTP response from the container.
242

243
        Raises:
244
            requests.exceptions.ConnectionError: If all retry attempts fail to connect.
245
        """
246
        backoff = None
1✔
247
        last_exception = None
1✔
248
        max_retry_on_connection_error = 5
1✔
249

250
        for attempt_count in range(max_retry_on_connection_error + 1):  # 1 initial + n retries
1✔
251
            try:
1✔
252
                response = requests.post(url=invocation_url, json=payload, proxies=proxies)
1✔
253
                return response
1✔
UNCOV
254
            except requests.exceptions.ConnectionError as connection_error:
×
UNCOV
255
                last_exception = connection_error
×
256

UNCOV
257
                if backoff is None:
×
UNCOV
258
                    LOG.debug(
×
259
                        "Initial connection attempt failed: %s. Starting backoff retries.",
260
                        connection_error,
261
                    )
UNCOV
262
                    backoff = ExponentialBackoff(
×
263
                        max_retries=max_retry_on_connection_error,
264
                        initial_interval=0.1,
265
                        multiplier=2.0,
266
                        randomization_factor=0.0,
267
                        max_interval=1,
268
                        max_time_elapsed=-1,
269
                    )
270

UNCOV
271
                delay = backoff.next_backoff()
×
UNCOV
272
                if delay > 0:
×
UNCOV
273
                    LOG.debug(
×
274
                        "Connection error on invoke attempt #%d: %s. Retrying in %.2f seconds",
275
                        attempt_count,
276
                        connection_error,
277
                        delay,
278
                    )
UNCOV
279
                    time.sleep(delay)
×
280

UNCOV
281
        LOG.debug("Connection error after all attempts exhausted: %s", last_exception)
×
UNCOV
282
        raise last_exception
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc