• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 16820655284

07 Aug 2025 05:03PM UTC coverage: 86.841% (-0.05%) from 86.892%
16820655284

push

github

web-flow
CFNV2: support CDK bootstrap and deployment (#12967)

32 of 38 new or added lines in 5 files covered. (84.21%)

2013 existing lines in 125 files now uncovered.

66606 of 76699 relevant lines covered (86.84%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.85
/localstack-core/localstack/services/sqs/query_api.py
1
"""The SQS Query API allows using Queue URLs as endpoints for operations on that queue. See:
2
https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-making-api-requests.html. This is a
3
generic implementation that creates from Query API requests the respective AWS requests, and uses an aws_stack client
4
to make the request."""
5

6
import logging
1✔
7
from urllib.parse import urlencode
1✔
8

9
from botocore.exceptions import ClientError
1✔
10
from botocore.model import OperationModel
1✔
11
from werkzeug.datastructures import Headers
1✔
12
from werkzeug.exceptions import NotFound
1✔
13

14
from localstack.aws.api import CommonServiceException
1✔
15
from localstack.aws.connect import connect_to
1✔
16
from localstack.aws.protocol.parser import OperationNotFoundParserError, create_parser
1✔
17
from localstack.aws.protocol.serializer import create_serializer
1✔
18
from localstack.aws.protocol.validate import MissingRequiredField, validate_request
1✔
19
from localstack.aws.spec import load_service
1✔
20
from localstack.constants import (
1✔
21
    AWS_REGION_US_EAST_1,
22
    INTERNAL_AWS_ACCESS_KEY_ID,
23
    INTERNAL_AWS_SECRET_ACCESS_KEY,
24
)
25
from localstack.http import Request, Response, Router, route
1✔
26
from localstack.http.dispatcher import Handler
1✔
27
from localstack.services.sqs.exceptions import MissingRequiredParameterException
1✔
28
from localstack.utils.aws.request_context import (
1✔
29
    extract_access_key_id_from_auth_header,
30
    extract_region_from_headers,
31
)
32
from localstack.utils.strings import long_uid
1✔
33

34
LOG = logging.getLogger(__name__)
1✔
35

36
service = load_service("sqs-query")
1✔
37
parser = create_parser(service)
1✔
38
serializer = create_serializer(service)
1✔
39

40

41
@route(
1✔
42
    '/<regex("[0-9]{12}"):account_id>/<regex("[a-zA-Z0-9_-]+(.fifo)?"):queue_name>',
43
    host='sqs.<regex("([a-z0-9-]+\\.)?"):region><regex(".*"):domain><regex("(:[0-9]{2,5})?"):port>',
44
    methods=["POST", "GET"],
45
)
46
def standard_strategy_handler(
1✔
47
    request: Request,
48
    account_id: str,
49
    queue_name: str,
50
    region: str = None,
51
    domain: str = None,
52
    port: int = None,
53
):
54
    """
55
    Handler for modern-style endpoints which always have the region encoded.
56
    See https://docs.aws.amazon.com/general/latest/gr/sqs-service.html
57
    """
58
    return handle_request(request, region.rstrip("."))
1✔
59

60

61
@route(
1✔
62
    '/queue/<regex("[a-z0-9-]+"):region>/<regex("[0-9]{12}"):account_id>/<regex("[a-zA-Z0-9_-]+(.fifo)?"):queue_name>',
63
    methods=["POST", "GET"],
64
)
65
def path_strategy_handler(request: Request, region, account_id: str, queue_name: str):
1✔
66
    return handle_request(request, region)
1✔
67

68

69
@route(
1✔
70
    '/<regex("[0-9]{12}"):account_id>/<regex("[a-zA-Z0-9_-]+(.fifo)?"):queue_name>',
71
    host='<regex("([a-z0-9-]+\\.)?"):region>queue.<regex(".*"):domain><regex("(:[0-9]{2,5})?"):port>',
72
    methods=["POST", "GET"],
73
)
74
def domain_strategy_handler(
1✔
75
    request: Request,
76
    account_id: str,
77
    queue_name: str,
78
    region: str = None,
79
    domain: str = None,
80
    port: int = None,
81
):
82
    """Uses the endpoint host to extract the region. See:
83
    https://docs.aws.amazon.com/general/latest/gr/sqs-service.html"""
84
    if not region:
1✔
85
        region = AWS_REGION_US_EAST_1
1✔
86
    else:
87
        region = region.rstrip(".")
1✔
88

89
    return handle_request(request, region)
1✔
90

91

92
@route(
1✔
93
    '/<regex("[0-9]{12}"):account_id>/<regex("[a-zA-Z0-9_-]+(.fifo)?"):queue_name>',
94
    methods=["POST", "GET"],
95
)
96
def legacy_handler(request: Request, account_id: str, queue_name: str) -> Response:
1✔
97
    # previously, Queue URLs were created as http://localhost:4566/000000000000/my-queue-name. Because the region is
98
    # ambiguous in this request, we fall back to the region that the request is coming from (this is not how AWS
99
    # behaves though).
100
    if "X-Amz-Credential" in request.args:
1✔
101
        region = request.args["X-Amz-Credential"].split("/")[2]
1✔
102
    else:
UNCOV
103
        region = extract_region_from_headers(request.headers)
×
104

105
    LOG.debug(
1✔
106
        "Region of queue URL %s is ambiguous, got region %s from request", request.url, region
107
    )
108

109
    return handle_request(request, region)
1✔
110

111

112
def register(router: Router[Handler]):
1✔
113
    """
114
    Registers the query API handlers into the given router. There are four routes, one for each SQS_ENDPOINT_STRATEGY.
115

116
    :param router: the router to add the handlers into.
117
    """
118
    router.add(standard_strategy_handler)
1✔
119
    router.add(path_strategy_handler)
1✔
120
    router.add(domain_strategy_handler)
1✔
121
    router.add(legacy_handler)
1✔
122

123

124
class UnknownOperationException(Exception):
1✔
125
    pass
1✔
126

127

128
class InvalidAction(CommonServiceException):
1✔
129
    def __init__(self, action: str):
1✔
130
        super().__init__(
1✔
131
            "InvalidAction",
132
            f"The action {action} is not valid for this endpoint.",
133
            400,
134
            sender_fault=True,
135
        )
136

137

138
class BotoException(CommonServiceException):
1✔
139
    def __init__(self, boto_response):
1✔
140
        error = boto_response["Error"]
1✔
141
        super().__init__(
1✔
142
            code=error.get("Code", "UnknownError"),
143
            status_code=boto_response["ResponseMetadata"]["HTTPStatusCode"],
144
            message=error.get("Message", ""),
145
            sender_fault=error.get("Type", "Sender") == "Sender",
146
        )
147

148

149
def handle_request(request: Request, region: str) -> Response:
1✔
150
    # some SDK (PHP) still send requests to the Queue URL even though the JSON spec does not allow it in the
151
    # documentation. If the request is `json`, raise `NotFound` so that we continue the handler chain and the provider
152
    # can handle the request
153
    if request.headers.get("Content-Type", "").lower() == "application/x-amz-json-1.0":
1✔
154
        raise NotFound
1✔
155

156
    request_id = long_uid()
1✔
157

158
    try:
1✔
159
        response, operation = try_call_sqs(request, region)
1✔
160
        del response["ResponseMetadata"]
1✔
161
        return serializer.serialize_to_response(response, operation, request.headers, request_id)
1✔
162
    except UnknownOperationException:
1✔
163
        return Response("<UnknownOperationException/>", 404)
1✔
164
    except CommonServiceException as e:
1✔
165
        # use a dummy operation for the serialization to work
166
        op = service.operation_model(service.operation_names[0])
1✔
167
        return serializer.serialize_error_to_response(e, op, request.headers, request_id)
1✔
UNCOV
168
    except Exception as e:
×
169
        LOG.exception("exception")
×
170
        op = service.operation_model(service.operation_names[0])
×
171
        return serializer.serialize_error_to_response(
×
172
            CommonServiceException(
173
                "InternalError", f"An internal error occurred: {e}", status_code=500
174
            ),
175
            op,
176
            request.headers,
177
            request_id,
178
        )
179

180

181
def try_call_sqs(request: Request, region: str) -> tuple[dict, OperationModel]:
1✔
182
    action = request.values.get("Action")
1✔
183
    if not action:
1✔
184
        raise UnknownOperationException()
1✔
185

186
    if action in ["ListQueues", "CreateQueue"]:
1✔
187
        raise InvalidAction(action)
1✔
188

189
    # prepare aws request for the SQS query protocol (POST request with action url-encoded in the body)
190
    params = {"QueueUrl": request.base_url}
1✔
191
    # if a QueueUrl is already set in the body, it should overwrite the one in the URL. this behavior is validated
192
    # against AWS (see TestSqsQueryApi)
193
    params.update(request.values)
1✔
194
    body = urlencode(params)
1✔
195

196
    try:
1✔
197
        headers = Headers(request.headers)
1✔
198
        headers["Content-Type"] = "application/x-www-form-urlencoded; charset=utf-8"
1✔
199
        operation, service_request = parser.parse(Request("POST", "/", headers=headers, body=body))
1✔
200
        validate_request(operation, service_request).raise_first()
1✔
201
    except OperationNotFoundParserError:
1✔
202
        raise InvalidAction(action)
1✔
203
    except MissingRequiredField as e:
1✔
204
        raise MissingRequiredParameterException(
1✔
205
            f"The request must contain the parameter {e.required_name}."
206
        )
207

208
    # Extract from auth header to allow cross-account operations
209
    # TODO: permissions encoded in URL as AUTHPARAMS cannot be accounted for in this method, which is not a big
210
    #  problem yet since we generally don't enforce permissions.
211
    account_id: str | None = extract_access_key_id_from_auth_header(headers)
1✔
212

213
    client = connect_to(
1✔
214
        region_name=region,
215
        aws_access_key_id=account_id or INTERNAL_AWS_ACCESS_KEY_ID,
216
        aws_secret_access_key=INTERNAL_AWS_SECRET_ACCESS_KEY,
217
    ).sqs_query
218

219
    try:
1✔
220
        # using the layer below boto3.client("sqs").<operation>(...) to make the call
221
        boto_response = client._make_api_call(operation.name, service_request)
1✔
222
    except ClientError as e:
1✔
223
        raise BotoException(e.response) from e
1✔
224

225
    return boto_response, operation
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc