• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 22048723723

13 Feb 2026 06:59PM UTC coverage: 87.006% (+0.1%) from 86.883%
22048723723

push

github

web-flow
CW Logs: Test suite for service internalization (#13692)

22 of 22 new or added lines in 1 file covered. (100.0%)

928 existing lines in 33 files now uncovered.

69716 of 80128 relevant lines covered (87.01%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.84
/localstack-core/localstack/utils/files.py
1
import configparser
1✔
2
import inspect
1✔
3
import logging
1✔
4
import os
1✔
5
import shutil
1✔
6
import stat
1✔
7
import tempfile
1✔
8
from pathlib import Path
1✔
9

10
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
1✔
11
# Paths registered by new_tmp_file() and new_tmp_dir(); removed and reset by cleanup_tmp_files().
TMP_FILES = []
1✔
12

13

14
def parse_config_file(file_or_str: str, single_section: bool = True) -> dict:
    """Parse the given properties config file/string and return its contents as a dict.

    :param file_or_str: either the path of an existing config file, or the config content itself
    :param single_section: if True and the config contains exactly one section, return the
        key->value dict of that section directly instead of nesting it under the section name
    :return: a dict of section->key->value, or a flat key->value dict (see ``single_section``)
    """
    config = configparser.RawConfigParser()

    # Only treat the argument as a path if it points to a regular file. For any other existing
    # path (e.g., a directory) load_file() returns None, which read_string() cannot handle.
    if os.path.isfile(file_or_str):
        file_or_str = load_file(file_or_str)

    try:
        config.read_string(file_or_str)
    except configparser.MissingSectionHeaderError:
        # content without a section header -> parse it under an explicit "default" section
        config.read_string(f"[default]\n{file_or_str}")

    sections = config.sections()
    result = {section: dict(config.items(section)) for section in sections}
    if single_section and len(sections) == 1:
        return result[sections[0]]
    return result
1✔
36

37

38
def get_user_cache_dir() -> Path:
    """
    Determine the platform-specific cache directory of the current user.

    Windows resolves to a ``cache`` folder below ``%LOCALAPPDATA%`` and macOS to ``~/Library/Caches``.
    On Linux an absolute ``XDG_CACHE_HOME`` takes precedence; in all remaining cases ``~/.cache`` is used.

    :return: a Path pointing to the platform-specific cache dir of the user
    """
    from localstack.utils.platform import is_linux, is_mac_os, is_windows

    if is_windows():
        windows_cache = os.path.expandvars(r"%LOCALAPPDATA%\cache")
        return Path(windows_cache)

    if is_mac_os():
        return Path.home().joinpath("Library", "Caches")

    # XDG_CACHE_HOME is only consulted on Linux, and only honored if it is an absolute path
    xdg_cache_home = os.environ.get("XDG_CACHE_HOME") if is_linux() else None
    if xdg_cache_home and os.path.isabs(xdg_cache_home):
        return Path(xdg_cache_home)

    # Use the common place to store caches in Linux as a default
    return Path.home().joinpath(".cache")
×
56

57

58
def cache_dir() -> Path:
    """
    Return the localstack-specific folder inside the user's cache dir (e.g., ~/.cache/localstack).

    :return: a Path pointing to the localstack cache dir
    """
    user_cache = get_user_cache_dir()
    return user_cache.joinpath("localstack")
×
65

66

67
def save_file(file, content, append=False, permissions=None):
    """
    Write the given content to a file, creating missing parent directories first.

    :param file: path of the file to write
    :param content: content to write; ``str`` content is written in text mode, anything else
        (e.g., ``bytes``) in binary mode
    :param append: if True, append to the file instead of overwriting it
    :param permissions: optional permission bits, passed as the ``mode`` argument to ``os.open``
        when opening the file
    """
    mode = "a" if append else "w+"
    if not isinstance(content, str):
        mode += "b"

    def _opener(path, flags):
        return os.open(path, flags, permissions)

    # Make sure that the parent dir exists. A bare file name has no directory component
    # (dirname is ""), and os.makedirs("") would raise FileNotFoundError, so skip it then.
    parent_dir = os.path.dirname(file)
    if parent_dir and not os.path.exists(parent_dir):
        os.makedirs(parent_dir, exist_ok=True)

    # store file contents
    with open(file, mode, opener=_opener if permissions else None) as f:
        f.write(content)
        f.flush()
1✔
81

82

83
def load_file(
1✔
84
    file_path: str | os.PathLike,
85
    default: str | bytes | None = None,
86
    mode: str | None = None,
87
    strict: bool = False,
88
) -> str | bytes | None:
89
    """
90
    Return file contents
91

92
    :param file_path: path of the file
93
    :param default: if strict=False then return this value if the file does not exist
94
    :param mode: mode to open the file with (e.g. `r`, `rw`)
95
    :param strict: raise an error if the file path is not a file
96
    :return: the file contents
97
    """
98
    if not os.path.isfile(file_path):
1✔
99
        if strict:
1✔
100
            raise FileNotFoundError(file_path)
1✔
101
        else:
102
            return default
1✔
103
    if not mode:
1✔
104
        mode = "r"
1✔
105
    with open(file_path, mode) as f:
1✔
106
        result = f.read()
1✔
107
    return result
1✔
108

109

110
def get_or_create_file(file_path, content=None, permissions=None):
    """
    Return the contents of the given file, creating the file first if the path does not exist.

    :param file_path: path of the file to load or create
    :param content: content to store in a newly created file (defaults to ``"{}"``)
    :param permissions: optional permissions for a newly created file, passed on to ``save_file``
    :return: the result of ``load_file`` for an existing path, otherwise the content written to
        the new file, or None if the file could not be created
    """
    if os.path.exists(file_path):
        return load_file(file_path)
    content = "{}" if content is None else content
    try:
        save_file(file_path, content, permissions=permissions)
    except Exception as e:
        # best effort: creation failures are deliberately not raised, but leave a trace in the logs
        LOG.debug("Unable to create file %s: %s", file_path, e)
        return None
    return content
×
119

120

121
def replace_in_file(search, replace, file_path):
    """Substitute every occurrence of `search` with `replace` in the given file. The file is
    overwritten in place, and only written if the substitution actually changed the content."""
    original = load_file(file_path) or ""
    updated = original.replace(search, replace)
    if updated == original:
        return
    save_file(file_path, updated)
1✔
127

128

129
def mkdir(folder: str):
    """Create the given folder, including missing parent folders, unless the path already exists."""
    if os.path.exists(folder):
        return
    os.makedirs(folder, exist_ok=True)
1✔
132

133

134
def is_empty_dir(directory: str, ignore_hidden: bool = False) -> bool:
    """Return whether the given directory contains no entries (files/folders).

    Hidden entries, i.e., entries whose name starts with a dot (.), count as content unless
    ignore_hidden=True is passed.

    :param directory: path of the directory to inspect
    :param ignore_hidden: if True, entries whose name starts with a dot are not counted
    :return: True if the directory has no (relevant) entries, False otherwise
    :raises Exception: if the given path is not a directory
    """
    if not os.path.isdir(directory):
        raise Exception(f"Path is not a directory: {directory}")
    # scan lazily and stop at the first relevant entry instead of listing the whole directory
    with os.scandir(directory) as entries:
        for entry in entries:
            if not (ignore_hidden and entry.name.startswith(".")):
                return False
    return True
1✔
143

144

145
def ensure_readable(file_path: str, default_perms: int = None):
1✔
146
    if default_perms is None:
×
147
        default_perms = 0o644
×
148
    try:
×
149
        with open(file_path, "rb"):
×
150
            pass
×
151
    except Exception:
×
152
        LOG.info("Updating permissions as file is currently not readable: %s", file_path)
×
153
        os.chmod(file_path, default_perms)
×
154

155

156
def chown_r(path: str, user: str):
    """Change the owner of the given file/directory, and of everything below it, to the given
    user and to the group that has the same name as the user."""
    # keep these imports here for Windows compatibility
    import grp
    import pwd

    uid = pwd.getpwnam(user).pw_uid
    gid = grp.getgrnam(user).gr_gid
    os.chown(path, uid, gid)
    for parent, subdirs, filenames in os.walk(path):
        # directories first, then files - each relative to the folder currently being walked
        for entry in subdirs + filenames:
            os.chown(os.path.join(parent, entry), uid, gid)
1✔
170

171

172
def chmod_r(path: str, mode: int):
    """
    Apply the given mode to a path and, recursively, to all directories and files below it.
    Nothing happens if the path does not exist.
    :param path: path to file or directory
    :param mode: permission mask as octal integer value
    """
    if os.path.exists(path):
        idempotent_chmod(path, mode)
        for parent, subdirs, filenames in os.walk(path):
            # directories first, then files - each relative to the folder currently being walked
            for entry in subdirs + filenames:
                idempotent_chmod(os.path.join(parent, entry), mode)
1✔
186

187

188
def idempotent_chmod(path: str, mode: int):
    """
    Set the mode of the given file path (non-recursively), tolerating failures that do not matter.
    If `os.chmod` fails (e.g., with a PermissionError), the error is only re-raised if the file
    still exists and does not already have the requested mode.
    :param path: path to file
    :param mode: permission mask as octal integer value
    """
    try:
        os.chmod(path, mode)
    except Exception:
        try:
            current = os.stat(path)
        except FileNotFoundError:
            # file deleted in the meantime, or otherwise not accessible (socket)
            return
        # the requested mode is compared both with and without the file type bits
        if mode not in (current.st_mode, stat.S_IMODE(current.st_mode)):
            raise
        # otherwise the file already has the desired permissions -> nothing to do
×
207

208

209
def rm_rf(path: str):
    """
    Recursively removes a file or directory. Does nothing if the path is empty or does not exist.

    :param path: path of the file or directory to remove
    :return: the result of the native ``rm -rf`` command if that code path was used, else None
    """
    import shlex

    from localstack.utils.platform import is_debian
    from localstack.utils.run import run

    if not path or not os.path.exists(path):
        return
    # Running the native command can be an order of magnitude faster than the Python fallback.
    # NOTE(review): this was originally noted for "Alpine on Travis-CI", yet the check below is
    # is_debian() - confirm which platforms are intended.
    if is_debian():
        try:
            # Quote the path so that characters like `"`, `$` or backticks in it cannot break or
            # alter the command (assumes run() hands the string to a shell - TODO confirm).
            return run(f"rm -rf {shlex.quote(path)}")
        except Exception as e:
            LOG.debug("Native 'rm -rf' failed for %s, using Python fallback: %s", path, e)
    # Make sure all files are writeable and dirs executable to remove
    try:
        chmod_r(path, 0o777)
    except PermissionError as e:
        LOG.debug("Unable to update permissions before removing %s: %s", path, e)
    # anything that exists but is not a directory (a normal file, or, e.g., a fifo) is unlinked
    if os.path.exists(path) and not os.path.isdir(path):
        os.remove(path)
    else:
        shutil.rmtree(path)
1✔
235

236

237
def cp_r(src: str, dst: str, rm_dest_on_conflict=False, ignore_copystat_errors=False, **kwargs):
    """Recursively copy a file or directory from `src` to `dst`.

    :param src: path of the file or directory to copy
    :param dst: target path; if `src` is a file and `dst` is an existing directory, the file is
        copied into that directory under its original name
    :param rm_dest_on_conflict: if True, remove `dst` and retry once if copying the directory
        raises a FileExistsError
    :param ignore_copystat_errors: if True, errors raised by `shutil.copystat` are swallowed for
        the duration of this call
    :param kwargs: additional keyword arguments passed to `shutil.copytree`
    :return: the result of `shutil.copyfile` or `shutil.copytree`
    """
    # attention: this patch is not threadsafe
    original_copystat = shutil.copystat

    def _lenient_copystat(*cs_args, **cs_kwargs):
        try:
            return original_copystat(*cs_args, **cs_kwargs)
        except Exception:
            pass

    def _describe(_path):
        return f"{_path} (file={os.path.isfile(_path)}, symlink={os.path.islink(_path)})"

    if ignore_copystat_errors:
        shutil.copystat = _lenient_copystat

    try:
        if os.path.isfile(src):
            if os.path.isdir(dst):
                dst = os.path.join(dst, os.path.basename(src))
            return shutil.copyfile(src, dst)

        # merge into existing target directories where copytree supports it
        copytree_params = inspect.getfullargspec(shutil.copytree).args
        if "dirs_exist_ok" in copytree_params:
            kwargs["dirs_exist_ok"] = True

        try:
            return shutil.copytree(src, dst, **kwargs)
        except FileExistsError:
            if not rm_dest_on_conflict:
                raise
            rm_rf(dst)
            return shutil.copytree(src, dst, **kwargs)
    except Exception as e:
        LOG.debug("Error copying files from %s to %s: %s", _describe(src), _describe(dst), e)
        raise
    finally:
        # always undo the (potential) copystat patch
        shutil.copystat = original_copystat
1✔
273

274

275
def disk_usage(path: str) -> int:
    """Compute the size in bytes of the given file, or the summed size of all files below the
    given directory. A non-existing path yields 0; symbolic links inside a directory are skipped."""
    if not os.path.exists(path):
        return 0
    if os.path.isfile(path):
        return os.path.getsize(path)

    def _file_paths():
        # yield the full path of every file entry found while walking the directory tree
        for parent, _subdirs, filenames in os.walk(path):
            for filename in filenames:
                yield os.path.join(parent, filename)

    return sum(os.path.getsize(fp) for fp in _file_paths() if not os.path.islink(fp))
1✔
292

293

294
def file_exists_not_empty(path: str) -> bool:
    """Return whether the given file or directory exists and is non-empty (i.e., >0 bytes content).

    :param path: path of the file or directory to check; a falsy path (e.g., "" or None) yields False
    :return: True if the path exists and its disk usage is greater than zero, False otherwise
    """
    # bool() is required: a plain `path and ...` would return the falsy path itself, not False
    return bool(path) and disk_usage(path) > 0
1✔
297

298

299
def cleanup_tmp_files():
    """Remove all paths tracked in TMP_FILES (best effort), then reset the tracking list."""
    for tracked_path in TMP_FILES:
        try:
            rm_rf(tracked_path)
        except Exception:
            # file likely doesn't exist, or permission denied
            continue
    TMP_FILES.clear()
×
306

307

308
def new_tmp_file(suffix: str | None = None, dir: str | None = None) -> str:
    """Create a new temporary file, register it in TMP_FILES, and return its path."""
    fd, path = tempfile.mkstemp(suffix=suffix, dir=dir)
    # only the path is handed out, so close the descriptor opened by mkstemp
    os.close(fd)
    TMP_FILES.append(path)
    return path
1✔
314

315

316
def new_tmp_dir(dir: str | None = None, mode: int = 0o777) -> str:
    """
    Create a new temporary directory, register it in the tracked temporary files, and apply the
    given permissions to it.
    :param dir: parent directory for the temporary directory to be created. System's default otherwise.
    :param mode: file permission for the directory (default: 0o777)
    :return: the path of the created directory
    """
    tmp_dir = tempfile.mkdtemp(dir=dir)
    # register first, so the directory is tracked even if changing the permissions fails
    TMP_FILES.append(tmp_dir)
    idempotent_chmod(tmp_dir, mode=mode)
    return tmp_dir
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc