• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 17902858349

19 Sep 2025 12:17PM UTC coverage: 86.864% (+0.02%) from 86.844%
17902858349

push

github

web-flow
ASF/CloudWatch: add support for multi-protocols (#13161)

34 of 37 new or added lines in 7 files covered. (91.89%)

136 existing lines in 10 files now uncovered.

67691 of 77928 relevant lines covered (86.86%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.12
/localstack-core/localstack/aws/forwarder.py
1
"""
2
This module contains utilities to call a backend (e.g., an external service process like
3
DynamoDBLocal) from a service provider.
4
"""
5

6
from collections.abc import Callable, Mapping
1✔
7
from typing import Any
1✔
8

9
from botocore.awsrequest import AWSPreparedRequest, prepare_request_dict
1✔
10
from botocore.config import Config as BotoConfig
1✔
11
from botocore.model import OperationModel
1✔
12
from botocore.serialize import create_serializer
1✔
13
from werkzeug.datastructures import Headers
1✔
14

15
from localstack.aws.api.core import (
1✔
16
    RequestContext,
17
    ServiceRequest,
18
    ServiceRequestHandler,
19
    ServiceResponse,
20
)
21
from localstack.aws.client import create_http_request, parse_response, raise_service_exception
1✔
22
from localstack.aws.connect import connect_to
1✔
23
from localstack.aws.skeleton import DispatchTable, create_dispatch_table
1✔
24
from localstack.aws.spec import ProtocolName, load_service
1✔
25
from localstack.constants import AWS_REGION_US_EAST_1
1✔
26
from localstack.http import Response
1✔
27
from localstack.http.proxy import Proxy
1✔
28

29

30
class AwsRequestProxy:
    """
    Implements the ``ServiceRequestHandler`` protocol to forward AWS requests to a backend. It is stateful and uses a
    ``Proxy`` instance for re-using client connections to the backend.
    """

    def __init__(
        self,
        endpoint_url: str,
        parse_response: bool = True,
        include_response_metadata: bool = False,
    ):
        """
        Create a new AwsRequestProxy. ``parse_response`` controls the return behavior of ``forward``. If
        ``parse_response`` is set, then ``forward`` parses the HTTP response from the backend and returns a
        ``ServiceResponse``, otherwise it returns the raw HTTP ``Response`` object.

        :param endpoint_url: the backend to proxy the requests to, used as ``forward_base_url`` for the ``Proxy``.
        :param parse_response: whether to parse the response before returning it
        :param include_response_metadata: include AWS response metadata, only used with ``parse_response=True``
        """
        self.endpoint_url = endpoint_url
        self.parse_response = parse_response
        self.include_response_metadata = include_response_metadata
        self.proxy = Proxy(forward_base_url=endpoint_url)

    def __call__(
        self,
        context: RequestContext,
        service_request: ServiceRequest = None,
    ) -> ServiceResponse | Response | None:
        """Method to satisfy the ``ServiceRequestHandler`` protocol; delegates to ``forward``."""
        return self.forward(context, service_request)

    def forward(
        self,
        context: RequestContext,
        service_request: ServiceRequest = None,
    ) -> ServiceResponse | Response | None:
        """
        Forwards the given request to the backend configured by ``endpoint_url``.

        :param context: the original request context of the incoming request
        :param service_request: optionally a new service request. If given, a new request context is created
            from it (see ``new_request_context``) and forwarded instead of the original request.
        :return: the raw HTTP ``Response`` of the backend if ``parse_response`` is disabled, otherwise the
            parsed service response
        """
        if service_request is not None:
            # if a service request is passed then we need to create a new request context
            context = self.new_request_context(context, service_request)

        http_response = self.proxy.forward(context.request, forward_path=context.request.path)
        if not self.parse_response:
            return http_response

        # note: ``parse_response`` here is the module-level function, not the instance attribute
        parsed_response = parse_response(
            context.operation, context.protocol, http_response, self.include_response_metadata
        )
        raise_service_exception(http_response, parsed_response)
        return parsed_response

    def new_request_context(
        self, original: RequestContext, service_request: ServiceRequest
    ) -> RequestContext:
        """
        Creates a new request context for ``service_request``, re-using the service name, operation name,
        region and protocol of the original context.

        :param original: the request context of the incoming request
        :param service_request: the parameters to serialize into the new request
        :return: the newly created request context, carrying the original request headers except
            ``Content-Type`` and ``Content-Length``
        """
        context = create_aws_request_context(
            service_name=original.service.service_name,
            action=original.operation.name,
            parameters=service_request,
            region=original.region,
            protocol=original.protocol,
        )
        # update the newly created context with non-payload specific request headers (the payload can differ from
        # the original request, f.e. it could be JSON encoded now while the initial request was CBOR encoded)
        headers = Headers(original.request.headers)
        headers.pop("Content-Type", None)
        headers.pop("Content-Length", None)
        context.request.headers.update(headers)
        return context
1✔
104

105

106
def ForwardingFallbackDispatcher(
    provider: object, request_forwarder: ServiceRequestHandler
) -> DispatchTable:
    """
    Wraps a provider with a request forwarder. It does so by creating a new DispatchTable from the original
    provider, and wrapping each method with a fallthrough method that calls ``request_forwarder`` if the
    original provider raises a ``NotImplementedError``.

    :param provider: the ASF provider
    :param request_forwarder: callable that forwards the request (e.g., to a backend server)
    :return: a modified DispatchTable
    """
    table = create_dispatch_table(provider)

    # only values of existing keys are replaced, so updating the table while iterating it is safe
    for op, fn in table.items():
        table[op] = _wrap_with_fallthrough(fn, request_forwarder)

    return table
1✔
124

125

126
class NotImplementedAvoidFallbackError(NotImplementedError):
    """
    A ``NotImplementedError`` that must not trigger the forwarding fallback: handlers wrapped by
    ``_wrap_with_fallthrough`` re-raise it instead of passing the request on to the fallback handler.
    """
1✔
128

129

130
def _wrap_with_fallthrough(
    handler: ServiceRequestHandler, fallthrough_handler: ServiceRequestHandler
) -> ServiceRequestHandler:
    """
    Wraps ``handler`` so that ``fallthrough_handler`` is invoked with the same arguments whenever ``handler``
    raises a ``NotImplementedError``. A ``NotImplementedAvoidFallbackError`` is propagated unchanged.

    :param handler: the primary handler to call
    :param fallthrough_handler: the handler to call if ``handler`` is not implemented
    :return: the wrapping handler
    """

    def _call(context, req) -> ServiceResponse:
        try:
            # handler will typically be an ASF provider method, and in case it hasn't been
            # implemented, we try to fall back to forwarding the request to the backend
            return handler(context, req)
        except NotImplementedAvoidFallbackError:
            # if the fallback has been explicitly disabled, don't pass on to the fallback
            raise
        except NotImplementedError:
            pass

        return fallthrough_handler(context, req)

    return _call
1✔
147

148

149
def HttpFallbackDispatcher(
    provider: object, forward_url_getter: Callable[[str, str], str]
) -> DispatchTable:
    """
    Wraps a provider with an HTTP request forwarder built from ``forward_url_getter``.

    :param provider: the ASF provider
    :param forward_url_getter: a factory method for returning forward base urls, called with the account ID
        and region of the request context
    :return: the DispatchTable created by ``ForwardingFallbackDispatcher``
    """
    return ForwardingFallbackDispatcher(provider, get_request_forwarder_http(forward_url_getter))
1✔
151

152

153
def get_request_forwarder_http(
    forward_url_getter: Callable[[str, str], str],
) -> ServiceRequestHandler:
    """
    Returns a ServiceRequestHandler that creates a new AwsRequestProxy for each invocation, using the result of
    forward_url_getter. Note that this is an inefficient method of proxying, since for every call a new client
    connection has to be established. Try to instead use static forward URL values and use ``AwsRequestProxy``
    directly.

    :param forward_url_getter: a factory method for returning forward base urls for the proxy, called with the
        account ID and region of the request context
    :return: a ServiceRequestHandler acting as a proxy
    """

    def _forward_request(
        context: RequestContext, service_request: ServiceRequest = None
    ) -> ServiceResponse:
        # the forward URL is resolved per request, hence a new proxy is created for each call
        return AwsRequestProxy(forward_url_getter(context.account_id, context.region)).forward(
            context, service_request
        )

    return _forward_request
1✔
173

174

175
def dispatch_to_backend(
    context: RequestContext,
    http_request_dispatcher: Callable[[RequestContext], Response],
    include_response_metadata: bool = False,
) -> ServiceResponse:
    """
    Dispatch the given request to a backend by using the ``http_request_dispatcher`` function to
    fetch an HTTP response, converting it to a ServiceResponse.

    :param context: the request context
    :param http_request_dispatcher: dispatcher that performs the request and returns an HTTP response
    :param include_response_metadata: whether to include boto3 response metadata in the response
    :return: parsed service response
    :raises ServiceException: if the dispatcher returned an error response
    """
    http_response = http_request_dispatcher(context)
    parsed_response = parse_response(
        context.operation, context.protocol, http_response, include_response_metadata
    )
    raise_service_exception(http_response, parsed_response)
    return parsed_response
1✔
195

196

197
# boto config deactivating param validation to forward to backends (backends are responsible for validating params)
_non_validating_boto_config = BotoConfig(parameter_validation=False)
1✔
199

200

201
def create_aws_request_context(
    service_name: str,
    action: str,
    protocol: ProtocolName = None,
    parameters: Mapping[str, Any] | None = None,
    region: str | None = None,
    endpoint_url: str | None = None,
) -> RequestContext:
    """
    This is a stripped-down version of what the botocore client does to perform an HTTP request from a client call. A
    client call looks something like this: boto3.client("sqs").create_queue(QueueName="myqueue"), which will be
    serialized into an HTTP request. This method does the same, without performing the actual request, and with a
    more low-level interface. An equivalent call would be

         create_aws_request_context("sqs", "CreateQueue", parameters={"QueueName": "myqueue"})

    :param service_name: the AWS service
    :param action: the action to invoke
    :param protocol: the protocol to use (defaults to the resolved protocol of the service)
    :param parameters: the invocation parameters (defaults to an empty dict)
    :param region: the region name (default is us-east-1)
    :param endpoint_url: the endpoint to call (defaults to localstack)
    :return: a RequestContext object that describes this request
    """
    if parameters is None:
        parameters = {}
    if region is None:
        region = AWS_REGION_US_EAST_1

    service = load_service(service_name)
    operation = service.operation_model(action)
    # TODO: remove this once every usage upstream has been removed
    protocol = protocol or service.resolved_protocol

    # we re-use botocore internals here to serialize the HTTP request,
    # but deactivate validation (validation errors should be handled by the backend)
    # and don't send it yet
    client = connect_to.get_client(
        service_name,
        endpoint_url=endpoint_url,
        region_name=region,
        config=_non_validating_boto_config,
    )
    request_context = {
        "client_region": region,
        "has_streaming_input": operation.has_streaming_input,
        "auth_type": operation.auth_type,
    }

    # The endpoint URL is mandatory here, set a dummy if not given (doesn't _need_ to be localstack specific)
    if not endpoint_url:
        endpoint_url = "http://localhost.localstack.cloud"
    # pre-process the request args (some params are modified using botocore event handlers)
    parameters = client._emit_api_params(parameters, operation, request_context)

    request_dict = _convert_to_request_dict_with_protocol(
        client=client,
        protocol=protocol,
        api_params=parameters,
        operation_model=operation,
        endpoint_url=endpoint_url,
        context=request_context,
    )

    if auth_path := request_dict.get("auth_path"):
        # botocore >= 1.28 might modify the url path of the request dict (specifically for S3).
        # It will then set the original url path as "auth_path". If the auth_path is set, we reset the url_path.
        # Since botocore 1.31.2, botocore will strip the query from the auth path.
        # We need to add it back from the current url path.
        # Afterwards the request needs to be prepared again.
        _, sep, query = request_dict["url_path"].partition("?")
        request_dict["url_path"] = f"{auth_path}{sep}{query}"
        prepare_request_dict(
            request_dict,
            endpoint_url=endpoint_url,
            user_agent=client._client_config.user_agent,
            context=request_context,
        )

    aws_request: AWSPreparedRequest = client._endpoint.create_request(request_dict, operation)
    context = RequestContext(request=create_http_request(aws_request))
    context.service = service
    context.operation = operation
    context.protocol = protocol
    context.region = region
    context.service_request = parameters

    return context
1✔
289

290

291
def _convert_to_request_dict_with_protocol(
    client,
    protocol: ProtocolName,
    api_params: dict,
    operation_model: OperationModel,
    endpoint_url: str,
    context: dict,
    set_user_agent_header: bool = True,
) -> dict:
    """
    This function is taken from botocore Client._convert_to_request_dict, but we are overriding the serializer.
    Botocore does not expose a way to create a client with a specific protocol, but we need this functionality
    to support multi-protocols.

    :param client: the botocore client whose config and user agent creator are used
    :param protocol: the protocol to create the serializer for
    :param api_params: the invocation parameters to serialize
    :param operation_model: the model of the operation to serialize the request for
    :param endpoint_url: the endpoint URL passed to ``prepare_request_dict``
    :param context: the request context dict passed to ``prepare_request_dict``
    :param set_user_agent_header: whether to pass the client's user agent string (otherwise ``None``)
    :return: the prepared request dict
    """
    serializer = create_serializer(protocol, include_validation=False)
    request_dict = serializer.serialize_to_request(api_params, operation_model)
    if not client._client_config.inject_host_prefix:
        request_dict.pop("host_prefix", None)

    user_agent = client._user_agent_creator.to_string() if set_user_agent_header else None
    prepare_request_dict(
        request_dict,
        endpoint_url=endpoint_url,
        user_agent=user_agent,
        context=context,
    )
    return request_dict
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc