• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 6523d139-4c8d-4daf-a514-97baaa202bd9

04 Jun 2025 04:30PM UTC coverage: 86.762% (-0.006%) from 86.768%
6523d139-4c8d-4daf-a514-97baaa202bd9

push

circleci

web-flow
test(esm/sqs): Skip flaky test_report_batch_item_failures test (#12713)

65076 of 75005 relevant lines covered (86.76%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.42
/localstack-core/localstack/utils/run.py
1
import io
1✔
2
import logging
1✔
3
import os
1✔
4
import re
1✔
5
import select
1✔
6
import subprocess
1✔
7
import sys
1✔
8
import threading
1✔
9
import time
1✔
10
from functools import lru_cache
1✔
11
from queue import Queue
1✔
12
from typing import Any, AnyStr, Callable, Dict, List, Optional, Union
1✔
13

14
from localstack import config
1✔
15

16
# TODO: remove imports from here (need to update any client code that imports these from utils.common)
17
from localstack.utils.platform import is_linux, is_mac_os, is_windows  # noqa
1✔
18

19
from .sync import retry
1✔
20
from .threads import FuncThread, start_worker_thread
1✔
21

22
LOG = logging.getLogger(__name__)
1✔
23

24

25
def run(
    cmd: Union[str, List[str]],
    print_error=True,
    asynchronous=False,
    stdin=False,
    stderr=subprocess.STDOUT,
    outfile=None,
    env_vars: Optional[Dict[AnyStr, AnyStr]] = None,
    inherit_cwd=False,
    inherit_env=True,
    tty=False,
    shell=True,
    cwd: Optional[str] = None,
) -> Union[str, subprocess.Popen]:
    """
    Run a command in a subprocess, either synchronously or asynchronously.

    :param cmd: the command to run; if a list is passed, shell mode is disabled (see comment below)
    :param print_error: whether to print command, exit code and output if the synchronous call fails
    :param asynchronous: if True, start the process and return the ``Popen`` object without waiting
    :param stdin: whether to attach a pipe to the stdin of the process
    :param stderr: passed through as ``stderr`` argument to the subprocess call
    :param outfile: stdout target for asynchronous runs; a string is treated as a file path that is
        opened in binary append mode, any other value is passed to ``Popen`` as-is. Not used for
        synchronous runs.
    :param env_vars: environment variables to set (in addition to the inherited ones, if any)
    :param inherit_cwd: use the current working directory of this process, unless ``cwd`` is given
    :param inherit_env: whether to start from a copy of ``os.environ``
    :param tty: attach a pseudo-terminal to the process (implies ``asynchronous`` and ``stdin``)
    :param shell: whether to run the command through the shell (ignored if ``cmd`` is a list)
    :param cwd: working directory of the subprocess
    :return: the decoded output for synchronous runs (raw bytes if ``stdin`` is set), or the
        ``subprocess.Popen`` object for asynchronous runs
    :raises subprocess.CalledProcessError: if a synchronous command returns a non-zero exit code
    """
    LOG.debug("Executing command: %s", cmd)
    env_dict = os.environ.copy() if inherit_env else {}
    if env_vars:
        env_dict.update(env_vars)
    # decode bytes values *before* stringifying them - str(b"x") would yield the literal "b'x'"
    env_dict = {k: to_str(v) if isinstance(v, bytes) else str(v) for k, v in env_dict.items()}

    if isinstance(cmd, list):
        # See docs of subprocess.Popen(...):
        #  "On POSIX with shell=True, the shell defaults to /bin/sh. If args is a string,
        #   the string specifies the command to execute through the shell. [...] If args is
        #   a sequence, the first item specifies the command string, and any additional
        #   items will be treated as additional arguments to the shell itself."
        # Hence, we should *disable* shell mode here to be on the safe side, to prevent
        #  arguments in the cmd list from leaking into arguments to the shell itself. This will
        #  effectively allow us to call run(..) with both - str and list - as cmd argument, although
        #  over time we should move from "cmd: Union[str, List[str]]" to "cmd: List[str]" only.
        shell = False

    if tty:
        asynchronous = True
        stdin = True

    try:
        if inherit_cwd and not cwd:
            cwd = os.getcwd()
        if not asynchronous:
            if stdin:
                # Note: this code path returns the raw (undecoded) output bytes
                return subprocess.check_output(
                    cmd, shell=shell, stderr=stderr, env=env_dict, stdin=subprocess.PIPE, cwd=cwd
                )
            output = subprocess.check_output(cmd, shell=shell, stderr=stderr, env=env_dict, cwd=cwd)
            return output.decode(config.DEFAULT_ENCODING)

        stdin_arg = subprocess.PIPE if stdin else None
        # only a file opened by us is closed again below; handles passed in by the caller are kept
        opened_outfile = open(outfile, "ab") if isinstance(outfile, str) else None
        try:
            stdout_arg = outfile if opened_outfile is None else opened_outfile
            stderr_arg = stderr
            if tty:
                # Note: leave the "pty" import here (not supported in Windows)
                import pty

                master_fd, slave_fd = pty.openpty()
                stdin_arg = slave_fd
                stdout_arg = stderr_arg = None

            # start the actual sub process
            kwargs = {}
            if is_linux() or is_mac_os():
                kwargs["start_new_session"] = True
            process = subprocess.Popen(
                cmd,
                shell=shell,
                stdin=stdin_arg,
                bufsize=-1,
                stderr=stderr_arg,
                stdout=stdout_arg,
                env=env_dict,
                cwd=cwd,
                **kwargs,
            )
        finally:
            # the child process holds its own copy of the descriptor once it has been spawned,
            # hence we close our handle here to avoid leaking one file descriptor per call
            if opened_outfile is not None:
                opened_outfile.close()

        if tty:
            # based on: https://stackoverflow.com/questions/41542960
            def pipe_streams(*args):
                while process.poll() is None:
                    r, w, e = select.select([sys.stdin, master_fd], [], [])
                    if sys.stdin in r:
                        d = os.read(sys.stdin.fileno(), 10240)
                        os.write(master_fd, d)
                    elif master_fd in r:
                        o = os.read(master_fd, 10240)
                        if o:
                            os.write(sys.stdout.fileno(), o)

            FuncThread(pipe_streams, name="pipe-streams").start()

        return process
    except subprocess.CalledProcessError as e:
        if print_error:
            print("ERROR: '%s': exit code %s; output: %s" % (cmd, e.returncode, e.output))
            sys.stdout.flush()
        raise
120

121

122
def run_for_max_seconds(max_secs, _function, *args, **kwargs):
    """Run the given function for a maximum of `max_secs` seconds - continue running
    in a background thread if the function does not finish in time.

    :param max_secs: maximum number of seconds to wait for the result (may be fractional)
    :param _function: the function to call with the given `args` and `kwargs`
    :return: the result of the function (`True` if the function returned `None`), or `None`
        if the function did not finish in time
    :raises Exception: any exception raised by the function while we were waiting for it
    """
    # local import, to keep this function independent of the module-level imports
    from queue import Empty

    def _worker(*_args):
        try:
            fn_result = _function(*args, **kwargs)
        except Exception as e:
            # hand the exception over to the waiting thread, which re-raises it
            fn_result = e

        # map None to True, as None is what we return to signal a timeout
        fn_result = True if fn_result is None else fn_result
        q.put(fn_result)
        return fn_result

    q = Queue()
    start_worker_thread(_worker)
    try:
        # block until the result arrives or the time is up (instead of polling in fixed
        # intervals); negative values are clamped, as Queue.get(..) rejects them
        result = q.get(timeout=max(max_secs, 0))
    except Empty:
        return None
    if isinstance(result, Exception):
        raise result
    return result
152

153

154
def run_interactive(command: List[str]):
    """
    Execute the given command as a subprocess and block the calling thread until it exits. No
    stream redirection is configured, i.e., the subprocess uses the standard streams (including
    sys.stdin) of this process. A non-zero exit code raises subprocess.CalledProcessError.

    Copied from https://stackoverflow.com/a/43012138/804840

    :param command: the command to pass to subprocess.check_call
    """
    subprocess.check_call(command)
162

163

164
def is_command_available(cmd: str) -> bool:
    """
    Check whether the given command can be found on the PATH of the current process.

    The lookup is done in-process via ``shutil.which``, i.e., it neither spawns a subprocess nor
    depends on the external ``which`` binary being installed.

    :param cmd: name (or path) of the command to look up
    :return: True if an executable was found for the command, False otherwise
    """
    # local import, to keep this function independent of the module-level imports
    import shutil

    if not cmd:
        return False
    return shutil.which(cmd) is not None
170

171

172
def kill_process_tree(parent_pid):
    """
    Kill the given process and, before that, all of its (recursively collected) child processes.

    :param parent_pid: the PID of the parent process, or an object with a `pid` attribute
    """
    # Note: Do NOT import "psutil" at the root scope
    import psutil

    pid = getattr(parent_pid, "pid", None)
    if not pid:
        # no (truthy) `pid` attribute - treat the argument itself as the PID
        pid = parent_pid

    root_process = psutil.Process(pid)
    descendants = root_process.children(recursive=True)
    for proc in descendants:
        try:
            proc.kill()
        except Exception:
            # best effort - a failure to kill one child must not stop the cleanup
            pass
    root_process.kill()
184

185

186
def wait_for_process_to_be_killed(
    pid: int, sleep: Optional[float] = None, retries: Optional[int] = None
):
    """
    Wait until no process with the given PID exists any longer.

    :param pid: the ID of the process to wait for
    :param sleep: passed through as `sleep` argument to `retry(..)`
    :param retries: passed through as `retries` argument to `retry(..)`
    """
    # Note: Do NOT import "psutil" at the root scope
    import psutil

    def _check_pid():
        # raise explicitly, instead of using an `assert` statement - assertions are stripped
        # when running Python with -O, which would turn this check into a no-op
        if psutil.pid_exists(pid):
            raise AssertionError("Process with PID %s still exists" % pid)

    # NOTE(review): `sleep`/`retries` are forwarded even if they are None - presumably `retry(..)`
    # handles None values; verify against its implementation
    retry(_check_pid, sleep=sleep, retries=retries)
193

194

195
def is_root() -> bool:
    """Return whether the OS user running this process is named "root"."""
    current_user = get_os_user()
    return current_user == "root"
197

198

199
@lru_cache()
def get_os_user() -> str:
    """Return the output of the `whoami` command, stripped of surrounding whitespace. The
    result is cached, hence the command is executed only once per process."""
    # using getpass.getuser() seems to be reporting a different/invalid user in Docker/macOS
    whoami_output = run("whoami")
    return whoami_output.strip()
203

204

205
def to_str(obj: Union[str, bytes], errors="strict"):
    """Decode the given object using the configured default encoding if it is a bytes object;
    any other object is returned unchanged.

    :param obj: the object to convert
    :param errors: the error handling scheme passed to `bytes.decode(..)`
    """
    if not isinstance(obj, bytes):
        return obj
    return obj.decode(config.DEFAULT_ENCODING, errors)
207

208

209
class ShellCommandThread(FuncThread):
    """Helper class to run a shell command in a background thread.

    The command is started via `run(..)` in asynchronous mode. If a `log_listener` is given and
    no `outfile` is configured, the output of the process is read line by line and passed to the
    listener. With `auto_restart` enabled, the command is started again whenever it terminates
    with a non-zero exit code.
    """

    # escape sequences (e.g., color codes) that are removed from the output if `strip_color` is set
    _ESCAPE_SEQUENCE_PATTERN = re.compile(r"\x1b(\[.*?[@-~]|\].*?(\x07|\x1b\\))")

    def __init__(
        self,
        cmd: Union[str, List[str]],
        params: Any = None,
        outfile: Optional[Union[str, int]] = None,
        env_vars: Optional[Dict[str, str]] = None,
        stdin: bool = False,
        auto_restart: bool = False,
        quiet: bool = True,
        inherit_cwd: bool = False,
        inherit_env: bool = True,
        log_listener: Optional[Callable] = None,
        stop_listener: Optional[Callable] = None,
        strip_color: bool = False,
        name: Optional[str] = None,
        cwd: Optional[str] = None,
    ):
        """
        :param cmd: the command to run (see `run(..)`)
        :param params: parameters passed on to `FuncThread`
        :param outfile: stdout target of the process (see `run(..)`); defaults to `os.devnull`
        :param env_vars: environment variables to set for the process
        :param stdin: whether to attach a pipe to the stdin of the process
        :param auto_restart: whether to restart the command if it exits with a non-zero code
        :param quiet: if False, command errors and non-zero exit codes are logged as warnings
        :param inherit_cwd: see `run(..)`
        :param inherit_env: see `run(..)`
        :param log_listener: called as `log_listener(line, stream=...)` for each output line
        :param stop_listener: called with this thread as argument when `stop(..)` is called
        :param strip_color: whether to remove escape sequences (color codes) from output lines
        :param name: name of the thread (default: "shell-cmd-thread")
        :param cwd: working directory of the process
        """
        params = params if params is not None else {}
        env_vars = env_vars if env_vars is not None else {}
        self.stopped = False
        self.cmd = cmd
        self.process = None
        self.outfile = outfile
        self.stdin = stdin
        self.env_vars = env_vars
        self.inherit_cwd = inherit_cwd
        self.inherit_env = inherit_env
        self.auto_restart = auto_restart
        self.log_listener = log_listener
        self.stop_listener = stop_listener
        self.strip_color = strip_color
        self.started = threading.Event()
        self.cwd = cwd
        FuncThread.__init__(
            self, self.run_cmd, params, quiet=quiet, name=(name or "shell-cmd-thread")
        )

    def run_cmd(self, params):
        """Run the command (repeatedly, if `auto_restart` is set) and return its exit code, or
        None if there is no process (e.g., after `stop(..)` has been called)."""
        while True:
            self.do_run_cmd()
            from localstack.runtime import events

            # take a snapshot - stop(..) may reset self.process to None from another thread
            # at any time between the checks below and the access to the return code
            process = self.process
            if (
                events.infra_stopping.is_set()  # FIXME: this is the wrong level of abstraction
                or not self.auto_restart
                or not process
                or process.returncode == 0
            ):
                return process.returncode if process else None
            LOG.info("Restarting process (received exit code %s): %s", process.returncode, self.cmd)

    def do_run_cmd(self):
        """Start the process once, forward its output (if requested), and wait for it to exit."""

        def convert_line(line):
            line = to_str(line or "")
            if self.strip_color:
                # strip color codes
                line = self._ESCAPE_SEQUENCE_PATTERN.sub("", line)
            return "%s\r\n" % line.strip()

        def filter_line(line):
            """Return True if this line should be filtered, i.e., not printed"""
            return "(Press CTRL+C to quit)" in line

        # Note: `outfile` is always truthy from here on (falls back to os.devnull)
        outfile = self.outfile or os.devnull
        if self.log_listener and outfile == os.devnull:
            # the output would be discarded otherwise - capture it for the log listener instead
            outfile = subprocess.PIPE
        try:
            # keep a local reference - stop(..) may reset self.process to None concurrently
            process = self.process = run(
                self.cmd,
                asynchronous=True,
                stdin=self.stdin,
                outfile=outfile,
                env_vars=self.env_vars,
                inherit_cwd=self.inherit_cwd,
                inherit_env=self.inherit_env,
                cwd=self.cwd,
            )
            self.started.set()
            if outfile == subprocess.PIPE:
                # get stdout/stderr from child process and write to parent output
                streams = (
                    (process.stdout, sys.stdout),
                    (process.stderr, sys.stderr),
                )
                for instream, outstream in streams:
                    if not instream:
                        continue
                    for line in iter(instream.readline, None):
                        # `line` should contain a newline at the end as we're iterating,
                        # hence we can safely break the loop if `line` is None or empty string
                        if line in [None, "", b""]:
                            break
                        if not (line and line.strip()) and self.is_killed():
                            break
                        line = convert_line(line)
                        if filter_line(line):
                            continue
                        if self.log_listener:
                            self.log_listener(line, stream=instream)
                        if self.outfile not in [None, os.devnull]:
                            outstream.write(line)
                            outstream.flush()
            if process:
                process.wait()
        except Exception as e:
            self.result_future.set_exception(e)
            if self.process and not self.quiet:
                LOG.warning('Shell command error "%s": %s', e, self.cmd)
        # snapshot, to avoid a race with stop(..) between the check and the attribute access
        process = self.process
        if process and not self.quiet and process.returncode != 0:
            LOG.warning('Shell command exit code "%s": %s', process.returncode, self.cmd)

    def is_killed(self):
        """Return True if there is no process, the infra is stopping, or the PID is gone."""
        from localstack.runtime import events

        process = self.process
        if not process:
            return True
        if events.infra_stopping.is_set():  # FIXME
            return True
        # Note: Do NOT import "psutil" at the root scope, as this leads
        # to problems when importing this file from our test Lambdas in Docker
        # (Error: libc.musl-x86_64.so.1: cannot open shared object file)
        import psutil

        return not psutil.pid_exists(process.pid)

    def stop(self, quiet=False):
        """Kill the process (including its child processes) and call the stop listener.

        :param quiet: if True, do not log a warning if killing the process or running the
            stop listener fails
        """
        if self.stopped:
            return
        if not self.process:
            LOG.warning("No process found for command '%s'", self.cmd)
            return

        parent_pid = self.process.pid
        try:
            kill_process_tree(parent_pid)
            self.process = None
        except Exception as e:
            if not quiet:
                LOG.warning("Unable to kill process with pid %s: %s", parent_pid, e)
        try:
            self.stop_listener and self.stop_listener(self)
        except Exception as e:
            if not quiet:
                LOG.warning("Unable to run stop handler for shell command thread %s: %s", self, e)
        self.stopped = True
362

363

364
class CaptureOutput:
    """A context manager that captures stdout/stderr of the current thread. Use it as follows:

    with CaptureOutput() as c:
        ...
    print(c.stdout(), c.stderr())

    While at least one context is active, sys.stdout/sys.stderr (and sys.__stdout__/sys.__stderr__)
    are replaced with proxies that redirect the output of threads with an active context into
    in-memory buffers, and pass the output of all other threads on to the previous streams.

    Only one context per thread is registered: a context that is entered while another context
    is already active in the same thread does not capture anything, and the output remains with
    the context that was entered first.
    """

    # Note: the original streams are determined once, at the time this class is defined
    orig_stdout = sys.stdout
    orig_stderr = sys.stderr
    orig___stdout = sys.__stdout__
    orig___stderr = sys.__stderr__
    # maps thread identifiers to the context that is currently active in the respective thread
    CONTEXTS_BY_THREAD = {}

    class LogStreamIO(io.StringIO):
        """In-memory text buffer that receives the captured output."""

    def __init__(self):
        self._stdout = self.LogStreamIO()
        self._stderr = self.LogStreamIO()

    def __enter__(self):
        # Note: import werkzeug here (not at top of file) to allow dependency pruning
        from werkzeug.local import LocalProxy

        ident = self._ident()
        if ident not in self.CONTEXTS_BY_THREAD:
            self.CONTEXTS_BY_THREAD[ident] = self
            self._set(
                LocalProxy(self._proxy(sys.stdout, "stdout")),
                LocalProxy(self._proxy(sys.stderr, "stderr")),
                LocalProxy(self._proxy(sys.__stdout__, "stdout")),
                LocalProxy(self._proxy(sys.__stderr__, "stderr")),
            )
        return self

    def __exit__(self, type, value, traceback):
        ident = self._ident()
        # only unregister the context if it is our own - a nested context must neither remove
        # nor close the streams of the outer context that is still capturing for this thread
        if self.CONTEXTS_BY_THREAD.get(ident) is self:
            del self.CONTEXTS_BY_THREAD[ident]
            if not self.CONTEXTS_BY_THREAD:
                # reset pointers
                self._set(
                    self.orig_stdout,
                    self.orig_stderr,
                    self.orig___stdout,
                    self.orig___stderr,
                )
        self._stdout = self._finalize_stream(self._stdout)
        self._stderr = self._finalize_stream(self._stderr)

    @staticmethod
    def _finalize_stream(stream):
        """Return the content of the given stream and close it (no-op for finalized values)."""
        if not hasattr(stream, "getvalue"):
            return stream
        stream.flush()
        content = stream.getvalue()
        stream.close()
        return content

    def _set(self, out, err, __out, __err):
        sys.stdout, sys.stderr, sys.__stdout__, sys.__stderr__ = (
            out,
            err,
            __out,
            __err,
        )

    def _proxy(self, original_stream, type):
        """Return a function that resolves the stream to be used by the calling thread."""

        def proxy():
            ident = self._ident()
            ctx = self.CONTEXTS_BY_THREAD.get(ident)
            if ctx:
                return ctx._stdout if type == "stdout" else ctx._stderr
            return original_stream

        return proxy

    def _ident(self):
        return threading.current_thread().ident

    def stdout(self):
        """Return the stdout output captured so far."""
        return self._stream_value(self._stdout)

    def stderr(self):
        """Return the stderr output captured so far."""
        return self._stream_value(self._stderr)

    def _stream_value(self, stream):
        # the streams are replaced with their (string) content once the context has been exited
        return stream.getvalue() if hasattr(stream, "getvalue") else stream
454

455

456
class CaptureOutputProcess:
    """A context manager that captures stdout/stderr of the current process.

    Basically a lightweight version of CaptureOutput without tracking internal thread mapping

    Use it as follows:

    with CaptureOutputProcess() as c:
        ...
    print(c.stdout(), c.stderr())

    Note that the captured values are available only after the context has been exited.
    """

    class LogStreamIO(io.StringIO):
        """In-memory text buffer that receives the captured output."""

    def __init__(self):
        self.orig_stdout = sys.stdout
        self._stdout = self.LogStreamIO()
        self.orig_stderr = sys.stderr
        self._stderr = self.LogStreamIO()
        # captured output, populated when the context is exited
        self.stdout_value = None
        self.stderr_value = None

    def __enter__(self):
        # determine the streams to restore at this point (not only at construction time), as
        # sys.stdout/sys.stderr may have been replaced since this instance has been created
        self.orig_stdout = sys.stdout
        self.orig_stderr = sys.stderr
        sys.stdout = self._stdout
        sys.stderr = self._stderr
        return self

    def __exit__(self, type, value, traceback):
        try:
            self._stdout.flush()
            self._stderr.flush()

            self.stdout_value = self._stdout.getvalue()
            self.stderr_value = self._stderr.getvalue()

            # close handles
            self._stdout.close()
            self._stderr.close()
        finally:
            # restore the streams in any case, to not leave the process without its outputs
            sys.stdout = self.orig_stdout
            sys.stderr = self.orig_stderr

    def stdout(self):
        """Return the captured stdout output (None until the context has been exited)."""
        return self.stdout_value

    def stderr(self):
        """Return the captured stderr output (None until the context has been exited)."""
        return self.stderr_value
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc