• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 21157667695

19 Jan 2026 06:51PM UTC coverage: 86.955% (-0.006%) from 86.961%
21157667695

push

github

web-flow
Update CODEOWNERS (#13630)

Co-authored-by: LocalStack Bot <localstack-bot@users.noreply.github.com>

70332 of 80883 relevant lines covered (86.96%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.88
/localstack-core/localstack/utils/run.py
1
import io
1✔
2
import logging
1✔
3
import os
1✔
4
import re
1✔
5
import select
1✔
6
import subprocess
1✔
7
import sys
1✔
8
import threading
1✔
9
import time
1✔
10
from collections.abc import Callable
1✔
11
from functools import lru_cache
1✔
12
from queue import Queue
1✔
13
from typing import Any, AnyStr
1✔
14

15
from localstack import config
1✔
16

17
# TODO: remove imports from here (need to update any client code that imports these from utils.common)
18
from localstack.utils.platform import is_linux, is_mac_os, is_windows  # noqa
1✔
19

20
from .sync import retry
1✔
21
from .threads import FuncThread, start_worker_thread
1✔
22

23
LOG = logging.getLogger(__name__)
1✔
24

25

26
def run(
    cmd: str | list[str],
    print_error=True,
    asynchronous=False,
    stdin=False,
    stderr=subprocess.STDOUT,
    outfile=None,
    env_vars: dict[AnyStr, AnyStr] | None = None,
    inherit_cwd=False,
    inherit_env=True,
    tty=False,
    shell=True,
    cwd: str | None = None,
) -> str | subprocess.Popen:
    """Run a command, either blocking for its output or as a background process.

    :param cmd: command string, or list of arguments (a list disables shell mode)
    :param print_error: print command, exit code and output to stdout if a blocking call fails
    :param asynchronous: if True, start the process and return the ``subprocess.Popen`` handle
    :param stdin: whether to attach a pipe to the stdin of the process
    :param stderr: passed through as the ``stderr`` argument of the subprocess call
    :param outfile: stdout target for asynchronous runs; a string is treated as a file path
        that is opened in binary append mode, any other value is handed to ``Popen`` as-is
    :param env_vars: additional environment variables for the process
    :param inherit_cwd: use the current working directory if no ``cwd`` is given
    :param inherit_env: start from a copy of ``os.environ`` instead of an empty environment
    :param tty: attach a pseudo-terminal to the process (implies ``asynchronous`` and ``stdin``)
    :param shell: run the command through the shell (ignored if ``cmd`` is a list)
    :param cwd: working directory of the process
    :return: the decoded output for blocking calls (NOTE: the raw, undecoded output of
        ``check_output`` if ``stdin`` is set), or the ``Popen`` object for asynchronous calls
    :raises subprocess.CalledProcessError: if a blocking call exits with a non-zero code
    """
    LOG.debug("Executing command: %s", cmd)
    env_dict = os.environ.copy() if inherit_env else {}
    if env_vars:
        env_dict.update(env_vars)
    env_dict = {k: to_str(str(v)) for k, v in env_dict.items()}

    if isinstance(cmd, list):
        # See docs of subprocess.Popen(...):
        #  "On POSIX with shell=True, the shell defaults to /bin/sh. If args is a string,
        #   the string specifies the command to execute through the shell. [...] If args is
        #   a sequence, the first item specifies the command string, and any additional
        #   items will be treated as additional arguments to the shell itself."
        # Hence, we should *disable* shell mode here to be on the safe side, to prevent
        #  arguments in the cmd list from leaking into arguments to the shell itself. This will
        #  effectively allow us to call run(..) with both - str and list - as cmd argument, although
        #  over time we should move from "cmd: Union[str, List[str]]" to "cmd: List[str]" only.
        shell = False

    if tty:
        asynchronous = True
        stdin = True

    try:
        if inherit_cwd and not cwd:
            cwd = os.getcwd()
        if not asynchronous:
            if stdin:
                return subprocess.check_output(
                    cmd, shell=shell, stderr=stderr, env=env_dict, stdin=subprocess.PIPE, cwd=cwd
                )
            output = subprocess.check_output(cmd, shell=shell, stderr=stderr, env=env_dict, cwd=cwd)
            return output.decode(config.DEFAULT_ENCODING)

        # file handle opened by this function (if any) - the child process receives its own
        # copy of the descriptor, hence our handle must be closed again once Popen returned
        opened_outfile = None
        try:
            stdin_arg = subprocess.PIPE if stdin else None
            if isinstance(outfile, str):
                opened_outfile = open(outfile, "ab")
                stdout_arg = opened_outfile
            else:
                stdout_arg = outfile
            stderr_arg = stderr
            if tty:
                # Note: leave the "pty" import here (not supported in Windows)
                import pty

                master_fd, slave_fd = pty.openpty()
                stdin_arg = slave_fd
                stdout_arg = stderr_arg = None

            # start the actual sub process
            kwargs = {}
            if is_linux() or is_mac_os():
                kwargs["start_new_session"] = True
            process = subprocess.Popen(
                cmd,
                shell=shell,
                stdin=stdin_arg,
                bufsize=-1,
                stderr=stderr_arg,
                stdout=stdout_arg,
                env=env_dict,
                cwd=cwd,
                **kwargs,
            )
        finally:
            if opened_outfile is not None:
                opened_outfile.close()

        if tty:
            # based on: https://stackoverflow.com/questions/41542960
            def pipe_streams(*args):
                # shovel data between our stdin/stdout and the pty until the process exits
                while process.poll() is None:
                    r, w, e = select.select([sys.stdin, master_fd], [], [])
                    if sys.stdin in r:
                        d = os.read(sys.stdin.fileno(), 10240)
                        os.write(master_fd, d)
                    elif master_fd in r:
                        o = os.read(master_fd, 10240)
                        if o:
                            os.write(sys.stdout.fileno(), o)

            FuncThread(pipe_streams, name="pipe-streams").start()

        return process
    except subprocess.CalledProcessError as e:
        if print_error:
            print(f"ERROR: '{cmd}': exit code {e.returncode}; output: {e.output}")
            sys.stdout.flush()
        raise
121

122

123
def run_for_max_seconds(max_secs, _function, *args, **kwargs):
    """Run the given function for a maximum of `max_secs` seconds - continue running
    in a background thread if the function does not finish in time.

    :param max_secs: maximum time to wait for the result, in seconds (int or float)
    :param _function: the function to call with ``*args`` and ``**kwargs``
    :return: the result of the function (``True`` if the function returned ``None``), or
        ``None`` if the function did not finish in time
    :raises Exception: re-raises any exception raised by the function within the time limit
    """
    # local import, as only `Queue` is imported at module level
    from queue import Empty

    results = Queue()

    def _worker(*_args):
        try:
            fn_result = _function(*args, **kwargs)
        except Exception as e:
            # hand the exception over to the waiting caller, which re-raises it
            fn_result = e

        # map None to True, to be able to tell "finished" apart from "timed out"
        fn_result = True if fn_result is None else fn_result
        results.put(fn_result)
        return fn_result

    start_worker_thread(_worker)
    try:
        # block on the queue instead of polling it: the result is returned as soon as it
        # is available, and fractional timeouts are supported
        result = results.get(timeout=max(max_secs, 0))
    except Empty:
        return None
    if isinstance(result, Exception):
        raise result
    return result
153

154

155
def run_interactive(command: list[str]):
    """
    Execute the given command in a subprocess and block the current thread until it exits.
    No stream redirection is configured, i.e., the subprocess uses the standard streams
    (including stdin) of the current process.

    :param command: the command to pass to the subprocess call
    :raises subprocess.CalledProcessError: if the command exits with a non-zero exit code
    """
    subprocess.run(command, check=True)
163

164

165
def is_command_available(cmd: str) -> bool:
    """Check whether the given command can be found as an executable on the ``PATH``.

    The lookup is done in-process via ``shutil.which``, so it does not depend on an external
    ``which`` binary being installed (e.g., on Windows or in minimal container images).

    :param cmd: name (or path) of the command to look up
    :return: True if an executable was found, False otherwise - this function never raises
    """
    # local import, as shutil is not imported at module level
    import shutil

    if not cmd:
        return False
    try:
        return shutil.which(cmd) is not None
    except Exception:
        # keep the "never raises" contract, e.g., for arguments that are not path-like
        return False
171

172

173
def kill_process_tree(parent_pid):
    """Kill the given process and all of its (recursively collected) child processes.

    :param parent_pid: process ID, or an object with a ``pid`` attribute (e.g., a Popen instance)
    """
    # Note: Do NOT import "psutil" at the root scope
    import psutil

    # accept process-like objects as well as plain process IDs
    pid = getattr(parent_pid, "pid", None) or parent_pid
    root_process = psutil.Process(pid)
    descendants = root_process.children(recursive=True)
    for descendant in descendants:
        try:
            descendant.kill()
        except Exception:
            # best effort - a failure to kill a child must not prevent killing the others
            pass
    root_process.kill()
185

186

187
def wait_for_process_to_be_killed(pid: int, sleep: float = None, retries: int = None):
    """Wait until no process with the given process ID exists anymore.

    The check is passed to ``retry(..)`` together with the given ``sleep`` and ``retries``
    values (presumably the check is repeated until it no longer raises - see ``retry``).

    :param pid: the ID of the process to wait for
    :param sleep: passed through to ``retry(..)``
    :param retries: passed through to ``retry(..)``
    """
    import psutil

    def _check_pid():
        # raise explicitly rather than using an `assert` statement, as assert statements are
        # stripped when running with `python -O`, which would turn this check into a no-op
        if psutil.pid_exists(pid):
            raise AssertionError(f"Process with pid {pid} still exists")

    retry(_check_pid, sleep=sleep, retries=retries)
194

195

196
def is_root() -> bool:
    """Return whether the OS user reported by ``get_os_user()`` is ``root``."""
    current_user = get_os_user()
    return current_user == "root"
198

199

200
@lru_cache
def get_os_user() -> str:
    """Return the stripped output of the ``whoami`` command (cached after the first call)."""
    # using getpass.getuser() seems to be reporting a different/invalid user in Docker/macOS
    whoami_output = run("whoami")
    return whoami_output.strip()
204

205

206
def to_str(obj: str | bytes, errors="strict"):
1✔
207
    return obj.decode(config.DEFAULT_ENCODING, errors) if isinstance(obj, bytes) else obj
1✔
208

209

210
class ShellCommandThread(FuncThread):
    """Helper class to run a shell command in a background thread.

    The command is started via ``run(.., asynchronous=True)``. Note that ``self.process`` may
    be reset to ``None`` by ``stop()`` from another thread at any time, hence the methods
    below take a local snapshot of the attribute before checking and dereferencing it.
    """

    def __init__(
        self,
        cmd: str | list[str],
        params: Any = None,
        outfile: str | int = None,
        env_vars: dict[str, str] = None,
        stdin: bool = False,
        auto_restart: bool = False,
        quiet: bool = True,
        inherit_cwd: bool = False,
        inherit_env: bool = True,
        log_listener: Callable = None,
        stop_listener: Callable = None,
        strip_color: bool = False,
        name: str | None = None,
        cwd: str | None = None,
    ):
        """
        :param cmd: the command to run (string or list of arguments)
        :param params: parameters passed on to the ``FuncThread`` constructor
        :param outfile: stdout target of the process (defaults to ``os.devnull``, or to a pipe
            if a ``log_listener`` is defined)
        :param env_vars: additional environment variables for the process
        :param stdin: whether to attach a pipe to the stdin of the process
        :param auto_restart: restart the process if it terminates with a non-zero exit code
        :param quiet: suppress warnings about command errors and non-zero exit codes
        :param inherit_cwd: passed through to ``run(..)``
        :param inherit_env: passed through to ``run(..)``
        :param log_listener: called as ``log_listener(line, stream=...)`` for each output line
        :param stop_listener: called as ``stop_listener(thread)`` when the thread is stopped
        :param strip_color: strip terminal escape sequences (color codes) from output lines
        :param name: thread name (defaults to "shell-cmd-thread")
        :param cwd: working directory of the process
        """
        params = params if params is not None else {}
        env_vars = env_vars if env_vars is not None else {}
        self.stopped = False
        self.cmd = cmd
        self.process = None
        self.outfile = outfile
        self.stdin = stdin
        self.env_vars = env_vars
        self.inherit_cwd = inherit_cwd
        self.inherit_env = inherit_env
        self.auto_restart = auto_restart
        self.log_listener = log_listener
        self.stop_listener = stop_listener
        self.strip_color = strip_color
        self.started = threading.Event()
        self.cwd = cwd
        FuncThread.__init__(
            self, self.run_cmd, params, quiet=quiet, name=(name or "shell-cmd-thread")
        )

    def run_cmd(self, params):
        """Run the command, and re-run it while auto restart applies.

        :return: the exit code of the process, or None if no process is available
        """
        while True:
            self.do_run_cmd()
            from localstack.runtime import events

            # snapshot, as stop() may reset self.process concurrently
            process = self.process
            if (
                events.infra_stopping.is_set()  # FIXME: this is the wrong level of abstraction
                or not self.auto_restart
                or not process
                or process.returncode == 0
            ):
                return process.returncode if process else None
            LOG.info(
                "Restarting process (received exit code %s): %s", process.returncode, self.cmd
            )

    def do_run_cmd(self):
        """Start the process once, forward its output (if piped), and wait for it to terminate."""

        def convert_line(line):
            line = to_str(line or "")
            if self.strip_color:
                # strip color codes
                line = re.sub(r"\x1b(\[.*?[@-~]|\].*?(\x07|\x1b\\))", "", line)
            return f"{line.strip()}\r\n"

        def filter_line(line):
            """Return True if this line should be filtered, i.e., not printed"""
            return "(Press CTRL+C to quit)" in line

        outfile = self.outfile or os.devnull
        if self.log_listener and outfile == os.devnull:
            # the output needs to be piped to be able to pass it to the log listener
            outfile = subprocess.PIPE
        try:
            process = run(
                self.cmd,
                asynchronous=True,
                stdin=self.stdin,
                outfile=outfile,
                env_vars=self.env_vars,
                inherit_cwd=self.inherit_cwd,
                inherit_env=self.inherit_env,
                cwd=self.cwd,
            )
            self.process = process
            self.started.set()
            if outfile:
                if outfile == subprocess.PIPE:
                    # get stdout/stderr from child process and write to parent output
                    streams = (
                        (process.stdout, sys.stdout),
                        (process.stderr, sys.stderr),
                    )
                    for instream, outstream in streams:
                        if not instream:
                            continue
                        for line in iter(instream.readline, None):
                            # `line` should contain a newline at the end as we're iterating,
                            # hence we can safely break the loop if `line` is None or empty string
                            if line in [None, "", b""]:
                                break
                            if not (line and line.strip()) and self.is_killed():
                                break
                            line = convert_line(line)
                            if filter_line(line):
                                continue
                            if self.log_listener:
                                self.log_listener(line, stream=instream)
                            if self.outfile not in [None, os.devnull]:
                                outstream.write(line)
                                outstream.flush()
                # snapshot again - skip waiting if stop() has reset self.process in the meantime
                process = self.process
                if process:
                    process.wait()
            else:
                # note: `outfile` falls back to os.devnull above, i.e., it is never falsy here
                process.communicate()
        except Exception as e:
            # NOTE(review): `result_future` is not defined in this class - presumably it is
            # provided by FuncThread
            self.result_future.set_exception(e)
            if self.process and not self.quiet:
                LOG.warning('Shell command error "%s": %s', e, self.cmd)
        process = self.process
        if process and not self.quiet and process.returncode != 0:
            LOG.warning('Shell command exit code "%s": %s', process.returncode, self.cmd)

    def is_killed(self):
        """Return True if there is no process, the infra is stopping, or the process ID is gone."""
        from localstack.runtime import events

        # snapshot, as stop() may reset self.process concurrently
        process = self.process
        if not process:
            return True
        if events.infra_stopping.is_set():  # FIXME
            return True
        # Note: Do NOT import "psutil" at the root scope, as this leads
        # to problems when importing this file from our test Lambdas in Docker
        # (Error: libc.musl-x86_64.so.1: cannot open shared object file)
        import psutil

        return not psutil.pid_exists(process.pid)

    def stop(self, quiet=False):
        """Kill the process (including its child processes) and call the stop listener.

        :param quiet: suppress warnings if killing the process or the stop listener fails
        """
        if self.stopped:
            return
        process = self.process
        if not process:
            LOG.warning("No process found for command '%s'", self.cmd)
            return

        parent_pid = process.pid
        try:
            kill_process_tree(parent_pid)
            self.process = None
        except Exception as e:
            if not quiet:
                LOG.warning("Unable to kill process with pid %s: %s", parent_pid, e)
        try:
            if self.stop_listener:
                self.stop_listener(self)
        except Exception as e:
            if not quiet:
                LOG.warning("Unable to run stop handler for shell command thread %s: %s", self, e)
        self.stopped = True
363

364

365
class CaptureOutput:
    """A context manager that captures stdout/stderr of the current thread. Use it as follows:

    with CaptureOutput() as c:
        ...
    print(c.stdout(), c.stderr())

    Only one context is registered per thread: a context that is entered while another one is
    already active in the same thread is not registered, i.e., the output continues to be
    captured by the context that was entered first.
    """

    # original streams (captured when this class is defined), restored once no context is active
    orig_stdout = sys.stdout
    orig_stderr = sys.stderr
    orig___stdout = sys.__stdout__
    orig___stderr = sys.__stderr__
    # maps thread identifiers to the context that currently captures the output of the thread
    CONTEXTS_BY_THREAD = {}

    class LogStreamIO(io.StringIO):
        def write(self, s):
            # only applies to str objects that provide a `decode` method
            if isinstance(s, str) and hasattr(s, "decode"):
                s = s.decode("unicode-escape")
            return super().write(s)

    def __init__(self):
        self._stdout = self.LogStreamIO()
        self._stderr = self.LogStreamIO()

    def __enter__(self):
        # Note: import werkzeug here (not at top of file) to allow dependency pruning
        from werkzeug.local import LocalProxy

        ident = self._ident()
        if ident not in self.CONTEXTS_BY_THREAD:
            self.CONTEXTS_BY_THREAD[ident] = self
            self._set(
                LocalProxy(self._proxy(sys.stdout, "stdout")),
                LocalProxy(self._proxy(sys.stderr, "stderr")),
                LocalProxy(self._proxy(sys.__stdout__, "stdout")),
                LocalProxy(self._proxy(sys.__stderr__, "stderr")),
            )
        return self

    def __exit__(self, type, value, traceback):
        ident = self._ident()
        # only unregister the context if it is the one registered for this thread - a context
        # that was never registered (nested in the same thread, or exited twice) must neither
        # remove the registration of another context nor reset the stream pointers
        if self.CONTEXTS_BY_THREAD.get(ident) is self:
            del self.CONTEXTS_BY_THREAD[ident]
            if not self.CONTEXTS_BY_THREAD:
                # reset pointers
                self._set(
                    self.orig_stdout,
                    self.orig_stderr,
                    self.orig___stdout,
                    self.orig___stderr,
                )
        # replace the streams with their captured values and close the handles
        # (skipped for streams that have already been converted by a previous exit)
        for attr_name in ("_stdout", "_stderr"):
            stream = getattr(self, attr_name)
            if hasattr(stream, "getvalue"):
                stream.flush()
                captured = stream.getvalue()
                stream.close()
                setattr(self, attr_name, captured)

    def _set(self, out, err, __out, __err):
        sys.stdout, sys.stderr, sys.__stdout__, sys.__stderr__ = (
            out,
            err,
            __out,
            __err,
        )

    def _proxy(self, original_stream, type):
        def proxy():
            # resolve the capturing stream of the calling thread, if a context is registered for it
            ident = self._ident()
            ctx = self.CONTEXTS_BY_THREAD.get(ident)
            if ctx:
                return ctx._stdout if type == "stdout" else ctx._stderr
            return original_stream

        return proxy

    def _ident(self):
        return threading.current_thread().ident

    def stdout(self):
        """Return the captured stdout content."""
        return self._stream_value(self._stdout)

    def stderr(self):
        """Return the captured stderr content."""
        return self._stream_value(self._stderr)

    def _stream_value(self, stream):
        # the streams are replaced with plain strings once the context has been exited
        return stream.getvalue() if hasattr(stream, "getvalue") else stream
455

456

457
class CaptureOutputProcess:
    """A context manager that captures stdout/stderr of the current process.

    Basically a lightweight version of CaptureOutput without tracking internal thread mapping

    Use it as follows:

    with CaptureOutputProcess() as c:
        ...
    print(c.stdout(), c.stderr())
    """

    class LogStreamIO(io.StringIO):
        def write(self, s):
            # only applies to str objects that provide a `decode` method
            if isinstance(s, str) and hasattr(s, "decode"):
                s = s.decode("unicode-escape")
            return super().write(s)

    def __init__(self):
        self.orig_stdout = sys.stdout
        self._stdout = self.LogStreamIO()
        self.orig_stderr = sys.stderr
        self._stderr = self.LogStreamIO()
        # captured output, available after the context has been exited
        self.stdout_value = None
        self.stderr_value = None

    def __enter__(self):
        # remember the streams that are active at the time of entering the context (they may
        # have been replaced since the object was created), to restore exactly those on exit
        self.orig_stdout = sys.stdout
        self.orig_stderr = sys.stderr
        sys.stdout = self._stdout
        sys.stderr = self._stderr
        return self

    def __exit__(self, type, value, traceback):
        try:
            self._stdout.flush()
            self._stderr.flush()

            self.stdout_value = self._stdout.getvalue()
            self.stderr_value = self._stderr.getvalue()

            # close handles
            self._stdout.close()
            self._stderr.close()
        finally:
            # always restore the original streams, even if reading the buffers failed
            sys.stdout = self.orig_stdout
            sys.stderr = self.orig_stderr

    def stdout(self):
        """Return the captured stdout content (None until the context has been exited)."""
        return self.stdout_value

    def stderr(self):
        """Return the captured stderr content (None until the context has been exited)."""
        return self.stderr_value
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc