• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 7ae8c08b-27e7-4df6-bfb9-4892ae974ff7

17 Mar 2025 11:44PM UTC coverage: 86.954% (+0.02%) from 86.93%
7ae8c08b-27e7-4df6-bfb9-4892ae974ff7

push

circleci

web-flow
SNS: fix Filter Policy engine to not evaluate full complex payload (#12395)

8 of 8 new or added lines in 1 file covered. (100.0%)

27 existing lines in 8 files now uncovered.

62326 of 71677 relevant lines covered (86.95%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.88
/localstack-core/localstack/services/sqs/query_api.py
1
"""The SQS Query API allows using Queue URLs as endpoints for operations on that queue. See:
2
https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-making-api-requests.html. This is a
3
generic implementation that creates from Query API requests the respective AWS requests, and uses an aws_stack client
4
to make the request."""
5

6
import logging
1✔
7
from typing import Dict, Optional, Tuple
1✔
8
from urllib.parse import urlencode
1✔
9

10
from botocore.exceptions import ClientError
1✔
11
from botocore.model import OperationModel
1✔
12
from werkzeug.datastructures import Headers
1✔
13
from werkzeug.exceptions import NotFound
1✔
14

15
from localstack.aws.api import CommonServiceException
1✔
16
from localstack.aws.connect import connect_to
1✔
17
from localstack.aws.protocol.parser import OperationNotFoundParserError, create_parser
1✔
18
from localstack.aws.protocol.serializer import create_serializer
1✔
19
from localstack.aws.protocol.validate import MissingRequiredField, validate_request
1✔
20
from localstack.aws.spec import load_service
1✔
21
from localstack.constants import (
1✔
22
    AWS_REGION_US_EAST_1,
23
    INTERNAL_AWS_ACCESS_KEY_ID,
24
    INTERNAL_AWS_SECRET_ACCESS_KEY,
25
)
26
from localstack.http import Request, Response, Router, route
1✔
27
from localstack.http.dispatcher import Handler
1✔
28
from localstack.services.sqs.exceptions import MissingRequiredParameterException
1✔
29
from localstack.utils.aws.request_context import (
1✔
30
    extract_access_key_id_from_auth_header,
31
    extract_region_from_headers,
32
)
33
from localstack.utils.strings import long_uid
1✔
34

35
LOG = logging.getLogger(__name__)
1✔
36

37
service = load_service("sqs-query")
1✔
38
parser = create_parser(service)
1✔
39
serializer = create_serializer(service)
1✔
40

41

42
@route(
1✔
43
    '/<regex("[0-9]{12}"):account_id>/<regex("[a-zA-Z0-9_-]+(.fifo)?"):queue_name>',
44
    host='sqs.<regex("([a-z0-9-]+\\.)?"):region><regex(".*"):domain><regex("(:[0-9]{2,5})?"):port>',
45
    methods=["POST", "GET"],
46
)
47
def standard_strategy_handler(
1✔
48
    request: Request,
49
    account_id: str,
50
    queue_name: str,
51
    region: str = None,
52
    domain: str = None,
53
    port: int = None,
54
):
55
    """
56
    Handler for modern-style endpoints which always have the region encoded.
57
    See https://docs.aws.amazon.com/general/latest/gr/sqs-service.html
58
    """
59
    return handle_request(request, region.rstrip("."))
1✔
60

61

62
@route(
1✔
63
    '/queue/<regex("[a-z0-9-]+"):region>/<regex("[0-9]{12}"):account_id>/<regex("[a-zA-Z0-9_-]+(.fifo)?"):queue_name>',
64
    methods=["POST", "GET"],
65
)
66
def path_strategy_handler(request: Request, region, account_id: str, queue_name: str):
1✔
67
    return handle_request(request, region)
1✔
68

69

70
@route(
1✔
71
    '/<regex("[0-9]{12}"):account_id>/<regex("[a-zA-Z0-9_-]+(.fifo)?"):queue_name>',
72
    host='<regex("([a-z0-9-]+\\.)?"):region>queue.<regex(".*"):domain><regex("(:[0-9]{2,5})?"):port>',
73
    methods=["POST", "GET"],
74
)
75
def domain_strategy_handler(
1✔
76
    request: Request,
77
    account_id: str,
78
    queue_name: str,
79
    region: str = None,
80
    domain: str = None,
81
    port: int = None,
82
):
83
    """Uses the endpoint host to extract the region. See:
84
    https://docs.aws.amazon.com/general/latest/gr/sqs-service.html"""
85
    if not region:
1✔
UNCOV
86
        region = AWS_REGION_US_EAST_1
×
87
    else:
88
        region = region.rstrip(".")
1✔
89

90
    return handle_request(request, region)
1✔
91

92

93
@route(
1✔
94
    '/<regex("[0-9]{12}"):account_id>/<regex("[a-zA-Z0-9_-]+(.fifo)?"):queue_name>',
95
    methods=["POST", "GET"],
96
)
97
def legacy_handler(request: Request, account_id: str, queue_name: str) -> Response:
1✔
98
    # previously, Queue URLs were created as http://localhost:4566/000000000000/my-queue-name. Because the region is
99
    # ambiguous in this request, we fall back to the region that the request is coming from (this is not how AWS
100
    # behaves though).
101
    if "X-Amz-Credential" in request.args:
1✔
102
        region = request.args["X-Amz-Credential"].split("/")[2]
1✔
103
    else:
104
        region = extract_region_from_headers(request.headers)
×
105

106
    LOG.debug(
1✔
107
        "Region of queue URL %s is ambiguous, got region %s from request", request.url, region
108
    )
109

110
    return handle_request(request, region)
1✔
111

112

113
def register(router: Router[Handler]):
1✔
114
    """
115
    Registers the query API handlers into the given router. There are four routes, one for each SQS_ENDPOINT_STRATEGY.
116

117
    :param router: the router to add the handlers into.
118
    """
119
    router.add(standard_strategy_handler)
1✔
120
    router.add(path_strategy_handler)
1✔
121
    router.add(domain_strategy_handler)
1✔
122
    router.add(legacy_handler)
1✔
123

124

125
class UnknownOperationException(Exception):
1✔
126
    pass
1✔
127

128

129
class InvalidAction(CommonServiceException):
1✔
130
    def __init__(self, action: str):
1✔
131
        super().__init__(
1✔
132
            "InvalidAction",
133
            f"The action {action} is not valid for this endpoint.",
134
            400,
135
            sender_fault=True,
136
        )
137

138

139
class BotoException(CommonServiceException):
1✔
140
    def __init__(self, boto_response):
1✔
141
        error = boto_response["Error"]
1✔
142
        super().__init__(
1✔
143
            code=error.get("Code", "UnknownError"),
144
            status_code=boto_response["ResponseMetadata"]["HTTPStatusCode"],
145
            message=error.get("Message", ""),
146
            sender_fault=error.get("Type", "Sender") == "Sender",
147
        )
148

149

150
def handle_request(request: Request, region: str) -> Response:
1✔
151
    # some SDK (PHP) still send requests to the Queue URL even though the JSON spec does not allow it in the
152
    # documentation. If the request is `json`, raise `NotFound` so that we continue the handler chain and the provider
153
    # can handle the request
154
    if request.headers.get("Content-Type", "").lower() == "application/x-amz-json-1.0":
1✔
155
        raise NotFound
1✔
156

157
    request_id = long_uid()
1✔
158

159
    try:
1✔
160
        response, operation = try_call_sqs(request, region)
1✔
161
        del response["ResponseMetadata"]
1✔
162
        return serializer.serialize_to_response(response, operation, request.headers, request_id)
1✔
163
    except UnknownOperationException:
1✔
164
        return Response("<UnknownOperationException/>", 404)
1✔
165
    except CommonServiceException as e:
1✔
166
        # use a dummy operation for the serialization to work
167
        op = service.operation_model(service.operation_names[0])
1✔
168
        return serializer.serialize_error_to_response(e, op, request.headers, request_id)
1✔
169
    except Exception as e:
×
170
        LOG.exception("exception")
×
171
        op = service.operation_model(service.operation_names[0])
×
172
        return serializer.serialize_error_to_response(
×
173
            CommonServiceException(
174
                "InternalError", f"An internal error occurred: {e}", status_code=500
175
            ),
176
            op,
177
            request.headers,
178
            request_id,
179
        )
180

181

182
def try_call_sqs(request: Request, region: str) -> Tuple[Dict, OperationModel]:
1✔
183
    action = request.values.get("Action")
1✔
184
    if not action:
1✔
185
        raise UnknownOperationException()
1✔
186

187
    if action in ["ListQueues", "CreateQueue"]:
1✔
188
        raise InvalidAction(action)
1✔
189

190
    # prepare aws request for the SQS query protocol (POST request with action url-encoded in the body)
191
    params = {"QueueUrl": request.base_url}
1✔
192
    # if a QueueUrl is already set in the body, it should overwrite the one in the URL. this behavior is validated
193
    # against AWS (see TestSqsQueryApi)
194
    params.update(request.values)
1✔
195
    body = urlencode(params)
1✔
196

197
    try:
1✔
198
        headers = Headers(request.headers)
1✔
199
        headers["Content-Type"] = "application/x-www-form-urlencoded; charset=utf-8"
1✔
200
        operation, service_request = parser.parse(Request("POST", "/", headers=headers, body=body))
1✔
201
        validate_request(operation, service_request).raise_first()
1✔
202
    except OperationNotFoundParserError:
1✔
203
        raise InvalidAction(action)
1✔
204
    except MissingRequiredField as e:
1✔
205
        raise MissingRequiredParameterException(
1✔
206
            f"The request must contain the parameter {e.required_name}."
207
        )
208

209
    # Extract from auth header to allow cross-account operations
210
    # TODO: permissions encoded in URL as AUTHPARAMS cannot be accounted for in this method, which is not a big
211
    #  problem yet since we generally don't enforce permissions.
212
    account_id: Optional[str] = extract_access_key_id_from_auth_header(headers)
1✔
213

214
    client = connect_to(
1✔
215
        region_name=region,
216
        aws_access_key_id=account_id or INTERNAL_AWS_ACCESS_KEY_ID,
217
        aws_secret_access_key=INTERNAL_AWS_SECRET_ACCESS_KEY,
218
    ).sqs_query
219

220
    try:
1✔
221
        # using the layer below boto3.client("sqs").<operation>(...) to make the call
222
        boto_response = client._make_api_call(operation.name, service_request)
1✔
223
    except ClientError as e:
1✔
224
        raise BotoException(e.response) from e
1✔
225

226
    return boto_response, operation
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc