• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 16665047018

31 Jul 2025 06:34PM UTC coverage: 86.897% (+0.1%) from 86.781%
16665047018

push

github

web-flow
Apigw/enable vpce routing (#12937)

5 of 5 new or added lines in 1 file covered. (100.0%)

314 existing lines in 13 files now uncovered.

66469 of 76492 relevant lines covered (86.9%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.75
/localstack-core/localstack/services/lambda_/invocation/executor_endpoint.py
1
import abc
1✔
2
import logging
1✔
3
import time
1✔
4
from concurrent.futures import CancelledError, Future
1✔
5
from http import HTTPStatus
1✔
6
from typing import Any, Dict, Optional
1✔
7

8
import requests
1✔
9
from werkzeug import Request
1✔
10

11
from localstack.http import Response, route
1✔
12
from localstack.services.edge import ROUTER
1✔
13
from localstack.services.lambda_ import ldm
1✔
14
from localstack.services.lambda_.invocation.lambda_models import InvocationResult
1✔
15
from localstack.utils.backoff import ExponentialBackoff
1✔
16
from localstack.utils.objects import singleton_factory
1✔
17
from localstack.utils.strings import to_str
1✔
18

19
LOG = logging.getLogger(__name__)
1✔
20
INVOCATION_PORT = 9563
1✔
21

22
NAMESPACE = "/_localstack_lambda"
1✔
23

24

25
class InvokeSendError(Exception):
1✔
26
    def __init__(self, message):
1✔
UNCOV
27
        super().__init__(message)
×
28

29

30
class StatusErrorException(Exception):
1✔
31
    payload: bytes
1✔
32

33
    def __init__(self, message, payload: bytes):
1✔
34
        super().__init__(message)
1✔
35
        self.payload = payload
1✔
36

37

38
class ShutdownDuringStartup(Exception):
1✔
39
    def __init__(self, message):
1✔
40
        super().__init__(message)
1✔
41

42

43
class Endpoint(abc.ABC):
1✔
44
    @abc.abstractmethod
1✔
45
    def invocation_response(self, request: Request, req_id: str) -> Response:
1✔
UNCOV
46
        pass
×
47

48
    @abc.abstractmethod
1✔
49
    def invocation_error(self, request: Request, req_id: str) -> Response:
1✔
UNCOV
50
        pass
×
51

52
    @abc.abstractmethod
1✔
53
    def invocation_logs(self, request: Request, invoke_id: str) -> Response:
1✔
UNCOV
54
        pass
×
55

56
    @abc.abstractmethod
1✔
57
    def status_ready(self, request: Request, executor_id: str) -> Response:
1✔
UNCOV
58
        pass
×
59

60
    @abc.abstractmethod
1✔
61
    def status_error(self, request: Request, executor_id: str) -> Response:
1✔
UNCOV
62
        pass
×
63

64

65
class ExecutorRouter:
1✔
66
    endpoints: dict[str, Endpoint]
1✔
67

68
    def __init__(self):
1✔
69
        self.endpoints = {}
1✔
70

71
    def register_endpoint(self, executor_id: str, endpoint: Endpoint):
1✔
72
        self.endpoints[executor_id] = endpoint
1✔
73

74
    def unregister_endpoint(self, executor_id: str):
1✔
75
        self.endpoints.pop(executor_id)
1✔
76

77
    @route(f"{NAMESPACE}/<executor_id>/invocations/<req_id>/response", methods=["POST"])
1✔
78
    def invocation_response(self, request: Request, executor_id: str, req_id: str) -> Response:
1✔
79
        endpoint = self.endpoints[executor_id]
1✔
80
        return endpoint.invocation_response(request, req_id)
1✔
81

82
    @route(f"{NAMESPACE}/<executor_id>/invocations/<req_id>/error", methods=["POST"])
1✔
83
    def invocation_error(self, request: Request, executor_id: str, req_id: str) -> Response:
1✔
84
        endpoint = self.endpoints[executor_id]
1✔
85
        return endpoint.invocation_error(request, req_id)
1✔
86

87
    @route(f"{NAMESPACE}/<executor_id>/invocations/<invoke_id>/logs", methods=["POST"])
1✔
88
    def invocation_logs(self, request: Request, executor_id: str, invoke_id: str) -> Response:
1✔
89
        endpoint = self.endpoints[executor_id]
1✔
90
        return endpoint.invocation_logs(request, invoke_id)
1✔
91

92
    @route(f"{NAMESPACE}/<env_id>/status/<executor_id>/ready", methods=["POST"])
1✔
93
    def status_ready(self, request: Request, env_id: str, executor_id: str) -> Response:
1✔
94
        endpoint = self.endpoints[executor_id]
1✔
95
        return endpoint.status_ready(request, executor_id)
1✔
96

97
    @route(f"{NAMESPACE}/<env_id>/status/<executor_id>/error", methods=["POST"])
1✔
98
    def status_error(self, request: Request, env_id: str, executor_id: str) -> Response:
1✔
99
        endpoint = self.endpoints[executor_id]
1✔
100
        return endpoint.status_error(request, executor_id)
1✔
101

102

103
@singleton_factory
1✔
104
def executor_router():
1✔
105
    router = ExecutorRouter()
1✔
106
    ROUTER.add(router)
1✔
107
    return router
1✔
108

109

110
class ExecutorEndpoint(Endpoint):
1✔
111
    container_address: str
1✔
112
    container_port: int
1✔
113
    executor_id: str
1✔
114
    startup_future: Future[bool] | None
1✔
115
    invocation_future: Future[InvocationResult] | None
1✔
116
    logs: str | None
1✔
117

118
    def __init__(
1✔
119
        self,
120
        executor_id: str,
121
        container_address: Optional[str] = None,
122
        container_port: Optional[int] = INVOCATION_PORT,
123
    ) -> None:
124
        self.container_address = container_address
1✔
125
        self.container_port = container_port
1✔
126
        self.executor_id = executor_id
1✔
127
        self.startup_future = None
1✔
128
        self.invocation_future = None
1✔
129
        self.logs = None
1✔
130

131
    def invocation_response(self, request: Request, req_id: str) -> Response:
1✔
132
        result = InvocationResult(req_id, request.data, is_error=False, logs=self.logs)
1✔
133
        self.invocation_future.set_result(result)
1✔
134
        return Response(status=HTTPStatus.ACCEPTED)
1✔
135

136
    def invocation_error(self, request: Request, req_id: str) -> Response:
1✔
137
        result = InvocationResult(req_id, request.data, is_error=True, logs=self.logs)
1✔
138
        self.invocation_future.set_result(result)
1✔
139
        return Response(status=HTTPStatus.ACCEPTED)
1✔
140

141
    def invocation_logs(self, request: Request, invoke_id: str) -> Response:
1✔
142
        logs = request.json
1✔
143
        if isinstance(logs, Dict):
1✔
144
            self.logs = logs["logs"]
1✔
145
        else:
UNCOV
146
            LOG.error("Invalid logs from init! Logs: %s", logs)
×
147
        return Response(status=HTTPStatus.ACCEPTED)
1✔
148

149
    def status_ready(self, request: Request, executor_id: str) -> Response:
1✔
150
        self.startup_future.set_result(True)
1✔
151
        return Response(status=HTTPStatus.ACCEPTED)
1✔
152

153
    def status_error(self, request: Request, executor_id: str) -> Response:
1✔
154
        LOG.warning("Execution environment startup failed: %s", to_str(request.data))
1✔
155
        # TODO: debug Lambda runtime init to not send `runtime/init/error` twice
156
        if self.startup_future.done():
1✔
UNCOV
157
            return Response(status=HTTPStatus.BAD_REQUEST)
×
158
        self.startup_future.set_exception(
1✔
159
            StatusErrorException("Environment startup failed", payload=request.data)
160
        )
161
        return Response(status=HTTPStatus.ACCEPTED)
1✔
162

163
    def start(self) -> None:
1✔
164
        executor_router().register_endpoint(self.executor_id, self)
1✔
165
        self.startup_future = Future()
1✔
166

167
    def wait_for_startup(self):
1✔
168
        try:
1✔
169
            self.startup_future.result()
1✔
170
        except CancelledError as e:
1✔
171
            # Only happens if we shutdown the container during execution environment startup
172
            # Daniel: potential problem if we have a shutdown while we start the container (e.g., timeout) but wait_for_startup is not yet called
173
            raise ShutdownDuringStartup(
1✔
174
                "Executor environment shutdown during container startup"
175
            ) from e
176

177
    def get_endpoint_prefix(self):
1✔
178
        return f"{NAMESPACE}/{self.executor_id}"
1✔
179

180
    def shutdown(self) -> None:
1✔
181
        executor_router().unregister_endpoint(self.executor_id)
1✔
182
        self.startup_future.cancel()
1✔
183
        if self.invocation_future:
1✔
184
            self.invocation_future.cancel()
1✔
185

186
    def invoke(self, payload: Dict[str, str]) -> InvocationResult:
1✔
187
        self.invocation_future = Future()
1✔
188
        self.logs = None
1✔
189
        if not self.container_address:
1✔
UNCOV
190
            raise ValueError("Container address not set, but got an invoke.")
×
191
        invocation_url = f"http://{self.container_address}:{self.container_port}/invoke"
1✔
192
        # disable proxies for internal requests
193
        proxies = {"http": "", "https": ""}
1✔
194
        response = self._perform_invoke(
1✔
195
            invocation_url=invocation_url, proxies=proxies, payload=payload
196
        )
197
        if not response.ok:
1✔
UNCOV
198
            raise InvokeSendError(
×
199
                f"Error while sending invocation {payload} to {invocation_url}. Error Code: {response.status_code}"
200
            )
201

202
        # Set a reference future awaiting limit to ensure this process eventually ends,
203
        # with timeout errors being handled by the lambda evaluator.
204
        # The following logic selects which maximum waiting time to consider depending
205
        # on whether the application is being debugged or not.
206
        # Note that if timeouts are enforced for the lambda function invoked at this endpoint
207
        # (this is needs to be configured in the Lambda Debug Mode Config file), the lambda
208
        # function will continue to enforce the expected timeouts.
209
        if ldm.IS_LDM_ENABLED:
1✔
210
            # The value is set to a default high value to ensure eventual termination.
UNCOV
211
            timeout_seconds = ldm.DEFAULT_LDM_TIMEOUT_SECONDS
×
212
        else:
213
            # Do not wait longer for an invoke than the maximum lambda timeout plus a buffer
214
            lambda_max_timeout_seconds = 900
1✔
215
            invoke_timeout_buffer_seconds = 5
1✔
216
            timeout_seconds = lambda_max_timeout_seconds + invoke_timeout_buffer_seconds
1✔
217
        return self.invocation_future.result(timeout=timeout_seconds)
1✔
218

219
    @staticmethod
1✔
220
    def _perform_invoke(
1✔
221
        invocation_url: str,
222
        proxies: dict[str, str],
223
        payload: dict[str, Any],
224
    ) -> requests.Response:
225
        """
226
        Dispatches a Lambda invocation request to the specified container endpoint, with automatic
227
        retries in case of connection errors, using exponential backoff.
228

229
        The first attempt is made immediately. If it fails, exponential backoff is applied with
230
        retry intervals starting at 100ms, doubling each time for up to 5 total retries.
231

232
        Parameters:
233
            invocation_url (str): The full URL of the container's invocation endpoint.
234
            proxies (dict[str, str]): Proxy settings to be used for the HTTP request.
235
            payload (dict[str, Any]): The JSON payload to send to the container.
236

237
        Returns:
238
            Response: The successful HTTP response from the container.
239

240
        Raises:
241
            requests.exceptions.ConnectionError: If all retry attempts fail to connect.
242
        """
243
        backoff = None
1✔
244
        last_exception = None
1✔
245
        max_retry_on_connection_error = 5
1✔
246

247
        for attempt_count in range(max_retry_on_connection_error + 1):  # 1 initial + n retries
1✔
248
            try:
1✔
249
                response = requests.post(url=invocation_url, json=payload, proxies=proxies)
1✔
250
                return response
1✔
UNCOV
251
            except requests.exceptions.ConnectionError as connection_error:
×
UNCOV
252
                last_exception = connection_error
×
253

254
                if backoff is None:
×
255
                    LOG.debug(
×
256
                        "Initial connection attempt failed: %s. Starting backoff retries.",
257
                        connection_error,
258
                    )
UNCOV
259
                    backoff = ExponentialBackoff(
×
260
                        max_retries=max_retry_on_connection_error,
261
                        initial_interval=0.1,
262
                        multiplier=2.0,
263
                        randomization_factor=0.0,
264
                        max_interval=1,
265
                        max_time_elapsed=-1,
266
                    )
267

UNCOV
268
                delay = backoff.next_backoff()
×
UNCOV
269
                if delay > 0:
×
UNCOV
270
                    LOG.debug(
×
271
                        "Connection error on invoke attempt #%d: %s. Retrying in %.2f seconds",
272
                        attempt_count,
273
                        connection_error,
274
                        delay,
275
                    )
UNCOV
276
                    time.sleep(delay)
×
277

UNCOV
278
        LOG.debug("Connection error after all attempts exhausted: %s", last_exception)
×
279
        raise last_exception
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc