• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 17902858349

19 Sep 2025 12:17PM UTC coverage: 86.864% (+0.02%) from 86.844%
17902858349

push

github

web-flow
ASF/CloudWatch: add support for multi-protocols (#13161)

34 of 37 new or added lines in 7 files covered. (91.89%)

136 existing lines in 10 files now uncovered.

67691 of 77928 relevant lines covered (86.86%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.67
/localstack-core/localstack/aws/protocol/serializer.py
1
"""
2
Response serializers for the different AWS service protocols.
3

4
The module contains classes that take a service's response dict, and
5
given an operation model, serialize the HTTP response according to the
6
specified output shape.
7

8
It can be seen as the counterpart to the ``parse`` module in ``botocore``
9
(which parses the result of these serializer). It has a lot of
10
similarities with the ``serialize`` module in ``botocore``, but
11
serves a different purpose (serializing responses instead of requests).
12

13
The different protocols have many similarities. The class hierarchy is
14
designed such that the serializers share as much logic as possible.
15
The class hierarchy looks as follows:
16
::
17
                                     ┌────────────────────┐
18
                                     │ ResponseSerializer │
19
                                     └────────────────────┘
20
                                           ▲    ▲    ▲
21
                 ┌─────────────────┬───────┘    │    └──────────────┬──────────────────────┐
22
    ┌────────────┴────────────┐    │    ┌───────┴──────────────┐    │         ┌────────────┴─────────────┐
23
    │BaseXMLResponseSerializer│    │    │JSONResponseSerializer│    │         │BaseCBORResponseSerializer│
24
    └─────────────────────────┘    │    └──────────────────────┘    │         └──────────────────────────┘
25
       ▲    ▲        ┌─────────────┴────────────┐     ▲       ┌─────┴─────────────────────┐  ▲         ▲
26
       │    │        │BaseRestResponseSerializer│     │       │BaseRpcV2ResponseSerializer│  │         │
27
       │    │        └──────────────────────────┘     │       └───────────────────────────┘  │         │
28
       │    │              ▲              ▲           │                          ▲           │         │
29
       │    │              │              │           │                          │           │         │
30
       │  ┌─┴──────────────┴────────┐  ┌──┴───────────┴───────────┐   ┌──────────┴───────────┴────┐    │
31
       │  │RestXMLResponseSerializer│  │RestJSONResponseSerializer│   │RpcV2CBORResponseSerializer│    │
32
       │  └─────────────────────────┘  └──────────────────────────┘   └───────────────────────────┘    │
33
 ┌─────┴──────────────────┐                                                                 ┌──────────┴─────────────┐
34
 │QueryResponseSerializer │                                                                 │ CBORResponseSerializer │
35
 └────────────────────────┘                                                                 └────────────────────────┘
36
             ▲
37
   ┌─────────┴───────────┐
38
   │EC2ResponseSerializer│
39
   └─────────────────────┘
40
::
41

42
The ``ResponseSerializer`` contains the logic that is used among all the
43
different protocols (``query``, ``json``, ``rest-json``, ``rest-xml``, ``cbor``
44
and ``ec2``).
45
The protocols relate to each other in the following ways:
46

47
* The ``query`` and the ``rest-xml`` protocols both have XML bodies in their
48
  responses which are serialized quite similarly (with some specifics for each
49
  type).
50
* The ``json`` and the ``rest-json`` protocols both have JSON bodies in their
51
  responses which are serialized the same way.
52
* The ``cbor`` protocol is not properly defined in the spec, but mirrors the
53
  ``json`` protocol.
54
* The ``rest-json`` and ``rest-xml`` protocols serialize some metadata in
55
  the HTTP response's header fields.
56
* The ``ec2`` protocol is basically similar to the ``query`` protocol with a
57
  specific error response formatting.
58
* The ``smithy-rpc-v2-cbor`` protocol defines a specific way to route request
59
  to services via the RPC v2 trait, and encodes its body with the CBOR format.
60

61
The serializer classes in this module correspond directly to the different
62
protocols. ``#create_serializer`` shows the explicit mapping between the
63
classes and the protocols.
64
The classes are structured as follows:
65

66
* The ``ResponseSerializer`` contains all the basic logic for the
67
  serialization which is shared among all different protocols.
68
* The ``BaseXMLResponseSerializer``, ``JSONResponseSerializer`` and
69
  ``BaseCBORResponseSerializer`` contain the logic for the XML, JSON
70
  and the CBOR serialization respectively.
71
* The ``BaseRestResponseSerializer`` contains the logic for the REST
72
  protocol specifics (i.e. specific HTTP header serializations).
73
* The ``BaseRpcV2ResponseSerializer`` contains the logic for the RPC v2
74
  protocol specifics (i.e. pretty bare, does not has any specific
75
  about body serialization).
76
* The ``RestXMLResponseSerializer`` and the ``RestJSONResponseSerializer``
77
  inherit the ReST specific logic from the ``BaseRestResponseSerializer``
78
  and the XML / JSON body serialization from their second super class.
79
* The ``RpcV2CBORResponseSerializer`` inherits the RPC v2 specific logic
80
  from the ``BaseRpcV2ResponseSerializer`` and the CBOR body serialization
81
  from its second super class.
82
* The ``CBORResponseSerializer`` contains the logic specific to the
83
  non-official ``cbor`` protocol, mirroring the ``json`` protocol but
84
  with CBOR encoded body
85

86
The services and their protocols are defined by using AWS's Smithy
87
(a language to define services in a - somewhat - protocol-agnostic
88
way). The "peculiarities" in this serializer code usually correspond
89
to certain so-called "traits" in Smithy.
90

91
The result of the serialization methods is the HTTP response which can
92
be sent back to the calling client.
93
"""
94

95
import abc
1✔
96
import base64
1✔
97
import copy
1✔
98
import datetime
1✔
99
import functools
1✔
100
import json
1✔
101
import logging
1✔
102
import math
1✔
103
import string
1✔
104
import struct
1✔
105
from abc import ABC
1✔
106
from binascii import crc32
1✔
107
from collections.abc import Iterable, Iterator
1✔
108
from email.utils import formatdate
1✔
109
from struct import pack
1✔
110
from typing import IO, Any
1✔
111
from xml.etree import ElementTree as ETree
1✔
112

113
import xmltodict
1✔
114
from botocore.model import (
1✔
115
    ListShape,
116
    MapShape,
117
    OperationModel,
118
    ServiceModel,
119
    Shape,
120
    StringShape,
121
    StructureShape,
122
)
123
from botocore.serialize import ISO8601, ISO8601_MICRO
1✔
124
from botocore.utils import calculate_md5, is_json_value_header, parse_to_aware_datetime
1✔
125

126
# cbor2: explicitly load from private _encoder module to avoid using the (non-patched) C-version
127
from cbor2._encoder import dumps as cbor2_dumps
1✔
128
from werkzeug import Request as WerkzeugRequest
1✔
129
from werkzeug import Response as WerkzeugResponse
1✔
130
from werkzeug.datastructures import Headers, MIMEAccept
1✔
131
from werkzeug.http import parse_accept_header
1✔
132

133
from localstack.aws.api import CommonServiceException, ServiceException
1✔
134
from localstack.aws.spec import ProtocolName, load_service
1✔
135
from localstack.constants import (
1✔
136
    APPLICATION_AMZ_CBOR_1_1,
137
    APPLICATION_AMZ_JSON_1_0,
138
    APPLICATION_AMZ_JSON_1_1,
139
    APPLICATION_CBOR,
140
    APPLICATION_JSON,
141
    APPLICATION_XML,
142
    TEXT_XML,
143
)
144
from localstack.http import Response
1✔
145
from localstack.utils.common import to_bytes, to_str
1✔
146
from localstack.utils.strings import long_uid
1✔
147
from localstack.utils.xml import strip_xmlns
1✔
148

149
LOG = logging.getLogger(__name__)
1✔
150

151
REQUEST_ID_CHARACTERS = string.digits + string.ascii_uppercase
1✔
152

153

154
class ResponseSerializerError(Exception):
1✔
155
    """
156
    Error which is thrown if the request serialization fails.
157
    Super class of all exceptions raised by the serializer.
158
    """
159

160
    pass
1✔
161

162

163
class UnknownSerializerError(ResponseSerializerError):
1✔
164
    """
165
    Error which indicates that the exception raised by the serializer could be caused by invalid data or by any other
166
    (unknown) issue. Errors like this should be reported and indicate an issue in the serializer itself.
167
    """
168

169
    pass
1✔
170

171

172
class ProtocolSerializerError(ResponseSerializerError):
1✔
173
    """
174
    Error which indicates that the given data is not compliant with the service's specification and cannot be
175
    serialized. This usually results in a response to the client with an HTTP 5xx status code (internal server error).
176
    """
177

178
    pass
1✔
179

180

181
def _handle_exceptions(func):
1✔
182
    """
183
    Decorator which handles the exceptions raised by the serializer. It ensures that all exceptions raised by the public
184
    methods of the parser are instances of ResponseSerializerError.
185
    :param func: to wrap in order to add the exception handling
186
    :return: wrapped function
187
    """
188

189
    @functools.wraps(func)
1✔
190
    def wrapper(*args, **kwargs):
1✔
191
        try:
1✔
192
            return func(*args, **kwargs)
1✔
193
        except ResponseSerializerError:
1✔
194
            raise
1✔
195
        except Exception as e:
1✔
196
            raise UnknownSerializerError(
1✔
197
                "An unknown error occurred when trying to serialize the response."
198
            ) from e
199

200
    return wrapper
1✔
201

202

203
class ResponseSerializer(abc.ABC):
1✔
204
    """
205
    The response serializer is responsible for the serialization of a service implementation's result to an actual
206
    HTTP response (which will be sent to the calling client).
207
    It is the base class of all serializers and therefore contains the basic logic which is used among all of them.
208
    """
209

210
    DEFAULT_ENCODING = "utf-8"
1✔
211
    # The default timestamp format is ISO8601, but this can be overwritten by subclasses.
212
    TIMESTAMP_FORMAT = "iso8601"
1✔
213
    # Event streaming binary data type mapping for type "string"
214
    AWS_BINARY_DATA_TYPE_STRING = 7
1✔
215
    # Defines the supported mime types of the specific serializer. Sorted by priority (preferred / default first).
216
    # Needs to be specified by subclasses.
217
    SUPPORTED_MIME_TYPES: list[str] = []
1✔
218

219
    @_handle_exceptions
1✔
220
    def serialize_to_response(
1✔
221
        self,
222
        response: dict,
223
        operation_model: OperationModel,
224
        headers: dict | Headers | None,
225
        request_id: str,
226
    ) -> Response:
227
        """
228
        Takes a response dict and serializes it to an actual HttpResponse.
229

230
        :param response: to serialize
231
        :param operation_model: specification of the service & operation containing information about the shape of the
232
                                service's output / response
233
        :param headers: the headers of the incoming request this response should be serialized for. This is necessary
234
                        for features like Content-Negotiation (define response content type based on request headers).
235
        :param request_id: autogenerated AWS request ID identifying the original request
236
        :return: Response which can be sent to the calling client
237
        :raises: ResponseSerializerError (either a ProtocolSerializerError or an UnknownSerializerError)
238
        """
239

240
        # determine the preferred mime type (based on the serializer's supported mime types and the Accept header)
241
        mime_type = self._get_mime_type(headers)
1✔
242

243
        # if the operation has a streaming output, handle the serialization differently
244
        if operation_model.has_event_stream_output:
1✔
245
            return self._serialize_event_stream(response, operation_model, mime_type, request_id)
1✔
246

247
        serialized_response = self._create_default_response(operation_model, mime_type)
1✔
248
        shape = operation_model.output_shape
1✔
249
        # The shape can also be none (for empty responses), but it still needs to be serialized (to add some metadata)
250
        shape_members = shape.members if shape is not None else None
1✔
251
        self._serialize_response(
1✔
252
            response,
253
            serialized_response,
254
            shape,
255
            shape_members,
256
            operation_model,
257
            mime_type,
258
            request_id,
259
        )
260
        serialized_response = self._prepare_additional_traits_in_response(
1✔
261
            serialized_response, operation_model, request_id
262
        )
263
        return serialized_response
1✔
264

265
    @_handle_exceptions
1✔
266
    def serialize_error_to_response(
1✔
267
        self,
268
        error: ServiceException,
269
        operation_model: OperationModel,
270
        headers: dict | Headers | None,
271
        request_id: str,
272
    ) -> Response:
273
        """
274
        Takes an error instance and serializes it to an actual HttpResponse.
275
        Therefore, this method is used for errors which should be serialized and transmitted to the calling client.
276

277
        :param error: to serialize
278
        :param operation_model: specification of the service & operation containing information about the shape of the
279
                                service's output / response
280
        :param headers: the headers of the incoming request this response should be serialized for. This is necessary
281
                        for features like Content-Negotiation (define response content type based on request headers).
282
        :param request_id: autogenerated AWS request ID identifying the original request
283
        :return: HttpResponse which can be sent to the calling client
284
        :raises: ResponseSerializerError (either a ProtocolSerializerError or an UnknownSerializerError)
285
        """
286
        # determine the preferred mime type (based on the serializer's supported mime types and the Accept header)
287
        mime_type = self._get_mime_type(headers)
1✔
288

289
        # TODO implement streaming error serialization
290
        serialized_response = self._create_default_response(operation_model, mime_type)
1✔
291
        if not error or not isinstance(error, ServiceException):
1✔
292
            raise ProtocolSerializerError(
1✔
293
                f"Error to serialize ({error.__class__.__name__ if error else None}) is not a ServiceException."
294
            )
295
        shape = operation_model.service_model.shape_for_error_code(error.code)
1✔
296
        serialized_response.status_code = error.status_code
1✔
297

298
        self._serialize_error(
1✔
299
            error, serialized_response, shape, operation_model, mime_type, request_id
300
        )
301
        serialized_response = self._prepare_additional_traits_in_response(
1✔
302
            serialized_response, operation_model, request_id
303
        )
304
        return serialized_response
1✔
305

306
    def _serialize_response(
1✔
307
        self,
308
        parameters: dict,
309
        response: Response,
310
        shape: Shape | None,
311
        shape_members: dict,
312
        operation_model: OperationModel,
313
        mime_type: str,
314
        request_id: str,
315
    ) -> None:
316
        raise NotImplementedError
317

318
    def _serialize_body_params(
1✔
319
        self,
320
        params: dict,
321
        shape: Shape,
322
        operation_model: OperationModel,
323
        mime_type: str,
324
        request_id: str,
325
    ) -> str | None:
326
        """
327
        Actually serializes the given params for the given shape to a string for the transmission in the body of the
328
        response.
329
        :param params: to serialize
330
        :param shape: to know how to serialize the params
331
        :param operation_model: for additional metadata
332
        :param mime_type: Mime type which should be used to encode the payload
333
        :param request_id: autogenerated AWS request ID identifying the original request
334
        :return: string containing the serialized body
335
        """
336
        raise NotImplementedError
337

338
    def _serialize_error(
1✔
339
        self,
340
        error: ServiceException,
341
        response: Response,
342
        shape: StructureShape,
343
        operation_model: OperationModel,
344
        mime_type: str,
345
        request_id: str,
346
    ) -> None:
347
        raise NotImplementedError
348

349
    def _serialize_event_stream(
1✔
350
        self,
351
        response: dict,
352
        operation_model: OperationModel,
353
        mime_type: str,
354
        request_id: str,
355
    ) -> Response:
356
        """
357
        Serializes a given response dict (the return payload of a service implementation) to an _event stream_ using the
358
        given operation model.
359

360
        :param response: dictionary containing the payload for the response
361
        :param operation_model: describing the operation the response dict is being returned by
362
        :param mime_type: Mime type which should be used to encode the payload
363
        :param request_id: autogenerated AWS request ID identifying the original request
364
        :return: Response which can directly be sent to the client (in chunks)
365
        """
366
        event_stream_shape = operation_model.get_event_stream_output()
1✔
367
        event_stream_member_name = operation_model.output_shape.event_stream_name
1✔
368

369
        # wrap the generator in operation specific serialization
370
        def event_stream_serializer() -> Iterable[bytes]:
1✔
371
            yield self._encode_event_payload("initial-response")
1✔
372

373
            # create a default response
374
            serialized_event_response = self._create_default_response(operation_model, mime_type)
1✔
375
            # get the members of the event stream shape
376
            event_stream_shape_members = (
1✔
377
                event_stream_shape.members if event_stream_shape is not None else None
378
            )
379
            # extract the generator from the given response data
380
            event_generator = response.get(event_stream_member_name)
1✔
381
            if not isinstance(event_generator, Iterator):
1✔
382
                raise ProtocolSerializerError(
×
383
                    "Expected iterator for streaming event serialization."
384
                )
385

386
            # yield one event per generated event
387
            for event in event_generator:
1✔
388
                # find the actual event payload (the member with event=true)
389
                event_member_shape = None
1✔
390
                event_member_name = None
1✔
391
                for member_name, member_shape in event_stream_shape_members.items():
1✔
392
                    if member_shape.serialization.get("event") and member_name in event:
1✔
393
                        event_member_shape = member_shape
1✔
394
                        event_member_name = member_name
1✔
395
                        break
1✔
396
                if event_member_shape is None:
1✔
397
                    raise UnknownSerializerError("Couldn't find event shape for serialization.")
×
398

399
                # serialize the part of the response for the event
400
                self._serialize_response(
1✔
401
                    event.get(event_member_name),
402
                    serialized_event_response,
403
                    event_member_shape,
404
                    event_member_shape.members if event_member_shape is not None else None,
405
                    operation_model,
406
                    mime_type,
407
                    request_id,
408
                )
409
                # execute additional response traits (might be modifying the response)
410
                serialized_event_response = self._prepare_additional_traits_in_response(
1✔
411
                    serialized_event_response, operation_model, request_id
412
                )
413
                # encode the event and yield it
414
                yield self._encode_event_payload(
1✔
415
                    event_type=event_member_name, content=serialized_event_response.data
416
                )
417

418
        return Response(
1✔
419
            response=event_stream_serializer(),
420
            status=operation_model.http.get("responseCode", 200),
421
        )
422

423
    def _encode_event_payload(
1✔
424
        self,
425
        event_type: str,
426
        content: str | bytes = "",
427
        error_code: str | None = None,
428
        error_message: str | None = None,
429
    ) -> bytes:
430
        """
431
        Encodes the given event payload according to AWS specific binary event encoding.
432
        A specification of the format can be found in the AWS docs:
433
        https://docs.aws.amazon.com/AmazonS3/latest/API/RESTSelectObjectAppendix.html
434

435
        :param content: string or bytes of the event payload
436
        :param event_type: type of the event. Usually the name of the event shape or specific event types like
437
                            "initial-response".
438
        :param error_code: Optional. Error code if the payload represents an error.
439
        :param error_message: Optional. Error message if the payload represents an error.
440
        :return: bytes with the AWS-specific encoded event payload
441
        """
442

443
        # determine the event type (error if an error message or an error code is set)
444
        if error_message or error_code:
1✔
445
            message_type = "error"
×
446
        else:
447
            message_type = "event"
1✔
448

449
        # set the headers
450
        headers = {":event-type": event_type, ":message-type": message_type}
1✔
451
        if error_message:
1✔
452
            headers[":error-message"] = error_message
×
453
        if error_code:
1✔
454
            headers[":error-code"] = error_code
×
455

456
        # construct headers
457
        header_section = b""
1✔
458
        for key, value in headers.items():
1✔
459
            header_name = key.encode(self.DEFAULT_ENCODING)
1✔
460
            header_value = to_bytes(value)
1✔
461
            header_section += pack("!B", len(header_name))
1✔
462
            header_section += header_name
1✔
463
            header_section += pack("!B", self.AWS_BINARY_DATA_TYPE_STRING)
1✔
464
            header_section += pack("!H", len(header_value))
1✔
465
            header_section += header_value
1✔
466

467
        # construct body
468
        if isinstance(content, str):
1✔
469
            payload = bytes(content, self.DEFAULT_ENCODING)
1✔
470
        else:
471
            payload = content
1✔
472

473
        # calculate lengths
474
        headers_length = len(header_section)
1✔
475
        payload_length = len(payload)
1✔
476

477
        # construct message
478
        # - prelude
479
        result = pack("!I", payload_length + headers_length + 16)
1✔
480
        result += pack("!I", headers_length)
1✔
481
        # - prelude crc
482
        prelude_crc = crc32(result)
1✔
483
        result += pack("!I", prelude_crc)
1✔
484
        # - headers
485
        result += header_section
1✔
486
        # - payload
487
        result += payload
1✔
488
        # - message crc
489
        payload_crc = crc32(result)
1✔
490
        result += pack("!I", payload_crc)
1✔
491

492
        return result
1✔
493

494
    def _create_default_response(self, operation_model: OperationModel, mime_type: str) -> Response:
1✔
495
        """
496
        Creates a boilerplate default response to be used by subclasses as starting points.
497
        Uses the default HTTP response status code defined in the operation model (if defined), otherwise 200.
498

499
        :param operation_model: to extract the default HTTP status code
500
        :param mime_type: Mime type which should be used to encode the payload
501
        :return: boilerplate HTTP response
502
        """
503
        return Response(status=operation_model.http.get("responseCode", 200))
1✔
504

505
    def _get_mime_type(self, headers: dict | Headers | None) -> str:
1✔
506
        """
507
        Extracts the accepted mime type from the request headers and returns a matching, supported mime type for the
508
        serializer or the default mime type of the service if there is no match.
509
        :param headers: to extract the "Accept" header from
510
        :return: preferred mime type to be used by the serializer (if it is not accepted by the client,
511
                 an error is logged)
512
        """
513
        accept_header = None
1✔
514
        if headers and "Accept" in headers and not headers.get("Accept") == "*/*":
1✔
515
            accept_header = headers.get("Accept")
1✔
516
        elif headers and headers.get("Content-Type"):
1✔
517
            # If there is no specific Accept header given, we use the given Content-Type as a fallback.
518
            # i.e. if the request content was JSON encoded and the client doesn't send a specific an Accept header, the
519
            # serializer should prefer JSON encoding.
520
            content_type = headers.get("Content-Type")
1✔
521
            LOG.debug(
1✔
522
                "No accept header given. Using request's Content-Type (%s) as preferred response Content-Type.",
523
                content_type,
524
            )
525
            accept_header = content_type + ", */*"
1✔
526
        mime_accept: MIMEAccept = parse_accept_header(accept_header, MIMEAccept)
1✔
527
        mime_type = mime_accept.best_match(self.SUPPORTED_MIME_TYPES)
1✔
528
        if not mime_type:
1✔
529
            # There is no match between the supported mime types and the requested one(s)
530
            mime_type = self.SUPPORTED_MIME_TYPES[0]
1✔
531
            LOG.debug(
1✔
532
                "Determined accept type (%s) is not supported by this serializer. Using default of this serializer: %s",
533
                accept_header,
534
                mime_type,
535
            )
536
        return mime_type
1✔
537

538
    # Some extra utility methods subclasses can use.
539

540
    @staticmethod
1✔
541
    def _timestamp_iso8601(value: datetime.datetime) -> str:
1✔
542
        if value.microsecond > 0:
1✔
543
            timestamp_format = ISO8601_MICRO
1✔
544
        else:
545
            timestamp_format = ISO8601
1✔
546
        return value.strftime(timestamp_format)
1✔
547

548
    @staticmethod
1✔
549
    def _timestamp_unixtimestamp(value: datetime.datetime) -> float:
1✔
550
        return value.timestamp()
1✔
551

552
    def _timestamp_rfc822(self, value: datetime.datetime) -> str:
1✔
553
        if isinstance(value, datetime.datetime):
1✔
554
            value = self._timestamp_unixtimestamp(value)
1✔
555
        return formatdate(value, usegmt=True)
1✔
556

557
    def _convert_timestamp_to_str(
1✔
558
        self, value: int | str | datetime.datetime, timestamp_format=None
559
    ) -> str:
560
        if timestamp_format is None:
1✔
561
            timestamp_format = self.TIMESTAMP_FORMAT
1✔
562
        timestamp_format = timestamp_format.lower()
1✔
563
        datetime_obj = parse_to_aware_datetime(value)
1✔
564
        converter = getattr(self, f"_timestamp_{timestamp_format}")
1✔
565
        final_value = converter(datetime_obj)
1✔
566
        return final_value
1✔
567

568
    @staticmethod
1✔
569
    def _get_serialized_name(shape: Shape, default_name: str) -> str:
1✔
570
        """
571
        Returns the serialized name for the shape if it exists.
572
        Otherwise, it will return the passed in default_name.
573
        """
574
        return shape.serialization.get("name", default_name)
1✔
575

576
    def _get_base64(self, value: str | bytes):
1✔
577
        """
578
        Returns the base64-encoded version of value, handling
579
        both strings and bytes. The returned value is a string
580
        via the default encoding.
581
        """
582
        if isinstance(value, str):
1✔
583
            value = value.encode(self.DEFAULT_ENCODING)
×
584
        return base64.b64encode(value).strip().decode(self.DEFAULT_ENCODING)
1✔
585

586
    def _encode_payload(self, body: bytes | str) -> bytes:
1✔
587
        if isinstance(body, str):
1✔
588
            return body.encode(self.DEFAULT_ENCODING)
1✔
589
        return body
1✔
590

591
    def _prepare_additional_traits_in_response(
1✔
592
        self, response: Response, operation_model: OperationModel, request_id: str
593
    ):
594
        """Applies additional traits on the raw response for a given model or protocol."""
595
        if operation_model.http_checksum_required:
1✔
596
            self._add_md5_header(response)
×
597
        return response
1✔
598

599
    def _has_header(self, header_name: str, headers: dict):
1✔
600
        """Case-insensitive check for header key."""
601
        if header_name is None:
1✔
602
            return False
×
603
        else:
604
            return header_name.lower() in [key.lower() for key in headers.keys()]
1✔
605

606
    def _add_md5_header(self, response: Response):
1✔
607
        """Add a Content-MD5 header if not yet there. Adapted from botocore.utils"""
608
        headers = response.headers
×
609
        body = response.data
×
610
        if body is not None and "Content-MD5" not in headers:
×
611
            md5_digest = calculate_md5(body)
×
612
            headers["Content-MD5"] = md5_digest
×
613

614
    def _get_error_message(self, error: Exception) -> str | None:
1✔
615
        return str(error) if error is not None and str(error) != "None" else None
1✔
616

617

618
class BaseXMLResponseSerializer(ResponseSerializer):
1✔
619
    """
620
    The BaseXMLResponseSerializer performs the basic logic for the XML response serialization.
621
    It is slightly adapted by the QueryResponseSerializer.
622
    While the botocore's RestXMLSerializer is quite similar, there are some subtle differences (since botocore's
623
    implementation handles the serialization of the requests from the client to the service, not the responses from the
624
    service to the client).
625
    """
626

627
    SUPPORTED_MIME_TYPES = [TEXT_XML, APPLICATION_XML, APPLICATION_JSON]
1✔
628

629
    def _serialize_error(
1✔
630
        self,
631
        error: ServiceException,
632
        response: Response,
633
        shape: StructureShape,
634
        operation_model: OperationModel,
635
        mime_type: str,
636
        request_id: str,
637
    ) -> None:
638
        # Check if we need to add a namespace
639
        attr = (
1✔
640
            {"xmlns": operation_model.metadata.get("xmlNamespace")}
641
            if "xmlNamespace" in operation_model.metadata
642
            else {}
643
        )
644
        root = ETree.Element("ErrorResponse", attr)
1✔
645

646
        error_tag = ETree.SubElement(root, "Error")
1✔
647
        self._add_error_tags(error, error_tag, mime_type)
1✔
648
        request_id_element = ETree.SubElement(root, "RequestId")
1✔
649
        request_id_element.text = request_id
1✔
650

651
        self._add_additional_error_tags(vars(error), root, shape, mime_type)
1✔
652

653
        response.set_response(self._encode_payload(self._node_to_string(root, mime_type)))
1✔
654

655
    def _add_error_tags(
1✔
656
        self, error: ServiceException, error_tag: ETree.Element, mime_type: str
657
    ) -> None:
658
        code_tag = ETree.SubElement(error_tag, "Code")
1✔
659
        code_tag.text = error.code
1✔
660
        message = self._get_error_message(error)
1✔
661
        if message:
1✔
662
            self._default_serialize(error_tag, message, None, "Message", mime_type)
1✔
663
        if error.sender_fault:
1✔
664
            # The sender fault is either not set or "Sender"
665
            self._default_serialize(error_tag, "Sender", None, "Type", mime_type)
1✔
666

667
    def _add_additional_error_tags(
1✔
668
        self, parameters: dict, node: ETree, shape: StructureShape, mime_type: str
669
    ):
670
        if shape:
1✔
671
            params = {}
1✔
672
            # TODO add a possibility to serialize simple non-modelled errors (like S3 NoSuchBucket#BucketName)
673
            for member in shape.members:
1✔
674
                # XML protocols do not add modeled default fields to the root node
675
                # (tested for cloudfront, route53, cloudwatch, iam)
676
                if member.lower() not in ["code", "message"] and member in parameters:
1✔
677
                    params[member] = parameters[member]
1✔
678

679
            # If there is an error shape with members which should be set, they need to be added to the node
680
            if params:
1✔
681
                # Serialize the remaining params
682
                root_name = shape.serialization.get("name", shape.name)
1✔
683
                pseudo_root = ETree.Element("")
1✔
684
                self._serialize(shape, params, pseudo_root, root_name, mime_type)
1✔
685
                real_root = list(pseudo_root)[0]
1✔
686
                # Add the child elements to the already created root error element
687
                for child in list(real_root):
1✔
688
                    node.append(child)
1✔
689

690
    def _serialize_body_params(
1✔
691
        self,
692
        params: dict,
693
        shape: Shape,
694
        operation_model: OperationModel,
695
        mime_type: str,
696
        request_id: str,
697
    ) -> str | None:
698
        root = self._serialize_body_params_to_xml(params, shape, operation_model, mime_type)
1✔
699
        self._prepare_additional_traits_in_xml(root, request_id)
1✔
700
        return self._node_to_string(root, mime_type)
1✔
701

702
    def _serialize_body_params_to_xml(
1✔
703
        self, params: dict, shape: Shape, operation_model: OperationModel, mime_type: str
704
    ) -> ETree.Element | None:
705
        if shape is None:
1✔
706
            return
1✔
707
        # The botocore serializer expects `shape.serialization["name"]`, but this isn't always present for responses
708
        root_name = shape.serialization.get("name", shape.name)
1✔
709
        pseudo_root = ETree.Element("")
1✔
710
        self._serialize(shape, params, pseudo_root, root_name, mime_type)
1✔
711
        real_root = list(pseudo_root)[0]
1✔
712
        return real_root
1✔
713

714
    def _serialize(
1✔
715
        self, shape: Shape, params: Any, xmlnode: ETree.Element, name: str, mime_type: str
716
    ) -> None:
717
        """This method dynamically invokes the correct `_serialize_type_*` method for each shape type."""
718
        if shape is None:
1✔
719
            return
×
720
        # Some output shapes define a `resultWrapper` in their serialization spec.
721
        # While the name would imply that the result is _wrapped_, it is actually renamed.
722
        if shape.serialization.get("resultWrapper"):
1✔
723
            name = shape.serialization.get("resultWrapper")
1✔
724

725
        try:
1✔
726
            method = getattr(self, f"_serialize_type_{shape.type_name}", self._default_serialize)
1✔
727
            method(xmlnode, params, shape, name, mime_type)
1✔
728
        except (TypeError, ValueError, AttributeError) as e:
1✔
729
            raise ProtocolSerializerError(
1✔
730
                f"Invalid type when serializing {shape.name}: '{xmlnode}' cannot be parsed to {shape.type_name}."
731
            ) from e
732

733
    def _serialize_type_structure(
1✔
734
        self, xmlnode: ETree.Element, params: dict, shape: StructureShape, name: str, mime_type
735
    ) -> None:
736
        structure_node = ETree.SubElement(xmlnode, name)
1✔
737

738
        if "xmlNamespace" in shape.serialization:
1✔
739
            namespace_metadata = shape.serialization["xmlNamespace"]
1✔
740
            attribute_name = "xmlns"
1✔
741
            if namespace_metadata.get("prefix"):
1✔
742
                attribute_name += ":{}".format(namespace_metadata["prefix"])
1✔
743
            structure_node.attrib[attribute_name] = namespace_metadata["uri"]
1✔
744
        for key, value in params.items():
1✔
745
            if value is None:
1✔
746
                # Don't serialize any param whose value is None.
747
                continue
1✔
748
            try:
1✔
749
                member_shape = shape.members[key]
1✔
750
            except KeyError:
1✔
751
                LOG.warning(
1✔
752
                    "Response object %s contains a member which is not specified: %s",
753
                    shape.name,
754
                    key,
755
                )
756
                continue
1✔
757
            member_name = member_shape.serialization.get("name", key)
1✔
758
            # We need to special case member shapes that are marked as an xmlAttribute.
759
            # Rather than serializing into an XML child node, we instead serialize the shape to
760
            # an XML attribute of the *current* node.
761
            if member_shape.serialization.get("xmlAttribute"):
1✔
762
                # xmlAttributes must have a serialization name.
763
                xml_attribute_name = member_shape.serialization["name"]
1✔
764
                structure_node.attrib[xml_attribute_name] = value
1✔
765
                continue
1✔
766
            self._serialize(member_shape, value, structure_node, member_name, mime_type)
1✔
767

768
    def _serialize_type_list(
1✔
769
        self, xmlnode: ETree.Element, params: list, shape: ListShape, name: str, mime_type: str
770
    ) -> None:
771
        if params is None:
1✔
772
            # Don't serialize any param whose value is None.
773
            return
×
774
        member_shape = shape.member
1✔
775
        if shape.serialization.get("flattened"):
1✔
776
            # If the list is flattened, either take the member's "name" or the name of the usual name for the parent
777
            # element for the children.
778
            element_name = self._get_serialized_name(member_shape, name)
1✔
779
            list_node = xmlnode
1✔
780
        else:
781
            element_name = self._get_serialized_name(member_shape, "member")
1✔
782
            list_node = ETree.SubElement(xmlnode, name)
1✔
783
        for item in params:
1✔
784
            # Don't serialize any item which is None
785
            if item is not None:
1✔
786
                self._serialize(member_shape, item, list_node, element_name, mime_type)
1✔
787

788
    def _serialize_type_map(
1✔
789
        self, xmlnode: ETree.Element, params: dict, shape: MapShape, name: str, mime_type: str
790
    ) -> None:
791
        """
792
        Given the ``name`` of MyMap, an input of {"key1": "val1", "key2": "val2"}, and the ``flattened: False``
793
        we serialize this as:
794
          <MyMap>
795
            <entry>
796
              <key>key1</key>
797
              <value>val1</value>
798
            </entry>
799
            <entry>
800
              <key>key2</key>
801
              <value>val2</value>
802
            </entry>
803
          </MyMap>
804
        If it is flattened, it is serialized as follows:
805
          <MyMap>
806
            <key>key1</key>
807
            <value>val1</value>
808
          </MyMap>
809
          <MyMap>
810
            <key>key2</key>
811
            <value>val2</value>
812
          </MyMap>
813
        """
814
        if params is None:
1✔
815
            # Don't serialize a non-existing map
816
            return
×
817
        if shape.serialization.get("flattened"):
1✔
818
            entries_node = xmlnode
1✔
819
            entry_node_name = name
1✔
820
        else:
821
            entries_node = ETree.SubElement(xmlnode, name)
1✔
822
            entry_node_name = "entry"
1✔
823

824
        for key, value in params.items():
1✔
825
            if value is None:
1✔
826
                # Don't serialize any param whose value is None.
827
                continue
×
828
            entry_node = ETree.SubElement(entries_node, entry_node_name)
1✔
829
            key_name = self._get_serialized_name(shape.key, default_name="key")
1✔
830
            val_name = self._get_serialized_name(shape.value, default_name="value")
1✔
831
            self._serialize(shape.key, key, entry_node, key_name, mime_type)
1✔
832
            self._serialize(shape.value, value, entry_node, val_name, mime_type)
1✔
833

834
    @staticmethod
1✔
835
    def _serialize_type_boolean(xmlnode: ETree.Element, params: bool, _, name: str, __) -> None:
1✔
836
        """
837
        For scalar types, the 'params' attr is actually just a scalar value representing the data
838
        we need to serialize as a boolean. It will either be 'true' or 'false'
839
        """
840
        node = ETree.SubElement(xmlnode, name)
1✔
841
        if params:
1✔
842
            str_value = "true"
1✔
843
        else:
844
            str_value = "false"
1✔
845
        node.text = str_value
1✔
846

847
    def _serialize_type_blob(
1✔
848
        self, xmlnode: ETree.Element, params: str | bytes, _, name: str, __
849
    ) -> None:
850
        node = ETree.SubElement(xmlnode, name)
1✔
851
        node.text = self._get_base64(params)
1✔
852

853
    def _serialize_type_timestamp(
1✔
854
        self, xmlnode: ETree.Element, params: str, shape: Shape, name: str, mime_type: str
855
    ) -> None:
856
        node = ETree.SubElement(xmlnode, name)
1✔
857
        if mime_type != APPLICATION_JSON:
1✔
858
            # Default XML timestamp serialization
859
            node.text = self._convert_timestamp_to_str(
1✔
860
                params, shape.serialization.get("timestampFormat")
861
            )
862
        else:
863
            # For services with XML protocols, where the Accept header is JSON, timestamps are formatted like for JSON
864
            # protocols, but using the int representation instead of the float representation (f.e. requesting JSON
865
            # responses in STS).
866
            node.text = str(
1✔
867
                int(self._convert_timestamp_to_str(params, JSONResponseSerializer.TIMESTAMP_FORMAT))
868
            )
869

870
    def _default_serialize(self, xmlnode: ETree.Element, params: str, _, name: str, __) -> None:
1✔
871
        node = ETree.SubElement(xmlnode, name)
1✔
872
        node.text = str(params)
1✔
873

874
    def _prepare_additional_traits_in_xml(self, root: ETree.Element | None, request_id: str):
1✔
875
        """
876
        Prepares the XML root node before being serialized with additional traits (like the Response ID in the Query
877
        protocol).
878
        For some protocols (like rest-xml), the root can be None.
879
        """
880
        pass
1✔
881

882
    def _create_default_response(self, operation_model: OperationModel, mime_type: str) -> Response:
1✔
883
        response = super()._create_default_response(operation_model, mime_type)
1✔
884
        response.headers["Content-Type"] = mime_type
1✔
885
        return response
1✔
886

887
    def _node_to_string(self, root: ETree.Element | None, mime_type: str) -> str | None:
1✔
888
        """Generates the string representation of the given XML element."""
889
        if root is not None:
1✔
890
            content = ETree.tostring(
1✔
891
                element=root, encoding=self.DEFAULT_ENCODING, xml_declaration=True
892
            )
893
            if mime_type == APPLICATION_JSON:
1✔
894
                # FIXME try to directly convert the ElementTree node to JSON
895
                xml_dict = xmltodict.parse(content)
1✔
896
                xml_dict = strip_xmlns(xml_dict)
1✔
897
                content = json.dumps(xml_dict)
1✔
898
            return content
1✔
899

900

901
class BaseRestResponseSerializer(ResponseSerializer, ABC):
1✔
902
    """
903
    The BaseRestResponseSerializer performs the basic logic for the ReST response serialization.
904
    In our case it basically only adds the request metadata to the HTTP header.
905
    """
906

907
    HEADER_TIMESTAMP_FORMAT = "rfc822"
1✔
908

909
    def _serialize_response(
1✔
910
        self,
911
        parameters: dict,
912
        response: Response,
913
        shape: Shape | None,
914
        shape_members: dict,
915
        operation_model: OperationModel,
916
        mime_type: str,
917
        request_id: str,
918
    ) -> None:
919
        header_params, payload_params = self._partition_members(parameters, shape)
1✔
920
        self._process_header_members(header_params, response, shape)
1✔
921
        # "HEAD" responses are basically "GET" responses without the actual body.
922
        # Do not process the body payload in this case (setting a body could also manipulate the headers)
923
        if operation_model.http.get("method") != "HEAD":
1✔
924
            self._serialize_payload(
1✔
925
                payload_params,
926
                response,
927
                shape,
928
                shape_members,
929
                operation_model,
930
                mime_type,
931
                request_id,
932
            )
933
        self._serialize_content_type(response, shape, shape_members, mime_type)
1✔
934
        self._prepare_additional_traits_in_response(response, operation_model, request_id)
1✔
935

936
    def _serialize_payload(
1✔
937
        self,
938
        parameters: dict,
939
        response: Response,
940
        shape: Shape | None,
941
        shape_members: dict,
942
        operation_model: OperationModel,
943
        mime_type: str,
944
        request_id: str,
945
    ) -> None:
946
        """
947
        Serializes the given payload.
948

949
        :param parameters: The user input params
950
        :param response: The final serialized Response
951
        :param shape: Describes the expected output shape (can be None in case of an "empty" response)
952
        :param shape_members: The members of the output struct shape
953
        :param operation_model: The specification of the operation of which the response is serialized here
954
        :param mime_type: Mime type which should be used to encode the payload
955
        :param request_id: autogenerated AWS request ID identifying the original request
956
        :return: None - the given `serialized` dict is modified
957
        """
958
        if shape is None:
1✔
959
            return
1✔
960

961
        payload_member = shape.serialization.get("payload")
1✔
962
        # If this shape is defined as being an event, we need to search for the payload member
963
        if not payload_member and shape.serialization.get("event"):
1✔
964
            for member_name, member_shape in shape_members.items():
1✔
965
                # Try to find the first shape which is marked as "eventpayload" and is given in the params dict
966
                if member_shape.serialization.get("eventpayload") and parameters.get(member_name):
1✔
967
                    payload_member = member_name
1✔
968
                    break
1✔
969
        if payload_member is not None and shape_members[payload_member].type_name in [
1✔
970
            "blob",
971
            "string",
972
        ]:
973
            # If it's streaming, then the body is just the value of the payload.
974
            body_payload = parameters.get(payload_member, b"")
1✔
975
            body_payload = self._encode_payload(body_payload)
1✔
976
            response.set_response(body_payload)
1✔
977
        elif payload_member is not None:
1✔
978
            # If there's a payload member, we serialized that member to the body.
979
            body_params = parameters.get(payload_member)
1✔
980
            if body_params is not None:
1✔
981
                response.set_response(
1✔
982
                    self._encode_payload(
983
                        self._serialize_body_params(
984
                            body_params,
985
                            shape_members[payload_member],
986
                            operation_model,
987
                            mime_type,
988
                            request_id,
989
                        )
990
                    )
991
                )
992
        else:
993
            # Otherwise, we use the "traditional" way of serializing the whole parameters dict recursively.
994
            response.set_response(
1✔
995
                self._encode_payload(
996
                    self._serialize_body_params(
997
                        parameters, shape, operation_model, mime_type, request_id
998
                    )
999
                )
1000
            )
1001

1002
    def _serialize_content_type(
1✔
1003
        self, serialized: Response, shape: Shape, shape_members: dict, mime_type: str
1004
    ):
1005
        """
1006
        Some protocols require varied Content-Type headers depending on user input.
1007
        This allows subclasses to apply this conditionally.
1008
        """
1009
        pass
1✔
1010

1011
    def _has_streaming_payload(self, payload: str | None, shape_members):
1✔
1012
        """Determine if payload is streaming (a blob or string)."""
1013
        return payload is not None and shape_members[payload].type_name in ["blob", "string"]
1✔
1014

1015
    def _prepare_additional_traits_in_response(
1✔
1016
        self, response: Response, operation_model: OperationModel, request_id: str
1017
    ):
1018
        """Adds the request ID to the headers (in contrast to the body - as in the Query protocol)."""
1019
        response = super()._prepare_additional_traits_in_response(
1✔
1020
            response, operation_model, request_id
1021
        )
1022
        response.headers["x-amz-request-id"] = request_id
1✔
1023
        return response
1✔
1024

1025
    def _process_header_members(self, parameters: dict, response: Response, shape: Shape):
1✔
1026
        shape_members = shape.members if isinstance(shape, StructureShape) else []
1✔
1027
        for name in shape_members:
1✔
1028
            member_shape = shape_members[name]
1✔
1029
            location = member_shape.serialization.get("location")
1✔
1030
            if not location:
1✔
1031
                continue
1✔
1032
            if name not in parameters:
1✔
1033
                # ignores optional keys
1034
                continue
1✔
1035
            key = member_shape.serialization.get("name", name)
1✔
1036
            value = parameters[name]
1✔
1037
            if value is None:
1✔
1038
                continue
×
1039
            if location == "header":
1✔
1040
                response.headers[key] = self._serialize_header_value(member_shape, value)
1✔
1041
            elif location == "headers":
1✔
1042
                header_prefix = key
1✔
1043
                self._serialize_header_map(header_prefix, response, value)
1✔
1044
            elif location == "statusCode":
1✔
1045
                response.status_code = int(value)
1✔
1046

1047
    def _serialize_header_map(self, prefix: str, response: Response, params: dict) -> None:
1✔
1048
        """Serializes the header map for the location trait "headers"."""
1049
        for key, val in params.items():
1✔
1050
            actual_key = prefix + key
1✔
1051
            response.headers[actual_key] = val
1✔
1052

1053
    def _serialize_header_value(self, shape: Shape, value: Any):
1✔
1054
        """Serializes a value for the location trait "header"."""
1055
        if shape.type_name == "timestamp":
1✔
1056
            datetime_obj = parse_to_aware_datetime(value)
1✔
1057
            timestamp_format = shape.serialization.get(
1✔
1058
                "timestampFormat", self.HEADER_TIMESTAMP_FORMAT
1059
            )
1060
            return self._convert_timestamp_to_str(datetime_obj, timestamp_format)
1✔
1061
        elif shape.type_name == "list":
1✔
1062
            converted_value = [
×
1063
                self._serialize_header_value(shape.member, v) for v in value if v is not None
1064
            ]
1065
            return ",".join(converted_value)
×
1066
        elif shape.type_name == "boolean":
1✔
1067
            # Set the header value to "true" if the given value is truthy, otherwise set the header value to "false".
1068
            return "true" if value else "false"
1✔
1069
        elif is_json_value_header(shape):
1✔
1070
            # Serialize with no spaces after separators to save space in
1071
            # the header.
1072
            return self._get_base64(json.dumps(value, separators=(",", ":")))
×
1073
        else:
1074
            return value
1✔
1075

1076
    def _partition_members(self, parameters: dict, shape: Shape | None) -> tuple[dict, dict]:
1✔
1077
        """Separates the top-level keys in the given parameters dict into header- and payload-located params."""
1078
        if not isinstance(shape, StructureShape):
1✔
1079
            # If the shape isn't a structure, we default to the whole response being parsed in the body.
1080
            # Non-payload members are only loaded in the top-level hierarchy and those are always structures.
1081
            return {}, parameters
1✔
1082
        header_params = {}
1✔
1083
        payload_params = {}
1✔
1084
        shape_members = shape.members
1✔
1085
        for name in shape_members:
1✔
1086
            member_shape = shape_members[name]
1✔
1087
            if name not in parameters:
1✔
1088
                continue
1✔
1089
            location = member_shape.serialization.get("location")
1✔
1090
            if location:
1✔
1091
                header_params[name] = parameters[name]
1✔
1092
            else:
1093
                payload_params[name] = parameters[name]
1✔
1094
        return header_params, payload_params
1✔
1095

1096

1097
class RestXMLResponseSerializer(BaseRestResponseSerializer, BaseXMLResponseSerializer):
1✔
1098
    """
1099
    The ``RestXMLResponseSerializer`` is responsible for the serialization of responses from services with the
1100
    ``rest-xml`` protocol.
1101
    It combines the ``BaseRestResponseSerializer`` (for the ReST specific logic) with the ``BaseXMLResponseSerializer``
1102
    (for the XML body response serialization).
1103
    """
1104

1105
    pass
1✔
1106

1107

1108
class QueryResponseSerializer(BaseXMLResponseSerializer):
1✔
1109
    """
1110
    The ``QueryResponseSerializer`` is responsible for the serialization of responses from services which use the
1111
    ``query`` protocol. The responses of these services also use XML. It is basically a subset of the features, since it
1112
    does not allow any payload or location traits.
1113
    """
1114

1115
    def _serialize_response(
1✔
1116
        self,
1117
        parameters: dict,
1118
        response: Response,
1119
        shape: Shape | None,
1120
        shape_members: dict,
1121
        operation_model: OperationModel,
1122
        mime_type: str,
1123
        request_id: str,
1124
    ) -> None:
1125
        """
1126
        Serializes the given parameters as XML for the query protocol.
1127

1128
        :param parameters: The user input params
1129
        :param response: The final serialized Response
1130
        :param shape: Describes the expected output shape (can be None in case of an "empty" response)
1131
        :param shape_members: The members of the output struct shape
1132
        :param operation_model: The specification of the operation of which the response is serialized here
1133
        :param mime_type: Mime type which should be used to encode the payload
1134
        :param request_id: autogenerated AWS request ID identifying the original request
1135
        :return: None - the given `serialized` dict is modified
1136
        """
1137
        response.set_response(
1✔
1138
            self._encode_payload(
1139
                self._serialize_body_params(
1140
                    parameters, shape, operation_model, mime_type, request_id
1141
                )
1142
            )
1143
        )
1144

1145
    def _serialize_body_params_to_xml(
1✔
1146
        self, params: dict, shape: Shape, operation_model: OperationModel, mime_type: str
1147
    ) -> ETree.Element:
1148
        # The Query protocol responses have a root element which is not contained in the specification file.
1149
        # Therefore, we first call the super function to perform the normal XML serialization, and afterwards wrap the
1150
        # result in a root element based on the operation name.
1151
        node = super()._serialize_body_params_to_xml(params, shape, operation_model, mime_type)
1✔
1152

1153
        # Check if we need to add a namespace
1154
        attr = (
1✔
1155
            {"xmlns": operation_model.metadata.get("xmlNamespace")}
1156
            if "xmlNamespace" in operation_model.metadata
1157
            else None
1158
        )
1159

1160
        # Create the root element and add the result of the XML serializer as a child node
1161
        root = ETree.Element(f"{operation_model.name}Response", attr)
1✔
1162
        if node is not None:
1✔
1163
            root.append(node)
1✔
1164
        return root
1✔
1165

1166
    def _prepare_additional_traits_in_xml(self, root: ETree.Element | None, request_id: str):
1✔
1167
        # Add the response metadata here (it's not defined in the specs)
1168
        # For the ec2 and the query protocol, the root cannot be None at this time.
1169
        response_metadata = ETree.SubElement(root, "ResponseMetadata")
1✔
1170
        request_id_element = ETree.SubElement(response_metadata, "RequestId")
1✔
1171
        request_id_element.text = request_id
1✔
1172

1173

1174
class EC2ResponseSerializer(QueryResponseSerializer):
1✔
1175
    """
1176
    The ``EC2ResponseSerializer`` is responsible for the serialization of responses from services which use the
1177
    ``ec2`` protocol (basically the EC2 service). This protocol is basically equal to the ``query`` protocol with only
1178
    a few subtle differences.
1179
    """
1180

1181
    def _serialize_error(
1✔
1182
        self,
1183
        error: ServiceException,
1184
        response: Response,
1185
        shape: StructureShape,
1186
        operation_model: OperationModel,
1187
        mime_type: str,
1188
        request_id: str,
1189
    ) -> None:
1190
        # EC2 errors look like:
1191
        # <Response>
1192
        #   <Errors>
1193
        #     <Error>
1194
        #       <Code>InvalidInstanceID.Malformed</Code>
1195
        #       <Message>Invalid id: "1343124"</Message>
1196
        #     </Error>
1197
        #   </Errors>
1198
        #   <RequestID>12345</RequestID>
1199
        # </Response>
1200
        # This is different from QueryParser in that it's RequestID, not RequestId
1201
        # and that the Error tag is in an enclosing Errors tag.
1202
        attr = (
1✔
1203
            {"xmlns": operation_model.metadata.get("xmlNamespace")}
1204
            if "xmlNamespace" in operation_model.metadata
1205
            else None
1206
        )
1207
        root = ETree.Element("Response", attr)
1✔
1208
        errors_tag = ETree.SubElement(root, "Errors")
1✔
1209
        error_tag = ETree.SubElement(errors_tag, "Error")
1✔
1210
        self._add_error_tags(error, error_tag, mime_type)
1✔
1211
        request_id_element = ETree.SubElement(root, "RequestID")
1✔
1212
        request_id_element.text = request_id
1✔
1213
        response.set_response(self._encode_payload(self._node_to_string(root, mime_type)))
1✔
1214

1215
    def _prepare_additional_traits_in_xml(self, root: ETree.Element | None, request_id: str):
1✔
1216
        # The EC2 protocol does not use the root output shape, therefore we need to remove the hierarchy level
1217
        # below the root level
1218
        if len(root) > 0:
1✔
1219
            output_node = root[0]
1✔
1220
            for child in output_node:
1✔
1221
                root.append(child)
1✔
1222
            root.remove(output_node)
1✔
1223

1224
        # Add the requestId here (it's not defined in the specs)
1225
        # For the ec2 and the query protocol, the root cannot be None at this time.
1226
        request_id_element = ETree.SubElement(root, "requestId")
1✔
1227
        request_id_element.text = request_id
1✔
1228

1229

1230
class JSONResponseSerializer(ResponseSerializer):
1✔
1231
    """
1232
    The ``JSONResponseSerializer`` is responsible for the serialization of responses from services with the ``json``
1233
    protocol. It implements the JSON response body serialization, which is also used by the
1234
    ``RestJSONResponseSerializer``.
1235
    """
1236

1237
    JSON_TYPES = [APPLICATION_JSON, APPLICATION_AMZ_JSON_1_0, APPLICATION_AMZ_JSON_1_1]
1✔
1238
    CBOR_TYPES = [APPLICATION_CBOR, APPLICATION_AMZ_CBOR_1_1]
1✔
1239
    SUPPORTED_MIME_TYPES = JSON_TYPES + CBOR_TYPES
1✔
1240

1241
    TIMESTAMP_FORMAT = "unixtimestamp"
1✔
1242

1243
    def _serialize_error(
1✔
1244
        self,
1245
        error: ServiceException,
1246
        response: Response,
1247
        shape: StructureShape,
1248
        operation_model: OperationModel,
1249
        mime_type: str,
1250
        request_id: str,
1251
    ) -> None:
1252
        body = {}
1✔
1253

1254
        # TODO implement different service-specific serializer configurations
1255
        #   - currently we set both, the `__type` member as well as the `X-Amzn-Errortype` header
1256
        #   - the specification defines that it's either the __type field OR the header
1257
        response.headers["X-Amzn-Errortype"] = error.code
1✔
1258
        body["__type"] = error.code
1✔
1259

1260
        if shape:
1✔
1261
            remaining_params = {}
1✔
1262
            # TODO add a possibility to serialize simple non-modelled errors (like S3 NoSuchBucket#BucketName)
1263
            for member in shape.members:
1✔
1264
                if hasattr(error, member):
1✔
1265
                    remaining_params[member] = getattr(error, member)
1✔
1266
                # Default error message fields can sometimes have different casing in the specs
1267
                elif member.lower() in ["code", "message"] and hasattr(error, member.lower()):
1✔
1268
                    remaining_params[member] = getattr(error, member.lower())
1✔
1269
            self._serialize(body, remaining_params, shape, None, mime_type)
1✔
1270

1271
        # Only set the message if it has not been set with the shape members
1272
        if "message" not in body and "Message" not in body:
1✔
1273
            message = self._get_error_message(error)
1✔
1274
            if message is not None:
1✔
1275
                body["message"] = message
1✔
1276

1277
        if mime_type in self.CBOR_TYPES:
1✔
1278
            response.set_response(cbor2_dumps(body, datetime_as_timestamp=True))
1✔
1279
            response.content_type = mime_type
1✔
1280
        else:
1281
            response.set_json(body)
1✔
1282

1283
    def _serialize_response(
1✔
1284
        self,
1285
        parameters: dict,
1286
        response: Response,
1287
        shape: Shape | None,
1288
        shape_members: dict,
1289
        operation_model: OperationModel,
1290
        mime_type: str,
1291
        request_id: str,
1292
    ) -> None:
1293
        if mime_type in self.CBOR_TYPES:
1✔
1294
            response.content_type = mime_type
1✔
1295
        else:
1296
            json_version = operation_model.metadata.get("jsonVersion")
1✔
1297
            if json_version is not None:
1✔
1298
                response.headers["Content-Type"] = f"application/x-amz-json-{json_version}"
1✔
1299
        response.set_response(
1✔
1300
            self._serialize_body_params(parameters, shape, operation_model, mime_type, request_id)
1301
        )
1302

1303
    def _serialize_body_params(
1✔
1304
        self,
1305
        params: dict,
1306
        shape: Shape,
1307
        operation_model: OperationModel,
1308
        mime_type: str,
1309
        request_id: str,
1310
    ) -> str | None:
1311
        body = {}
1✔
1312
        if shape is not None:
1✔
1313
            self._serialize(body, params, shape, None, mime_type)
1✔
1314

1315
        if mime_type in self.CBOR_TYPES:
1✔
1316
            return cbor2_dumps(body, datetime_as_timestamp=True)
1✔
1317
        else:
1318
            return json.dumps(body)
1✔
1319

1320
    def _serialize(self, body: dict, value: Any, shape, key: str | None, mime_type: str):
1✔
1321
        """This method dynamically invokes the correct `_serialize_type_*` method for each shape type."""
1322
        try:
1✔
1323
            method = getattr(self, f"_serialize_type_{shape.type_name}", self._default_serialize)
1✔
1324
            method(body, value, shape, key, mime_type)
1✔
1325
        except (TypeError, ValueError, AttributeError) as e:
×
1326
            raise ProtocolSerializerError(
×
1327
                f"Invalid type when serializing {shape.name}: '{value}' cannot be parsed to {shape.type_name}."
1328
            ) from e
1329

1330
    def _serialize_type_structure(
1✔
1331
        self, body: dict, value: dict, shape: StructureShape, key: str | None, mime_type: str
1332
    ):
1333
        if value is None:
1✔
1334
            return
×
1335
        if shape.is_document_type:
1✔
1336
            body[key] = value
×
1337
        else:
1338
            if key is not None:
1✔
1339
                # If a key is provided, this is a result of a recursive
1340
                # call, so we need to add a new child dict as the value
1341
                # of the passed in serialized dict.  We'll then add
1342
                # all the structure members as key/vals in the new serialized
1343
                # dictionary we just created.
1344
                new_serialized = {}
1✔
1345
                body[key] = new_serialized
1✔
1346
                body = new_serialized
1✔
1347
            members = shape.members
1✔
1348
            for member_key, member_value in value.items():
1✔
1349
                if member_value is None:
1✔
1350
                    continue
1✔
1351
                try:
1✔
1352
                    member_shape = members[member_key]
1✔
1353
                except KeyError:
1✔
1354
                    LOG.warning(
1✔
1355
                        "Response object %s contains a member which is not specified: %s",
1356
                        shape.name,
1357
                        member_key,
1358
                    )
1359
                    continue
1✔
1360
                if "name" in member_shape.serialization:
1✔
1361
                    member_key = member_shape.serialization["name"]
1✔
1362
                self._serialize(body, member_value, member_shape, member_key, mime_type)
1✔
1363

1364
    def _serialize_type_map(
1✔
1365
        self, body: dict, value: dict, shape: MapShape, key: str, mime_type: str
1366
    ):
1367
        if value is None:
1✔
1368
            return
×
1369
        map_obj = {}
1✔
1370
        body[key] = map_obj
1✔
1371
        for sub_key, sub_value in value.items():
1✔
1372
            if sub_value is not None:
1✔
1373
                self._serialize(map_obj, sub_value, shape.value, sub_key, mime_type)
1✔
1374

1375
    def _serialize_type_list(
1✔
1376
        self, body: dict, value: list, shape: ListShape, key: str, mime_type: str
1377
    ):
1378
        if value is None:
1✔
1379
            return
×
1380
        list_obj = []
1✔
1381
        body[key] = list_obj
1✔
1382
        for list_item in value:
1✔
1383
            if list_item is not None:
1✔
1384
                wrapper = {}
1✔
1385
                # The JSON list serialization is the only case where we aren't
1386
                # setting a key on a dict.  We handle this by using
1387
                # a __current__ key on a wrapper dict to serialize each
1388
                # list item before appending it to the serialized list.
1389
                self._serialize(wrapper, list_item, shape.member, "__current__", mime_type)
1✔
1390
                list_obj.append(wrapper["__current__"])
1✔
1391

1392
    def _default_serialize(self, body: dict, value: Any, _, key: str, __):
1✔
1393
        body[key] = value
1✔
1394

1395
    def _serialize_type_timestamp(
1✔
1396
        self, body: dict, value: Any, shape: Shape, key: str, mime_type: str
1397
    ):
1398
        if mime_type in self.CBOR_TYPES:
1✔
1399
            # CBOR has native support for timestamps
1400
            body[key] = value
1✔
1401
        else:
1402
            timestamp_format = shape.serialization.get("timestampFormat")
1✔
1403
            body[key] = self._convert_timestamp_to_str(value, timestamp_format)
1✔
1404

1405
    def _serialize_type_blob(self, body: dict, value: str | bytes, _, key: str, mime_type: str):
1✔
1406
        if mime_type in self.CBOR_TYPES:
1✔
1407
            body[key] = value
1✔
1408
        else:
1409
            body[key] = self._get_base64(value)
1✔
1410

1411
    def _prepare_additional_traits_in_response(
1✔
1412
        self, response: Response, operation_model: OperationModel, request_id: str
1413
    ):
1414
        response.headers["x-amzn-requestid"] = request_id
1✔
1415
        response = super()._prepare_additional_traits_in_response(
1✔
1416
            response, operation_model, request_id
1417
        )
1418
        return response
1✔
1419

1420

1421
class RestJSONResponseSerializer(BaseRestResponseSerializer, JSONResponseSerializer):
1✔
1422
    """
1423
    The ``RestJSONResponseSerializer`` is responsible for the serialization of responses from services with the
1424
    ``rest-json`` protocol.
1425
    It combines the ``BaseRestResponseSerializer`` (for the ReST specific logic) with the ``JSONResponseSerializer``
1426
    (for the JSOn body response serialization).
1427
    """
1428

1429
    def _serialize_content_type(
1✔
1430
        self, serialized: Response, shape: Shape, shape_members: dict, mime_type: str
1431
    ):
1432
        """Set Content-Type to application/json for all structured bodies."""
1433
        payload = shape.serialization.get("payload") if shape is not None else None
1✔
1434
        if self._has_streaming_payload(payload, shape_members):
1✔
1435
            # Don't apply content-type to streaming bodies
1436
            return
1✔
1437

1438
        has_body = serialized.data != b""
1✔
1439
        has_content_type = self._has_header("Content-Type", serialized.headers)
1✔
1440
        if has_body and not has_content_type:
1✔
1441
            serialized.headers["Content-Type"] = mime_type
×
1442

1443

1444
class BaseCBORResponseSerializer(ResponseSerializer):
1✔
1445
    """
1446
    The ``BaseCBORResponseSerializer`` performs the basic logic for the CBOR response serialization.
1447

1448
    There are two types of map/list in CBOR, indefinite length types and "defined" ones:
1449
    You can use the `\xbf` byte marker to indicate a map with indefinite length, then `\xff` to indicate the end
1450
     of the map.
1451
    You can also use, for example, `\xa4` to indicate a map with exactly 4 things in it, so `\xff` is not
1452
    required at the end.
1453
    AWS, for both Kinesis and `smithy-rpc-v2-cbor` services, is using indefinite data structures when returning
1454
    responses.
1455
    """
1456

1457
    SUPPORTED_MIME_TYPES = [APPLICATION_CBOR, APPLICATION_AMZ_CBOR_1_1]
1✔
1458

1459
    UNSIGNED_INT_MAJOR_TYPE = 0
1✔
1460
    NEGATIVE_INT_MAJOR_TYPE = 1
1✔
1461
    BLOB_MAJOR_TYPE = 2
1✔
1462
    STRING_MAJOR_TYPE = 3
1✔
1463
    LIST_MAJOR_TYPE = 4
1✔
1464
    MAP_MAJOR_TYPE = 5
1✔
1465
    TAG_MAJOR_TYPE = 6
1✔
1466
    FLOAT_AND_SIMPLE_MAJOR_TYPE = 7
1✔
1467

1468
    INDEFINITE_ITEM_ADDITIONAL_INFO = 31
1✔
1469
    BREAK_CODE = b"\xff"
1✔
1470
    USE_INDEFINITE_DATA_STRUCTURE = True
1✔
1471

1472
    def _serialize_data_item(
1✔
1473
        self, serialized: bytearray, value: Any, shape: Shape | None, name: str | None = None
1474
    ) -> None:
1475
        method = getattr(self, f"_serialize_type_{shape.type_name}")
1✔
1476
        if method is None:
1✔
1477
            raise ValueError(
×
1478
                f"Unrecognized C2J type: {shape.type_name}, unable to serialize request"
1479
            )
1480
        method(serialized, value, shape, name)
1✔
1481

1482
    def _serialize_type_integer(
1✔
1483
        self, serialized: bytearray, value: int, shape: Shape | None, name: str | None = None
1484
    ) -> None:
1485
        if value >= 0:
1✔
1486
            major_type = self.UNSIGNED_INT_MAJOR_TYPE
1✔
1487
        else:
1488
            major_type = self.NEGATIVE_INT_MAJOR_TYPE
×
1489
            # The only differences in serializing negative and positive integers is
1490
            # that for negative, we set the major type to 1 and set the value to -1
1491
            # minus the value
1492
            value = -1 - value
×
1493
        additional_info, num_bytes = self._get_additional_info_and_num_bytes(value)
1✔
1494
        initial_byte = self._get_initial_byte(major_type, additional_info)
1✔
1495
        if num_bytes == 0:
1✔
1496
            serialized.extend(initial_byte)
×
1497
        else:
1498
            serialized.extend(initial_byte + value.to_bytes(num_bytes, "big"))
1✔
1499

1500
    def _serialize_type_long(
1✔
1501
        self, serialized: bytearray, value: int, shape: Shape, name: str | None = None
1502
    ) -> None:
1503
        self._serialize_type_integer(serialized, value, shape, name)
×
1504

1505
    def _serialize_type_blob(
1✔
1506
        self,
1507
        serialized: bytearray,
1508
        value: str | bytes | IO[bytes],
1509
        shape: Shape | None,
1510
        name: str | None = None,
1511
    ) -> None:
1512
        if isinstance(value, str):
1✔
1513
            value = value.encode("utf-8")
×
1514
        elif not isinstance(value, (bytes, bytearray)):
1✔
1515
            # We support file-like objects for blobs; these already have been
1516
            # validated to ensure they have a read method
1517
            value = value.read()
×
1518
        length = len(value)
1✔
1519
        additional_info, num_bytes = self._get_additional_info_and_num_bytes(length)
1✔
1520
        initial_byte = self._get_initial_byte(self.BLOB_MAJOR_TYPE, additional_info)
1✔
1521
        if num_bytes == 0:
1✔
1522
            serialized.extend(initial_byte)
1✔
1523
        else:
1524
            serialized.extend(initial_byte + length.to_bytes(num_bytes, "big"))
×
1525
        serialized.extend(value)
1✔
1526

1527
    def _serialize_type_string(
1✔
1528
        self, serialized: bytearray, value: str, shape: Shape | None, name: str | None = None
1529
    ) -> None:
1530
        encoded = value.encode("utf-8")
1✔
1531
        length = len(encoded)
1✔
1532
        additional_info, num_bytes = self._get_additional_info_and_num_bytes(length)
1✔
1533
        initial_byte = self._get_initial_byte(self.STRING_MAJOR_TYPE, additional_info)
1✔
1534
        if num_bytes == 0:
1✔
1535
            serialized.extend(initial_byte + encoded)
1✔
1536
        else:
1537
            serialized.extend(initial_byte + length.to_bytes(num_bytes, "big") + encoded)
1✔
1538

1539
    def _serialize_type_list(
1✔
1540
        self, serialized: bytearray, value: list, shape: Shape | None, name: str | None = None
1541
    ) -> None:
1542
        initial_bytes, closing_bytes = self._get_bytes_for_data_structure(
1✔
1543
            value, self.LIST_MAJOR_TYPE
1544
        )
1545
        serialized.extend(initial_bytes)
1✔
1546

1547
        for item in value:
1✔
1548
            self._serialize_data_item(serialized, item, shape.member)
1✔
1549

1550
        if closing_bytes is not None:
1✔
1551
            serialized.extend(closing_bytes)
1✔
1552

1553
    def _serialize_type_map(
1✔
1554
        self, serialized: bytearray, value: dict, shape: Shape | None, name: str | None = None
1555
    ) -> None:
1556
        initial_bytes, closing_bytes = self._get_bytes_for_data_structure(
1✔
1557
            value, self.MAP_MAJOR_TYPE
1558
        )
1559
        serialized.extend(initial_bytes)
1✔
1560

1561
        for key_item, item in value.items():
1✔
1562
            self._serialize_data_item(serialized, key_item, shape.key)
1✔
1563
            self._serialize_data_item(serialized, item, shape.value)
1✔
1564

1565
        if closing_bytes is not None:
1✔
1566
            serialized.extend(closing_bytes)
1✔
1567

1568
    def _serialize_type_structure(
1✔
1569
        self, serialized: bytearray, value: dict, shape: Shape | None, name: str | None = None
1570
    ) -> None:
1571
        if name is not None:
1✔
1572
            # For nested structures, we need to serialize the key first
1573
            self._serialize_data_item(serialized, name, shape.key_shape)
×
1574

1575
        # Remove `None` values from the dictionary
1576
        value = {k: v for k, v in value.items() if v is not None}
1✔
1577

1578
        initial_bytes, closing_bytes = self._get_bytes_for_data_structure(
1✔
1579
            value, self.MAP_MAJOR_TYPE
1580
        )
1581
        serialized.extend(initial_bytes)
1✔
1582

1583
        members = shape.members
1✔
1584
        for member_key, member_value in value.items():
1✔
1585
            member_shape = members[member_key]
1✔
1586
            if "name" in member_shape.serialization:
1✔
1587
                member_key = member_shape.serialization["name"]
×
1588
            if member_value is not None:
1✔
1589
                self._serialize_type_string(serialized, member_key, None, None)
1✔
1590
                self._serialize_data_item(serialized, member_value, member_shape)
1✔
1591

1592
        if closing_bytes is not None:
1✔
1593
            serialized.extend(closing_bytes)
1✔
1594

1595
    def _serialize_type_timestamp(
1✔
1596
        self,
1597
        serialized: bytearray,
1598
        value: int | str | datetime.datetime,
1599
        shape: Shape | None,
1600
        name: str | None = None,
1601
    ) -> None:
1602
        # https://smithy.io/2.0/additional-specs/protocols/smithy-rpc-v2.html#timestamp-type-serialization
1603
        tag = 1  # Use tag 1 for unix timestamp
1✔
1604
        initial_byte = self._get_initial_byte(self.TAG_MAJOR_TYPE, tag)
1✔
1605
        serialized.extend(initial_byte)  # Tagging the timestamp
1✔
1606

1607
        # we encode the timestamp as a double, like the Go SDK
1608
        # https://github.com/aws/aws-sdk-go-v2/blob/5d7c17325a2581afae4455c150549174ebfd9428/internal/protocoltest/smithyrpcv2cbor/serializers.go#L664-L669
1609
        # Currently, the Botocore serializer using unsigned integers, but it does not conform to the Smithy specs:
1610
        # > This protocol uses epoch-seconds, also known as Unix timestamps, with millisecond
1611
        # > (1/1000th of a second) resolution.
1612
        timestamp = float(self._convert_timestamp_to_str(value))
1✔
1613
        initial_byte = self._get_initial_byte(self.FLOAT_AND_SIMPLE_MAJOR_TYPE, 27)
1✔
1614
        serialized.extend(initial_byte + struct.pack(">d", timestamp))
1✔
1615

1616
    def _serialize_type_float(
1✔
1617
        self, serialized: bytearray, value: float, shape: Shape | None, name: str | None = None
1618
    ) -> None:
1619
        if self._is_special_number(value):
×
1620
            serialized.extend(
×
1621
                self._get_bytes_for_special_numbers(value)
1622
            )  # Handle special values like NaN or Infinity
1623
        else:
1624
            initial_byte = self._get_initial_byte(self.FLOAT_AND_SIMPLE_MAJOR_TYPE, 26)
×
1625
            serialized.extend(initial_byte + struct.pack(">f", value))
×
1626

1627
    def _serialize_type_double(
1✔
1628
        self, serialized: bytearray, value: float, shape: Shape | None, name: str | None = None
1629
    ) -> None:
1630
        if self._is_special_number(value):
1✔
1631
            serialized.extend(
×
1632
                self._get_bytes_for_special_numbers(value)
1633
            )  # Handle special values like NaN or Infinity
1634
        else:
1635
            initial_byte = self._get_initial_byte(self.FLOAT_AND_SIMPLE_MAJOR_TYPE, 27)
1✔
1636
            serialized.extend(initial_byte + struct.pack(">d", value))
1✔
1637

1638
    def _serialize_type_boolean(
1✔
1639
        self, serialized: bytearray, value: bool, shape: Shape | None, name: str | None = None
1640
    ) -> None:
1641
        additional_info = 21 if value else 20
×
1642
        serialized.extend(self._get_initial_byte(self.FLOAT_AND_SIMPLE_MAJOR_TYPE, additional_info))
×
1643

1644
    @staticmethod
1✔
1645
    def _get_additional_info_and_num_bytes(value: int) -> tuple[int, int]:
1✔
1646
        # Values under 24 can be stored in the initial byte and don't need further
1647
        # encoding
1648
        if value < 24:
1✔
1649
            return value, 0
1✔
1650
        # Values between 24 and 255 (inclusive) can be stored in 1 byte and
1651
        # correspond to additional info 24
1652
        elif value < 256:
1✔
1653
            return 24, 1
1✔
1654
        # Values up to 65535 can be stored in two bytes and correspond to additional
1655
        # info 25
1656
        elif value < 65536:
×
1657
            return 25, 2
×
1658
        # Values up to 4294967296 can be stored in four bytes and correspond to
1659
        # additional info 26
1660
        elif value < 4294967296:
×
1661
            return 26, 4
×
1662
        # The maximum number of bytes in a definite length data items is 8 which
1663
        # to additional info 27
1664
        else:
1665
            return 27, 8
×
1666

1667
    def _get_initial_byte(self, major_type: int, additional_info: int) -> bytes:
1✔
1668
        # The highest order three bits are the major type, so we need to bitshift the
1669
        # major type by 5
1670
        major_type_bytes = major_type << 5
1✔
1671
        return (major_type_bytes | additional_info).to_bytes(1, "big")
1✔
1672

1673
    @staticmethod
1✔
1674
    def _is_special_number(value: int | float) -> bool:
1✔
1675
        return any(
1✔
1676
            [
1677
                value == float("inf"),
1678
                value == float("-inf"),
1679
                math.isnan(value),
1680
            ]
1681
        )
1682

1683
    def _get_bytes_for_special_numbers(self, value: int | float) -> bytes:
1✔
1684
        additional_info = 25
×
1685
        initial_byte = self._get_initial_byte(self.FLOAT_AND_SIMPLE_MAJOR_TYPE, additional_info)
×
1686
        if value == float("inf"):
×
1687
            return initial_byte + struct.pack(">H", 0x7C00)
×
1688
        elif value == float("-inf"):
×
1689
            return initial_byte + struct.pack(">H", 0xFC00)
×
1690
        elif math.isnan(value):
×
1691
            return initial_byte + struct.pack(">H", 0x7E00)
×
1692

1693
    def _get_bytes_for_data_structure(
1✔
1694
        self, value: list | dict, major_type: int
1695
    ) -> tuple[bytes, bytes | None]:
1696
        if self.USE_INDEFINITE_DATA_STRUCTURE:
1✔
1697
            additional_info = self.INDEFINITE_ITEM_ADDITIONAL_INFO
1✔
1698
            return self._get_initial_byte(major_type, additional_info), self.BREAK_CODE
1✔
1699
        else:
1700
            length = len(value)
×
1701
            additional_info, num_bytes = self._get_additional_info_and_num_bytes(length)
×
1702
            initial_byte = self._get_initial_byte(major_type, additional_info)
×
1703
            if num_bytes != 0:
×
1704
                initial_byte = initial_byte + length.to_bytes(num_bytes, "big")
×
1705

1706
            return initial_byte, None
×
1707

1708

1709
class CBORResponseSerializer(BaseCBORResponseSerializer):
1✔
1710
    """
1711
    The ``CBORResponseSerializer`` is responsible for the serialization of responses from services with the ``cbor``
1712
    protocol. It implements the CBOR response body serialization, which is only currently used by Kinesis and is derived
1713
    conceptually from the ``JSONResponseSerializer``
1714
    """
1715

1716
    TIMESTAMP_FORMAT = "unixtimestamp"
1✔
1717

1718
    def _serialize_error(
1✔
1719
        self,
1720
        error: ServiceException,
1721
        response: Response,
1722
        shape: StructureShape,
1723
        operation_model: OperationModel,
1724
        mime_type: str,
1725
        request_id: str,
1726
    ) -> None:
1727
        body = bytearray()
×
1728
        response.content_type = mime_type
×
1729
        response.headers["X-Amzn-Errortype"] = error.code
×
1730

1731
        if shape:
×
1732
            # FIXME: we need to manually add the `__type` field to the shape as it is not part of the specs
1733
            #  think about a better way, this is very hacky
1734
            shape_copy = copy.deepcopy(shape)
×
1735
            shape_copy.members["__type"] = StringShape(
×
1736
                shape_name="__type", shape_model={"type": "string"}
1737
            )
1738
            remaining_params = {"__type": error.code}
×
1739

1740
            for member_name in shape_copy.members:
×
1741
                if hasattr(error, member_name):
×
1742
                    remaining_params[member_name] = getattr(error, member_name)
×
1743
                # Default error message fields can sometimes have different casing in the specs
1744
                elif member_name.lower() in ["code", "message"] and hasattr(
×
1745
                    error, member_name.lower()
1746
                ):
1747
                    remaining_params[member_name] = getattr(error, member_name.lower())
×
1748

1749
            self._serialize_data_item(body, remaining_params, shape_copy, None)
×
1750

1751
        response.set_response(bytes(body))
×
1752

1753
    def _serialize_response(
1✔
1754
        self,
1755
        parameters: dict,
1756
        response: Response,
1757
        shape: Shape | None,
1758
        shape_members: dict,
1759
        operation_model: OperationModel,
1760
        mime_type: str,
1761
        request_id: str,
1762
    ) -> None:
1763
        response.content_type = mime_type
1✔
1764
        response.set_response(
1✔
1765
            self._serialize_body_params(parameters, shape, operation_model, mime_type, request_id)
1766
        )
1767

1768
    def _serialize_body_params(
1✔
1769
        self,
1770
        params: dict,
1771
        shape: Shape,
1772
        operation_model: OperationModel,
1773
        mime_type: str,
1774
        request_id: str,
1775
    ) -> bytes | None:
1776
        if shape is None:
1✔
NEW
1777
            return b""
×
1778
        body = bytearray()
1✔
1779
        self._serialize_data_item(body, params, shape)
1✔
1780
        return bytes(body)
1✔
1781

1782
    def _prepare_additional_traits_in_response(
1✔
1783
        self, response: Response, operation_model: OperationModel, request_id: str
1784
    ) -> Response:
1785
        response.headers["x-amzn-requestid"] = request_id
1✔
1786
        response = super()._prepare_additional_traits_in_response(
1✔
1787
            response, operation_model, request_id
1788
        )
1789
        return response
1✔
1790

1791

1792
class BaseRpcV2ResponseSerializer(ResponseSerializer):
1✔
1793
    """
1794
    The BaseRpcV2ResponseSerializer performs the basic logic for the RPC V2 response serialization.
1795
    The only variance between the various RPCv2 protocols is the way the body is serialized for regular responses,
1796
    and the way they will encode exceptions.
1797
    """
1798

1799
    def _serialize_response(
1✔
1800
        self,
1801
        parameters: dict,
1802
        response: Response,
1803
        shape: Shape | None,
1804
        shape_members: dict,
1805
        operation_model: OperationModel,
1806
        mime_type: str,
1807
        request_id: str,
1808
    ) -> None:
1809
        response.content_type = mime_type
1✔
1810
        response.set_response(
1✔
1811
            self._serialize_body_params(parameters, shape, operation_model, mime_type, request_id)
1812
        )
1813

1814
    def _serialize_body_params(
1✔
1815
        self,
1816
        params: dict,
1817
        shape: Shape,
1818
        operation_model: OperationModel,
1819
        mime_type: str,
1820
        request_id: str,
1821
    ) -> bytes | None:
1822
        raise NotImplementedError
1823

1824

1825
class RpcV2CBORResponseSerializer(BaseRpcV2ResponseSerializer, BaseCBORResponseSerializer):
1✔
1826
    """
1827
    The RpcV2CBORResponseSerializer implements the CBOR body serialization part for the RPC v2 protocol, and implements the
1828
    specific exception serialization.
1829
    https://smithy.io/2.0/additional-specs/protocols/smithy-rpc-v2.html
1830
    """
1831

1832
    # the Smithy spec defines that only `application/cbor` is supported for RPC v2 CBOR
1833
    SUPPORTED_MIME_TYPES = [APPLICATION_CBOR]
1✔
1834
    TIMESTAMP_FORMAT = "unixtimestamp"
1✔
1835

1836
    def _serialize_body_params(
1✔
1837
        self,
1838
        params: dict,
1839
        shape: Shape,
1840
        operation_model: OperationModel,
1841
        mime_type: str,
1842
        request_id: str,
1843
    ) -> bytes | None:
1844
        if shape is None:
1✔
1845
            return b""
1✔
1846
        body = bytearray()
1✔
1847
        self._serialize_data_item(body, params, shape)
1✔
1848
        return bytes(body)
1✔
1849

1850
    def _serialize_error(
1✔
1851
        self,
1852
        error: ServiceException,
1853
        response: Response,
1854
        shape: StructureShape,
1855
        operation_model: OperationModel,
1856
        mime_type: str,
1857
        request_id: str,
1858
    ) -> None:
1859
        body = bytearray()
1✔
1860
        response.content_type = mime_type  # can only be 'application/cbor'
1✔
1861
        # TODO: the Botocore parser is able to look at the `x-amzn-query-error` header for the RpcV2 CBOR protocol
1862
        #  we'll need to investigate which services need it
1863
        # Responses for the rpcv2Cbor protocol SHOULD NOT contain the X-Amzn-ErrorType header.
1864
        # Type information is always serialized in the payload. This is different than `json` protocol
1865

1866
        if shape:
1✔
1867
            # FIXME: we need to manually add the `__type` field to the shape as it is not part of the specs
1868
            #  think about a better way, this is very hacky
1869
            # Error responses in the rpcv2Cbor protocol MUST be serialized identically to standard responses with one
1870
            # additional component to distinguish which error is contained: a body field named __type.
1871
            shape_copy = copy.deepcopy(shape)
1✔
1872
            shape_copy.members["__type"] = StringShape(
1✔
1873
                shape_name="__type", shape_model={"type": "string"}
1874
            )
1875
            remaining_params = {"__type": error.code}
1✔
1876

1877
            for member_name in shape_copy.members:
1✔
1878
                if hasattr(error, member_name):
1✔
1879
                    remaining_params[member_name] = getattr(error, member_name)
1✔
1880
                # Default error message fields can sometimes have different casing in the specs
1881
                elif member_name.lower() in ["code", "message"] and hasattr(
1✔
1882
                    error, member_name.lower()
1883
                ):
1884
                    remaining_params[member_name] = getattr(error, member_name.lower())
×
1885

1886
            self._serialize_data_item(body, remaining_params, shape_copy, None)
1✔
1887

1888
        response.set_response(bytes(body))
1✔
1889

1890
    def _prepare_additional_traits_in_response(
1✔
1891
        self, response: Response, operation_model: OperationModel, request_id: str
1892
    ):
1893
        response.headers["x-amzn-requestid"] = request_id
1✔
1894
        response.headers["Smithy-Protocol"] = "rpc-v2-cbor"
1✔
1895
        response = super()._prepare_additional_traits_in_response(
1✔
1896
            response, operation_model, request_id
1897
        )
1898
        return response
1✔
1899

1900

1901
class S3ResponseSerializer(RestXMLResponseSerializer):
1✔
1902
    """
1903
    The ``S3ResponseSerializer`` adds some minor logic to handle S3 specific peculiarities with the error response
1904
    serialization and the root node tag.
1905
    """
1906

1907
    SUPPORTED_MIME_TYPES = [APPLICATION_XML, TEXT_XML]
1✔
1908
    _RESPONSE_ROOT_TAGS = {
1✔
1909
        "CompleteMultipartUploadOutput": "CompleteMultipartUploadResult",
1910
        "CopyObjectOutput": "CopyObjectResult",
1911
        "CreateMultipartUploadOutput": "InitiateMultipartUploadResult",
1912
        "DeleteObjectsOutput": "DeleteResult",
1913
        "GetBucketAccelerateConfigurationOutput": "AccelerateConfiguration",
1914
        "GetBucketAclOutput": "AccessControlPolicy",
1915
        "GetBucketAnalyticsConfigurationOutput": "AnalyticsConfiguration",
1916
        "GetBucketCorsOutput": "CORSConfiguration",
1917
        "GetBucketEncryptionOutput": "ServerSideEncryptionConfiguration",
1918
        "GetBucketIntelligentTieringConfigurationOutput": "IntelligentTieringConfiguration",
1919
        "GetBucketInventoryConfigurationOutput": "InventoryConfiguration",
1920
        "GetBucketLifecycleOutput": "LifecycleConfiguration",
1921
        "GetBucketLifecycleConfigurationOutput": "LifecycleConfiguration",
1922
        "GetBucketLoggingOutput": "BucketLoggingStatus",
1923
        "GetBucketMetricsConfigurationOutput": "MetricsConfiguration",
1924
        "NotificationConfigurationDeprecated": "NotificationConfiguration",
1925
        "GetBucketOwnershipControlsOutput": "OwnershipControls",
1926
        "GetBucketPolicyStatusOutput": "PolicyStatus",
1927
        "GetBucketReplicationOutput": "ReplicationConfiguration",
1928
        "GetBucketRequestPaymentOutput": "RequestPaymentConfiguration",
1929
        "GetBucketTaggingOutput": "Tagging",
1930
        "GetBucketVersioningOutput": "VersioningConfiguration",
1931
        "GetBucketWebsiteOutput": "WebsiteConfiguration",
1932
        "GetObjectAclOutput": "AccessControlPolicy",
1933
        "GetObjectLegalHoldOutput": "LegalHold",
1934
        "GetObjectLockConfigurationOutput": "ObjectLockConfiguration",
1935
        "GetObjectRetentionOutput": "Retention",
1936
        "GetObjectTaggingOutput": "Tagging",
1937
        "GetObjectAttributesOutput": "GetObjectAttributesResponse",
1938
        "GetPublicAccessBlockOutput": "PublicAccessBlockConfiguration",
1939
        "ListBucketAnalyticsConfigurationsOutput": "ListBucketAnalyticsConfigurationResult",
1940
        "ListBucketInventoryConfigurationsOutput": "ListInventoryConfigurationsResult",
1941
        "ListBucketMetricsConfigurationsOutput": "ListMetricsConfigurationsResult",
1942
        "ListBucketsOutput": "ListAllMyBucketsResult",
1943
        "ListMultipartUploadsOutput": "ListMultipartUploadsResult",
1944
        "ListObjectsOutput": "ListBucketResult",
1945
        "ListObjectsV2Output": "ListBucketResult",
1946
        "ListObjectVersionsOutput": "ListVersionsResult",
1947
        "ListPartsOutput": "ListPartsResult",
1948
        "UploadPartCopyOutput": "CopyPartResult",
1949
    }
1950

1951
    XML_NAMESPACE = "http://s3.amazonaws.com/doc/2006-03-01/"
1✔
1952

1953
    def _serialize_response(
1✔
1954
        self,
1955
        parameters: dict,
1956
        response: Response,
1957
        shape: Shape | None,
1958
        shape_members: dict,
1959
        operation_model: OperationModel,
1960
        mime_type: str,
1961
        request_id: str,
1962
    ) -> None:
1963
        header_params, payload_params = self._partition_members(parameters, shape)
1✔
1964
        self._process_header_members(header_params, response, shape)
1✔
1965
        # "HEAD" responses are basically "GET" responses without the actual body.
1966
        # Do not process the body payload in this case (setting a body could also manipulate the headers)
1967
        # - If the response is a redirection, the body should be empty as well
1968
        # - If the response is from a "PUT" request, the body should be empty except if there's a specific "payload"
1969
        #   field in the serialization (CopyObject and CopyObjectPart)
1970
        http_method = operation_model.http.get("method")
1✔
1971
        if (
1✔
1972
            http_method != "HEAD"
1973
            and not 300 <= response.status_code < 400
1974
            and not (http_method == "PUT" and shape and not shape.serialization.get("payload"))
1975
        ):
1976
            self._serialize_payload(
1✔
1977
                payload_params,
1978
                response,
1979
                shape,
1980
                shape_members,
1981
                operation_model,
1982
                mime_type,
1983
                request_id,
1984
            )
1985
        self._serialize_content_type(response, shape, shape_members, mime_type)
1✔
1986

1987
    def _serialize_error(
1✔
1988
        self,
1989
        error: ServiceException,
1990
        response: Response,
1991
        shape: StructureShape,
1992
        operation_model: OperationModel,
1993
        mime_type: str,
1994
        request_id: str,
1995
    ) -> None:
1996
        attr = (
1✔
1997
            {"xmlns": operation_model.metadata.get("xmlNamespace")}
1998
            if "xmlNamespace" in operation_model.metadata
1999
            else {}
2000
        )
2001
        root = ETree.Element("Error", attr)
1✔
2002
        self._add_error_tags(error, root, mime_type)
1✔
2003
        request_id_element = ETree.SubElement(root, "RequestId")
1✔
2004
        request_id_element.text = request_id
1✔
2005

2006
        header_params, payload_params = self._partition_members(vars(error), shape)
1✔
2007
        self._add_additional_error_tags(payload_params, root, shape, mime_type)
1✔
2008
        self._process_header_members(header_params, response, shape)
1✔
2009

2010
        response.set_response(self._encode_payload(self._node_to_string(root, mime_type)))
1✔
2011

2012
    def _serialize_body_params(
1✔
2013
        self,
2014
        params: dict,
2015
        shape: Shape,
2016
        operation_model: OperationModel,
2017
        mime_type: str,
2018
        request_id: str,
2019
    ) -> str | None:
2020
        root = self._serialize_body_params_to_xml(params, shape, operation_model, mime_type)
1✔
2021
        # S3 does not follow the specs on the root tag name for 41 of 44 operations
2022
        root.tag = self._RESPONSE_ROOT_TAGS.get(root.tag, root.tag)
1✔
2023
        self._prepare_additional_traits_in_xml(root, request_id)
1✔
2024
        return self._node_to_string(root, mime_type)
1✔
2025

2026
    def _prepare_additional_traits_in_response(
1✔
2027
        self, response: Response, operation_model: OperationModel, request_id: str
2028
    ):
2029
        """Adds the request ID to the headers (in contrast to the body - as in the Query protocol)."""
2030
        response = super()._prepare_additional_traits_in_response(
1✔
2031
            response, operation_model, request_id
2032
        )
2033
        # s3 extended Request ID
2034
        # mostly used internally on AWS and corresponds to a HostId
2035
        response.headers["x-amz-id-2"] = (
1✔
2036
            "s9lzHYrFp76ZVxRcpX9+5cjAnEH2ROuNkd2BHfIa6UkFVdtjf5mKR3/eTPFvsiP/XV/VLi31234="
2037
        )
2038
        return response
1✔
2039

2040
    def _add_error_tags(
1✔
2041
        self, error: ServiceException, error_tag: ETree.Element, mime_type: str
2042
    ) -> None:
2043
        code_tag = ETree.SubElement(error_tag, "Code")
1✔
2044
        code_tag.text = error.code
1✔
2045
        message = self._get_error_message(error)
1✔
2046
        if message:
1✔
2047
            self._default_serialize(error_tag, message, None, "Message", mime_type)
1✔
2048
        else:
2049
            # In S3, if there's no message, create an empty node
2050
            self._create_empty_node(error_tag, "Message")
1✔
2051
        if error.sender_fault:
1✔
2052
            # The sender fault is either not set or "Sender"
2053
            self._default_serialize(error_tag, "Sender", None, "Type", mime_type)
×
2054

2055
    @staticmethod
1✔
2056
    def _create_empty_node(xmlnode: ETree.Element, name: str) -> None:
1✔
2057
        ETree.SubElement(xmlnode, name)
1✔
2058

2059
    def _prepare_additional_traits_in_xml(self, root: ETree.Element | None, request_id: str):
1✔
2060
        # some tools (Serverless) require a newline after the "<?xml ...>\n" preamble line, e.g., for LocationConstraint
2061
        if root and not root.tail:
1✔
2062
            root.tail = "\n"
1✔
2063

2064
        root.attrib["xmlns"] = self.XML_NAMESPACE
1✔
2065

2066
    @staticmethod
1✔
2067
    def _timestamp_iso8601(value: datetime.datetime) -> str:
1✔
2068
        """
2069
        This is very specific to S3, S3 returns an ISO8601 timestamp but with milliseconds always set to 000
2070
        Some SDKs are very picky about the length
2071
        """
2072
        return value.strftime("%Y-%m-%dT%H:%M:%S.000Z")
1✔
2073

2074

2075
class SqsQueryResponseSerializer(QueryResponseSerializer):
1✔
2076
    """
2077
    Unfortunately, SQS uses a rare interpretation of the XML protocol: It uses HTML entities within XML tag text nodes.
2078
    For example:
2079
    - Normal XML serializers: <Message>No need to escape quotes (like this: ") with HTML entities in XML.</Message>
2080
    - SQS XML serializer: <Message>No need to escape quotes (like this: &quot;) with HTML entities in XML.</Message>
2081

2082
    None of the prominent XML frameworks for python allow HTML entity escapes when serializing XML.
2083
    This serializer implements the following workaround:
2084
    - Escape quotes and \r with their HTML entities (&quot; and &#xD;).
2085
    - Since & is (correctly) escaped in XML, the serialized string contains &amp;quot; and &amp;#xD;
2086
    - These double-escapes are corrected by replacing such strings with their original.
2087
    """
2088

2089
    # those are deleted from the JSON specs, but need to be kept for legacy reason (sent in 'x-amzn-query-error')
2090
    QUERY_PREFIXED_ERRORS = {
1✔
2091
        "BatchEntryIdsNotDistinct",
2092
        "BatchRequestTooLong",
2093
        "EmptyBatchRequest",
2094
        "InvalidBatchEntryId",
2095
        "MessageNotInflight",
2096
        "PurgeQueueInProgress",
2097
        "QueueDeletedRecently",
2098
        "TooManyEntriesInBatchRequest",
2099
        "UnsupportedOperation",
2100
    }
2101

2102
    # Some error code changed between JSON and query, and we need to have a way to map it for legacy reason
2103
    JSON_TO_QUERY_ERROR_CODES = {
1✔
2104
        "InvalidParameterValueException": "InvalidParameterValue",
2105
        "MissingRequiredParameterException": "MissingParameter",
2106
        "AccessDeniedException": "AccessDenied",
2107
        "QueueDoesNotExist": "AWS.SimpleQueueService.NonExistentQueue",
2108
        "QueueNameExists": "QueueAlreadyExists",
2109
    }
2110

2111
    SENDER_FAULT_ERRORS = (
1✔
2112
        QUERY_PREFIXED_ERRORS
2113
        | JSON_TO_QUERY_ERROR_CODES.keys()
2114
        | {"OverLimit", "ResourceNotFoundException"}
2115
    )
2116

2117
    def _default_serialize(self, xmlnode: ETree.Element, params: str, _, name: str, __) -> None:
1✔
2118
        """
2119
        Ensures that we "mark" characters in the node's text which need to be specifically encoded.
2120
        This is necessary to easily identify these specific characters later, after the standard XML serialization is
2121
        done, while not replacing any other occurrences of these characters which might appear in the serialized string.
2122
        """
2123
        node = ETree.SubElement(xmlnode, name)
1✔
2124
        node.text = (
1✔
2125
            str(params)
2126
            .replace('"', '__marker__"__marker__')
2127
            .replace("\r", "__marker__-r__marker__")
2128
        )
2129

2130
    def _node_to_string(self, root: ETree.ElementTree | None, mime_type: str) -> str | None:
1✔
2131
        """Replaces the previously "marked" characters with their encoded value."""
2132
        generated_string = super()._node_to_string(root, mime_type)
1✔
2133
        if generated_string is None:
1✔
2134
            return None
×
2135
        generated_string = to_str(generated_string)
1✔
2136
        # Undo the second escaping of the &
2137
        # Undo the second escaping of the carriage return (\r)
2138
        if mime_type == APPLICATION_JSON:
1✔
2139
            # At this point the json was already dumped and escaped, so we replace directly.
2140
            generated_string = generated_string.replace(r"__marker__\"__marker__", r"\"").replace(
1✔
2141
                "__marker__-r__marker__", r"\r"
2142
            )
2143
        else:
2144
            generated_string = generated_string.replace('__marker__"__marker__', "&quot;").replace(
1✔
2145
                "__marker__-r__marker__", "&#xD;"
2146
            )
2147

2148
        return to_bytes(generated_string)
1✔
2149

2150
    def _add_error_tags(
1✔
2151
        self, error: ServiceException, error_tag: ETree.Element, mime_type: str
2152
    ) -> None:
2153
        """The SQS API stubs is now generated from JSON specs, and some fields have been modified"""
2154
        code_tag = ETree.SubElement(error_tag, "Code")
1✔
2155

2156
        if error.code in self.JSON_TO_QUERY_ERROR_CODES:
1✔
2157
            error_code = self.JSON_TO_QUERY_ERROR_CODES[error.code]
1✔
2158
        elif error.code in self.QUERY_PREFIXED_ERRORS:
1✔
2159
            error_code = f"AWS.SimpleQueueService.{error.code}"
1✔
2160
        else:
2161
            error_code = error.code
1✔
2162
        code_tag.text = error_code
1✔
2163
        message = self._get_error_message(error)
1✔
2164
        if message:
1✔
2165
            self._default_serialize(error_tag, message, None, "Message", mime_type)
1✔
2166
        if error.code in self.SENDER_FAULT_ERRORS or error.sender_fault:
1✔
2167
            # The sender fault is either not set or "Sender"
2168
            self._default_serialize(error_tag, "Sender", None, "Type", mime_type)
1✔
2169

2170

2171
class SqsJsonResponseSerializer(JSONResponseSerializer):
1✔
2172
    # those are deleted from the JSON specs, but need to be kept for legacy reason (sent in 'x-amzn-query-error')
2173
    QUERY_PREFIXED_ERRORS = {
1✔
2174
        "BatchEntryIdsNotDistinct",
2175
        "BatchRequestTooLong",
2176
        "EmptyBatchRequest",
2177
        "InvalidBatchEntryId",
2178
        "MessageNotInflight",
2179
        "PurgeQueueInProgress",
2180
        "QueueDeletedRecently",
2181
        "TooManyEntriesInBatchRequest",
2182
        "UnsupportedOperation",
2183
    }
2184

2185
    # Some error code changed between JSON and query, and we need to have a way to map it for legacy reason
2186
    JSON_TO_QUERY_ERROR_CODES = {
1✔
2187
        "InvalidParameterValueException": "InvalidParameterValue",
2188
        "MissingRequiredParameterException": "MissingParameter",
2189
        "AccessDeniedException": "AccessDenied",
2190
        "QueueDoesNotExist": "AWS.SimpleQueueService.NonExistentQueue",
2191
        "QueueNameExists": "QueueAlreadyExists",
2192
    }
2193

2194
    def _serialize_error(
1✔
2195
        self,
2196
        error: ServiceException,
2197
        response: Response,
2198
        shape: StructureShape,
2199
        operation_model: OperationModel,
2200
        mime_type: str,
2201
        request_id: str,
2202
    ) -> None:
2203
        """
2204
        Overrides _serialize_error as SQS has a special header for query API legacy reason: 'x-amzn-query-error',
2205
        which contained the exception code as well as a Sender field.
2206
        Ex: 'x-amzn-query-error': 'InvalidParameterValue;Sender'
2207
        """
2208
        # TODO: for body["__type"] = error.code, it seems AWS differs from what we send for SQS
2209
        # AWS: "com.amazon.coral.service#InvalidParameterValueException"
2210
        # or AWS: "com.amazonaws.sqs#BatchRequestTooLong"
2211
        # LocalStack: "InvalidParameterValue"
2212
        super()._serialize_error(error, response, shape, operation_model, mime_type, request_id)
1✔
2213
        # We need to add a prefix to certain errors, as they have been deleted in the specs. These will not change
2214
        if error.code in self.JSON_TO_QUERY_ERROR_CODES:
1✔
2215
            code = self.JSON_TO_QUERY_ERROR_CODES[error.code]
1✔
2216
        elif error.code in self.QUERY_PREFIXED_ERRORS:
1✔
2217
            code = f"AWS.SimpleQueueService.{error.code}"
1✔
2218
        else:
2219
            code = error.code
1✔
2220

2221
        response.headers["x-amzn-query-error"] = f"{code};Sender"
1✔
2222

2223

2224
def gen_amzn_requestid():
1✔
2225
    """
2226
    Generate generic AWS request ID.
2227

2228
    3 uses a different format and set of request Ids.
2229

2230
    Examples:
2231
    996d38a0-a4e9-45de-bad4-480cd962d208
2232
    b9260553-df1b-4db6-ae41-97b89a5f85ea
2233
    """
2234
    return long_uid()
1✔
2235

2236

2237
@functools.cache
1✔
2238
def create_serializer(
1✔
2239
    service: ServiceModel, protocol: ProtocolName | None = None
2240
) -> ResponseSerializer:
2241
    """
2242
    Creates the right serializer for the given service model.
2243

2244
    :param service: to create the serializer for
2245
    :param protocol: the protocol for the serializer. If not provided, fallback to the service's default protocol
2246
    :return: ResponseSerializer which can handle the protocol of the service
2247
    """
2248

2249
    # Unfortunately, some services show subtle differences in their serialized responses, even though their
2250
    # specification states they implement the same protocol.
2251
    # Since some clients might be stricter / less resilient than others, we need to mimic the serialization of the
2252
    # specific services as close as possible.
2253
    # Therefore, the service-specific serializer implementations (basically the implicit / informally more specific
2254
    # protocol implementation) has precedence over the more general protocol-specific serializers.
2255
    service_specific_serializers = {
1✔
2256
        "sqs": {"json": SqsJsonResponseSerializer, "query": SqsQueryResponseSerializer},
2257
        "s3": {"rest-xml": S3ResponseSerializer},
2258
    }
2259
    protocol_specific_serializers = {
1✔
2260
        "query": QueryResponseSerializer,
2261
        "json": JSONResponseSerializer,
2262
        "rest-json": RestJSONResponseSerializer,
2263
        "rest-xml": RestXMLResponseSerializer,
2264
        "ec2": EC2ResponseSerializer,
2265
        "smithy-rpc-v2-cbor": RpcV2CBORResponseSerializer,
2266
        # TODO: implement multi-protocol support for Kinesis, so that it can uses the `cbor` protocol and remove
2267
        #  CBOR handling from JSONResponseParser
2268
        # this is not an "official" protocol defined from the spec, but is derived from ``json``
2269
    }
2270
    service_protocol = protocol or service.protocol
1✔
2271

2272
    # Try to select a service- and protocol-specific serializer implementation
2273
    if (
1✔
2274
        service.service_name in service_specific_serializers
2275
        and service_protocol in service_specific_serializers[service.service_name]
2276
    ):
2277
        return service_specific_serializers[service.service_name][service_protocol]()
1✔
2278
    else:
2279
        # Otherwise, pick the protocol-specific serializer for the protocol of the service
2280
        return protocol_specific_serializers[service_protocol]()
1✔
2281

2282

2283
def aws_response_serializer(
1✔
2284
    service_name: str, operation: str, protocol: ProtocolName | None = None
2285
):
2286
    """
2287
    A decorator for an HTTP route that can serialize return values or exceptions into AWS responses.
2288
    This can be used to create AWS request handlers in a convenient way. Example usage::
2289

2290
        from localstack.http import route, Request
2291
        from localstack.aws.api.sqs import ListQueuesResult
2292

2293
        @route("/_aws/sqs/queues")
2294
        @aws_response_serializer("sqs", "ListQueues")
2295
        def my_route(request: Request):
2296
            if some_condition_on_request:
2297
                raise CommonServiceError("...")  # <- will be serialized into an error response
2298

2299
            return ListQueuesResult(QueueUrls=...)  # <- object from the SQS API will be serialized
2300

2301
    :param service_name: the AWS service (e.g., "sqs", "lambda")
2302
    :param protocol: the protocol of the AWS service to serialize to. If not set (by default) the default protocol
2303
                    of the service in botocore is used.
2304
    :param operation: the operation name (e.g., "ReceiveMessage", "ListFunctions")
2305
    :returns: a decorator
2306
    """
2307

2308
    def _decorate(fn):
1✔
2309
        service_model = load_service(service_name, protocol=protocol)
1✔
2310
        operation_model = service_model.operation_model(operation)
1✔
2311
        serializer = create_serializer(service_model, protocol=protocol)
1✔
2312

2313
        def _proxy(*args, **kwargs) -> WerkzeugResponse:
1✔
2314
            # extract request from function invocation (decorator can be used for methods as well as for functions).
2315
            if len(args) > 0 and isinstance(args[0], WerkzeugRequest):
1✔
2316
                # function
2317
                request = args[0]
1✔
2318
            elif len(args) > 1 and isinstance(args[1], WerkzeugRequest):
1✔
2319
                # method (arg[0] == self)
2320
                request = args[1]
1✔
2321
            elif "request" in kwargs:
1✔
2322
                request = kwargs["request"]
1✔
2323
            else:
UNCOV
2324
                raise ValueError(f"could not find Request in signature of function {fn}")
×
2325

2326
            # TODO: we have no context here
2327
            # TODO: maybe try to get the request ID from the headers first before generating a new one
2328
            request_id = gen_amzn_requestid()
1✔
2329

2330
            try:
1✔
2331
                response = fn(*args, **kwargs)
1✔
2332

2333
                if isinstance(response, WerkzeugResponse):
1✔
2334
                    return response
1✔
2335

2336
                return serializer.serialize_to_response(
1✔
2337
                    response, operation_model, request.headers, request_id
2338
                )
2339

2340
            except ServiceException as e:
1✔
2341
                return serializer.serialize_error_to_response(
1✔
2342
                    e, operation_model, request.headers, request_id
2343
                )
2344
            except Exception as e:
1✔
2345
                return serializer.serialize_error_to_response(
1✔
2346
                    CommonServiceException(
2347
                        "InternalError", f"An internal error occurred: {e}", status_code=500
2348
                    ),
2349
                    operation_model,
2350
                    request.headers,
2351
                    request_id,
2352
                )
2353

2354
        return _proxy
1✔
2355

2356
    return _decorate
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc