• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 20218772397

12 Dec 2025 11:29AM UTC coverage: 86.875% (+0.002%) from 86.873%
20218772397

push

github

web-flow
dev: support executing kubernetes dev setup on command execution (#13509)

69935 of 80501 relevant lines covered (86.87%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.94
/localstack-core/localstack/aws/client.py
1
"""Utils to process AWS requests as a client."""
2

3
import io
1✔
4
import logging
1✔
5
from collections.abc import Iterable
1✔
6
from datetime import UTC, datetime
1✔
7
from urllib.parse import urlsplit
1✔
8

9
from botocore import awsrequest
1✔
10
from botocore.endpoint import Endpoint
1✔
11
from botocore.model import OperationModel
1✔
12
from botocore.parsers import ResponseParser, ResponseParserFactory
1✔
13
from werkzeug.datastructures import Headers
1✔
14

15
from localstack import config
1✔
16
from localstack.http import Request, Response
1✔
17
from localstack.runtime import hooks
1✔
18
from localstack.utils.patch import Patch, patch
1✔
19
from localstack.utils.strings import to_str
1✔
20

21
from .api import CommonServiceException, RequestContext, ServiceException, ServiceResponse
1✔
22
from .connect import get_service_endpoint
1✔
23
from .gateway import Gateway
1✔
24
from .spec import ProtocolName
1✔
25

26
LOG = logging.getLogger(__name__)
1✔
27

28

29
def create_http_request(aws_request: awsrequest.AWSPreparedRequest) -> Request:
    """
    Create an ASF HTTP Request from a botocore AWSPreparedRequest.

    The ``server`` tuple of the new request is derived from the network location of the request URL. Plain
    ``host`` and ``host:port`` locations are split on the colon. Bracketed IPv6 literals (``[::1]`` or
    ``[::1]:4566``) contain colons themselves and are therefore resolved through the ``hostname`` and ``port``
    attributes of the ``urlsplit`` result.

    :param aws_request: the botocore prepared request
    :return: a new Request
    :raises ValueError: if no host/port pair can be determined from the URL
    """
    split_url = urlsplit(aws_request.url)
    netloc = split_url.netloc

    if netloc.startswith("["):
        # bracketed IPv6 literal: splitting on ":" would yield more than two parts
        server = (to_str(split_url.hostname), split_url.port)
    else:
        host = netloc.split(":")
        if len(host) == 1:
            server = (to_str(host[0]), None)
        elif len(host) == 2:
            server = (to_str(host[0]), int(host[1]))
        else:
            raise ValueError(f"cannot determine host and port from URL {aws_request.url!r}")

    # copy the headers into a werkzeug Headers object, converting every value with to_str
    headers = Headers()
    for k, v in aws_request.headers.items():
        headers[k] = to_str(v)

    return Request(
        method=aws_request.method,
        path=split_url.path,
        query_string=split_url.query,
        headers=headers,
        body=aws_request.body,
        server=server,
    )
58

59

60
class _ResponseStream(io.RawIOBase):
    """
    Wraps a Response and makes it available as a readable IO stream. If the response stream is used as an iterable, it
    will use the underlying response object directly.

    Adapted from https://stackoverflow.com/a/20260030/804840
    """

    def __init__(self, response: Response):
        """
        :param response: the response whose encoded body chunks are exposed through this stream
        """
        self.response = response
        self.iterator = response.iter_encoded()
        # leftover bytes of the last chunk that did not fit into the caller's buffer
        self._buf = None

    def stream(self) -> Iterable[bytes]:
        """Return the underlying chunk iterator."""
        # adds compatibility for botocore's client-side AWSResponse.raw attribute.
        return self.iterator

    def readable(self):
        """Report that this stream can be read from."""
        return True

    def readinto(self, buffer):
        """
        Copy up to ``len(buffer)`` bytes of the response body into ``buffer``.

        :param buffer: a pre-allocated, writable bytes-like object
        :return: the number of bytes written, 0 once the underlying iterator is exhausted
        """
        upto = len(buffer)  # We're supposed to return at most this much
        chunk = self._buf
        try:
            # Skip empty chunks: returning 0 for an empty chunk in the middle of the stream would be
            # interpreted as EOF by RawIOBase.read/readall and silently truncate the body.
            while not chunk:
                chunk = next(self.iterator)
        except StopIteration:
            return 0  # indicate EOF
        # FIXME: this is very slow as it copies the entire chunk
        output, self._buf = chunk[:upto], chunk[upto:]
        buffer[: len(output)] = output
        return len(output)

    def read(self, amt=None) -> bytes | None:
        """
        Read up to ``amt`` bytes; ``None`` is treated like ``-1`` (read everything).

        :param amt: maximum number of bytes to read, or ``None``/``-1`` for all remaining bytes
        :return: the bytes read
        """
        # see https://github.com/python/cpython/blob/main/Lib/_pyio.py
        # adds compatibility for botocore's client-side AWSResponse.raw attribute.
        # it seems the default implementation of RawIOBase.read to not handle well some cases
        if amt is None:
            amt = -1
        return super().read(amt)

    def close(self) -> None:
        """Close the wrapped response."""
        return self.response.close()

    def __iter__(self):
        return self.iterator

    def __next__(self):
        return next(self.iterator)

    def __str__(self):
        length = self.response.content_length
        if length is None:
            length = "unknown"

        return f"StreamedBytes({length})"

    def __repr__(self):
        return self.__str__()
117

118

119
class _RawStream:
    """Compatibility adapter that is handed to botocore's EventStream as its ``raw_stream`` attribute."""

    def __init__(self, response: Response):
        """
        :param response: the response whose encoded body is exposed via :meth:`stream`
        """
        self.iterator = response.iter_encoded()
        self.response = response

    def close(self):
        """Do nothing; this adapter does not close the wrapped response."""
        return None

    def stream(self) -> Iterable[bytes]:
        """Return the iterator over the encoded chunks of the wrapped response."""
        return self.iterator
131

132

133
def _add_modeled_error_fields(
    response_dict: dict,
    parsed_response: dict,
    operation_model: OperationModel,
    parser: ResponseParser,
):
    """
    Enrich an already parsed error response, in place, with the members of the modeled error shape.

    The error code is read from ``parsed_response["Error"]["Code"]``. Nothing happens if there is no code or if
    the service model has no error shape for it. Otherwise the raw response is parsed again against the
    error shape and the result is merged into ``parsed_response``.
    Port of botocore's Endpoint#_add_modeled_error_fields.

    :param response_dict: the raw response dict handed to the parser
    :param parsed_response: the parsed error response, updated in place
    :param operation_model: the operation model, used to look up the service model
    :param parser: the parser used to parse the error shape
    """
    code = parsed_response.get("Error", {}).get("Code")
    if code is not None:
        shape = operation_model.service_model.shape_for_error_code(code)
        if shape is not None:
            parsed_response.update(parser.parse(response_dict, shape))
153

154

155
def _cbor_timestamp_parser(value):
    """
    Convert an epoch timestamp expressed in milliseconds into a ``datetime``.

    :param value: number of milliseconds since the epoch
    :return: the naive ``datetime`` (local time) produced by ``datetime.fromtimestamp``
    """
    # NOTE(review): the result is naive local time, while the cbor2 decoder patch in this file yields UTC-aware
    # datetimes - confirm whether that difference is intended
    epoch_seconds = value / 1000
    return datetime.fromtimestamp(epoch_seconds)
157

158

159
def _cbor_blob_parser(value):
    """
    Convert a blob value into an immutable ``bytes`` object.

    :param value: the blob value handed over by the parser
    :return: ``bytes(value)``
    """
    blob = bytes(value)
    return blob
161

162

163
@hooks.on_infra_start()
def _patch_botocore_json_parser():
    """Patch botocore's ``BaseJSONParser._parse_body_as_json`` with a variant that falls back to CBOR decoding."""
    from botocore.parsers import BaseJSONParser

    @patch(BaseJSONParser._parse_body_as_json)
    def _parse_body_as_json(fn, self, body_contents):
        """
        botocore does not support CBOR encoded response parsing. Since we use the botocore parsers
        to parse responses from external backends (like kinesis-mock), we need to patch botocore to
        try CBOR decoding in case the JSON decoding fails.

        :param fn: the original (unpatched) ``_parse_body_as_json``
        :param self: the parser instance
        :param body_contents: the raw response body
        :return: the decoded body
        """
        try:
            decoded = fn(self, body_contents)
        except UnicodeDecodeError as decode_error:
            # cbor2: explicitly load from private _decoder module to avoid using the (non-patched) C-version
            from cbor2._decoder import loads

            try:
                LOG.debug("botocore failed decoding JSON. Trying to decode as CBOR.")
                return loads(body_contents)
            except Exception as cbor_error:
                LOG.debug("CBOR fallback decoding failed.")
                # surface the CBOR failure, but keep the original JSON failure as its cause
                raise cbor_error from decode_error
        else:
            return decoded
186

187

188
@hooks.on_infra_start()
def _patch_cbor2():
    """
    Patch fixing the AWS CBOR en-/decoding of datetime fields.

    Unfortunately, Kinesis (the only known service using CBOR) does not use the number of seconds (with floating-point
    milliseconds - according to RFC8949), but uses milliseconds.
    Python cbor2 is highly optimized by using a C-implementation by default, which cannot be patched.
    Instead of `from cbor2 import loads`, directly import the python-native loads implementation to avoid loading the
    unpatched C implementation:
    ```
    from cbor2._decoder import loads
    from cbor2._decoder import dumps
    ```

    See https://github.com/aws/aws-sdk-java-v2/issues/4661
    """
    from cbor2._decoder import CBORDecodeValueError, semantic_decoders
    from cbor2._encoder import CBOREncodeValueError, default_encoders
    from cbor2._types import CBORTag

    def _patched_decode_epoch_datetime(self) -> datetime:
        """
        Replaces `cbor2._decoder.CBORDecoder.decode_epoch_datetime` as default datetime semantic_decoder.

        :return: the decoded, UTC-aware datetime
        :raises CBORDecodeValueError: if the decoded value cannot be converted into a datetime
        """
        # Semantic tag 1
        value = self._decode()

        try:
            # The next line is the only change in this patch compared to the original function.
            # AWS breaks the CBOR spec by using the millis (instead of seconds with floating point support for millis)
            # https://github.com/aws/aws-sdk-java-v2/issues/4661
            value = value / 1000
            tmp = datetime.fromtimestamp(value, UTC)
        except (OverflowError, OSError, ValueError) as exc:
            raise CBORDecodeValueError("error decoding datetime from epoch") from exc

        return self.set_shareable(tmp)

    def _patched_encode_datetime(self, value: datetime) -> None:
        """
        Replaces `cbor2._encoder.CBOREncoder.encode_datetime` as default datetime default_encoder.

        :param value: the datetime to encode
        :raises CBOREncodeValueError: if ``value`` is naive and the encoder has no default timezone
        """
        if not value.tzinfo:
            if self._timezone:
                value = value.replace(tzinfo=self._timezone)
            else:
                raise CBOREncodeValueError(
                    f"naive datetime {value!r} encountered and no default timezone has been set"
                )

        if self.datetime_as_timestamp:
            from calendar import timegm

            # This computation is the only change in this patch compared to the original function.
            # - AWS breaks the CBOR spec by using the millis (instead of seconds with floating point support for millis)
            #   https://github.com/aws/aws-sdk-java-v2/issues/4661
            # - AWS SDKs in addition have very tight assumptions on the type.
            #   This needs to be an integer, and must not be a floating point number (CBOR is typed)!
            # - The milliseconds are computed with integer arithmetic only: going through a float
            #   (seconds + microseconds / 1e6, then * 1000 and int()) can land just below the exact value and
            #   be truncated to one millisecond less than the datetime actually holds.
            timestamp = timegm(value.utctimetuple()) * 1000 + value.microsecond // 1000
            self.encode_semantic(CBORTag(1, timestamp))
        else:
            datestring = value.isoformat().replace("+00:00", "Z")
            self.encode_semantic(CBORTag(0, datestring))

    # overwrite the default epoch datetime en-/decoder with patched versions
    default_encoders[datetime] = _patched_encode_datetime
    semantic_decoders[1] = _patched_decode_epoch_datetime
260

261

262
def _create_and_enrich_aws_request(
    fn, self: Endpoint, params: dict, operation_model: OperationModel = None
):
    """
    Wrapper around the patched request factory that remembers the high-level request data.

    The request is created by delegating to ``fn``; afterwards the request parameters and the botocore
    operation model are attached to it as ``params`` and ``operation_model`` attributes, since an
    AWSPreparedRequest normally only holds low-level HTTP request information.

    :param fn: the original function being wrapped
    :param self: the endpoint the original function is invoked on
    :param params: the request parameters
    :param operation_model: the operation model of the request
    :return: the enriched request
    """
    prepared: awsrequest.AWSPreparedRequest = fn(self, params, operation_model)

    # attach the extra data for later consumers of the prepared request
    prepared.operation_model = operation_model
    prepared.params = params

    return prepared
275

276

277
# Patch that routes botocore's ``Endpoint.create_request`` through ``_create_and_enrich_aws_request``.
# It is kept as a module-level object and applied via ``apply()`` in the hook below.
botocore_in_memory_endpoint_patch = Patch.function(
    Endpoint.create_request,
    _create_and_enrich_aws_request,
)


@hooks.on_infra_start(should_load=config.IN_MEMORY_CLIENT)
def _patch_botocore_endpoint_in_memory():
    """Apply the in-memory endpoint patch (hook registered with ``should_load=config.IN_MEMORY_CLIENT``)."""
    botocore_in_memory_endpoint_patch.apply()
285

286

287
def parse_response(
    operation: OperationModel,
    protocol: ProtocolName,
    response: Response,
    include_response_metadata: bool = True,
) -> ServiceResponse:
    """
    Turn an HTTP Response into the parsed response dictionary botocore would return for the operation.

    The procedure mirrors ``botocore.endpoint.convert_to_response_dict``, adapted to Werkzeug's server-side
    response object, and then runs the botocore response parser of the given protocol on the result.

    :param operation: the operation of the original request
    :param protocol: the protocol of the original request
    :param response: the HTTP response object containing the response of the operation
    :param include_response_metadata: True if the ResponseMetadata (typical for boto response dicts) should be included
    :return: a parsed dictionary as it is returned by botocore
    """
    status_code = response.status_code
    is_error = status_code >= 301

    # this is what botocore.endpoint.convert_to_response_dict normally does
    response_dict = {
        "headers": dict(response.headers.items()),  # boto doesn't like werkzeug headers
        "status_code": status_code,
        "context": {"operation_name": operation.name},
    }

    # error responses are always parsed from the raw body, regardless of the operation's output type
    if not is_error and operation.has_event_stream_output:
        # TODO test this
        body = _RawStream(response)
    elif not is_error and operation.has_streaming_output:
        # for s3.GetObject for example, the Body attribute is actually a stream, not the raw bytes value
        body = _ResponseStream(response)
    else:
        body = response.data
    response_dict["body"] = body

    parser_factory = ResponseParserFactory()
    content_type = response.content_type
    if content_type and content_type.startswith("application/x-amz-cbor"):
        # botocore cannot handle CBOR encoded responses (because it never sends them), we need to modify the parser
        parser_factory.set_parser_defaults(
            timestamp_parser=_cbor_timestamp_parser,
            blob_parser=_cbor_blob_parser,
        )

    parser = parser_factory.create_parser(protocol)
    parsed_response = parser.parse(response_dict, operation.output_shape)

    if is_error:
        # Add possible additional error shape members
        _add_modeled_error_fields(response_dict, parsed_response, operation, parser)

    if not include_response_metadata:
        parsed_response.pop("ResponseMetadata", None)

    return parsed_response
341

342

343
def parse_service_exception(response: Response, parsed_response: dict) -> ServiceException | None:
    """
    Build a ServiceException (one ASF can handle) out of a parsed response (one that botocore would return).
    The exception is only created and returned, never raised (see #raise_service_exception).

    :param response: Un-parsed response
    :param parsed_response: Parsed response
    :return: ServiceException or None (if it's not an error response)
    """
    status_code = response.status_code
    if not (status_code >= 301 and "Error" in parsed_response):
        return None

    error = parsed_response["Error"]
    service_exception = CommonServiceException(
        code=error.get("Code", f"'{status_code}'"),
        status_code=status_code,
        message=error.get("Message", ""),
        sender_fault=error.get("Type") == "Sender",
    )

    # Add all additional fields in the parsed response as members of the exception
    reserved_keys = ("code", "message", "type", "error")
    for key, value in parsed_response.items():
        if key.lower() in reserved_keys:
            continue
        if hasattr(service_exception, key):
            # never overwrite an attribute the exception already has
            continue
        setattr(service_exception, key, value)

    return service_exception
367

368

369
def raise_service_exception(response: Response, parsed_response: dict) -> None:
    """
    Raise the ServiceException described by a parsed response (one that botocore would return), if there is one.

    :param response: Un-parsed response
    :param parsed_response: Parsed response
    :raise ServiceException: If the response is an error response
    :return: None if the response is not an error response
    """
    service_exception = parse_service_exception(response, parsed_response)
    if service_exception:
        raise service_exception
379

380

381
class GatewayShortCircuit:
    """
    Event handler that serves botocore requests through a Gateway in-process.

    Instances are registered as the first ``before-send.*.*`` handler of a client (see :meth:`modify_client`).
    The handler relies on the ``params`` and ``operation_model`` attributes that ``_create_and_enrich_aws_request``
    attaches to the prepared request.
    """

    gateway: Gateway

    def __init__(self, gateway: Gateway):
        """
        :param gateway: the gateway that handles the short-circuited requests
        """
        self.gateway = gateway
        self._internal_url = get_service_endpoint()

    def __call__(
        self, event_name: str, request: awsrequest.AWSPreparedRequest, **kwargs
    ) -> awsrequest.AWSResponse | None:
        """
        Handle the request via the gateway and return the result as a botocore response.

        :param event_name: name of the emitted event
        :param request: the (enriched) prepared request
        :return: the response, or None if the request does not target the internal service endpoint
        """
        # TODO: the endpoint_url is sometimes overridden to point directly at DynamoDBLocal;
        #  if the default endpoint_url is not part of the request URL, skip the in-memory forwarding
        if self._internal_url not in request.url:
            return None

        # extract extra data from enriched AWSPreparedRequest
        params = request.params
        operation: OperationModel = request.operation_model

        # create request
        context = RequestContext(request=create_http_request(request))

        # TODO: just a hacky thing to unblock the service model being set to `sqs-query` blocking for now
        # this is using the same services as `localstack.aws.protocol.service_router.resolve_conflicts`, maybe
        # consolidate. `docdb` and `neptune` uses the RDS API and service.
        service_model = operation.service_model
        if service_model.service_name not in {
            "sqs-query",
            "docdb",
            "neptune",
            "timestream-write",
        }:
            context.service = service_model

        context.operation = operation
        context.service_request = params["body"]

        # perform request
        response = Response()
        self.gateway.handle(context, response)

        # transform Werkzeug response to client-side botocore response
        return awsrequest.AWSResponse(
            url=context.request.url,
            status_code=response.status_code,
            headers=response.headers,
            raw=_ResponseStream(response),
        )

    @staticmethod
    def modify_client(client, gateway):
        """
        Register a new GatewayShortCircuit for ``gateway`` as first ``before-send.*.*`` handler of ``client``.

        :param client: the client whose event system is modified
        :param gateway: the gateway to route the client's requests through
        """
        handler = GatewayShortCircuit(gateway)
        client.meta.events.register_first("before-send.*.*", handler)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc