• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 20565403496

29 Dec 2025 05:11AM UTC coverage: 84.103% (-2.8%) from 86.921%
20565403496

Pull #13567

github

web-flow
Merge 4816837a5 into 2417384aa
Pull Request #13567: Update ASF APIs

67166 of 79862 relevant lines covered (84.1%)

0.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.13
/localstack-core/localstack/services/apigateway/next_gen/execute_api/handlers/integration_request.py
1
import base64
1✔
2
import logging
1✔
3
from http import HTTPMethod
1✔
4

5
from werkzeug.datastructures import Headers
1✔
6

7
from localstack.aws.api.apigateway import ContentHandlingStrategy, Integration, IntegrationType
1✔
8
from localstack.constants import APPLICATION_JSON
1✔
9
from localstack.http import Request, Response
1✔
10
from localstack.utils.collections import merge_recursive
1✔
11
from localstack.utils.strings import to_bytes, to_str
1✔
12

13
from ..api import RestApiGatewayHandler, RestApiGatewayHandlerChain
1✔
14
from ..context import IntegrationRequest, InvocationRequest, RestApiInvocationContext
1✔
15
from ..gateway_response import InternalServerError, UnsupportedMediaTypeError
1✔
16
from ..header_utils import drop_headers, set_default_headers
1✔
17
from ..helpers import mime_type_matches_binary_media_types, render_integration_uri
1✔
18
from ..parameters_mapping import ParametersMapper, RequestDataMapping
1✔
19
from ..template_mapping import (
1✔
20
    ApiGatewayVtlTemplate,
21
    MappingTemplateInput,
22
    MappingTemplateParams,
23
    MappingTemplateVariables,
24
)
25
from ..variables import ContextVariableOverrides, ContextVarsRequestOverride
1✔
26

27
LOG = logging.getLogger(__name__)

# Headers that are illegal to set through the integration request transformations
ILLEGAL_INTEGRATION_REQUESTS_COMMON = [
    "content-length",
    "transfer-encoding",
    "x-amzn-trace-id",
    "X-Amzn-Apigateway-Api-Id",
]
# The AWS flavoured list is the common list extended with a few more header names
ILLEGAL_INTEGRATION_REQUESTS_AWS = ILLEGAL_INTEGRATION_REQUESTS_COMMON + [
    "authorization",
    "connection",
    "expect",
    "proxy-authenticate",
    "te",
]
44

45
# These headers are dropped after the template overrides have been applied: they never make it to the request.
DROPPED_FROM_INTEGRATION_REQUESTS_COMMON = ["Expect", "Proxy-Authenticate", "TE"]
DROPPED_FROM_INTEGRATION_REQUESTS_AWS = DROPPED_FROM_INTEGRATION_REQUESTS_COMMON + ["Referer"]
DROPPED_FROM_INTEGRATION_REQUESTS_HTTP = DROPPED_FROM_INTEGRATION_REQUESTS_COMMON + ["Via"]
49

50
# Default headers, passed to `set_default_headers` when the integration request headers are built
DEFAULT_REQUEST_HEADERS = {
    "Accept": APPLICATION_JSON,
    "Connection": "keep-alive",
}
52

53

54
class PassthroughBehavior(str):
    """Known values of the integration `passthroughBehavior` setting, kept as plain string constants."""

    # TODO maybe this class should be moved where it can also be used for validation in
    #  the provider when we switch out of moto
    WHEN_NO_MATCH = "WHEN_NO_MATCH"
    WHEN_NO_TEMPLATES = "WHEN_NO_TEMPLATES"
    NEVER = "NEVER"
60

61

62
class IntegrationRequestHandler(RestApiGatewayHandler):
1✔
63
    """
64
    This class will take care of the Integration Request part, which is mostly linked to template mapping
65
    See https://docs.aws.amazon.com/apigateway/latest/developerguide/api-gateway-integration-settings-integration-request.html
66
    """
67

68
    def __init__(self):
        """Instantiate the parameters mapper and the VTL template renderer used by this handler."""
        # used by `get_integration_request_data`
        self._param_mapper = ParametersMapper()
        # used by `render_request_template_mapping`
        self._vtl_template = ApiGatewayVtlTemplate()
71

72
    def __call__(
        self,
        chain: RestApiGatewayHandlerChain,
        context: RestApiInvocationContext,
        response: Response,
    ):
        """
        Build the integration request out of the invocation request and store it on the context.

        Non-proxy integrations go through body conversion and request template rendering; `PROXY`
        integrations forward most of the invocation data as is.

        :param chain: the handler chain, not used by this handler
        :param context: the invocation context; `context_variable_overrides` (non-proxy only) and
            `integration_request` are set on it
        :param response: the response, not used by this handler
        :raises UnsupportedMediaTypeError: raised by `get_request_template` when no template applies
        :raises InternalServerError: when the body cannot be converted/decoded or when a mapped header is illegal
        """
        integration: Integration = context.integration
        integration_type = integration["type"]

        request_data_mapping = self.get_integration_request_data(
            context, integration["requestParameters"] or {}
        )
        path_parameters = request_data_mapping["path"]

        is_proxy = integration_type in (IntegrationType.AWS_PROXY, IntegrationType.HTTP_PROXY)
        if not is_proxy:
            # find request template to raise UnsupportedMediaTypeError early
            request_template = self.get_request_template(
                integration=integration, request=context.invocation_request
            )

            body, mapped_overrides = self.render_request_template_mapping(
                context=context,
                body=self.convert_body(context),
                template=request_template,
            )
            # Update the context with the returned mapped overrides
            context.context_variable_overrides = mapped_overrides
            # mutate the ContextVariables with the requestOverride result, as we copy the context when rendering the
            # template to avoid mutation on other fields
            request_override: ContextVarsRequestOverride = mapped_overrides.get(
                "requestOverride", {}
            )
            # TODO: log every override that happens afterwards (in a loop on `request_override`)
            merge_recursive(request_override, request_data_mapping, overwrite=True)

            headers = Headers(request_data_mapping["header"])
            query_string_parameters = request_data_mapping["querystring"]

        else:
            # `PROXY` types cannot use integration mapping templates, they pass most of the data straight
            # We make a copy to avoid modifying the invocation headers and keep a cleaner history
            headers = context.invocation_request["headers"].copy()
            query_string_parameters: dict[str, list[str]] = context.invocation_request[
                "multi_value_query_string_parameters"
            ]
            body = context.invocation_request["body"]

            if integration_type != IntegrationType.HTTP_PROXY:
                # AWS_PROXY
                self._set_proxy_headers(headers, context.request)
                # AWS_PROXY does not allow URI path rendering
                # TODO: verify this
                path_parameters = {}

            else:
                # HTTP_PROXY still makes use of the request data mappings, and merges them with the invocation
                # request. This is undocumented but validated behavior.
                # These headers won't be passed through by default from the invocation.
                # They can however be added through request mappings.
                drop_headers(headers, ["Host", "Content-Encoding"])
                headers.update(request_data_mapping["header"])

                query_string_parameters = self._merge_http_proxy_query_string(
                    query_string_parameters, request_data_mapping["querystring"]
                )

        # Some headers can't be modified by parameter mappings or mapping templates.
        # AWS will raise if those are present, even for AWS_PROXY, where it is not applying them.
        header_mappings = request_data_mapping["header"]
        if header_mappings:
            self._validate_headers_mapping(header_mappings, integration_type)

        self._apply_header_transforms(headers, integration_type, context)

        # looks like the stageVariables rendering part is done in the Integration part in AWS
        # but we can avoid duplication by doing it here for now
        # TODO: if the integration is of AWS Lambda type and the Lambda is in another account, we cannot render
        #  stageVariables. Work on that special case later (we can add a quick check for the URI region and set the
        #  stage variables to an empty dict)
        rendered_integration_uri = render_integration_uri(
            uri=integration["uri"],
            path_parameters=path_parameters,
            stage_variables=context.stage_variables,
        )

        # if the integration method is defined and is not ANY, we can use it for the integration
        integration_method = integration["httpMethod"]
        if not integration_method or integration_method == "ANY":
            # otherwise, fallback to the request's method
            integration_method = context.invocation_request["http_method"]

        context.integration_request = IntegrationRequest(
            http_method=integration_method,
            uri=rendered_integration_uri,
            query_string_parameters=query_string_parameters,
            headers=headers,
            body=body,
        )
170

171
    def get_integration_request_data(
        self, context: RestApiInvocationContext, request_parameters: dict[str, str]
    ) -> RequestDataMapping:
        """
        Run the parameters mapper for the integration request.

        :param context: the invocation context, providing the invocation request, context variables and
            stage variables
        :param request_parameters: the integration `requestParameters`
        :return: the result of `ParametersMapper.map_integration_request`
        """
        mapper_arguments = {
            "request_parameters": request_parameters,
            "invocation_request": context.invocation_request,
            "context_variables": context.context_variables,
            "stage_variables": context.stage_variables,
        }
        return self._param_mapper.map_integration_request(**mapper_arguments)
180

181
    def render_request_template_mapping(
        self,
        context: RestApiInvocationContext,
        body: str | bytes,
        template: str,
    ) -> tuple[bytes, ContextVariableOverrides]:
        """
        Render the request mapping template against the (already converted) body.

        :param context: the invocation context, providing the template variables and current overrides
        :param body: the body to expose to the template as `input.body`
        :param template: the request template; if falsy, nothing is rendered
        :return: a tuple of the body as bytes and the context variable overrides. Without a template, these
            are the unchanged body and `context.context_variable_overrides`
        :raises InternalServerError: if a template is present and the body cannot be converted to `str`
        """
        request: InvocationRequest = context.invocation_request

        # no template: pass the body through untouched
        if not template:
            return to_bytes(body), context.context_variable_overrides

        try:
            body_utf8 = to_str(body)
        except UnicodeError:
            raise InternalServerError("Internal server error")

        template_params = MappingTemplateParams(
            path=request.get("path_parameters"),
            querystring=request.get("query_string_parameters", {}),
            header=request.get("headers"),
        )
        template_variables = MappingTemplateVariables(
            context=context.context_variables,
            stageVariables=context.stage_variables or {},
            input=MappingTemplateInput(body=body_utf8, params=template_params),
        )

        rendered_body, mapped_overrides = self._vtl_template.render_request(
            template=template,
            variables=template_variables,
            context_overrides=context.context_variable_overrides,
        )
        return to_bytes(rendered_body), mapped_overrides
214

215
    @staticmethod
    def get_request_template(integration: Integration, request: InvocationRequest) -> str:
        """
        Look up the request template matching the request `Content-Type`.

        The template registered for the content type is preferred, then the `$default` one. When neither
        exists, the integration `passthroughBehavior` decides between returning the (falsy) lookup result
        and rejecting the request.

        :param integration: the integration, read for `requestTemplates` and `passthroughBehavior`
        :param request: the invocation request, read for its `Content-Type` header
        :return: the selected template, or the falsy lookup result when the request is passed through
        :raises UnsupportedMediaTypeError: if no template matches and the behavior is `NEVER`, or is
            `WHEN_NO_TEMPLATES` while templates are defined
        """
        templates = integration.get("requestTemplates") or {}
        behavior = integration.get("passthroughBehavior")
        # If content-type is not provided aws assumes application/json
        content_type = request["headers"].get("Content-Type", APPLICATION_JSON)
        # the template associated to the content-type wins over the $default template
        selected_template = templates.get(content_type) or templates.get("$default")

        if selected_template or behavior == PassthroughBehavior.WHEN_NO_MATCH:
            return selected_template

        if behavior == PassthroughBehavior.NEVER:
            LOG.debug(
                "No request template found for '%s' and passthrough behavior set to NEVER",
                content_type,
            )
            raise UnsupportedMediaTypeError("Unsupported Media Type")

        if behavior != PassthroughBehavior.WHEN_NO_TEMPLATES:
            LOG.debug("Unknown passthrough behavior: '%s'", behavior)
            return selected_template

        # WHEN_NO_TEMPLATES: only pass through if the integration defines no template at all
        if templates:
            LOG.debug(
                "No request template found for '%s' and passthrough behavior set to WHEN_NO_TEMPLATES",
                content_type,
            )
            raise UnsupportedMediaTypeError("Unsupported Media Type")

        return selected_template
249

250
    @staticmethod
    def convert_body(context: RestApiInvocationContext) -> bytes | str:
        """
        Convert the invocation request body according to the integration `contentHandling` and to whether
        the request `Content-Type` matches the REST API `binaryMediaTypes`.

        https://docs.aws.amazon.com/apigateway/latest/developerguide/api-gateway-payload-encodings.html
        https://docs.aws.amazon.com/apigateway/latest/developerguide/api-gateway-payload-encodings-workflow.html
        :param context: the invocation context
        :return: the body, either as is, or converted depending on the table in the second link
        :raises InternalServerError: if the body has to be base64-decoded and decoding fails
        """
        request: InvocationRequest = context.invocation_request
        body = request["body"]

        is_binary_request = mime_type_matches_binary_media_types(
            mime_type=request["headers"].get("Content-Type"),
            binary_media_types=context.deployment.rest_api.rest_api.get("binaryMediaTypes", []),
        )
        content_handling = context.integration.get("contentHandling")
        convert_to_text = content_handling == ContentHandlingStrategy.CONVERT_TO_TEXT

        if is_binary_request:
            # if the content handling is not defined, or CONVERT_TO_BINARY, we do not touch the body and leave it
            # as proper binary
            return base64.b64encode(body) if convert_to_text else body

        if convert_to_text or not content_handling:
            return body.decode(encoding="UTF-8", errors="replace")

        # it means we have CONVERT_TO_BINARY, so we need to try to decode the base64 string
        try:
            return base64.b64decode(body)
        except ValueError:
            raise InternalServerError("Internal server error")
282

283
    @staticmethod
1✔
284
    def _merge_http_proxy_query_string(
1✔
285
        query_string_parameters: dict[str, list[str]],
286
        mapped_query_string: dict[str, str | list[str]],
287
    ):
288
        new_query_string_parameters = {k: v.copy() for k, v in query_string_parameters.items()}
×
289
        for param, value in mapped_query_string.items():
×
290
            if existing := new_query_string_parameters.get(param):
×
291
                if isinstance(value, list):
×
292
                    existing.extend(value)
×
293
                else:
294
                    existing.append(value)
×
295
            else:
296
                new_query_string_parameters[param] = value
×
297

298
        return new_query_string_parameters
×
299

300
    @staticmethod
    def _set_proxy_headers(headers: Headers, request: Request):
        """
        Set the `X-Forwarded-For`, `X-Forwarded-Port` and `X-Forwarded-Proto` headers in place.

        :param headers: the headers to update
        :param request: the incoming request, read for its remote address and WSGI environ
        """
        environ = request.environ
        headers.set("X-Forwarded-For", request.remote_addr)
        headers.set("X-Forwarded-Port", environ.get("SERVER_PORT"))
        # only keep what comes before the first slash of SERVER_PROTOCOL
        protocol_name, _, _ = environ.get("SERVER_PROTOCOL", "").partition("/")
        headers.set("X-Forwarded-Proto", protocol_name)
        )
308

309
    @staticmethod
    def _apply_header_transforms(
        headers: Headers, integration_type: IntegrationType, context: RestApiInvocationContext
    ):
        """
        Drop, default and set headers on the integration request headers, in place.

        :param headers: the integration request headers to update
        :param integration_type: selects the list of dropped headers and whether the API id header is set
        :param context: the invocation context, read for the API id, the trace id and the incoming request
        """
        # Dropping matching headers for the provided integration type
        if integration_type == IntegrationType.AWS:
            headers_to_drop = DROPPED_FROM_INTEGRATION_REQUESTS_AWS
        elif integration_type in (IntegrationType.HTTP, IntegrationType.HTTP_PROXY):
            headers_to_drop = DROPPED_FROM_INTEGRATION_REQUESTS_HTTP
        else:
            headers_to_drop = DROPPED_FROM_INTEGRATION_REQUESTS_COMMON
        drop_headers(headers, headers_to_drop)

        # Adding default headers to the requests headers
        default_headers = dict(DEFAULT_REQUEST_HEADERS)
        default_headers["User-Agent"] = f"AmazonAPIGateway_{context.api_id}"

        # the incoming Content-Type is only used as a default for methods other than OPTIONS, GET and HEAD
        content_type = context.request.headers.get("Content-Type")
        if content_type and context.request.method not in {
            HTTPMethod.OPTIONS,
            HTTPMethod.GET,
            HTTPMethod.HEAD,
        }:
            default_headers["Content-Type"] = content_type

        set_default_headers(headers, default_headers)
        headers.set("X-Amzn-Trace-Id", context.trace_id)

        is_aws_integration = integration_type in (IntegrationType.AWS_PROXY, IntegrationType.AWS)
        if not is_aws_integration:
            headers.set("X-Amzn-Apigateway-Api-Id", context.api_id)
336

337
    @staticmethod
    def _validate_headers_mapping(headers: dict[str, str], integration_type: IntegrationType):
        """
        Validates and raises an error when attempting to set an illegal header.

        Header names are compared case-insensitively against the illegal list of the integration type.

        :param headers: the mapped headers, keyed by header name
        :param integration_type: `AWS` and `AWS_PROXY` are validated against
            `ILLEGAL_INTEGRATION_REQUESTS_AWS`, any other type against `ILLEGAL_INTEGRATION_REQUESTS_COMMON`
        :raises InternalServerError: on the first mapped header found in the illegal list
        """
        if integration_type in {IntegrationType.AWS, IntegrationType.AWS_PROXY}:
            to_validate = ILLEGAL_INTEGRATION_REQUESTS_AWS
        else:
            to_validate = ILLEGAL_INTEGRATION_REQUESTS_COMMON

        # The reference entries are not all lower-case (e.g. `X-Amzn-Apigateway-Api-Id`). Normalize them as
        # well, otherwise such an entry could never match the lower-cased header name below.
        illegal_headers = {illegal_header.lower() for illegal_header in to_validate}

        for header in headers:
            if header.lower() in illegal_headers:
                LOG.debug(
                    "Execution failed due to configuration error: %s header already present", header
                )
                raise InternalServerError("Internal server error")
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc