localstack / localstack, build 20565403496 (github, web-flow)
Pull Request #13567: Update ASF APIs (merge 4816837a5 into 2417384aa)
29 Dec 2025 05:11AM UTC, coverage: 84.103% (-2.8% from 86.921%), 67166 of 79862 relevant lines covered (0.84 hits per line)

Source file: /localstack-core/localstack/aws/protocol/service_router.py (file coverage: 86.11%)
import logging
from typing import NamedTuple

from botocore.model import ServiceModel
from werkzeug.exceptions import RequestEntityTooLarge
from werkzeug.http import parse_dict_header

from localstack.aws.spec import (
    ProtocolName,
    ServiceCatalog,
    ServiceModelIdentifier,
    get_service_catalog,
    is_protocol_in_service_model_identifier,
)
from localstack.http import Request
from localstack.services.s3.utils import uses_host_addressing
from localstack.services.sqs.utils import is_sqs_queue_url
from localstack.utils.strings import to_bytes

LOG = logging.getLogger(__name__)

_PROTOCOL_DETECTION_PRIORITY: list[ProtocolName] = [
    "smithy-rpc-v2-cbor",
    "json",
    "query",
    "ec2",
    "rest-json",
    "rest-xml",
]


class ProtocolError(Exception):
    """
    Error which is thrown if we cannot detect the protocol for the request.
    """

    pass


class _ServiceIndicators(NamedTuple):
    """
    Encapsulates the different fields that might indicate which service a request is targeting.

    This class does _not_ contain any data which is parsed from the body of the request in order to defer or even avoid
    processing the body.
    """

    # AWS service's "signing name" - Contained in the Authorization header
    # (https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-auth-using-authorization-header.html)
    signing_name: str | None = None
    # Target prefix as defined in the service specs for non-rest protocols - Contained in the X-Amz-Target header
    target_prefix: str | None = None
    # Targeted operation as defined in the service specs for non-rest protocols - Contained in the X-Amz-Target header
    operation: str | None = None
    # Host field of the HTTP request
    host: str | None = None
    # Path of the HTTP request
    path: str | None = None


def _extract_service_indicators(request: Request) -> _ServiceIndicators:
    """Extracts all different fields that might indicate which service a request is targeting."""
    x_amz_target = request.headers.get("x-amz-target")
    authorization = request.headers.get("authorization")
    is_rpc_v2 = "rpc-v2-cbor" in request.headers.get("Smithy-Protocol", "")

    signing_name = None
    if authorization:
        try:
            auth_type, auth_info = authorization.split(None, 1)
            auth_type = auth_type.lower().strip()
            if auth_type == "aws4-hmac-sha256":
                values = parse_dict_header(auth_info)
                _, _, _, signing_name, _ = values["Credential"].split("/")
        except (ValueError, KeyError):
            LOG.debug("auth header could not be parsed for service routing: %s", authorization)
            pass
    if is_rpc_v2:
        # https://smithy.io/2.0/additional-specs/protocols/smithy-rpc-v2.html#requests
        rpc_v2_params = request.path.lstrip("/").split("/")
        if len(rpc_v2_params) >= 4:
            *_, service_shape_name, __, operation = rpc_v2_params
            target_prefix = service_shape_name.split("#")[-1]
        else:
            target_prefix, operation = None, None
    elif x_amz_target:
        if "." in x_amz_target:
            target_prefix, operation = x_amz_target.split(".", 1)
        else:
            target_prefix = None
            operation = x_amz_target
    else:
        target_prefix, operation = None, None

    return _ServiceIndicators(signing_name, target_prefix, operation, request.host, request.path)
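
# Illustrative example (not executed here): for a hypothetical SigV4-signed DynamoDB request with headers
#   Authorization: AWS4-HMAC-SHA256 Credential=AKIAEXAMPLE/20251229/us-east-1/dynamodb/aws4_request, ...
#   X-Amz-Target: DynamoDB_20120810.GetItem
# the credential scope yields signing_name="dynamodb", and splitting X-Amz-Target on the first "." yields
# target_prefix="DynamoDB_20120810" and operation="GetItem". The resulting tuple would look roughly like
#   _ServiceIndicators("dynamodb", "DynamoDB_20120810", "GetItem", host="localhost:4566", path="/")
# where the host and path values are made up for illustration.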


def _matches_protocol(request: Request, protocol: ProtocolName) -> bool:
    headers = request.headers
    mimetype = request.mimetype.lower()
    match protocol:
        case "smithy-rpc-v2-cbor":
            # Every request for the rpcv2Cbor protocol MUST contain a `Smithy-Protocol` header with the value
            # of `rpc-v2-cbor`.
            # https://smithy.io/2.0/additional-specs/protocols/smithy-rpc-v2.html
            return headers.get("Smithy-Protocol", "") == "rpc-v2-cbor"
        case "json":
            return mimetype.startswith("application/x-amz-json")
        case "query" | "ec2":
            # https://smithy.io/2.0/aws/protocols/aws-query-protocol.html#request-serialization
            return (
                mimetype.startswith("application/x-www-form-urlencoded") or "Action" in request.args
            )
        case "rest-xml" | "rest-json":
            # `rest-json` and `rest-xml` can accept any kind of Content-Type, and it can be configured on the operation
            # level.
            # https://smithy.io/2.0/aws/protocols/aws-restjson1-protocol.html
            return True
        case _:
            return False
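
# Rough summary of the checks above (illustrative, not exhaustive):
#   Smithy-Protocol: rpc-v2-cbor header               -> "smithy-rpc-v2-cbor"
#   Content-Type: application/x-amz-json-1.0 or -1.1  -> "json"
#   Content-Type: application/x-www-form-urlencoded,
#   or an "Action" query parameter                    -> "query" / "ec2"
#   any other Content-Type                            -> "rest-json" / "rest-xml" (both accept arbitrary types)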


def match_available_protocols(
    request: Request, available_protocols: list[ProtocolName]
) -> ProtocolName | None:
    """
    Tries to match the current request and determine the protocol used amongst the available protocols given.
    We use a priority order to try to determine the protocol, as some protocols are more permissive than others.
    :param request: the incoming request
    :param available_protocols: the available protocols of the Service the request is directed to
    :return: the protocol matched, if any
    """
    for protocol in _PROTOCOL_DETECTION_PRIORITY:
        if protocol in available_protocols and _matches_protocol(request, protocol):
            return protocol

    return None
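
# Sketch of the priority order at work: a form-urlencoded request to a service whose spec lists
# available_protocols=["query", "rest-xml"] satisfies both checks, but "query" comes earlier in
# _PROTOCOL_DETECTION_PRIORITY and therefore wins; the permissive "rest-*" protocols only match
# when nothing more specific did.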


signing_name_path_prefix_rules = {
    # custom rules based on URI path prefixes that are not easily generalizable
    "apigateway": {
        "/v2": ServiceModelIdentifier("apigatewayv2"),
    },
    "appconfig": {
        "/configuration": ServiceModelIdentifier("appconfigdata"),
    },
    "bedrock": {
        "/guardrail/": ServiceModelIdentifier("bedrock-runtime"),
        "/model/": ServiceModelIdentifier("bedrock-runtime"),
        "/async-invoke": ServiceModelIdentifier("bedrock-runtime"),
    },
    "execute-api": {
        "/@connections": ServiceModelIdentifier("apigatewaymanagementapi"),
        "/participant": ServiceModelIdentifier("connectparticipant"),
        "*": ServiceModelIdentifier("iot"),
    },
    "ses": {
        "/v2": ServiceModelIdentifier("sesv2"),
        "/v1": ServiceModelIdentifier("pinpoint-email"),
    },
    "greengrass": {
        "/greengrass/v2/": ServiceModelIdentifier("greengrassv2"),
    },
    "cloudsearch": {
        "/2013-01-01": ServiceModelIdentifier("cloudsearchdomain"),
    },
    "s3": {"/v20180820": ServiceModelIdentifier("s3control")},
    "iot1click": {
        "/projects": ServiceModelIdentifier("iot1click-projects"),
        "/devices": ServiceModelIdentifier("iot1click-devices"),
    },
    "es": {
        "/2015-01-01": ServiceModelIdentifier("es"),
        "/2021-01-01": ServiceModelIdentifier("opensearch"),
    },
    "sagemaker": {
        "/endpoints": ServiceModelIdentifier("sagemaker-runtime"),
        "/human-loops": ServiceModelIdentifier("sagemaker-a2i-runtime"),
    },
}


def custom_signing_name_rules(signing_name: str, path: str) -> ServiceModelIdentifier | None:
    """
    Rules which are based on the signing name (in the auth header) and the request path.
    """
    rules = signing_name_path_prefix_rules.get(signing_name)

    if not rules:
        if signing_name == "servicecatalog":
            if path == "/":
                # servicecatalog uses the protocol json (only uses root-path URIs, i.e. only /)
                return ServiceModelIdentifier("servicecatalog")
            else:
                # servicecatalog-appregistry uses rest-json (only uses non-root-path request URIs)
                return ServiceModelIdentifier("servicecatalog-appregistry")
        return

    for prefix, service_model_identifier in rules.items():
        if path.startswith(prefix):
            return service_model_identifier

    return rules.get("*", ServiceModelIdentifier(signing_name))
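
# Illustrative examples: with signing_name="es", a path starting with "/2021-01-01" is routed to the
# "opensearch" spec while "/2015-01-01" stays on "es"; with signing_name="execute-api", a path matching no
# prefix falls through to the "*" entry and is routed to "iot". For signing names without any rules (and
# apart from the servicecatalog special case above), the function returns None.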


def custom_host_addressing_rules(host: str) -> ServiceModelIdentifier | None:
    """
    Rules based on the host header of the request, which is typically the data plane of a service.

    Some services are added through a patch in ext.
    """
    if ".lambda-url." in host:
        return ServiceModelIdentifier("lambda")

    if ".s3-website." in host:
        return ServiceModelIdentifier("s3")
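
# Illustrative hostnames (made up for this example): a host containing ".lambda-url." such as
# "abcdef.lambda-url.us-east-1.localhost.localstack.cloud" maps to Lambda function URLs, and a host
# containing ".s3-website." maps to S3 static website hosting; any other host yields None.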


def custom_path_addressing_rules(path: str) -> ServiceModelIdentifier | None:
    """
    Rules which are only based on the request path.
    """

    if is_sqs_queue_url(path):
        return ServiceModelIdentifier("sqs", protocol="query")

    if path.startswith("/2015-03-31/functions"):
        return ServiceModelIdentifier("lambda")
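
# Illustrative paths: "/2015-03-31/functions/my-function/invocations" maps to Lambda, and a path that
# is_sqs_queue_url recognizes as an SQS queue URL (for example something like "/000000000000/my-queue")
# maps to SQS with the legacy "query" protocol; any other path yields None.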


def legacy_s3_rules(request: Request) -> ServiceModelIdentifier | None:
    """
    *Legacy* rules which allow us to fall back to S3 if no other service was matched.
    All rules which are implemented here should be removed once we make sure it would not break any use-cases.
    """

    path = request.path
    method = request.method

    # TODO The remaining rules here are special S3 rules - needs to be discussed how these should be handled.
    #      Some are similar to other rules and not that greedy, others are nearly general fallbacks.
    stripped = path.strip("/")
    if method in ["GET", "HEAD"] and stripped:
        # assume that this is an S3 GET request with URL path `/<bucket>/<key ...>`
        return ServiceModelIdentifier("s3")

    # detect S3 URLs
    if stripped and "/" not in stripped:
        if method == "PUT":
            # assume that this is an S3 PUT bucket request with URL path `/<bucket>`
            return ServiceModelIdentifier("s3")
        if method == "POST" and "key" in request.values:
            # assume that this is an S3 POST request with form parameters or multipart form in the body
            return ServiceModelIdentifier("s3")

    # detect S3 requests sent from aws-cli using --no-sign-request option
    if "aws-cli/" in str(request.user_agent):
        return ServiceModelIdentifier("s3")

    # detect S3 pre-signed URLs (v2 and v4)
    values = request.values
    if any(
        value in values
        for value in [
            "AWSAccessKeyId",
            "Signature",
            "X-Amz-Algorithm",
            "X-Amz-Credential",
            "X-Amz-Date",
            "X-Amz-Expires",
            "X-Amz-SignedHeaders",
            "X-Amz-Signature",
        ]
    ):
        return ServiceModelIdentifier("s3")

    # S3 delete object requests
    if method == "POST" and "delete" in values:
        data_bytes = to_bytes(request.data)
        if b"<Delete" in data_bytes and b"<Key>" in data_bytes:
            return ServiceModelIdentifier("s3")

    # Put Object API can have multiple keys
    if stripped.count("/") >= 1 and method == "PUT":
        # assume that this is an S3 PUT bucket object request with URL path `/<bucket>/object`
        # or `/<bucket>/object/object1/+`
        return ServiceModelIdentifier("s3")

    # detect S3 requests with "AWS id:key" Auth headers
    auth_header = request.headers.get("Authorization") or ""
    if auth_header.startswith("AWS "):
        return ServiceModelIdentifier("s3")

    if uses_host_addressing(request.headers):
        # Note: This needs to be the last rule (and therefore is not in the host rules), since it is incredibly greedy
        return ServiceModelIdentifier("s3")
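
# Illustrative fallback: an unsigned "GET /my-bucket/my-key" request that reaches this point without matching
# any other service is caught by the first rule above and treated as S3; similarly, a pre-signed URL carrying
# e.g. "X-Amz-Signature" in its query string is attributed to S3 even without an Authorization header.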


def resolve_conflicts(
    candidates: set[ServiceModelIdentifier], request: Request
) -> ServiceModelIdentifier:
    """
    Some service definitions are overlapping to a point where they are _not_ distinguishable at all
    (e.g. ``DescribeEndpoints`` in timestream-query and timestream-write).
    These conflicts need to be resolved manually.
    """
    service_name_candidates = {service.name for service in candidates}
    if service_name_candidates == {"timestream-query", "timestream-write"}:
        return ServiceModelIdentifier("timestream-query")
    if service_name_candidates == {"docdb", "neptune", "rds"}:
        return ServiceModelIdentifier("rds")
    if service_name_candidates == {"sqs"}:
        # SQS now has 2 different specs for the `query` and `json` protocols. With our current parser and serializer
        # implementation, we need 2 different service names for them, but they share one provider implementation.
        # `sqs` represents the `json` protocol spec, and `sqs-query` the `query` protocol
        # (default again in botocore starting with 1.32.6).
        # The `application/x-amz-json-1.0` header is mandatory for requests targeting SQS with the `json` protocol. We
        # can safely route them to the `sqs` JSON parser/serializer. If not present, route the request to the
        # sqs-query protocol.
        protocol = match_available_protocols(request, available_protocols=["json", "query"])
        return (
            ServiceModelIdentifier("sqs")
            if protocol == "json"
            else ServiceModelIdentifier("sqs", "query")
        )
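
# Sketch of the SQS branch above: if the candidate set only contains "sqs", a request with
# Content-Type "application/x-amz-json-1.0" resolves to ServiceModelIdentifier("sqs") (the json spec), while a
# form-urlencoded request resolves to ServiceModelIdentifier("sqs", "query"). Candidate sets not handled here
# implicitly resolve to None.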


def determine_aws_service_model_for_data_plane(
    request: Request, services: ServiceCatalog = None
) -> ServiceModel | None:
    """
    A stripped down version of ``determine_aws_service_model`` which only checks hostname indicators for
    the AWS data plane, such as s3 websites, lambda function URLs, or API gateway routes.
    """
    custom_host_match = custom_host_addressing_rules(request.host)
    if custom_host_match:
        services = services or get_service_catalog()
        return services.get(custom_host_match.name, custom_host_match.protocol)


def determine_aws_protocol(request: Request, service_model: ServiceModel) -> ProtocolName:
    if not (protocols := service_model.metadata.get("protocols")):
        # if the service does not define multiple protocols, return the `protocol` defined for the service
        return service_model.protocol

    if len(protocols) == 1:
        return protocols[0]

    if protocol := match_available_protocols(request, available_protocols=protocols):
        return protocol

    raise ProtocolError(
        f"Could not determine the protocol for the request: "
        f"{request.method} {request.path} for the service '{service_model.service_name}' "
        f"(available protocols: {protocols})"
    )
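
# Sketch: for a service whose spec defines no "protocols" list (or exactly one entry), the declared protocol is
# returned as-is; for a multi-protocol service such as one listing ["json", "query"], the request headers decide
# via match_available_protocols, and ProtocolError is raised if none of the listed protocols matches.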


def determine_aws_service_model(
    request: Request, services: ServiceCatalog = None
) -> ServiceModel | None:
    """
    Tries to determine the name of the AWS service an incoming request is targeting.
    :param request: the incoming request to determine the target service of
    :param services: service catalog (can be handed in for caching purposes)
    :return: the matched service model (or None if the targeted service could not be determined exactly)
    """
    services = services or get_service_catalog()
    signing_name, target_prefix, operation, host, path = _extract_service_indicators(request)
    candidates = set()

    # 1. check the signing names
    if signing_name:
        signing_name_candidates = services.by_signing_name(signing_name)
        if len(signing_name_candidates) == 1:
            # a unique signing-name -> service name mapping is the case for ~75% of service operations
            candidate = signing_name_candidates[0]
            return services.get(candidate.name, candidate.protocol)

        # try to find a match with the custom signing name rules
        custom_match = custom_signing_name_rules(signing_name, path)
        if custom_match:
            return services.get(custom_match.name, custom_match.protocol)

        # still ambiguous - add the services to the list of candidates
        candidates.update(signing_name_candidates)

    # 2. check the target prefix
    if target_prefix and operation:
        target_candidates = services.by_target_prefix(target_prefix)
        if len(target_candidates) == 1:
            # a unique target prefix
            candidate = target_candidates[0]
            return services.get(candidate.name, candidate.protocol)

        # still ambiguous - add the services to the list of candidates
        candidates.update(target_candidates)

        # exclude services where the operation is not contained in the service spec
        for service_identifier in list(candidates):
            service = services.get(service_identifier.name, service_identifier.protocol)
            if operation not in service.operation_names:
                candidates.remove(service_identifier)
    else:
        # exclude services which have a target prefix (the current request does not have one)
        for service_identifier in list(candidates):
            service = services.get(service_identifier.name, service_identifier.protocol)
            if service.metadata.get("targetPrefix") is not None:
                candidates.remove(service_identifier)

    if len(candidates) == 1:
        service_identifier = candidates.pop()
        return services.get(service_identifier.name, service_identifier.protocol)

    # 3. check the path if it is set and not a trivial root path
    if path and path != "/":
        # try to find a match with the custom path rules
        custom_path_match = custom_path_addressing_rules(path)
        if custom_path_match:
            return services.get(custom_path_match.name, custom_path_match.protocol)

    # 4. check the host (custom host addressing rules)
    if host:
        # iterate over the service spec's endpoint prefix
        for prefix, services_per_prefix in services.endpoint_prefix_index.items():
            # this prevents a virtual-host-addressed bucket from being wrongly recognized
            if host.startswith(f"{prefix}.") and ".s3." not in host:
                if len(services_per_prefix) == 1:
                    candidate = services_per_prefix[0]
                    return services.get(candidate.name, candidate.protocol)
                candidates.update(services_per_prefix)

        custom_host_match = custom_host_addressing_rules(host)
        if custom_host_match:
            return services.get(custom_host_match.name, custom_host_match.protocol)

    if request.shallow:
        # from here on we would need access to the request body, which doesn't exist for shallow requests like
        # WebsocketRequests.
        return None

    # 5. check the query / form-data
    try:
        values = request.values
        if "Action" in values:
            # query / ec2 protocol requests always have an action and a version (the action is more significant)
            query_candidates = [
                service
                for service in services.by_operation(values["Action"])
                if any(
                    is_protocol_in_service_model_identifier(protocol, service)
                    for protocol in ("ec2", "query")
                )
            ]

            if len(query_candidates) == 1:
                candidate = query_candidates[0]
                return services.get(candidate.name, candidate.protocol)

            if "Version" in values:
                for service_identifier in list(query_candidates):
                    service_model = services.get(
                        service_identifier.name, service_identifier.protocol
                    )
                    if values["Version"] != service_model.api_version:
                        # the combination of Version and Action is not unique; drop candidates whose api_version
                        # does not match the requested Version
                        query_candidates.remove(service_identifier)

            if len(query_candidates) == 1:
                candidate = query_candidates[0]
                return services.get(candidate.name, candidate.protocol)

            candidates.update(query_candidates)

    except RequestEntityTooLarge:
        # Some requests can be form-urlencoded but also contain binary data, which will fail the form parsing (S3 can
        # do this). In that case, skip this step and continue to try to determine the service name. The exception is
        # RequestEntityTooLarge even if the error is due to failed decoding.
        LOG.debug(
            "Failed to determine AWS service from request body because the form could not be parsed",
            exc_info=LOG.isEnabledFor(logging.DEBUG),
        )

    # 6. resolve service spec conflicts
    resolved_conflict = resolve_conflicts(candidates, request)
    if resolved_conflict:
        return services.get(resolved_conflict.name, resolved_conflict.protocol)

    # 7. check the legacy S3 rules in the end
    legacy_match = legacy_s3_rules(request)
    if legacy_match:
        return services.get(legacy_match.name, legacy_match.protocol)

    if signing_name:
        return services.get(name=signing_name)
    if candidates:
        candidate = candidates.pop()
        return services.get(candidate.name, candidate.protocol)
    return None
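
# Minimal usage sketch (illustrative; the real call sites live in the LocalStack gateway / handler chain):
#
#   service_model = determine_aws_service_model(request)  # botocore ServiceModel or None
#   if service_model:
#       protocol = determine_aws_protocol(request, service_model)
#       # dispatch to the parser/serializer pair registered for (service_model.service_name, protocol)
#
# The resolution order implemented above is: signing name -> X-Amz-Target / Smithy target -> custom path rules ->
# host rules -> query/form "Action" + "Version" -> manual conflict resolution -> legacy S3 fallbacks.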