• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 18670993611

20 Oct 2025 04:39PM UTC coverage: 86.897% (+0.001%) from 86.896%
18670993611

push

github

web-flow
fix json.assign_to_path with non nested path (#13245)

5 of 5 new or added lines in 1 file covered. (100.0%)

9 existing lines in 3 files now uncovered.

68348 of 78654 relevant lines covered (86.9%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.03
/localstack-core/localstack/utils/http.py
1
import logging
1✔
2
import math
1✔
3
import os
1✔
4
import re
1✔
5
from urllib.parse import parse_qs, parse_qsl, urlencode, urlparse, urlunparse
1✔
6

7
import requests
1✔
8
from requests.models import CaseInsensitiveDict, Response
1✔
9

10
from localstack import config
1✔
11

12
from .strings import to_str
1✔
13

14
# chunk size for file downloads
DOWNLOAD_CHUNK_SIZE = 1024 * 1024  # 1 MiB per iter_content() chunk in download()

# header-name prefix that canonicalize_headers() lower-cases (Accept, Accept-Encoding, ...)
ACCEPT = "accept"
LOG = logging.getLogger(__name__)
19

20

21
def uses_chunked_encoding(response):
    """Return True if the given HTTP response uses chunked transfer encoding."""
    transfer_encoding = response.headers.get("Transfer-Encoding", "")
    return transfer_encoding.lower() == "chunked"
23

24

25
def parse_chunked_data(data):
    """Parse the body of an HTTP message transmitted with chunked transfer encoding."""
    # each chunk is introduced by its hex-encoded length on a line of its own
    chunk_header = re.compile(r"^([0-9a-zA-Z]+)\r\n.*")
    remaining = (data or "").strip()
    parsed_chunks = []
    while remaining:
        header_match = chunk_header.match(remaining)
        if not header_match:
            # no valid chunk header at the current position: stop parsing
            break
        chunk_length = int(header_match.group(1).lower(), 16)
        # drop the header line, then consume exactly `chunk_length` characters of payload
        remaining = remaining.partition("\r\n")[2]
        parsed_chunks.append(remaining[:chunk_length])
        remaining = remaining[chunk_length:].strip()
    return "".join(parsed_chunks)
39

40

41
def create_chunked_data(data, chunk_size: int = 80):
    """Encode `data` using HTTP chunked transfer encoding.

    Each chunk is serialized as "<hex length>\\r\\n<payload>\\r\\n", followed by the
    terminating zero-length chunk "0\\r\\n\\r\\n" (RFC 9112, section 7.1).

    :param data: string payload to encode
    :param chunk_size: maximum payload length per chunk
    :return: the chunk-encoded string
    """
    dl = len(data)
    ret = ""
    for i in range(dl // chunk_size):
        ret += f"{hex(chunk_size)[2:]}\r\n"
        # fix: a chunk is terminated by a single CRLF (previously this emitted
        # "\r\n\r\n", which is not valid chunked encoding and was inconsistent
        # with the trailing partial chunk below)
        ret += f"{data[i * chunk_size : (i + 1) * chunk_size]}\r\n"

    if len(data) % chunk_size != 0:
        ret += f"{hex(len(data) % chunk_size)[2:]}\r\n"
        ret += f"{data[-(len(data) % chunk_size) :]}\r\n"

    ret += "0\r\n\r\n"
    return ret
54

55

56
def canonicalize_headers(headers: dict | CaseInsensitiveDict) -> dict:
    """Lower-case all header names that start with "accept" (e.g., Accept, Accept-Encoding);
    all other header names are kept as-is. Falsy inputs are returned unchanged."""
    if not headers:
        return headers

    def _canonical_name(header_name):
        lowered = header_name.lower()
        return lowered if lowered.startswith(ACCEPT) else header_name

    return {_canonical_name(name): value for name, value in headers.items()}
67

68

69
def add_path_parameters_to_url(uri: str, path_params: list):
    """Append the given path parameters as additional segments to the URL path.

    A separating "/" is inserted only when there is at least one parameter to
    append and the existing path does not already end with one.
    """
    parsed = urlparse(uri)
    separator = "/" if path_params and not parsed.path.endswith("/") else ""
    extended_path = parsed.path + separator + "/".join(path_params)
    return urlunparse(parsed._replace(path=extended_path))
76

77

78
def add_query_params_to_url(uri: str, query_params: dict) -> str:
    """
    Add query parameters to the uri.
    :param uri: the base uri; it can contain path arguments and query parameters
    :param query_params: new query parameters to be added
    :return: the resulting URL
    """
    parsed = urlparse(uri)

    # merge any existing query string with the new parameters;
    # new values take precedence over existing ones with the same key
    merged_params = dict(parse_qsl(parsed.query))
    merged_params.update(query_params)

    return urlunparse(parsed._replace(query=urlencode(merged_params)))
102

103

104
def make_http_request(
    url: str, data: bytes | str = None, headers: dict[str, str] = None, method: str = "GET"
) -> Response:
    """Issue a single HTTP request, bypassing ~/.netrc credentials and SSL verification."""
    return requests.request(
        method=method,
        url=url,
        data=data,
        headers=headers,
        auth=NetrcBypassAuth(),
        verify=False,
    )
110

111

112
class NetrcBypassAuth(requests.auth.AuthBase):
    """No-op auth handler: passing it as `auth=` prevents `requests` from falling
    back to credentials found in the ~/.netrc file."""

    def __call__(self, r):
        # return the prepared request unmodified (no auth headers attached)
        return r
115

116

117
class _RequestsSafe:
    """Wrapper around requests library, which can prevent it from verifying
    SSL certificates or reading credentials from ~/.netrc file"""

    # when set to False, HTTPS calls default to verify=False (unless the caller
    # passes an explicit `verify` kwarg)
    verify_ssl = True

    def __getattr__(self, name):
        # resolve the attribute to the corresponding top-level function of the
        # `requests` module (get, post, request, ...); unknown names yield None
        # instead of raising AttributeError
        method = requests.__dict__.get(name.lower())
        if not method:
            return method

        def _wrapper(*args, **kwargs):
            # bypass ~/.netrc credentials unless the caller provided explicit auth
            if "auth" not in kwargs:
                kwargs["auth"] = NetrcBypassAuth()
            # `requests.request(method, url, ...)` takes the URL as the second
            # positional argument; all other helpers take it as the first
            url = kwargs.get("url") or (args[1] if name == "request" else args[0])
            if not self.verify_ssl and url.startswith("https://") and "verify" not in kwargs:
                kwargs["verify"] = False
            return method(*args, **kwargs)

        return _wrapper
137

138

139
# create safe_requests instance
# module-level singleton, used as a drop-in replacement for calls to `requests`
safe_requests = _RequestsSafe()
141

142

143
def parse_request_data(method: str, path: str, data=None, headers=None) -> dict:
    """Extract request data either from query string as well as request body (e.g., for POST)."""
    headers = headers or {}
    content_type = headers.get("Content-Type", "")

    # collect multi-value params from the query string first ...
    params = {}
    params.update(parse_qs(urlparse(path).query))

    # ... then merge params from a url-encoded payload (body values win on key collisions)
    is_write_method = method in ["POST", "PUT", "PATCH"]
    if is_write_method and (not content_type or "form-" in content_type):
        # content-type could be either "application/x-www-form-urlencoded" or "multipart/form-data"
        try:
            params.update(parse_qs(to_str(data or "")))
        except Exception:
            pass  # probably binary / JSON / non-URL encoded payload - ignore

    # select first elements from result lists (this is assuming we are not using parameter lists!)
    return {key: values[0] for key, values in params.items()}
165

166

167
def get_proxies() -> dict[str, str]:
    """Return the configured outbound proxies as a scheme -> proxy-URL mapping.

    Only schemes with a configured proxy appear in the result.
    """
    candidates = {
        "http": config.OUTBOUND_HTTP_PROXY,
        "https": config.OUTBOUND_HTTPS_PROXY,
    }
    return {scheme: proxy_url for scheme, proxy_url in candidates.items() if proxy_url}
174

175

176
def download(
    url: str,
    path: str,
    verify_ssl: bool = True,
    timeout: float = None,
    request_headers: dict | None = None,
    quiet: bool = False,
) -> None:
    """Downloads file at url to the given path. Raises TimeoutError if the optional timeout (in secs) is reached.

    If `quiet` is passed, do not log any status messages. Error messages are still logged.

    :param url: source URL to fetch
    :param path: destination file path; parent directory is created if missing
    :param verify_ssl: whether to verify SSL certificates (overridden by REQUESTS_CA_BUNDLE, if set)
    :param timeout: optional read timeout in seconds
    :param request_headers: optional headers to send with the request
    :param quiet: suppress non-error log output
    :raises TimeoutError: if the download does not complete within `timeout`
    :raises Exception: if the server responds with a non-OK status code
    """

    # make sure we're creating a new session here to enable parallel file downloads
    s = requests.Session()
    proxies = get_proxies()
    if proxies:
        s.proxies.update(proxies)

    # Use REQUESTS_CA_BUNDLE path. If it doesn't exist, use the method provided settings.
    # Note that a value that is not False, will result to True and will get the bundle file.
    _verify = os.getenv("REQUESTS_CA_BUNDLE", verify_ssl)

    r = None
    try:
        r = s.get(url, stream=True, verify=_verify, timeout=timeout, headers=request_headers)
        # check status code before attempting to read body
        if not r.ok:
            raise Exception(f"Failed to download {url}, response code {r.status_code}")

        total_size = 0
        if r.headers.get("Content-Length"):
            total_size = int(r.headers.get("Content-Length"))

        total_downloaded = 0
        if not os.path.exists(os.path.dirname(path)):
            os.makedirs(os.path.dirname(path))
        if not quiet:
            LOG.debug("Starting download from %s to %s", url, path)
        with open(path, "wb") as f:
            iter_length = 0
            percentage_limit = next_percentage_record = 10  # print a log line for every 10%
            iter_limit = (
                1000000  # if we can't tell the percentage, print a log line for every 1MB chunk
            )
            for chunk in r.iter_content(DOWNLOAD_CHUNK_SIZE):
                # explicitly check the raw stream, since the size from the chunk can be bigger than the amount of
                # bytes transferred over the wire due to transparent decompression (f.e. GZIP)
                new_total_downloaded = r.raw.tell()
                iter_length += new_total_downloaded - total_downloaded
                total_downloaded = new_total_downloaded
                if chunk:  # filter out keep-alive new chunks
                    f.write(chunk)
                elif not quiet:
                    LOG.debug(
                        "Empty chunk %s (total %dK of %dK) from %s",
                        chunk,
                        total_downloaded / 1024,
                        total_size / 1024,
                        url,
                    )

                if total_size > 0 and (
                    (current_percent := total_downloaded / total_size * 100)
                    >= next_percentage_record
                ):
                    # increment the limit for the next log output (ensure that there is max 1 log message per block)
                    # f.e. percentage_limit is 10, current percentage is 71: next log is earliest at 80%
                    next_percentage_record = (
                        math.floor(current_percent / percentage_limit) * percentage_limit
                        + percentage_limit
                    )
                    if not quiet:
                        LOG.debug(
                            "Downloaded %d%% (total %dK of %dK) to %s",
                            current_percent,
                            total_downloaded / 1024,
                            total_size / 1024,
                            path,
                        )
                    iter_length = 0
                elif total_size <= 0 and iter_length >= iter_limit:
                    if not quiet:
                        # print log message every x K if the total size is not known
                        LOG.debug(
                            "Downloaded %dK (total %dK) to %s",
                            iter_length / 1024,
                            total_downloaded / 1024,
                            path,
                        )
                    iter_length = 0
            f.flush()
            os.fsync(f)
        if os.path.getsize(path) == 0:
            LOG.warning("Zero bytes downloaded from %s, retrying", url)
            # fix: propagate all download settings to the retry (previously timeout,
            # request_headers, and quiet were silently dropped on retry)
            # NOTE(review): the retry is unbounded if the server keeps returning
            # empty bodies - consider adding a retry limit
            download(
                url,
                path,
                verify_ssl,
                timeout=timeout,
                request_headers=request_headers,
                quiet=quiet,
            )
            return
        if not quiet:
            LOG.debug(
                "Done downloading %s, response code %s, total %dK",
                url,
                r.status_code,
                total_downloaded / 1024,
            )
    except requests.exceptions.ReadTimeout as e:
        # chain the original exception for easier debugging
        raise TimeoutError(f"Timeout ({timeout}) reached on download: {url} - {e}") from e
    finally:
        if r is not None:
            r.close()
        s.close()
286

287

288
def download_github_artifact(url: str, target_file: str, timeout: int = None):
    """Download file from main URL or fallback URL (to avoid firewall errors if github.com is blocked).
    Optionally allows to define a timeout in seconds."""

    def do_download(
        download_url: str, request_headers: dict | None = None, print_error: bool = False
    ):
        # returns True on success, None on failure (optionally logging the error)
        try:
            download(download_url, target_file, timeout=timeout, request_headers=request_headers)
            return True
        except Exception as e:
            if print_error:
                LOG.error(
                    "Unable to download Github artifact from %s to %s: %s %s",
                    url,
                    target_file,
                    e,
                    exc_info=LOG.isEnabledFor(logging.DEBUG),
                )

    # if a GitHub API token is set, use it to avoid rate limiting issues
    gh_auth_headers = None
    gh_token = os.environ.get("GITHUB_API_TOKEN")
    if gh_token:
        gh_auth_headers = {"authorization": f"Bearer {gh_token}"}

    if do_download(url, request_headers=gh_auth_headers):
        return

    # fall back to the jsDelivr CDN mirror of the repository
    # TODO: use regex below to allow different branch names than "master"
    url = url.replace("https://github.com", "https://cdn.jsdelivr.net/gh")
    # The URL structure is https://cdn.jsdelivr.net/gh/user/repo@branch/file.js
    url = url.replace("/raw/master/", "@master/")
    # Do not send the GitHub auth token to the CDN
    do_download(url, print_error=True)

323
# TODO move to aws_responses.py?
def replace_response_content(response, pattern, replacement):
    """Regex-replace occurrences of `pattern` in the response body, mutating the response in place."""
    body = to_str(response.content or "")
    response._content = re.sub(pattern, replacement, body)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc