• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 16981563750

14 Aug 2025 10:49PM UTC coverage: 86.896% (+0.04%) from 86.852%
16981563750

push

github

web-flow
add support for Fn::Transform in CFnV2 (#12966)

Co-authored-by: Simon Walker <simon.walker@localstack.cloud>

181 of 195 new or added lines in 6 files covered. (92.82%)

348 existing lines in 22 files now uncovered.

66915 of 77006 relevant lines covered (86.9%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.74
/localstack-core/localstack/utils/http.py
1
import logging
1✔
2
import math
1✔
3
import os
1✔
4
import re
1✔
5
from typing import Optional, Union
1✔
6
from urllib.parse import parse_qs, parse_qsl, urlencode, urlparse, urlunparse
1✔
7

8
import requests
1✔
9
from requests.models import CaseInsensitiveDict, Response
1✔
10

11
from localstack import config
1✔
12

13
from .strings import to_str
1✔
14

15
# chunk size for file downloads
DOWNLOAD_CHUNK_SIZE = 1024 * 1024  # 1 MiB read per iteration of the response stream

# lower-case prefix used to normalize "Accept*" header names in canonicalize_headers()
ACCEPT = "accept"
# module-level logger for this utility module
LOG = logging.getLogger(__name__)
20

21

22
def uses_chunked_encoding(response):
    """Return True if the given HTTP response uses chunked transfer encoding."""
    transfer_encoding = response.headers.get("Transfer-Encoding", "")
    return transfer_encoding.lower() == "chunked"
24

25

26
def parse_chunked_data(data):
    """Parse the body of an HTTP message transmitted with chunked transfer encoding."""
    remaining = (data or "").strip()
    parts = []
    while remaining:
        # each chunk starts with its hex-encoded length, terminated by CRLF
        size_match = re.match(r"^([0-9a-zA-Z]+)\r\n.*", remaining)
        if size_match is None:
            break
        chunk_size = int(size_match.group(1).lower(), 16)
        # drop the size line, then take the announced number of characters as payload
        remaining = remaining.partition("\r\n")[2]
        parts.append(remaining[:chunk_size])
        remaining = remaining[chunk_size:].strip()
    return "".join(parts)
40

41

42
def create_chunked_data(data, chunk_size: int = 80):
    """Encode the given payload using HTTP chunked transfer encoding.

    Each chunk is emitted as `<hex-size>\\r\\n<payload>\\r\\n`, terminated by the
    final zero-length chunk `0\\r\\n\\r\\n` (RFC 9112, section 7.1).

    Fixes: the previous implementation appended an extra blank line after every
    full-size chunk (`\\r\\n\\r\\n`), which is not valid chunked framing and was
    inconsistent with the trailing-remainder branch. Also builds the result via
    join instead of quadratic string concatenation.

    :param data: payload string to encode
    :param chunk_size: maximum payload size per chunk
    :return: the chunked-encoded string
    """
    chunks = []
    for start in range(0, len(data), chunk_size):
        payload = data[start : start + chunk_size]
        # "%x" produces the same lowercase hex digits as hex(...)[2:]
        chunks.append("%x\r\n%s\r\n" % (len(payload), payload))
    # terminating zero-length chunk
    chunks.append("0\r\n\r\n")
    return "".join(chunks)
55

56

57
def canonicalize_headers(headers: Union[dict, CaseInsensitiveDict]) -> dict:
    """Return a copy of the headers dict where all "Accept*" header names are lower-cased."""
    if not headers:
        return headers

    def _canonical(header_name):
        lowered = header_name.lower()
        return lowered if lowered.startswith(ACCEPT) else header_name

    return {_canonical(name): value for name, value in headers.items()}
68

69

70
def add_path_parameters_to_url(uri: str, path_params: list):
    """Append the given path segments to the path component of the URI."""
    parsed = urlparse(uri)
    new_path = parsed.path
    # insert a separator only if there are segments to add and the path lacks one
    if path_params and not new_path.endswith("/"):
        new_path += "/"
    new_path += "/".join(path_params)
    return urlunparse(parsed._replace(path=new_path))
77

78

79
def add_query_params_to_url(uri: str, query_params: dict) -> str:
    """
    Add query parameters to the uri.
    :param uri: the base uri it can contains path arguments and query parameters
    :param query_params: new query parameters to be added
    :return: the resulting URL
    """
    parsed = urlparse(uri)
    # merge any pre-existing query parameters with the new ones (new ones win)
    merged_params = {**dict(parse_qsl(parsed.query)), **query_params}
    # re-encode and splice the combined query back into the URL
    return urlunparse(parsed._replace(query=urlencode(merged_params)))
103

104

105
def make_http_request(
    url: str, data: Union[bytes, str] = None, headers: dict[str, str] = None, method: str = "GET"
) -> Response:
    """Perform an HTTP request, bypassing ~/.netrc credentials and SSL verification."""
    request_kwargs = {
        "url": url,
        "method": method,
        "headers": headers,
        "data": data,
        "auth": NetrcBypassAuth(),
        "verify": False,
    }
    return requests.request(**request_kwargs)
111

112

113
class NetrcBypassAuth(requests.auth.AuthBase):
    """No-op auth handler that prevents `requests` from falling back to ~/.netrc credentials."""

    def __call__(self, prepared_request):
        # return the request unmodified - its presence alone disables netrc lookup
        return prepared_request
116

117

118
class _RequestsSafe:
    """Wrapper around requests library, which can prevent it from verifying
    SSL certificates or reading credentials from ~/.netrc file"""

    verify_ssl = True

    def __getattr__(self, name):
        # look up the matching function in the requests module (get/post/request/...);
        # unknown attributes yield the falsy lookup result instead of raising
        delegate = requests.__dict__.get(name.lower())
        if not delegate:
            return delegate

        def _invoke(*args, **kwargs):
            # inject the netrc-bypassing auth handler unless the caller set one
            kwargs.setdefault("auth", NetrcBypassAuth())
            # the URL is the first positional arg, except for request(method, url, ...)
            target_url = kwargs.get("url") or (args[1] if name == "request" else args[0])
            if not self.verify_ssl and target_url.startswith("https://") and "verify" not in kwargs:
                kwargs["verify"] = False
            return delegate(*args, **kwargs)

        return _invoke
138

139

140
# create safe_requests instance
141
safe_requests = _RequestsSafe()
1✔
142

143

144
def parse_request_data(method: str, path: str, data=None, headers=None) -> dict:
    """Extract request data either from query string as well as request body (e.g., for POST)."""
    headers = headers or {}
    content_type = headers.get("Content-Type", "")

    collected = {}
    # parameters from the query string of the request path
    collected.update(parse_qs(urlparse(path).query))

    # parameters from a url-encoded or multipart form body
    # content-type could be either "application/x-www-form-urlencoded" or "multipart/form-data"
    if method in ["POST", "PUT", "PATCH"] and (not content_type or "form-" in content_type):
        try:
            collected.update(parse_qs(to_str(data or "")))
        except Exception:
            pass  # probably binary / JSON / non-URL encoded payload - ignore

    # flatten the value lists, keeping only the first entry per parameter
    # (this is assuming we are not using parameter lists!)
    return {key: values[0] for key, values in collected.items()}
166

167

168
def get_proxies() -> dict[str, str]:
    """Build a requests-style proxy mapping from the configured outbound proxy settings."""
    return {
        scheme: proxy_url
        for scheme, proxy_url in (
            ("http", config.OUTBOUND_HTTP_PROXY),
            ("https", config.OUTBOUND_HTTPS_PROXY),
        )
        if proxy_url
    }
175

176

177
def download(
    url: str,
    path: str,
    verify_ssl: bool = True,
    timeout: float = None,
    request_headers: Optional[dict] = None,
    quiet: bool = False,
) -> None:
    """Downloads file at url to the given path. Raises TimeoutError if the optional timeout (in secs) is reached.

    If `quiet` is passed, do not log any status messages. Error messages are still logged.

    :param url: the URL to download from
    :param path: target file path; missing parent directories are created
    :param verify_ssl: whether to verify SSL certificates (overridden by REQUESTS_CA_BUNDLE env var, if set)
    :param timeout: optional timeout in seconds; raises TimeoutError when reached
    :param request_headers: optional HTTP headers to send with the request
    :param quiet: if True, suppress progress log messages (errors are still logged)
    """

    # make sure we're creating a new session here to enable parallel file downloads
    s = requests.Session()
    proxies = get_proxies()
    if proxies:
        s.proxies.update(proxies)

    # Use REQUESTS_CA_BUNDLE path. If it doesn't exist, use the method provided settings.
    # Note that a value that is not False, will result to True and will get the bundle file.
    _verify = os.getenv("REQUESTS_CA_BUNDLE", verify_ssl)

    r = None
    try:
        r = s.get(url, stream=True, verify=_verify, timeout=timeout, headers=request_headers)
        # check status code before attempting to read body
        if not r.ok:
            raise Exception("Failed to download %s, response code %s" % (url, r.status_code))

        # total payload size as announced by the server (0 = unknown)
        content_length = r.headers.get("Content-Length")
        total_size = int(content_length) if content_length else 0

        total_downloaded = 0
        # create the parent directory if needed; exist_ok avoids a race between check and creation,
        # and the guard skips the call for bare filenames (empty dirname)
        parent_dir = os.path.dirname(path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        if not quiet:
            LOG.debug("Starting download from %s to %s", url, path)
        with open(path, "wb") as f:
            iter_length = 0
            percentage_limit = next_percentage_record = 10  # print a log line for every 10%
            iter_limit = (
                1000000  # if we can't tell the percentage, print a log line for every 1MB chunk
            )
            for chunk in r.iter_content(DOWNLOAD_CHUNK_SIZE):
                # explicitly check the raw stream, since the size from the chunk can be bigger than the amount of
                # bytes transferred over the wire due to transparent decompression (f.e. GZIP)
                new_total_downloaded = r.raw.tell()
                iter_length += new_total_downloaded - total_downloaded
                total_downloaded = new_total_downloaded
                if chunk:  # filter out keep-alive new chunks
                    f.write(chunk)
                elif not quiet:
                    LOG.debug(
                        "Empty chunk %s (total %dK of %dK) from %s",
                        chunk,
                        total_downloaded / 1024,
                        total_size / 1024,
                        url,
                    )

                if total_size > 0 and (
                    (current_percent := total_downloaded / total_size * 100)
                    >= next_percentage_record
                ):
                    # increment the limit for the next log output (ensure that there is max 1 log message per block)
                    # f.e. percentage_limit is 10, current percentage is 71: next log is earliest at 80%
                    next_percentage_record = (
                        math.floor(current_percent / percentage_limit) * percentage_limit
                        + percentage_limit
                    )
                    if not quiet:
                        LOG.debug(
                            "Downloaded %d%% (total %dK of %dK) to %s",
                            current_percent,
                            total_downloaded / 1024,
                            total_size / 1024,
                            path,
                        )
                    iter_length = 0
                elif total_size <= 0 and iter_length >= iter_limit:
                    if not quiet:
                        # print log message every x K if the total size is not known
                        LOG.debug(
                            "Downloaded %dK (total %dK) to %s",
                            iter_length / 1024,
                            total_downloaded / 1024,
                            path,
                        )
                    iter_length = 0
            f.flush()
            os.fsync(f)
        if os.path.getsize(path) == 0:
            LOG.warning("Zero bytes downloaded from %s, retrying", url)
            # bug fix: propagate all download options on the retry - previously timeout,
            # request_headers (e.g. auth tokens), and quiet were silently dropped
            download(
                url,
                path,
                verify_ssl,
                timeout=timeout,
                request_headers=request_headers,
                quiet=quiet,
            )
            return
        if not quiet:
            LOG.debug(
                "Done downloading %s, response code %s, total %dK",
                url,
                r.status_code,
                total_downloaded / 1024,
            )
    except requests.exceptions.ReadTimeout as e:
        raise TimeoutError(f"Timeout ({timeout}) reached on download: {url} - {e}")
    finally:
        if r is not None:
            r.close()
        s.close()
287

288

289
def download_github_artifact(url: str, target_file: str, timeout: int = None):
    """Download file from main URL or fallback URL (to avoid firewall errors if github.com is blocked).
    Optionally allows to define a timeout in seconds."""

    def _attempt(
        download_url: str, request_headers: Optional[dict] = None, print_error: bool = False
    ):
        # returns True on success, None on failure (optionally logging the error)
        try:
            download(download_url, target_file, timeout=timeout, request_headers=request_headers)
            return True
        except Exception as e:
            if print_error:
                LOG.error(
                    "Unable to download Github artifact from %s to %s: %s %s",
                    url,
                    target_file,
                    e,
                    exc_info=LOG.isEnabledFor(logging.DEBUG),
                )

    # if a GitHub API token is set, use it to avoid rate limiting issues
    gh_token = os.environ.get("GITHUB_API_TOKEN")
    gh_auth_headers = {"authorization": f"Bearer {gh_token}"} if gh_token else None
    if _attempt(url, request_headers=gh_auth_headers):
        return
    # TODO: use regex below to allow different branch names than "master"
    url = url.replace("https://github.com", "https://cdn.jsdelivr.net/gh")
    # The URL structure is https://cdn.jsdelivr.net/gh/user/repo@branch/file.js
    url = url.replace("/raw/master/", "@master/")
    # Do not send the GitHub auth token to the CDN
    _attempt(url, print_error=True)
322

323

324
# TODO move to aws_responses.py?
def replace_response_content(response, pattern, replacement):
    """Apply a regex substitution to the body of the given HTTP response, in place."""
    body = to_str(response.content or "")
    response._content = re.sub(pattern, replacement, body)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc