• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 20565403496

29 Dec 2025 05:11AM UTC coverage: 84.103% (-2.8%) from 86.921%
20565403496

Pull #13567

github

web-flow
Merge 4816837a5 into 2417384aa
Pull Request #13567: Update ASF APIs

67166 of 79862 relevant lines covered (84.1%)

0.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.11
/localstack-core/localstack/utils/archives.py
1
import glob
1✔
2
import io
1✔
3
import logging
1✔
4
import os
1✔
5
import re
1✔
6
import tarfile
1✔
7
import tempfile
1✔
8
import time
1✔
9
import zipfile
1✔
10
from subprocess import Popen
1✔
11
from typing import IO, Literal
1✔
12

13
from localstack.constants import MAVEN_REPO_URL
1✔
14
from localstack.utils.files import load_file, mkdir, new_tmp_file, rm_rf, save_file
1✔
15
from localstack.utils.http import download
1✔
16
from localstack.utils.run import run
1✔
17

18
from .checksum import verify_local_file_with_checksum_url
1✔
19
from .run import is_command_available
1✔
20
from .strings import truncate
1✔
21

22
LOG = logging.getLogger(__name__)
1✔
23

24

25
StrPath = str | os.PathLike
1✔
26

27

28
def is_zip_file(content):
1✔
29
    stream = io.BytesIO(content)
1✔
30
    return zipfile.is_zipfile(stream)
1✔
31

32

33
def get_unzipped_size(zip_file: str | IO[bytes]):
1✔
34
    """Returns the size of the unzipped file."""
35
    with zipfile.ZipFile(zip_file, "r") as zip_ref:
1✔
36
        return sum(f.file_size for f in zip_ref.infolist())
1✔
37

38

39
def unzip(path: str, target_dir: str, overwrite: bool = True) -> str | Popen | None:
1✔
40
    from localstack.utils.platform import is_debian
1✔
41

42
    use_native_cmd = is_debian() or is_command_available("unzip")
1✔
43
    if use_native_cmd:
1✔
44
        # Running the native command can be an order of magnitude faster in the container. Also, `unzip`
45
        #  is capable of extracting zip files with incorrect CRC codes (sometimes happens, e.g., with some
46
        #  Node.js/Serverless versions), which can fail with Python's `zipfile` (extracting empty files).
47
        flags = ["-o"] if overwrite else []
1✔
48
        flags += ["-q"]
1✔
49
        try:
1✔
50
            cmd = ["unzip"] + flags + [path]
1✔
51
            return run(cmd, cwd=target_dir, print_error=False)
1✔
52
        except Exception as e:
1✔
53
            error_str = truncate(str(e), max_length=200)
1✔
54
            LOG.info(
1✔
55
                'Unable to use native "unzip" command (using fallback mechanism): %s', error_str
56
            )
57

58
    try:
1✔
59
        zip_ref = zipfile.ZipFile(path, "r")
1✔
60
    except Exception as e:
×
61
        LOG.warning("Unable to open zip file: %s: %s", path, e)
×
62
        raise e
×
63

64
    def _unzip_file_entry(zip_ref, file_entry, target_dir):
1✔
65
        """Extracts a Zipfile entry and preserves permissions"""
66
        out_path = os.path.join(target_dir, file_entry.filename)
1✔
67
        if use_native_cmd and os.path.exists(out_path) and os.path.getsize(out_path) > 0:
1✔
68
            # this can happen under certain circumstances if the native "unzip" command
69
            # fails with a non-zero exit code, yet manages to extract parts of the zip file
70
            return
1✔
71
        zip_ref.extract(file_entry.filename, path=target_dir)
×
72
        perm = file_entry.external_attr >> 16
×
73
        # Make sure to preserve file permissions in the zip file
74
        # https://www.burgundywall.com/post/preserving-file-perms-with-python-zipfile-module
75
        os.chmod(out_path, perm or 0o777)
×
76

77
    try:
1✔
78
        for file_entry in zip_ref.infolist():
1✔
79
            _unzip_file_entry(zip_ref, file_entry, target_dir)
1✔
80
    finally:
81
        zip_ref.close()
1✔
82

83

84
def untar(path: str, target_dir: str):
1✔
85
    mode = "r:gz" if path.endswith("gz") else "r"
1✔
86
    with tarfile.open(path, mode) as tar:
1✔
87
        tar.extractall(path=target_dir)
1✔
88

89

90
def create_zip_file_cli(source_path: StrPath, base_dir: StrPath, zip_file: StrPath):
1✔
91
    """
92
    Creates a zip archive by using the native zip command. The native command can be an order of magnitude faster in CI
93
    """
94
    source = "." if source_path == base_dir else os.path.basename(source_path)
1✔
95
    run(["zip", "-r", zip_file, source], cwd=base_dir)
1✔
96

97

98
def create_zip_file_python(
1✔
99
    base_dir: StrPath,
100
    zip_file: StrPath,
101
    mode: Literal["r", "w", "x", "a"] = "w",
102
    content_root: str | None = None,
103
):
104
    with zipfile.ZipFile(zip_file, mode) as zip_file:
1✔
105
        for root, dirs, files in os.walk(base_dir):
1✔
106
            for name in files:
1✔
107
                full_name = os.path.join(root, name)
1✔
108
                relative = os.path.relpath(root, start=base_dir)
1✔
109
                if content_root:
1✔
110
                    dest = os.path.join(content_root, relative, name)
×
111
                else:
112
                    dest = os.path.join(relative, name)
1✔
113
                zip_file.write(full_name, dest)
1✔
114

115

116
def add_file_to_jar(class_file, class_url, target_jar, base_dir=None):
1✔
117
    base_dir = base_dir or os.path.dirname(target_jar)
×
118
    patch_class_file = os.path.join(base_dir, class_file)
×
119
    if not os.path.exists(patch_class_file):
×
120
        download(class_url, patch_class_file)
×
121
        run(["zip", target_jar, class_file], cwd=base_dir)
×
122

123

124
def update_jar_manifest(
1✔
125
    jar_file_name: str, parent_dir: str, search: str | re.Pattern, replace: str
126
):
127
    manifest_file_path = "META-INF/MANIFEST.MF"
×
128
    jar_path = os.path.join(parent_dir, jar_file_name)
×
129
    with tempfile.TemporaryDirectory() as tmp_dir:
×
130
        tmp_manifest_file = os.path.join(tmp_dir, manifest_file_path)
×
131
        run(["unzip", "-o", jar_path, manifest_file_path], cwd=tmp_dir)
×
132
        manifest = load_file(tmp_manifest_file)
×
133

134
    # return if the search pattern does not match (for idempotence, to avoid file permission issues further below)
135
    if isinstance(search, re.Pattern):
×
136
        if not search.search(manifest):
×
137
            return
×
138
        manifest = search.sub(replace, manifest, 1)
×
139
    else:
140
        if search not in manifest:
×
141
            return
×
142
        manifest = manifest.replace(search, replace, 1)
×
143

144
    manifest_file = os.path.join(parent_dir, manifest_file_path)
×
145
    save_file(manifest_file, manifest)
×
146
    run(["zip", jar_file_name, manifest_file_path], cwd=parent_dir)
×
147

148

149
def upgrade_jar_file(base_dir: str, file_glob: str, maven_asset: str):
1✔
150
    """
151
    Upgrade the matching Java JAR file in a local directory with the given Maven asset
152
    :param base_dir: base directory to search the JAR file to replace in
153
    :param file_glob: glob pattern for the JAR file to replace
154
    :param maven_asset: name of Maven asset to download, in the form "<qualified_name>:<version>"
155
    """
156

157
    local_path = os.path.join(base_dir, file_glob)
×
158
    parent_dir = os.path.dirname(local_path)
×
159
    maven_asset = maven_asset.replace(":", "/")
×
160
    parts = maven_asset.split("/")
×
161
    maven_asset_url = f"{MAVEN_REPO_URL}/{maven_asset}/{parts[-2]}-{parts[-1]}.jar"
×
162
    target_file = os.path.join(parent_dir, os.path.basename(maven_asset_url))
×
163
    if os.path.exists(target_file):
×
164
        # avoid re-downloading the newer JAR version if it already exists locally
165
        return
×
166
    matches = glob.glob(local_path)
×
167
    if not matches:
×
168
        return
×
169
    for match in matches:
×
170
        os.remove(match)
×
171
    download(maven_asset_url, target_file)
×
172

173

174
def download_and_extract(
1✔
175
    archive_url: str,
176
    target_dir: str,
177
    retries: int | None = 0,
178
    sleep: int | None = 3,
179
    tmp_archive: str | None = None,
180
    checksum_url: str | None = None,
181
) -> None:
182
    """
183
    Download and extract an archive to a target directory with optional checksum verification.
184

185
    Checksum verification is only performed if a `checksum_url` is provided.
186
    Else, the archive is downloaded and extracted without verification.
187

188
    :param archive_url: URL of the archive to download
189
    :param target_dir: Directory to extract the archive contents to
190
    :param retries: Number of download retries (default: 0)
191
    :param sleep: Sleep time between retries in seconds (default: 3)
192
    :param tmp_archive: Optional path for the temporary archive file
193
    :param checksum_url: Optional URL of the checksum file for verification
194
    :return: None
195
    """
196
    mkdir(target_dir)
1✔
197

198
    _, ext = os.path.splitext(tmp_archive or archive_url)
1✔
199
    tmp_archive = tmp_archive or new_tmp_file()
1✔
200
    if not os.path.exists(tmp_archive) or os.path.getsize(tmp_archive) <= 0:
1✔
201
        # create temporary placeholder file, to avoid duplicate parallel downloads
202
        save_file(tmp_archive, "")
1✔
203

204
        for i in range(retries + 1):
1✔
205
            try:
1✔
206
                download(archive_url, tmp_archive)
1✔
207
                break
1✔
208
            except Exception as e:
×
209
                LOG.warning(
×
210
                    "Attempt %d. Failed to download archive from %s: %s",
211
                    i + 1,
212
                    archive_url,
213
                    e,
214
                )
215
                # only sleep between retries, not after the last one
216
                if i < retries:
×
217
                    time.sleep(sleep)
×
218

219
    # if the temporary file we created above hasn't been replaced, we assume failure
220
    if os.path.getsize(tmp_archive) <= 0:
1✔
221
        raise Exception("Failed to download archive from %s: . Retries exhausted", archive_url)
×
222

223
    # Verify checksum if provided
224
    if checksum_url:
1✔
225
        LOG.info("Verifying archive integrity...")
1✔
226
        try:
1✔
227
            verify_local_file_with_checksum_url(
1✔
228
                file_path=tmp_archive,
229
                checksum_url=checksum_url,
230
            )
231
        except Exception as e:
×
232
            # clean up the corrupted download
233
            rm_rf(tmp_archive)
×
234
            raise e
×
235

236
    if ext in (".zip", ".whl"):
1✔
237
        unzip(tmp_archive, target_dir)
×
238
    elif ext in (
1✔
239
        ".bz2",
240
        ".gz",
241
        ".tgz",
242
        ".xz",
243
    ):
244
        untar(tmp_archive, target_dir)
1✔
245
    else:
246
        raise Exception(f"Unsupported archive format: {ext}")
×
247

248

249
def download_and_extract_with_retry(
1✔
250
    archive_url,
251
    tmp_archive,
252
    target_dir,
253
    checksum_url: str | None = None,
254
):
255
    try:
1✔
256
        download_and_extract(
1✔
257
            archive_url,
258
            target_dir,
259
            tmp_archive=tmp_archive,
260
            checksum_url=checksum_url,
261
        )
262
    except Exception as e:
×
263
        # try deleting and re-downloading the zip file
264
        LOG.info("Unable to extract file, re-downloading ZIP archive %s: %s", tmp_archive, e)
×
265
        rm_rf(tmp_archive)
×
266
        download_and_extract(
×
267
            archive_url,
268
            target_dir,
269
            tmp_archive=tmp_archive,
270
            checksum_url=checksum_url,
271
        )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc