• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 20565403496

29 Dec 2025 05:11AM UTC coverage: 84.103% (-2.8%) from 86.921%
20565403496

Pull #13567

github

web-flow
Merge 4816837a5 into 2417384aa
Pull Request #13567: Update ASF APIs

67166 of 79862 relevant lines covered (84.1%)

0.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

33.58
/localstack-core/localstack/services/apigateway/next_gen/execute_api/integrations/aws.py
1
import base64
1✔
2
import json
1✔
3
import logging
1✔
4
from functools import lru_cache
1✔
5
from http import HTTPMethod
1✔
6
from typing import Literal, TypedDict
1✔
7
from urllib.parse import urlparse
1✔
8

9
import requests
1✔
10
from botocore.exceptions import ClientError
1✔
11
from werkzeug.datastructures import Headers
1✔
12

13
from localstack import config
1✔
14
from localstack.aws.connect import (
1✔
15
    INTERNAL_REQUEST_PARAMS_HEADER,
16
    InternalRequestParameters,
17
    connect_to,
18
    dump_dto,
19
)
20
from localstack.aws.spec import get_service_catalog
1✔
21
from localstack.constants import APPLICATION_JSON, INTERNAL_AWS_ACCESS_KEY_ID
1✔
22
from localstack.utils.aws.arns import extract_region_from_arn
1✔
23
from localstack.utils.aws.client_types import ServicePrincipal
1✔
24
from localstack.utils.strings import to_bytes, to_str
1✔
25

26
from ..context import (
1✔
27
    EndpointResponse,
28
    IntegrationRequest,
29
    InvocationRequest,
30
    RestApiInvocationContext,
31
)
32
from ..gateway_response import IntegrationFailureError, InternalServerError
1✔
33
from ..header_utils import build_multi_value_headers
1✔
34
from ..helpers import (
1✔
35
    get_lambda_function_arn_from_invocation_uri,
36
    get_source_arn,
37
    mime_type_matches_binary_media_types,
38
    render_uri_with_stage_variables,
39
    validate_sub_dict_of_typed_dict,
40
)
41
from ..variables import ContextVariables
1✔
42
from .core import RestApiIntegration
1✔
43

44
LOG = logging.getLogger(__name__)
1✔
45

46
NO_BODY_METHODS = {
1✔
47
    HTTPMethod.OPTIONS,
48
    HTTPMethod.GET,
49
    HTTPMethod.HEAD,
50
}
51

52

53
class LambdaProxyResponse(TypedDict, total=False):
1✔
54
    body: str | None
1✔
55
    statusCode: int | str | None
1✔
56
    headers: dict[str, str] | None
1✔
57
    isBase64Encoded: bool | None
1✔
58
    multiValueHeaders: dict[str, list[str]] | None
1✔
59

60

61
class LambdaInputEvent(TypedDict, total=False):
1✔
62
    body: str
1✔
63
    isBase64Encoded: bool
1✔
64
    httpMethod: str | HTTPMethod
1✔
65
    resource: str
1✔
66
    path: str
1✔
67
    headers: dict[str, str]
1✔
68
    multiValueHeaders: dict[str, list[str]]
1✔
69
    queryStringParameters: dict[str, str]
1✔
70
    multiValueQueryStringParameters: dict[str, list[str]]
1✔
71
    requestContext: ContextVariables
1✔
72
    pathParameters: dict[str, str]
1✔
73
    stageVariables: dict[str, str]
1✔
74

75

76
class ParsedAwsIntegrationUri(TypedDict):
1✔
77
    service_name: str
1✔
78
    region_name: str
1✔
79
    action_type: Literal["path", "action"]
1✔
80
    path: str
1✔
81

82

83
@lru_cache(maxsize=64)
1✔
84
def get_service_factory(region_name: str, role_arn: str):
1✔
85
    if role_arn:
×
86
        return connect_to.with_assumed_role(
×
87
            role_arn=role_arn,
88
            region_name=region_name,
89
            service_principal=ServicePrincipal.apigateway,
90
            session_name="BackplaneAssumeRoleSession",
91
        )
92
    else:
93
        return connect_to(region_name=region_name)
×
94

95

96
@lru_cache(maxsize=64)
1✔
97
def get_internal_mocked_headers(
1✔
98
    service_name: str,
99
    region_name: str,
100
    source_arn: str,
101
    role_arn: str | None,
102
) -> dict[str, str]:
103
    if role_arn:
×
104
        access_key_id = (
×
105
            connect_to()
106
            .sts.request_metadata(service_principal=ServicePrincipal.apigateway)
107
            .assume_role(RoleArn=role_arn, RoleSessionName="BackplaneAssumeRoleSession")[
108
                "Credentials"
109
            ]["AccessKeyId"]
110
        )
111
    else:
112
        access_key_id = INTERNAL_AWS_ACCESS_KEY_ID
×
113

114
    dto = InternalRequestParameters(
×
115
        service_principal=ServicePrincipal.apigateway, source_arn=source_arn
116
    )
117
    # TODO: maybe use the localstack.utils.aws.client.SigningHttpClient instead of directly mocking the Authorization
118
    #  header (but will need to select the right signer depending on the service?)
119
    headers = {
×
120
        "Authorization": (
121
            "AWS4-HMAC-SHA256 "
122
            + f"Credential={access_key_id}/20160623/{region_name}/{service_name}/aws4_request, "
123
            + "SignedHeaders=content-type;host;x-amz-date;x-amz-target, Signature=1234"
124
        ),
125
        INTERNAL_REQUEST_PARAMS_HEADER: dump_dto(dto),
126
    }
127

128
    return headers
×
129

130

131
@lru_cache(maxsize=64)
1✔
132
def get_target_prefix_for_service(service_name: str) -> str | None:
1✔
133
    return get_service_catalog().get(service_name).metadata.get("targetPrefix")
×
134

135

136
class RestApiAwsIntegration(RestApiIntegration):
1✔
137
    """
138
    This is a REST API integration responsible to directly interact with AWS services. It uses the `uri` to
139
    map the incoming request to the concerned AWS service, and can have 2 types.
140
    - `path`: the request is targeting the direct URI of the AWS service, like you would with an HTTP client
141
     example: For S3 GetObject call: arn:aws:apigateway:us-west-2:s3:path/{bucket}/{key}
142
    - `action`: this is a simpler way, where you can pass the request parameters like you would do with an SDK, and you
143
     can specify the service action (for ex. here S3 `GetObject`). It seems the request parameters can be pass as query
144
     string parameters, JSON body and maybe more. TODO: verify, 2 documentation pages indicates divergent information.
145
    (one indicates parameters through QS, one through request body)
146
     example: arn:aws:apigateway:us-west-2:s3:action/GetObject&Bucket={bucket}&Key={key}
147

148
    https://docs.aws.amazon.com/apigateway/latest/developerguide/integration-request-basic-setup.html
149

150

151
    TODO: it seems we can global AWS integration type, we should not need to subclass for each service
152
     we just need to separate usage between the `path` URI type and the `action` URI type.
153
     - `path`, we can simply pass along the full rendered request along with specific `mocked` AWS headers
154
     that are dependant of the service (retrieving for the ARN in the uri)
155
     - `action`, we might need either a full Boto call or use the Boto request serializer, as it seems the request
156
     parameters are expected as parameters
157
    """
158

159
    name = "AWS"
1✔
160

161
    # TODO: it seems in AWS, you don't need to manually set the `X-Amz-Target` header when using the `action` type.
162
    #  for now, we know `events` needs the user to manually add the header, but Kinesis and DynamoDB don't.
163
    #  Maybe reverse the list to exclude instead of include.
164
    SERVICES_AUTO_TARGET = ["dynamodb", "kinesis", "ssm", "stepfunctions"]
1✔
165

166
    # TODO: some services still target the Query protocol (validated with AWS), even though SSM for example is JSON for
167
    #  as long as the Boto SDK exists. We will need to emulate the Query protocol and translate it to JSON
168
    SERVICES_LEGACY_QUERY_PROTOCOL = ["ssm"]
1✔
169

170
    SERVICE_MAP = {
1✔
171
        "states": "stepfunctions",
172
    }
173

174
    def __init__(self):
1✔
175
        self._base_domain = config.internal_service_url()
1✔
176
        self._base_host = ""
1✔
177
        self._service_names = get_service_catalog().service_names
1✔
178

179
    def invoke(self, context: RestApiInvocationContext) -> EndpointResponse:
1✔
180
        integration_req: IntegrationRequest = context.integration_request
×
181
        method = integration_req["http_method"]
×
182
        parsed_uri = self.parse_aws_integration_uri(integration_req["uri"])
×
183
        service_name = parsed_uri["service_name"]
×
184
        integration_region = parsed_uri["region_name"]
×
185

186
        if credentials := context.integration.get("credentials"):
×
187
            credentials = render_uri_with_stage_variables(credentials, context.stage_variables)
×
188

189
        headers = integration_req["headers"]
×
190
        # Some integrations will use a special format for the service in the URI, like AppSync, and so those requests
191
        # are not directed to a service directly, so need to add the Authorization header. It would fail parsing
192
        # by our service name parser anyway
193
        if service_name in self._service_names:
×
194
            headers.update(
×
195
                get_internal_mocked_headers(
196
                    service_name=service_name,
197
                    region_name=integration_region,
198
                    source_arn=get_source_arn(context),
199
                    role_arn=credentials,
200
                )
201
            )
202
        query_params = integration_req["query_string_parameters"].copy()
×
203
        data = integration_req["body"]
×
204

205
        if parsed_uri["action_type"] == "path":
×
206
            # the Path action type allows you to override the path the request is sent to, like you would send to AWS
207
            path = f"/{parsed_uri['path']}"
×
208
        else:
209
            # Action passes the `Action` query string parameter
210
            path = ""
×
211
            action = parsed_uri["path"]
×
212

213
            if target := self.get_action_service_target(service_name, action):
×
214
                # TODO: properly implement the auto-`Content-Type` headers depending on the service protocol
215
                #  e.g. `x-amz-json-1.0` for DynamoDB
216
                #  this is needed to properly support multi-protocol
217
                headers["X-Amz-Target"] = target
×
218

219
            query_params["Action"] = action
×
220

221
            if service_name in self.SERVICES_LEGACY_QUERY_PROTOCOL:
×
222
                # this has been tested in AWS: for `ssm`, it fully overrides the body because SSM uses the Query
223
                # protocol, so we simulate it that way
224
                data = self.get_payload_from_query_string(query_params)
×
225

226
        url = f"{self._base_domain}{path}"
×
227
        headers["Host"] = self.get_internal_host_for_service(
×
228
            service_name=service_name, region_name=integration_region
229
        )
230

231
        request_parameters = {
×
232
            "method": method,
233
            "url": url,
234
            "params": query_params,
235
            "headers": headers,
236
        }
237

238
        if method not in NO_BODY_METHODS:
×
239
            request_parameters["data"] = data
×
240

241
        request_response = requests.request(**request_parameters)
×
242
        response_content = request_response.content
×
243

244
        if (
×
245
            parsed_uri["action_type"] == "action"
246
            and service_name in self.SERVICES_LEGACY_QUERY_PROTOCOL
247
        ):
248
            response_content = self.format_response_content_legacy(
×
249
                payload=response_content,
250
                service_name=service_name,
251
                action=parsed_uri["path"],
252
                request_id=context.context_variables["requestId"],
253
            )
254

255
        return EndpointResponse(
×
256
            body=response_content,
257
            status_code=request_response.status_code,
258
            headers=Headers(dict(request_response.headers)),
259
        )
260

261
    def parse_aws_integration_uri(self, uri: str) -> ParsedAwsIntegrationUri:
1✔
262
        """
263
        The URI can be of 2 shapes: Path or Action.
264
        Path  : arn:aws:apigateway:us-west-2:s3:path/{bucket}/{key}
265
        Action: arn:aws:apigateway:us-east-1:kinesis:action/PutRecord
266
        :param uri: the URI of the AWS integration
267
        :return: a ParsedAwsIntegrationUri containing the service name, the region and the type of action
268
        """
269
        arn, _, path = uri.partition("/")
×
270
        split_arn = arn.split(":", maxsplit=5)
×
271
        *_, region_name, service_name, action_type = split_arn
×
272
        boto_service_name = self.SERVICE_MAP.get(service_name, service_name)
×
273
        return ParsedAwsIntegrationUri(
×
274
            region_name=region_name,
275
            service_name=boto_service_name,
276
            action_type=action_type,
277
            path=path,
278
        )
279

280
    def get_action_service_target(self, service_name: str, action: str) -> str | None:
1✔
281
        if service_name not in self.SERVICES_AUTO_TARGET:
×
282
            return None
×
283

284
        target_prefix = get_target_prefix_for_service(service_name)
×
285
        if not target_prefix:
×
286
            return None
×
287

288
        return f"{target_prefix}.{action}"
×
289

290
    def get_internal_host_for_service(self, service_name: str, region_name: str):
1✔
291
        url = self._base_domain
×
292
        if service_name == "sqs":
×
293
            # This follow the new SQS_ENDPOINT_STRATEGY=standard
294
            url = config.external_service_url(subdomains=f"sqs.{region_name}")
×
295
        elif "-api" in service_name:
×
296
            # this could be an `<subdomain>.<service>-api`, used by some services
297
            url = config.external_service_url(subdomains=service_name)
×
298

299
        return urlparse(url).netloc
×
300

301
    @staticmethod
1✔
302
    def get_payload_from_query_string(query_string_parameters: dict) -> str:
1✔
303
        return json.dumps(query_string_parameters)
×
304

305
    @staticmethod
1✔
306
    def format_response_content_legacy(
1✔
307
        service_name: str, action: str, payload: bytes, request_id: str
308
    ) -> bytes:
309
        # TODO: not sure how much we need to support this, this supports SSM for now, once we write more tests for
310
        #  `action` type, see if we can generalize more
311
        data = json.loads(payload)
×
312
        try:
×
313
            # we try to populate the missing fields from the OperationModel of the operation
314
            operation_model = get_service_catalog().get(service_name).operation_model(action)
×
315
            for key in operation_model.output_shape.members:
×
316
                if key not in data:
×
317
                    data[key] = None
×
318

319
        except Exception:
×
320
            # the operation above is only for parity reason, skips if it fails
321
            pass
×
322

323
        wrapped = {
×
324
            f"{action}Response": {
325
                f"{action}Result": data,
326
                "ResponseMetadata": {
327
                    "RequestId": request_id,
328
                },
329
            }
330
        }
331
        return to_bytes(json.dumps(wrapped))
×
332

333

334
class RestApiAwsProxyIntegration(RestApiIntegration):
1✔
335
    """
336
    This is a custom, simplified REST API integration focused only on the Lambda service, with minimal modification from
337
    API Gateway. It passes the incoming request almost as is, in a custom created event payload, to the configured
338
    Lambda function.
339

340
    https://docs.aws.amazon.com/apigateway/latest/developerguide/set-up-lambda-proxy-integrations.html
341
    """
342

343
    name = "AWS_PROXY"
1✔
344

345
    def invoke(self, context: RestApiInvocationContext) -> EndpointResponse:
1✔
346
        integration_req: IntegrationRequest = context.integration_request
×
347
        method = integration_req["http_method"]
×
348

349
        if method != HTTPMethod.POST:
×
350
            LOG.warning(
×
351
                "The 'AWS_PROXY' integration can only be used with the POST integration method.",
352
            )
353
            raise IntegrationFailureError("Internal server error")
×
354

355
        input_event = self.create_lambda_input_event(context)
×
356

357
        # TODO: verify stage variables rendering in AWS_PROXY
358
        integration_uri = integration_req["uri"]
×
359

360
        function_arn = get_lambda_function_arn_from_invocation_uri(integration_uri)
×
361
        source_arn = get_source_arn(context)
×
362

363
        # TODO: write test for credentials rendering
364
        if credentials := context.integration.get("credentials"):
×
365
            credentials = render_uri_with_stage_variables(credentials, context.stage_variables)
×
366

367
        try:
×
368
            lambda_payload = self.call_lambda(
×
369
                function_arn=function_arn,
370
                event=to_bytes(json.dumps(input_event)),
371
                source_arn=source_arn,
372
                credentials=credentials,
373
            )
374

375
        except ClientError as e:
×
376
            LOG.warning(
×
377
                "Exception during integration invocation: '%s'",
378
                e,
379
            )
380
            status_code = 502
×
381
            if e.response["Error"]["Code"] == "AccessDeniedException":
×
382
                status_code = 500
×
383
            raise IntegrationFailureError("Internal server error", status_code=status_code) from e
×
384

385
        except Exception as e:
×
386
            LOG.warning(
×
387
                "Unexpected exception during integration invocation: '%s'",
388
                e,
389
            )
390
            raise IntegrationFailureError("Internal server error", status_code=502) from e
×
391

392
        lambda_response = self.parse_lambda_response(lambda_payload)
×
393

394
        headers = Headers({"Content-Type": APPLICATION_JSON})
×
395

396
        response_headers = self._merge_lambda_response_headers(lambda_response)
×
397
        headers.update(response_headers)
×
398

399
        # TODO: maybe centralize this flag inside the context, when we are also using it for other integration types
400
        #  AWS_PROXY behaves a bit differently, but this could checked only once earlier
401
        binary_response_accepted = mime_type_matches_binary_media_types(
×
402
            mime_type=context.invocation_request["headers"].get("Accept"),
403
            binary_media_types=context.deployment.rest_api.rest_api.get("binaryMediaTypes", []),
404
        )
405
        body = self._parse_body(
×
406
            body=lambda_response.get("body"),
407
            is_base64_encoded=binary_response_accepted and lambda_response.get("isBase64Encoded"),
408
        )
409

410
        return EndpointResponse(
×
411
            headers=headers,
412
            body=body,
413
            status_code=int(lambda_response.get("statusCode") or 200),
414
        )
415

416
    @staticmethod
1✔
417
    def call_lambda(
1✔
418
        function_arn: str,
419
        event: bytes,
420
        source_arn: str,
421
        credentials: str = None,
422
    ) -> bytes:
423
        lambda_client = get_service_factory(
×
424
            region_name=extract_region_from_arn(function_arn),
425
            role_arn=credentials,
426
        ).lambda_
427
        inv_result = lambda_client.request_metadata(
×
428
            service_principal=ServicePrincipal.apigateway,
429
            source_arn=source_arn,
430
        ).invoke(
431
            FunctionName=function_arn,
432
            Payload=event,
433
            InvocationType="RequestResponse",
434
        )
435
        if payload := inv_result.get("Payload"):
×
436
            return payload.read()
×
437
        return b""
×
438

439
    def parse_lambda_response(self, payload: bytes) -> LambdaProxyResponse:
1✔
440
        try:
×
441
            lambda_response = json.loads(payload)
×
442
        except json.JSONDecodeError:
×
443
            LOG.warning(
×
444
                'Lambda output should follow the next JSON format: { "isBase64Encoded": true|false, "statusCode": httpStatusCode, "headers": { "headerName": "headerValue", ... },"body": "..."} but was: %s',
445
                payload,
446
            )
447
            LOG.debug(
×
448
                "Execution failed due to configuration error: Malformed Lambda proxy response"
449
            )
450
            raise InternalServerError("Internal server error", status_code=502)
×
451

452
        # none of the lambda response fields are mandatory, but you cannot return any other fields
453
        if not self._is_lambda_response_valid(lambda_response):
×
454
            if "errorMessage" in lambda_response:
×
455
                LOG.debug(
×
456
                    "Lambda execution failed with status 200 due to customer function error: %s. Lambda request id: %s",
457
                    lambda_response["errorMessage"],
458
                    lambda_response.get("requestId", "<Unknown Request Id>"),
459
                )
460
            else:
461
                LOG.warning(
×
462
                    'Lambda output should follow the next JSON format: { "isBase64Encoded": true|false, "statusCode": httpStatusCode, "headers": { "headerName": "headerValue", ... },"body": "..."} but was: %s',
463
                    payload,
464
                )
465
                LOG.debug(
×
466
                    "Execution failed due to configuration error: Malformed Lambda proxy response"
467
                )
468
            raise InternalServerError("Internal server error", status_code=502)
×
469

470
        def serialize_header(value: bool | str) -> str:
×
471
            if isinstance(value, bool):
×
472
                return "true" if value else "false"
×
473
            return value
×
474

475
        if headers := lambda_response.get("headers"):
×
476
            lambda_response["headers"] = {k: serialize_header(v) for k, v in headers.items()}
×
477

478
        if multi_value_headers := lambda_response.get("multiValueHeaders"):
×
479
            lambda_response["multiValueHeaders"] = {
×
480
                k: [serialize_header(v) for v in values]
481
                for k, values in multi_value_headers.items()
482
            }
483

484
        return lambda_response
×
485

486
    @staticmethod
1✔
487
    def _is_lambda_response_valid(lambda_response: dict) -> bool:
1✔
488
        if not isinstance(lambda_response, dict):
×
489
            return False
×
490

491
        if not validate_sub_dict_of_typed_dict(LambdaProxyResponse, lambda_response):
×
492
            return False
×
493

494
        if (headers := lambda_response.get("headers")) is not None:
×
495
            if not isinstance(headers, dict):
×
496
                return False
×
497
            if any(not isinstance(header_value, (str, bool)) for header_value in headers.values()):
×
498
                return False
×
499

500
        if (multi_value_headers := lambda_response.get("multiValueHeaders")) is not None:
×
501
            if not isinstance(multi_value_headers, dict):
×
502
                return False
×
503
            if any(
×
504
                not isinstance(header_value, list) for header_value in multi_value_headers.values()
505
            ):
506
                return False
×
507

508
        if "statusCode" in lambda_response:
×
509
            try:
×
510
                int(lambda_response["statusCode"])
×
511
            except ValueError:
×
512
                return False
×
513

514
        # TODO: add more validations of the values' type
515
        return True
×
516

517
    def create_lambda_input_event(self, context: RestApiInvocationContext) -> LambdaInputEvent:
1✔
518
        # https://docs.aws.amazon.com/apigateway/latest/developerguide/set-up-lambda-proxy-integrations.html#api-gateway-simple-proxy-for-lambda-input-format
519
        # for building the Lambda Payload, we need access to the Invocation Request, as some data is not available in
520
        # the integration request and does not make sense for it
521
        invocation_req: InvocationRequest = context.invocation_request
×
522
        integration_req: IntegrationRequest = context.integration_request
×
523

524
        body, is_b64_encoded = self._format_body(integration_req["body"])
×
525

526
        if context.base_path:
×
527
            path = context.context_variables["path"]
×
528
        else:
529
            path = invocation_req["path"]
×
530

531
        input_event = LambdaInputEvent(
×
532
            headers=self._format_headers(dict(integration_req["headers"])),
533
            multiValueHeaders=self._format_headers(
534
                build_multi_value_headers(integration_req["headers"])
535
            ),
536
            body=body or None,
537
            isBase64Encoded=is_b64_encoded,
538
            requestContext=context.context_variables,
539
            stageVariables=context.stage_variables,
540
            # still using the InvocationRequest query string parameters as the logic is the same, maybe refactor?
541
            queryStringParameters=invocation_req["query_string_parameters"] or None,
542
            multiValueQueryStringParameters=invocation_req["multi_value_query_string_parameters"]
543
            or None,
544
            pathParameters=invocation_req["path_parameters"] or None,
545
            httpMethod=invocation_req["http_method"],
546
            path=path,
547
            resource=context.resource["path"],
548
        )
549

550
        return input_event
×
551

552
    @staticmethod
1✔
553
    def _format_headers(headers: dict[str, str | list[str]]) -> dict[str, str | list[str]]:
1✔
554
        # Some headers get capitalized like in CloudFront, see
555
        # https://docs.aws.amazon.com/AmazonCloudFront/latest/DeveloperGuide/add-origin-custom-headers.html#add-origin-custom-headers-forward-authorization
556
        # It seems AWS_PROXY lambda integrations are behind CloudFront, as seen by the returned headers in AWS
557
        to_capitalize: list[str] = ["authorization", "user-agent"]  # some headers get capitalized
×
558
        to_filter: list[str] = ["content-length", "connection"]
×
559
        headers = {
×
560
            k.title() if k.lower() in to_capitalize else k: v
561
            for k, v in headers.items()
562
            if k.lower() not in to_filter
563
        }
564

565
        return headers
×
566

567
    @staticmethod
1✔
568
    def _format_body(body: bytes) -> tuple[str, bool]:
1✔
569
        try:
×
570
            return body.decode("utf-8"), False
×
571
        except UnicodeDecodeError:
×
572
            return to_str(base64.b64encode(body)), True
×
573

574
    @staticmethod
1✔
575
    def _parse_body(body: str | None, is_base64_encoded: bool) -> bytes:
1✔
576
        if not body:
×
577
            return b""
×
578

579
        if is_base64_encoded:
×
580
            try:
×
581
                return base64.b64decode(body)
×
582
            except Exception:
×
583
                raise InternalServerError("Internal server error", status_code=500)
×
584

585
        return to_bytes(body)
×
586

587
    @staticmethod
1✔
588
    def _merge_lambda_response_headers(lambda_response: LambdaProxyResponse) -> dict:
1✔
589
        headers = lambda_response.get("headers") or {}
×
590

591
        if multi_value_headers := lambda_response.get("multiValueHeaders"):
×
592
            # multiValueHeaders has the priority and will decide the casing of the final headers, as they are merged
593
            headers_low_keys = {k.lower(): v for k, v in headers.items()}
×
594

595
            for k, values in multi_value_headers.items():
×
596
                if (k_lower := k.lower()) in headers_low_keys:
×
597
                    headers[k] = [*values, headers_low_keys[k_lower]]
×
598
                else:
599
                    headers[k] = values
×
600

601
        return headers
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc