• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / ea4d3a2c-ec60-4c10-9aae-c0c88ca77069

13 Apr 2025 11:57PM UTC coverage: 86.431% (-0.1%) from 86.564%
ea4d3a2c-ec60-4c10-9aae-c0c88ca77069

push

circleci

web-flow
add AVP to list of CFN composite quirks (#12517)

63584 of 73566 relevant lines covered (86.43%)

0.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

45.04
/localstack-core/localstack/utils/aws/aws_responses.py
1
import binascii
1✔
2
import datetime
1✔
3
import json
1✔
4
import re
1✔
5
from binascii import crc32
1✔
6
from typing import Any, Dict, Optional, Union
1✔
7
from urllib.parse import parse_qs
1✔
8

9
import xmltodict
1✔
10
from requests.models import CaseInsensitiveDict
1✔
11
from requests.models import Response as RequestsResponse
1✔
12

13
from localstack.constants import APPLICATION_JSON, HEADER_CONTENT_TYPE
1✔
14
from localstack.utils.json import json_safe
1✔
15
from localstack.utils.strings import short_uid, str_startswith_ignore_case, to_bytes, to_str
1✔
16

17
REGEX_FLAGS = re.MULTILINE | re.DOTALL
1✔
18

19
regex_url_start = re.compile("^[a-z]{2,5}://")
1✔
20

21

22
class ErrorResponse(Exception):
1✔
23
    def __init__(self, response):
1✔
24
        self.response = response
×
25

26

27
def requests_error_response_json(message, code=500, error_type="InternalFailure"):
1✔
28
    result = {
×
29
        "Type": "User" if code < 500 else "Server",
30
        "message": message,
31
        "__type": error_type,
32
    }
33
    headers = {"x-amzn-errortype": error_type}
×
34
    return requests_response(json.dumps(result), status_code=code, headers=headers)
×
35

36

37
def requests_error_response_xml(
1✔
38
    message: str,
39
    code: Optional[int] = 400,
40
    code_string: Optional[str] = "InvalidParameter",
41
    service: Optional[str] = None,
42
    xmlns: Optional[str] = None,
43
):
44
    response = RequestsResponse()
×
45
    xmlns = xmlns or "http://%s.amazonaws.com/doc/2010-03-31/" % service
×
46
    response._content = """<ErrorResponse xmlns="{xmlns}"><Error>
×
47
        <Type>Sender</Type>
48
        <Code>{code_string}</Code>
49
        <Message>{message}</Message>
50
        </Error><RequestId>{req_id}</RequestId>
51
        </ErrorResponse>""".format(
52
        xmlns=xmlns, message=message, code_string=code_string, req_id=short_uid()
53
    )
54
    response.status_code = code
×
55
    return response
×
56

57

58
def requests_error_response_xml_signature_calculation(
1✔
59
    message,
60
    string_to_sign=None,
61
    signature=None,
62
    expires=None,
63
    code=400,
64
    code_string="AccessDenied",
65
    aws_access_token="temp",
66
):
67
    response = RequestsResponse()
×
68
    response_template = """<?xml version="1.0" encoding="UTF-8"?>
×
69
        <Error>
70
            <Code>{code_string}</Code>
71
            <Message>{message}</Message>
72
            <RequestId>{req_id}</RequestId>
73
            <HostId>{host_id}</HostId>
74
        </Error>""".format(
75
        message=message,
76
        code_string=code_string,
77
        req_id=short_uid(),
78
        host_id=short_uid(),
79
    )
80

81
    parsed_response = xmltodict.parse(response_template)
×
82
    response.status_code = code
×
83

84
    if signature and string_to_sign or code_string == "SignatureDoesNotMatch":
×
85
        bytes_signature = binascii.hexlify(bytes(signature, encoding="utf-8"))
×
86
        parsed_response["Error"]["Code"] = code_string
×
87
        parsed_response["Error"]["AWSAccessKeyId"] = aws_access_token
×
88
        parsed_response["Error"]["StringToSign"] = string_to_sign
×
89
        parsed_response["Error"]["SignatureProvided"] = signature
×
90
        parsed_response["Error"]["StringToSignBytes"] = "{}".format(bytes_signature.decode("utf-8"))
×
91
        set_response_content(response, xmltodict.unparse(parsed_response))
×
92

93
    if expires and code_string == "AccessDenied":
×
94
        server_time = datetime.datetime.utcnow().isoformat()[:-4]
×
95
        expires_isoformat = datetime.datetime.fromtimestamp(int(expires)).isoformat()[:-4]
×
96
        parsed_response["Error"]["Code"] = code_string
×
97
        parsed_response["Error"]["Expires"] = "{}Z".format(expires_isoformat)
×
98
        parsed_response["Error"]["ServerTime"] = "{}Z".format(server_time)
×
99
        set_response_content(response, xmltodict.unparse(parsed_response))
×
100

101
    if not signature and not expires and code_string == "AccessDenied":
×
102
        set_response_content(response, xmltodict.unparse(parsed_response))
×
103

104
    if response._content:
×
105
        return response
×
106

107

108
def requests_error_response(
1✔
109
    req_headers: Dict,
110
    message: Union[str, bytes],
111
    code: int = 500,
112
    error_type: str = "InternalFailure",
113
    service: str = None,
114
    xmlns: str = None,
115
):
116
    is_json = is_json_request(req_headers)
×
117
    if is_json:
×
118
        return requests_error_response_json(message=message, code=code, error_type=error_type)
×
119
    return requests_error_response_xml(
×
120
        message, code=code, code_string=error_type, service=service, xmlns=xmlns
121
    )
122

123

124
def is_json_request(req_headers: Dict) -> bool:
1✔
125
    ctype = req_headers.get("Content-Type", "")
×
126
    accept = req_headers.get("Accept", "")
×
127
    return "json" in ctype or "json" in accept
×
128

129

130
def is_invalid_html_response(headers, content) -> bool:
1✔
131
    content_type = headers.get("Content-Type", "")
×
132
    return "text/html" in content_type and not str_startswith_ignore_case(content, "<!doctype html")
×
133

134

135
def is_response_obj(result, include_lambda_response=False):
1✔
136
    types = (RequestsResponse,)
×
137
    if include_lambda_response:
×
138
        types += (LambdaResponse,)
×
139
    return isinstance(result, types)
×
140

141

142
def get_response_payload(response, as_json=False):
1✔
143
    result = response.content if isinstance(response, RequestsResponse) else None
×
144
    result = "" if result is None else result
×
145
    if as_json:
×
146
        result = result or "{}"
×
147
        result = json.loads(to_str(result))
×
148
    return result
×
149

150

151
def requests_response(content, status_code=200, headers=None):
1✔
152
    if headers is None:
1✔
153
        headers = {}
1✔
154
    resp = RequestsResponse()
1✔
155
    headers = CaseInsensitiveDict(dict(headers or {}))
1✔
156
    if isinstance(content, dict):
1✔
157
        content = json.dumps(content)
1✔
158
        if not headers.get(HEADER_CONTENT_TYPE):
1✔
159
            headers[HEADER_CONTENT_TYPE] = APPLICATION_JSON
1✔
160
    resp._content = content
1✔
161
    resp.status_code = int(status_code)
1✔
162
    # Note: update headers (instead of assigning directly), to ensure we're using a case-insensitive dict
163
    resp.headers.update(headers)
1✔
164
    return resp
1✔
165

166

167
def request_response_stream(stream, status_code=200, headers=None):
1✔
168
    if headers is None:
×
169
        headers = {}
×
170
    resp = RequestsResponse()
×
171
    resp.raw = stream
×
172
    resp.status_code = int(status_code)
×
173
    # Note: update headers (instead of assigning directly), to ensure we're using a case-insensitive dict
174
    resp.headers.update(headers or {})
×
175
    return resp
×
176

177

178
def set_response_content(response, content, headers=None):
1✔
179
    if isinstance(content, dict):
×
180
        content = json.dumps(json_safe(content))
×
181
    elif isinstance(content, RequestsResponse):
×
182
        response.status_code = content.status_code
×
183
        content = content.content
×
184
    response._content = content or ""
×
185
    response.headers.update(headers or {})
×
186
    response.headers["Content-Length"] = str(len(response._content))
×
187

188

189
def create_sqs_system_attributes(headers: Dict[str, str]) -> Dict[str, Any]:
1✔
190
    system_attributes = {}
1✔
191
    if "X-Amzn-Trace-Id" in headers:
1✔
192
        system_attributes["AWSTraceHeader"] = {
1✔
193
            "DataType": "String",
194
            "StringValue": str(headers["X-Amzn-Trace-Id"]),
195
        }
196
    return system_attributes
1✔
197

198

199
def parse_query_string(url_or_qs: str, multi_values=False) -> Dict[str, str]:
1✔
200
    url_or_qs = str(url_or_qs or "").strip()
1✔
201
    # we match if the `url_or_qs` passed is maybe a URL
202
    if regex_url_start.match(url_or_qs) and "?" not in url_or_qs:
1✔
203
        url_or_qs = f"{url_or_qs}?"
1✔
204
    url_or_qs = url_or_qs.split("?", maxsplit=1)[-1]
1✔
205
    result = parse_qs(url_or_qs, keep_blank_values=True)
1✔
206
    if not multi_values:
1✔
207
        result = {k: v[0] for k, v in result.items()}
1✔
208
    return result
1✔
209

210

211
def calculate_crc32(content: Union[str, bytes]) -> int:
1✔
212
    return crc32(to_bytes(content)) & 0xFFFFFFFF
×
213

214

215
class LambdaResponse:
1✔
216
    """Helper class to support multi_value_headers in Lambda responses"""
217

218
    def __init__(self):
1✔
219
        self._content = False
×
220
        self.status_code = None
×
221
        self.multi_value_headers = CaseInsensitiveDict()
×
222
        self.headers = CaseInsensitiveDict()
×
223

224
    @property
1✔
225
    def content(self):
1✔
226
        return self._content
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc