• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 527eb572-51be-41bc-a2a3-1b2142a8a8f1

26 Feb 2025 03:36PM UTC coverage: 86.885% (+0.02%) from 86.862%
527eb572-51be-41bc-a2a3-1b2142a8a8f1

push

circleci

web-flow
[ESM] Add backoff between poll events calls (#12304)

25 of 25 new or added lines in 4 files covered. (100.0%)

48 existing lines in 12 files now uncovered.

61741 of 71061 relevant lines covered (86.88%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.94
/localstack-core/localstack/aws/client.py
1
"""Utils to process AWS requests as a client."""
2

3
import io
1✔
4
import logging
1✔
5
from datetime import datetime, timezone
1✔
6
from typing import Dict, Iterable, Optional
1✔
7
from urllib.parse import urlsplit
1✔
8

9
from botocore import awsrequest
1✔
10
from botocore.endpoint import Endpoint
1✔
11
from botocore.model import OperationModel
1✔
12
from botocore.parsers import ResponseParser, ResponseParserFactory
1✔
13
from werkzeug.datastructures import Headers
1✔
14

15
from localstack import config
1✔
16
from localstack.http import Request, Response
1✔
17
from localstack.runtime import hooks
1✔
18
from localstack.utils.patch import Patch, patch
1✔
19
from localstack.utils.strings import to_str
1✔
20

21
from .api import CommonServiceException, RequestContext, ServiceException, ServiceResponse
1✔
22
from .connect import get_service_endpoint
1✔
23
from .gateway import Gateway
1✔
24

25
LOG = logging.getLogger(__name__)
1✔
26

27

28
def create_http_request(aws_request: awsrequest.AWSPreparedRequest) -> Request:
1✔
29
    """
30
    Create an ASF HTTP Request from a botocore AWSPreparedRequest.
31

32
    :param aws_request: the botocore prepared request
33
    :return: a new Request
34
    """
35
    split_url = urlsplit(aws_request.url)
1✔
36
    host = split_url.netloc.split(":")
1✔
37
    if len(host) == 1:
1✔
38
        server = (to_str(host[0]), None)
1✔
39
    elif len(host) == 2:
1✔
40
        server = (to_str(host[0]), int(host[1]))
1✔
41
    else:
42
        raise ValueError
×
43

44
    # prepare the RequestContext
45
    headers = Headers()
1✔
46
    for k, v in aws_request.headers.items():
1✔
47
        headers[k] = to_str(v)
1✔
48

49
    return Request(
1✔
50
        method=aws_request.method,
51
        path=split_url.path,
52
        query_string=split_url.query,
53
        headers=headers,
54
        body=aws_request.body,
55
        server=server,
56
    )
57

58

59
class _ResponseStream(io.RawIOBase):
1✔
60
    """
61
    Wraps a Response and makes it available as a readable IO stream. If the response stream is used as an iterable, it
62
    will use the underlying response object directly.
63

64
    Adapted from https://stackoverflow.com/a/20260030/804840
65
    """
66

67
    def __init__(self, response: Response):
1✔
68
        self.response = response
1✔
69
        self.iterator = response.iter_encoded()
1✔
70
        self._buf = None
1✔
71

72
    def stream(self) -> Iterable[bytes]:
1✔
73
        # adds compatibility for botocore's client-side AWSResponse.raw attribute.
74
        return self.iterator
1✔
75

76
    def readable(self):
1✔
77
        return True
×
78

79
    def readinto(self, buffer):
1✔
80
        try:
1✔
81
            upto = len(buffer)  # We're supposed to return at most this much
1✔
82
            chunk = self._buf or next(self.iterator)
1✔
83
            # FIXME: this is very slow as it copies the entire chunk
84
            output, self._buf = chunk[:upto], chunk[upto:]
1✔
85
            buffer[: len(output)] = output
1✔
86
            return len(output)
1✔
87
        except StopIteration:
1✔
88
            return 0  # indicate EOF
1✔
89

90
    def read(self, amt=None) -> bytes | None:
1✔
91
        # see https://github.com/python/cpython/blob/main/Lib/_pyio.py
92
        # adds compatibility for botocore's client-side AWSResponse.raw attribute.
93
        # it seems the default implementation of RawIOBase.read to not handle well some cases
94
        if amt is None:
1✔
95
            amt = -1
1✔
96
        return super().read(amt)
1✔
97

98
    def close(self) -> None:
1✔
99
        return self.response.close()
1✔
100

101
    def __iter__(self):
1✔
102
        return self.iterator
×
103

104
    def __next__(self):
1✔
105
        return next(self.iterator)
1✔
106

107
    def __str__(self):
1✔
108
        length = self.response.content_length
1✔
109
        if length is None:
1✔
110
            length = "unknown"
×
111

112
        return f"StreamedBytes({length})"
1✔
113

114
    def __repr__(self):
115
        return self.__str__()
116

117

118
class _RawStream:
1✔
119
    """This is a compatibility adapter for the raw_stream attribute passed to botocore's EventStream."""
120

121
    def __init__(self, response: Response):
1✔
122
        self.response = response
×
123
        self.iterator = response.iter_encoded()
×
124

125
    def stream(self) -> Iterable[bytes]:
1✔
126
        return self.iterator
×
127

128
    def close(self):
1✔
129
        pass
×
130

131

132
def _add_modeled_error_fields(
1✔
133
    response_dict: Dict,
134
    parsed_response: Dict,
135
    operation_model: OperationModel,
136
    parser: ResponseParser,
137
):
138
    """
139
    This function adds additional error shape members (other than message, code, and type) to an already parsed error
140
    response dict.
141
    Port of botocore's Endpoint#_add_modeled_error_fields.
142
    """
143
    error_code = parsed_response.get("Error", {}).get("Code")
1✔
144
    if error_code is None:
1✔
145
        return
×
146
    service_model = operation_model.service_model
1✔
147
    error_shape = service_model.shape_for_error_code(error_code)
1✔
148
    if error_shape is None:
1✔
149
        return
1✔
150
    modeled_parse = parser.parse(response_dict, error_shape)
1✔
151
    parsed_response.update(modeled_parse)
1✔
152

153

154
def _cbor_timestamp_parser(value):
1✔
155
    return datetime.fromtimestamp(value / 1000)
×
156

157

158
def _cbor_blob_parser(value):
1✔
159
    return bytes(value)
×
160

161

162
@hooks.on_infra_start()
1✔
163
def _patch_botocore_json_parser():
1✔
164
    from botocore.parsers import BaseJSONParser
1✔
165

166
    @patch(BaseJSONParser._parse_body_as_json)
1✔
167
    def _parse_body_as_json(fn, self, body_contents):
1✔
168
        """
169
        botocore does not support CBOR encoded response parsing. Since we use the botocore parsers
170
        to parse responses from external backends (like kinesis-mock), we need to patch botocore to
171
        try CBOR decoding in case the JSON decoding fails.
172
        """
173
        try:
1✔
174
            return fn(self, body_contents)
1✔
175
        except UnicodeDecodeError as json_exception:
×
176
            # cbor2: explicitly load from private _decoder module to avoid using the (non-patched) C-version
177
            from cbor2._decoder import loads
×
178

179
            try:
×
180
                LOG.debug("botocore failed decoding JSON. Trying to decode as CBOR.")
×
181
                return loads(body_contents)
×
182
            except Exception as cbor_exception:
×
183
                LOG.debug("CBOR fallback decoding failed.")
×
184
                raise cbor_exception from json_exception
×
185

186

187
@hooks.on_infra_start()
1✔
188
def _patch_cbor2():
1✔
189
    """
190
    Patch fixing the AWS CBOR en-/decoding of datetime fields.
191

192
    Unfortunately, Kinesis (the only known service using CBOR) does not use the number of seconds (with floating-point
193
    milliseconds - according to RFC8949), but uses milliseconds.
194
    Python cbor2 is highly optimized by using a C-implementation by default, which cannot be patched.
195
    Instead of `from cbor2 import loads`, directly import the python-native loads implementation to avoid loading the
196
    unpatched C implementation:
197
    ```
198
    from cbor2._decoder import loads
199
    from cbor2._decoder import dumps
200
    ```
201

202
    See https://github.com/aws/aws-sdk-java-v2/issues/4661
203
    """
204
    from cbor2._decoder import CBORDecodeValueError, semantic_decoders
1✔
205
    from cbor2._encoder import CBOREncodeValueError, default_encoders
1✔
206
    from cbor2._types import CBORTag
1✔
207

208
    def _patched_decode_epoch_datetime(self) -> datetime:
1✔
209
        """
210
        Replaces `cbor2._decoder.CBORDecoder.decode_epoch_datetime` as default datetime semantic_decoder.
211
        """
212
        # Semantic tag 1
213
        value = self._decode()
1✔
214

215
        try:
1✔
216
            # The next line is the only change in this patch compared to the original function.
217
            # AWS breaks the CBOR spec by using the millis (instead of seconds with floating point support for millis)
218
            # https://github.com/aws/aws-sdk-java-v2/issues/4661
219
            value = value / 1000
1✔
220
            tmp = datetime.fromtimestamp(value, timezone.utc)
1✔
221
        except (OverflowError, OSError, ValueError) as exc:
×
222
            raise CBORDecodeValueError("error decoding datetime from epoch") from exc
×
223

224
        return self.set_shareable(tmp)
1✔
225

226
    def _patched_encode_datetime(self, value: datetime) -> None:
1✔
227
        """
228
        Replaces `cbor2._encoder.CBOREncoder.encode_datetime` as default datetime default_encoder.
229
        """
230
        if not value.tzinfo:
1✔
231
            if self._timezone:
×
232
                value = value.replace(tzinfo=self._timezone)
×
233
            else:
234
                raise CBOREncodeValueError(
×
235
                    f"naive datetime {value!r} encountered and no default timezone has been set"
236
                )
237

238
        if self.datetime_as_timestamp:
1✔
239
            from calendar import timegm
1✔
240

241
            if not value.microsecond:
1✔
UNCOV
242
                timestamp: float = timegm(value.utctimetuple())
×
243
            else:
244
                timestamp = timegm(value.utctimetuple()) + value.microsecond / 1000000
1✔
245
            # The next line is the only change in this patch compared to the original function.
246
            # - AWS breaks the CBOR spec by using the millis (instead of seconds with floating point support for millis)
247
            #   https://github.com/aws/aws-sdk-java-v2/issues/4661
248
            # - AWS SDKs in addition have very tight assumptions on the type.
249
            #   This needs to be an integer, and must not be a floating point number (CBOR is typed)!
250
            timestamp = int(timestamp * 1000)
1✔
251
            self.encode_semantic(CBORTag(1, timestamp))
1✔
252
        else:
253
            datestring = value.isoformat().replace("+00:00", "Z")
×
254
            self.encode_semantic(CBORTag(0, datestring))
×
255

256
    # overwrite the default epoch datetime en-/decoder with patched versions
257
    default_encoders[datetime] = _patched_encode_datetime
1✔
258
    semantic_decoders[1] = _patched_decode_epoch_datetime
1✔
259

260

261
def _create_and_enrich_aws_request(
1✔
262
    fn, self: Endpoint, params: dict, operation_model: OperationModel = None
263
):
264
    """
265
    Patch that adds the botocore operation model and request parameters to a newly created AWSPreparedRequest,
266
    which normally only holds low-level HTTP request information.
267
    """
268
    request: awsrequest.AWSPreparedRequest = fn(self, params, operation_model)
1✔
269

270
    request.params = params
1✔
271
    request.operation_model = operation_model
1✔
272

273
    return request
1✔
274

275

276
botocore_in_memory_endpoint_patch = Patch.function(
1✔
277
    Endpoint.create_request, _create_and_enrich_aws_request
278
)
279

280

281
@hooks.on_infra_start(should_load=config.IN_MEMORY_CLIENT)
1✔
282
def _patch_botocore_endpoint_in_memory():
1✔
283
    botocore_in_memory_endpoint_patch.apply()
×
284

285

286
def parse_response(
1✔
287
    operation: OperationModel, response: Response, include_response_metadata: bool = True
288
) -> ServiceResponse:
289
    """
290
    Parses an HTTP Response object into an AWS response object using botocore. It does this by adapting the
291
    procedure of ``botocore.endpoint.convert_to_response_dict`` to work with Werkzeug's server-side response object.
292

293
    :param operation: the operation of the original request
294
    :param response: the HTTP response object containing the response of the operation
295
    :param include_response_metadata: True if the ResponseMetadata (typical for boto response dicts) should be included
296
    :return: a parsed dictionary as it is returned by botocore
297
    """
298
    # this is what botocore.endpoint.convert_to_response_dict normally does
299
    response_dict = {
1✔
300
        "headers": dict(response.headers.items()),  # boto doesn't like werkzeug headers
301
        "status_code": response.status_code,
302
        "context": {
303
            "operation_name": operation.name,
304
        },
305
    }
306

307
    if response_dict["status_code"] >= 301:
1✔
308
        response_dict["body"] = response.data
1✔
309
    elif operation.has_event_stream_output:
1✔
310
        # TODO test this
311
        response_dict["body"] = _RawStream(response)
×
312
    elif operation.has_streaming_output:
1✔
313
        # for s3.GetObject for example, the Body attribute is actually a stream, not the raw bytes value
314
        response_dict["body"] = _ResponseStream(response)
1✔
315
    else:
316
        response_dict["body"] = response.data
1✔
317

318
    factory = ResponseParserFactory()
1✔
319
    if response.content_type and response.content_type.startswith("application/x-amz-cbor"):
1✔
320
        # botocore cannot handle CBOR encoded responses (because it never sends them), we need to modify the parser
321
        factory.set_parser_defaults(
×
322
            timestamp_parser=_cbor_timestamp_parser, blob_parser=_cbor_blob_parser
323
        )
324

325
    parser = factory.create_parser(operation.service_model.protocol)
1✔
326
    parsed_response = parser.parse(response_dict, operation.output_shape)
1✔
327

328
    if response.status_code >= 301:
1✔
329
        # Add possible additional error shape members
330
        _add_modeled_error_fields(response_dict, parsed_response, operation, parser)
1✔
331

332
    if not include_response_metadata:
1✔
333
        parsed_response.pop("ResponseMetadata", None)
1✔
334

335
    return parsed_response
1✔
336

337

338
def parse_service_exception(
1✔
339
    response: Response, parsed_response: Dict
340
) -> Optional[ServiceException]:
341
    """
342
    Creates a ServiceException (one ASF can handle) from a parsed response (one that botocore would return).
343
    It does not automatically raise the exception (see #raise_service_exception).
344
    :param response: Un-parsed response
345
    :param parsed_response: Parsed response
346
    :return: ServiceException or None (if it's not an error response)
347
    """
348
    if response.status_code < 301 or "Error" not in parsed_response:
1✔
349
        return None
1✔
350
    error = parsed_response["Error"]
1✔
351
    service_exception = CommonServiceException(
1✔
352
        code=error.get("Code", f"'{response.status_code}'"),
353
        status_code=response.status_code,
354
        message=error.get("Message", ""),
355
        sender_fault=error.get("Type") == "Sender",
356
    )
357
    # Add all additional fields in the parsed response as members of the exception
358
    for key, value in parsed_response.items():
1✔
359
        if key.lower() not in ["code", "message", "type", "error"] and not hasattr(
1✔
360
            service_exception, key
361
        ):
362
            setattr(service_exception, key, value)
1✔
363
    return service_exception
1✔
364

365

366
def raise_service_exception(response: Response, parsed_response: Dict) -> None:
1✔
367
    """
368
    Creates and raises a ServiceException from a parsed response (one that botocore would return).
369
    :param response: Un-parsed response
370
    :param parsed_response: Parsed response
371
    :raise ServiceException: If the response is an error response
372
    :return: None if the response is not an error response
373
    """
374
    if service_exception := parse_service_exception(response, parsed_response):
1✔
375
        raise service_exception
1✔
376

377

378
class GatewayShortCircuit:
1✔
379
    gateway: Gateway
1✔
380

381
    def __init__(self, gateway: Gateway):
1✔
382
        self.gateway = gateway
1✔
383
        self._internal_url = get_service_endpoint()
1✔
384

385
    def __call__(
1✔
386
        self, event_name: str, request: awsrequest.AWSPreparedRequest, **kwargs
387
    ) -> awsrequest.AWSResponse | None:
388
        # TODO: we sometimes overrides the endpoint_url to direct it to DynamoDBLocal directly
389
        # if the default endpoint_url is not in the request, just skips the in-memory forwarding
390
        if self._internal_url not in request.url:
1✔
391
            return
×
392

393
        # extract extra data from enriched AWSPreparedRequest
394
        params = request.params
1✔
395
        operation: OperationModel = request.operation_model
1✔
396

397
        # create request
398
        context = RequestContext()
1✔
399
        context.request = create_http_request(request)
1✔
400

401
        # TODO: just a hacky thing to unblock the service model being set to `sqs-query` blocking for now
402
        # this is using the same services as `localstack.aws.protocol.service_router.resolve_conflicts`, maybe
403
        # consolidate. `docdb` and `neptune` uses the RDS API and service.
404
        if operation.service_model.service_name not in {
1✔
405
            "sqs-query",
406
            "docdb",
407
            "neptune",
408
            "timestream-write",
409
        }:
410
            context.service = operation.service_model
1✔
411

412
        context.operation = operation
1✔
413
        context.service_request = params["body"]
1✔
414

415
        # perform request
416
        response = Response()
1✔
417
        self.gateway.handle(context, response)
1✔
418

419
        # transform Werkzeug response to client-side botocore response
420
        aws_response = awsrequest.AWSResponse(
1✔
421
            url=context.request.url,
422
            status_code=response.status_code,
423
            headers=response.headers,
424
            raw=_ResponseStream(response),
425
        )
426

427
        return aws_response
1✔
428

429
    @staticmethod
1✔
430
    def modify_client(client, gateway):
1✔
431
        client.meta.events.register_first("before-send.*.*", GatewayShortCircuit(gateway))
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc