• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 17995138107

24 Sep 2025 06:34PM UTC coverage: 86.88% (-0.005%) from 86.885%
17995138107

push

github

web-flow
CloudWatch: fix MA/MR for new snapshot test `test_put_metric_alarm_escape_character` (#13190)

67743 of 77973 relevant lines covered (86.88%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.52
/localstack-core/localstack/aws/protocol/serializer.py
1
"""
2
Response serializers for the different AWS service protocols.
3

4
The module contains classes that take a service's response dict, and
5
given an operation model, serialize the HTTP response according to the
6
specified output shape.
7

8
It can be seen as the counterpart to the ``parse`` module in ``botocore``
9
(which parses the result of these serializer). It has a lot of
10
similarities with the ``serialize`` module in ``botocore``, but
11
serves a different purpose (serializing responses instead of requests).
12

13
The different protocols have many similarities. The class hierarchy is
14
designed such that the serializers share as much logic as possible.
15
The class hierarchy looks as follows:
16
::
17
                                     ┌────────────────────┐
18
                                     │ ResponseSerializer │
19
                                     └────────────────────┘
20
                                           ▲    ▲    ▲
21
                 ┌─────────────────┬───────┘    │    └──────────────┬──────────────────────┐
22
    ┌────────────┴────────────┐    │    ┌───────┴──────────────┐    │         ┌────────────┴─────────────┐
23
    │BaseXMLResponseSerializer│    │    │JSONResponseSerializer│    │         │BaseCBORResponseSerializer│
24
    └─────────────────────────┘    │    └──────────────────────┘    │         └──────────────────────────┘
25
       ▲    ▲        ┌─────────────┴────────────┐     ▲       ┌─────┴─────────────────────┐  ▲         ▲
26
       │    │        │BaseRestResponseSerializer│     │       │BaseRpcV2ResponseSerializer│  │         │
27
       │    │        └──────────────────────────┘     │       └───────────────────────────┘  │         │
28
       │    │              ▲              ▲           │                          ▲           │         │
29
       │    │              │              │           │                          │           │         │
30
       │  ┌─┴──────────────┴────────┐  ┌──┴───────────┴───────────┐   ┌──────────┴───────────┴────┐    │
31
       │  │RestXMLResponseSerializer│  │RestJSONResponseSerializer│   │RpcV2CBORResponseSerializer│    │
32
       │  └─────────────────────────┘  └──────────────────────────┘   └───────────────────────────┘    │
33
 ┌─────┴──────────────────┐                                                                 ┌──────────┴─────────────┐
34
 │QueryResponseSerializer │                                                                 │ CBORResponseSerializer │
35
 └────────────────────────┘                                                                 └────────────────────────┘
36
             ▲
37
   ┌─────────┴───────────┐
38
   │EC2ResponseSerializer│
39
   └─────────────────────┘
40
::
41

42
The ``ResponseSerializer`` contains the logic that is used among all the
43
different protocols (``query``, ``json``, ``rest-json``, ``rest-xml``, ``cbor``
44
and ``ec2``).
45
The protocols relate to each other in the following ways:
46

47
* The ``query`` and the ``rest-xml`` protocols both have XML bodies in their
48
  responses which are serialized quite similarly (with some specifics for each
49
  type).
50
* The ``json`` and the ``rest-json`` protocols both have JSON bodies in their
51
  responses which are serialized the same way.
52
* The ``cbor`` protocol is not properly defined in the spec, but mirrors the
53
  ``json`` protocol.
54
* The ``rest-json`` and ``rest-xml`` protocols serialize some metadata in
55
  the HTTP response's header fields.
56
* The ``ec2`` protocol is basically similar to the ``query`` protocol with a
57
  specific error response formatting.
58
* The ``smithy-rpc-v2-cbor`` protocol defines a specific way to route request
59
  to services via the RPC v2 trait, and encodes its body with the CBOR format.
60

61
The serializer classes in this module correspond directly to the different
62
protocols. ``#create_serializer`` shows the explicit mapping between the
63
classes and the protocols.
64
The classes are structured as follows:
65

66
* The ``ResponseSerializer`` contains all the basic logic for the
67
  serialization which is shared among all different protocols.
68
* The ``BaseXMLResponseSerializer``, ``JSONResponseSerializer`` and
69
  ``BaseCBORResponseSerializer`` contain the logic for the XML, JSON
70
  and the CBOR serialization respectively.
71
* The ``BaseRestResponseSerializer`` contains the logic for the REST
72
  protocol specifics (i.e. specific HTTP header serializations).
73
* The ``BaseRpcV2ResponseSerializer`` contains the logic for the RPC v2
74
  protocol specifics (i.e. pretty bare, does not has any specific
75
  about body serialization).
76
* The ``RestXMLResponseSerializer`` and the ``RestJSONResponseSerializer``
77
  inherit the ReST specific logic from the ``BaseRestResponseSerializer``
78
  and the XML / JSON body serialization from their second super class.
79
* The ``RpcV2CBORResponseSerializer`` inherits the RPC v2 specific logic
80
  from the ``BaseRpcV2ResponseSerializer`` and the CBOR body serialization
81
  from its second super class.
82
* The ``CBORResponseSerializer`` contains the logic specific to the
83
  non-official ``cbor`` protocol, mirroring the ``json`` protocol but
84
  with CBOR encoded body
85

86
The services and their protocols are defined by using AWS's Smithy
87
(a language to define services in a - somewhat - protocol-agnostic
88
way). The "peculiarities" in this serializer code usually correspond
89
to certain so-called "traits" in Smithy.
90

91
The result of the serialization methods is the HTTP response which can
92
be sent back to the calling client.
93
"""
94

95
import abc
1✔
96
import base64
1✔
97
import datetime
1✔
98
import functools
1✔
99
import json
1✔
100
import logging
1✔
101
import math
1✔
102
import string
1✔
103
import struct
1✔
104
from abc import ABC
1✔
105
from binascii import crc32
1✔
106
from collections.abc import Iterable, Iterator
1✔
107
from email.utils import formatdate
1✔
108
from struct import pack
1✔
109
from typing import IO, Any
1✔
110
from xml.etree import ElementTree as ETree
1✔
111

112
import xmltodict
1✔
113
from botocore.model import (
1✔
114
    ListShape,
115
    MapShape,
116
    OperationModel,
117
    ServiceModel,
118
    Shape,
119
    ShapeResolver,
120
    StringShape,
121
    StructureShape,
122
)
123
from botocore.serialize import ISO8601, ISO8601_MICRO
1✔
124
from botocore.utils import calculate_md5, is_json_value_header, parse_to_aware_datetime
1✔
125

126
# cbor2: explicitly load from private _encoder module to avoid using the (non-patched) C-version
127
from cbor2._encoder import dumps as cbor2_dumps
1✔
128
from werkzeug import Request as WerkzeugRequest
1✔
129
from werkzeug import Response as WerkzeugResponse
1✔
130
from werkzeug.datastructures import Headers, MIMEAccept
1✔
131
from werkzeug.http import parse_accept_header
1✔
132

133
from localstack.aws.api import CommonServiceException, ServiceException
1✔
134
from localstack.aws.spec import ProtocolName, load_service
1✔
135
from localstack.constants import (
1✔
136
    APPLICATION_AMZ_CBOR_1_1,
137
    APPLICATION_AMZ_JSON_1_0,
138
    APPLICATION_AMZ_JSON_1_1,
139
    APPLICATION_CBOR,
140
    APPLICATION_JSON,
141
    APPLICATION_XML,
142
    TEXT_XML,
143
)
144
from localstack.http import Response
1✔
145
from localstack.utils.common import to_bytes, to_str
1✔
146
from localstack.utils.strings import long_uid
1✔
147
from localstack.utils.xml import strip_xmlns
1✔
148

149
LOG = logging.getLogger(__name__)
1✔
150

151
REQUEST_ID_CHARACTERS = string.digits + string.ascii_uppercase
1✔
152

153

154
class ResponseSerializerError(Exception):
1✔
155
    """
156
    Error which is thrown if the request serialization fails.
157
    Super class of all exceptions raised by the serializer.
158
    """
159

160
    pass
1✔
161

162

163
class UnknownSerializerError(ResponseSerializerError):
1✔
164
    """
165
    Error which indicates that the exception raised by the serializer could be caused by invalid data or by any other
166
    (unknown) issue. Errors like this should be reported and indicate an issue in the serializer itself.
167
    """
168

169
    pass
1✔
170

171

172
class ProtocolSerializerError(ResponseSerializerError):
1✔
173
    """
174
    Error which indicates that the given data is not compliant with the service's specification and cannot be
175
    serialized. This usually results in a response to the client with an HTTP 5xx status code (internal server error).
176
    """
177

178
    pass
1✔
179

180

181
def _handle_exceptions(func):
1✔
182
    """
183
    Decorator which handles the exceptions raised by the serializer. It ensures that all exceptions raised by the public
184
    methods of the parser are instances of ResponseSerializerError.
185
    :param func: to wrap in order to add the exception handling
186
    :return: wrapped function
187
    """
188

189
    @functools.wraps(func)
1✔
190
    def wrapper(*args, **kwargs):
1✔
191
        try:
1✔
192
            return func(*args, **kwargs)
1✔
193
        except ResponseSerializerError:
1✔
194
            raise
1✔
195
        except Exception as e:
1✔
196
            raise UnknownSerializerError(
1✔
197
                "An unknown error occurred when trying to serialize the response."
198
            ) from e
199

200
    return wrapper
1✔
201

202

203
class ResponseSerializer(abc.ABC):
1✔
204
    """
205
    The response serializer is responsible for the serialization of a service implementation's result to an actual
206
    HTTP response (which will be sent to the calling client).
207
    It is the base class of all serializers and therefore contains the basic logic which is used among all of them.
208
    """
209

210
    DEFAULT_ENCODING = "utf-8"
1✔
211
    # The default timestamp format is ISO8601, but this can be overwritten by subclasses.
212
    TIMESTAMP_FORMAT = "iso8601"
1✔
213
    # Event streaming binary data type mapping for type "string"
214
    AWS_BINARY_DATA_TYPE_STRING = 7
1✔
215
    # Defines the supported mime types of the specific serializer. Sorted by priority (preferred / default first).
216
    # Needs to be specified by subclasses.
217
    SUPPORTED_MIME_TYPES: list[str] = []
1✔
218

219
    @_handle_exceptions
1✔
220
    def serialize_to_response(
1✔
221
        self,
222
        response: dict,
223
        operation_model: OperationModel,
224
        headers: dict | Headers | None,
225
        request_id: str,
226
    ) -> Response:
227
        """
228
        Takes a response dict and serializes it to an actual HttpResponse.
229

230
        :param response: to serialize
231
        :param operation_model: specification of the service & operation containing information about the shape of the
232
                                service's output / response
233
        :param headers: the headers of the incoming request this response should be serialized for. This is necessary
234
                        for features like Content-Negotiation (define response content type based on request headers).
235
        :param request_id: autogenerated AWS request ID identifying the original request
236
        :return: Response which can be sent to the calling client
237
        :raises: ResponseSerializerError (either a ProtocolSerializerError or an UnknownSerializerError)
238
        """
239

240
        # determine the preferred mime type (based on the serializer's supported mime types and the Accept header)
241
        mime_type = self._get_mime_type(headers)
1✔
242

243
        # if the operation has a streaming output, handle the serialization differently
244
        if operation_model.has_event_stream_output:
1✔
245
            return self._serialize_event_stream(response, operation_model, mime_type, request_id)
1✔
246

247
        serialized_response = self._create_default_response(operation_model, mime_type)
1✔
248
        shape = operation_model.output_shape
1✔
249
        # The shape can also be none (for empty responses), but it still needs to be serialized (to add some metadata)
250
        shape_members = shape.members if shape is not None else None
1✔
251
        self._serialize_response(
1✔
252
            response,
253
            serialized_response,
254
            shape,
255
            shape_members,
256
            operation_model,
257
            mime_type,
258
            request_id,
259
        )
260
        serialized_response = self._prepare_additional_traits_in_response(
1✔
261
            serialized_response, operation_model, request_id
262
        )
263
        return serialized_response
1✔
264

265
    @_handle_exceptions
1✔
266
    def serialize_error_to_response(
1✔
267
        self,
268
        error: ServiceException,
269
        operation_model: OperationModel,
270
        headers: dict | Headers | None,
271
        request_id: str,
272
    ) -> Response:
273
        """
274
        Takes an error instance and serializes it to an actual HttpResponse.
275
        Therefore, this method is used for errors which should be serialized and transmitted to the calling client.
276

277
        :param error: to serialize
278
        :param operation_model: specification of the service & operation containing information about the shape of the
279
                                service's output / response
280
        :param headers: the headers of the incoming request this response should be serialized for. This is necessary
281
                        for features like Content-Negotiation (define response content type based on request headers).
282
        :param request_id: autogenerated AWS request ID identifying the original request
283
        :return: HttpResponse which can be sent to the calling client
284
        :raises: ResponseSerializerError (either a ProtocolSerializerError or an UnknownSerializerError)
285
        """
286
        # determine the preferred mime type (based on the serializer's supported mime types and the Accept header)
287
        mime_type = self._get_mime_type(headers)
1✔
288

289
        # TODO implement streaming error serialization
290
        serialized_response = self._create_default_response(operation_model, mime_type)
1✔
291
        if not error or not isinstance(error, ServiceException):
1✔
292
            raise ProtocolSerializerError(
1✔
293
                f"Error to serialize ({error.__class__.__name__ if error else None}) is not a ServiceException."
294
            )
295
        shape = operation_model.service_model.shape_for_error_code(error.code)
1✔
296
        serialized_response.status_code = self._get_error_status_code(
1✔
297
            error=error,
298
            headers=headers,
299
            service_model=operation_model.service_model,
300
        )
301

302
        self._serialize_error(
1✔
303
            error, serialized_response, shape, operation_model, mime_type, request_id
304
        )
305
        serialized_response = self._prepare_additional_traits_in_response(
1✔
306
            serialized_response, operation_model, request_id
307
        )
308
        return serialized_response
1✔
309

310
    def _serialize_response(
1✔
311
        self,
312
        parameters: dict,
313
        response: Response,
314
        shape: Shape | None,
315
        shape_members: dict,
316
        operation_model: OperationModel,
317
        mime_type: str,
318
        request_id: str,
319
    ) -> None:
320
        raise NotImplementedError
321

322
    def _serialize_body_params(
1✔
323
        self,
324
        params: dict,
325
        shape: Shape,
326
        operation_model: OperationModel,
327
        mime_type: str,
328
        request_id: str,
329
    ) -> str | None:
330
        """
331
        Actually serializes the given params for the given shape to a string for the transmission in the body of the
332
        response.
333
        :param params: to serialize
334
        :param shape: to know how to serialize the params
335
        :param operation_model: for additional metadata
336
        :param mime_type: Mime type which should be used to encode the payload
337
        :param request_id: autogenerated AWS request ID identifying the original request
338
        :return: string containing the serialized body
339
        """
340
        raise NotImplementedError
341

342
    def _serialize_error(
1✔
343
        self,
344
        error: ServiceException,
345
        response: Response,
346
        shape: StructureShape,
347
        operation_model: OperationModel,
348
        mime_type: str,
349
        request_id: str,
350
    ) -> None:
351
        raise NotImplementedError
352

353
    def _serialize_event_stream(
1✔
354
        self,
355
        response: dict,
356
        operation_model: OperationModel,
357
        mime_type: str,
358
        request_id: str,
359
    ) -> Response:
360
        """
361
        Serializes a given response dict (the return payload of a service implementation) to an _event stream_ using the
362
        given operation model.
363

364
        :param response: dictionary containing the payload for the response
365
        :param operation_model: describing the operation the response dict is being returned by
366
        :param mime_type: Mime type which should be used to encode the payload
367
        :param request_id: autogenerated AWS request ID identifying the original request
368
        :return: Response which can directly be sent to the client (in chunks)
369
        """
370
        event_stream_shape = operation_model.get_event_stream_output()
1✔
371
        event_stream_member_name = operation_model.output_shape.event_stream_name
1✔
372

373
        # wrap the generator in operation specific serialization
374
        def event_stream_serializer() -> Iterable[bytes]:
1✔
375
            yield self._encode_event_payload("initial-response")
1✔
376

377
            # create a default response
378
            serialized_event_response = self._create_default_response(operation_model, mime_type)
1✔
379
            # get the members of the event stream shape
380
            event_stream_shape_members = (
1✔
381
                event_stream_shape.members if event_stream_shape is not None else None
382
            )
383
            # extract the generator from the given response data
384
            event_generator = response.get(event_stream_member_name)
1✔
385
            if not isinstance(event_generator, Iterator):
1✔
386
                raise ProtocolSerializerError(
×
387
                    "Expected iterator for streaming event serialization."
388
                )
389

390
            # yield one event per generated event
391
            for event in event_generator:
1✔
392
                # find the actual event payload (the member with event=true)
393
                event_member_shape = None
1✔
394
                event_member_name = None
1✔
395
                for member_name, member_shape in event_stream_shape_members.items():
1✔
396
                    if member_shape.serialization.get("event") and member_name in event:
1✔
397
                        event_member_shape = member_shape
1✔
398
                        event_member_name = member_name
1✔
399
                        break
1✔
400
                if event_member_shape is None:
1✔
401
                    raise UnknownSerializerError("Couldn't find event shape for serialization.")
×
402

403
                # serialize the part of the response for the event
404
                self._serialize_response(
1✔
405
                    event.get(event_member_name),
406
                    serialized_event_response,
407
                    event_member_shape,
408
                    event_member_shape.members if event_member_shape is not None else None,
409
                    operation_model,
410
                    mime_type,
411
                    request_id,
412
                )
413
                # execute additional response traits (might be modifying the response)
414
                serialized_event_response = self._prepare_additional_traits_in_response(
1✔
415
                    serialized_event_response, operation_model, request_id
416
                )
417
                # encode the event and yield it
418
                yield self._encode_event_payload(
1✔
419
                    event_type=event_member_name, content=serialized_event_response.data
420
                )
421

422
        return Response(
1✔
423
            response=event_stream_serializer(),
424
            status=operation_model.http.get("responseCode", 200),
425
        )
426

427
    def _encode_event_payload(
1✔
428
        self,
429
        event_type: str,
430
        content: str | bytes = "",
431
        error_code: str | None = None,
432
        error_message: str | None = None,
433
    ) -> bytes:
434
        """
435
        Encodes the given event payload according to AWS specific binary event encoding.
436
        A specification of the format can be found in the AWS docs:
437
        https://docs.aws.amazon.com/AmazonS3/latest/API/RESTSelectObjectAppendix.html
438

439
        :param content: string or bytes of the event payload
440
        :param event_type: type of the event. Usually the name of the event shape or specific event types like
441
                            "initial-response".
442
        :param error_code: Optional. Error code if the payload represents an error.
443
        :param error_message: Optional. Error message if the payload represents an error.
444
        :return: bytes with the AWS-specific encoded event payload
445
        """
446

447
        # determine the event type (error if an error message or an error code is set)
448
        if error_message or error_code:
1✔
449
            message_type = "error"
×
450
        else:
451
            message_type = "event"
1✔
452

453
        # set the headers
454
        headers = {":event-type": event_type, ":message-type": message_type}
1✔
455
        if error_message:
1✔
456
            headers[":error-message"] = error_message
×
457
        if error_code:
1✔
458
            headers[":error-code"] = error_code
×
459

460
        # construct headers
461
        header_section = b""
1✔
462
        for key, value in headers.items():
1✔
463
            header_name = key.encode(self.DEFAULT_ENCODING)
1✔
464
            header_value = to_bytes(value)
1✔
465
            header_section += pack("!B", len(header_name))
1✔
466
            header_section += header_name
1✔
467
            header_section += pack("!B", self.AWS_BINARY_DATA_TYPE_STRING)
1✔
468
            header_section += pack("!H", len(header_value))
1✔
469
            header_section += header_value
1✔
470

471
        # construct body
472
        if isinstance(content, str):
1✔
473
            payload = bytes(content, self.DEFAULT_ENCODING)
1✔
474
        else:
475
            payload = content
1✔
476

477
        # calculate lengths
478
        headers_length = len(header_section)
1✔
479
        payload_length = len(payload)
1✔
480

481
        # construct message
482
        # - prelude
483
        result = pack("!I", payload_length + headers_length + 16)
1✔
484
        result += pack("!I", headers_length)
1✔
485
        # - prelude crc
486
        prelude_crc = crc32(result)
1✔
487
        result += pack("!I", prelude_crc)
1✔
488
        # - headers
489
        result += header_section
1✔
490
        # - payload
491
        result += payload
1✔
492
        # - message crc
493
        payload_crc = crc32(result)
1✔
494
        result += pack("!I", payload_crc)
1✔
495

496
        return result
1✔
497

498
    def _create_default_response(self, operation_model: OperationModel, mime_type: str) -> Response:
1✔
499
        """
500
        Creates a boilerplate default response to be used by subclasses as starting points.
501
        Uses the default HTTP response status code defined in the operation model (if defined), otherwise 200.
502

503
        :param operation_model: to extract the default HTTP status code
504
        :param mime_type: Mime type which should be used to encode the payload
505
        :return: boilerplate HTTP response
506
        """
507
        return Response(status=operation_model.http.get("responseCode", 200))
1✔
508

509
    def _get_mime_type(self, headers: dict | Headers | None) -> str:
1✔
510
        """
511
        Extracts the accepted mime type from the request headers and returns a matching, supported mime type for the
512
        serializer or the default mime type of the service if there is no match.
513
        :param headers: to extract the "Accept" header from
514
        :return: preferred mime type to be used by the serializer (if it is not accepted by the client,
515
                 an error is logged)
516
        """
517
        accept_header = None
1✔
518
        if headers and "Accept" in headers and not headers.get("Accept") == "*/*":
1✔
519
            accept_header = headers.get("Accept")
1✔
520
        elif headers and headers.get("Content-Type"):
1✔
521
            # If there is no specific Accept header given, we use the given Content-Type as a fallback.
522
            # i.e. if the request content was JSON encoded and the client doesn't send a specific an Accept header, the
523
            # serializer should prefer JSON encoding.
524
            content_type = headers.get("Content-Type")
1✔
525
            LOG.debug(
1✔
526
                "No accept header given. Using request's Content-Type (%s) as preferred response Content-Type.",
527
                content_type,
528
            )
529
            accept_header = content_type + ", */*"
1✔
530
        mime_accept: MIMEAccept = parse_accept_header(accept_header, MIMEAccept)
1✔
531
        mime_type = mime_accept.best_match(self.SUPPORTED_MIME_TYPES)
1✔
532
        if not mime_type:
1✔
533
            # There is no match between the supported mime types and the requested one(s)
534
            mime_type = self.SUPPORTED_MIME_TYPES[0]
1✔
535
            LOG.debug(
1✔
536
                "Determined accept type (%s) is not supported by this serializer. Using default of this serializer: %s",
537
                accept_header,
538
                mime_type,
539
            )
540
        return mime_type
1✔
541

542
    # Some extra utility methods subclasses can use.
543

544
    @staticmethod
1✔
545
    def _timestamp_iso8601(value: datetime.datetime) -> str:
1✔
546
        if value.microsecond > 0:
1✔
547
            timestamp_format = ISO8601_MICRO
1✔
548
        else:
549
            timestamp_format = ISO8601
1✔
550
        return value.strftime(timestamp_format)
1✔
551

552
    @staticmethod
1✔
553
    def _timestamp_unixtimestamp(value: datetime.datetime) -> float:
1✔
554
        return value.timestamp()
1✔
555

556
    def _timestamp_rfc822(self, value: datetime.datetime) -> str:
1✔
557
        if isinstance(value, datetime.datetime):
1✔
558
            value = self._timestamp_unixtimestamp(value)
1✔
559
        return formatdate(value, usegmt=True)
1✔
560

561
    def _convert_timestamp_to_str(
1✔
562
        self, value: int | str | datetime.datetime, timestamp_format=None
563
    ) -> str:
564
        if timestamp_format is None:
1✔
565
            timestamp_format = self.TIMESTAMP_FORMAT
1✔
566
        timestamp_format = timestamp_format.lower()
1✔
567
        datetime_obj = parse_to_aware_datetime(value)
1✔
568
        converter = getattr(self, f"_timestamp_{timestamp_format}")
1✔
569
        final_value = converter(datetime_obj)
1✔
570
        return final_value
1✔
571

572
    @staticmethod
1✔
573
    def _get_serialized_name(shape: Shape, default_name: str) -> str:
1✔
574
        """
575
        Returns the serialized name for the shape if it exists.
576
        Otherwise, it will return the passed in default_name.
577
        """
578
        return shape.serialization.get("name", default_name)
1✔
579

580
    def _get_base64(self, value: str | bytes):
1✔
581
        """
582
        Returns the base64-encoded version of value, handling
583
        both strings and bytes. The returned value is a string
584
        via the default encoding.
585
        """
586
        if isinstance(value, str):
1✔
587
            value = value.encode(self.DEFAULT_ENCODING)
×
588
        return base64.b64encode(value).strip().decode(self.DEFAULT_ENCODING)
1✔
589

590
    def _encode_payload(self, body: bytes | str) -> bytes:
1✔
591
        if isinstance(body, str):
1✔
592
            return body.encode(self.DEFAULT_ENCODING)
1✔
593
        return body
1✔
594

595
    def _prepare_additional_traits_in_response(
1✔
596
        self, response: Response, operation_model: OperationModel, request_id: str
597
    ):
598
        """Applies additional traits on the raw response for a given model or protocol."""
599
        if operation_model.http_checksum_required:
1✔
600
            self._add_md5_header(response)
×
601
        return response
1✔
602

603
    def _has_header(self, header_name: str, headers: dict):
1✔
604
        """Case-insensitive check for header key."""
605
        if header_name is None:
1✔
606
            return False
×
607
        else:
608
            return header_name.lower() in [key.lower() for key in headers.keys()]
1✔
609

610
    def _add_md5_header(self, response: Response):
1✔
611
        """Add a Content-MD5 header if not yet there. Adapted from botocore.utils"""
612
        headers = response.headers
×
613
        body = response.data
×
614
        if body is not None and "Content-MD5" not in headers:
×
615
            md5_digest = calculate_md5(body)
×
616
            headers["Content-MD5"] = md5_digest
×
617

618
    def _get_error_message(self, error: Exception) -> str | None:
1✔
619
        return str(error) if error is not None and str(error) != "None" else None
1✔
620

621
    def _get_error_status_code(
1✔
622
        self, error: ServiceException, headers: Headers, service_model: ServiceModel
623
    ) -> int:
624
        return error.status_code
1✔
625

626

627
class QueryCompatibleProtocolMixin:
1✔
628
    def _get_error_status_code(
1✔
629
        self, error: ServiceException, headers: dict | Headers | None, service_model: ServiceModel
630
    ) -> int:
631
        # by default, some protocols (namely `json` and `smithy-rpc-v2-cbor`) might not define exception status code in
632
        # their specs, so they are not defined in the `ServiceException` object and will use the default value of `400`
633
        # But Query compatible service always do define them, so we get the wrong code for service that are
634
        # multi-protocols like CloudWatch
635
        # we need to verify if the service is compatible, and if the client has requested the query compatible error
636
        # code to return the right value
637
        if not service_model.is_query_compatible:
1✔
638
            return error.status_code
1✔
639

640
        if headers and headers.get("x-amzn-query-mode") == "true":
1✔
641
            return error.status_code
1✔
642

643
        # we only want to override status code 4XX
644
        if 400 < error.status_code <= 499:
1✔
645
            return 400
1✔
646

647
        return error.status_code
1✔
648

649
    def _add_query_compatible_error_header(self, response: Response, error: ServiceException):
1✔
650
        """
651
        Add an `x-amzn-query-error` header for client to  translate errors codes from former `query` services
652
        into other protocols.
653
        """
654

655
        sender_fault = "Sender" if error.sender_fault else "Receiver"
1✔
656
        response.headers["x-amzn-query-error"] = f"{error.code};{sender_fault}"
1✔
657

658
    def _get_error_code(
1✔
659
        self, is_query_compatible: bool, error: ServiceException, shape: Shape | None = None
660
    ):
661
        # if the operation is query compatible, we need to add to use shape name
662
        if is_query_compatible:
1✔
663
            if shape:
1✔
664
                code = shape.name
1✔
665
            else:
666
                # if the shape is not defined, we are using the Exception named to derive the `Code`, like you would
667
                # from the shape. This allows us to have Exception that are valid in multi-protocols by defining its
668
                # code and its name to be different
669
                code = error.__class__.__name__
1✔
670
        else:
671
            code = error.code
1✔
672

673
        return code
1✔
674

675

676
class BaseXMLResponseSerializer(ResponseSerializer):
1✔
677
    """
678
    The BaseXMLResponseSerializer performs the basic logic for the XML response serialization.
679
    It is slightly adapted by the QueryResponseSerializer.
680
    While the botocore's RestXMLSerializer is quite similar, there are some subtle differences (since botocore's
681
    implementation handles the serialization of the requests from the client to the service, not the responses from the
682
    service to the client).
683
    """
684

685
    SUPPORTED_MIME_TYPES = [TEXT_XML, APPLICATION_XML, APPLICATION_JSON]
1✔
686

687
    def _serialize_error(
1✔
688
        self,
689
        error: ServiceException,
690
        response: Response,
691
        shape: StructureShape,
692
        operation_model: OperationModel,
693
        mime_type: str,
694
        request_id: str,
695
    ) -> None:
696
        # Check if we need to add a namespace
697
        attr = (
1✔
698
            {"xmlns": operation_model.metadata.get("xmlNamespace")}
699
            if "xmlNamespace" in operation_model.metadata
700
            else {}
701
        )
702
        root = ETree.Element("ErrorResponse", attr)
1✔
703

704
        error_tag = ETree.SubElement(root, "Error")
1✔
705
        self._add_error_tags(error, error_tag, mime_type)
1✔
706
        request_id_element = ETree.SubElement(root, "RequestId")
1✔
707
        request_id_element.text = request_id
1✔
708

709
        self._add_additional_error_tags(vars(error), root, shape, mime_type)
1✔
710

711
        response.set_response(self._encode_payload(self._node_to_string(root, mime_type)))
1✔
712

713
    def _add_error_tags(
1✔
714
        self, error: ServiceException, error_tag: ETree.Element, mime_type: str
715
    ) -> None:
716
        code_tag = ETree.SubElement(error_tag, "Code")
1✔
717
        code_tag.text = error.code
1✔
718
        message = self._get_error_message(error)
1✔
719
        if message:
1✔
720
            self._default_serialize(error_tag, message, None, "Message", mime_type)
1✔
721
        if error.sender_fault:
1✔
722
            # The sender fault is either not set or "Sender"
723
            self._default_serialize(error_tag, "Sender", None, "Type", mime_type)
1✔
724

725
    def _add_additional_error_tags(
1✔
726
        self, parameters: dict, node: ETree, shape: StructureShape, mime_type: str
727
    ):
728
        if shape:
1✔
729
            params = {}
1✔
730
            # TODO add a possibility to serialize simple non-modelled errors (like S3 NoSuchBucket#BucketName)
731
            for member in shape.members:
1✔
732
                # XML protocols do not add modeled default fields to the root node
733
                # (tested for cloudfront, route53, cloudwatch, iam)
734
                if member.lower() not in ["code", "message"] and member in parameters:
1✔
735
                    params[member] = parameters[member]
1✔
736

737
            # If there is an error shape with members which should be set, they need to be added to the node
738
            if params:
1✔
739
                # Serialize the remaining params
740
                root_name = shape.serialization.get("name", shape.name)
1✔
741
                pseudo_root = ETree.Element("")
1✔
742
                self._serialize(shape, params, pseudo_root, root_name, mime_type)
1✔
743
                real_root = list(pseudo_root)[0]
1✔
744
                # Add the child elements to the already created root error element
745
                for child in list(real_root):
1✔
746
                    node.append(child)
1✔
747

748
    def _serialize_body_params(
1✔
749
        self,
750
        params: dict,
751
        shape: Shape,
752
        operation_model: OperationModel,
753
        mime_type: str,
754
        request_id: str,
755
    ) -> str | None:
756
        root = self._serialize_body_params_to_xml(params, shape, operation_model, mime_type)
1✔
757
        self._prepare_additional_traits_in_xml(root, request_id)
1✔
758
        return self._node_to_string(root, mime_type)
1✔
759

760
    def _serialize_body_params_to_xml(
1✔
761
        self, params: dict, shape: Shape, operation_model: OperationModel, mime_type: str
762
    ) -> ETree.Element | None:
763
        if shape is None:
1✔
764
            return
1✔
765
        # The botocore serializer expects `shape.serialization["name"]`, but this isn't always present for responses
766
        root_name = shape.serialization.get("name", shape.name)
1✔
767
        pseudo_root = ETree.Element("")
1✔
768
        self._serialize(shape, params, pseudo_root, root_name, mime_type)
1✔
769
        real_root = list(pseudo_root)[0]
1✔
770
        return real_root
1✔
771

772
    def _serialize(
1✔
773
        self, shape: Shape, params: Any, xmlnode: ETree.Element, name: str, mime_type: str
774
    ) -> None:
775
        """This method dynamically invokes the correct `_serialize_type_*` method for each shape type."""
776
        if shape is None:
1✔
777
            return
×
778
        # Some output shapes define a `resultWrapper` in their serialization spec.
779
        # While the name would imply that the result is _wrapped_, it is actually renamed.
780
        if shape.serialization.get("resultWrapper"):
1✔
781
            name = shape.serialization.get("resultWrapper")
1✔
782

783
        try:
1✔
784
            method = getattr(self, f"_serialize_type_{shape.type_name}", self._default_serialize)
1✔
785
            method(xmlnode, params, shape, name, mime_type)
1✔
786
        except (TypeError, ValueError, AttributeError) as e:
1✔
787
            raise ProtocolSerializerError(
1✔
788
                f"Invalid type when serializing {shape.name}: '{xmlnode}' cannot be parsed to {shape.type_name}."
789
            ) from e
790

791
    def _serialize_type_structure(
1✔
792
        self, xmlnode: ETree.Element, params: dict, shape: StructureShape, name: str, mime_type
793
    ) -> None:
794
        structure_node = ETree.SubElement(xmlnode, name)
1✔
795

796
        if "xmlNamespace" in shape.serialization:
1✔
797
            namespace_metadata = shape.serialization["xmlNamespace"]
1✔
798
            attribute_name = "xmlns"
1✔
799
            if namespace_metadata.get("prefix"):
1✔
800
                attribute_name += ":{}".format(namespace_metadata["prefix"])
1✔
801
            structure_node.attrib[attribute_name] = namespace_metadata["uri"]
1✔
802
        for key, value in params.items():
1✔
803
            if value is None:
1✔
804
                # Don't serialize any param whose value is None.
805
                continue
1✔
806
            try:
1✔
807
                member_shape = shape.members[key]
1✔
808
            except KeyError:
1✔
809
                LOG.warning(
1✔
810
                    "Response object %s contains a member which is not specified: %s",
811
                    shape.name,
812
                    key,
813
                )
814
                continue
1✔
815
            member_name = member_shape.serialization.get("name", key)
1✔
816
            # We need to special case member shapes that are marked as an xmlAttribute.
817
            # Rather than serializing into an XML child node, we instead serialize the shape to
818
            # an XML attribute of the *current* node.
819
            if member_shape.serialization.get("xmlAttribute"):
1✔
820
                # xmlAttributes must have a serialization name.
821
                xml_attribute_name = member_shape.serialization["name"]
1✔
822
                structure_node.attrib[xml_attribute_name] = value
1✔
823
                continue
1✔
824
            self._serialize(member_shape, value, structure_node, member_name, mime_type)
1✔
825

826
    def _serialize_type_list(
1✔
827
        self, xmlnode: ETree.Element, params: list, shape: ListShape, name: str, mime_type: str
828
    ) -> None:
829
        if params is None:
1✔
830
            # Don't serialize any param whose value is None.
831
            return
×
832
        member_shape = shape.member
1✔
833
        if shape.serialization.get("flattened"):
1✔
834
            # If the list is flattened, either take the member's "name" or the name of the usual name for the parent
835
            # element for the children.
836
            element_name = self._get_serialized_name(member_shape, name)
1✔
837
            list_node = xmlnode
1✔
838
        else:
839
            element_name = self._get_serialized_name(member_shape, "member")
1✔
840
            list_node = ETree.SubElement(xmlnode, name)
1✔
841
        for item in params:
1✔
842
            # Don't serialize any item which is None
843
            if item is not None:
1✔
844
                self._serialize(member_shape, item, list_node, element_name, mime_type)
1✔
845

846
    def _serialize_type_map(
1✔
847
        self, xmlnode: ETree.Element, params: dict, shape: MapShape, name: str, mime_type: str
848
    ) -> None:
849
        """
850
        Given the ``name`` of MyMap, an input of {"key1": "val1", "key2": "val2"}, and the ``flattened: False``
851
        we serialize this as:
852
          <MyMap>
853
            <entry>
854
              <key>key1</key>
855
              <value>val1</value>
856
            </entry>
857
            <entry>
858
              <key>key2</key>
859
              <value>val2</value>
860
            </entry>
861
          </MyMap>
862
        If it is flattened, it is serialized as follows:
863
          <MyMap>
864
            <key>key1</key>
865
            <value>val1</value>
866
          </MyMap>
867
          <MyMap>
868
            <key>key2</key>
869
            <value>val2</value>
870
          </MyMap>
871
        """
872
        if params is None:
1✔
873
            # Don't serialize a non-existing map
874
            return
×
875
        if shape.serialization.get("flattened"):
1✔
876
            entries_node = xmlnode
1✔
877
            entry_node_name = name
1✔
878
        else:
879
            entries_node = ETree.SubElement(xmlnode, name)
1✔
880
            entry_node_name = "entry"
1✔
881

882
        for key, value in params.items():
1✔
883
            if value is None:
1✔
884
                # Don't serialize any param whose value is None.
885
                continue
×
886
            entry_node = ETree.SubElement(entries_node, entry_node_name)
1✔
887
            key_name = self._get_serialized_name(shape.key, default_name="key")
1✔
888
            val_name = self._get_serialized_name(shape.value, default_name="value")
1✔
889
            self._serialize(shape.key, key, entry_node, key_name, mime_type)
1✔
890
            self._serialize(shape.value, value, entry_node, val_name, mime_type)
1✔
891

892
    @staticmethod
1✔
893
    def _serialize_type_boolean(xmlnode: ETree.Element, params: bool, _, name: str, __) -> None:
1✔
894
        """
895
        For scalar types, the 'params' attr is actually just a scalar value representing the data
896
        we need to serialize as a boolean. It will either be 'true' or 'false'
897
        """
898
        node = ETree.SubElement(xmlnode, name)
1✔
899
        if params:
1✔
900
            str_value = "true"
1✔
901
        else:
902
            str_value = "false"
1✔
903
        node.text = str_value
1✔
904

905
    def _serialize_type_blob(
1✔
906
        self, xmlnode: ETree.Element, params: str | bytes, _, name: str, __
907
    ) -> None:
908
        node = ETree.SubElement(xmlnode, name)
1✔
909
        node.text = self._get_base64(params)
1✔
910

911
    def _serialize_type_timestamp(
1✔
912
        self, xmlnode: ETree.Element, params: str, shape: Shape, name: str, mime_type: str
913
    ) -> None:
914
        node = ETree.SubElement(xmlnode, name)
1✔
915
        if mime_type != APPLICATION_JSON:
1✔
916
            # Default XML timestamp serialization
917
            node.text = self._convert_timestamp_to_str(
1✔
918
                params, shape.serialization.get("timestampFormat")
919
            )
920
        else:
921
            # For services with XML protocols, where the Accept header is JSON, timestamps are formatted like for JSON
922
            # protocols, but using the int representation instead of the float representation (f.e. requesting JSON
923
            # responses in STS).
924
            node.text = str(
1✔
925
                int(self._convert_timestamp_to_str(params, JSONResponseSerializer.TIMESTAMP_FORMAT))
926
            )
927

928
    def _default_serialize(self, xmlnode: ETree.Element, params: str, _, name: str, __) -> None:
1✔
929
        node = ETree.SubElement(xmlnode, name)
1✔
930
        node.text = str(params)
1✔
931

932
    def _prepare_additional_traits_in_xml(self, root: ETree.Element | None, request_id: str):
1✔
933
        """
934
        Prepares the XML root node before being serialized with additional traits (like the Response ID in the Query
935
        protocol).
936
        For some protocols (like rest-xml), the root can be None.
937
        """
938
        pass
1✔
939

940
    def _create_default_response(self, operation_model: OperationModel, mime_type: str) -> Response:
1✔
941
        response = super()._create_default_response(operation_model, mime_type)
1✔
942
        response.headers["Content-Type"] = mime_type
1✔
943
        return response
1✔
944

945
    def _node_to_string(self, root: ETree.Element | None, mime_type: str) -> str | None:
1✔
946
        """Generates the string representation of the given XML element."""
947
        if root is not None:
1✔
948
            content = ETree.tostring(
1✔
949
                element=root, encoding=self.DEFAULT_ENCODING, xml_declaration=True
950
            )
951
            if mime_type == APPLICATION_JSON:
1✔
952
                # FIXME try to directly convert the ElementTree node to JSON
953
                xml_dict = xmltodict.parse(content)
1✔
954
                xml_dict = strip_xmlns(xml_dict)
1✔
955
                content = json.dumps(xml_dict)
1✔
956
            return content
1✔
957

958

959
class BaseRestResponseSerializer(ResponseSerializer, ABC):
1✔
960
    """
961
    The BaseRestResponseSerializer performs the basic logic for the ReST response serialization.
962
    In our case it basically only adds the request metadata to the HTTP header.
963
    """
964

965
    HEADER_TIMESTAMP_FORMAT = "rfc822"
1✔
966

967
    def _serialize_response(
1✔
968
        self,
969
        parameters: dict,
970
        response: Response,
971
        shape: Shape | None,
972
        shape_members: dict,
973
        operation_model: OperationModel,
974
        mime_type: str,
975
        request_id: str,
976
    ) -> None:
977
        header_params, payload_params = self._partition_members(parameters, shape)
1✔
978
        self._process_header_members(header_params, response, shape)
1✔
979
        # "HEAD" responses are basically "GET" responses without the actual body.
980
        # Do not process the body payload in this case (setting a body could also manipulate the headers)
981
        if operation_model.http.get("method") != "HEAD":
1✔
982
            self._serialize_payload(
1✔
983
                payload_params,
984
                response,
985
                shape,
986
                shape_members,
987
                operation_model,
988
                mime_type,
989
                request_id,
990
            )
991
        self._serialize_content_type(response, shape, shape_members, mime_type)
1✔
992
        self._prepare_additional_traits_in_response(response, operation_model, request_id)
1✔
993

994
    def _serialize_payload(
1✔
995
        self,
996
        parameters: dict,
997
        response: Response,
998
        shape: Shape | None,
999
        shape_members: dict,
1000
        operation_model: OperationModel,
1001
        mime_type: str,
1002
        request_id: str,
1003
    ) -> None:
1004
        """
1005
        Serializes the given payload.
1006

1007
        :param parameters: The user input params
1008
        :param response: The final serialized Response
1009
        :param shape: Describes the expected output shape (can be None in case of an "empty" response)
1010
        :param shape_members: The members of the output struct shape
1011
        :param operation_model: The specification of the operation of which the response is serialized here
1012
        :param mime_type: Mime type which should be used to encode the payload
1013
        :param request_id: autogenerated AWS request ID identifying the original request
1014
        :return: None - the given `serialized` dict is modified
1015
        """
1016
        if shape is None:
1✔
1017
            return
1✔
1018

1019
        payload_member = shape.serialization.get("payload")
1✔
1020
        # If this shape is defined as being an event, we need to search for the payload member
1021
        if not payload_member and shape.serialization.get("event"):
1✔
1022
            for member_name, member_shape in shape_members.items():
1✔
1023
                # Try to find the first shape which is marked as "eventpayload" and is given in the params dict
1024
                if member_shape.serialization.get("eventpayload") and parameters.get(member_name):
1✔
1025
                    payload_member = member_name
1✔
1026
                    break
1✔
1027
        if payload_member is not None and shape_members[payload_member].type_name in [
1✔
1028
            "blob",
1029
            "string",
1030
        ]:
1031
            # If it's streaming, then the body is just the value of the payload.
1032
            body_payload = parameters.get(payload_member, b"")
1✔
1033
            body_payload = self._encode_payload(body_payload)
1✔
1034
            response.set_response(body_payload)
1✔
1035
        elif payload_member is not None:
1✔
1036
            # If there's a payload member, we serialized that member to the body.
1037
            body_params = parameters.get(payload_member)
1✔
1038
            if body_params is not None:
1✔
1039
                response.set_response(
1✔
1040
                    self._encode_payload(
1041
                        self._serialize_body_params(
1042
                            body_params,
1043
                            shape_members[payload_member],
1044
                            operation_model,
1045
                            mime_type,
1046
                            request_id,
1047
                        )
1048
                    )
1049
                )
1050
        else:
1051
            # Otherwise, we use the "traditional" way of serializing the whole parameters dict recursively.
1052
            response.set_response(
1✔
1053
                self._encode_payload(
1054
                    self._serialize_body_params(
1055
                        parameters, shape, operation_model, mime_type, request_id
1056
                    )
1057
                )
1058
            )
1059

1060
    def _serialize_content_type(
1✔
1061
        self, serialized: Response, shape: Shape, shape_members: dict, mime_type: str
1062
    ):
1063
        """
1064
        Some protocols require varied Content-Type headers depending on user input.
1065
        This allows subclasses to apply this conditionally.
1066
        """
1067
        pass
1✔
1068

1069
    def _has_streaming_payload(self, payload: str | None, shape_members):
1✔
1070
        """Determine if payload is streaming (a blob or string)."""
1071
        return payload is not None and shape_members[payload].type_name in ["blob", "string"]
1✔
1072

1073
    def _prepare_additional_traits_in_response(
1✔
1074
        self, response: Response, operation_model: OperationModel, request_id: str
1075
    ):
1076
        """Adds the request ID to the headers (in contrast to the body - as in the Query protocol)."""
1077
        response = super()._prepare_additional_traits_in_response(
1✔
1078
            response, operation_model, request_id
1079
        )
1080
        response.headers["x-amz-request-id"] = request_id
1✔
1081
        return response
1✔
1082

1083
    def _process_header_members(self, parameters: dict, response: Response, shape: Shape):
1✔
1084
        shape_members = shape.members if isinstance(shape, StructureShape) else []
1✔
1085
        for name in shape_members:
1✔
1086
            member_shape = shape_members[name]
1✔
1087
            location = member_shape.serialization.get("location")
1✔
1088
            if not location:
1✔
1089
                continue
1✔
1090
            if name not in parameters:
1✔
1091
                # ignores optional keys
1092
                continue
1✔
1093
            key = member_shape.serialization.get("name", name)
1✔
1094
            value = parameters[name]
1✔
1095
            if value is None:
1✔
1096
                continue
×
1097
            if location == "header":
1✔
1098
                response.headers[key] = self._serialize_header_value(member_shape, value)
1✔
1099
            elif location == "headers":
1✔
1100
                header_prefix = key
1✔
1101
                self._serialize_header_map(header_prefix, response, value)
1✔
1102
            elif location == "statusCode":
1✔
1103
                response.status_code = int(value)
1✔
1104

1105
    def _serialize_header_map(self, prefix: str, response: Response, params: dict) -> None:
1✔
1106
        """Serializes the header map for the location trait "headers"."""
1107
        for key, val in params.items():
1✔
1108
            actual_key = prefix + key
1✔
1109
            response.headers[actual_key] = val
1✔
1110

1111
    def _serialize_header_value(self, shape: Shape, value: Any):
1✔
1112
        """Serializes a value for the location trait "header"."""
1113
        if shape.type_name == "timestamp":
1✔
1114
            datetime_obj = parse_to_aware_datetime(value)
1✔
1115
            timestamp_format = shape.serialization.get(
1✔
1116
                "timestampFormat", self.HEADER_TIMESTAMP_FORMAT
1117
            )
1118
            return self._convert_timestamp_to_str(datetime_obj, timestamp_format)
1✔
1119
        elif shape.type_name == "list":
1✔
1120
            converted_value = [
×
1121
                self._serialize_header_value(shape.member, v) for v in value if v is not None
1122
            ]
1123
            return ",".join(converted_value)
×
1124
        elif shape.type_name == "boolean":
1✔
1125
            # Set the header value to "true" if the given value is truthy, otherwise set the header value to "false".
1126
            return "true" if value else "false"
1✔
1127
        elif is_json_value_header(shape):
1✔
1128
            # Serialize with no spaces after separators to save space in
1129
            # the header.
1130
            return self._get_base64(json.dumps(value, separators=(",", ":")))
×
1131
        else:
1132
            return value
1✔
1133

1134
    def _partition_members(self, parameters: dict, shape: Shape | None) -> tuple[dict, dict]:
1✔
1135
        """Separates the top-level keys in the given parameters dict into header- and payload-located params."""
1136
        if not isinstance(shape, StructureShape):
1✔
1137
            # If the shape isn't a structure, we default to the whole response being parsed in the body.
1138
            # Non-payload members are only loaded in the top-level hierarchy and those are always structures.
1139
            return {}, parameters
1✔
1140
        header_params = {}
1✔
1141
        payload_params = {}
1✔
1142
        shape_members = shape.members
1✔
1143
        for name in shape_members:
1✔
1144
            member_shape = shape_members[name]
1✔
1145
            if name not in parameters:
1✔
1146
                continue
1✔
1147
            location = member_shape.serialization.get("location")
1✔
1148
            if location:
1✔
1149
                header_params[name] = parameters[name]
1✔
1150
            else:
1151
                payload_params[name] = parameters[name]
1✔
1152
        return header_params, payload_params
1✔
1153

1154

1155
class RestXMLResponseSerializer(BaseRestResponseSerializer, BaseXMLResponseSerializer):
1✔
1156
    """
1157
    The ``RestXMLResponseSerializer`` is responsible for the serialization of responses from services with the
1158
    ``rest-xml`` protocol.
1159
    It combines the ``BaseRestResponseSerializer`` (for the ReST specific logic) with the ``BaseXMLResponseSerializer``
1160
    (for the XML body response serialization).
1161
    """
1162

1163
    pass
1✔
1164

1165

1166
class QueryResponseSerializer(BaseXMLResponseSerializer):
1✔
1167
    """
1168
    The ``QueryResponseSerializer`` is responsible for the serialization of responses from services which use the
1169
    ``query`` protocol. The responses of these services also use XML. It is basically a subset of the features, since it
1170
    does not allow any payload or location traits.
1171
    """
1172

1173
    def _serialize_response(
1✔
1174
        self,
1175
        parameters: dict,
1176
        response: Response,
1177
        shape: Shape | None,
1178
        shape_members: dict,
1179
        operation_model: OperationModel,
1180
        mime_type: str,
1181
        request_id: str,
1182
    ) -> None:
1183
        """
1184
        Serializes the given parameters as XML for the query protocol.
1185

1186
        :param parameters: The user input params
1187
        :param response: The final serialized Response
1188
        :param shape: Describes the expected output shape (can be None in case of an "empty" response)
1189
        :param shape_members: The members of the output struct shape
1190
        :param operation_model: The specification of the operation of which the response is serialized here
1191
        :param mime_type: Mime type which should be used to encode the payload
1192
        :param request_id: autogenerated AWS request ID identifying the original request
1193
        :return: None - the given `serialized` dict is modified
1194
        """
1195
        response.set_response(
1✔
1196
            self._encode_payload(
1197
                self._serialize_body_params(
1198
                    parameters, shape, operation_model, mime_type, request_id
1199
                )
1200
            )
1201
        )
1202

1203
    def _serialize_body_params_to_xml(
1✔
1204
        self, params: dict, shape: Shape, operation_model: OperationModel, mime_type: str
1205
    ) -> ETree.Element:
1206
        # The Query protocol responses have a root element which is not contained in the specification file.
1207
        # Therefore, we first call the super function to perform the normal XML serialization, and afterwards wrap the
1208
        # result in a root element based on the operation name.
1209
        node = super()._serialize_body_params_to_xml(params, shape, operation_model, mime_type)
1✔
1210

1211
        # Check if we need to add a namespace
1212
        attr = (
1✔
1213
            {"xmlns": operation_model.metadata.get("xmlNamespace")}
1214
            if "xmlNamespace" in operation_model.metadata
1215
            else None
1216
        )
1217

1218
        # Create the root element and add the result of the XML serializer as a child node
1219
        root = ETree.Element(f"{operation_model.name}Response", attr)
1✔
1220
        if node is not None:
1✔
1221
            root.append(node)
1✔
1222
        return root
1✔
1223

1224
    def _prepare_additional_traits_in_xml(self, root: ETree.Element | None, request_id: str):
1✔
1225
        # Add the response metadata here (it's not defined in the specs)
1226
        # For the ec2 and the query protocol, the root cannot be None at this time.
1227
        response_metadata = ETree.SubElement(root, "ResponseMetadata")
1✔
1228
        request_id_element = ETree.SubElement(response_metadata, "RequestId")
1✔
1229
        request_id_element.text = request_id
1✔
1230

1231

1232
class EC2ResponseSerializer(QueryResponseSerializer):
1✔
1233
    """
1234
    The ``EC2ResponseSerializer`` is responsible for the serialization of responses from services which use the
1235
    ``ec2`` protocol (basically the EC2 service). This protocol is basically equal to the ``query`` protocol with only
1236
    a few subtle differences.
1237
    """
1238

1239
    def _serialize_error(
1✔
1240
        self,
1241
        error: ServiceException,
1242
        response: Response,
1243
        shape: StructureShape,
1244
        operation_model: OperationModel,
1245
        mime_type: str,
1246
        request_id: str,
1247
    ) -> None:
1248
        # EC2 errors look like:
1249
        # <Response>
1250
        #   <Errors>
1251
        #     <Error>
1252
        #       <Code>InvalidInstanceID.Malformed</Code>
1253
        #       <Message>Invalid id: "1343124"</Message>
1254
        #     </Error>
1255
        #   </Errors>
1256
        #   <RequestID>12345</RequestID>
1257
        # </Response>
1258
        # This is different from QueryParser in that it's RequestID, not RequestId
1259
        # and that the Error tag is in an enclosing Errors tag.
1260
        attr = (
1✔
1261
            {"xmlns": operation_model.metadata.get("xmlNamespace")}
1262
            if "xmlNamespace" in operation_model.metadata
1263
            else None
1264
        )
1265
        root = ETree.Element("Response", attr)
1✔
1266
        errors_tag = ETree.SubElement(root, "Errors")
1✔
1267
        error_tag = ETree.SubElement(errors_tag, "Error")
1✔
1268
        self._add_error_tags(error, error_tag, mime_type)
1✔
1269
        request_id_element = ETree.SubElement(root, "RequestID")
1✔
1270
        request_id_element.text = request_id
1✔
1271
        response.set_response(self._encode_payload(self._node_to_string(root, mime_type)))
1✔
1272

1273
    def _prepare_additional_traits_in_xml(self, root: ETree.Element | None, request_id: str):
1✔
1274
        # The EC2 protocol does not use the root output shape, therefore we need to remove the hierarchy level
1275
        # below the root level
1276
        if len(root) > 0:
1✔
1277
            output_node = root[0]
1✔
1278
            for child in output_node:
1✔
1279
                root.append(child)
1✔
1280
            root.remove(output_node)
1✔
1281

1282
        # Add the requestId here (it's not defined in the specs)
1283
        # For the ec2 and the query protocol, the root cannot be None at this time.
1284
        request_id_element = ETree.SubElement(root, "requestId")
1✔
1285
        request_id_element.text = request_id
1✔
1286

1287

1288
class JSONResponseSerializer(QueryCompatibleProtocolMixin, ResponseSerializer):
1✔
1289
    """
1290
    The ``JSONResponseSerializer`` is responsible for the serialization of responses from services with the ``json``
1291
    protocol. It implements the JSON response body serialization, which is also used by the
1292
    ``RestJSONResponseSerializer``.
1293
    """
1294

1295
    JSON_TYPES = [APPLICATION_JSON, APPLICATION_AMZ_JSON_1_0, APPLICATION_AMZ_JSON_1_1]
1✔
1296
    CBOR_TYPES = [APPLICATION_CBOR, APPLICATION_AMZ_CBOR_1_1]
1✔
1297
    SUPPORTED_MIME_TYPES = JSON_TYPES + CBOR_TYPES
1✔
1298

1299
    TIMESTAMP_FORMAT = "unixtimestamp"
1✔
1300

1301
    def _serialize_error(
1✔
1302
        self,
1303
        error: ServiceException,
1304
        response: Response,
1305
        shape: StructureShape,
1306
        operation_model: OperationModel,
1307
        mime_type: str,
1308
        request_id: str,
1309
    ) -> None:
1310
        body = {}
1✔
1311

1312
        # TODO implement different service-specific serializer configurations
1313
        #   - currently we set both, the `__type` member as well as the `X-Amzn-Errortype` header
1314
        #   - the specification defines that it's either the __type field OR the header
1315
        # this depends on the JSON protocol version as well. If json-1.0 the Error should be the full shape ID, like
1316
        # com.amazon.coral.service#ExceptionName
1317
        # if json-1.1, it should only be the name
1318

1319
        is_query_compatible = operation_model.service_model.is_query_compatible
1✔
1320
        code = self._get_error_code(is_query_compatible, error, shape)
1✔
1321

1322
        response.headers["X-Amzn-Errortype"] = code
1✔
1323

1324
        # the `__type` field is not defined in default botocore error shapes
1325
        body["__type"] = code
1✔
1326

1327
        if shape:
1✔
1328
            remaining_params = {}
1✔
1329
            # TODO add a possibility to serialize simple non-modelled errors (like S3 NoSuchBucket#BucketName)
1330
            for member in shape.members:
1✔
1331
                if hasattr(error, member):
1✔
1332
                    value = getattr(error, member)
1✔
1333

1334
                # Default error message fields can sometimes have different casing in the specs
1335
                elif member.lower() in ["code", "message"] and hasattr(error, member.lower()):
1✔
1336
                    value = getattr(error, member.lower())
1✔
1337

1338
                else:
1339
                    continue
1✔
1340

1341
                if value is None:
1✔
1342
                    # do not serialize a value that is set to `None`
1343
                    continue
1✔
1344

1345
                # if the value is falsy (empty string, empty list) and not in the Shape required members, AWS will
1346
                # not serialize it, and it will not be part of the response body.
1347
                if value or member in shape.required_members:
1✔
1348
                    remaining_params[member] = value
1✔
1349

1350
            self._serialize(body, remaining_params, shape, None, mime_type)
1✔
1351

1352
        # this is a workaround, some Error Shape do not define a `Message` field, but it is always returned
1353
        # this could be solved at the same time as the `__type` field
1354
        if "message" not in body and "Message" not in body:
1✔
1355
            if error_message := self._get_error_message(error):
1✔
1356
                body["message"] = error_message
1✔
1357

1358
        if mime_type in self.CBOR_TYPES:
1✔
1359
            response.set_response(cbor2_dumps(body, datetime_as_timestamp=True))
1✔
1360
            response.content_type = mime_type
1✔
1361
        else:
1362
            response.set_json(body)
1✔
1363

1364
        if is_query_compatible:
1✔
1365
            self._add_query_compatible_error_header(response, error)
1✔
1366

1367
    def _serialize_response(
1✔
1368
        self,
1369
        parameters: dict,
1370
        response: Response,
1371
        shape: Shape | None,
1372
        shape_members: dict,
1373
        operation_model: OperationModel,
1374
        mime_type: str,
1375
        request_id: str,
1376
    ) -> None:
1377
        if mime_type in self.CBOR_TYPES:
1✔
1378
            response.content_type = mime_type
1✔
1379
        else:
1380
            json_version = operation_model.metadata.get("jsonVersion")
1✔
1381
            if json_version is not None:
1✔
1382
                response.headers["Content-Type"] = f"application/x-amz-json-{json_version}"
1✔
1383
        response.set_response(
1✔
1384
            self._serialize_body_params(parameters, shape, operation_model, mime_type, request_id)
1385
        )
1386

1387
    def _serialize_body_params(
1✔
1388
        self,
1389
        params: dict,
1390
        shape: Shape,
1391
        operation_model: OperationModel,
1392
        mime_type: str,
1393
        request_id: str,
1394
    ) -> str | None:
1395
        body = {}
1✔
1396
        if shape is not None:
1✔
1397
            self._serialize(body, params, shape, None, mime_type)
1✔
1398

1399
        if mime_type in self.CBOR_TYPES:
1✔
1400
            return cbor2_dumps(body, datetime_as_timestamp=True)
1✔
1401
        else:
1402
            return json.dumps(body)
1✔
1403

1404
    def _serialize(self, body: dict, value: Any, shape, key: str | None, mime_type: str):
1✔
1405
        """This method dynamically invokes the correct `_serialize_type_*` method for each shape type."""
1406
        try:
1✔
1407
            method = getattr(self, f"_serialize_type_{shape.type_name}", self._default_serialize)
1✔
1408
            method(body, value, shape, key, mime_type)
1✔
1409
        except (TypeError, ValueError, AttributeError) as e:
×
1410
            raise ProtocolSerializerError(
×
1411
                f"Invalid type when serializing {shape.name}: '{value}' cannot be parsed to {shape.type_name}."
1412
            ) from e
1413

1414
    def _serialize_type_structure(
1✔
1415
        self, body: dict, value: dict, shape: StructureShape, key: str | None, mime_type: str
1416
    ):
1417
        if value is None:
1✔
1418
            return
×
1419
        if shape.is_document_type:
1✔
1420
            body[key] = value
×
1421
        else:
1422
            if key is not None:
1✔
1423
                # If a key is provided, this is a result of a recursive
1424
                # call, so we need to add a new child dict as the value
1425
                # of the passed in serialized dict.  We'll then add
1426
                # all the structure members as key/vals in the new serialized
1427
                # dictionary we just created.
1428
                new_serialized = {}
1✔
1429
                body[key] = new_serialized
1✔
1430
                body = new_serialized
1✔
1431
            members = shape.members
1✔
1432
            for member_key, member_value in value.items():
1✔
1433
                if member_value is None:
1✔
1434
                    continue
1✔
1435
                try:
1✔
1436
                    member_shape = members[member_key]
1✔
1437
                except KeyError:
1✔
1438
                    LOG.warning(
1✔
1439
                        "Response object %s contains a member which is not specified: %s",
1440
                        shape.name,
1441
                        member_key,
1442
                    )
1443
                    continue
1✔
1444
                if "name" in member_shape.serialization:
1✔
1445
                    member_key = member_shape.serialization["name"]
1✔
1446
                self._serialize(body, member_value, member_shape, member_key, mime_type)
1✔
1447

1448
    def _serialize_type_map(
1✔
1449
        self, body: dict, value: dict, shape: MapShape, key: str, mime_type: str
1450
    ):
1451
        if value is None:
1✔
1452
            return
×
1453
        map_obj = {}
1✔
1454
        body[key] = map_obj
1✔
1455
        for sub_key, sub_value in value.items():
1✔
1456
            if sub_value is not None:
1✔
1457
                self._serialize(map_obj, sub_value, shape.value, sub_key, mime_type)
1✔
1458

1459
    def _serialize_type_list(
1✔
1460
        self, body: dict, value: list, shape: ListShape, key: str, mime_type: str
1461
    ):
1462
        if value is None:
1✔
1463
            return
×
1464
        list_obj = []
1✔
1465
        body[key] = list_obj
1✔
1466
        for list_item in value:
1✔
1467
            if list_item is not None:
1✔
1468
                wrapper = {}
1✔
1469
                # The JSON list serialization is the only case where we aren't
1470
                # setting a key on a dict.  We handle this by using
1471
                # a __current__ key on a wrapper dict to serialize each
1472
                # list item before appending it to the serialized list.
1473
                self._serialize(wrapper, list_item, shape.member, "__current__", mime_type)
1✔
1474
                list_obj.append(wrapper["__current__"])
1✔
1475

1476
    def _default_serialize(self, body: dict, value: Any, _, key: str, __):
1✔
1477
        body[key] = value
1✔
1478

1479
    def _serialize_type_timestamp(
1✔
1480
        self, body: dict, value: Any, shape: Shape, key: str, mime_type: str
1481
    ):
1482
        if mime_type in self.CBOR_TYPES:
1✔
1483
            # CBOR has native support for timestamps
1484
            body[key] = value
1✔
1485
        else:
1486
            timestamp_format = shape.serialization.get("timestampFormat")
1✔
1487
            body[key] = self._convert_timestamp_to_str(value, timestamp_format)
1✔
1488

1489
    def _serialize_type_blob(self, body: dict, value: str | bytes, _, key: str, mime_type: str):
1✔
1490
        if mime_type in self.CBOR_TYPES:
1✔
1491
            body[key] = value
1✔
1492
        else:
1493
            body[key] = self._get_base64(value)
1✔
1494

1495
    def _prepare_additional_traits_in_response(
1✔
1496
        self, response: Response, operation_model: OperationModel, request_id: str
1497
    ):
1498
        response.headers["x-amzn-requestid"] = request_id
1✔
1499
        response = super()._prepare_additional_traits_in_response(
1✔
1500
            response, operation_model, request_id
1501
        )
1502
        return response
1✔
1503

1504

1505
class RestJSONResponseSerializer(BaseRestResponseSerializer, JSONResponseSerializer):
1✔
1506
    """
1507
    The ``RestJSONResponseSerializer`` is responsible for the serialization of responses from services with the
1508
    ``rest-json`` protocol.
1509
    It combines the ``BaseRestResponseSerializer`` (for the ReST specific logic) with the ``JSONResponseSerializer``
1510
    (for the JSOn body response serialization).
1511
    """
1512

1513
    def _serialize_content_type(
1✔
1514
        self, serialized: Response, shape: Shape, shape_members: dict, mime_type: str
1515
    ):
1516
        """Set Content-Type to application/json for all structured bodies."""
1517
        payload = shape.serialization.get("payload") if shape is not None else None
1✔
1518
        if self._has_streaming_payload(payload, shape_members):
1✔
1519
            # Don't apply content-type to streaming bodies
1520
            return
1✔
1521

1522
        has_body = serialized.data != b""
1✔
1523
        has_content_type = self._has_header("Content-Type", serialized.headers)
1✔
1524
        if has_body and not has_content_type:
1✔
1525
            serialized.headers["Content-Type"] = mime_type
×
1526

1527

1528
class BaseCBORResponseSerializer(ResponseSerializer):
1✔
1529
    """
1530
    The ``BaseCBORResponseSerializer`` performs the basic logic for the CBOR response serialization.
1531

1532
    There are two types of map/list in CBOR, indefinite length types and "defined" ones:
1533
    You can use the `\xbf` byte marker to indicate a map with indefinite length, then `\xff` to indicate the end
1534
     of the map.
1535
    You can also use, for example, `\xa4` to indicate a map with exactly 4 things in it, so `\xff` is not
1536
    required at the end.
1537
    AWS, for both Kinesis and `smithy-rpc-v2-cbor` services, is using indefinite data structures when returning
1538
    responses.
1539

1540
    The CBOR serializer cannot serialize an exception if it is not defined in our specs.
1541
    LocalStack defines a way to have user-defined exception by subclassing `CommonServiceException`, so it needs to be
1542
     able to encode those, as well as InternalError
1543
    We are creating a default botocore structure shape (`_DEFAULT_ERROR_STRUCTURE_SHAPE`) to be used in such cases.
1544
    """
1545

1546
    SUPPORTED_MIME_TYPES = [APPLICATION_CBOR, APPLICATION_AMZ_CBOR_1_1]
1✔
1547

1548
    UNSIGNED_INT_MAJOR_TYPE = 0
1✔
1549
    NEGATIVE_INT_MAJOR_TYPE = 1
1✔
1550
    BLOB_MAJOR_TYPE = 2
1✔
1551
    STRING_MAJOR_TYPE = 3
1✔
1552
    LIST_MAJOR_TYPE = 4
1✔
1553
    MAP_MAJOR_TYPE = 5
1✔
1554
    TAG_MAJOR_TYPE = 6
1✔
1555
    FLOAT_AND_SIMPLE_MAJOR_TYPE = 7
1✔
1556

1557
    INDEFINITE_ITEM_ADDITIONAL_INFO = 31
1✔
1558
    BREAK_CODE = b"\xff"
1✔
1559
    USE_INDEFINITE_DATA_STRUCTURE = True
1✔
1560

1561
    _ERROR_TYPE_SHAPE = StringShape(shape_name="__type", shape_model={"type": "string"})
1✔
1562

1563
    _DEFAULT_ERROR_STRUCTURE_SHAPE = StructureShape(
1✔
1564
        shape_name="DefaultErrorStructure",
1565
        shape_model={
1566
            "type": "structure",
1567
            "members": {
1568
                "message": {"shape": "ErrorMessage"},
1569
                "__type": {"shape": "ErrorType"},
1570
            },
1571
            "error": {"code": "DefaultErrorStructure", "httpStatusCode": 400, "senderFault": True},
1572
            "exception": True,
1573
        },
1574
        shape_resolver=ShapeResolver(
1575
            shape_map={
1576
                "ErrorMessage": {"type": "string"},
1577
                "ErrorType": {"type": "string"},
1578
            },
1579
        ),
1580
    )
1581

1582
    def _serialize_data_item(
1✔
1583
        self, serialized: bytearray, value: Any, shape: Shape | None, name: str | None = None
1584
    ) -> None:
1585
        method = getattr(self, f"_serialize_type_{shape.type_name}")
1✔
1586
        if method is None:
1✔
1587
            raise ValueError(
×
1588
                f"Unrecognized C2J type: {shape.type_name}, unable to serialize request"
1589
            )
1590
        method(serialized, value, shape, name)
1✔
1591

1592
    def _serialize_type_integer(
1✔
1593
        self, serialized: bytearray, value: int, shape: Shape | None, name: str | None = None
1594
    ) -> None:
1595
        if value >= 0:
1✔
1596
            major_type = self.UNSIGNED_INT_MAJOR_TYPE
1✔
1597
        else:
1598
            major_type = self.NEGATIVE_INT_MAJOR_TYPE
×
1599
            # The only differences in serializing negative and positive integers is
1600
            # that for negative, we set the major type to 1 and set the value to -1
1601
            # minus the value
1602
            value = -1 - value
×
1603
        additional_info, num_bytes = self._get_additional_info_and_num_bytes(value)
1✔
1604
        initial_byte = self._get_initial_byte(major_type, additional_info)
1✔
1605
        if num_bytes == 0:
1✔
1606
            serialized.extend(initial_byte)
1✔
1607
        else:
1608
            serialized.extend(initial_byte + value.to_bytes(num_bytes, "big"))
1✔
1609

1610
    def _serialize_type_long(
1✔
1611
        self, serialized: bytearray, value: int, shape: Shape, name: str | None = None
1612
    ) -> None:
1613
        self._serialize_type_integer(serialized, value, shape, name)
1✔
1614

1615
    def _serialize_type_blob(
1✔
1616
        self,
1617
        serialized: bytearray,
1618
        value: str | bytes | IO[bytes],
1619
        shape: Shape | None,
1620
        name: str | None = None,
1621
    ) -> None:
1622
        if isinstance(value, str):
1✔
1623
            value = value.encode("utf-8")
×
1624
        elif not isinstance(value, (bytes, bytearray)):
1✔
1625
            # We support file-like objects for blobs; these already have been
1626
            # validated to ensure they have a read method
1627
            value = value.read()
×
1628
        length = len(value)
1✔
1629
        additional_info, num_bytes = self._get_additional_info_and_num_bytes(length)
1✔
1630
        initial_byte = self._get_initial_byte(self.BLOB_MAJOR_TYPE, additional_info)
1✔
1631
        if num_bytes == 0:
1✔
1632
            serialized.extend(initial_byte)
1✔
1633
        else:
1634
            serialized.extend(initial_byte + length.to_bytes(num_bytes, "big"))
×
1635
        serialized.extend(value)
1✔
1636

1637
    def _serialize_type_string(
1✔
1638
        self, serialized: bytearray, value: str, shape: Shape | None, name: str | None = None
1639
    ) -> None:
1640
        encoded = value.encode("utf-8")
1✔
1641
        length = len(encoded)
1✔
1642
        additional_info, num_bytes = self._get_additional_info_and_num_bytes(length)
1✔
1643
        initial_byte = self._get_initial_byte(self.STRING_MAJOR_TYPE, additional_info)
1✔
1644
        if num_bytes == 0:
1✔
1645
            serialized.extend(initial_byte + encoded)
1✔
1646
        else:
1647
            serialized.extend(initial_byte + length.to_bytes(num_bytes, "big") + encoded)
1✔
1648

1649
    def _serialize_type_list(
1✔
1650
        self, serialized: bytearray, value: list, shape: Shape | None, name: str | None = None
1651
    ) -> None:
1652
        initial_bytes, closing_bytes = self._get_bytes_for_data_structure(
1✔
1653
            value, self.LIST_MAJOR_TYPE
1654
        )
1655
        serialized.extend(initial_bytes)
1✔
1656

1657
        for item in value:
1✔
1658
            self._serialize_data_item(serialized, item, shape.member)
1✔
1659

1660
        if closing_bytes is not None:
1✔
1661
            serialized.extend(closing_bytes)
1✔
1662

1663
    def _serialize_type_map(
1✔
1664
        self, serialized: bytearray, value: dict, shape: Shape | None, name: str | None = None
1665
    ) -> None:
1666
        initial_bytes, closing_bytes = self._get_bytes_for_data_structure(
1✔
1667
            value, self.MAP_MAJOR_TYPE
1668
        )
1669
        serialized.extend(initial_bytes)
1✔
1670

1671
        for key_item, item in value.items():
1✔
1672
            self._serialize_data_item(serialized, key_item, shape.key)
1✔
1673
            self._serialize_data_item(serialized, item, shape.value)
1✔
1674

1675
        if closing_bytes is not None:
1✔
1676
            serialized.extend(closing_bytes)
1✔
1677

1678
    def _serialize_type_structure(
1✔
1679
        self,
1680
        serialized: bytearray,
1681
        value: dict,
1682
        shape: Shape | None,
1683
        name: str | None = None,
1684
        shape_members: dict[str, Shape] | None = None,
1685
    ) -> None:
1686
        # `_serialize_type_structure` has a different signature other `_serialize_type_*` methods as it accepts
1687
        # `shape_members`. This is because sometimes, the `StructureShape` does not have some members defined in the
1688
        # specs, and we want to be able to pass arbitrary members to serialize undocumented members.
1689
        # see `_serialize_error_structure` for its specific usage
1690

1691
        if name is not None:
1✔
1692
            # For nested structures, we need to serialize the key first
1693
            self._serialize_data_item(serialized, name, shape.key_shape)
×
1694

1695
        # Remove `None` values from the dictionary
1696
        value = {k: v for k, v in value.items() if v is not None}
1✔
1697

1698
        initial_bytes, closing_bytes = self._get_bytes_for_data_structure(
1✔
1699
            value, self.MAP_MAJOR_TYPE
1700
        )
1701
        serialized.extend(initial_bytes)
1✔
1702
        members = shape_members or shape.members
1✔
1703
        for member_key, member_value in value.items():
1✔
1704
            member_shape = members[member_key]
1✔
1705
            if "name" in member_shape.serialization:
1✔
1706
                member_key = member_shape.serialization["name"]
×
1707
            if member_value is not None:
1✔
1708
                self._serialize_type_string(serialized, member_key, None, None)
1✔
1709
                self._serialize_data_item(serialized, member_value, member_shape)
1✔
1710

1711
        if closing_bytes is not None:
1✔
1712
            serialized.extend(closing_bytes)
1✔
1713

1714
    def _serialize_type_timestamp(
1✔
1715
        self,
1716
        serialized: bytearray,
1717
        value: int | str | datetime.datetime,
1718
        shape: Shape | None,
1719
        name: str | None = None,
1720
    ) -> None:
1721
        # https://smithy.io/2.0/additional-specs/protocols/smithy-rpc-v2.html#timestamp-type-serialization
1722
        tag = 1  # Use tag 1 for unix timestamp
1✔
1723
        initial_byte = self._get_initial_byte(self.TAG_MAJOR_TYPE, tag)
1✔
1724
        serialized.extend(initial_byte)  # Tagging the timestamp
1✔
1725

1726
        # we encode the timestamp as a double, like the Go SDK
1727
        # https://github.com/aws/aws-sdk-go-v2/blob/5d7c17325a2581afae4455c150549174ebfd9428/internal/protocoltest/smithyrpcv2cbor/serializers.go#L664-L669
1728
        # Currently, the Botocore serializer using unsigned integers, but it does not conform to the Smithy specs:
1729
        # > This protocol uses epoch-seconds, also known as Unix timestamps, with millisecond
1730
        # > (1/1000th of a second) resolution.
1731
        timestamp = float(self._convert_timestamp_to_str(value))
1✔
1732
        initial_byte = self._get_initial_byte(self.FLOAT_AND_SIMPLE_MAJOR_TYPE, 27)
1✔
1733
        serialized.extend(initial_byte + struct.pack(">d", timestamp))
1✔
1734

1735
    def _serialize_type_float(
1✔
1736
        self, serialized: bytearray, value: float, shape: Shape | None, name: str | None = None
1737
    ) -> None:
1738
        if self._is_special_number(value):
×
1739
            serialized.extend(
×
1740
                self._get_bytes_for_special_numbers(value)
1741
            )  # Handle special values like NaN or Infinity
1742
        else:
1743
            initial_byte = self._get_initial_byte(self.FLOAT_AND_SIMPLE_MAJOR_TYPE, 26)
×
1744
            serialized.extend(initial_byte + struct.pack(">f", value))
×
1745

1746
    def _serialize_type_double(
1✔
1747
        self, serialized: bytearray, value: float, shape: Shape | None, name: str | None = None
1748
    ) -> None:
1749
        if self._is_special_number(value):
1✔
1750
            serialized.extend(
×
1751
                self._get_bytes_for_special_numbers(value)
1752
            )  # Handle special values like NaN or Infinity
1753
        else:
1754
            initial_byte = self._get_initial_byte(self.FLOAT_AND_SIMPLE_MAJOR_TYPE, 27)
1✔
1755
            serialized.extend(initial_byte + struct.pack(">d", value))
1✔
1756

1757
    def _serialize_type_boolean(
1✔
1758
        self, serialized: bytearray, value: bool, shape: Shape | None, name: str | None = None
1759
    ) -> None:
1760
        additional_info = 21 if value else 20
1✔
1761
        serialized.extend(self._get_initial_byte(self.FLOAT_AND_SIMPLE_MAJOR_TYPE, additional_info))
1✔
1762

1763
    @staticmethod
1✔
1764
    def _get_additional_info_and_num_bytes(value: int) -> tuple[int, int]:
1✔
1765
        # Values under 24 can be stored in the initial byte and don't need further
1766
        # encoding
1767
        if value < 24:
1✔
1768
            return value, 0
1✔
1769
        # Values between 24 and 255 (inclusive) can be stored in 1 byte and
1770
        # correspond to additional info 24
1771
        elif value < 256:
1✔
1772
            return 24, 1
1✔
1773
        # Values up to 65535 can be stored in two bytes and correspond to additional
1774
        # info 25
1775
        elif value < 65536:
1✔
1776
            return 25, 2
1✔
1777
        # Values up to 4294967296 can be stored in four bytes and correspond to
1778
        # additional info 26
1779
        elif value < 4294967296:
×
1780
            return 26, 4
×
1781
        # The maximum number of bytes in a definite length data items is 8 which
1782
        # to additional info 27
1783
        else:
1784
            return 27, 8
×
1785

1786
    def _get_initial_byte(self, major_type: int, additional_info: int) -> bytes:
1✔
1787
        # The highest order three bits are the major type, so we need to bitshift the
1788
        # major type by 5
1789
        major_type_bytes = major_type << 5
1✔
1790
        return (major_type_bytes | additional_info).to_bytes(1, "big")
1✔
1791

1792
    @staticmethod
1✔
1793
    def _is_special_number(value: int | float) -> bool:
1✔
1794
        return any(
1✔
1795
            [
1796
                value == float("inf"),
1797
                value == float("-inf"),
1798
                math.isnan(value),
1799
            ]
1800
        )
1801

1802
    def _get_bytes_for_special_numbers(self, value: int | float) -> bytes:
1✔
1803
        additional_info = 25
×
1804
        initial_byte = self._get_initial_byte(self.FLOAT_AND_SIMPLE_MAJOR_TYPE, additional_info)
×
1805
        if value == float("inf"):
×
1806
            return initial_byte + struct.pack(">H", 0x7C00)
×
1807
        elif value == float("-inf"):
×
1808
            return initial_byte + struct.pack(">H", 0xFC00)
×
1809
        elif math.isnan(value):
×
1810
            return initial_byte + struct.pack(">H", 0x7E00)
×
1811

1812
    def _get_bytes_for_data_structure(
1✔
1813
        self, value: list | dict, major_type: int
1814
    ) -> tuple[bytes, bytes | None]:
1815
        if self.USE_INDEFINITE_DATA_STRUCTURE:
1✔
1816
            additional_info = self.INDEFINITE_ITEM_ADDITIONAL_INFO
1✔
1817
            return self._get_initial_byte(major_type, additional_info), self.BREAK_CODE
1✔
1818
        else:
1819
            length = len(value)
×
1820
            additional_info, num_bytes = self._get_additional_info_and_num_bytes(length)
×
1821
            initial_byte = self._get_initial_byte(major_type, additional_info)
×
1822
            if num_bytes != 0:
×
1823
                initial_byte = initial_byte + length.to_bytes(num_bytes, "big")
×
1824

1825
            return initial_byte, None
×
1826

1827
    def _serialize_error_structure(
1✔
1828
        self, body: bytearray, shape: Shape | None, error: ServiceException, code: str
1829
    ):
1830
        if not shape:
1✔
1831
            shape = self._DEFAULT_ERROR_STRUCTURE_SHAPE
1✔
1832
            shape_members = shape.members
1✔
1833
        else:
1834
            # we need to manually add the `__type` field to the shape members as it is not part of the specs
1835
            # we do a shallow copy of the shape members
1836
            shape_members = shape.members.copy()
1✔
1837
            shape_members["__type"] = self._ERROR_TYPE_SHAPE
1✔
1838

1839
        # Error responses in the rpcv2Cbor protocol MUST be serialized identically to standard responses with one
1840
        # additional component to distinguish which error is contained: a body field named __type.
1841
        params = {"__type": code}
1✔
1842

1843
        for member in shape_members:
1✔
1844
            if hasattr(error, member):
1✔
1845
                value = getattr(error, member)
1✔
1846

1847
            # Default error message fields can sometimes have different casing in the specs
1848
            elif member.lower() in ["code", "message"] and hasattr(error, member.lower()):
1✔
1849
                value = getattr(error, member.lower())
×
1850

1851
            else:
1852
                continue
1✔
1853

1854
            if value is None:
1✔
1855
                # do not serialize a value that is set to `None`
1856
                continue
×
1857

1858
            # if the value is falsy (empty string, empty list) and not in the Shape required members, AWS will
1859
            # not serialize it, and it will not be part of the response body.
1860
            if value or member in shape.required_members:
1✔
1861
                params[member] = value
1✔
1862

1863
        self._serialize_type_structure(body, params, shape, None, shape_members=shape_members)
1✔
1864

1865

1866
class CBORResponseSerializer(BaseCBORResponseSerializer):
1✔
1867
    """
1868
    The ``CBORResponseSerializer`` is responsible for the serialization of responses from services with the ``cbor``
1869
    protocol. It implements the CBOR response body serialization, which is only currently used by Kinesis and is derived
1870
    conceptually from the ``JSONResponseSerializer``
1871
    """
1872

1873
    TIMESTAMP_FORMAT = "unixtimestamp"
1✔
1874

1875
    def _serialize_error(
1✔
1876
        self,
1877
        error: ServiceException,
1878
        response: Response,
1879
        shape: StructureShape,
1880
        operation_model: OperationModel,
1881
        mime_type: str,
1882
        request_id: str,
1883
    ) -> None:
1884
        body = bytearray()
×
1885
        response.content_type = mime_type
×
1886
        response.headers["X-Amzn-Errortype"] = error.code
×
1887

1888
        self._serialize_error_structure(body, shape, error, code=error.code)
×
1889

1890
        response.set_response(bytes(body))
×
1891

1892
    def _serialize_response(
1✔
1893
        self,
1894
        parameters: dict,
1895
        response: Response,
1896
        shape: Shape | None,
1897
        shape_members: dict,
1898
        operation_model: OperationModel,
1899
        mime_type: str,
1900
        request_id: str,
1901
    ) -> None:
1902
        response.content_type = mime_type
1✔
1903
        response.set_response(
1✔
1904
            self._serialize_body_params(parameters, shape, operation_model, mime_type, request_id)
1905
        )
1906

1907
    def _serialize_body_params(
1✔
1908
        self,
1909
        params: dict,
1910
        shape: Shape,
1911
        operation_model: OperationModel,
1912
        mime_type: str,
1913
        request_id: str,
1914
    ) -> bytes | None:
1915
        if shape is None:
1✔
1916
            return b""
×
1917
        body = bytearray()
1✔
1918
        self._serialize_data_item(body, params, shape)
1✔
1919
        return bytes(body)
1✔
1920

1921
    def _prepare_additional_traits_in_response(
1✔
1922
        self, response: Response, operation_model: OperationModel, request_id: str
1923
    ) -> Response:
1924
        response.headers["x-amzn-requestid"] = request_id
1✔
1925
        response = super()._prepare_additional_traits_in_response(
1✔
1926
            response, operation_model, request_id
1927
        )
1928
        return response
1✔
1929

1930

1931
class BaseRpcV2ResponseSerializer(ResponseSerializer):
1✔
1932
    """
1933
    The BaseRpcV2ResponseSerializer performs the basic logic for the RPC V2 response serialization.
1934
    The only variance between the various RPCv2 protocols is the way the body is serialized for regular responses,
1935
    and the way they will encode exceptions.
1936
    """
1937

1938
    def _serialize_response(
1✔
1939
        self,
1940
        parameters: dict,
1941
        response: Response,
1942
        shape: Shape | None,
1943
        shape_members: dict,
1944
        operation_model: OperationModel,
1945
        mime_type: str,
1946
        request_id: str,
1947
    ) -> None:
1948
        response.content_type = mime_type
1✔
1949
        response.set_response(
1✔
1950
            self._serialize_body_params(parameters, shape, operation_model, mime_type, request_id)
1951
        )
1952

1953
    def _serialize_body_params(
1✔
1954
        self,
1955
        params: dict,
1956
        shape: Shape,
1957
        operation_model: OperationModel,
1958
        mime_type: str,
1959
        request_id: str,
1960
    ) -> bytes | None:
1961
        raise NotImplementedError
1962

1963

1964
class RpcV2CBORResponseSerializer(
1✔
1965
    QueryCompatibleProtocolMixin, BaseRpcV2ResponseSerializer, BaseCBORResponseSerializer
1966
):
1967
    """
1968
    The RpcV2CBORResponseSerializer implements the CBOR body serialization part for the RPC v2 protocol, and implements the
1969
    specific exception serialization.
1970
    https://smithy.io/2.0/additional-specs/protocols/smithy-rpc-v2.html
1971
    """
1972

1973
    # the Smithy spec defines that only `application/cbor` is supported for RPC v2 CBOR
1974
    SUPPORTED_MIME_TYPES = [APPLICATION_CBOR]
1✔
1975
    TIMESTAMP_FORMAT = "unixtimestamp"
1✔
1976

1977
    def _serialize_body_params(
1✔
1978
        self,
1979
        params: dict,
1980
        shape: Shape,
1981
        operation_model: OperationModel,
1982
        mime_type: str,
1983
        request_id: str,
1984
    ) -> bytes | None:
1985
        if shape is None:
1✔
1986
            return b""
1✔
1987
        body = bytearray()
1✔
1988
        self._serialize_data_item(body, params, shape)
1✔
1989
        return bytes(body)
1✔
1990

1991
    def _serialize_error(
1✔
1992
        self,
1993
        error: ServiceException,
1994
        response: Response,
1995
        shape: StructureShape,
1996
        operation_model: OperationModel,
1997
        mime_type: str,
1998
        request_id: str,
1999
    ) -> None:
2000
        body = bytearray()
1✔
2001
        response.content_type = mime_type  # can only be 'application/cbor'
1✔
2002

2003
        # Responses for the rpcv2Cbor protocol SHOULD NOT contain the X-Amzn-ErrorType header.
2004
        # Type information is always serialized in the payload. This is different from the `json` protocol
2005
        is_query_compatible = operation_model.service_model.is_query_compatible
1✔
2006
        code = self._get_error_code(is_query_compatible, error, shape)
1✔
2007

2008
        self._serialize_error_structure(body, shape, error, code=code)
1✔
2009

2010
        response.set_response(bytes(body))
1✔
2011

2012
        if is_query_compatible:
1✔
2013
            self._add_query_compatible_error_header(response, error)
1✔
2014

2015
    def _prepare_additional_traits_in_response(
1✔
2016
        self, response: Response, operation_model: OperationModel, request_id: str
2017
    ):
2018
        response.headers["x-amzn-requestid"] = request_id
1✔
2019
        response.headers["Smithy-Protocol"] = "rpc-v2-cbor"
1✔
2020
        response = super()._prepare_additional_traits_in_response(
1✔
2021
            response, operation_model, request_id
2022
        )
2023
        return response
1✔
2024

2025

2026
class S3ResponseSerializer(RestXMLResponseSerializer):
1✔
2027
    """
2028
    The ``S3ResponseSerializer`` adds some minor logic to handle S3 specific peculiarities with the error response
2029
    serialization and the root node tag.
2030
    """
2031

2032
    SUPPORTED_MIME_TYPES = [APPLICATION_XML, TEXT_XML]
1✔
2033
    _RESPONSE_ROOT_TAGS = {
1✔
2034
        "CompleteMultipartUploadOutput": "CompleteMultipartUploadResult",
2035
        "CopyObjectOutput": "CopyObjectResult",
2036
        "CreateMultipartUploadOutput": "InitiateMultipartUploadResult",
2037
        "DeleteObjectsOutput": "DeleteResult",
2038
        "GetBucketAccelerateConfigurationOutput": "AccelerateConfiguration",
2039
        "GetBucketAclOutput": "AccessControlPolicy",
2040
        "GetBucketAnalyticsConfigurationOutput": "AnalyticsConfiguration",
2041
        "GetBucketCorsOutput": "CORSConfiguration",
2042
        "GetBucketEncryptionOutput": "ServerSideEncryptionConfiguration",
2043
        "GetBucketIntelligentTieringConfigurationOutput": "IntelligentTieringConfiguration",
2044
        "GetBucketInventoryConfigurationOutput": "InventoryConfiguration",
2045
        "GetBucketLifecycleOutput": "LifecycleConfiguration",
2046
        "GetBucketLifecycleConfigurationOutput": "LifecycleConfiguration",
2047
        "GetBucketLoggingOutput": "BucketLoggingStatus",
2048
        "GetBucketMetricsConfigurationOutput": "MetricsConfiguration",
2049
        "NotificationConfigurationDeprecated": "NotificationConfiguration",
2050
        "GetBucketOwnershipControlsOutput": "OwnershipControls",
2051
        "GetBucketPolicyStatusOutput": "PolicyStatus",
2052
        "GetBucketReplicationOutput": "ReplicationConfiguration",
2053
        "GetBucketRequestPaymentOutput": "RequestPaymentConfiguration",
2054
        "GetBucketTaggingOutput": "Tagging",
2055
        "GetBucketVersioningOutput": "VersioningConfiguration",
2056
        "GetBucketWebsiteOutput": "WebsiteConfiguration",
2057
        "GetObjectAclOutput": "AccessControlPolicy",
2058
        "GetObjectLegalHoldOutput": "LegalHold",
2059
        "GetObjectLockConfigurationOutput": "ObjectLockConfiguration",
2060
        "GetObjectRetentionOutput": "Retention",
2061
        "GetObjectTaggingOutput": "Tagging",
2062
        "GetObjectAttributesOutput": "GetObjectAttributesResponse",
2063
        "GetPublicAccessBlockOutput": "PublicAccessBlockConfiguration",
2064
        "ListBucketAnalyticsConfigurationsOutput": "ListBucketAnalyticsConfigurationResult",
2065
        "ListBucketInventoryConfigurationsOutput": "ListInventoryConfigurationsResult",
2066
        "ListBucketMetricsConfigurationsOutput": "ListMetricsConfigurationsResult",
2067
        "ListBucketsOutput": "ListAllMyBucketsResult",
2068
        "ListMultipartUploadsOutput": "ListMultipartUploadsResult",
2069
        "ListObjectsOutput": "ListBucketResult",
2070
        "ListObjectsV2Output": "ListBucketResult",
2071
        "ListObjectVersionsOutput": "ListVersionsResult",
2072
        "ListPartsOutput": "ListPartsResult",
2073
        "UploadPartCopyOutput": "CopyPartResult",
2074
    }
2075

2076
    XML_NAMESPACE = "http://s3.amazonaws.com/doc/2006-03-01/"
1✔
2077

2078
    def _serialize_response(
1✔
2079
        self,
2080
        parameters: dict,
2081
        response: Response,
2082
        shape: Shape | None,
2083
        shape_members: dict,
2084
        operation_model: OperationModel,
2085
        mime_type: str,
2086
        request_id: str,
2087
    ) -> None:
2088
        header_params, payload_params = self._partition_members(parameters, shape)
1✔
2089
        self._process_header_members(header_params, response, shape)
1✔
2090
        # "HEAD" responses are basically "GET" responses without the actual body.
2091
        # Do not process the body payload in this case (setting a body could also manipulate the headers)
2092
        # - If the response is a redirection, the body should be empty as well
2093
        # - If the response is from a "PUT" request, the body should be empty except if there's a specific "payload"
2094
        #   field in the serialization (CopyObject and CopyObjectPart)
2095
        http_method = operation_model.http.get("method")
1✔
2096
        if (
1✔
2097
            http_method != "HEAD"
2098
            and not 300 <= response.status_code < 400
2099
            and not (http_method == "PUT" and shape and not shape.serialization.get("payload"))
2100
        ):
2101
            self._serialize_payload(
1✔
2102
                payload_params,
2103
                response,
2104
                shape,
2105
                shape_members,
2106
                operation_model,
2107
                mime_type,
2108
                request_id,
2109
            )
2110
        self._serialize_content_type(response, shape, shape_members, mime_type)
1✔
2111

2112
    def _serialize_error(
1✔
2113
        self,
2114
        error: ServiceException,
2115
        response: Response,
2116
        shape: StructureShape,
2117
        operation_model: OperationModel,
2118
        mime_type: str,
2119
        request_id: str,
2120
    ) -> None:
2121
        attr = (
1✔
2122
            {"xmlns": operation_model.metadata.get("xmlNamespace")}
2123
            if "xmlNamespace" in operation_model.metadata
2124
            else {}
2125
        )
2126
        root = ETree.Element("Error", attr)
1✔
2127
        self._add_error_tags(error, root, mime_type)
1✔
2128
        request_id_element = ETree.SubElement(root, "RequestId")
1✔
2129
        request_id_element.text = request_id
1✔
2130

2131
        header_params, payload_params = self._partition_members(vars(error), shape)
1✔
2132
        self._add_additional_error_tags(payload_params, root, shape, mime_type)
1✔
2133
        self._process_header_members(header_params, response, shape)
1✔
2134

2135
        response.set_response(self._encode_payload(self._node_to_string(root, mime_type)))
1✔
2136

2137
    def _serialize_body_params(
1✔
2138
        self,
2139
        params: dict,
2140
        shape: Shape,
2141
        operation_model: OperationModel,
2142
        mime_type: str,
2143
        request_id: str,
2144
    ) -> str | None:
2145
        root = self._serialize_body_params_to_xml(params, shape, operation_model, mime_type)
1✔
2146
        # S3 does not follow the specs on the root tag name for 41 of 44 operations
2147
        root.tag = self._RESPONSE_ROOT_TAGS.get(root.tag, root.tag)
1✔
2148
        self._prepare_additional_traits_in_xml(root, request_id)
1✔
2149
        return self._node_to_string(root, mime_type)
1✔
2150

2151
    def _prepare_additional_traits_in_response(
1✔
2152
        self, response: Response, operation_model: OperationModel, request_id: str
2153
    ):
2154
        """Adds the request ID to the headers (in contrast to the body - as in the Query protocol)."""
2155
        response = super()._prepare_additional_traits_in_response(
1✔
2156
            response, operation_model, request_id
2157
        )
2158
        # s3 extended Request ID
2159
        # mostly used internally on AWS and corresponds to a HostId
2160
        response.headers["x-amz-id-2"] = (
1✔
2161
            "s9lzHYrFp76ZVxRcpX9+5cjAnEH2ROuNkd2BHfIa6UkFVdtjf5mKR3/eTPFvsiP/XV/VLi31234="
2162
        )
2163
        return response
1✔
2164

2165
    def _add_error_tags(
1✔
2166
        self, error: ServiceException, error_tag: ETree.Element, mime_type: str
2167
    ) -> None:
2168
        code_tag = ETree.SubElement(error_tag, "Code")
1✔
2169
        code_tag.text = error.code
1✔
2170
        message = self._get_error_message(error)
1✔
2171
        if message:
1✔
2172
            self._default_serialize(error_tag, message, None, "Message", mime_type)
1✔
2173
        else:
2174
            # In S3, if there's no message, create an empty node
2175
            self._create_empty_node(error_tag, "Message")
1✔
2176
        if error.sender_fault:
1✔
2177
            # The sender fault is either not set or "Sender"
2178
            self._default_serialize(error_tag, "Sender", None, "Type", mime_type)
×
2179

2180
    @staticmethod
1✔
2181
    def _create_empty_node(xmlnode: ETree.Element, name: str) -> None:
1✔
2182
        ETree.SubElement(xmlnode, name)
1✔
2183

2184
    def _prepare_additional_traits_in_xml(self, root: ETree.Element | None, request_id: str):
1✔
2185
        # some tools (Serverless) require a newline after the "<?xml ...>\n" preamble line, e.g., for LocationConstraint
2186
        if root and not root.tail:
1✔
2187
            root.tail = "\n"
1✔
2188

2189
        root.attrib["xmlns"] = self.XML_NAMESPACE
1✔
2190

2191
    @staticmethod
1✔
2192
    def _timestamp_iso8601(value: datetime.datetime) -> str:
1✔
2193
        """
2194
        This is very specific to S3, S3 returns an ISO8601 timestamp but with milliseconds always set to 000
2195
        Some SDKs are very picky about the length
2196
        """
2197
        return value.strftime("%Y-%m-%dT%H:%M:%S.000Z")
1✔
2198

2199

2200
class SqsQueryResponseSerializer(QueryResponseSerializer):
1✔
2201
    """
2202
    Unfortunately, SQS uses a rare interpretation of the XML protocol: It uses HTML entities within XML tag text nodes.
2203
    For example:
2204
    - Normal XML serializers: <Message>No need to escape quotes (like this: ") with HTML entities in XML.</Message>
2205
    - SQS XML serializer: <Message>No need to escape quotes (like this: &quot;) with HTML entities in XML.</Message>
2206

2207
    None of the prominent XML frameworks for python allow HTML entity escapes when serializing XML.
2208
    This serializer implements the following workaround:
2209
    - Escape quotes and \r with their HTML entities (&quot; and &#xD;).
2210
    - Since & is (correctly) escaped in XML, the serialized string contains &amp;quot; and &amp;#xD;
2211
    - These double-escapes are corrected by replacing such strings with their original.
2212
    """
2213

2214
    # those are deleted from the JSON specs, but need to be kept for legacy reason (sent in 'x-amzn-query-error')
2215
    QUERY_PREFIXED_ERRORS = {
1✔
2216
        "BatchEntryIdsNotDistinct",
2217
        "BatchRequestTooLong",
2218
        "EmptyBatchRequest",
2219
        "InvalidBatchEntryId",
2220
        "MessageNotInflight",
2221
        "PurgeQueueInProgress",
2222
        "QueueDeletedRecently",
2223
        "TooManyEntriesInBatchRequest",
2224
        "UnsupportedOperation",
2225
    }
2226

2227
    # Some error code changed between JSON and query, and we need to have a way to map it for legacy reason
2228
    JSON_TO_QUERY_ERROR_CODES = {
1✔
2229
        "InvalidParameterValueException": "InvalidParameterValue",
2230
        "MissingRequiredParameterException": "MissingParameter",
2231
        "AccessDeniedException": "AccessDenied",
2232
        "QueueDoesNotExist": "AWS.SimpleQueueService.NonExistentQueue",
2233
        "QueueNameExists": "QueueAlreadyExists",
2234
    }
2235

2236
    SENDER_FAULT_ERRORS = (
1✔
2237
        QUERY_PREFIXED_ERRORS
2238
        | JSON_TO_QUERY_ERROR_CODES.keys()
2239
        | {"OverLimit", "ResourceNotFoundException"}
2240
    )
2241

2242
    def _default_serialize(self, xmlnode: ETree.Element, params: str, _, name: str, __) -> None:
1✔
2243
        """
2244
        Ensures that we "mark" characters in the node's text which need to be specifically encoded.
2245
        This is necessary to easily identify these specific characters later, after the standard XML serialization is
2246
        done, while not replacing any other occurrences of these characters which might appear in the serialized string.
2247
        """
2248
        node = ETree.SubElement(xmlnode, name)
1✔
2249
        node.text = (
1✔
2250
            str(params)
2251
            .replace('"', '__marker__"__marker__')
2252
            .replace("\r", "__marker__-r__marker__")
2253
        )
2254

2255
    def _node_to_string(self, root: ETree.ElementTree | None, mime_type: str) -> str | None:
1✔
2256
        """Replaces the previously "marked" characters with their encoded value."""
2257
        generated_string = super()._node_to_string(root, mime_type)
1✔
2258
        if generated_string is None:
1✔
2259
            return None
×
2260
        generated_string = to_str(generated_string)
1✔
2261
        # Undo the second escaping of the &
2262
        # Undo the second escaping of the carriage return (\r)
2263
        if mime_type == APPLICATION_JSON:
1✔
2264
            # At this point the json was already dumped and escaped, so we replace directly.
2265
            generated_string = generated_string.replace(r"__marker__\"__marker__", r"\"").replace(
1✔
2266
                "__marker__-r__marker__", r"\r"
2267
            )
2268
        else:
2269
            generated_string = generated_string.replace('__marker__"__marker__', "&quot;").replace(
1✔
2270
                "__marker__-r__marker__", "&#xD;"
2271
            )
2272

2273
        return to_bytes(generated_string)
1✔
2274

2275
    def _add_error_tags(
1✔
2276
        self, error: ServiceException, error_tag: ETree.Element, mime_type: str
2277
    ) -> None:
2278
        """The SQS API stubs is now generated from JSON specs, and some fields have been modified"""
2279
        code_tag = ETree.SubElement(error_tag, "Code")
1✔
2280

2281
        if error.code in self.JSON_TO_QUERY_ERROR_CODES:
1✔
2282
            error_code = self.JSON_TO_QUERY_ERROR_CODES[error.code]
1✔
2283
        elif error.code in self.QUERY_PREFIXED_ERRORS:
1✔
2284
            error_code = f"AWS.SimpleQueueService.{error.code}"
1✔
2285
        else:
2286
            error_code = error.code
1✔
2287
        code_tag.text = error_code
1✔
2288
        message = self._get_error_message(error)
1✔
2289
        if message:
1✔
2290
            self._default_serialize(error_tag, message, None, "Message", mime_type)
1✔
2291
        if error.code in self.SENDER_FAULT_ERRORS or error.sender_fault:
1✔
2292
            # The sender fault is either not set or "Sender"
2293
            self._default_serialize(error_tag, "Sender", None, "Type", mime_type)
1✔
2294

2295

2296
class SqsJsonResponseSerializer(JSONResponseSerializer):
1✔
2297
    # those are deleted from the JSON specs, but need to be kept for legacy reason (sent in 'x-amzn-query-error')
2298
    QUERY_PREFIXED_ERRORS = {
1✔
2299
        "BatchEntryIdsNotDistinct",
2300
        "BatchRequestTooLong",
2301
        "EmptyBatchRequest",
2302
        "InvalidBatchEntryId",
2303
        "MessageNotInflight",
2304
        "PurgeQueueInProgress",
2305
        "QueueDeletedRecently",
2306
        "TooManyEntriesInBatchRequest",
2307
        "UnsupportedOperation",
2308
    }
2309

2310
    # Some error code changed between JSON and query, and we need to have a way to map it for legacy reason
2311
    JSON_TO_QUERY_ERROR_CODES = {
1✔
2312
        "InvalidParameterValueException": "InvalidParameterValue",
2313
        "MissingRequiredParameterException": "MissingParameter",
2314
        "AccessDeniedException": "AccessDenied",
2315
        "QueueDoesNotExist": "AWS.SimpleQueueService.NonExistentQueue",
2316
        "QueueNameExists": "QueueAlreadyExists",
2317
    }
2318

2319
    # TODO: on body error serialization (body["__type"]),it seems AWS differs from what we send for SQS
2320
    #  AWS: "com.amazon.coral.service#InvalidParameterValueException"
2321
    #  or AWS: "com.amazonaws.sqs#BatchRequestTooLong"
2322
    #  LocalStack: "InvalidParameterValue"
2323

2324
    def _add_query_compatible_error_header(self, response: Response, error: ServiceException):
1✔
2325
        if error.code in self.JSON_TO_QUERY_ERROR_CODES:
1✔
2326
            code = self.JSON_TO_QUERY_ERROR_CODES[error.code]
1✔
2327
        elif error.code in self.QUERY_PREFIXED_ERRORS:
1✔
2328
            code = f"AWS.SimpleQueueService.{error.code}"
1✔
2329
        else:
2330
            code = error.code
1✔
2331

2332
        # SQS exceptions all have sender fault set to False, so we hardcode it to `Sender`
2333
        response.headers["x-amzn-query-error"] = f"{code};Sender"
1✔
2334

2335

2336
def gen_amzn_requestid():
1✔
2337
    """
2338
    Generate generic AWS request ID.
2339

2340
    3 uses a different format and set of request Ids.
2341

2342
    Examples:
2343
    996d38a0-a4e9-45de-bad4-480cd962d208
2344
    b9260553-df1b-4db6-ae41-97b89a5f85ea
2345
    """
2346
    return long_uid()
1✔
2347

2348

2349
@functools.cache
1✔
2350
def create_serializer(
1✔
2351
    service: ServiceModel, protocol: ProtocolName | None = None
2352
) -> ResponseSerializer:
2353
    """
2354
    Creates the right serializer for the given service model.
2355

2356
    :param service: to create the serializer for
2357
    :param protocol: the protocol for the serializer. If not provided, fallback to the service's default protocol
2358
    :return: ResponseSerializer which can handle the protocol of the service
2359
    """
2360

2361
    # Unfortunately, some services show subtle differences in their serialized responses, even though their
2362
    # specification states they implement the same protocol.
2363
    # Since some clients might be stricter / less resilient than others, we need to mimic the serialization of the
2364
    # specific services as close as possible.
2365
    # Therefore, the service-specific serializer implementations (basically the implicit / informally more specific
2366
    # protocol implementation) has precedence over the more general protocol-specific serializers.
2367
    service_specific_serializers = {
1✔
2368
        "sqs": {"json": SqsJsonResponseSerializer, "query": SqsQueryResponseSerializer},
2369
        "s3": {"rest-xml": S3ResponseSerializer},
2370
    }
2371
    protocol_specific_serializers = {
1✔
2372
        "query": QueryResponseSerializer,
2373
        "json": JSONResponseSerializer,
2374
        "rest-json": RestJSONResponseSerializer,
2375
        "rest-xml": RestXMLResponseSerializer,
2376
        "ec2": EC2ResponseSerializer,
2377
        "smithy-rpc-v2-cbor": RpcV2CBORResponseSerializer,
2378
        # TODO: implement multi-protocol support for Kinesis, so that it can uses the `cbor` protocol and remove
2379
        #  CBOR handling from JSONResponseParser
2380
        # this is not an "official" protocol defined from the spec, but is derived from ``json``
2381
    }
2382
    service_protocol = protocol or service.protocol
1✔
2383

2384
    # Try to select a service- and protocol-specific serializer implementation
2385
    if (
1✔
2386
        service.service_name in service_specific_serializers
2387
        and service_protocol in service_specific_serializers[service.service_name]
2388
    ):
2389
        return service_specific_serializers[service.service_name][service_protocol]()
1✔
2390
    else:
2391
        # Otherwise, pick the protocol-specific serializer for the protocol of the service
2392
        return protocol_specific_serializers[service_protocol]()
1✔
2393

2394

2395
def aws_response_serializer(
1✔
2396
    service_name: str, operation: str, protocol: ProtocolName | None = None
2397
):
2398
    """
2399
    A decorator for an HTTP route that can serialize return values or exceptions into AWS responses.
2400
    This can be used to create AWS request handlers in a convenient way. Example usage::
2401

2402
        from localstack.http import route, Request
2403
        from localstack.aws.api.sqs import ListQueuesResult
2404

2405
        @route("/_aws/sqs/queues")
2406
        @aws_response_serializer("sqs", "ListQueues")
2407
        def my_route(request: Request):
2408
            if some_condition_on_request:
2409
                raise CommonServiceError("...")  # <- will be serialized into an error response
2410

2411
            return ListQueuesResult(QueueUrls=...)  # <- object from the SQS API will be serialized
2412

2413
    :param service_name: the AWS service (e.g., "sqs", "lambda")
2414
    :param protocol: the protocol of the AWS service to serialize to. If not set (by default) the default protocol
2415
                    of the service in botocore is used.
2416
    :param operation: the operation name (e.g., "ReceiveMessage", "ListFunctions")
2417
    :returns: a decorator
2418
    """
2419

2420
    def _decorate(fn):
1✔
2421
        service_model = load_service(service_name, protocol=protocol)
1✔
2422
        operation_model = service_model.operation_model(operation)
1✔
2423
        serializer = create_serializer(service_model, protocol=protocol)
1✔
2424

2425
        def _proxy(*args, **kwargs) -> WerkzeugResponse:
1✔
2426
            # extract request from function invocation (decorator can be used for methods as well as for functions).
2427
            if len(args) > 0 and isinstance(args[0], WerkzeugRequest):
1✔
2428
                # function
2429
                request = args[0]
1✔
2430
            elif len(args) > 1 and isinstance(args[1], WerkzeugRequest):
1✔
2431
                # method (arg[0] == self)
2432
                request = args[1]
1✔
2433
            elif "request" in kwargs:
1✔
2434
                request = kwargs["request"]
1✔
2435
            else:
2436
                raise ValueError(f"could not find Request in signature of function {fn}")
×
2437

2438
            # TODO: we have no context here
2439
            # TODO: maybe try to get the request ID from the headers first before generating a new one
2440
            request_id = gen_amzn_requestid()
1✔
2441

2442
            try:
1✔
2443
                response = fn(*args, **kwargs)
1✔
2444

2445
                if isinstance(response, WerkzeugResponse):
1✔
2446
                    return response
1✔
2447

2448
                return serializer.serialize_to_response(
1✔
2449
                    response, operation_model, request.headers, request_id
2450
                )
2451

2452
            except ServiceException as e:
1✔
2453
                return serializer.serialize_error_to_response(
1✔
2454
                    e, operation_model, request.headers, request_id
2455
                )
2456
            except Exception as e:
1✔
2457
                return serializer.serialize_error_to_response(
1✔
2458
                    CommonServiceException(
2459
                        "InternalError", f"An internal error occurred: {e}", status_code=500
2460
                    ),
2461
                    operation_model,
2462
                    request.headers,
2463
                    request_id,
2464
                )
2465

2466
        return _proxy
1✔
2467

2468
    return _decorate
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc