localstack / localstack / 16621641216

30 Jul 2025 11:46AM UTC coverage: 51.389% (-0.001%) from 51.39%
KMS: return `NotImplementedError` for rotation of imported keys (#12932)

36972 of 71945 relevant lines covered (51.39%)

0.51 hits per line

/localstack-core/localstack/aws/protocol/serializer.py
"""
Response serializers for the different AWS service protocols.

The module contains classes that take a service's response dict, and
given an operation model, serialize the HTTP response according to the
specified output shape.

It can be seen as the counterpart to the ``parse`` module in ``botocore``
(which parses the result of these serializers). It has a lot of
similarities with the ``serialize`` module in ``botocore``, but
serves a different purpose (serializing responses instead of requests).

The different protocols have many similarities. The class hierarchy is
designed such that the serializers share as much logic as possible.
The class hierarchy looks as follows:
::
                                      ┌───────────────────┐
                                      │ResponseSerializer │
                                      └───────────────────┘
                                          ▲    ▲      ▲
                   ┌──────────────────────┘    │      └──────────────────┐
      ┌────────────┴────────────┐ ┌────────────┴─────────────┐ ┌─────────┴────────────┐
      │BaseXMLResponseSerializer│ │BaseRestResponseSerializer│ │JSONResponseSerializer│
      └─────────────────────────┘ └──────────────────────────┘ └──────────────────────┘
                         ▲    ▲             ▲             ▲              ▲
  ┌──────────────────────┴─┐ ┌┴─────────────┴──────────┐ ┌┴──────────────┴──────────┐
  │QueryResponseSerializer │ │RestXMLResponseSerializer│ │RestJSONResponseSerializer│
  └────────────────────────┘ └─────────────────────────┘ └──────────────────────────┘
              ▲
   ┌──────────┴──────────┐
   │EC2ResponseSerializer│
   └─────────────────────┘
::

The ``ResponseSerializer`` contains the logic that is used among all the
different protocols (``query``, ``json``, ``rest-json``, ``rest-xml``, and
``ec2``).
The protocols relate to each other in the following ways:

* The ``query`` and the ``rest-xml`` protocols both have XML bodies in their
  responses which are serialized quite similarly (with some specifics for each
  type).
* The ``json`` and the ``rest-json`` protocols both have JSON bodies in their
  responses which are serialized the same way.
* The ``rest-json`` and ``rest-xml`` protocols serialize some metadata in
  the HTTP response's header fields.
* The ``ec2`` protocol is essentially the ``query`` protocol with a
  specific error response formatting.

The serializer classes in this module correspond directly to the different
protocols. ``#create_serializer`` shows the explicit mapping between the
classes and the protocols.
The classes are structured as follows:

* The ``ResponseSerializer`` contains all the basic logic for the
  serialization which is shared among all different protocols.
* The ``BaseXMLResponseSerializer`` and the ``JSONResponseSerializer``
  contain the logic for the XML and the JSON serialization respectively.
* The ``BaseRestResponseSerializer`` contains the logic for the REST
  protocol specifics (i.e. specific HTTP header serializations).
* The ``RestXMLResponseSerializer`` and the ``RestJSONResponseSerializer``
  inherit the REST-specific logic from the ``BaseRestResponseSerializer``
  and the XML / JSON body serialization from their second super class.

The services and their protocols are defined by using AWS's Smithy
(a language to define services in a - somewhat - protocol-agnostic
way). The "peculiarities" in this serializer code usually correspond
to certain so-called "traits" in Smithy.

The result of the serialization methods is the HTTP response which can
be sent back to the calling client.
"""
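The hierarchy described in the module docstring can be sketched with empty stub classes. This is only an illustration: the class names come from the diagram, but the bodies are placeholders, and the `PROTOCOL_TO_SERIALIZER` dict is a hypothetical stand-in for the mapping that the docstring attributes to `create_serializer`. The base-class order of the two REST serializers is an assumption derived from the docstring's remark that the body serialization comes from "their second super class".

```python
# Stub classes only: names mirror the docstring's diagram, bodies are placeholders.
class ResponseSerializer: ...
class BaseXMLResponseSerializer(ResponseSerializer): ...
class BaseRestResponseSerializer(ResponseSerializer): ...
class JSONResponseSerializer(ResponseSerializer): ...
class QueryResponseSerializer(BaseXMLResponseSerializer): ...
class EC2ResponseSerializer(QueryResponseSerializer): ...


# REST-specific logic first, body serialization second (assumed order).
class RestXMLResponseSerializer(BaseRestResponseSerializer, BaseXMLResponseSerializer): ...
class RestJSONResponseSerializer(BaseRestResponseSerializer, JSONResponseSerializer): ...


# Hypothetical protocol mapping, based on the protocols named in the docstring.
PROTOCOL_TO_SERIALIZER = {
    "query": QueryResponseSerializer,
    "json": JSONResponseSerializer,
    "rest-json": RestJSONResponseSerializer,
    "rest-xml": RestXMLResponseSerializer,
    "ec2": EC2ResponseSerializer,
}

# The method resolution order shows which class is consulted first.
mro_names = [cls.__name__ for cls in RestXMLResponseSerializer.__mro__]
print(mro_names)
```

With this base-class order, a method defined in both `BaseRestResponseSerializer` and `BaseXMLResponseSerializer` resolves to the REST variant first, which can then delegate to the XML variant through `super()`.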

import abc
import base64
import functools
import json
import logging
import string
from abc import ABC
from binascii import crc32
from datetime import datetime
from email.utils import formatdate
from struct import pack
from typing import Any, Dict, Iterable, Iterator, List, Optional, Tuple, Union
from xml.etree import ElementTree as ETree

import xmltodict
from botocore.model import ListShape, MapShape, OperationModel, ServiceModel, Shape, StructureShape
from botocore.serialize import ISO8601, ISO8601_MICRO
from botocore.utils import calculate_md5, is_json_value_header, parse_to_aware_datetime

# cbor2: explicitly load from private _encoder module to avoid using the (non-patched) C-version
from cbor2._encoder import dumps as cbor2_dumps
from werkzeug import Request as WerkzeugRequest
from werkzeug import Response as WerkzeugResponse
from werkzeug.datastructures import Headers, MIMEAccept
from werkzeug.http import parse_accept_header

from localstack.aws.api import CommonServiceException, ServiceException
from localstack.aws.spec import ProtocolName, load_service
from localstack.constants import (
    APPLICATION_AMZ_CBOR_1_1,
    APPLICATION_AMZ_JSON_1_0,
    APPLICATION_AMZ_JSON_1_1,
    APPLICATION_CBOR,
    APPLICATION_JSON,
    APPLICATION_XML,
    TEXT_XML,
)
from localstack.http import Response
from localstack.utils.common import to_bytes, to_str
from localstack.utils.strings import long_uid
from localstack.utils.xml import strip_xmlns

LOG = logging.getLogger(__name__)

REQUEST_ID_CHARACTERS = string.digits + string.ascii_uppercase


class ResponseSerializerError(Exception):
    """
    Error which is thrown if the response serialization fails.
    Super class of all exceptions raised by the serializer.
    """

    pass


class UnknownSerializerError(ResponseSerializerError):
    """
    Error which indicates that the exception raised by the serializer could be caused by invalid data or by any other
    (unknown) issue. Errors like this should be reported and indicate an issue in the serializer itself.
    """

    pass


class ProtocolSerializerError(ResponseSerializerError):
    """
    Error which indicates that the given data is not compliant with the service's specification and cannot be
    serialized. This usually results in a response to the client with an HTTP 5xx status code (internal server error).
    """

    pass


def _handle_exceptions(func):
    """
    Decorator which handles the exceptions raised by the serializer. It ensures that all exceptions raised by the public
    methods of the serializer are instances of ResponseSerializerError.
    :param func: to wrap in order to add the exception handling
    :return: wrapped function
    """

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except ResponseSerializerError:
            raise
        except Exception as e:
            raise UnknownSerializerError(
                "An unknown error occurred when trying to serialize the response."
            ) from e

    return wrapper
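The effect of the decorator can be seen in a small standalone sketch. The exception classes and the decorator are re-declared here so the snippet runs on its own; the two decorated functions and the `classify` helper are hypothetical and exist only for the demonstration.

```python
import functools


class ResponseSerializerError(Exception): ...
class UnknownSerializerError(ResponseSerializerError): ...
class ProtocolSerializerError(ResponseSerializerError): ...


def handle_exceptions(func):
    """Re-raise serializer errors untouched, wrap everything else."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except ResponseSerializerError:
            raise
        except Exception as e:
            raise UnknownSerializerError("An unknown error occurred.") from e

    return wrapper


@handle_exceptions
def fails_with_key_error():
    # Hypothetical serializer bug: an unexpected KeyError.
    return {}["missing"]


@handle_exceptions
def fails_with_protocol_error():
    # Hypothetical invalid input: already a serializer error.
    raise ProtocolSerializerError("data does not match the specification")


def classify(func):
    """Return the names of the raised exception type and of its cause."""
    try:
        func()
    except ResponseSerializerError as e:
        return type(e).__name__, type(e.__cause__).__name__


print(classify(fails_with_key_error))
print(classify(fails_with_protocol_error))
```

Because the original exception is chained with `from e`, the root cause stays available on `__cause__` even though callers only need to catch `ResponseSerializerError`.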


class ResponseSerializer(abc.ABC):
    """
    The response serializer is responsible for the serialization of a service implementation's result to an actual
    HTTP response (which will be sent to the calling client).
    It is the base class of all serializers and therefore contains the basic logic which is used among all of them.
    """

    DEFAULT_ENCODING = "utf-8"
    # The default timestamp format is ISO8601, but this can be overridden by subclasses.
    TIMESTAMP_FORMAT = "iso8601"
    # Event streaming binary data type mapping for type "string"
    AWS_BINARY_DATA_TYPE_STRING = 7
    # Defines the supported mime types of the specific serializer. Sorted by priority (preferred / default first).
    # Needs to be specified by subclasses.
    SUPPORTED_MIME_TYPES: List[str] = []

    @_handle_exceptions
    def serialize_to_response(
        self,
        response: dict,
        operation_model: OperationModel,
        headers: Optional[Dict | Headers],
        request_id: str,
    ) -> Response:
        """
        Takes a response dict and serializes it to an actual HttpResponse.

        :param response: to serialize
        :param operation_model: specification of the service & operation containing information about the shape of the
                                service's output / response
        :param headers: the headers of the incoming request this response should be serialized for. This is necessary
                        for features like Content-Negotiation (define response content type based on request headers).
        :param request_id: autogenerated AWS request ID identifying the original request
        :return: Response which can be sent to the calling client
        :raises: ResponseSerializerError (either a ProtocolSerializerError or an UnknownSerializerError)
        """

        # determine the preferred mime type (based on the serializer's supported mime types and the Accept header)
        mime_type = self._get_mime_type(headers)

        # if the operation has a streaming output, handle the serialization differently
        if operation_model.has_event_stream_output:
            return self._serialize_event_stream(response, operation_model, mime_type, request_id)

        serialized_response = self._create_default_response(operation_model, mime_type)
        shape = operation_model.output_shape
        # The shape can also be none (for empty responses), but it still needs to be serialized (to add some metadata)
        shape_members = shape.members if shape is not None else None
        self._serialize_response(
            response,
            serialized_response,
            shape,
            shape_members,
            operation_model,
            mime_type,
            request_id,
        )
        serialized_response = self._prepare_additional_traits_in_response(
            serialized_response, operation_model, request_id
        )
        return serialized_response

    @_handle_exceptions
    def serialize_error_to_response(
        self,
        error: ServiceException,
        operation_model: OperationModel,
        headers: Optional[Dict | Headers],
        request_id: str,
    ) -> Response:
        """
        Takes an error instance and serializes it to an actual HttpResponse.
        Therefore, this method is used for errors which should be serialized and transmitted to the calling client.

        :param error: to serialize
        :param operation_model: specification of the service & operation containing information about the shape of the
                                service's output / response
        :param headers: the headers of the incoming request this response should be serialized for. This is necessary
                        for features like Content-Negotiation (define response content type based on request headers).
        :param request_id: autogenerated AWS request ID identifying the original request
        :return: HttpResponse which can be sent to the calling client
        :raises: ResponseSerializerError (either a ProtocolSerializerError or an UnknownSerializerError)
        """
        # determine the preferred mime type (based on the serializer's supported mime types and the Accept header)
        mime_type = self._get_mime_type(headers)

        # TODO implement streaming error serialization
        serialized_response = self._create_default_response(operation_model, mime_type)
        if not error or not isinstance(error, ServiceException):
            raise ProtocolSerializerError(
                f"Error to serialize ({error.__class__.__name__ if error else None}) is not a ServiceException."
            )
        shape = operation_model.service_model.shape_for_error_code(error.code)
        serialized_response.status_code = error.status_code

        self._serialize_error(
            error, serialized_response, shape, operation_model, mime_type, request_id
        )
        serialized_response = self._prepare_additional_traits_in_response(
            serialized_response, operation_model, request_id
        )
        return serialized_response

    def _serialize_response(
        self,
        parameters: dict,
        response: Response,
        shape: Optional[Shape],
        shape_members: dict,
        operation_model: OperationModel,
        mime_type: str,
        request_id: str,
    ) -> None:
        raise NotImplementedError

    def _serialize_body_params(
        self,
        params: dict,
        shape: Shape,
        operation_model: OperationModel,
        mime_type: str,
        request_id: str,
    ) -> Optional[str]:
        """
        Actually serializes the given params for the given shape to a string for the transmission in the body of the
        response.
        :param params: to serialize
        :param shape: to know how to serialize the params
        :param operation_model: for additional metadata
        :param mime_type: Mime type which should be used to encode the payload
        :param request_id: autogenerated AWS request ID identifying the original request
        :return: string containing the serialized body
        """
        raise NotImplementedError

    def _serialize_error(
        self,
        error: ServiceException,
        response: Response,
        shape: StructureShape,
        operation_model: OperationModel,
        mime_type: str,
        request_id: str,
    ) -> None:
        raise NotImplementedError

    def _serialize_event_stream(
        self,
        response: dict,
        operation_model: OperationModel,
        mime_type: str,
        request_id: str,
    ) -> Response:
        """
        Serializes a given response dict (the return payload of a service implementation) to an _event stream_ using the
        given operation model.

        :param response: dictionary containing the payload for the response
        :param operation_model: describing the operation the response dict is being returned by
        :param mime_type: Mime type which should be used to encode the payload
        :param request_id: autogenerated AWS request ID identifying the original request
        :return: Response which can directly be sent to the client (in chunks)
        """
        event_stream_shape = operation_model.get_event_stream_output()
        event_stream_member_name = operation_model.output_shape.event_stream_name

        # wrap the generator in operation specific serialization
        def event_stream_serializer() -> Iterable[bytes]:
            yield self._encode_event_payload("initial-response")

            # create a default response
            serialized_event_response = self._create_default_response(operation_model, mime_type)
            # get the members of the event stream shape
            event_stream_shape_members = (
                event_stream_shape.members if event_stream_shape is not None else None
            )
            # extract the generator from the given response data
            event_generator = response.get(event_stream_member_name)
            if not isinstance(event_generator, Iterator):
                raise ProtocolSerializerError(
                    "Expected iterator for streaming event serialization."
                )

            # yield one event per generated event
            for event in event_generator:
                # find the actual event payload (the member with event=true)
                event_member_shape = None
                event_member_name = None
                for member_name, member_shape in event_stream_shape_members.items():
                    if member_shape.serialization.get("event") and member_name in event:
                        event_member_shape = member_shape
                        event_member_name = member_name
                        break
                if event_member_shape is None:
                    raise UnknownSerializerError("Couldn't find event shape for serialization.")

                # serialize the part of the response for the event
                self._serialize_response(
                    event.get(event_member_name),
                    serialized_event_response,
                    event_member_shape,
                    event_member_shape.members if event_member_shape is not None else None,
                    operation_model,
                    mime_type,
                    request_id,
                )
                # execute additional response traits (might be modifying the response)
                serialized_event_response = self._prepare_additional_traits_in_response(
                    serialized_event_response, operation_model, request_id
                )
                # encode the event and yield it
                yield self._encode_event_payload(
                    event_type=event_member_name, content=serialized_event_response.data
                )

        return Response(
            response=event_stream_serializer(),
            status=operation_model.http.get("responseCode", 200),
        )

    def _encode_event_payload(
        self,
        event_type: str,
        content: Union[str, bytes] = "",
        error_code: Optional[str] = None,
        error_message: Optional[str] = None,
    ) -> bytes:
        """
        Encodes the given event payload according to AWS specific binary event encoding.
        A specification of the format can be found in the AWS docs:
        https://docs.aws.amazon.com/AmazonS3/latest/API/RESTSelectObjectAppendix.html

        :param content: string or bytes of the event payload
        :param event_type: type of the event. Usually the name of the event shape or specific event types like
                            "initial-response".
        :param error_code: Optional. Error code if the payload represents an error.
        :param error_message: Optional. Error message if the payload represents an error.
        :return: bytes with the AWS-specific encoded event payload
        """

        # determine the event type (error if an error message or an error code is set)
        if error_message or error_code:
            message_type = "error"
        else:
            message_type = "event"

        # set the headers
        headers = {":event-type": event_type, ":message-type": message_type}
        if error_message:
            headers[":error-message"] = error_message
        if error_code:
            headers[":error-code"] = error_code

        # construct headers
        header_section = b""
        for key, value in headers.items():
            header_name = key.encode(self.DEFAULT_ENCODING)
            header_value = to_bytes(value)
            header_section += pack("!B", len(header_name))
            header_section += header_name
            header_section += pack("!B", self.AWS_BINARY_DATA_TYPE_STRING)
            header_section += pack("!H", len(header_value))
            header_section += header_value

        # construct body
        if isinstance(content, str):
            payload = bytes(content, self.DEFAULT_ENCODING)
        else:
            payload = content

        # calculate lengths
        headers_length = len(header_section)
        payload_length = len(payload)

        # construct message
        # - prelude
        result = pack("!I", payload_length + headers_length + 16)
        result += pack("!I", headers_length)
        # - prelude crc
        prelude_crc = crc32(result)
        result += pack("!I", prelude_crc)
        # - headers
        result += header_section
        # - payload
        result += payload
        # - message crc
        payload_crc = crc32(result)
        result += pack("!I", payload_crc)

        return result
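The framing built by `_encode_event_payload` can be checked with a standalone round trip. The sketch below re-implements the same layout (8-byte prelude, prelude CRC, headers, payload, message CRC) outside the class; `encode_event` and `decode_event` are hypothetical helper names, and the decoder only understands string-typed headers (type 7), which is all the method above emits.

```python
from binascii import crc32
from struct import pack, unpack

STRING_TYPE = 7  # header value type "string", as in AWS_BINARY_DATA_TYPE_STRING


def encode_event(headers: dict[str, str], payload: bytes) -> bytes:
    header_section = b""
    for name, value in headers.items():
        name_bytes, value_bytes = name.encode("utf-8"), value.encode("utf-8")
        header_section += pack("!B", len(name_bytes)) + name_bytes
        header_section += pack("!B", STRING_TYPE)
        header_section += pack("!H", len(value_bytes)) + value_bytes
    # 16 = prelude (8 bytes) + prelude CRC (4 bytes) + message CRC (4 bytes)
    total_length = len(header_section) + len(payload) + 16
    prelude = pack("!II", total_length, len(header_section))
    message = prelude + pack("!I", crc32(prelude)) + header_section + payload
    return message + pack("!I", crc32(message))


def decode_event(message: bytes) -> tuple[dict[str, str], bytes]:
    total_length, headers_length, prelude_crc = unpack("!III", message[:12])
    if total_length != len(message) or prelude_crc != crc32(message[:8]):
        raise ValueError("corrupt prelude")
    (message_crc,) = unpack("!I", message[-4:])
    if message_crc != crc32(message[:-4]):
        raise ValueError("corrupt message")
    headers, pos, end = {}, 12, 12 + headers_length
    while pos < end:
        name_length = message[pos]
        name = message[pos + 1 : pos + 1 + name_length].decode("utf-8")
        pos += 1 + name_length
        if message[pos] != STRING_TYPE:
            raise ValueError("only string headers are handled in this sketch")
        (value_length,) = unpack("!H", message[pos + 1 : pos + 3])
        headers[name] = message[pos + 3 : pos + 3 + value_length].decode("utf-8")
        pos += 3 + value_length
    return headers, message[end:-4]


frame = encode_event({":event-type": "Records", ":message-type": "event"}, b"hello")
decoded_headers, decoded_payload = decode_event(frame)
print(len(frame), decoded_headers, decoded_payload)
```

Each header costs its name and value lengths plus four bytes of framing, so the two headers above take 44 bytes and the whole frame is 44 + 5 + 16 = 65 bytes long.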

    def _create_default_response(self, operation_model: OperationModel, mime_type: str) -> Response:
        """
        Creates a boilerplate default response to be used by subclasses as starting points.
        Uses the default HTTP response status code defined in the operation model (if defined), otherwise 200.

        :param operation_model: to extract the default HTTP status code
        :param mime_type: Mime type which should be used to encode the payload
        :return: boilerplate HTTP response
        """
        return Response(status=operation_model.http.get("responseCode", 200))

    def _get_mime_type(self, headers: Optional[Dict | Headers]) -> str:
        """
        Extracts the accepted mime type from the request headers and returns a matching, supported mime type for the
        serializer or the default mime type of the service if there is no match.
        :param headers: to extract the "Accept" header from
        :return: preferred mime type to be used by the serializer (if it is not accepted by the client,
                 a debug message is logged)
        """
        accept_header = None
        if headers and "Accept" in headers and not headers.get("Accept") == "*/*":
            accept_header = headers.get("Accept")
        elif headers and headers.get("Content-Type"):
            # If there is no specific Accept header given, we use the given Content-Type as a fallback.
            # i.e. if the request content was JSON encoded and the client doesn't send a specific Accept header, the
            # serializer should prefer JSON encoding.
            content_type = headers.get("Content-Type")
            LOG.debug(
                "No accept header given. Using request's Content-Type (%s) as preferred response Content-Type.",
                content_type,
            )
            accept_header = content_type + ", */*"
        mime_accept: MIMEAccept = parse_accept_header(accept_header, MIMEAccept)
        mime_type = mime_accept.best_match(self.SUPPORTED_MIME_TYPES)
        if not mime_type:
            # There is no match between the supported mime types and the requested one(s)
            mime_type = self.SUPPORTED_MIME_TYPES[0]
            LOG.debug(
                "Determined accept type (%s) is not supported by this serializer. Using default of this serializer: %s",
                accept_header,
                mime_type,
            )
        return mime_type
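The fallback order in `_get_mime_type` (explicit `Accept`, then the request's `Content-Type`, then the serializer's first supported type) can be traced with a standard-library-only sketch. Note that this is a simplified stand-in and not werkzeug's matching algorithm: `parse_accept`, `matches` and `negotiate` are hypothetical helpers, headers are assumed to be a plain dict with exact-case keys, and ties between equal quality values are resolved by header order.

```python
def parse_accept(header):
    """Parse an Accept-style header into (media type, quality), best quality first."""
    items = []
    for part in (header or "").split(","):
        media, *params = [piece.strip() for piece in part.split(";")]
        if not media:
            continue
        quality = 1.0
        for param in params:
            if param.startswith("q="):
                try:
                    quality = float(param[2:])
                except ValueError:
                    quality = 0.0
        items.append((media.lower(), quality))
    # sorted() is stable, so entries with equal quality keep their header order
    return sorted(items, key=lambda item: item[1], reverse=True)


def matches(offered, pattern):
    if pattern == "*/*":
        return True
    if pattern.endswith("/*"):
        return offered.split("/")[0] == pattern[:-2]
    return offered == pattern


def negotiate(headers, supported):
    accept = None
    if headers and headers.get("Accept") not in (None, "*/*"):
        accept = headers["Accept"]
    elif headers and headers.get("Content-Type"):
        # No specific Accept header: prefer the request's own content type.
        accept = headers["Content-Type"] + ", */*"
    for pattern, quality in parse_accept(accept):
        if quality <= 0:
            continue
        for offered in supported:
            if matches(offered, pattern):
                return offered
    # Nothing matched: fall back to the serializer's default (first entry).
    return supported[0]


SUPPORTED = ["text/xml", "application/xml", "application/json"]
print(negotiate({"Accept": "application/json"}, SUPPORTED))
print(negotiate({"Accept": "*/*", "Content-Type": "application/xml"}, SUPPORTED))
print(negotiate({"Accept": "image/png"}, SUPPORTED))
```

The last call shows the design choice of the original method: an unsatisfiable `Accept` header does not produce an error, it falls back to the default mime type.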

    # Some extra utility methods subclasses can use.

    @staticmethod
    def _timestamp_iso8601(value: datetime) -> str:
        if value.microsecond > 0:
            timestamp_format = ISO8601_MICRO
        else:
            timestamp_format = ISO8601
        return value.strftime(timestamp_format)

    @staticmethod
    def _timestamp_unixtimestamp(value: datetime) -> float:
        return value.timestamp()

    def _timestamp_rfc822(self, value: datetime) -> str:
        if isinstance(value, datetime):
            value = self._timestamp_unixtimestamp(value)
        return formatdate(value, usegmt=True)

    def _convert_timestamp_to_str(
        self, value: Union[int, str, datetime], timestamp_format=None
    ) -> str:
        if timestamp_format is None:
            timestamp_format = self.TIMESTAMP_FORMAT
        timestamp_format = timestamp_format.lower()
        datetime_obj = parse_to_aware_datetime(value)
        converter = getattr(self, "_timestamp_%s" % timestamp_format)
        final_value = converter(datetime_obj)
        return final_value
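The name-based dispatch in `_convert_timestamp_to_str` can be tried in isolation. In the sketch below, the `TimestampConverter` class is hypothetical, the two format strings are local constants assumed to match botocore's `ISO8601` and `ISO8601_MICRO`, and the input is assumed to be a timezone-aware UTC `datetime` already (the original first normalizes its input with `parse_to_aware_datetime`).

```python
from datetime import datetime, timezone
from email.utils import formatdate

# Assumed to match botocore.serialize.ISO8601 / ISO8601_MICRO.
ISO8601 = "%Y-%m-%dT%H:%M:%SZ"
ISO8601_MICRO = "%Y-%m-%dT%H:%M:%S.%fZ"


class TimestampConverter:
    TIMESTAMP_FORMAT = "iso8601"

    @staticmethod
    def _timestamp_iso8601(value: datetime) -> str:
        # Only include fractional seconds when there are any.
        return value.strftime(ISO8601_MICRO if value.microsecond > 0 else ISO8601)

    @staticmethod
    def _timestamp_unixtimestamp(value: datetime) -> float:
        return value.timestamp()

    @staticmethod
    def _timestamp_rfc822(value: datetime) -> str:
        return formatdate(value.timestamp(), usegmt=True)

    def convert(self, value: datetime, timestamp_format=None):
        name = (timestamp_format or self.TIMESTAMP_FORMAT).lower()
        # Look up the converter method by the format name.
        return getattr(self, "_timestamp_%s" % name)(value)


converter = TimestampConverter()
moment = datetime(2015, 1, 1, tzinfo=timezone.utc)
print(converter.convert(moment))
print(converter.convert(moment.replace(microsecond=500000)))
print(converter.convert(moment, "unixTimestamp"))
print(converter.convert(moment, "rfc822"))
```

Lower-casing the format name is what lets a model value such as `unixTimestamp` resolve to `_timestamp_unixtimestamp`; an unknown format name would surface as an `AttributeError` from `getattr`.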

    @staticmethod
    def _get_serialized_name(shape: Shape, default_name: str) -> str:
        """
        Returns the serialized name for the shape if it exists.
        Otherwise, it will return the passed in default_name.
        """
        return shape.serialization.get("name", default_name)

    def _get_base64(self, value: Union[str, bytes]):
        """
        Returns the base64-encoded version of value, handling
        both strings and bytes. The returned value is a string
        via the default encoding.
        """
        if isinstance(value, str):
            value = value.encode(self.DEFAULT_ENCODING)
        return base64.b64encode(value).strip().decode(self.DEFAULT_ENCODING)

    def _encode_payload(self, body: Union[bytes, str]) -> bytes:
        if isinstance(body, str):
            return body.encode(self.DEFAULT_ENCODING)
        return body

    def _prepare_additional_traits_in_response(
        self, response: Response, operation_model: OperationModel, request_id: str
    ):
        """Applies additional traits on the raw response for a given model or protocol."""
        if operation_model.http_checksum_required:
            self._add_md5_header(response)
        return response

    def _has_header(self, header_name: str, headers: dict):
        """Case-insensitive check for header key."""
        if header_name is None:
            return False
        else:
            return header_name.lower() in [key.lower() for key in headers.keys()]

    def _add_md5_header(self, response: Response):
        """Add a Content-MD5 header if not yet there. Adapted from botocore.utils"""
        headers = response.headers
        body = response.data
        if body is not None and "Content-MD5" not in headers:
            md5_digest = calculate_md5(body)
            headers["Content-MD5"] = md5_digest

    def _get_error_message(self, error: Exception) -> Optional[str]:
        return str(error) if error is not None and str(error) != "None" else None
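The `Content-MD5` handling in `_add_md5_header` can be illustrated without botocore. The sketch assumes that the header value is the base64-encoded binary MD5 digest of the body, which is the usual `Content-MD5` convention; `content_md5` and `add_md5_header` are hypothetical helpers operating on a plain dict rather than on a `Response` object.

```python
import base64
import hashlib


def content_md5(body: bytes) -> str:
    """Base64-encode the raw (binary) MD5 digest of the body."""
    return base64.b64encode(hashlib.md5(body).digest()).decode("ascii")


def add_md5_header(headers: dict, body) -> dict:
    # Only add the checksum if there is a body and no header was set before.
    if body is not None and "Content-MD5" not in headers:
        headers["Content-MD5"] = content_md5(body)
    return headers


print(add_md5_header({}, b""))
print(add_md5_header({"Content-MD5": "set-by-caller"}, b"payload"))
```

An existing header is left untouched, so a service implementation that already computed a checksum keeps its own value.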


class BaseXMLResponseSerializer(ResponseSerializer):
    """
    The BaseXMLResponseSerializer performs the basic logic for the XML response serialization.
    It is slightly adapted by the QueryResponseSerializer.
    While botocore's RestXMLSerializer is quite similar, there are some subtle differences (since botocore's
    implementation handles the serialization of the requests from the client to the service, not the responses from the
    service to the client).
    """

    SUPPORTED_MIME_TYPES = [TEXT_XML, APPLICATION_XML, APPLICATION_JSON]

    def _serialize_error(
        self,
        error: ServiceException,
        response: Response,
        shape: StructureShape,
        operation_model: OperationModel,
        mime_type: str,
        request_id: str,
    ) -> None:
        # Check if we need to add a namespace
        attr = (
            {"xmlns": operation_model.metadata.get("xmlNamespace")}
            if "xmlNamespace" in operation_model.metadata
            else {}
        )
        root = ETree.Element("ErrorResponse", attr)

        error_tag = ETree.SubElement(root, "Error")
        self._add_error_tags(error, error_tag, mime_type)
        request_id_element = ETree.SubElement(root, "RequestId")
        request_id_element.text = request_id

        self._add_additional_error_tags(vars(error), root, shape, mime_type)

        response.set_response(self._encode_payload(self._node_to_string(root, mime_type)))

    def _add_error_tags(
        self, error: ServiceException, error_tag: ETree.Element, mime_type: str
    ) -> None:
        code_tag = ETree.SubElement(error_tag, "Code")
        code_tag.text = error.code
        message = self._get_error_message(error)
        if message:
            self._default_serialize(error_tag, message, None, "Message", mime_type)
        if error.sender_fault:
            # The sender fault is either not set or "Sender"
            self._default_serialize(error_tag, "Sender", None, "Type", mime_type)

    def _add_additional_error_tags(
        self, parameters: dict, node: ETree.Element, shape: StructureShape, mime_type: str
    ):
        if shape:
            params = {}
            # TODO add a possibility to serialize simple non-modelled errors (like S3 NoSuchBucket#BucketName)
            for member in shape.members:
                # XML protocols do not add modeled default fields to the root node
                # (tested for cloudfront, route53, cloudwatch, iam)
                if member.lower() not in ["code", "message"] and member in parameters:
                    params[member] = parameters[member]

            # If there is an error shape with members which should be set, they need to be added to the node
            if params:
                # Serialize the remaining params
                root_name = shape.serialization.get("name", shape.name)
                pseudo_root = ETree.Element("")
                self._serialize(shape, params, pseudo_root, root_name, mime_type)
                real_root = list(pseudo_root)[0]
                # Add the child elements to the already created root error element
                for child in list(real_root):
                    node.append(child)

    def _serialize_body_params(
        self,
        params: dict,
        shape: Shape,
        operation_model: OperationModel,
        mime_type: str,
        request_id: str,
    ) -> Optional[str]:
        root = self._serialize_body_params_to_xml(params, shape, operation_model, mime_type)
        self._prepare_additional_traits_in_xml(root, request_id)
        return self._node_to_string(root, mime_type)

    def _serialize_body_params_to_xml(
        self, params: dict, shape: Shape, operation_model: OperationModel, mime_type: str
    ) -> Optional[ETree.Element]:
        if shape is None:
            return
        # The botocore serializer expects `shape.serialization["name"]`, but this isn't always present for responses
        root_name = shape.serialization.get("name", shape.name)
        pseudo_root = ETree.Element("")
        self._serialize(shape, params, pseudo_root, root_name, mime_type)
        real_root = list(pseudo_root)[0]
        return real_root

    def _serialize(
        self, shape: Shape, params: Any, xmlnode: ETree.Element, name: str, mime_type: str
    ) -> None:
        """This method dynamically invokes the correct `_serialize_type_*` method for each shape type."""
        if shape is None:
            return
        # Some output shapes define a `resultWrapper` in their serialization spec.
        # While the name would imply that the result is _wrapped_, it is actually renamed.
        if shape.serialization.get("resultWrapper"):
            name = shape.serialization.get("resultWrapper")

        try:
            method = getattr(self, "_serialize_type_%s" % shape.type_name, self._default_serialize)
            method(xmlnode, params, shape, name, mime_type)
        except (TypeError, ValueError, AttributeError) as e:
            # Report the offending value (not the target XML node) in the error message
            raise ProtocolSerializerError(
                f"Invalid type when serializing {shape.name}: '{params}' cannot be parsed to {shape.type_name}."
            ) from e

    def _serialize_type_structure(
        self, xmlnode: ETree.Element, params: dict, shape: StructureShape, name: str, mime_type
    ) -> None:
        structure_node = ETree.SubElement(xmlnode, name)

        if "xmlNamespace" in shape.serialization:
            namespace_metadata = shape.serialization["xmlNamespace"]
            attribute_name = "xmlns"
            if namespace_metadata.get("prefix"):
                attribute_name += ":%s" % namespace_metadata["prefix"]
            structure_node.attrib[attribute_name] = namespace_metadata["uri"]
        for key, value in params.items():
            if value is None:
                # Don't serialize any param whose value is None.
                continue
            try:
                member_shape = shape.members[key]
            except KeyError:
                LOG.warning(
                    "Response object %s contains a member which is not specified: %s",
                    shape.name,
                    key,
                )
                continue
            member_name = member_shape.serialization.get("name", key)
            # We need to special case member shapes that are marked as an xmlAttribute.
            # Rather than serializing into an XML child node, we instead serialize the shape to
            # an XML attribute of the *current* node.
            if member_shape.serialization.get("xmlAttribute"):
                # xmlAttributes must have a serialization name.
                xml_attribute_name = member_shape.serialization["name"]
                structure_node.attrib[xml_attribute_name] = value
                continue
            self._serialize(member_shape, value, structure_node, member_name, mime_type)

    def _serialize_type_list(
        self, xmlnode: ETree.Element, params: list, shape: ListShape, name: str, mime_type: str
    ) -> None:
        if params is None:
            # Don't serialize any param whose value is None.
            return
        member_shape = shape.member
        if shape.serialization.get("flattened"):
            # If the list is flattened, each item is named after the member's serialized "name" if it has one,
            # otherwise after the list itself, and is attached directly to the parent element.
            element_name = self._get_serialized_name(member_shape, name)
            list_node = xmlnode
        else:
            element_name = self._get_serialized_name(member_shape, "member")
            list_node = ETree.SubElement(xmlnode, name)
        for item in params:
            # Don't serialize any item which is None
            if item is not None:
                self._serialize(member_shape, item, list_node, element_name, mime_type)

    def _serialize_type_map(
        self, xmlnode: ETree.Element, params: dict, shape: MapShape, name: str, mime_type: str
    ) -> None:
        """
        Given the ``name`` of MyMap, an input of {"key1": "val1", "key2": "val2"}, and the ``flattened: False``
        we serialize this as:
          <MyMap>
            <entry>
              <key>key1</key>
              <value>val1</value>
            </entry>
            <entry>
              <key>key2</key>
              <value>val2</value>
            </entry>
          </MyMap>
        If it is flattened, it is serialized as follows:
          <MyMap>
            <key>key1</key>
            <value>val1</value>
          </MyMap>
          <MyMap>
            <key>key2</key>
            <value>val2</value>
          </MyMap>
        """
        if params is None:
            # Don't serialize a non-existing map
            return
        if shape.serialization.get("flattened"):
            entries_node = xmlnode
            entry_node_name = name
        else:
            entries_node = ETree.SubElement(xmlnode, name)
            entry_node_name = "entry"

        for key, value in params.items():
            if value is None:
                # Don't serialize any param whose value is None.
                continue
            entry_node = ETree.SubElement(entries_node, entry_node_name)
            key_name = self._get_serialized_name(shape.key, default_name="key")
            val_name = self._get_serialized_name(shape.value, default_name="value")
            self._serialize(shape.key, key, entry_node, key_name, mime_type)
            self._serialize(shape.value, value, entry_node, val_name, mime_type)

    @staticmethod
    def _serialize_type_boolean(xmlnode: ETree.Element, params: bool, _, name: str, __) -> None:
        """
        For scalar types, the 'params' attr is actually just a scalar value representing the data
        we need to serialize as a boolean. It will either be 'true' or 'false'
        """
        node = ETree.SubElement(xmlnode, name)
        if params:
            str_value = "true"
        else:
            str_value = "false"
        node.text = str_value

    def _serialize_type_blob(
        self, xmlnode: ETree.Element, params: Union[str, bytes], _, name: str, __
    ) -> None:
        node = ETree.SubElement(xmlnode, name)
        node.text = self._get_base64(params)

    def _serialize_type_timestamp(
        self, xmlnode: ETree.Element, params: str, shape: Shape, name: str, mime_type: str
    ) -> None:
        node = ETree.SubElement(xmlnode, name)
        if mime_type != APPLICATION_JSON:
            # Default XML timestamp serialization
            node.text = self._convert_timestamp_to_str(
                params, shape.serialization.get("timestampFormat")
            )
        else:
            # For services with XML protocols, where the Accept header is JSON, timestamps are formatted like for JSON
            # protocols, but using the int representation instead of the float representation (e.g. when requesting
            # JSON responses in STS).
            node.text = str(
                int(self._convert_timestamp_to_str(params, JSONResponseSerializer.TIMESTAMP_FORMAT))
            )

    def _default_serialize(self, xmlnode: ETree.Element, params: str, _, name: str, __) -> None:
        node = ETree.SubElement(xmlnode, name)
        node.text = str(params)

    def _prepare_additional_traits_in_xml(self, root: Optional[ETree.Element], request_id: str):
        """
        Adds additional traits (like the request ID in the Query protocol) to the XML root node before it is
        serialized to a string.
        For some protocols (like rest-xml), the root can be None.
        """
        pass

    def _create_default_response(self, operation_model: OperationModel, mime_type: str) -> Response:
        response = super()._create_default_response(operation_model, mime_type)
        response.headers["Content-Type"] = mime_type
        return response

    def _node_to_string(self, root: Optional[ETree.Element], mime_type: str) -> Optional[str]:
        """Generates the string representation of the given XML element."""
        if root is not None:
            content = ETree.tostring(
                element=root, encoding=self.DEFAULT_ENCODING, xml_declaration=True
            )
            if mime_type == APPLICATION_JSON:
                # FIXME try to directly convert the ElementTree node to JSON
                xml_dict = xmltodict.parse(content)
                xml_dict = strip_xmlns(xml_dict)
                content = json.dumps(xml_dict)
            return content

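The two map layouts described in the `_serialize_type_map` docstring can be reproduced with the standard library alone. The following is a minimal sketch, not the serializer itself: `serialize_map` and `render` are hypothetical helpers introduced only for illustration, and all keys and values are assumed to be plain strings.

```python
import xml.etree.ElementTree as ETree


def serialize_map(parent, name, mapping, flattened=False):
    """Hypothetical helper: attach the entries of `mapping` to `parent`."""
    if flattened:
        # Flattened maps repeat the map name for every entry, directly on the parent
        entries_node, entry_name = parent, name
    else:
        # Non-flattened maps get one wrapper element containing <entry> children
        entries_node, entry_name = ETree.SubElement(parent, name), "entry"
    for key, value in mapping.items():
        if value is None:
            # Entries without a value are skipped
            continue
        entry = ETree.SubElement(entries_node, entry_name)
        ETree.SubElement(entry, "key").text = str(key)
        ETree.SubElement(entry, "value").text = str(value)


def render(mapping, flattened):
    """Hypothetical helper: serialize `mapping` below an illustrative <Root> element."""
    root = ETree.Element("Root")
    serialize_map(root, "MyMap", mapping, flattened)
    return ETree.tostring(root, encoding="unicode")
```

Calling `render({"key1": "val1"}, flattened=False)` yields a single `<MyMap>` wrapper with one `<entry>`, while `flattened=True` yields one `<MyMap>` element per entry.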

class BaseRestResponseSerializer(ResponseSerializer, ABC):
    """
    The BaseRestResponseSerializer performs the basic logic for the REST response serialization.
    In our case it basically only adds the request metadata to the HTTP header.
    """

    HEADER_TIMESTAMP_FORMAT = "rfc822"

    def _serialize_response(
        self,
        parameters: dict,
        response: Response,
        shape: Optional[Shape],
        shape_members: dict,
        operation_model: OperationModel,
        mime_type: str,
        request_id: str,
    ) -> None:
        header_params, payload_params = self._partition_members(parameters, shape)
        self._process_header_members(header_params, response, shape)
        # "HEAD" responses are basically "GET" responses without the actual body.
        # Do not process the body payload in this case (setting a body could also manipulate the headers)
        if operation_model.http.get("method") != "HEAD":
            self._serialize_payload(
                payload_params,
                response,
                shape,
                shape_members,
                operation_model,
                mime_type,
                request_id,
            )
        self._serialize_content_type(response, shape, shape_members, mime_type)
        self._prepare_additional_traits_in_response(response, operation_model, request_id)

    def _serialize_payload(
        self,
        parameters: dict,
        response: Response,
        shape: Optional[Shape],
        shape_members: dict,
        operation_model: OperationModel,
        mime_type: str,
        request_id: str,
    ) -> None:
        """
        Serializes the given payload.

        :param parameters: The user input params
        :param response: The final serialized Response
        :param shape: Describes the expected output shape (can be None in case of an "empty" response)
        :param shape_members: The members of the output struct shape
        :param operation_model: The specification of the operation of which the response is serialized here
        :param mime_type: Mime type which should be used to encode the payload
        :param request_id: autogenerated AWS request ID identifying the original request
        :return: None - the given `response` is modified
        """
        if shape is None:
            return

        payload_member = shape.serialization.get("payload")
        # If this shape is defined as being an event, we need to search for the payload member
        if not payload_member and shape.serialization.get("event"):
            for member_name, member_shape in shape_members.items():
                # Try to find the first shape which is marked as "eventpayload" and is given in the params dict
                if member_shape.serialization.get("eventpayload") and parameters.get(member_name):
                    payload_member = member_name
                    break
        if payload_member is not None and shape_members[payload_member].type_name in [
            "blob",
            "string",
        ]:
            # If it's streaming, then the body is just the value of the payload.
            body_payload = parameters.get(payload_member, b"")
            body_payload = self._encode_payload(body_payload)
            response.set_response(body_payload)
        elif payload_member is not None:
            # If there's a payload member, we serialize that member to the body.
            body_params = parameters.get(payload_member)
            if body_params is not None:
                response.set_response(
                    self._encode_payload(
                        self._serialize_body_params(
                            body_params,
                            shape_members[payload_member],
                            operation_model,
                            mime_type,
                            request_id,
                        )
                    )
                )
        else:
            # Otherwise, we use the "traditional" way of serializing the whole parameters dict recursively.
            response.set_response(
                self._encode_payload(
                    self._serialize_body_params(
                        parameters, shape, operation_model, mime_type, request_id
                    )
                )
            )

    def _serialize_content_type(
        self, serialized: Response, shape: Shape, shape_members: dict, mime_type: str
    ):
        """
        Some protocols require varied Content-Type headers depending on user input.
        This allows subclasses to apply this conditionally.
        """
        pass

    def _has_streaming_payload(self, payload: Optional[str], shape_members):
        """Determine if payload is streaming (a blob or string)."""
        return payload is not None and shape_members[payload].type_name in ["blob", "string"]

    def _prepare_additional_traits_in_response(
        self, response: Response, operation_model: OperationModel, request_id: str
    ):
        """Adds the request ID to the headers (in contrast to the body - as in the Query protocol)."""
        response = super()._prepare_additional_traits_in_response(
            response, operation_model, request_id
        )
        response.headers["x-amz-request-id"] = request_id
        return response

    def _process_header_members(self, parameters: dict, response: Response, shape: Shape):
        shape_members = shape.members if isinstance(shape, StructureShape) else []
        for name in shape_members:
            member_shape = shape_members[name]
            location = member_shape.serialization.get("location")
            if not location:
                continue
            if name not in parameters:
                # ignores optional keys
                continue
            key = member_shape.serialization.get("name", name)
            value = parameters[name]
            if value is None:
                continue
            if location == "header":
                response.headers[key] = self._serialize_header_value(member_shape, value)
            elif location == "headers":
                header_prefix = key
                self._serialize_header_map(header_prefix, response, value)
            elif location == "statusCode":
                response.status_code = int(value)

    def _serialize_header_map(self, prefix: str, response: Response, params: dict) -> None:
        """Serializes the header map for the location trait "headers"."""
        for key, val in params.items():
            actual_key = prefix + key
            response.headers[actual_key] = val

    def _serialize_header_value(self, shape: Shape, value: Any):
        """Serializes a value for the location trait "header"."""
        if shape.type_name == "timestamp":
            datetime_obj = parse_to_aware_datetime(value)
            timestamp_format = shape.serialization.get(
                "timestampFormat", self.HEADER_TIMESTAMP_FORMAT
            )
            return self._convert_timestamp_to_str(datetime_obj, timestamp_format)
        elif shape.type_name == "list":
            converted_value = [
                self._serialize_header_value(shape.member, v) for v in value if v is not None
            ]
            return ",".join(converted_value)
        elif shape.type_name == "boolean":
            # Set the header value to "true" if the given value is truthy, otherwise set the header value to "false".
            return "true" if value else "false"
        elif is_json_value_header(shape):
            # Serialize with no spaces after separators to save space in
            # the header.
            return self._get_base64(json.dumps(value, separators=(",", ":")))
        else:
            return value

    def _partition_members(self, parameters: dict, shape: Optional[Shape]) -> Tuple[dict, dict]:
        """Separates the top-level keys in the given parameters dict into header- and payload-located params."""
        if not isinstance(shape, StructureShape):
            # If the shape isn't a structure, we default to the whole response being serialized in the body.
            # Non-payload members are only located in the top-level hierarchy and those are always structures.
            return {}, parameters
        header_params = {}
        payload_params = {}
        shape_members = shape.members
        for name in shape_members:
            member_shape = shape_members[name]
            if name not in parameters:
                continue
            location = member_shape.serialization.get("location")
            if location:
                header_params[name] = parameters[name]
            else:
                payload_params[name] = parameters[name]
        return header_params, payload_params

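The split between header-located and payload-located members performed by `_partition_members` and `_process_header_members` can be illustrated without botocore. This is a rough sketch under stated assumptions: member shapes are modeled as plain dicts holding only the `location` and `name` serialization traits, `partition_members` and `apply_locations` are hypothetical names, and header values are copied as-is instead of being formatted per type.

```python
def partition_members(parameters, member_specs):
    """Hypothetical helper: split params into (header-located, payload-located)."""
    header_params, payload_params = {}, {}
    for name, spec in member_specs.items():
        if name not in parameters:
            # Optional members that were not set are ignored
            continue
        target = header_params if spec.get("location") else payload_params
        target[name] = parameters[name]
    return header_params, payload_params


def apply_locations(header_params, member_specs):
    """Hypothetical helper: resolve the "header", "headers" and "statusCode" locations."""
    headers, status_code = {}, None
    for name, value in header_params.items():
        if value is None:
            continue
        spec = member_specs[name]
        key = spec.get("name", name)
        if spec["location"] == "header":
            headers[key] = value
        elif spec["location"] == "headers":
            # The serialized name acts as a prefix for every key of the map
            for suffix, item in value.items():
                headers[key + suffix] = item
        elif spec["location"] == "statusCode":
            status_code = int(value)
    return headers, status_code
```

With a member `Metadata` located in `headers` under the illustrative prefix `x-amz-meta-`, the value `{"owner": "me"}` ends up as the header `x-amz-meta-owner: me`, while members without a location stay in the payload dict.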

class RestXMLResponseSerializer(BaseRestResponseSerializer, BaseXMLResponseSerializer):
    """
    The ``RestXMLResponseSerializer`` is responsible for the serialization of responses from services with the
    ``rest-xml`` protocol.
    It combines the ``BaseRestResponseSerializer`` (for the REST specific logic) with the ``BaseXMLResponseSerializer``
    (for the XML body response serialization).
    """

    pass


class QueryResponseSerializer(BaseXMLResponseSerializer):
    """
    The ``QueryResponseSerializer`` is responsible for the serialization of responses from services which use the
    ``query`` protocol. The responses of these services also use XML. It supports only a subset of the serialization
    features, since it does not allow any payload or location traits.
    """

    def _serialize_response(
        self,
        parameters: dict,
        response: Response,
        shape: Optional[Shape],
        shape_members: dict,
        operation_model: OperationModel,
        mime_type: str,
        request_id: str,
    ) -> None:
        """
        Serializes the given parameters as XML for the query protocol.

        :param parameters: The user input params
        :param response: The final serialized Response
        :param shape: Describes the expected output shape (can be None in case of an "empty" response)
        :param shape_members: The members of the output struct shape
        :param operation_model: The specification of the operation of which the response is serialized here
        :param mime_type: Mime type which should be used to encode the payload
        :param request_id: autogenerated AWS request ID identifying the original request
        :return: None - the given `response` is modified
        """
        response.set_response(
            self._encode_payload(
                self._serialize_body_params(
                    parameters, shape, operation_model, mime_type, request_id
                )
            )
        )

    def _serialize_body_params_to_xml(
        self, params: dict, shape: Shape, operation_model: OperationModel, mime_type: str
    ) -> ETree.Element:
        # The Query protocol responses have a root element which is not contained in the specification file.
        # Therefore, we first call the super function to perform the normal XML serialization, and afterwards wrap the
        # result in a root element based on the operation name.
        node = super()._serialize_body_params_to_xml(params, shape, operation_model, mime_type)

        # Check if we need to add a namespace
        attr = (
            {"xmlns": operation_model.metadata.get("xmlNamespace")}
            if "xmlNamespace" in operation_model.metadata
            else None
        )

        # Create the root element and add the result of the XML serializer as a child node
        root = ETree.Element(f"{operation_model.name}Response", attr)
        if node is not None:
            root.append(node)
        return root

    def _prepare_additional_traits_in_xml(self, root: Optional[ETree.Element], request_id: str):
        # Add the response metadata here (it's not defined in the specs)
        # For the ec2 and the query protocol, the root cannot be None at this time.
        response_metadata = ETree.SubElement(root, "ResponseMetadata")
        request_id_element = ETree.SubElement(response_metadata, "RequestId")
        request_id_element.text = request_id

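The envelope that the query protocol puts around a serialized result (a root element named after the operation, followed by the response metadata) can be sketched with plain `ElementTree`. `wrap_query_result` is a hypothetical helper, and the operation name, namespace URI and request ID used with it are illustrative values only.

```python
import xml.etree.ElementTree as ETree


def wrap_query_result(operation_name, result_node, request_id, xml_namespace=None):
    """Hypothetical helper: wrap a serialized result in a query-style response root."""
    attrib = {"xmlns": xml_namespace} if xml_namespace else {}
    # The root element is derived from the operation name, it is not part of the output shape
    root = ETree.Element(f"{operation_name}Response", attrib)
    if result_node is not None:
        root.append(result_node)
    # The request ID is added in the body, inside a ResponseMetadata element
    metadata = ETree.SubElement(root, "ResponseMetadata")
    ETree.SubElement(metadata, "RequestId").text = request_id
    return root
```

For an operation called `GetCallerIdentity`, the root becomes `GetCallerIdentityResponse`, with the result element first and `ResponseMetadata` last.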

class EC2ResponseSerializer(QueryResponseSerializer):
    """
    The ``EC2ResponseSerializer`` is responsible for the serialization of responses from services which use the
    ``ec2`` protocol (basically the EC2 service). This protocol is basically equal to the ``query`` protocol with only
    a few subtle differences.
    """

    def _serialize_error(
        self,
        error: ServiceException,
        response: Response,
        shape: StructureShape,
        operation_model: OperationModel,
        mime_type: str,
        request_id: str,
    ) -> None:
        # EC2 errors look like:
        # <Response>
        #   <Errors>
        #     <Error>
        #       <Code>InvalidInstanceID.Malformed</Code>
        #       <Message>Invalid id: "1343124"</Message>
        #     </Error>
        #   </Errors>
        #   <RequestID>12345</RequestID>
        # </Response>
        # This differs from the query protocol in that the tag is RequestID, not RequestId,
        # and that the Error tag is enclosed in an Errors tag.
        attr = (
            {"xmlns": operation_model.metadata.get("xmlNamespace")}
            if "xmlNamespace" in operation_model.metadata
            else None
        )
        root = ETree.Element("Response", attr)
        errors_tag = ETree.SubElement(root, "Errors")
        error_tag = ETree.SubElement(errors_tag, "Error")
        self._add_error_tags(error, error_tag, mime_type)
        request_id_element = ETree.SubElement(root, "RequestID")
        request_id_element.text = request_id
        response.set_response(self._encode_payload(self._node_to_string(root, mime_type)))

    def _prepare_additional_traits_in_xml(self, root: Optional[ETree.Element], request_id: str):
        # The EC2 protocol does not use the root output shape, therefore we need to remove the hierarchy level
        # below the root level
        if len(root) > 0:
            output_node = root[0]
            for child in output_node:
                root.append(child)
            root.remove(output_node)

        # Add the requestId here (it's not defined in the specs)
        # For the ec2 and the query protocol, the root cannot be None at this time.
        request_id_element = ETree.SubElement(root, "requestId")
        request_id_element.text = request_id

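The EC2 error layout described in the comment of `_serialize_error` (an `Error` nested in `Errors`, plus a `RequestID` tag on the root) can be reproduced in a few lines. This is only a sketch: `build_ec2_error` is a hypothetical helper that skips namespaces and the optional sender type, and the error code and request ID are sample values.

```python
import xml.etree.ElementTree as ETree


def build_ec2_error(code, message, request_id):
    """Hypothetical helper: build an EC2-style error document as a string."""
    root = ETree.Element("Response")
    # Unlike the query protocol, the Error element is wrapped in an Errors element
    errors = ETree.SubElement(root, "Errors")
    error = ETree.SubElement(errors, "Error")
    ETree.SubElement(error, "Code").text = code
    ETree.SubElement(error, "Message").text = message
    # Note the spelling: RequestID, not RequestId
    ETree.SubElement(root, "RequestID").text = request_id
    return ETree.tostring(root, encoding="unicode")
```

Parsing the result again shows the nesting: the code is found under `Errors/Error/Code`, and the request ID is a direct child of the root.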

class JSONResponseSerializer(ResponseSerializer):
    """
    The ``JSONResponseSerializer`` is responsible for the serialization of responses from services with the ``json``
    protocol. It implements the JSON response body serialization, which is also used by the
    ``RestJSONResponseSerializer``.
    """

    JSON_TYPES = [APPLICATION_JSON, APPLICATION_AMZ_JSON_1_0, APPLICATION_AMZ_JSON_1_1]
    CBOR_TYPES = [APPLICATION_CBOR, APPLICATION_AMZ_CBOR_1_1]
    SUPPORTED_MIME_TYPES = JSON_TYPES + CBOR_TYPES

    TIMESTAMP_FORMAT = "unixtimestamp"

    def _serialize_error(
        self,
        error: ServiceException,
        response: Response,
        shape: StructureShape,
        operation_model: OperationModel,
        mime_type: str,
        request_id: str,
    ) -> None:
        body = dict()

        # TODO implement different service-specific serializer configurations
        #   - currently we set both, the `__type` member as well as the `X-Amzn-Errortype` header
        #   - the specification defines that it's either the __type field OR the header
        response.headers["X-Amzn-Errortype"] = error.code
        body["__type"] = error.code

        if shape:
            remaining_params = {}
            # TODO add a possibility to serialize simple non-modelled errors (like S3 NoSuchBucket#BucketName)
            for member in shape.members:
                if hasattr(error, member):
                    remaining_params[member] = getattr(error, member)
                # Default error message fields can sometimes have different casing in the specs
                elif member.lower() in ["code", "message"] and hasattr(error, member.lower()):
                    remaining_params[member] = getattr(error, member.lower())
            self._serialize(body, remaining_params, shape, None, mime_type)

        # Only set the message if it has not been set with the shape members
        if "message" not in body and "Message" not in body:
            message = self._get_error_message(error)
            if message is not None:
                body["message"] = message

        if mime_type in self.CBOR_TYPES:
            response.set_response(cbor2_dumps(body, datetime_as_timestamp=True))
            response.content_type = mime_type
        else:
            response.set_json(body)

    def _serialize_response(
        self,
        parameters: dict,
        response: Response,
        shape: Optional[Shape],
        shape_members: dict,
        operation_model: OperationModel,
        mime_type: str,
        request_id: str,
    ) -> None:
        if mime_type in self.CBOR_TYPES:
            response.content_type = mime_type
        else:
            json_version = operation_model.metadata.get("jsonVersion")
            if json_version is not None:
                response.headers["Content-Type"] = "application/x-amz-json-%s" % json_version
        response.set_response(
            self._serialize_body_params(parameters, shape, operation_model, mime_type, request_id)
        )

    def _serialize_body_params(
        self,
        params: dict,
        shape: Shape,
        operation_model: OperationModel,
        mime_type: str,
        request_id: str,
    ) -> Optional[str]:
        body = {}
        if shape is not None:
            self._serialize(body, params, shape, None, mime_type)

        if mime_type in self.CBOR_TYPES:
            return cbor2_dumps(body, datetime_as_timestamp=True)
        else:
            return json.dumps(body)

    def _serialize(self, body: dict, value: Any, shape, key: Optional[str], mime_type: str):
        """This method dynamically invokes the correct `_serialize_type_*` method for each shape type."""
        try:
            method = getattr(self, "_serialize_type_%s" % shape.type_name, self._default_serialize)
            method(body, value, shape, key, mime_type)
        except (TypeError, ValueError, AttributeError) as e:
            raise ProtocolSerializerError(
                f"Invalid type when serializing {shape.name}: '{value}' cannot be parsed to {shape.type_name}."
            ) from e

    def _serialize_type_structure(
        self, body: dict, value: dict, shape: StructureShape, key: Optional[str], mime_type: str
    ):
        if value is None:
            return
        if shape.is_document_type:
            body[key] = value
        else:
            if key is not None:
                # If a key is provided, this is a result of a recursive
                # call, so we need to add a new child dict as the value
                # of the passed in serialized dict.  We'll then add
                # all the structure members as key/vals in the new serialized
                # dictionary we just created.
                new_serialized = {}
                body[key] = new_serialized
                body = new_serialized
            members = shape.members
            for member_key, member_value in value.items():
                if member_value is None:
                    continue
                try:
                    member_shape = members[member_key]
                except KeyError:
                    LOG.warning(
                        "Response object %s contains a member which is not specified: %s",
                        shape.name,
                        member_key,
                    )
                    continue
                if "name" in member_shape.serialization:
                    member_key = member_shape.serialization["name"]
                self._serialize(body, member_value, member_shape, member_key, mime_type)

1331
    def _serialize_type_map(
1✔
1332
        self, body: dict, value: dict, shape: MapShape, key: str, mime_type: str
1333
    ):
1334
        if value is None:
1✔
1335
            return
×
1336
        map_obj = {}
1✔
1337
        body[key] = map_obj
1✔
1338
        for sub_key, sub_value in value.items():
1✔
1339
            if sub_value is not None:
1✔
1340
                self._serialize(map_obj, sub_value, shape.value, sub_key, mime_type)
1✔
1341

1342
    def _serialize_type_list(
1✔
1343
        self, body: dict, value: list, shape: ListShape, key: str, mime_type: str
1344
    ):
1345
        if value is None:
1✔
1346
            return
×
1347
        list_obj = []
1✔
1348
        body[key] = list_obj
1✔
1349
        for list_item in value:
1✔
1350
            if list_item is not None:
1✔
1351
                wrapper = {}
1✔
1352
                # The JSON list serialization is the only case where we aren't
1353
                # setting a key on a dict.  We handle this by using
1354
                # a __current__ key on a wrapper dict to serialize each
1355
                # list item before appending it to the serialized list.
1356
                self._serialize(wrapper, list_item, shape.member, "__current__", mime_type)
1✔
1357
                list_obj.append(wrapper["__current__"])
1✔
1358

1359
    def _default_serialize(self, body: dict, value: Any, _, key: str, __):
1✔
1360
        body[key] = value
1✔
1361

1362
    def _serialize_type_timestamp(
1✔
1363
        self, body: dict, value: Any, shape: Shape, key: str, mime_type: str
1364
    ):
1365
        if mime_type in self.CBOR_TYPES:
1✔
1366
            # CBOR has native support for timestamps
1367
            body[key] = value
×
1368
        else:
1369
            timestamp_format = shape.serialization.get("timestampFormat")
1✔
1370
            body[key] = self._convert_timestamp_to_str(value, timestamp_format)
1✔
1371

1372
    def _serialize_type_blob(
1✔
1373
        self, body: dict, value: Union[str, bytes], _, key: str, mime_type: str
1374
    ):
1375
        if mime_type in self.CBOR_TYPES:
1✔
1376
            body[key] = value
1✔
1377
        else:
1378
            body[key] = self._get_base64(value)
1✔
1379

1380
    def _prepare_additional_traits_in_response(
1✔
1381
        self, response: Response, operation_model: OperationModel, request_id: str
1382
    ):
1383
        response.headers["x-amzn-requestid"] = request_id
1✔
1384
        response = super()._prepare_additional_traits_in_response(
1✔
1385
            response, operation_model, request_id
1386
        )
1387
        return response
1✔
1388
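
The JSON serializer above relies on two small techniques: name-based dispatch through `getattr`, and a throwaway wrapper dict with a `__current__` key for list items. The following is a minimal standalone sketch of both, not the LocalStack implementation. `MiniShape` and `MiniSerializer` are hypothetical stand-ins introduced only for illustration and do not mirror the real botocore shape classes.

```python
from typing import Any, Optional


class MiniShape:
    """Hypothetical stand-in for a botocore shape (illustration only)."""

    def __init__(self, type_name: str, member: Optional["MiniShape"] = None):
        self.type_name = type_name
        self.member = member


class MiniSerializer:
    def serialize(self, body: dict, value: Any, shape: MiniShape, key: str) -> None:
        # Look up `_serialize_type_<type_name>`; fall back to the default handler.
        method = getattr(self, f"_serialize_type_{shape.type_name}", self._default_serialize)
        method(body, value, shape, key)

    def _default_serialize(self, body: dict, value: Any, shape: MiniShape, key: str) -> None:
        body[key] = value

    def _serialize_type_list(self, body: dict, value: list, shape: MiniShape, key: str) -> None:
        result = []
        body[key] = result
        for item in value:
            if item is None:
                continue
            # Every handler writes into a dict, so each list item is serialized
            # into a temporary wrapper dict and then unwrapped again.
            wrapper = {}
            self.serialize(wrapper, item, shape.member, "__current__")
            result.append(wrapper["__current__"])


body = {}
list_shape = MiniShape("list", member=MiniShape("string"))
MiniSerializer().serialize(body, ["a", None, "b"], list_shape, "Names")
print(body)
```

Because every handler has the same "write into `body[key]`" contract, nested lists and structures can reuse the same recursion without special cases.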


class RestJSONResponseSerializer(BaseRestResponseSerializer, JSONResponseSerializer):
    """
    The ``RestJSONResponseSerializer`` is responsible for the serialization of responses from services with the
    ``rest-json`` protocol.
    It combines the ``BaseRestResponseSerializer`` (for the REST specific logic) with the ``JSONResponseSerializer``
    (for the JSON body response serialization).
    """

    def _serialize_content_type(
        self, serialized: Response, shape: Shape, shape_members: dict, mime_type: str
    ):
        """Set Content-Type to application/json for all structured bodies."""
        payload = shape.serialization.get("payload") if shape is not None else None
        if self._has_streaming_payload(payload, shape_members):
            # Don't apply content-type to streaming bodies
            return

        has_body = serialized.data != b""
        has_content_type = self._has_header("Content-Type", serialized.headers)
        if has_body and not has_content_type:
            serialized.headers["Content-Type"] = mime_type


class S3ResponseSerializer(RestXMLResponseSerializer):
    """
    The ``S3ResponseSerializer`` adds some minor logic to handle S3 specific peculiarities with the error response
    serialization and the root node tag.
    """

    SUPPORTED_MIME_TYPES = [APPLICATION_XML, TEXT_XML]
    _RESPONSE_ROOT_TAGS = {
        "CompleteMultipartUploadOutput": "CompleteMultipartUploadResult",
        "CopyObjectOutput": "CopyObjectResult",
        "CreateMultipartUploadOutput": "InitiateMultipartUploadResult",
        "DeleteObjectsOutput": "DeleteResult",
        "GetBucketAccelerateConfigurationOutput": "AccelerateConfiguration",
        "GetBucketAclOutput": "AccessControlPolicy",
        "GetBucketAnalyticsConfigurationOutput": "AnalyticsConfiguration",
        "GetBucketCorsOutput": "CORSConfiguration",
        "GetBucketEncryptionOutput": "ServerSideEncryptionConfiguration",
        "GetBucketIntelligentTieringConfigurationOutput": "IntelligentTieringConfiguration",
        "GetBucketInventoryConfigurationOutput": "InventoryConfiguration",
        "GetBucketLifecycleOutput": "LifecycleConfiguration",
        "GetBucketLifecycleConfigurationOutput": "LifecycleConfiguration",
        "GetBucketLoggingOutput": "BucketLoggingStatus",
        "GetBucketMetricsConfigurationOutput": "MetricsConfiguration",
        "NotificationConfigurationDeprecated": "NotificationConfiguration",
        "GetBucketOwnershipControlsOutput": "OwnershipControls",
        "GetBucketPolicyStatusOutput": "PolicyStatus",
        "GetBucketReplicationOutput": "ReplicationConfiguration",
        "GetBucketRequestPaymentOutput": "RequestPaymentConfiguration",
        "GetBucketTaggingOutput": "Tagging",
        "GetBucketVersioningOutput": "VersioningConfiguration",
        "GetBucketWebsiteOutput": "WebsiteConfiguration",
        "GetObjectAclOutput": "AccessControlPolicy",
        "GetObjectLegalHoldOutput": "LegalHold",
        "GetObjectLockConfigurationOutput": "ObjectLockConfiguration",
        "GetObjectRetentionOutput": "Retention",
        "GetObjectTaggingOutput": "Tagging",
        "GetObjectAttributesOutput": "GetObjectAttributesResponse",
        "GetPublicAccessBlockOutput": "PublicAccessBlockConfiguration",
        "ListBucketAnalyticsConfigurationsOutput": "ListBucketAnalyticsConfigurationResult",
        "ListBucketInventoryConfigurationsOutput": "ListInventoryConfigurationsResult",
        "ListBucketMetricsConfigurationsOutput": "ListMetricsConfigurationsResult",
        "ListBucketsOutput": "ListAllMyBucketsResult",
        "ListMultipartUploadsOutput": "ListMultipartUploadsResult",
        "ListObjectsOutput": "ListBucketResult",
        "ListObjectsV2Output": "ListBucketResult",
        "ListObjectVersionsOutput": "ListVersionsResult",
        "ListPartsOutput": "ListPartsResult",
        "UploadPartCopyOutput": "CopyPartResult",
    }

    XML_NAMESPACE = "http://s3.amazonaws.com/doc/2006-03-01/"

    def _serialize_response(
        self,
        parameters: dict,
        response: Response,
        shape: Optional[Shape],
        shape_members: dict,
        operation_model: OperationModel,
        mime_type: str,
        request_id: str,
    ) -> None:
        header_params, payload_params = self._partition_members(parameters, shape)
        self._process_header_members(header_params, response, shape)
        # "HEAD" responses are basically "GET" responses without the actual body.
        # Do not process the body payload in this case (setting a body could also manipulate the headers)
        # - If the response is a redirection, the body should be empty as well
        # - If the response is from a "PUT" request, the body should be empty except if there's a specific "payload"
        #   field in the serialization (CopyObject and UploadPartCopy)
        http_method = operation_model.http.get("method")
        if (
            http_method != "HEAD"
            and not 300 <= response.status_code < 400
            and not (http_method == "PUT" and shape and not shape.serialization.get("payload"))
        ):
            self._serialize_payload(
                payload_params,
                response,
                shape,
                shape_members,
                operation_model,
                mime_type,
                request_id,
            )
        self._serialize_content_type(response, shape, shape_members, mime_type)

    def _serialize_error(
        self,
        error: ServiceException,
        response: Response,
        shape: StructureShape,
        operation_model: OperationModel,
        mime_type: str,
        request_id: str,
    ) -> None:
        attr = (
            {"xmlns": operation_model.metadata.get("xmlNamespace")}
            if "xmlNamespace" in operation_model.metadata
            else {}
        )
        root = ETree.Element("Error", attr)
        self._add_error_tags(error, root, mime_type)
        request_id_element = ETree.SubElement(root, "RequestId")
        request_id_element.text = request_id

        header_params, payload_params = self._partition_members(vars(error), shape)
        self._add_additional_error_tags(payload_params, root, shape, mime_type)
        self._process_header_members(header_params, response, shape)

        response.set_response(self._encode_payload(self._node_to_string(root, mime_type)))

    def _serialize_body_params(
        self,
        params: dict,
        shape: Shape,
        operation_model: OperationModel,
        mime_type: str,
        request_id: str,
    ) -> Optional[str]:
        root = self._serialize_body_params_to_xml(params, shape, operation_model, mime_type)
        # S3 does not follow the specs on the root tag name for 41 of 44 operations
        root.tag = self._RESPONSE_ROOT_TAGS.get(root.tag, root.tag)
        self._prepare_additional_traits_in_xml(root, request_id)
        return self._node_to_string(root, mime_type)

    def _prepare_additional_traits_in_response(
        self, response: Response, operation_model: OperationModel, request_id: str
    ):
        """Adds the request ID to the headers (in contrast to the body - as in the Query protocol)."""
        response = super()._prepare_additional_traits_in_response(
            response, operation_model, request_id
        )
        # s3 extended Request ID
        # mostly used internally on AWS and corresponds to a HostId
        response.headers["x-amz-id-2"] = (
            "s9lzHYrFp76ZVxRcpX9+5cjAnEH2ROuNkd2BHfIa6UkFVdtjf5mKR3/eTPFvsiP/XV/VLi31234="
        )
        return response

    def _add_error_tags(
        self, error: ServiceException, error_tag: ETree.Element, mime_type: str
    ) -> None:
        code_tag = ETree.SubElement(error_tag, "Code")
        code_tag.text = error.code
        message = self._get_error_message(error)
        if message:
            self._default_serialize(error_tag, message, None, "Message", mime_type)
        else:
            # In S3, if there's no message, create an empty node
            self._create_empty_node(error_tag, "Message")
        if error.sender_fault:
            # The sender fault is either not set or "Sender"
            self._default_serialize(error_tag, "Sender", None, "Type", mime_type)

    @staticmethod
    def _create_empty_node(xmlnode: ETree.Element, name: str) -> None:
        ETree.SubElement(xmlnode, name)

    def _prepare_additional_traits_in_xml(self, root: Optional[ETree.Element], request_id: str):
        # some tools (Serverless) require a newline after the "<?xml ...>\n" preamble line, e.g., for LocationConstraint
        if root and not root.tail:
            root.tail = "\n"

        root.attrib["xmlns"] = self.XML_NAMESPACE

    @staticmethod
    def _timestamp_iso8601(value: datetime) -> str:
        """
        This is very specific to S3: it returns an ISO 8601 timestamp with the milliseconds always set to 000.
        Some SDKs are very picky about the length.
        """
        return value.strftime("%Y-%m-%dT%H:%M:%S.000Z")
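
As a quick illustration of the fixed-width timestamp that `_timestamp_iso8601` produces, here is a standalone sketch using the same `strftime` pattern. The function name `s3_timestamp` and the sample date are assumptions made for this example only.

```python
from datetime import datetime, timezone


def s3_timestamp(value: datetime) -> str:
    # Same format string as in the serializer above: the fractional part is
    # hard-coded to "000", so real microseconds are dropped on purpose.
    return value.strftime("%Y-%m-%dT%H:%M:%S.000Z")


sample = datetime(2025, 7, 30, 11, 46, 5, 123456, tzinfo=timezone.utc)
formatted = s3_timestamp(sample)
print(formatted)
```

Note that the pattern appends a literal `Z` without converting the value, so this sketch assumes the caller passes a UTC datetime.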


class SqsQueryResponseSerializer(QueryResponseSerializer):
    """
    Unfortunately, SQS uses a rare interpretation of the XML protocol: It uses HTML entities within XML tag text nodes.
    For example:
    - Normal XML serializers: <Message>No need to escape quotes (like this: ") with HTML entities in XML.</Message>
    - SQS XML serializer: <Message>No need to escape quotes (like this: &quot;) with HTML entities in XML.</Message>

    None of the prominent XML frameworks for python allow HTML entity escapes when serializing XML.
    This serializer implements the following workaround:
    - Escape quotes and \r with their HTML entities (&quot; and &#xD;).
    - Since & is (correctly) escaped in XML, the serialized string contains &amp;quot; and &amp;#xD;
    - These double-escapes are corrected by replacing such strings with their original.
    """

    # These were deleted from the JSON specs, but need to be kept for legacy reasons (sent in 'x-amzn-query-error')
    QUERY_PREFIXED_ERRORS = {
        "BatchEntryIdsNotDistinct",
        "BatchRequestTooLong",
        "EmptyBatchRequest",
        "InvalidBatchEntryId",
        "MessageNotInflight",
        "PurgeQueueInProgress",
        "QueueDeletedRecently",
        "TooManyEntriesInBatchRequest",
        "UnsupportedOperation",
    }

    # Some error codes changed between JSON and query, so we need a way to map them for legacy reasons
    JSON_TO_QUERY_ERROR_CODES = {
        "InvalidParameterValueException": "InvalidParameterValue",
        "MissingRequiredParameterException": "MissingParameter",
        "AccessDeniedException": "AccessDenied",
        "QueueDoesNotExist": "AWS.SimpleQueueService.NonExistentQueue",
        "QueueNameExists": "QueueAlreadyExists",
    }

    SENDER_FAULT_ERRORS = (
        QUERY_PREFIXED_ERRORS
        | JSON_TO_QUERY_ERROR_CODES.keys()
        | {"OverLimit", "ResourceNotFoundException"}
    )

    def _default_serialize(self, xmlnode: ETree.Element, params: str, _, name: str, __) -> None:
        """
        Ensures that we "mark" characters in the node's text which need to be specifically encoded.
        This is necessary to easily identify these specific characters later, after the standard XML serialization is
        done, while not replacing any other occurrences of these characters which might appear in the serialized string.
        """
        node = ETree.SubElement(xmlnode, name)
        node.text = (
            str(params)
            .replace('"', '__marker__"__marker__')
            .replace("\r", "__marker__-r__marker__")
        )

    def _node_to_string(self, root: Optional[ETree.ElementTree], mime_type: str) -> Optional[str]:
        """Replaces the previously "marked" characters with their encoded value."""
        generated_string = super()._node_to_string(root, mime_type)
        if generated_string is None:
            return None
        generated_string = to_str(generated_string)
        # Undo the second escaping of the &
        # Undo the second escaping of the carriage return (\r)
        if mime_type == APPLICATION_JSON:
            # At this point the json was already dumped and escaped, so we replace directly.
            generated_string = generated_string.replace(r"__marker__\"__marker__", r"\"").replace(
                "__marker__-r__marker__", r"\r"
            )
        else:
            generated_string = generated_string.replace('__marker__"__marker__', "&quot;").replace(
                "__marker__-r__marker__", "&#xD;"
            )

        return to_bytes(generated_string)

    def _add_error_tags(
        self, error: ServiceException, error_tag: ETree.Element, mime_type: str
    ) -> None:
        """The SQS API stubs are now generated from the JSON specs, and some fields have been modified."""
        code_tag = ETree.SubElement(error_tag, "Code")

        if error.code in self.JSON_TO_QUERY_ERROR_CODES:
            error_code = self.JSON_TO_QUERY_ERROR_CODES[error.code]
        elif error.code in self.QUERY_PREFIXED_ERRORS:
            error_code = f"AWS.SimpleQueueService.{error.code}"
        else:
            error_code = error.code
        code_tag.text = error_code
        message = self._get_error_message(error)
        if message:
            self._default_serialize(error_tag, message, None, "Message", mime_type)
        if error.code in self.SENDER_FAULT_ERRORS or error.sender_fault:
            # The sender fault is either not set or "Sender"
            self._default_serialize(error_tag, "Sender", None, "Type", mime_type)
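
The mark-then-replace workaround used by `SqsQueryResponseSerializer` can be tried in isolation. The sketch below uses only the standard library's `xml.etree.ElementTree`; the helper names `mark` and `unmark` are hypothetical and exist only in this example, which covers the XML branch and not the JSON one.

```python
import xml.etree.ElementTree as ETree

MARKER = "__marker__"


def mark(text: str) -> str:
    # Wrap the characters that need entity encoding in markers, so they can be
    # found again after the standard XML serialization has run.
    return text.replace('"', f'{MARKER}"{MARKER}').replace("\r", f"{MARKER}-r{MARKER}")


def unmark(xml: str) -> str:
    # Replace the marked characters with the entities that SQS emits.
    return xml.replace(f'{MARKER}"{MARKER}', "&quot;").replace(f"{MARKER}-r{MARKER}", "&#xD;")


root = ETree.Element("Message")
root.text = mark('say "hi" & bye\r')
raw = ETree.tostring(root, encoding="unicode")
result = unmark(raw)
print(result)
```

Writing `&quot;` into the node text directly would not work, because the serializer escapes the ampersand and produces `&amp;quot;`. Marking first and substituting after serialization avoids that double escape while leaving ordinary `&` characters correctly escaped.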


class SqsJsonResponseSerializer(JSONResponseSerializer):
    # These were deleted from the JSON specs, but need to be kept for legacy reasons (sent in 'x-amzn-query-error')
    QUERY_PREFIXED_ERRORS = {
        "BatchEntryIdsNotDistinct",
        "BatchRequestTooLong",
        "EmptyBatchRequest",
        "InvalidBatchEntryId",
        "MessageNotInflight",
        "PurgeQueueInProgress",
        "QueueDeletedRecently",
        "TooManyEntriesInBatchRequest",
        "UnsupportedOperation",
    }

    # Some error codes changed between JSON and query, so we need a way to map them for legacy reasons
    JSON_TO_QUERY_ERROR_CODES = {
        "InvalidParameterValueException": "InvalidParameterValue",
        "MissingRequiredParameterException": "MissingParameter",
        "AccessDeniedException": "AccessDenied",
        "QueueDoesNotExist": "AWS.SimpleQueueService.NonExistentQueue",
        "QueueNameExists": "QueueAlreadyExists",
    }

    def _serialize_error(
        self,
        error: ServiceException,
        response: Response,
        shape: StructureShape,
        operation_model: OperationModel,
        mime_type: str,
        request_id: str,
    ) -> None:
        """
        Overrides _serialize_error because SQS has a special header kept for query API legacy reasons:
        'x-amzn-query-error', which contains the exception code as well as a Sender field.
        Ex: 'x-amzn-query-error': 'InvalidParameterValue;Sender'
        """
        # TODO: for body["__type"] = error.code, it seems AWS differs from what we send for SQS
        # AWS: "com.amazon.coral.service#InvalidParameterValueException"
        # or AWS: "com.amazonaws.sqs#BatchRequestTooLong"
        # LocalStack: "InvalidParameterValue"
        super()._serialize_error(error, response, shape, operation_model, mime_type, request_id)
        # We need to add a prefix to certain errors, as they have been deleted in the specs. These will not change
        if error.code in self.JSON_TO_QUERY_ERROR_CODES:
            code = self.JSON_TO_QUERY_ERROR_CODES[error.code]
        elif error.code in self.QUERY_PREFIXED_ERRORS:
            code = f"AWS.SimpleQueueService.{error.code}"
        else:
            code = error.code

        response.headers["x-amzn-query-error"] = f"{code};Sender"


def gen_amzn_requestid():
    """
    Generate generic AWS request ID.

    S3 uses a different format and set of request IDs.

    Examples:
    996d38a0-a4e9-45de-bad4-480cd962d208
    b9260553-df1b-4db6-ae41-97b89a5f85ea
    """
    return long_uid()


@functools.cache
def create_serializer(service: ServiceModel) -> ResponseSerializer:
    """
    Creates the right serializer for the given service model.

    :param service: to create the serializer for
    :return: ResponseSerializer which can handle the protocol of the service
    """

    # Unfortunately, some services show subtle differences in their serialized responses, even though their
    # specification states they implement the same protocol.
    # Since some clients might be stricter / less resilient than others, we need to mimic the serialization of the
    # specific services as closely as possible.
    # Therefore, the service-specific serializer implementations (basically the implicit / informally more specific
    # protocol implementations) take precedence over the more general protocol-specific serializers.
    service_specific_serializers = {
        "sqs": {"json": SqsJsonResponseSerializer, "query": SqsQueryResponseSerializer},
        "s3": {"rest-xml": S3ResponseSerializer},
    }
    protocol_specific_serializers = {
        "query": QueryResponseSerializer,
        "json": JSONResponseSerializer,
        "rest-json": RestJSONResponseSerializer,
        "rest-xml": RestXMLResponseSerializer,
        "ec2": EC2ResponseSerializer,
    }

    # Try to select a service- and protocol-specific serializer implementation
    if (
        service.service_name in service_specific_serializers
        and service.protocol in service_specific_serializers[service.service_name]
    ):
        return service_specific_serializers[service.service_name][service.protocol]()
    else:
        # Otherwise, pick the protocol-specific serializer for the protocol of the service
        return protocol_specific_serializers[service.protocol]()
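
The precedence rule in `create_serializer` (service-specific first, protocol-specific as fallback) can be sketched with plain dictionaries. The classes and the `pick_serializer` helper below are hypothetical placeholders for this example and are not part of LocalStack.

```python
class QuerySerializer:
    """Placeholder for a generic protocol-level serializer."""


class JsonSerializer:
    """Placeholder for a generic protocol-level serializer."""


class SqsQuerySerializer(QuerySerializer):
    """Placeholder for a service-specific override."""


SERVICE_SPECIFIC = {"sqs": {"query": SqsQuerySerializer}}
PROTOCOL_SPECIFIC = {"query": QuerySerializer, "json": JsonSerializer}


def pick_serializer(service_name: str, protocol: str):
    # Prefer an override registered for this exact (service, protocol) pair,
    # otherwise fall back to the generic serializer for the protocol.
    overrides = SERVICE_SPECIFIC.get(service_name, {})
    serializer_cls = overrides.get(protocol, PROTOCOL_SPECIFIC[protocol])
    return serializer_cls()


print(type(pick_serializer("sqs", "query")).__name__)
print(type(pick_serializer("sns", "query")).__name__)
```

An override only applies to the protocol it was registered for, so in this sketch `sqs` with the `json` protocol still resolves to the generic `JsonSerializer`.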


def aws_response_serializer(
    service_name: str, operation: str, protocol: Optional[ProtocolName] = None
):
    """
    A decorator for an HTTP route that can serialize return values or exceptions into AWS responses.
    This can be used to create AWS request handlers in a convenient way. Example usage::

        from localstack.http import route, Request
        from localstack.aws.api.sqs import ListQueuesResult

        @route("/_aws/sqs/queues")
        @aws_response_serializer("sqs", "ListQueues")
        def my_route(request: Request):
            if some_condition_on_request:
                raise CommonServiceException("...")  # <- will be serialized into an error response

            return ListQueuesResult(QueueUrls=...)  # <- object from the SQS API will be serialized

    :param service_name: the AWS service (e.g., "sqs", "lambda")
    :param protocol: the protocol of the AWS service to serialize to. If not set (by default) the default protocol
                    of the service in botocore is used.
    :param operation: the operation name (e.g., "ReceiveMessage", "ListFunctions")
    :returns: a decorator
    """

    def _decorate(fn):
        service_model = load_service(service_name, protocol=protocol)
        operation_model = service_model.operation_model(operation)
        serializer = create_serializer(service_model)

        def _proxy(*args, **kwargs) -> WerkzeugResponse:
            # extract request from function invocation (decorator can be used for methods as well as for functions).
            if len(args) > 0 and isinstance(args[0], WerkzeugRequest):
                # function
                request = args[0]
            elif len(args) > 1 and isinstance(args[1], WerkzeugRequest):
                # method (arg[0] == self)
                request = args[1]
            elif "request" in kwargs:
                request = kwargs["request"]
            else:
                raise ValueError(f"could not find Request in signature of function {fn}")

            # TODO: we have no context here
            # TODO: maybe try to get the request ID from the headers first before generating a new one
            request_id = gen_amzn_requestid()

            try:
                response = fn(*args, **kwargs)

                if isinstance(response, WerkzeugResponse):
                    return response

                return serializer.serialize_to_response(
                    response, operation_model, request.headers, request_id
                )

            except ServiceException as e:
                return serializer.serialize_error_to_response(
                    e, operation_model, request.headers, request_id
                )
            except Exception as e:
                return serializer.serialize_error_to_response(
                    CommonServiceException(
                        "InternalError", f"An internal error occurred: {e}", status_code=500
                    ),
                    operation_model,
                    request.headers,
                    request_id,
                )

        return _proxy

    return _decorate