• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 20357994067

18 Dec 2025 10:01PM UTC coverage: 86.913% (-0.02%) from 86.929%
20357994067

push

github

web-flow
Fix CloudWatch model annotation (#13545)

1 of 1 new or added line in 1 file covered. (100.0%)

1391 existing lines in 33 files now uncovered.

70000 of 80540 relevant lines covered (86.91%)

1.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.72
/localstack-core/localstack/services/apigateway/next_gen/execute_api/handlers/integration_request.py
1
import base64
2✔
2
import logging
2✔
3
from http import HTTPMethod
2✔
4

5
from werkzeug.datastructures import Headers
2✔
6

7
from localstack.aws.api.apigateway import ContentHandlingStrategy, Integration, IntegrationType
2✔
8
from localstack.constants import APPLICATION_JSON
2✔
9
from localstack.http import Request, Response
2✔
10
from localstack.utils.collections import merge_recursive
2✔
11
from localstack.utils.strings import to_bytes, to_str
2✔
12

13
from ..api import RestApiGatewayHandler, RestApiGatewayHandlerChain
2✔
14
from ..context import IntegrationRequest, InvocationRequest, RestApiInvocationContext
2✔
15
from ..gateway_response import InternalServerError, UnsupportedMediaTypeError
2✔
16
from ..header_utils import drop_headers, set_default_headers
2✔
17
from ..helpers import mime_type_matches_binary_media_types, render_integration_uri
2✔
18
from ..parameters_mapping import ParametersMapper, RequestDataMapping
2✔
19
from ..template_mapping import (
2✔
20
    ApiGatewayVtlTemplate,
21
    MappingTemplateInput,
22
    MappingTemplateParams,
23
    MappingTemplateVariables,
24
)
25
from ..variables import ContextVariableOverrides, ContextVarsRequestOverride
2✔
26

27
LOG = logging.getLogger(__name__)
2✔
28

29
# Illegal headers to include in transformation
30
ILLEGAL_INTEGRATION_REQUESTS_COMMON = [
2✔
31
    "content-length",
32
    "transfer-encoding",
33
    "x-amzn-trace-id",
34
    "X-Amzn-Apigateway-Api-Id",
35
]
36
ILLEGAL_INTEGRATION_REQUESTS_AWS = [
2✔
37
    *ILLEGAL_INTEGRATION_REQUESTS_COMMON,
38
    "authorization",
39
    "connection",
40
    "expect",
41
    "proxy-authenticate",
42
    "te",
43
]
44

45
# These are dropped after the templates override were applied. they will never make it to the requests.
46
DROPPED_FROM_INTEGRATION_REQUESTS_COMMON = ["Expect", "Proxy-Authenticate", "TE"]
2✔
47
DROPPED_FROM_INTEGRATION_REQUESTS_AWS = [*DROPPED_FROM_INTEGRATION_REQUESTS_COMMON, "Referer"]
2✔
48
DROPPED_FROM_INTEGRATION_REQUESTS_HTTP = [*DROPPED_FROM_INTEGRATION_REQUESTS_COMMON, "Via"]
2✔
49

50
# Default headers
51
DEFAULT_REQUEST_HEADERS = {"Accept": APPLICATION_JSON, "Connection": "keep-alive"}
2✔
52

53

54
class PassthroughBehavior(str):
2✔
55
    # TODO maybe this class should be moved where it can also be used for validation in
56
    #  the provider when we switch out of moto
57
    WHEN_NO_MATCH = "WHEN_NO_MATCH"
2✔
58
    WHEN_NO_TEMPLATES = "WHEN_NO_TEMPLATES"
2✔
59
    NEVER = "NEVER"
2✔
60

61

62
class IntegrationRequestHandler(RestApiGatewayHandler):
2✔
63
    """
64
    This class will take care of the Integration Request part, which is mostly linked to template mapping
65
    See https://docs.aws.amazon.com/apigateway/latest/developerguide/api-gateway-integration-settings-integration-request.html
66
    """
67

68
    def __init__(self):
2✔
69
        self._param_mapper = ParametersMapper()
2✔
70
        self._vtl_template = ApiGatewayVtlTemplate()
2✔
71

72
    def __call__(
2✔
73
        self,
74
        chain: RestApiGatewayHandlerChain,
75
        context: RestApiInvocationContext,
76
        response: Response,
77
    ):
78
        integration: Integration = context.integration
2✔
79
        integration_type = integration["type"]
2✔
80

81
        integration_request_parameters = integration["requestParameters"] or {}
2✔
82
        request_data_mapping = self.get_integration_request_data(
2✔
83
            context, integration_request_parameters
84
        )
85
        path_parameters = request_data_mapping["path"]
2✔
86

87
        if integration_type in (IntegrationType.AWS_PROXY, IntegrationType.HTTP_PROXY):
2✔
88
            # `PROXY` types cannot use integration mapping templates, they pass most of the data straight
89
            # We make a copy to avoid modifying the invocation headers and keep a cleaner history
UNCOV
90
            headers = context.invocation_request["headers"].copy()
1✔
UNCOV
91
            query_string_parameters: dict[str, list[str]] = context.invocation_request[
1✔
92
                "multi_value_query_string_parameters"
93
            ]
UNCOV
94
            body = context.invocation_request["body"]
1✔
95

96
            # HTTP_PROXY still make uses of the request data mappings, and merges it with the invocation request
97
            # this is undocumented but validated behavior
UNCOV
98
            if integration_type == IntegrationType.HTTP_PROXY:
1✔
99
                # These headers won't be passed through by default from the invocation.
100
                # They can however be added through request mappings.
UNCOV
101
                drop_headers(headers, ["Host", "Content-Encoding"])
1✔
UNCOV
102
                headers.update(request_data_mapping["header"])
1✔
103

UNCOV
104
                query_string_parameters = self._merge_http_proxy_query_string(
1✔
105
                    query_string_parameters, request_data_mapping["querystring"]
106
                )
107

108
            else:
UNCOV
109
                self._set_proxy_headers(headers, context.request)
1✔
110
                # AWS_PROXY does not allow URI path rendering
111
                # TODO: verify this
UNCOV
112
                path_parameters = {}
1✔
113

114
        else:
115
            # find request template to raise UnsupportedMediaTypeError early
116
            request_template = self.get_request_template(
2✔
117
                integration=integration, request=context.invocation_request
118
            )
119

120
            converted_body = self.convert_body(context)
2✔
121

122
            body, mapped_overrides = self.render_request_template_mapping(
2✔
123
                context=context, body=converted_body, template=request_template
124
            )
125
            # Update the context with the returned mapped overrides
126
            context.context_variable_overrides = mapped_overrides
2✔
127
            # mutate the ContextVariables with the requestOverride result, as we copy the context when rendering the
128
            # template to avoid mutation on other fields
129
            request_override: ContextVarsRequestOverride = mapped_overrides.get(
2✔
130
                "requestOverride", {}
131
            )
132
            # TODO: log every override that happens afterwards (in a loop on `request_override`)
133
            merge_recursive(request_override, request_data_mapping, overwrite=True)
2✔
134

135
            headers = Headers(request_data_mapping["header"])
2✔
136
            query_string_parameters = request_data_mapping["querystring"]
2✔
137

138
        # Some headers can't be modified by parameter mappings or mapping templates.
139
        # Aws will raise in those were present. Even for AWS_PROXY, where it is not applying them.
140
        if header_mappings := request_data_mapping["header"]:
2✔
141
            self._validate_headers_mapping(header_mappings, integration_type)
2✔
142

143
        self._apply_header_transforms(headers, integration_type, context)
2✔
144

145
        # looks like the stageVariables rendering part is done in the Integration part in AWS
146
        # but we can avoid duplication by doing it here for now
147
        # TODO: if the integration if of AWS Lambda type and the Lambda is in another account, we cannot render
148
        #  stageVariables. Work on that special case later (we can add a quick check for the URI region and set the
149
        #  stage variables to an empty dict)
150
        rendered_integration_uri = render_integration_uri(
2✔
151
            uri=integration["uri"],
152
            path_parameters=path_parameters,
153
            stage_variables=context.stage_variables,
154
        )
155

156
        # if the integration method is defined and is not ANY, we can use it for the integration
157
        if not (integration_method := integration["httpMethod"]) or integration_method == "ANY":
2✔
158
            # otherwise, fallback to the request's method
UNCOV
159
            integration_method = context.invocation_request["http_method"]
1✔
160

161
        integration_request = IntegrationRequest(
2✔
162
            http_method=integration_method,
163
            uri=rendered_integration_uri,
164
            query_string_parameters=query_string_parameters,
165
            headers=headers,
166
            body=body,
167
        )
168

169
        context.integration_request = integration_request
2✔
170

171
    def get_integration_request_data(
2✔
172
        self, context: RestApiInvocationContext, request_parameters: dict[str, str]
173
    ) -> RequestDataMapping:
174
        return self._param_mapper.map_integration_request(
2✔
175
            request_parameters=request_parameters,
176
            invocation_request=context.invocation_request,
177
            context_variables=context.context_variables,
178
            stage_variables=context.stage_variables,
179
        )
180

181
    def render_request_template_mapping(
2✔
182
        self,
183
        context: RestApiInvocationContext,
184
        body: str | bytes,
185
        template: str,
186
    ) -> tuple[bytes, ContextVariableOverrides]:
187
        request: InvocationRequest = context.invocation_request
2✔
188

189
        if not template:
2✔
190
            return to_bytes(body), context.context_variable_overrides
2✔
191

192
        try:
2✔
193
            body_utf8 = to_str(body)
2✔
194
        except UnicodeError:
×
195
            raise InternalServerError("Internal server error")
×
196

197
        body, mapped_overrides = self._vtl_template.render_request(
2✔
198
            template=template,
199
            variables=MappingTemplateVariables(
200
                context=context.context_variables,
201
                stageVariables=context.stage_variables or {},
202
                input=MappingTemplateInput(
203
                    body=body_utf8,
204
                    params=MappingTemplateParams(
205
                        path=request.get("path_parameters"),
206
                        querystring=request.get("query_string_parameters", {}),
207
                        header=request.get("headers"),
208
                    ),
209
                ),
210
            ),
211
            context_overrides=context.context_variable_overrides,
212
        )
213
        return to_bytes(body), mapped_overrides
2✔
214

215
    @staticmethod
2✔
216
    def get_request_template(integration: Integration, request: InvocationRequest) -> str:
2✔
217
        """
218
        Attempts to return the request template.
219
        Will raise UnsupportedMediaTypeError if there are no match according to passthrough behavior.
220
        """
221
        request_templates = integration.get("requestTemplates") or {}
2✔
222
        passthrough_behavior = integration.get("passthroughBehavior")
2✔
223
        # If content-type is not provided aws assumes application/json
224
        content_type = request["headers"].get("Content-Type", APPLICATION_JSON)
2✔
225
        # first look to for a template associated to the content-type, otherwise look for the $default template
226
        request_template = request_templates.get(content_type) or request_templates.get("$default")
2✔
227

228
        if request_template or passthrough_behavior == PassthroughBehavior.WHEN_NO_MATCH:
2✔
229
            return request_template
2✔
230

231
        match passthrough_behavior:
2✔
232
            case PassthroughBehavior.NEVER:
2✔
233
                LOG.debug(
2✔
234
                    "No request template found for '%s' and passthrough behavior set to NEVER",
235
                    content_type,
236
                )
237
                raise UnsupportedMediaTypeError("Unsupported Media Type")
2✔
238
            case PassthroughBehavior.WHEN_NO_TEMPLATES:
2✔
239
                if request_templates:
2✔
240
                    LOG.debug(
2✔
241
                        "No request template found for '%s' and passthrough behavior set to WHEN_NO_TEMPLATES",
242
                        content_type,
243
                    )
244
                    raise UnsupportedMediaTypeError("Unsupported Media Type")
2✔
245
            case _:
2✔
246
                LOG.debug("Unknown passthrough behavior: '%s'", passthrough_behavior)
2✔
247

248
        return request_template
2✔
249

250
    @staticmethod
2✔
251
    def convert_body(context: RestApiInvocationContext) -> bytes | str:
2✔
252
        """
253
        https://docs.aws.amazon.com/apigateway/latest/developerguide/api-gateway-payload-encodings.html
254
        https://docs.aws.amazon.com/apigateway/latest/developerguide/api-gateway-payload-encodings-workflow.html
255
        :param context:
256
        :return: the body, either as is, or converted depending on the table in the second link
257
        """
258
        request: InvocationRequest = context.invocation_request
2✔
259
        body = request["body"]
2✔
260

261
        is_binary_request = mime_type_matches_binary_media_types(
2✔
262
            mime_type=request["headers"].get("Content-Type"),
263
            binary_media_types=context.deployment.rest_api.rest_api.get("binaryMediaTypes", []),
264
        )
265
        content_handling = context.integration.get("contentHandling")
2✔
266
        if is_binary_request:
2✔
267
            if content_handling and content_handling == ContentHandlingStrategy.CONVERT_TO_TEXT:
2✔
268
                body = base64.b64encode(body)
2✔
269
            # if the content handling is not defined, or CONVERT_TO_BINARY, we do not touch the body and leave it as
270
            # proper binary
271
        else:
272
            if not content_handling or content_handling == ContentHandlingStrategy.CONVERT_TO_TEXT:
2✔
273
                body = body.decode(encoding="UTF-8", errors="replace")
2✔
274
            else:
275
                # it means we have CONVERT_TO_BINARY, so we need to try to decode the base64 string
276
                try:
2✔
277
                    body = base64.b64decode(body)
2✔
278
                except ValueError:
2✔
279
                    raise InternalServerError("Internal server error")
2✔
280

281
        return body
2✔
282

283
    @staticmethod
2✔
284
    def _merge_http_proxy_query_string(
2✔
285
        query_string_parameters: dict[str, list[str]],
286
        mapped_query_string: dict[str, str | list[str]],
287
    ):
UNCOV
288
        new_query_string_parameters = {k: v.copy() for k, v in query_string_parameters.items()}
1✔
UNCOV
289
        for param, value in mapped_query_string.items():
1✔
290
            if existing := new_query_string_parameters.get(param):
×
291
                if isinstance(value, list):
×
292
                    existing.extend(value)
×
293
                else:
294
                    existing.append(value)
×
295
            else:
296
                new_query_string_parameters[param] = value
×
297

UNCOV
298
        return new_query_string_parameters
1✔
299

300
    @staticmethod
2✔
301
    def _set_proxy_headers(headers: Headers, request: Request):
2✔
UNCOV
302
        headers.set("X-Forwarded-For", request.remote_addr)
1✔
UNCOV
303
        headers.set("X-Forwarded-Port", request.environ.get("SERVER_PORT"))
1✔
UNCOV
304
        headers.set(
1✔
305
            "X-Forwarded-Proto",
306
            request.environ.get("SERVER_PROTOCOL", "").split("/")[0],
307
        )
308

309
    @staticmethod
2✔
310
    def _apply_header_transforms(
2✔
311
        headers: Headers, integration_type: IntegrationType, context: RestApiInvocationContext
312
    ):
313
        # Dropping matching headers for the provided integration type
314
        match integration_type:
2✔
315
            case IntegrationType.AWS:
2✔
316
                drop_headers(headers, DROPPED_FROM_INTEGRATION_REQUESTS_AWS)
×
317
            case IntegrationType.HTTP | IntegrationType.HTTP_PROXY:
2✔
318
                drop_headers(headers, DROPPED_FROM_INTEGRATION_REQUESTS_HTTP)
2✔
UNCOV
319
            case _:
1✔
UNCOV
320
                drop_headers(headers, DROPPED_FROM_INTEGRATION_REQUESTS_COMMON)
1✔
321

322
        # Adding default headers to the requests headers
323
        default_headers = {
2✔
324
            **DEFAULT_REQUEST_HEADERS,
325
            "User-Agent": f"AmazonAPIGateway_{context.api_id}",
326
        }
327
        if (
2✔
328
            content_type := context.request.headers.get("Content-Type")
329
        ) and context.request.method not in {HTTPMethod.OPTIONS, HTTPMethod.GET, HTTPMethod.HEAD}:
UNCOV
330
            default_headers["Content-Type"] = content_type
1✔
331

332
        set_default_headers(headers, default_headers)
2✔
333
        headers.set("X-Amzn-Trace-Id", context.trace_id)
2✔
334
        if integration_type not in (IntegrationType.AWS_PROXY, IntegrationType.AWS):
2✔
335
            headers.set("X-Amzn-Apigateway-Api-Id", context.api_id)
2✔
336

337
    @staticmethod
2✔
338
    def _validate_headers_mapping(headers: dict[str, str], integration_type: IntegrationType):
2✔
339
        """Validates and raises an error when attempting to set an illegal header"""
340
        to_validate = ILLEGAL_INTEGRATION_REQUESTS_COMMON
2✔
341
        if integration_type in {IntegrationType.AWS, IntegrationType.AWS_PROXY}:
2✔
342
            to_validate = ILLEGAL_INTEGRATION_REQUESTS_AWS
×
343

344
        for header in headers:
2✔
345
            if header.lower() in to_validate:
2✔
346
                LOG.debug(
×
347
                    "Execution failed due to configuration error: %s header already present", header
348
                )
349
                raise InternalServerError("Internal server error")
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc