• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 20565403496

29 Dec 2025 05:11AM UTC coverage: 84.103% (-2.8%) from 86.921%
20565403496

Pull #13567

github

web-flow
Merge 4816837a5 into 2417384aa
Pull Request #13567: Update ASF APIs

67166 of 79862 relevant lines covered (84.1%)

0.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.56
/localstack-core/localstack/aws/protocol/serializer.py
1
"""
2
Response serializers for the different AWS service protocols.
3

4
The module contains classes that take a service's response dict, and
5
given an operation model, serialize the HTTP response according to the
6
specified output shape.
7

8
It can be seen as the counterpart to the ``parse`` module in ``botocore``
9
(which parses the result of these serializer). It has a lot of
10
similarities with the ``serialize`` module in ``botocore``, but
11
serves a different purpose (serializing responses instead of requests).
12

13
The different protocols have many similarities. The class hierarchy is
14
designed such that the serializers share as much logic as possible.
15
The class hierarchy looks as follows:
16
::
17
                                     ┌────────────────────┐
18
                                     │ ResponseSerializer │
19
                                     └────────────────────┘
20
                                           ▲    ▲    ▲
21
                 ┌─────────────────┬───────┘    │    └──────────────┬──────────────────────┐
22
    ┌────────────┴────────────┐    │    ┌───────┴──────────────┐    │         ┌────────────┴─────────────┐
23
    │BaseXMLResponseSerializer│    │    │JSONResponseSerializer│    │         │BaseCBORResponseSerializer│
24
    └─────────────────────────┘    │    └──────────────────────┘    │         └──────────────────────────┘
25
       ▲    ▲        ┌─────────────┴────────────┐     ▲       ┌─────┴─────────────────────┐  ▲         ▲
26
       │    │        │BaseRestResponseSerializer│     │       │BaseRpcV2ResponseSerializer│  │         │
27
       │    │        └──────────────────────────┘     │       └───────────────────────────┘  │         │
28
       │    │              ▲              ▲           │                          ▲           │         │
29
       │    │              │              │           │                          │           │         │
30
       │  ┌─┴──────────────┴────────┐  ┌──┴───────────┴───────────┐   ┌──────────┴───────────┴────┐    │
31
       │  │RestXMLResponseSerializer│  │RestJSONResponseSerializer│   │RpcV2CBORResponseSerializer│    │
32
       │  └─────────────────────────┘  └──────────────────────────┘   └───────────────────────────┘    │
33
 ┌─────┴──────────────────┐                                                                 ┌──────────┴─────────────┐
34
 │QueryResponseSerializer │                                                                 │ CBORResponseSerializer │
35
 └────────────────────────┘                                                                 └────────────────────────┘
36
             ▲
37
   ┌─────────┴───────────┐
38
   │EC2ResponseSerializer│
39
   └─────────────────────┘
40
::
41

42
The ``ResponseSerializer`` contains the logic that is used among all the
43
different protocols (``query``, ``json``, ``rest-json``, ``rest-xml``, ``cbor``
44
and ``ec2``).
45
The protocols relate to each other in the following ways:
46

47
* The ``query`` and the ``rest-xml`` protocols both have XML bodies in their
48
  responses which are serialized quite similarly (with some specifics for each
49
  type).
50
* The ``json`` and the ``rest-json`` protocols both have JSON bodies in their
51
  responses which are serialized the same way.
52
* The ``cbor`` protocol is not properly defined in the spec, but mirrors the
53
  ``json`` protocol.
54
* The ``rest-json`` and ``rest-xml`` protocols serialize some metadata in
55
  the HTTP response's header fields.
56
* The ``ec2`` protocol is basically similar to the ``query`` protocol with a
57
  specific error response formatting.
58
* The ``smithy-rpc-v2-cbor`` protocol defines a specific way to route request
59
  to services via the RPC v2 trait, and encodes its body with the CBOR format.
60

61
The serializer classes in this module correspond directly to the different
62
protocols. ``#create_serializer`` shows the explicit mapping between the
63
classes and the protocols.
64
The classes are structured as follows:
65

66
* The ``ResponseSerializer`` contains all the basic logic for the
67
  serialization which is shared among all different protocols.
68
* The ``BaseXMLResponseSerializer``, ``JSONResponseSerializer`` and
69
  ``BaseCBORResponseSerializer`` contain the logic for the XML, JSON
70
  and the CBOR serialization respectively.
71
* The ``BaseRestResponseSerializer`` contains the logic for the REST
72
  protocol specifics (i.e. specific HTTP header serializations).
73
* The ``BaseRpcV2ResponseSerializer`` contains the logic for the RPC v2
74
  protocol specifics (i.e. pretty bare, does not has any specific
75
  about body serialization).
76
* The ``RestXMLResponseSerializer`` and the ``RestJSONResponseSerializer``
77
  inherit the ReST specific logic from the ``BaseRestResponseSerializer``
78
  and the XML / JSON body serialization from their second super class.
79
* The ``RpcV2CBORResponseSerializer`` inherits the RPC v2 specific logic
80
  from the ``BaseRpcV2ResponseSerializer`` and the CBOR body serialization
81
  from its second super class.
82
* The ``CBORResponseSerializer`` contains the logic specific to the
83
  non-official ``cbor`` protocol, mirroring the ``json`` protocol but
84
  with CBOR encoded body
85

86
The services and their protocols are defined by using AWS's Smithy
87
(a language to define services in a - somewhat - protocol-agnostic
88
way). The "peculiarities" in this serializer code usually correspond
89
to certain so-called "traits" in Smithy.
90

91
The result of the serialization methods is the HTTP response which can
92
be sent back to the calling client.
93
"""
94

95
import abc
1✔
96
import base64
1✔
97
import datetime
1✔
98
import functools
1✔
99
import json
1✔
100
import logging
1✔
101
import math
1✔
102
import string
1✔
103
import struct
1✔
104
from abc import ABC
1✔
105
from binascii import crc32
1✔
106
from collections.abc import Iterable, Iterator
1✔
107
from email.utils import formatdate
1✔
108
from struct import pack
1✔
109
from typing import IO, Any
1✔
110
from xml.etree import ElementTree as ETree
1✔
111

112
import xmltodict
1✔
113
from botocore.model import (
1✔
114
    ListShape,
115
    MapShape,
116
    OperationModel,
117
    ServiceModel,
118
    Shape,
119
    ShapeResolver,
120
    StringShape,
121
    StructureShape,
122
)
123
from botocore.serialize import ISO8601, ISO8601_MICRO
1✔
124
from botocore.utils import calculate_md5, is_json_value_header, parse_to_aware_datetime
1✔
125

126
# cbor2: explicitly load from private _encoder module to avoid using the (non-patched) C-version
127
from cbor2._encoder import dumps as cbor2_dumps
1✔
128
from werkzeug import Request as WerkzeugRequest
1✔
129
from werkzeug import Response as WerkzeugResponse
1✔
130
from werkzeug.datastructures import Headers, MIMEAccept
1✔
131
from werkzeug.http import parse_accept_header
1✔
132

133
from localstack.aws.api import CommonServiceException, ServiceException
1✔
134
from localstack.aws.spec import ProtocolName, load_service
1✔
135
from localstack.constants import (
1✔
136
    APPLICATION_AMZ_CBOR_1_1,
137
    APPLICATION_AMZ_JSON_1_0,
138
    APPLICATION_AMZ_JSON_1_1,
139
    APPLICATION_CBOR,
140
    APPLICATION_JSON,
141
    APPLICATION_XML,
142
    TEXT_XML,
143
)
144
from localstack.http import Response
1✔
145
from localstack.utils.common import to_bytes, to_str
1✔
146
from localstack.utils.strings import long_uid
1✔
147
from localstack.utils.xml import strip_xmlns
1✔
148

149
LOG = logging.getLogger(__name__)
1✔
150

151
REQUEST_ID_CHARACTERS = string.digits + string.ascii_uppercase
1✔
152

153

154
class ResponseSerializerError(Exception):
1✔
155
    """
156
    Error which is thrown if the request serialization fails.
157
    Super class of all exceptions raised by the serializer.
158
    """
159

160
    pass
1✔
161

162

163
class UnknownSerializerError(ResponseSerializerError):
1✔
164
    """
165
    Error which indicates that the exception raised by the serializer could be caused by invalid data or by any other
166
    (unknown) issue. Errors like this should be reported and indicate an issue in the serializer itself.
167
    """
168

169
    pass
1✔
170

171

172
class ProtocolSerializerError(ResponseSerializerError):
1✔
173
    """
174
    Error which indicates that the given data is not compliant with the service's specification and cannot be
175
    serialized. This usually results in a response to the client with an HTTP 5xx status code (internal server error).
176
    """
177

178
    pass
1✔
179

180

181
def _handle_exceptions(func):
1✔
182
    """
183
    Decorator which handles the exceptions raised by the serializer. It ensures that all exceptions raised by the public
184
    methods of the parser are instances of ResponseSerializerError.
185
    :param func: to wrap in order to add the exception handling
186
    :return: wrapped function
187
    """
188

189
    @functools.wraps(func)
1✔
190
    def wrapper(*args, **kwargs):
1✔
191
        try:
1✔
192
            return func(*args, **kwargs)
1✔
193
        except ResponseSerializerError:
1✔
194
            raise
1✔
195
        except Exception as e:
1✔
196
            raise UnknownSerializerError(
1✔
197
                "An unknown error occurred when trying to serialize the response."
198
            ) from e
199

200
    return wrapper
1✔
201

202

203
class ResponseSerializer(abc.ABC):
1✔
204
    """
205
    The response serializer is responsible for the serialization of a service implementation's result to an actual
206
    HTTP response (which will be sent to the calling client).
207
    It is the base class of all serializers and therefore contains the basic logic which is used among all of them.
208
    """
209

210
    DEFAULT_ENCODING = "utf-8"
1✔
211
    # The default timestamp format is ISO8601, but this can be overwritten by subclasses.
212
    TIMESTAMP_FORMAT = "iso8601"
1✔
213
    # Event streaming binary data type mapping for type "string"
214
    AWS_BINARY_DATA_TYPE_STRING = 7
1✔
215
    # Defines the supported mime types of the specific serializer. Sorted by priority (preferred / default first).
216
    # Needs to be specified by subclasses.
217
    SUPPORTED_MIME_TYPES: list[str] = []
1✔
218

219
    @_handle_exceptions
1✔
220
    def serialize_to_response(
1✔
221
        self,
222
        response: dict,
223
        operation_model: OperationModel,
224
        headers: dict | Headers | None,
225
        request_id: str,
226
    ) -> Response:
227
        """
228
        Takes a response dict and serializes it to an actual HttpResponse.
229

230
        :param response: to serialize
231
        :param operation_model: specification of the service & operation containing information about the shape of the
232
                                service's output / response
233
        :param headers: the headers of the incoming request this response should be serialized for. This is necessary
234
                        for features like Content-Negotiation (define response content type based on request headers).
235
        :param request_id: autogenerated AWS request ID identifying the original request
236
        :return: Response which can be sent to the calling client
237
        :raises: ResponseSerializerError (either a ProtocolSerializerError or an UnknownSerializerError)
238
        """
239

240
        # determine the preferred mime type (based on the serializer's supported mime types and the Accept header)
241
        mime_type = self._get_mime_type(headers)
1✔
242

243
        # if the operation has a streaming output, handle the serialization differently
244
        if operation_model.has_event_stream_output:
1✔
245
            return self._serialize_event_stream(response, operation_model, mime_type, request_id)
1✔
246

247
        serialized_response = self._create_default_response(operation_model, mime_type)
1✔
248
        shape = operation_model.output_shape
1✔
249
        # The shape can also be none (for empty responses), but it still needs to be serialized (to add some metadata)
250
        shape_members = shape.members if shape is not None else None
1✔
251
        self._serialize_response(
1✔
252
            response,
253
            serialized_response,
254
            shape,
255
            shape_members,
256
            operation_model,
257
            mime_type,
258
            request_id,
259
        )
260
        serialized_response = self._prepare_additional_traits_in_response(
1✔
261
            serialized_response, operation_model, request_id
262
        )
263
        return serialized_response
1✔
264

265
    @_handle_exceptions
1✔
266
    def serialize_error_to_response(
1✔
267
        self,
268
        error: ServiceException,
269
        operation_model: OperationModel,
270
        headers: dict | Headers | None,
271
        request_id: str,
272
    ) -> Response:
273
        """
274
        Takes an error instance and serializes it to an actual HttpResponse.
275
        Therefore, this method is used for errors which should be serialized and transmitted to the calling client.
276

277
        :param error: to serialize
278
        :param operation_model: specification of the service & operation containing information about the shape of the
279
                                service's output / response
280
        :param headers: the headers of the incoming request this response should be serialized for. This is necessary
281
                        for features like Content-Negotiation (define response content type based on request headers).
282
        :param request_id: autogenerated AWS request ID identifying the original request
283
        :return: HttpResponse which can be sent to the calling client
284
        :raises: ResponseSerializerError (either a ProtocolSerializerError or an UnknownSerializerError)
285
        """
286
        # determine the preferred mime type (based on the serializer's supported mime types and the Accept header)
287
        mime_type = self._get_mime_type(headers)
1✔
288

289
        # TODO implement streaming error serialization
290
        serialized_response = self._create_default_response(operation_model, mime_type)
1✔
291
        if not error or not isinstance(error, ServiceException):
1✔
292
            raise ProtocolSerializerError(
1✔
293
                f"Error to serialize ({error.__class__.__name__ if error else None}) is not a ServiceException."
294
            )
295
        shape = operation_model.service_model.shape_for_error_code(error.code)
1✔
296
        serialized_response.status_code = self._get_error_status_code(
1✔
297
            error=error,
298
            headers=headers,
299
            service_model=operation_model.service_model,
300
        )
301

302
        self._serialize_error(
1✔
303
            error, serialized_response, shape, operation_model, mime_type, request_id
304
        )
305
        serialized_response = self._prepare_additional_traits_in_response(
1✔
306
            serialized_response, operation_model, request_id
307
        )
308
        return serialized_response
1✔
309

310
    def _serialize_response(
1✔
311
        self,
312
        parameters: dict,
313
        response: Response,
314
        shape: Shape | None,
315
        shape_members: dict,
316
        operation_model: OperationModel,
317
        mime_type: str,
318
        request_id: str,
319
    ) -> None:
320
        raise NotImplementedError
321

322
    def _serialize_body_params(
1✔
323
        self,
324
        params: dict,
325
        shape: Shape,
326
        operation_model: OperationModel,
327
        mime_type: str,
328
        request_id: str,
329
    ) -> str | None:
330
        """
331
        Actually serializes the given params for the given shape to a string for the transmission in the body of the
332
        response.
333
        :param params: to serialize
334
        :param shape: to know how to serialize the params
335
        :param operation_model: for additional metadata
336
        :param mime_type: Mime type which should be used to encode the payload
337
        :param request_id: autogenerated AWS request ID identifying the original request
338
        :return: string containing the serialized body
339
        """
340
        raise NotImplementedError
341

342
    def _serialize_error(
1✔
343
        self,
344
        error: ServiceException,
345
        response: Response,
346
        shape: StructureShape,
347
        operation_model: OperationModel,
348
        mime_type: str,
349
        request_id: str,
350
    ) -> None:
351
        raise NotImplementedError
352

353
    def _serialize_event_stream(
1✔
354
        self,
355
        response: dict,
356
        operation_model: OperationModel,
357
        mime_type: str,
358
        request_id: str,
359
    ) -> Response:
360
        """
361
        Serializes a given response dict (the return payload of a service implementation) to an _event stream_ using the
362
        given operation model.
363

364
        :param response: dictionary containing the payload for the response
365
        :param operation_model: describing the operation the response dict is being returned by
366
        :param mime_type: Mime type which should be used to encode the payload
367
        :param request_id: autogenerated AWS request ID identifying the original request
368
        :return: Response which can directly be sent to the client (in chunks)
369
        """
370
        event_stream_shape = operation_model.get_event_stream_output()
1✔
371
        event_stream_member_name = operation_model.output_shape.event_stream_name
1✔
372

373
        # wrap the generator in operation specific serialization
374
        def event_stream_serializer() -> Iterable[bytes]:
1✔
375
            yield self._encode_event_payload("initial-response")
1✔
376

377
            # create a default response
378
            serialized_event_response = self._create_default_response(operation_model, mime_type)
1✔
379
            # get the members of the event stream shape
380
            event_stream_shape_members = (
1✔
381
                event_stream_shape.members if event_stream_shape is not None else None
382
            )
383
            # extract the generator from the given response data
384
            event_generator = response.get(event_stream_member_name)
1✔
385
            if not isinstance(event_generator, Iterator):
1✔
386
                raise ProtocolSerializerError(
×
387
                    "Expected iterator for streaming event serialization."
388
                )
389

390
            # yield one event per generated event
391
            for event in event_generator:
1✔
392
                # find the actual event payload (the member with event=true)
393
                event_member_shape = None
1✔
394
                event_member_name = None
1✔
395
                for member_name, member_shape in event_stream_shape_members.items():
1✔
396
                    if member_shape.serialization.get("event") and member_name in event:
1✔
397
                        event_member_shape = member_shape
1✔
398
                        event_member_name = member_name
1✔
399
                        break
1✔
400
                if event_member_shape is None:
1✔
401
                    raise UnknownSerializerError("Couldn't find event shape for serialization.")
×
402

403
                # serialize the part of the response for the event
404
                self._serialize_response(
1✔
405
                    event.get(event_member_name),
406
                    serialized_event_response,
407
                    event_member_shape,
408
                    event_member_shape.members if event_member_shape is not None else None,
409
                    operation_model,
410
                    mime_type,
411
                    request_id,
412
                )
413
                # execute additional response traits (might be modifying the response)
414
                serialized_event_response = self._prepare_additional_traits_in_response(
1✔
415
                    serialized_event_response, operation_model, request_id
416
                )
417
                # encode the event and yield it
418
                yield self._encode_event_payload(
1✔
419
                    event_type=event_member_name, content=serialized_event_response.data
420
                )
421

422
        return Response(
1✔
423
            response=event_stream_serializer(),
424
            status=operation_model.http.get("responseCode", 200),
425
        )
426

427
    def _encode_event_payload(
1✔
428
        self,
429
        event_type: str,
430
        content: str | bytes = "",
431
        error_code: str | None = None,
432
        error_message: str | None = None,
433
    ) -> bytes:
434
        """
435
        Encodes the given event payload according to AWS specific binary event encoding.
436
        A specification of the format can be found in the AWS docs:
437
        https://docs.aws.amazon.com/AmazonS3/latest/API/RESTSelectObjectAppendix.html
438

439
        :param content: string or bytes of the event payload
440
        :param event_type: type of the event. Usually the name of the event shape or specific event types like
441
                            "initial-response".
442
        :param error_code: Optional. Error code if the payload represents an error.
443
        :param error_message: Optional. Error message if the payload represents an error.
444
        :return: bytes with the AWS-specific encoded event payload
445
        """
446

447
        # determine the event type (error if an error message or an error code is set)
448
        if error_message or error_code:
1✔
449
            message_type = "error"
×
450
        else:
451
            message_type = "event"
1✔
452

453
        # set the headers
454
        headers = {":event-type": event_type, ":message-type": message_type}
1✔
455
        if error_message:
1✔
456
            headers[":error-message"] = error_message
×
457
        if error_code:
1✔
458
            headers[":error-code"] = error_code
×
459

460
        # construct headers
461
        header_section = b""
1✔
462
        for key, value in headers.items():
1✔
463
            header_name = key.encode(self.DEFAULT_ENCODING)
1✔
464
            header_value = to_bytes(value)
1✔
465
            header_section += pack("!B", len(header_name))
1✔
466
            header_section += header_name
1✔
467
            header_section += pack("!B", self.AWS_BINARY_DATA_TYPE_STRING)
1✔
468
            header_section += pack("!H", len(header_value))
1✔
469
            header_section += header_value
1✔
470

471
        # construct body
472
        if isinstance(content, str):
1✔
473
            payload = bytes(content, self.DEFAULT_ENCODING)
1✔
474
        else:
475
            payload = content
1✔
476

477
        # calculate lengths
478
        headers_length = len(header_section)
1✔
479
        payload_length = len(payload)
1✔
480

481
        # construct message
482
        # - prelude
483
        result = pack("!I", payload_length + headers_length + 16)
1✔
484
        result += pack("!I", headers_length)
1✔
485
        # - prelude crc
486
        prelude_crc = crc32(result)
1✔
487
        result += pack("!I", prelude_crc)
1✔
488
        # - headers
489
        result += header_section
1✔
490
        # - payload
491
        result += payload
1✔
492
        # - message crc
493
        payload_crc = crc32(result)
1✔
494
        result += pack("!I", payload_crc)
1✔
495

496
        return result
1✔
497

498
    def _create_default_response(self, operation_model: OperationModel, mime_type: str) -> Response:
1✔
499
        """
500
        Creates a boilerplate default response to be used by subclasses as starting points.
501
        Uses the default HTTP response status code defined in the operation model (if defined), otherwise 200.
502

503
        :param operation_model: to extract the default HTTP status code
504
        :param mime_type: Mime type which should be used to encode the payload
505
        :return: boilerplate HTTP response
506
        """
507
        return Response(status=operation_model.http.get("responseCode", 200))
1✔
508

509
    def _get_mime_type(self, headers: dict | Headers | None) -> str:
1✔
510
        """
511
        Extracts the accepted mime type from the request headers and returns a matching, supported mime type for the
512
        serializer or the default mime type of the service if there is no match.
513
        :param headers: to extract the "Accept" header from
514
        :return: preferred mime type to be used by the serializer (if it is not accepted by the client,
515
                 an error is logged)
516
        """
517
        accept_header = None
1✔
518
        if headers and "Accept" in headers and not headers.get("Accept") == "*/*":
1✔
519
            accept_header = headers.get("Accept")
1✔
520
        elif headers and headers.get("Content-Type"):
1✔
521
            # If there is no specific Accept header given, we use the given Content-Type as a fallback.
522
            # i.e. if the request content was JSON encoded and the client doesn't send a specific an Accept header, the
523
            # serializer should prefer JSON encoding.
524
            content_type = headers.get("Content-Type")
1✔
525
            LOG.debug(
1✔
526
                "No accept header given. Using request's Content-Type (%s) as preferred response Content-Type.",
527
                content_type,
528
            )
529
            accept_header = content_type + ", */*"
1✔
530
        mime_accept: MIMEAccept = parse_accept_header(accept_header, MIMEAccept)
1✔
531
        mime_type = mime_accept.best_match(self.SUPPORTED_MIME_TYPES)
1✔
532
        if not mime_type:
1✔
533
            # There is no match between the supported mime types and the requested one(s)
534
            mime_type = self.SUPPORTED_MIME_TYPES[0]
1✔
535
            LOG.debug(
1✔
536
                "Determined accept type (%s) is not supported by this serializer. Using default of this serializer: %s",
537
                accept_header,
538
                mime_type,
539
            )
540
        return mime_type
1✔
541

542
    # Some extra utility methods subclasses can use.
543

544
    @staticmethod
1✔
545
    def _timestamp_iso8601(value: datetime.datetime) -> str:
1✔
546
        if value.microsecond > 0:
1✔
547
            timestamp_format = ISO8601_MICRO
1✔
548
        else:
549
            timestamp_format = ISO8601
1✔
550
        return value.strftime(timestamp_format)
1✔
551

552
    @staticmethod
1✔
553
    def _timestamp_unixtimestamp(value: datetime.datetime) -> float:
1✔
554
        return value.timestamp()
1✔
555

556
    def _timestamp_rfc822(self, value: datetime.datetime) -> str:
1✔
557
        if isinstance(value, datetime.datetime):
1✔
558
            value = self._timestamp_unixtimestamp(value)
1✔
559
        return formatdate(value, usegmt=True)
1✔
560

561
    def _convert_timestamp_to_str(
1✔
562
        self, value: int | str | datetime.datetime, timestamp_format=None
563
    ) -> str:
564
        if timestamp_format is None:
1✔
565
            timestamp_format = self.TIMESTAMP_FORMAT
1✔
566
        timestamp_format = timestamp_format.lower()
1✔
567
        datetime_obj = parse_to_aware_datetime(value)
1✔
568
        converter = getattr(self, f"_timestamp_{timestamp_format}")
1✔
569
        final_value = converter(datetime_obj)
1✔
570
        return final_value
1✔
571

572
    @staticmethod
1✔
573
    def _get_serialized_name(shape: Shape, default_name: str) -> str:
1✔
574
        """
575
        Returns the serialized name for the shape if it exists.
576
        Otherwise, it will return the passed in default_name.
577
        """
578
        return shape.serialization.get("name", default_name)
1✔
579

580
    def _get_base64(self, value: str | bytes):
1✔
581
        """
582
        Returns the base64-encoded version of value, handling
583
        both strings and bytes. The returned value is a string
584
        via the default encoding.
585
        """
586
        if isinstance(value, str):
1✔
587
            value = value.encode(self.DEFAULT_ENCODING)
×
588
        return base64.b64encode(value).strip().decode(self.DEFAULT_ENCODING)
1✔
589

590
    def _encode_payload(self, body: bytes | str) -> bytes:
1✔
591
        if isinstance(body, str):
1✔
592
            return body.encode(self.DEFAULT_ENCODING)
1✔
593
        return body
1✔
594

595
    def _prepare_additional_traits_in_response(
1✔
596
        self, response: Response, operation_model: OperationModel, request_id: str
597
    ):
598
        """Applies additional traits on the raw response for a given model or protocol."""
599
        if operation_model.http_checksum_required:
1✔
600
            self._add_md5_header(response)
×
601
        return response
1✔
602

603
    def _has_header(self, header_name: str, headers: dict):
1✔
604
        """Case-insensitive check for header key."""
605
        if header_name is None:
1✔
606
            return False
×
607
        else:
608
            return header_name.lower() in [key.lower() for key in headers.keys()]
1✔
609

610
    def _add_md5_header(self, response: Response):
1✔
611
        """Add a Content-MD5 header if not yet there. Adapted from botocore.utils"""
612
        headers = response.headers
×
613
        body = response.data
×
614
        if body is not None and "Content-MD5" not in headers:
×
615
            md5_digest = calculate_md5(body)
×
616
            headers["Content-MD5"] = md5_digest
×
617

618
    def _get_error_message(self, error: Exception) -> str | None:
1✔
619
        return str(error) if error is not None and str(error) != "None" else None
1✔
620

621
    def _get_error_status_code(
1✔
622
        self, error: ServiceException, headers: Headers, service_model: ServiceModel
623
    ) -> int:
624
        return error.status_code
1✔
625

626

627
class QueryCompatibleProtocolMixin:
1✔
628
    def _get_error_status_code(
1✔
629
        self, error: ServiceException, headers: dict | Headers | None, service_model: ServiceModel
630
    ) -> int:
631
        # by default, some protocols (namely `json` and `smithy-rpc-v2-cbor`) might not define exception status code in
632
        # their specs, so they are not defined in the `ServiceException` object and will use the default value of `400`
633
        # But Query compatible service always do define them, so we get the wrong code for service that are
634
        # multi-protocols like CloudWatch
635
        # we need to verify if the service is compatible, and if the client has requested the query compatible error
636
        # code to return the right value
637
        if not service_model.is_query_compatible:
1✔
638
            return error.status_code
1✔
639

640
        if headers and headers.get("x-amzn-query-mode") == "true":
1✔
641
            return error.status_code
1✔
642

643
        # we only want to override status code 4XX
644
        if 400 < error.status_code <= 499:
1✔
645
            return 400
1✔
646

647
        return error.status_code
1✔
648

649
    def _add_query_compatible_error_header(self, response: Response, error: ServiceException):
1✔
650
        """
651
        Add an `x-amzn-query-error` header for client to  translate errors codes from former `query` services
652
        into other protocols.
653
        """
654

655
        sender_fault = "Sender" if error.sender_fault else "Receiver"
1✔
656
        response.headers["x-amzn-query-error"] = f"{error.code};{sender_fault}"
1✔
657

658
    def _get_error_code(
1✔
659
        self, is_query_compatible: bool, error: ServiceException, shape: Shape | None = None
660
    ):
661
        # if the operation is query compatible, we need to add to use shape name
662
        if is_query_compatible:
1✔
663
            if shape:
1✔
664
                code = shape.name
1✔
665
            else:
666
                # if the shape is not defined, we are using the Exception named to derive the `Code`, like you would
667
                # from the shape. This allows us to have Exception that are valid in multi-protocols by defining its
668
                # code and its name to be different
669
                code = error.__class__.__name__
1✔
670
        else:
671
            code = error.code
1✔
672

673
        return code
1✔
674

675

676
class BaseXMLResponseSerializer(ResponseSerializer):
1✔
677
    """
678
    The BaseXMLResponseSerializer performs the basic logic for the XML response serialization.
679
    It is slightly adapted by the QueryResponseSerializer.
680
    While the botocore's RestXMLSerializer is quite similar, there are some subtle differences (since botocore's
681
    implementation handles the serialization of the requests from the client to the service, not the responses from the
682
    service to the client).
683
    """
684

685
    SUPPORTED_MIME_TYPES = [TEXT_XML, APPLICATION_XML, APPLICATION_JSON]
1✔
686

687
    def _serialize_error(
1✔
688
        self,
689
        error: ServiceException,
690
        response: Response,
691
        shape: StructureShape,
692
        operation_model: OperationModel,
693
        mime_type: str,
694
        request_id: str,
695
    ) -> None:
696
        # Check if we need to add a namespace
697
        attr = (
1✔
698
            {"xmlns": operation_model.metadata.get("xmlNamespace")}
699
            if "xmlNamespace" in operation_model.metadata
700
            else {}
701
        )
702
        root = ETree.Element("ErrorResponse", attr)
1✔
703

704
        error_tag = ETree.SubElement(root, "Error")
1✔
705
        self._add_error_tags(error, error_tag, mime_type)
1✔
706
        request_id_element = ETree.SubElement(root, "RequestId")
1✔
707
        request_id_element.text = request_id
1✔
708

709
        self._add_additional_error_tags(vars(error), root, shape, mime_type)
1✔
710

711
        response.set_response(self._encode_payload(self._node_to_string(root, mime_type)))
1✔
712

713
    def _add_error_tags(
1✔
714
        self, error: ServiceException, error_tag: ETree.Element, mime_type: str
715
    ) -> None:
716
        code_tag = ETree.SubElement(error_tag, "Code")
1✔
717
        code_tag.text = error.code
1✔
718
        message = self._get_error_message(error)
1✔
719
        if message:
1✔
720
            self._default_serialize(error_tag, message, None, "Message", mime_type)
1✔
721
        if error.sender_fault:
1✔
722
            # The sender fault is either not set or "Sender"
723
            self._default_serialize(error_tag, "Sender", None, "Type", mime_type)
1✔
724

725
    def _add_additional_error_tags(
1✔
726
        self, parameters: dict, node: ETree, shape: StructureShape, mime_type: str
727
    ):
728
        if shape:
1✔
729
            params = {}
1✔
730
            # TODO add a possibility to serialize simple non-modelled errors (like S3 NoSuchBucket#BucketName)
731
            for member in shape.members:
1✔
732
                # XML protocols do not add modeled default fields to the root node
733
                # (tested for cloudfront, route53, cloudwatch, iam)
734
                if member.lower() not in ["code", "message"] and member in parameters:
1✔
735
                    params[member] = parameters[member]
1✔
736

737
            # If there is an error shape with members which should be set, they need to be added to the node
738
            if params:
1✔
739
                # Serialize the remaining params
740
                root_name = shape.serialization.get("name", shape.name)
1✔
741
                pseudo_root = ETree.Element("")
1✔
742
                self._serialize(shape, params, pseudo_root, root_name, mime_type)
1✔
743
                real_root = list(pseudo_root)[0]
1✔
744
                # Add the child elements to the already created root error element
745
                for child in list(real_root):
1✔
746
                    node.append(child)
1✔
747

748
    def _serialize_body_params(
1✔
749
        self,
750
        params: dict,
751
        shape: Shape,
752
        operation_model: OperationModel,
753
        mime_type: str,
754
        request_id: str,
755
    ) -> str | None:
756
        root = self._serialize_body_params_to_xml(params, shape, operation_model, mime_type)
1✔
757
        self._prepare_additional_traits_in_xml(root, request_id)
1✔
758
        return self._node_to_string(root, mime_type)
1✔
759

760
    def _serialize_body_params_to_xml(
1✔
761
        self, params: dict, shape: Shape, operation_model: OperationModel, mime_type: str
762
    ) -> ETree.Element | None:
763
        if shape is None:
1✔
764
            return
1✔
765
        # The botocore serializer expects `shape.serialization["name"]`, but this isn't always present for responses
766
        root_name = shape.serialization.get("name", shape.name)
1✔
767
        pseudo_root = ETree.Element("")
1✔
768
        self._serialize(shape, params, pseudo_root, root_name, mime_type)
1✔
769
        real_root = list(pseudo_root)[0]
1✔
770
        return real_root
1✔
771

772
    def _serialize(
1✔
773
        self, shape: Shape, params: Any, xmlnode: ETree.Element, name: str, mime_type: str
774
    ) -> None:
775
        """This method dynamically invokes the correct `_serialize_type_*` method for each shape type."""
776
        if shape is None:
1✔
777
            return
×
778
        # Some output shapes define a `resultWrapper` in their serialization spec.
779
        # While the name would imply that the result is _wrapped_, it is actually renamed.
780
        if shape.serialization.get("resultWrapper"):
1✔
781
            name = shape.serialization.get("resultWrapper")
1✔
782

783
        try:
1✔
784
            method = getattr(self, f"_serialize_type_{shape.type_name}", self._default_serialize)
1✔
785
            method(xmlnode, params, shape, name, mime_type)
1✔
786
        except (TypeError, ValueError, AttributeError) as e:
1✔
787
            raise ProtocolSerializerError(
1✔
788
                f"Invalid type when serializing {shape.name}: '{xmlnode}' cannot be parsed to {shape.type_name}."
789
            ) from e
790

791
    def _serialize_type_structure(
1✔
792
        self, xmlnode: ETree.Element, params: dict, shape: StructureShape, name: str, mime_type
793
    ) -> None:
794
        structure_node = ETree.SubElement(xmlnode, name)
1✔
795

796
        if "xmlNamespace" in shape.serialization:
1✔
797
            namespace_metadata = shape.serialization["xmlNamespace"]
1✔
798
            attribute_name = "xmlns"
1✔
799
            if namespace_metadata.get("prefix"):
1✔
800
                attribute_name += ":{}".format(namespace_metadata["prefix"])
1✔
801
            structure_node.attrib[attribute_name] = namespace_metadata["uri"]
1✔
802
        for key, value in params.items():
1✔
803
            if value is None:
1✔
804
                # Don't serialize any param whose value is None.
805
                continue
1✔
806
            try:
1✔
807
                member_shape = shape.members[key]
1✔
808
            except KeyError:
1✔
809
                LOG.warning(
1✔
810
                    "Response object %s contains a member which is not specified: %s",
811
                    shape.name,
812
                    key,
813
                )
814
                continue
1✔
815
            member_name = member_shape.serialization.get("name", key)
1✔
816
            # We need to special case member shapes that are marked as an xmlAttribute.
817
            # Rather than serializing into an XML child node, we instead serialize the shape to
818
            # an XML attribute of the *current* node.
819
            if member_shape.serialization.get("xmlAttribute"):
1✔
820
                # xmlAttributes must have a serialization name.
821
                xml_attribute_name = member_shape.serialization["name"]
1✔
822
                structure_node.attrib[xml_attribute_name] = value
1✔
823
                continue
1✔
824
            self._serialize(member_shape, value, structure_node, member_name, mime_type)
1✔
825

826
    def _serialize_type_list(
1✔
827
        self, xmlnode: ETree.Element, params: list, shape: ListShape, name: str, mime_type: str
828
    ) -> None:
829
        if params is None:
1✔
830
            # Don't serialize any param whose value is None.
831
            return
×
832
        member_shape = shape.member
1✔
833
        if shape.serialization.get("flattened"):
1✔
834
            # If the list is flattened, either take the member's "name" or the name of the usual name for the parent
835
            # element for the children.
836
            element_name = self._get_serialized_name(member_shape, name)
1✔
837
            list_node = xmlnode
1✔
838
        else:
839
            element_name = self._get_serialized_name(member_shape, "member")
1✔
840
            list_node = ETree.SubElement(xmlnode, name)
1✔
841
        for item in params:
1✔
842
            # Don't serialize any item which is None
843
            if item is not None:
1✔
844
                self._serialize(member_shape, item, list_node, element_name, mime_type)
1✔
845

846
    def _serialize_type_map(
1✔
847
        self, xmlnode: ETree.Element, params: dict, shape: MapShape, name: str, mime_type: str
848
    ) -> None:
849
        """
850
        Given the ``name`` of MyMap, an input of {"key1": "val1", "key2": "val2"}, and the ``flattened: False``
851
        we serialize this as:
852
          <MyMap>
853
            <entry>
854
              <key>key1</key>
855
              <value>val1</value>
856
            </entry>
857
            <entry>
858
              <key>key2</key>
859
              <value>val2</value>
860
            </entry>
861
          </MyMap>
862
        If it is flattened, it is serialized as follows:
863
          <MyMap>
864
            <key>key1</key>
865
            <value>val1</value>
866
          </MyMap>
867
          <MyMap>
868
            <key>key2</key>
869
            <value>val2</value>
870
          </MyMap>
871
        """
872
        if params is None:
1✔
873
            # Don't serialize a non-existing map
874
            return
×
875
        if shape.serialization.get("flattened"):
1✔
876
            entries_node = xmlnode
1✔
877
            entry_node_name = name
1✔
878
        else:
879
            entries_node = ETree.SubElement(xmlnode, name)
1✔
880
            entry_node_name = "entry"
1✔
881

882
        for key, value in params.items():
1✔
883
            if value is None:
1✔
884
                # Don't serialize any param whose value is None.
885
                continue
×
886
            entry_node = ETree.SubElement(entries_node, entry_node_name)
1✔
887
            key_name = self._get_serialized_name(shape.key, default_name="key")
1✔
888
            val_name = self._get_serialized_name(shape.value, default_name="value")
1✔
889
            self._serialize(shape.key, key, entry_node, key_name, mime_type)
1✔
890
            self._serialize(shape.value, value, entry_node, val_name, mime_type)
1✔
891

892
    @staticmethod
1✔
893
    def _serialize_type_boolean(xmlnode: ETree.Element, params: bool, _, name: str, __) -> None:
1✔
894
        """
895
        For scalar types, the 'params' attr is actually just a scalar value representing the data
896
        we need to serialize as a boolean. It will either be 'true' or 'false'
897
        """
898
        node = ETree.SubElement(xmlnode, name)
1✔
899
        if params:
1✔
900
            str_value = "true"
1✔
901
        else:
902
            str_value = "false"
1✔
903
        node.text = str_value
1✔
904

905
    def _serialize_type_blob(
1✔
906
        self, xmlnode: ETree.Element, params: str | bytes, _, name: str, __
907
    ) -> None:
908
        node = ETree.SubElement(xmlnode, name)
1✔
909
        node.text = self._get_base64(params)
1✔
910

911
    def _serialize_type_timestamp(
1✔
912
        self, xmlnode: ETree.Element, params: str, shape: Shape, name: str, mime_type: str
913
    ) -> None:
914
        node = ETree.SubElement(xmlnode, name)
1✔
915
        if mime_type != APPLICATION_JSON:
1✔
916
            # Default XML timestamp serialization
917
            node.text = self._convert_timestamp_to_str(
1✔
918
                params, shape.serialization.get("timestampFormat")
919
            )
920
        else:
921
            # For services with XML protocols, where the Accept header is JSON, timestamps are formatted like for JSON
922
            # protocols, but using the int representation instead of the float representation (f.e. requesting JSON
923
            # responses in STS).
924
            node.text = str(
1✔
925
                int(self._convert_timestamp_to_str(params, JSONResponseSerializer.TIMESTAMP_FORMAT))
926
            )
927

928
    def _default_serialize(self, xmlnode: ETree.Element, params: str, _, name: str, __) -> None:
1✔
929
        node = ETree.SubElement(xmlnode, name)
1✔
930
        node.text = str(params)
1✔
931

932
    def _prepare_additional_traits_in_xml(self, root: ETree.Element | None, request_id: str):
1✔
933
        """
934
        Prepares the XML root node before being serialized with additional traits (like the Response ID in the Query
935
        protocol).
936
        For some protocols (like rest-xml), the root can be None.
937
        """
938
        pass
1✔
939

940
    def _create_default_response(self, operation_model: OperationModel, mime_type: str) -> Response:
1✔
941
        response = super()._create_default_response(operation_model, mime_type)
1✔
942
        response.headers["Content-Type"] = mime_type
1✔
943
        return response
1✔
944

945
    def _node_to_string(self, root: ETree.Element | None, mime_type: str) -> str | None:
1✔
946
        """Generates the string representation of the given XML element."""
947
        if root is not None:
1✔
948
            content = ETree.tostring(
1✔
949
                element=root, encoding=self.DEFAULT_ENCODING, xml_declaration=True
950
            )
951
            if mime_type == APPLICATION_JSON:
1✔
952
                # FIXME try to directly convert the ElementTree node to JSON
953
                xml_dict = xmltodict.parse(content)
1✔
954
                xml_dict = strip_xmlns(xml_dict)
1✔
955
                content = json.dumps(xml_dict)
1✔
956
            return content
1✔
957

958

959
class BaseRestResponseSerializer(ResponseSerializer, ABC):
1✔
960
    """
961
    The BaseRestResponseSerializer performs the basic logic for the ReST response serialization.
962
    In our case it basically only adds the request metadata to the HTTP header.
963
    """
964

965
    HEADER_TIMESTAMP_FORMAT = "rfc822"
1✔
966

967
    def _serialize_response(
1✔
968
        self,
969
        parameters: dict,
970
        response: Response,
971
        shape: Shape | None,
972
        shape_members: dict,
973
        operation_model: OperationModel,
974
        mime_type: str,
975
        request_id: str,
976
    ) -> None:
977
        header_params, payload_params = self._partition_members(parameters, shape)
1✔
978
        self._process_header_members(header_params, response, shape)
1✔
979
        # "HEAD" responses are basically "GET" responses without the actual body.
980
        # Do not process the body payload in this case (setting a body could also manipulate the headers)
981
        if operation_model.http.get("method") != "HEAD":
1✔
982
            self._serialize_payload(
1✔
983
                payload_params,
984
                response,
985
                shape,
986
                shape_members,
987
                operation_model,
988
                mime_type,
989
                request_id,
990
            )
991
        self._serialize_content_type(response, shape, shape_members, mime_type)
1✔
992
        self._prepare_additional_traits_in_response(response, operation_model, request_id)
1✔
993

994
    def _serialize_payload(
1✔
995
        self,
996
        parameters: dict,
997
        response: Response,
998
        shape: Shape | None,
999
        shape_members: dict,
1000
        operation_model: OperationModel,
1001
        mime_type: str,
1002
        request_id: str,
1003
    ) -> None:
1004
        """
1005
        Serializes the given payload.
1006

1007
        :param parameters: The user input params
1008
        :param response: The final serialized Response
1009
        :param shape: Describes the expected output shape (can be None in case of an "empty" response)
1010
        :param shape_members: The members of the output struct shape
1011
        :param operation_model: The specification of the operation of which the response is serialized here
1012
        :param mime_type: Mime type which should be used to encode the payload
1013
        :param request_id: autogenerated AWS request ID identifying the original request
1014
        :return: None - the given `serialized` dict is modified
1015
        """
1016
        if shape is None:
1✔
1017
            return
1✔
1018

1019
        payload_member = shape.serialization.get("payload")
1✔
1020
        # If this shape is defined as being an event, we need to search for the payload member
1021
        if not payload_member and shape.serialization.get("event"):
1✔
1022
            for member_name, member_shape in shape_members.items():
1✔
1023
                # Try to find the first shape which is marked as "eventpayload" and is given in the params dict
1024
                if member_shape.serialization.get("eventpayload") and parameters.get(member_name):
1✔
1025
                    payload_member = member_name
1✔
1026
                    break
1✔
1027
        if payload_member is not None and shape_members[payload_member].type_name in [
1✔
1028
            "blob",
1029
            "string",
1030
        ]:
1031
            # If it's streaming, then the body is just the value of the payload.
1032
            body_payload = parameters.get(payload_member, b"")
1✔
1033
            body_payload = self._encode_payload(body_payload)
1✔
1034
            response.set_response(body_payload)
1✔
1035
        elif payload_member is not None:
1✔
1036
            # If there's a payload member, we serialized that member to the body.
1037
            body_params = parameters.get(payload_member)
1✔
1038
            if body_params is not None:
1✔
1039
                response.set_response(
1✔
1040
                    self._encode_payload(
1041
                        self._serialize_body_params(
1042
                            body_params,
1043
                            shape_members[payload_member],
1044
                            operation_model,
1045
                            mime_type,
1046
                            request_id,
1047
                        )
1048
                    )
1049
                )
1050
        else:
1051
            # Otherwise, we use the "traditional" way of serializing the whole parameters dict recursively.
1052
            response.set_response(
1✔
1053
                self._encode_payload(
1054
                    self._serialize_body_params(
1055
                        parameters, shape, operation_model, mime_type, request_id
1056
                    )
1057
                )
1058
            )
1059

1060
    def _serialize_content_type(
1✔
1061
        self, serialized: Response, shape: Shape, shape_members: dict, mime_type: str
1062
    ):
1063
        """
1064
        Some protocols require varied Content-Type headers depending on user input.
1065
        This allows subclasses to apply this conditionally.
1066
        """
1067
        pass
1✔
1068

1069
    def _has_streaming_payload(self, payload: str | None, shape_members):
1✔
1070
        """Determine if payload is streaming (a blob or string)."""
1071
        return payload is not None and shape_members[payload].type_name in ["blob", "string"]
1✔
1072

1073
    def _prepare_additional_traits_in_response(
1✔
1074
        self, response: Response, operation_model: OperationModel, request_id: str
1075
    ):
1076
        """Adds the request ID to the headers (in contrast to the body - as in the Query protocol)."""
1077
        response = super()._prepare_additional_traits_in_response(
1✔
1078
            response, operation_model, request_id
1079
        )
1080
        response.headers["x-amz-request-id"] = request_id
1✔
1081
        return response
1✔
1082

1083
    def _process_header_members(self, parameters: dict, response: Response, shape: Shape):
1✔
1084
        shape_members = shape.members if isinstance(shape, StructureShape) else []
1✔
1085
        for name in shape_members:
1✔
1086
            member_shape = shape_members[name]
1✔
1087
            location = member_shape.serialization.get("location")
1✔
1088
            if not location:
1✔
1089
                continue
1✔
1090
            if name not in parameters:
1✔
1091
                # ignores optional keys
1092
                continue
1✔
1093
            key = member_shape.serialization.get("name", name)
1✔
1094
            value = parameters[name]
1✔
1095
            if value is None:
1✔
1096
                continue
×
1097
            if location == "header":
1✔
1098
                response.headers[key] = self._serialize_header_value(member_shape, value)
1✔
1099
            elif location == "headers":
1✔
1100
                header_prefix = key
1✔
1101
                self._serialize_header_map(header_prefix, response, value)
1✔
1102
            elif location == "statusCode":
1✔
1103
                response.status_code = int(value)
1✔
1104

1105
    def _serialize_header_map(self, prefix: str, response: Response, params: dict) -> None:
1✔
1106
        """Serializes the header map for the location trait "headers"."""
1107
        for key, val in params.items():
1✔
1108
            actual_key = prefix + key
1✔
1109
            response.headers[actual_key] = val
1✔
1110

1111
    def _serialize_header_value(self, shape: Shape, value: Any):
1✔
1112
        """Serializes a value for the location trait "header"."""
1113
        if shape.type_name == "timestamp":
1✔
1114
            datetime_obj = parse_to_aware_datetime(value)
1✔
1115
            timestamp_format = shape.serialization.get(
1✔
1116
                "timestampFormat", self.HEADER_TIMESTAMP_FORMAT
1117
            )
1118
            return self._convert_timestamp_to_str(datetime_obj, timestamp_format)
1✔
1119
        elif shape.type_name == "list":
1✔
1120
            converted_value = [
×
1121
                self._serialize_header_value(shape.member, v) for v in value if v is not None
1122
            ]
1123
            return ",".join(converted_value)
×
1124
        elif shape.type_name == "boolean":
1✔
1125
            # Set the header value to "true" if the given value is truthy, otherwise set the header value to "false".
1126
            return "true" if value else "false"
1✔
1127
        elif is_json_value_header(shape):
1✔
1128
            # Serialize with no spaces after separators to save space in
1129
            # the header.
1130
            return self._get_base64(json.dumps(value, separators=(",", ":")))
×
1131
        else:
1132
            return value
1✔
1133

1134
    def _partition_members(self, parameters: dict, shape: Shape | None) -> tuple[dict, dict]:
1✔
1135
        """Separates the top-level keys in the given parameters dict into header- and payload-located params."""
1136
        if not isinstance(shape, StructureShape):
1✔
1137
            # If the shape isn't a structure, we default to the whole response being parsed in the body.
1138
            # Non-payload members are only loaded in the top-level hierarchy and those are always structures.
1139
            return {}, parameters
1✔
1140
        header_params = {}
1✔
1141
        payload_params = {}
1✔
1142
        shape_members = shape.members
1✔
1143
        for name in shape_members:
1✔
1144
            member_shape = shape_members[name]
1✔
1145
            if name not in parameters:
1✔
1146
                continue
1✔
1147
            location = member_shape.serialization.get("location")
1✔
1148
            if location:
1✔
1149
                header_params[name] = parameters[name]
1✔
1150
            else:
1151
                payload_params[name] = parameters[name]
1✔
1152
        return header_params, payload_params
1✔
1153

1154

1155
class RestXMLResponseSerializer(BaseRestResponseSerializer, BaseXMLResponseSerializer):
1✔
1156
    """
1157
    The ``RestXMLResponseSerializer`` is responsible for the serialization of responses from services with the
1158
    ``rest-xml`` protocol.
1159
    It combines the ``BaseRestResponseSerializer`` (for the ReST specific logic) with the ``BaseXMLResponseSerializer``
1160
    (for the XML body response serialization).
1161
    """
1162

1163
    pass
1✔
1164

1165

1166
class QueryResponseSerializer(BaseXMLResponseSerializer):
1✔
1167
    """
1168
    The ``QueryResponseSerializer`` is responsible for the serialization of responses from services which use the
1169
    ``query`` protocol. The responses of these services also use XML. It is basically a subset of the features, since it
1170
    does not allow any payload or location traits.
1171
    """
1172

1173
    def _serialize_response(
1✔
1174
        self,
1175
        parameters: dict,
1176
        response: Response,
1177
        shape: Shape | None,
1178
        shape_members: dict,
1179
        operation_model: OperationModel,
1180
        mime_type: str,
1181
        request_id: str,
1182
    ) -> None:
1183
        """
1184
        Serializes the given parameters as XML for the query protocol.
1185

1186
        :param parameters: The user input params
1187
        :param response: The final serialized Response
1188
        :param shape: Describes the expected output shape (can be None in case of an "empty" response)
1189
        :param shape_members: The members of the output struct shape
1190
        :param operation_model: The specification of the operation of which the response is serialized here
1191
        :param mime_type: Mime type which should be used to encode the payload
1192
        :param request_id: autogenerated AWS request ID identifying the original request
1193
        :return: None - the given `serialized` dict is modified
1194
        """
1195
        response.set_response(
1✔
1196
            self._encode_payload(
1197
                self._serialize_body_params(
1198
                    parameters, shape, operation_model, mime_type, request_id
1199
                )
1200
            )
1201
        )
1202

1203
    def _serialize_body_params_to_xml(
1✔
1204
        self, params: dict, shape: Shape, operation_model: OperationModel, mime_type: str
1205
    ) -> ETree.Element:
1206
        # The Query protocol responses have a root element which is not contained in the specification file.
1207
        # Therefore, we first call the super function to perform the normal XML serialization, and afterwards wrap the
1208
        # result in a root element based on the operation name.
1209
        node = super()._serialize_body_params_to_xml(params, shape, operation_model, mime_type)
1✔
1210

1211
        # Check if we need to add a namespace
1212
        attr = (
1✔
1213
            {"xmlns": operation_model.metadata.get("xmlNamespace")}
1214
            if "xmlNamespace" in operation_model.metadata
1215
            else None
1216
        )
1217

1218
        # Create the root element and add the result of the XML serializer as a child node
1219
        root = ETree.Element(f"{operation_model.name}Response", attr)
1✔
1220
        if node is not None:
1✔
1221
            root.append(node)
1✔
1222
        return root
1✔
1223

1224
    def _prepare_additional_traits_in_xml(self, root: ETree.Element | None, request_id: str):
1✔
1225
        # Add the response metadata here (it's not defined in the specs)
1226
        # For the ec2 and the query protocol, the root cannot be None at this time.
1227
        response_metadata = ETree.SubElement(root, "ResponseMetadata")
1✔
1228
        request_id_element = ETree.SubElement(response_metadata, "RequestId")
1✔
1229
        request_id_element.text = request_id
1✔
1230

1231
    def _prepare_additional_traits_in_response(
1✔
1232
        self, response: Response, operation_model: OperationModel, request_id: str
1233
    ):
1234
        response.headers["x-amzn-RequestId"] = request_id
1✔
1235
        response = super()._prepare_additional_traits_in_response(
1✔
1236
            response, operation_model, request_id
1237
        )
1238
        return response
1✔
1239

1240

1241
class EC2ResponseSerializer(QueryResponseSerializer):
1✔
1242
    """
1243
    The ``EC2ResponseSerializer`` is responsible for the serialization of responses from services which use the
1244
    ``ec2`` protocol (basically the EC2 service). This protocol is basically equal to the ``query`` protocol with only
1245
    a few subtle differences.
1246
    """
1247

1248
    def _serialize_error(
1✔
1249
        self,
1250
        error: ServiceException,
1251
        response: Response,
1252
        shape: StructureShape,
1253
        operation_model: OperationModel,
1254
        mime_type: str,
1255
        request_id: str,
1256
    ) -> None:
1257
        # EC2 errors look like:
1258
        # <Response>
1259
        #   <Errors>
1260
        #     <Error>
1261
        #       <Code>InvalidInstanceID.Malformed</Code>
1262
        #       <Message>Invalid id: "1343124"</Message>
1263
        #     </Error>
1264
        #   </Errors>
1265
        #   <RequestID>12345</RequestID>
1266
        # </Response>
1267
        # This is different from QueryParser in that it's RequestID, not RequestId
1268
        # and that the Error tag is in an enclosing Errors tag.
1269
        attr = (
1✔
1270
            {"xmlns": operation_model.metadata.get("xmlNamespace")}
1271
            if "xmlNamespace" in operation_model.metadata
1272
            else None
1273
        )
1274
        root = ETree.Element("Response", attr)
1✔
1275
        errors_tag = ETree.SubElement(root, "Errors")
1✔
1276
        error_tag = ETree.SubElement(errors_tag, "Error")
1✔
1277
        self._add_error_tags(error, error_tag, mime_type)
1✔
1278
        request_id_element = ETree.SubElement(root, "RequestID")
1✔
1279
        request_id_element.text = request_id
1✔
1280
        response.set_response(self._encode_payload(self._node_to_string(root, mime_type)))
1✔
1281

1282
    def _prepare_additional_traits_in_xml(self, root: ETree.Element | None, request_id: str):
1✔
1283
        # The EC2 protocol does not use the root output shape, therefore we need to remove the hierarchy level
1284
        # below the root level
1285
        if len(root) > 0:
1✔
1286
            output_node = root[0]
1✔
1287
            for child in output_node:
1✔
1288
                root.append(child)
1✔
1289
            root.remove(output_node)
1✔
1290

1291
        # Add the requestId here (it's not defined in the specs)
1292
        # For the ec2 and the query protocol, the root cannot be None at this time.
1293
        request_id_element = ETree.SubElement(root, "requestId")
1✔
1294
        request_id_element.text = request_id
1✔
1295

1296

1297
class JSONResponseSerializer(QueryCompatibleProtocolMixin, ResponseSerializer):
1✔
1298
    """
1299
    The ``JSONResponseSerializer`` is responsible for the serialization of responses from services with the ``json``
1300
    protocol. It implements the JSON response body serialization, which is also used by the
1301
    ``RestJSONResponseSerializer``.
1302
    """
1303

1304
    JSON_TYPES = [APPLICATION_JSON, APPLICATION_AMZ_JSON_1_0, APPLICATION_AMZ_JSON_1_1]
1✔
1305
    CBOR_TYPES = [APPLICATION_CBOR, APPLICATION_AMZ_CBOR_1_1]
1✔
1306
    SUPPORTED_MIME_TYPES = JSON_TYPES + CBOR_TYPES
1✔
1307

1308
    TIMESTAMP_FORMAT = "unixtimestamp"
1✔
1309

1310
    def _serialize_error(
1✔
1311
        self,
1312
        error: ServiceException,
1313
        response: Response,
1314
        shape: StructureShape,
1315
        operation_model: OperationModel,
1316
        mime_type: str,
1317
        request_id: str,
1318
    ) -> None:
1319
        body = {}
1✔
1320

1321
        # TODO implement different service-specific serializer configurations
1322
        #   - currently we set both, the `__type` member as well as the `X-Amzn-Errortype` header
1323
        #   - the specification defines that it's either the __type field OR the header
1324
        # this depends on the JSON protocol version as well. If json-1.0 the Error should be the full shape ID, like
1325
        # com.amazon.coral.service#ExceptionName
1326
        # if json-1.1, it should only be the name
1327

1328
        is_query_compatible = operation_model.service_model.is_query_compatible
1✔
1329
        code = self._get_error_code(is_query_compatible, error, shape)
1✔
1330

1331
        response.headers["X-Amzn-Errortype"] = code
1✔
1332

1333
        # the `__type` field is not defined in default botocore error shapes
1334
        body["__type"] = code
1✔
1335

1336
        if shape:
1✔
1337
            remaining_params = {}
1✔
1338
            # TODO add a possibility to serialize simple non-modelled errors (like S3 NoSuchBucket#BucketName)
1339
            for member in shape.members:
1✔
1340
                if hasattr(error, member):
1✔
1341
                    value = getattr(error, member)
1✔
1342

1343
                # Default error message fields can sometimes have different casing in the specs
1344
                elif member.lower() in ["code", "message"] and hasattr(error, member.lower()):
1✔
1345
                    value = getattr(error, member.lower())
1✔
1346

1347
                else:
1348
                    continue
1✔
1349

1350
                if value is None:
1✔
1351
                    # do not serialize a value that is set to `None`
1352
                    continue
1✔
1353

1354
                # if the value is falsy (empty string, empty list) and not in the Shape required members, AWS will
1355
                # not serialize it, and it will not be part of the response body.
1356
                if value or member in shape.required_members:
1✔
1357
                    remaining_params[member] = value
1✔
1358

1359
            self._serialize(body, remaining_params, shape, None, mime_type)
1✔
1360

1361
        # this is a workaround, some Error Shape do not define a `Message` field, but it is always returned
1362
        # this could be solved at the same time as the `__type` field
1363
        if "message" not in body and "Message" not in body:
1✔
1364
            if error_message := self._get_error_message(error):
1✔
1365
                body["message"] = error_message
1✔
1366

1367
        if mime_type in self.CBOR_TYPES:
1✔
1368
            response.set_response(cbor2_dumps(body, datetime_as_timestamp=True))
1✔
1369
            response.content_type = mime_type
1✔
1370
        else:
1371
            response.set_json(body)
1✔
1372

1373
        if is_query_compatible:
1✔
1374
            self._add_query_compatible_error_header(response, error)
1✔
1375

1376
    def _serialize_response(
1✔
1377
        self,
1378
        parameters: dict,
1379
        response: Response,
1380
        shape: Shape | None,
1381
        shape_members: dict,
1382
        operation_model: OperationModel,
1383
        mime_type: str,
1384
        request_id: str,
1385
    ) -> None:
1386
        if mime_type in self.CBOR_TYPES:
1✔
1387
            response.content_type = mime_type
1✔
1388
        else:
1389
            json_version = operation_model.metadata.get("jsonVersion")
1✔
1390
            if json_version is not None:
1✔
1391
                response.headers["Content-Type"] = f"application/x-amz-json-{json_version}"
1✔
1392
        response.set_response(
1✔
1393
            self._serialize_body_params(parameters, shape, operation_model, mime_type, request_id)
1394
        )
1395

1396
    def _serialize_body_params(
1✔
1397
        self,
1398
        params: dict,
1399
        shape: Shape,
1400
        operation_model: OperationModel,
1401
        mime_type: str,
1402
        request_id: str,
1403
    ) -> str | None:
1404
        body = {}
1✔
1405
        if shape is not None:
1✔
1406
            self._serialize(body, params, shape, None, mime_type)
1✔
1407

1408
        if mime_type in self.CBOR_TYPES:
1✔
1409
            return cbor2_dumps(body, datetime_as_timestamp=True)
1✔
1410
        else:
1411
            return json.dumps(body)
1✔
1412

1413
    def _serialize(self, body: dict, value: Any, shape, key: str | None, mime_type: str):
1✔
1414
        """This method dynamically invokes the correct `_serialize_type_*` method for each shape type."""
1415
        try:
1✔
1416
            method = getattr(self, f"_serialize_type_{shape.type_name}", self._default_serialize)
1✔
1417
            method(body, value, shape, key, mime_type)
1✔
1418
        except (TypeError, ValueError, AttributeError) as e:
×
1419
            raise ProtocolSerializerError(
×
1420
                f"Invalid type when serializing {shape.name}: '{value}' cannot be parsed to {shape.type_name}."
1421
            ) from e
1422

1423
    def _serialize_type_structure(
1✔
1424
        self, body: dict, value: dict, shape: StructureShape, key: str | None, mime_type: str
1425
    ):
1426
        if value is None:
1✔
1427
            return
×
1428
        if shape.is_document_type:
1✔
1429
            body[key] = value
×
1430
        else:
1431
            if key is not None:
1✔
1432
                # If a key is provided, this is a result of a recursive
1433
                # call, so we need to add a new child dict as the value
1434
                # of the passed in serialized dict.  We'll then add
1435
                # all the structure members as key/vals in the new serialized
1436
                # dictionary we just created.
1437
                new_serialized = {}
1✔
1438
                body[key] = new_serialized
1✔
1439
                body = new_serialized
1✔
1440
            members = shape.members
1✔
1441
            for member_key, member_value in value.items():
1✔
1442
                if member_value is None:
1✔
1443
                    continue
1✔
1444
                try:
1✔
1445
                    member_shape = members[member_key]
1✔
1446
                except KeyError:
1✔
1447
                    LOG.warning(
1✔
1448
                        "Response object %s contains a member which is not specified: %s",
1449
                        shape.name,
1450
                        member_key,
1451
                    )
1452
                    continue
1✔
1453
                if "name" in member_shape.serialization:
1✔
1454
                    member_key = member_shape.serialization["name"]
1✔
1455
                self._serialize(body, member_value, member_shape, member_key, mime_type)
1✔
1456

1457
    def _serialize_type_map(
1✔
1458
        self, body: dict, value: dict, shape: MapShape, key: str, mime_type: str
1459
    ):
1460
        if value is None:
1✔
1461
            return
×
1462
        map_obj = {}
1✔
1463
        body[key] = map_obj
1✔
1464
        for sub_key, sub_value in value.items():
1✔
1465
            if sub_value is not None:
1✔
1466
                self._serialize(map_obj, sub_value, shape.value, sub_key, mime_type)
1✔
1467

1468
    def _serialize_type_list(
1✔
1469
        self, body: dict, value: list, shape: ListShape, key: str, mime_type: str
1470
    ):
1471
        if value is None:
1✔
1472
            return
×
1473
        list_obj = []
1✔
1474
        body[key] = list_obj
1✔
1475
        for list_item in value:
1✔
1476
            if list_item is not None:
1✔
1477
                wrapper = {}
1✔
1478
                # The JSON list serialization is the only case where we aren't
1479
                # setting a key on a dict.  We handle this by using
1480
                # a __current__ key on a wrapper dict to serialize each
1481
                # list item before appending it to the serialized list.
1482
                self._serialize(wrapper, list_item, shape.member, "__current__", mime_type)
1✔
1483
                list_obj.append(wrapper["__current__"])
1✔
1484

1485
    def _default_serialize(self, body: dict, value: Any, _, key: str, __):
1✔
1486
        body[key] = value
1✔
1487

1488
    def _serialize_type_timestamp(
1✔
1489
        self, body: dict, value: Any, shape: Shape, key: str, mime_type: str
1490
    ):
1491
        if mime_type in self.CBOR_TYPES:
1✔
1492
            # CBOR has native support for timestamps
1493
            body[key] = value
1✔
1494
        else:
1495
            timestamp_format = shape.serialization.get("timestampFormat")
1✔
1496
            body[key] = self._convert_timestamp_to_str(value, timestamp_format)
1✔
1497

1498
    def _serialize_type_blob(self, body: dict, value: str | bytes, _, key: str, mime_type: str):
1✔
1499
        if mime_type in self.CBOR_TYPES:
1✔
1500
            body[key] = value
1✔
1501
        else:
1502
            body[key] = self._get_base64(value)
1✔
1503

1504
    def _prepare_additional_traits_in_response(
1✔
1505
        self, response: Response, operation_model: OperationModel, request_id: str
1506
    ):
1507
        response.headers.setdefault("x-amzn-RequestId", request_id)
1✔
1508
        response = super()._prepare_additional_traits_in_response(
1✔
1509
            response, operation_model, request_id
1510
        )
1511
        return response
1✔
1512

1513

1514
class RestJSONResponseSerializer(BaseRestResponseSerializer, JSONResponseSerializer):
1✔
1515
    """
1516
    The ``RestJSONResponseSerializer`` is responsible for the serialization of responses from services with the
1517
    ``rest-json`` protocol.
1518
    It combines the ``BaseRestResponseSerializer`` (for the ReST specific logic) with the ``JSONResponseSerializer``
1519
    (for the JSOn body response serialization).
1520
    """
1521

1522
    def _serialize_content_type(
1✔
1523
        self, serialized: Response, shape: Shape, shape_members: dict, mime_type: str
1524
    ):
1525
        """Set Content-Type to application/json for all structured bodies."""
1526
        payload = shape.serialization.get("payload") if shape is not None else None
1✔
1527
        if self._has_streaming_payload(payload, shape_members):
1✔
1528
            # Don't apply content-type to streaming bodies
1529
            return
1✔
1530

1531
        has_body = serialized.data != b""
1✔
1532
        has_content_type = self._has_header("Content-Type", serialized.headers)
1✔
1533
        if has_body and not has_content_type:
1✔
1534
            serialized.headers["Content-Type"] = mime_type
×
1535

1536

1537
class BaseCBORResponseSerializer(ResponseSerializer):
1✔
1538
    """
1539
    The ``BaseCBORResponseSerializer`` performs the basic logic for the CBOR response serialization.
1540

1541
    There are two types of map/list in CBOR, indefinite length types and "defined" ones:
1542
    You can use the `\xbf` byte marker to indicate a map with indefinite length, then `\xff` to indicate the end
1543
     of the map.
1544
    You can also use, for example, `\xa4` to indicate a map with exactly 4 things in it, so `\xff` is not
1545
    required at the end.
1546
    AWS, for both Kinesis and `smithy-rpc-v2-cbor` services, is using indefinite data structures when returning
1547
    responses.
1548

1549
    The CBOR serializer cannot serialize an exception if it is not defined in our specs.
1550
    LocalStack defines a way to have user-defined exception by subclassing `CommonServiceException`, so it needs to be
1551
     able to encode those, as well as InternalError
1552
    We are creating a default botocore structure shape (`_DEFAULT_ERROR_STRUCTURE_SHAPE`) to be used in such cases.
1553
    """
1554

1555
    SUPPORTED_MIME_TYPES = [APPLICATION_CBOR, APPLICATION_AMZ_CBOR_1_1]
1✔
1556

1557
    UNSIGNED_INT_MAJOR_TYPE = 0
1✔
1558
    NEGATIVE_INT_MAJOR_TYPE = 1
1✔
1559
    BLOB_MAJOR_TYPE = 2
1✔
1560
    STRING_MAJOR_TYPE = 3
1✔
1561
    LIST_MAJOR_TYPE = 4
1✔
1562
    MAP_MAJOR_TYPE = 5
1✔
1563
    TAG_MAJOR_TYPE = 6
1✔
1564
    FLOAT_AND_SIMPLE_MAJOR_TYPE = 7
1✔
1565

1566
    INDEFINITE_ITEM_ADDITIONAL_INFO = 31
1✔
1567
    BREAK_CODE = b"\xff"
1✔
1568
    USE_INDEFINITE_DATA_STRUCTURE = True
1✔
1569

1570
    _ERROR_TYPE_SHAPE = StringShape(shape_name="__type", shape_model={"type": "string"})
1✔
1571

1572
    _DEFAULT_ERROR_STRUCTURE_SHAPE = StructureShape(
1✔
1573
        shape_name="DefaultErrorStructure",
1574
        shape_model={
1575
            "type": "structure",
1576
            "members": {
1577
                "message": {"shape": "ErrorMessage"},
1578
                "__type": {"shape": "ErrorType"},
1579
            },
1580
            "error": {"code": "DefaultErrorStructure", "httpStatusCode": 400, "senderFault": True},
1581
            "exception": True,
1582
        },
1583
        shape_resolver=ShapeResolver(
1584
            shape_map={
1585
                "ErrorMessage": {"type": "string"},
1586
                "ErrorType": {"type": "string"},
1587
            },
1588
        ),
1589
    )
1590

1591
    def _serialize_data_item(
1✔
1592
        self, serialized: bytearray, value: Any, shape: Shape | None, name: str | None = None
1593
    ) -> None:
1594
        method = getattr(self, f"_serialize_type_{shape.type_name}")
1✔
1595
        if method is None:
1✔
1596
            raise ValueError(
×
1597
                f"Unrecognized C2J type: {shape.type_name}, unable to serialize request"
1598
            )
1599
        method(serialized, value, shape, name)
1✔
1600

1601
    def _serialize_type_integer(
1✔
1602
        self, serialized: bytearray, value: int, shape: Shape | None, name: str | None = None
1603
    ) -> None:
1604
        if value >= 0:
1✔
1605
            major_type = self.UNSIGNED_INT_MAJOR_TYPE
1✔
1606
        else:
1607
            major_type = self.NEGATIVE_INT_MAJOR_TYPE
×
1608
            # The only differences in serializing negative and positive integers is
1609
            # that for negative, we set the major type to 1 and set the value to -1
1610
            # minus the value
1611
            value = -1 - value
×
1612
        additional_info, num_bytes = self._get_additional_info_and_num_bytes(value)
1✔
1613
        initial_byte = self._get_initial_byte(major_type, additional_info)
1✔
1614
        if num_bytes == 0:
1✔
1615
            serialized.extend(initial_byte)
1✔
1616
        else:
1617
            serialized.extend(initial_byte + value.to_bytes(num_bytes, "big"))
1✔
1618

1619
    def _serialize_type_long(
1✔
1620
        self, serialized: bytearray, value: int, shape: Shape, name: str | None = None
1621
    ) -> None:
1622
        self._serialize_type_integer(serialized, value, shape, name)
1✔
1623

1624
    def _serialize_type_blob(
1✔
1625
        self,
1626
        serialized: bytearray,
1627
        value: str | bytes | IO[bytes],
1628
        shape: Shape | None,
1629
        name: str | None = None,
1630
    ) -> None:
1631
        if isinstance(value, str):
1✔
1632
            value = value.encode("utf-8")
×
1633
        elif not isinstance(value, (bytes, bytearray)):
1✔
1634
            # We support file-like objects for blobs; these already have been
1635
            # validated to ensure they have a read method
1636
            value = value.read()
×
1637
        length = len(value)
1✔
1638
        additional_info, num_bytes = self._get_additional_info_and_num_bytes(length)
1✔
1639
        initial_byte = self._get_initial_byte(self.BLOB_MAJOR_TYPE, additional_info)
1✔
1640
        if num_bytes == 0:
1✔
1641
            serialized.extend(initial_byte)
1✔
1642
        else:
1643
            serialized.extend(initial_byte + length.to_bytes(num_bytes, "big"))
×
1644
        serialized.extend(value)
1✔
1645

1646
    def _serialize_type_string(
1✔
1647
        self, serialized: bytearray, value: str, shape: Shape | None, name: str | None = None
1648
    ) -> None:
1649
        encoded = value.encode("utf-8")
1✔
1650
        length = len(encoded)
1✔
1651
        additional_info, num_bytes = self._get_additional_info_and_num_bytes(length)
1✔
1652
        initial_byte = self._get_initial_byte(self.STRING_MAJOR_TYPE, additional_info)
1✔
1653
        if num_bytes == 0:
1✔
1654
            serialized.extend(initial_byte + encoded)
1✔
1655
        else:
1656
            serialized.extend(initial_byte + length.to_bytes(num_bytes, "big") + encoded)
1✔
1657

1658
    def _serialize_type_list(
1✔
1659
        self, serialized: bytearray, value: list, shape: Shape | None, name: str | None = None
1660
    ) -> None:
1661
        initial_bytes, closing_bytes = self._get_bytes_for_data_structure(
1✔
1662
            value, self.LIST_MAJOR_TYPE
1663
        )
1664
        serialized.extend(initial_bytes)
1✔
1665

1666
        for item in value:
1✔
1667
            self._serialize_data_item(serialized, item, shape.member)
1✔
1668

1669
        if closing_bytes is not None:
1✔
1670
            serialized.extend(closing_bytes)
1✔
1671

1672
    def _serialize_type_map(
1✔
1673
        self, serialized: bytearray, value: dict, shape: Shape | None, name: str | None = None
1674
    ) -> None:
1675
        initial_bytes, closing_bytes = self._get_bytes_for_data_structure(
1✔
1676
            value, self.MAP_MAJOR_TYPE
1677
        )
1678
        serialized.extend(initial_bytes)
1✔
1679

1680
        for key_item, item in value.items():
1✔
1681
            self._serialize_data_item(serialized, key_item, shape.key)
1✔
1682
            self._serialize_data_item(serialized, item, shape.value)
1✔
1683

1684
        if closing_bytes is not None:
1✔
1685
            serialized.extend(closing_bytes)
1✔
1686

1687
    def _serialize_type_structure(
1✔
1688
        self,
1689
        serialized: bytearray,
1690
        value: dict,
1691
        shape: Shape | None,
1692
        name: str | None = None,
1693
        shape_members: dict[str, Shape] | None = None,
1694
    ) -> None:
1695
        # `_serialize_type_structure` has a different signature other `_serialize_type_*` methods as it accepts
1696
        # `shape_members`. This is because sometimes, the `StructureShape` does not have some members defined in the
1697
        # specs, and we want to be able to pass arbitrary members to serialize undocumented members.
1698
        # see `_serialize_error_structure` for its specific usage
1699

1700
        if name is not None:
1✔
1701
            # For nested structures, we need to serialize the key first
1702
            self._serialize_data_item(serialized, name, shape.key_shape)
×
1703

1704
        # Remove `None` values from the dictionary
1705
        value = {k: v for k, v in value.items() if v is not None}
1✔
1706

1707
        initial_bytes, closing_bytes = self._get_bytes_for_data_structure(
1✔
1708
            value, self.MAP_MAJOR_TYPE
1709
        )
1710
        serialized.extend(initial_bytes)
1✔
1711
        members = shape_members or shape.members
1✔
1712
        for member_key, member_value in value.items():
1✔
1713
            member_shape = members[member_key]
1✔
1714
            if "name" in member_shape.serialization:
1✔
1715
                member_key = member_shape.serialization["name"]
×
1716
            if member_value is not None:
1✔
1717
                self._serialize_type_string(serialized, member_key, None, None)
1✔
1718
                self._serialize_data_item(serialized, member_value, member_shape)
1✔
1719

1720
        if closing_bytes is not None:
1✔
1721
            serialized.extend(closing_bytes)
1✔
1722

1723
    def _serialize_type_timestamp(
1✔
1724
        self,
1725
        serialized: bytearray,
1726
        value: int | str | datetime.datetime,
1727
        shape: Shape | None,
1728
        name: str | None = None,
1729
    ) -> None:
1730
        # https://smithy.io/2.0/additional-specs/protocols/smithy-rpc-v2.html#timestamp-type-serialization
1731
        tag = 1  # Use tag 1 for unix timestamp
1✔
1732
        initial_byte = self._get_initial_byte(self.TAG_MAJOR_TYPE, tag)
1✔
1733
        serialized.extend(initial_byte)  # Tagging the timestamp
1✔
1734

1735
        # we encode the timestamp as a double, like the Go SDK
1736
        # https://github.com/aws/aws-sdk-go-v2/blob/5d7c17325a2581afae4455c150549174ebfd9428/internal/protocoltest/smithyrpcv2cbor/serializers.go#L664-L669
1737
        # Currently, the Botocore serializer using unsigned integers, but it does not conform to the Smithy specs:
1738
        # > This protocol uses epoch-seconds, also known as Unix timestamps, with millisecond
1739
        # > (1/1000th of a second) resolution.
1740
        timestamp = float(self._convert_timestamp_to_str(value))
1✔
1741
        initial_byte = self._get_initial_byte(self.FLOAT_AND_SIMPLE_MAJOR_TYPE, 27)
1✔
1742
        serialized.extend(initial_byte + struct.pack(">d", timestamp))
1✔
1743

1744
    def _serialize_type_float(
1✔
1745
        self, serialized: bytearray, value: float, shape: Shape | None, name: str | None = None
1746
    ) -> None:
1747
        if self._is_special_number(value):
×
1748
            serialized.extend(
×
1749
                self._get_bytes_for_special_numbers(value)
1750
            )  # Handle special values like NaN or Infinity
1751
        else:
1752
            initial_byte = self._get_initial_byte(self.FLOAT_AND_SIMPLE_MAJOR_TYPE, 26)
×
1753
            serialized.extend(initial_byte + struct.pack(">f", value))
×
1754

1755
    def _serialize_type_double(
1✔
1756
        self, serialized: bytearray, value: float, shape: Shape | None, name: str | None = None
1757
    ) -> None:
1758
        if self._is_special_number(value):
1✔
1759
            serialized.extend(
×
1760
                self._get_bytes_for_special_numbers(value)
1761
            )  # Handle special values like NaN or Infinity
1762
        else:
1763
            initial_byte = self._get_initial_byte(self.FLOAT_AND_SIMPLE_MAJOR_TYPE, 27)
1✔
1764
            serialized.extend(initial_byte + struct.pack(">d", value))
1✔
1765

1766
    def _serialize_type_boolean(
1✔
1767
        self, serialized: bytearray, value: bool, shape: Shape | None, name: str | None = None
1768
    ) -> None:
1769
        additional_info = 21 if value else 20
1✔
1770
        serialized.extend(self._get_initial_byte(self.FLOAT_AND_SIMPLE_MAJOR_TYPE, additional_info))
1✔
1771

1772
    @staticmethod
1✔
1773
    def _get_additional_info_and_num_bytes(value: int) -> tuple[int, int]:
1✔
1774
        # Values under 24 can be stored in the initial byte and don't need further
1775
        # encoding
1776
        if value < 24:
1✔
1777
            return value, 0
1✔
1778
        # Values between 24 and 255 (inclusive) can be stored in 1 byte and
1779
        # correspond to additional info 24
1780
        elif value < 256:
1✔
1781
            return 24, 1
1✔
1782
        # Values up to 65535 can be stored in two bytes and correspond to additional
1783
        # info 25
1784
        elif value < 65536:
1✔
1785
            return 25, 2
1✔
1786
        # Values up to 4294967296 can be stored in four bytes and correspond to
1787
        # additional info 26
1788
        elif value < 4294967296:
×
1789
            return 26, 4
×
1790
        # The maximum number of bytes in a definite length data items is 8 which
1791
        # to additional info 27
1792
        else:
1793
            return 27, 8
×
1794

1795
    def _get_initial_byte(self, major_type: int, additional_info: int) -> bytes:
1✔
1796
        # The highest order three bits are the major type, so we need to bitshift the
1797
        # major type by 5
1798
        major_type_bytes = major_type << 5
1✔
1799
        return (major_type_bytes | additional_info).to_bytes(1, "big")
1✔
1800

1801
    @staticmethod
1✔
1802
    def _is_special_number(value: int | float) -> bool:
1✔
1803
        return any(
1✔
1804
            [
1805
                value == float("inf"),
1806
                value == float("-inf"),
1807
                math.isnan(value),
1808
            ]
1809
        )
1810

1811
    def _get_bytes_for_special_numbers(self, value: int | float) -> bytes:
1✔
1812
        additional_info = 25
×
1813
        initial_byte = self._get_initial_byte(self.FLOAT_AND_SIMPLE_MAJOR_TYPE, additional_info)
×
1814
        if value == float("inf"):
×
1815
            return initial_byte + struct.pack(">H", 0x7C00)
×
1816
        elif value == float("-inf"):
×
1817
            return initial_byte + struct.pack(">H", 0xFC00)
×
1818
        elif math.isnan(value):
×
1819
            return initial_byte + struct.pack(">H", 0x7E00)
×
1820

1821
    def _get_bytes_for_data_structure(
1✔
1822
        self, value: list | dict, major_type: int
1823
    ) -> tuple[bytes, bytes | None]:
1824
        if self.USE_INDEFINITE_DATA_STRUCTURE:
1✔
1825
            additional_info = self.INDEFINITE_ITEM_ADDITIONAL_INFO
1✔
1826
            return self._get_initial_byte(major_type, additional_info), self.BREAK_CODE
1✔
1827
        else:
1828
            length = len(value)
×
1829
            additional_info, num_bytes = self._get_additional_info_and_num_bytes(length)
×
1830
            initial_byte = self._get_initial_byte(major_type, additional_info)
×
1831
            if num_bytes != 0:
×
1832
                initial_byte = initial_byte + length.to_bytes(num_bytes, "big")
×
1833

1834
            return initial_byte, None
×
1835

1836
    def _serialize_error_structure(
1✔
1837
        self, body: bytearray, shape: Shape | None, error: ServiceException, code: str
1838
    ):
1839
        if not shape:
1✔
1840
            shape = self._DEFAULT_ERROR_STRUCTURE_SHAPE
1✔
1841
            shape_members = shape.members
1✔
1842
        else:
1843
            # we need to manually add the `__type` field to the shape members as it is not part of the specs
1844
            # we do a shallow copy of the shape members
1845
            shape_members = shape.members.copy()
1✔
1846
            shape_members["__type"] = self._ERROR_TYPE_SHAPE
1✔
1847

1848
        # Error responses in the rpcv2Cbor protocol MUST be serialized identically to standard responses with one
1849
        # additional component to distinguish which error is contained: a body field named __type.
1850
        params = {"__type": code}
1✔
1851

1852
        for member in shape_members:
1✔
1853
            if hasattr(error, member):
1✔
1854
                value = getattr(error, member)
1✔
1855

1856
            # Default error message fields can sometimes have different casing in the specs
1857
            elif member.lower() in ["code", "message"] and hasattr(error, member.lower()):
1✔
1858
                value = getattr(error, member.lower())
×
1859

1860
            else:
1861
                continue
1✔
1862

1863
            if value is None:
1✔
1864
                # do not serialize a value that is set to `None`
1865
                continue
×
1866

1867
            # if the value is falsy (empty string, empty list) and not in the Shape required members, AWS will
1868
            # not serialize it, and it will not be part of the response body.
1869
            if value or member in shape.required_members:
1✔
1870
                params[member] = value
1✔
1871

1872
        self._serialize_type_structure(body, params, shape, None, shape_members=shape_members)
1✔
1873

1874

1875
class CBORResponseSerializer(BaseCBORResponseSerializer):
1✔
1876
    """
1877
    The ``CBORResponseSerializer`` is responsible for the serialization of responses from services with the ``cbor``
1878
    protocol. It implements the CBOR response body serialization, which is only currently used by Kinesis and is derived
1879
    conceptually from the ``JSONResponseSerializer``
1880
    """
1881

1882
    TIMESTAMP_FORMAT = "unixtimestamp"
1✔
1883

1884
    def _serialize_error(
1✔
1885
        self,
1886
        error: ServiceException,
1887
        response: Response,
1888
        shape: StructureShape,
1889
        operation_model: OperationModel,
1890
        mime_type: str,
1891
        request_id: str,
1892
    ) -> None:
1893
        body = bytearray()
×
1894
        response.content_type = mime_type
×
1895
        response.headers["X-Amzn-Errortype"] = error.code
×
1896

1897
        self._serialize_error_structure(body, shape, error, code=error.code)
×
1898

1899
        response.set_response(bytes(body))
×
1900

1901
    def _serialize_response(
1✔
1902
        self,
1903
        parameters: dict,
1904
        response: Response,
1905
        shape: Shape | None,
1906
        shape_members: dict,
1907
        operation_model: OperationModel,
1908
        mime_type: str,
1909
        request_id: str,
1910
    ) -> None:
1911
        response.content_type = mime_type
1✔
1912
        response.set_response(
1✔
1913
            self._serialize_body_params(parameters, shape, operation_model, mime_type, request_id)
1914
        )
1915

1916
    def _serialize_body_params(
1✔
1917
        self,
1918
        params: dict,
1919
        shape: Shape,
1920
        operation_model: OperationModel,
1921
        mime_type: str,
1922
        request_id: str,
1923
    ) -> bytes | None:
1924
        if shape is None:
1✔
1925
            return b""
×
1926
        body = bytearray()
1✔
1927
        self._serialize_data_item(body, params, shape)
1✔
1928
        return bytes(body)
1✔
1929

1930
    def _prepare_additional_traits_in_response(
1✔
1931
        self, response: Response, operation_model: OperationModel, request_id: str
1932
    ) -> Response:
1933
        response.headers["x-amzn-RequestId"] = request_id
1✔
1934
        response = super()._prepare_additional_traits_in_response(
1✔
1935
            response, operation_model, request_id
1936
        )
1937
        return response
1✔
1938

1939

1940
class BaseRpcV2ResponseSerializer(ResponseSerializer):
1✔
1941
    """
1942
    The BaseRpcV2ResponseSerializer performs the basic logic for the RPC V2 response serialization.
1943
    The only variance between the various RPCv2 protocols is the way the body is serialized for regular responses,
1944
    and the way they will encode exceptions.
1945
    """
1946

1947
    def _serialize_response(
1✔
1948
        self,
1949
        parameters: dict,
1950
        response: Response,
1951
        shape: Shape | None,
1952
        shape_members: dict,
1953
        operation_model: OperationModel,
1954
        mime_type: str,
1955
        request_id: str,
1956
    ) -> None:
1957
        response.content_type = mime_type
1✔
1958
        response.set_response(
1✔
1959
            self._serialize_body_params(parameters, shape, operation_model, mime_type, request_id)
1960
        )
1961

1962
    def _serialize_body_params(
1✔
1963
        self,
1964
        params: dict,
1965
        shape: Shape,
1966
        operation_model: OperationModel,
1967
        mime_type: str,
1968
        request_id: str,
1969
    ) -> bytes | None:
1970
        raise NotImplementedError
1971

1972

1973
class RpcV2CBORResponseSerializer(
1✔
1974
    QueryCompatibleProtocolMixin, BaseRpcV2ResponseSerializer, BaseCBORResponseSerializer
1975
):
1976
    """
1977
    The RpcV2CBORResponseSerializer implements the CBOR body serialization part for the RPC v2 protocol, and implements the
1978
    specific exception serialization.
1979
    https://smithy.io/2.0/additional-specs/protocols/smithy-rpc-v2.html
1980
    """
1981

1982
    # the Smithy spec defines that only `application/cbor` is supported for RPC v2 CBOR
1983
    SUPPORTED_MIME_TYPES = [APPLICATION_CBOR]
1✔
1984
    TIMESTAMP_FORMAT = "unixtimestamp"
1✔
1985

1986
    def _serialize_body_params(
1✔
1987
        self,
1988
        params: dict,
1989
        shape: Shape,
1990
        operation_model: OperationModel,
1991
        mime_type: str,
1992
        request_id: str,
1993
    ) -> bytes | None:
1994
        if shape is None:
1✔
1995
            return b""
1✔
1996
        body = bytearray()
1✔
1997
        self._serialize_data_item(body, params, shape)
1✔
1998
        return bytes(body)
1✔
1999

2000
    def _serialize_error(
1✔
2001
        self,
2002
        error: ServiceException,
2003
        response: Response,
2004
        shape: StructureShape,
2005
        operation_model: OperationModel,
2006
        mime_type: str,
2007
        request_id: str,
2008
    ) -> None:
2009
        body = bytearray()
1✔
2010
        response.content_type = mime_type  # can only be 'application/cbor'
1✔
2011

2012
        # Responses for the rpcv2Cbor protocol SHOULD NOT contain the X-Amzn-ErrorType header.
2013
        # Type information is always serialized in the payload. This is different from the `json` protocol
2014
        is_query_compatible = operation_model.service_model.is_query_compatible
1✔
2015
        code = self._get_error_code(is_query_compatible, error, shape)
1✔
2016

2017
        self._serialize_error_structure(body, shape, error, code=code)
1✔
2018

2019
        response.set_response(bytes(body))
1✔
2020

2021
        if is_query_compatible:
1✔
2022
            self._add_query_compatible_error_header(response, error)
1✔
2023

2024
    def _prepare_additional_traits_in_response(
1✔
2025
        self, response: Response, operation_model: OperationModel, request_id: str
2026
    ):
2027
        response.headers["x-amzn-RequestId"] = request_id
1✔
2028
        response.headers["Smithy-Protocol"] = "rpc-v2-cbor"
1✔
2029
        response = super()._prepare_additional_traits_in_response(
1✔
2030
            response, operation_model, request_id
2031
        )
2032
        return response
1✔
2033

2034

2035
class S3ResponseSerializer(RestXMLResponseSerializer):
1✔
2036
    """
2037
    The ``S3ResponseSerializer`` adds some minor logic to handle S3 specific peculiarities with the error response
2038
    serialization and the root node tag.
2039
    """
2040

2041
    SUPPORTED_MIME_TYPES = [APPLICATION_XML, TEXT_XML]
1✔
2042
    _RESPONSE_ROOT_TAGS = {
1✔
2043
        "CompleteMultipartUploadOutput": "CompleteMultipartUploadResult",
2044
        "CopyObjectOutput": "CopyObjectResult",
2045
        "CreateMultipartUploadOutput": "InitiateMultipartUploadResult",
2046
        "DeleteObjectsOutput": "DeleteResult",
2047
        "GetBucketAccelerateConfigurationOutput": "AccelerateConfiguration",
2048
        "GetBucketAclOutput": "AccessControlPolicy",
2049
        "GetBucketAnalyticsConfigurationOutput": "AnalyticsConfiguration",
2050
        "GetBucketCorsOutput": "CORSConfiguration",
2051
        "GetBucketEncryptionOutput": "ServerSideEncryptionConfiguration",
2052
        "GetBucketIntelligentTieringConfigurationOutput": "IntelligentTieringConfiguration",
2053
        "GetBucketInventoryConfigurationOutput": "InventoryConfiguration",
2054
        "GetBucketLifecycleOutput": "LifecycleConfiguration",
2055
        "GetBucketLifecycleConfigurationOutput": "LifecycleConfiguration",
2056
        "GetBucketLoggingOutput": "BucketLoggingStatus",
2057
        "GetBucketMetricsConfigurationOutput": "MetricsConfiguration",
2058
        "NotificationConfigurationDeprecated": "NotificationConfiguration",
2059
        "GetBucketOwnershipControlsOutput": "OwnershipControls",
2060
        "GetBucketPolicyStatusOutput": "PolicyStatus",
2061
        "GetBucketReplicationOutput": "ReplicationConfiguration",
2062
        "GetBucketRequestPaymentOutput": "RequestPaymentConfiguration",
2063
        "GetBucketTaggingOutput": "Tagging",
2064
        "GetBucketVersioningOutput": "VersioningConfiguration",
2065
        "GetBucketWebsiteOutput": "WebsiteConfiguration",
2066
        "GetObjectAclOutput": "AccessControlPolicy",
2067
        "GetObjectLegalHoldOutput": "LegalHold",
2068
        "GetObjectLockConfigurationOutput": "ObjectLockConfiguration",
2069
        "GetObjectRetentionOutput": "Retention",
2070
        "GetObjectTaggingOutput": "Tagging",
2071
        "GetObjectAttributesOutput": "GetObjectAttributesResponse",
2072
        "GetPublicAccessBlockOutput": "PublicAccessBlockConfiguration",
2073
        "ListBucketAnalyticsConfigurationsOutput": "ListBucketAnalyticsConfigurationResult",
2074
        "ListBucketInventoryConfigurationsOutput": "ListInventoryConfigurationsResult",
2075
        "ListBucketMetricsConfigurationsOutput": "ListMetricsConfigurationsResult",
2076
        "ListBucketsOutput": "ListAllMyBucketsResult",
2077
        "ListMultipartUploadsOutput": "ListMultipartUploadsResult",
2078
        "ListObjectsOutput": "ListBucketResult",
2079
        "ListObjectsV2Output": "ListBucketResult",
2080
        "ListObjectVersionsOutput": "ListVersionsResult",
2081
        "ListPartsOutput": "ListPartsResult",
2082
        "UploadPartCopyOutput": "CopyPartResult",
2083
    }
2084

2085
    XML_NAMESPACE = "http://s3.amazonaws.com/doc/2006-03-01/"
1✔
2086

2087
    def _serialize_response(
1✔
2088
        self,
2089
        parameters: dict,
2090
        response: Response,
2091
        shape: Shape | None,
2092
        shape_members: dict,
2093
        operation_model: OperationModel,
2094
        mime_type: str,
2095
        request_id: str,
2096
    ) -> None:
2097
        header_params, payload_params = self._partition_members(parameters, shape)
1✔
2098
        self._process_header_members(header_params, response, shape)
1✔
2099
        # "HEAD" responses are basically "GET" responses without the actual body.
2100
        # Do not process the body payload in this case (setting a body could also manipulate the headers)
2101
        # - If the response is a redirection, the body should be empty as well
2102
        # - If the response is from a "PUT" request, the body should be empty except if there's a specific "payload"
2103
        #   field in the serialization (CopyObject and CopyObjectPart)
2104
        http_method = operation_model.http.get("method")
1✔
2105
        if (
1✔
2106
            http_method != "HEAD"
2107
            and not 300 <= response.status_code < 400
2108
            and not (http_method == "PUT" and shape and not shape.serialization.get("payload"))
2109
        ):
2110
            self._serialize_payload(
1✔
2111
                payload_params,
2112
                response,
2113
                shape,
2114
                shape_members,
2115
                operation_model,
2116
                mime_type,
2117
                request_id,
2118
            )
2119
        self._serialize_content_type(response, shape, shape_members, mime_type)
1✔
2120

2121
    def _serialize_error(
1✔
2122
        self,
2123
        error: ServiceException,
2124
        response: Response,
2125
        shape: StructureShape,
2126
        operation_model: OperationModel,
2127
        mime_type: str,
2128
        request_id: str,
2129
    ) -> None:
2130
        attr = (
1✔
2131
            {"xmlns": operation_model.metadata.get("xmlNamespace")}
2132
            if "xmlNamespace" in operation_model.metadata
2133
            else {}
2134
        )
2135
        root = ETree.Element("Error", attr)
1✔
2136
        self._add_error_tags(error, root, mime_type)
1✔
2137
        request_id_element = ETree.SubElement(root, "RequestId")
1✔
2138
        request_id_element.text = request_id
1✔
2139

2140
        header_params, payload_params = self._partition_members(vars(error), shape)
1✔
2141
        self._add_additional_error_tags(payload_params, root, shape, mime_type)
1✔
2142
        self._process_header_members(header_params, response, shape)
1✔
2143

2144
        response.set_response(self._encode_payload(self._node_to_string(root, mime_type)))
1✔
2145

2146
    def _serialize_body_params(
1✔
2147
        self,
2148
        params: dict,
2149
        shape: Shape,
2150
        operation_model: OperationModel,
2151
        mime_type: str,
2152
        request_id: str,
2153
    ) -> str | None:
2154
        root = self._serialize_body_params_to_xml(params, shape, operation_model, mime_type)
1✔
2155
        # S3 does not follow the specs on the root tag name for 41 of 44 operations
2156
        root.tag = self._RESPONSE_ROOT_TAGS.get(root.tag, root.tag)
1✔
2157
        self._prepare_additional_traits_in_xml(root, request_id)
1✔
2158
        return self._node_to_string(root, mime_type)
1✔
2159

2160
    def _prepare_additional_traits_in_response(
1✔
2161
        self, response: Response, operation_model: OperationModel, request_id: str
2162
    ):
2163
        """Adds the request ID to the headers (in contrast to the body - as in the Query protocol)."""
2164
        response = super()._prepare_additional_traits_in_response(
1✔
2165
            response, operation_model, request_id
2166
        )
2167
        # s3 extended Request ID
2168
        # mostly used internally on AWS and corresponds to a HostId
2169
        response.headers["x-amz-id-2"] = (
1✔
2170
            "s9lzHYrFp76ZVxRcpX9+5cjAnEH2ROuNkd2BHfIa6UkFVdtjf5mKR3/eTPFvsiP/XV/VLi31234="
2171
        )
2172
        return response
1✔
2173

2174
    def _add_error_tags(
1✔
2175
        self, error: ServiceException, error_tag: ETree.Element, mime_type: str
2176
    ) -> None:
2177
        code_tag = ETree.SubElement(error_tag, "Code")
1✔
2178
        code_tag.text = error.code
1✔
2179
        message = self._get_error_message(error)
1✔
2180
        if message:
1✔
2181
            self._default_serialize(error_tag, message, None, "Message", mime_type)
1✔
2182
        else:
2183
            # In S3, if there's no message, create an empty node
2184
            self._create_empty_node(error_tag, "Message")
1✔
2185
        if error.sender_fault:
1✔
2186
            # The sender fault is either not set or "Sender"
2187
            self._default_serialize(error_tag, "Sender", None, "Type", mime_type)
×
2188

2189
    @staticmethod
1✔
2190
    def _create_empty_node(xmlnode: ETree.Element, name: str) -> None:
1✔
2191
        ETree.SubElement(xmlnode, name)
1✔
2192

2193
    def _prepare_additional_traits_in_xml(self, root: ETree.Element | None, request_id: str):
1✔
2194
        # some tools (Serverless) require a newline after the "<?xml ...>\n" preamble line, e.g., for LocationConstraint
2195
        if root is not None and not root.tail:
1✔
2196
            root.tail = "\n"
1✔
2197

2198
        root.attrib["xmlns"] = self.XML_NAMESPACE
1✔
2199

2200
    @staticmethod
1✔
2201
    def _timestamp_iso8601(value: datetime.datetime) -> str:
1✔
2202
        """
2203
        This is very specific to S3, S3 returns an ISO8601 timestamp but with milliseconds always set to 000
2204
        Some SDKs are very picky about the length
2205
        """
2206
        return value.strftime("%Y-%m-%dT%H:%M:%S.000Z")
1✔
2207

2208

2209
class SqsQueryResponseSerializer(QueryResponseSerializer):
1✔
2210
    """
2211
    Unfortunately, SQS uses a rare interpretation of the XML protocol: It uses HTML entities within XML tag text nodes.
2212
    For example:
2213
    - Normal XML serializers: <Message>No need to escape quotes (like this: ") with HTML entities in XML.</Message>
2214
    - SQS XML serializer: <Message>No need to escape quotes (like this: &quot;) with HTML entities in XML.</Message>
2215

2216
    None of the prominent XML frameworks for python allow HTML entity escapes when serializing XML.
2217
    This serializer implements the following workaround:
2218
    - Escape quotes and \r with their HTML entities (&quot; and &#xD;).
2219
    - Since & is (correctly) escaped in XML, the serialized string contains &amp;quot; and &amp;#xD;
2220
    - These double-escapes are corrected by replacing such strings with their original.
2221
    """
2222

2223
    # those are deleted from the JSON specs, but need to be kept for legacy reason (sent in 'x-amzn-query-error')
2224
    QUERY_PREFIXED_ERRORS = {
1✔
2225
        "BatchEntryIdsNotDistinct",
2226
        "BatchRequestTooLong",
2227
        "EmptyBatchRequest",
2228
        "InvalidBatchEntryId",
2229
        "MessageNotInflight",
2230
        "PurgeQueueInProgress",
2231
        "QueueDeletedRecently",
2232
        "TooManyEntriesInBatchRequest",
2233
        "UnsupportedOperation",
2234
    }
2235

2236
    # Some error code changed between JSON and query, and we need to have a way to map it for legacy reason
2237
    JSON_TO_QUERY_ERROR_CODES = {
1✔
2238
        "InvalidParameterValueException": "InvalidParameterValue",
2239
        "MissingRequiredParameterException": "MissingParameter",
2240
        "AccessDeniedException": "AccessDenied",
2241
        "QueueDoesNotExist": "AWS.SimpleQueueService.NonExistentQueue",
2242
        "QueueNameExists": "QueueAlreadyExists",
2243
    }
2244

2245
    SENDER_FAULT_ERRORS = (
1✔
2246
        QUERY_PREFIXED_ERRORS
2247
        | JSON_TO_QUERY_ERROR_CODES.keys()
2248
        | {"OverLimit", "ResourceNotFoundException"}
2249
    )
2250

2251
    def _default_serialize(self, xmlnode: ETree.Element, params: str, _, name: str, __) -> None:
1✔
2252
        """
2253
        Ensures that we "mark" characters in the node's text which need to be specifically encoded.
2254
        This is necessary to easily identify these specific characters later, after the standard XML serialization is
2255
        done, while not replacing any other occurrences of these characters which might appear in the serialized string.
2256
        """
2257
        node = ETree.SubElement(xmlnode, name)
1✔
2258
        node.text = (
1✔
2259
            str(params)
2260
            .replace('"', '__marker__"__marker__')
2261
            .replace("\r", "__marker__-r__marker__")
2262
        )
2263

2264
    def _node_to_string(self, root: ETree.ElementTree | None, mime_type: str) -> str | None:
1✔
2265
        """Replaces the previously "marked" characters with their encoded value."""
2266
        generated_string = super()._node_to_string(root, mime_type)
1✔
2267
        if generated_string is None:
1✔
2268
            return None
×
2269
        generated_string = to_str(generated_string)
1✔
2270
        # Undo the second escaping of the &
2271
        # Undo the second escaping of the carriage return (\r)
2272
        if mime_type == APPLICATION_JSON:
1✔
2273
            # At this point the json was already dumped and escaped, so we replace directly.
2274
            generated_string = generated_string.replace(r"__marker__\"__marker__", r"\"").replace(
1✔
2275
                "__marker__-r__marker__", r"\r"
2276
            )
2277
        else:
2278
            generated_string = generated_string.replace('__marker__"__marker__', "&quot;").replace(
1✔
2279
                "__marker__-r__marker__", "&#xD;"
2280
            )
2281

2282
        return to_bytes(generated_string)
1✔
2283

2284
    def _add_error_tags(
1✔
2285
        self, error: ServiceException, error_tag: ETree.Element, mime_type: str
2286
    ) -> None:
2287
        """The SQS API stubs is now generated from JSON specs, and some fields have been modified"""
2288
        code_tag = ETree.SubElement(error_tag, "Code")
1✔
2289

2290
        if error.code in self.JSON_TO_QUERY_ERROR_CODES:
1✔
2291
            error_code = self.JSON_TO_QUERY_ERROR_CODES[error.code]
1✔
2292
        elif error.code in self.QUERY_PREFIXED_ERRORS:
1✔
2293
            error_code = f"AWS.SimpleQueueService.{error.code}"
1✔
2294
        else:
2295
            error_code = error.code
1✔
2296
        code_tag.text = error_code
1✔
2297
        message = self._get_error_message(error)
1✔
2298
        if message:
1✔
2299
            self._default_serialize(error_tag, message, None, "Message", mime_type)
1✔
2300
        if error.code in self.SENDER_FAULT_ERRORS or error.sender_fault:
1✔
2301
            # The sender fault is either not set or "Sender"
2302
            self._default_serialize(error_tag, "Sender", None, "Type", mime_type)
1✔
2303

2304

2305
class SqsJsonResponseSerializer(JSONResponseSerializer):
1✔
2306
    # those are deleted from the JSON specs, but need to be kept for legacy reason (sent in 'x-amzn-query-error')
2307
    QUERY_PREFIXED_ERRORS = {
1✔
2308
        "BatchEntryIdsNotDistinct",
2309
        "BatchRequestTooLong",
2310
        "EmptyBatchRequest",
2311
        "InvalidBatchEntryId",
2312
        "MessageNotInflight",
2313
        "PurgeQueueInProgress",
2314
        "QueueDeletedRecently",
2315
        "TooManyEntriesInBatchRequest",
2316
        "UnsupportedOperation",
2317
    }
2318

2319
    # Some error code changed between JSON and query, and we need to have a way to map it for legacy reason
2320
    JSON_TO_QUERY_ERROR_CODES = {
1✔
2321
        "InvalidParameterValueException": "InvalidParameterValue",
2322
        "MissingRequiredParameterException": "MissingParameter",
2323
        "AccessDeniedException": "AccessDenied",
2324
        "QueueDoesNotExist": "AWS.SimpleQueueService.NonExistentQueue",
2325
        "QueueNameExists": "QueueAlreadyExists",
2326
    }
2327

2328
    # TODO: on body error serialization (body["__type"]),it seems AWS differs from what we send for SQS
2329
    #  AWS: "com.amazon.coral.service#InvalidParameterValueException"
2330
    #  or AWS: "com.amazonaws.sqs#BatchRequestTooLong"
2331
    #  LocalStack: "InvalidParameterValue"
2332

2333
    def _add_query_compatible_error_header(self, response: Response, error: ServiceException):
1✔
2334
        if error.code in self.JSON_TO_QUERY_ERROR_CODES:
1✔
2335
            code = self.JSON_TO_QUERY_ERROR_CODES[error.code]
1✔
2336
        elif error.code in self.QUERY_PREFIXED_ERRORS:
1✔
2337
            code = f"AWS.SimpleQueueService.{error.code}"
1✔
2338
        else:
2339
            code = error.code
1✔
2340

2341
        # SQS exceptions all have sender fault set to False, so we hardcode it to `Sender`
2342
        response.headers["x-amzn-query-error"] = f"{code};Sender"
1✔
2343

2344

2345
def gen_amzn_requestid():
1✔
2346
    """
2347
    Generate generic AWS request ID.
2348

2349
    3 uses a different format and set of request Ids.
2350

2351
    Examples:
2352
    996d38a0-a4e9-45de-bad4-480cd962d208
2353
    b9260553-df1b-4db6-ae41-97b89a5f85ea
2354
    """
2355
    return long_uid()
1✔
2356

2357

2358
@functools.cache
1✔
2359
def create_serializer(
1✔
2360
    service: ServiceModel, protocol: ProtocolName | None = None
2361
) -> ResponseSerializer:
2362
    """
2363
    Creates the right serializer for the given service model.
2364

2365
    :param service: to create the serializer for
2366
    :param protocol: the protocol for the serializer. If not provided, fallback to the service's default protocol
2367
    :return: ResponseSerializer which can handle the protocol of the service
2368
    """
2369

2370
    # Unfortunately, some services show subtle differences in their serialized responses, even though their
2371
    # specification states they implement the same protocol.
2372
    # Since some clients might be stricter / less resilient than others, we need to mimic the serialization of the
2373
    # specific services as close as possible.
2374
    # Therefore, the service-specific serializer implementations (basically the implicit / informally more specific
2375
    # protocol implementation) has precedence over the more general protocol-specific serializers.
2376
    service_specific_serializers = {
1✔
2377
        "sqs": {"json": SqsJsonResponseSerializer, "query": SqsQueryResponseSerializer},
2378
        "s3": {"rest-xml": S3ResponseSerializer},
2379
    }
2380
    protocol_specific_serializers = {
1✔
2381
        "query": QueryResponseSerializer,
2382
        "json": JSONResponseSerializer,
2383
        "rest-json": RestJSONResponseSerializer,
2384
        "rest-xml": RestXMLResponseSerializer,
2385
        "ec2": EC2ResponseSerializer,
2386
        "smithy-rpc-v2-cbor": RpcV2CBORResponseSerializer,
2387
        # TODO: implement multi-protocol support for Kinesis, so that it can uses the `cbor` protocol and remove
2388
        #  CBOR handling from JSONResponseParser
2389
        # this is not an "official" protocol defined from the spec, but is derived from ``json``
2390
    }
2391
    service_protocol = protocol or service.protocol
1✔
2392

2393
    # Try to select a service- and protocol-specific serializer implementation
2394
    if (
1✔
2395
        service.service_name in service_specific_serializers
2396
        and service_protocol in service_specific_serializers[service.service_name]
2397
    ):
2398
        return service_specific_serializers[service.service_name][service_protocol]()
1✔
2399
    else:
2400
        # Otherwise, pick the protocol-specific serializer for the protocol of the service
2401
        return protocol_specific_serializers[service_protocol]()
1✔
2402

2403

2404
def aws_response_serializer(
1✔
2405
    service_name: str, operation: str, protocol: ProtocolName | None = None
2406
):
2407
    """
2408
    A decorator for an HTTP route that can serialize return values or exceptions into AWS responses.
2409
    This can be used to create AWS request handlers in a convenient way. Example usage::
2410

2411
        from localstack.http import route, Request
2412
        from localstack.aws.api.sqs import ListQueuesResult
2413

2414
        @route("/_aws/sqs/queues")
2415
        @aws_response_serializer("sqs", "ListQueues")
2416
        def my_route(request: Request):
2417
            if some_condition_on_request:
2418
                raise CommonServiceError("...")  # <- will be serialized into an error response
2419

2420
            return ListQueuesResult(QueueUrls=...)  # <- object from the SQS API will be serialized
2421

2422
    :param service_name: the AWS service (e.g., "sqs", "lambda")
2423
    :param protocol: the protocol of the AWS service to serialize to. If not set (by default) the default protocol
2424
                    of the service in botocore is used.
2425
    :param operation: the operation name (e.g., "ReceiveMessage", "ListFunctions")
2426
    :returns: a decorator
2427
    """
2428

2429
    def _decorate(fn):
1✔
2430
        service_model = load_service(service_name, protocol=protocol)
1✔
2431
        operation_model = service_model.operation_model(operation)
1✔
2432
        serializer = create_serializer(service_model, protocol=protocol)
1✔
2433

2434
        def _proxy(*args, **kwargs) -> WerkzeugResponse:
1✔
2435
            # extract request from function invocation (decorator can be used for methods as well as for functions).
2436
            if len(args) > 0 and isinstance(args[0], WerkzeugRequest):
1✔
2437
                # function
2438
                request = args[0]
1✔
2439
            elif len(args) > 1 and isinstance(args[1], WerkzeugRequest):
1✔
2440
                # method (arg[0] == self)
2441
                request = args[1]
1✔
2442
            elif "request" in kwargs:
1✔
2443
                request = kwargs["request"]
1✔
2444
            else:
2445
                raise ValueError(f"could not find Request in signature of function {fn}")
×
2446

2447
            # TODO: we have no context here
2448
            # TODO: maybe try to get the request ID from the headers first before generating a new one
2449
            request_id = gen_amzn_requestid()
1✔
2450

2451
            try:
1✔
2452
                response = fn(*args, **kwargs)
1✔
2453

2454
                if isinstance(response, WerkzeugResponse):
1✔
2455
                    return response
1✔
2456

2457
                return serializer.serialize_to_response(
1✔
2458
                    response, operation_model, request.headers, request_id
2459
                )
2460

2461
            except ServiceException as e:
1✔
2462
                return serializer.serialize_error_to_response(
1✔
2463
                    e, operation_model, request.headers, request_id
2464
                )
2465
            except Exception as e:
1✔
2466
                return serializer.serialize_error_to_response(
1✔
2467
                    CommonServiceException(
2468
                        "InternalError", f"An internal error occurred: {e}", status_code=500
2469
                    ),
2470
                    operation_model,
2471
                    request.headers,
2472
                    request_id,
2473
                )
2474

2475
        return _proxy
1✔
2476

2477
    return _decorate
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc