• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Gallopsled / pwntools / 13600950642

01 Mar 2025 04:10AM UTC coverage: 74.211% (+3.2%) from 71.055%
13600950642

Pull #2546

github

web-flow
Merge 77df40314 into 60cff2437
Pull Request #2546: ssh: Allow passing `disabled_algorithms` keyword argument from `ssh` to paramiko

3812 of 6380 branches covered (59.75%)

0 of 1 new or added line in 1 file covered. (0.0%)

1243 existing lines in 37 files now uncovered.

13352 of 17992 relevant lines covered (74.21%)

0.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

58.29
/pwnlib/util/misc.py
1
from __future__ import division
1✔
2

3
import json
1✔
4
import base64
1✔
5
import errno
1✔
6
import os
1✔
7
import re
1✔
8
import signal
1✔
9
import socket
1✔
10
import stat
1✔
11
import string
1✔
12
import subprocess
1✔
13
import sys
1✔
14
import tempfile
1✔
15
import inspect
1✔
16
import time
1✔
17
import types
1✔
18

19
from pwnlib import atexit
1✔
20
from pwnlib.context import context
1✔
21
from pwnlib.log import getLogger
1✔
22
from pwnlib.timeout import Timeout
1✔
23
from pwnlib.util import fiddling
1✔
24
from pwnlib.util import lists
1✔
25
from pwnlib.util import packing
1✔
26

27
log = getLogger(__name__)
1✔
28

29
def align(alignment, x):
1✔
30
    """align(alignment, x) -> int
31

32
    Rounds `x` up to nearest multiple of the `alignment`.
33

34
    Example:
35

36
      >>> [align(5, n) for n in range(15)]
37
      [0, 5, 5, 5, 5, 5, 10, 10, 10, 10, 10, 15, 15, 15, 15]
38
    """
39
    return x + -x % alignment
1✔
40

41

42
def align_down(alignment, x):
1✔
43
    """align_down(alignment, x) -> int
44

45
    Rounds `x` down to nearest multiple of the `alignment`.
46

47
    Example:
48

49
        >>> [align_down(5, n) for n in range(15)]
50
        [0, 0, 0, 0, 0, 5, 5, 5, 5, 5, 10, 10, 10, 10, 10]
51
    """
52
    return x - x % alignment
1✔
53

54

55
def binary_ip(host):
1✔
56
    """binary_ip(host) -> str
57

58
    Resolve host and return IP as four byte string.
59

60
    Example:
61

62
        >>> binary_ip("127.0.0.1")
63
        b'\\x7f\\x00\\x00\\x01'
64
    """
65
    return socket.inet_aton(socket.gethostbyname(host))
1✔
66

67

68
def size(n, abbrev = 'B', si = False):
1✔
69
    """size(n, abbrev = 'B', si = False) -> str
70

71
    Convert the length of a bytestream to human readable form.
72

73
    Arguments:
74
      n(int,iterable): The length to convert to human readable form,
75
        or an object which can have ``len()`` called on it.
76
      abbrev(str): String appended to the size, defaults to ``'B'``.
77

78
    Example:
79

80
        >>> size(451)
81
        '451B'
82
        >>> size(1000)
83
        '1000B'
84
        >>> size(1024)
85
        '1.00KB'
86
        >>> size(1024, ' bytes')
87
        '1.00K bytes'
88
        >>> size(1024, si = True)
89
        '1.02KB'
90
        >>> [size(1024 ** n) for n in range(7)]
91
        ['1B', '1.00KB', '1.00MB', '1.00GB', '1.00TB', '1.00PB', '1024.00PB']
92
        >>> size([])
93
        '0B'
94
        >>> size([1,2,3])
95
        '3B'
96
    """
97
    if hasattr(n, '__len__'):
1✔
98
        n = len(n)
1✔
99

100
    base = 1000.0 if si else 1024.0
1✔
101
    if n < base:
1✔
102
        return '%d%s' % (n, abbrev)
1✔
103

104
    for suffix in ['K', 'M', 'G', 'T']:
1✔
105
        n /= base
1✔
106
        if n < base:
1✔
107
            return '%.02f%s%s' % (n, suffix, abbrev)
1✔
108

109
    return '%.02fP%s' % (n / base, abbrev)
1✔
110

111
KB = 1000
1✔
112
MB = 1000 * KB
1✔
113
GB = 1000 * MB
1✔
114

115
KiB = 1024
1✔
116
MiB = 1024 * KiB
1✔
117
GiB = 1024 * MiB
1✔
118

119
def read(path, count=-1, skip=0):
1✔
120
    r"""read(path, count=-1, skip=0) -> str
121

122
    Open file, return content.
123

124
    Examples:
125

126
        >>> read('/proc/self/exe')[:4] # doctest: +LINUX +TODO
127
        b'\x7fELF'
128
    """
129
    path = os.path.expanduser(os.path.expandvars(path))
1✔
130
    with open(path, 'rb') as fd:
1✔
131
        if skip:
1!
UNCOV
132
            fd.seek(skip)
×
133
        return fd.read(count)
1✔
134

135

136
def write(path, data = b'', create_dir = False, mode = 'w'):
1✔
137
    """Create new file or truncate existing to zero length and write data."""
138
    path = os.path.expanduser(os.path.expandvars(path))
1✔
139
    if create_dir:
1!
UNCOV
140
        path = os.path.realpath(path)
×
141
        mkdir_p(os.path.dirname(path))
×
142
    if mode == 'w' and isinstance(data, bytes): mode += 'b'
1✔
143
    with open(path, mode) as f:
1✔
144
        f.write(data)
1✔
145

146
def which(name, all = False, path=None):
1✔
147
    """which(name, flags = os.X_OK, all = False) -> str or str set
148

149
    Works as the system command ``which``; searches $PATH for ``name`` and
150
    returns a full path if found.
151
    Tries all of the file extensions in $PATHEXT on Windows too.
152

153
    If `all` is :const:`True` the set of all found locations is returned, else
154
    the first occurrence or :const:`None` is returned.
155

156
    Arguments:
157
      `name` (str): The file to search for.
158
      `all` (bool):  Whether to return all locations where `name` was found.
159

160
    Returns:
161
      If `all` is :const:`True` the set of all locations where `name` was found,
162
      else the first location or :const:`None` if not found.
163

164
    Example:
165

166
        >>> which('sh') # doctest: +ELLIPSIS +POSIX +TODO
167
        '.../bin/sh'
168
    """
169
    # If name is a path, do not attempt to resolve it.
170
    if os.path.sep in name:
1✔
171
        return name
1✔
172

173
    if sys.platform == 'win32':
1!
UNCOV
174
        pathexts = os.environ.get('PATHEXT', '').split(os.pathsep)
×
175
        isroot = False
×
176
    else:
177
        pathexts = []
1✔
178
        isroot = os.getuid() == 0
1✔
179
    pathexts = [''] + pathexts
1✔
180
    out = set()
1✔
181
    try:
1✔
182
        path = path or os.environ['PATH']
1✔
UNCOV
183
    except KeyError:
×
184
        log.exception('Environment variable $PATH is not set')
×
185
    for path_part in path.split(os.pathsep):
1✔
186
        for ext in pathexts:
1✔
187
            nameext = name + ext
1✔
188
            p = os.path.join(path_part, nameext)
1✔
189
            if os.access(p, os.X_OK):
1✔
190
                st = os.stat(p)
1✔
191
                if not stat.S_ISREG(st.st_mode):
1!
UNCOV
192
                    continue
×
193
                # work around this issue: https://bugs.python.org/issue9311
194
                if isroot and not \
1!
195
                st.st_mode & (stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH):
UNCOV
196
                    continue
×
197
                if all:
1!
UNCOV
198
                    out.add(p)
×
199
                    break
×
200
                else:
201
                    return p
1✔
202
    if all:
1!
UNCOV
203
        return out
×
204
    else:
205
        return None
1✔
206

207

208
def normalize_argv_env(argv, env, log, level=2):
1✔
209
    #
210
    # Validate argv
211
    #
212
    # - Must be a list/tuple of strings
213
    # - Each string must not contain '\x00'
214
    #
215
    argv = argv or []
1✔
216
    if isinstance(argv, (str, bytes, bytearray)):
1✔
217
        argv = [argv]
1✔
218

219
    if not isinstance(argv, (list, tuple)):
1!
UNCOV
220
        log.error('argv must be a list or tuple: %r' % argv)
×
221

222
    if not all(isinstance(arg, (str, bytes, bytearray)) for arg in argv):
1!
UNCOV
223
        log.error("argv must be strings or bytes: %r" % argv)
×
224

225
    # Create a duplicate so we can modify it
226
    argv = list(argv)
1✔
227

228
    for i, oarg in enumerate(argv):
1✔
229
        arg = packing._need_bytes(oarg, level, 0x80)  # ASCII text is okay
1✔
230
        if b'\x00' in arg[:-1]:
1!
UNCOV
231
            log.error('Inappropriate nulls in argv[%i]: %r' % (i, oarg))
×
232
        argv[i] = bytearray(arg.rstrip(b'\x00'))
1✔
233

234
    #
235
    # Validate environment
236
    #
237
    # - Must be a dictionary of {string:string}
238
    # - No strings may contain '\x00'
239
    #
240

241
    # Create a duplicate so we can modify it safely
242
    env2 = []
1✔
243
    if hasattr(env, 'items'):
1✔
244
        env_items = env.items()
1✔
245
    else:
246
        env_items = env
1✔
247
    if env:
1✔
248
        for k,v in env_items:
1✔
249
            if not isinstance(k, (bytes, str)):
1!
UNCOV
250
                log.error('Environment keys must be strings: %r' % k)
×
251
            # Check if = is in the key, Required check since we sometimes call ctypes.execve directly
252
            # https://github.com/python/cpython/blob/025995feadaeebeef5d808f2564f0fd65b704ea5/Modules/posixmodule.c#L6476
253
            if b'=' in packing._encode(k):
1!
UNCOV
254
                log.error('Environment keys may not contain "=": %r' % (k))
×
255
            if not isinstance(v, (bytes, str)):
1!
UNCOV
256
                log.error('Environment values must be strings: %r=%r' % (k,v))
×
257
            k = packing._need_bytes(k, level, 0x80)  # ASCII text is okay
1✔
258
            v = packing._need_bytes(v, level, 0x80)  # ASCII text is okay
1✔
259
            if b'\x00' in k[:-1]:
1!
UNCOV
260
                log.error('Inappropriate nulls in env key: %r' % (k))
×
261
            if b'\x00' in v[:-1]:
1!
UNCOV
262
                log.error('Inappropriate nulls in env value: %r=%r' % (k, v))
×
263
            env2.append((bytearray(k.rstrip(b'\x00')), bytearray(v.rstrip(b'\x00'))))
1✔
264

265
    return argv, env2 or env
1✔
266

267

268
def run_in_new_terminal(command, terminal=None, args=None, kill_at_exit=True, preexec_fn=None):
1✔
269
    """run_in_new_terminal(command, terminal=None, args=None, kill_at_exit=True, preexec_fn=None) -> int
270

271
    Run a command in a new terminal.
272

273
    When ``terminal`` is not set:
274
        - If ``context.terminal`` is set it will be used.
275
          If it is an iterable then ``context.terminal[1:]`` are default arguments.
276
        - If a ``pwntools-terminal`` command exists in ``$PATH``, it is used
277
        - If tmux is detected (by the presence of the ``$TMUX`` environment
278
          variable), a new pane will be opened.
279
        - If GNU Screen is detected (by the presence of the ``$STY`` environment
280
          variable), a new screen will be opened.
281
        - If ``$TERM_PROGRAM`` is set, that is used.
282
        - If X11 is detected (by the presence of the ``$DISPLAY`` environment
283
          variable), ``x-terminal-emulator`` is used.
284
        - If KDE Konsole is detected (by the presence of the ``$KONSOLE_VERSION``
285
          environment variable), a terminal will be split.
286
        - If WSL (Windows Subsystem for Linux) is detected (by the presence of
287
          a ``wsl.exe`` binary in the ``$PATH`` and ``/proc/sys/kernel/osrelease``
288
          containing ``Microsoft``), a new ``cmd.exe`` window will be opened.
289

290
    If `kill_at_exit` is :const:`True`, try to close the command/terminal when the
291
    current process exits. This may not work for all terminal types.
292

293
    Arguments:
294
        command (str): The command to run.
295
        terminal (str): Which terminal to use.
296
        args (list): Arguments to pass to the terminal
297
        kill_at_exit (bool): Whether to close the command/terminal on process exit.
298
        preexec_fn (callable): Callable to invoke before exec().
299

300
    Note:
301
        The command is opened with ``/dev/null`` for stdin, stdout, stderr.
302

303
    Returns:
304
      PID of the new terminal process
305
    """
306
    if not terminal:
1!
307
        if context.terminal:
1!
308
            terminal = context.terminal[0]
1✔
309
            args     = context.terminal[1:]
1✔
UNCOV
310
        elif which('pwntools-terminal'):
×
311
            terminal = 'pwntools-terminal'
×
312
            args     = []
×
313
        elif 'TMUX' in os.environ and which('tmux'):
×
314
            terminal = 'tmux'
×
315
            args     = ['splitw']
×
316
        elif 'STY' in os.environ and which('screen'):
×
317
            terminal = 'screen'
×
318
            args     = ['-t','pwntools-gdb','bash','-c']
×
319
        elif 'TERM_PROGRAM' in os.environ and os.environ['TERM_PROGRAM'] == "iTerm.app" and which('osascript'):
×
320
            # if we're on a mac, and using iTerm
UNCOV
321
            terminal = "osascript"
×
322
            args     = []
×
323
        elif 'TERM_PROGRAM' in os.environ and which(os.environ['TERM_PROGRAM']):
×
324
            terminal = os.environ['TERM_PROGRAM']
×
325
            args     = []
×
326
        elif 'DISPLAY' in os.environ and which('x-terminal-emulator'):
×
327
            terminal = 'x-terminal-emulator'
×
328
            args     = ['-e']
×
329
        elif 'KITTY_PID' in os.environ and which('kitty') and which('kitten'):
×
330
            terminal = 'kitten'
×
331
            args = ['@', 'launch']
×
332
        elif 'KONSOLE_VERSION' in os.environ and which('qdbus'):
×
333
            qdbus = which('qdbus')
×
UNCOV
334
            window_id = os.environ['WINDOWID']
×
335
            konsole_dbus_service = os.environ['KONSOLE_DBUS_SERVICE']
×
336

UNCOV
337
            with subprocess.Popen((qdbus, konsole_dbus_service), stdout=subprocess.PIPE) as proc:
×
UNCOV
338
                lines = proc.communicate()[0].decode().split('\n')
×
339

340
            # Iterate over all MainWindows
341
            for line in lines:
×
342
                parts = line.split('/')
×
343
                if len(parts) == 3 and parts[2].startswith('MainWindow_'):
×
UNCOV
344
                    name = parts[2]
×
345
                    with subprocess.Popen((qdbus, konsole_dbus_service, '/konsole/' + name,
×
346
                                           'org.kde.KMainWindow.winId'), stdout=subprocess.PIPE) as proc:
347
                        target_window_id = proc.communicate()[0].decode().strip()
×
UNCOV
348
                        if target_window_id == window_id:
×
349
                            break
×
350
            else:
UNCOV
351
                log.error('MainWindow not found')
×
352

353
            # Split
UNCOV
354
            subprocess.run((qdbus, konsole_dbus_service, '/konsole/' + name,
×
355
                            'org.kde.KMainWindow.activateAction', 'split-view-left-right'), stdout=subprocess.DEVNULL)
356

357
            # Find new session
358
            with subprocess.Popen((qdbus, konsole_dbus_service, os.environ['KONSOLE_DBUS_WINDOW'],
×
359
                                   'org.kde.konsole.Window.sessionList'), stdout=subprocess.PIPE) as proc:
UNCOV
360
                session_list = map(int, proc.communicate()[0].decode().split())
×
361
            last_konsole_session = max(session_list)
×
362

UNCOV
363
            terminal = 'qdbus'
×
UNCOV
364
            args = [konsole_dbus_service, '/Sessions/{}'.format(last_konsole_session),
×
365
                    'org.kde.konsole.Session.runCommand']
366

367
        else:
368
            is_wsl = False
×
369
            if os.path.exists('/proc/sys/kernel/osrelease'):
×
370
                with open('/proc/sys/kernel/osrelease', 'rb') as f:
×
371
                    is_wsl = b'icrosoft' in f.read()
×
372
            if is_wsl and which('cmd.exe') and which('wsl.exe') and which('bash.exe'):
×
373
                terminal    = 'cmd.exe'
×
374
                args        = ['/c', 'start']
×
UNCOV
375
                distro_name = os.getenv('WSL_DISTRO_NAME')
×
UNCOV
376
                current_dir = os.getcwd()
×
377

378
                # Split pane in Windows Terminal
379
                if 'WT_SESSION' in os.environ and which('wt.exe'):
×
380
                    args.extend(['wt.exe', '-w', '0', 'split-pane'])
×
UNCOV
381
                if distro_name:
×
382
                    args.extend(['wsl.exe', '-d', distro_name, '--cd', current_dir, 'bash', '-c'])
×
383
                else:
UNCOV
384
                    args.extend(['bash.exe', '-c'])
×
385

386
    if not terminal:
1!
387
        log.error('Could not find a terminal binary to use. Set context.terminal to your terminal.')
×
388
    elif not which(terminal):
1!
UNCOV
389
        log.error('Could not find terminal binary %r. Set context.terminal to your terminal.' % terminal)
×
390

391
    if isinstance(args, tuple):
1!
UNCOV
392
        args = list(args)
×
393

394
    # When not specifying context.terminal explicitly, we used to set these flags above.
395
    # However, if specifying terminal=['tmux', 'splitw', '-h'], we would be lacking these flags.
396
    # Instead, set them here and hope for the best.
397
    if terminal == 'tmux':
1!
UNCOV
398
        args += ['-F' '#{pane_pid}', '-P']
×
399

400
    argv = [which(terminal)] + args
1✔
401

402
    if isinstance(command, str):
1!
403
        if ';' in command:
×
UNCOV
404
            log.error("Cannot use commands with semicolon.  Create a script and invoke that directly.")
×
UNCOV
405
        argv += [command]
×
406
    elif isinstance(command, (list, tuple)):
1!
407
        # Dump the full command line to a temporary file so we can be sure that
408
        # it is parsed correctly, and we do not need to account for shell expansion
409
        script = '''
1✔
410
#!{executable!s}
411
import os
412
os.execve({argv0!r}, {argv!r}, os.environ)
413
'''
414
        script = script.format(executable='/bin/env ' * (' ' in sys.executable) + sys.executable,
1✔
415
                               argv=command,
416
                               argv0=which(command[0]))
417
        script = script.lstrip()
1✔
418

419
        log.debug("Created script for new terminal:\n%s" % script)
1✔
420

421
        with tempfile.NamedTemporaryFile(delete=False, mode='wt+') as tmp:
1✔
422
          tmp.write(script)
1✔
423
          tmp.flush()
1✔
424
          os.chmod(tmp.name, 0o700)
1✔
425
          argv += [tmp.name]
1✔
426

427

428
    # if we're on a Mac and use iTerm, we use `osascript` to split the current window
429
    # `command` was sanitized on the previous step. It is now either a string, or was written to a tmp file
430
    # we run the command, which is now `argv[-1]`
431
    if terminal == 'osascript':
1!
UNCOV
432
        osa_script = """
×
433
tell application "iTerm"
434
    tell current session of current window
435
        set newSession to (split horizontally with default profile)
436
    end tell
437
    tell newSession
438
        write text "{}"
439
    end tell
440
end tell
441
""".format(argv[-1])
442
        with tempfile.NamedTemporaryFile(delete=False, mode='wt+') as tmp:
×
443
            tmp.write(osa_script.lstrip())
×
444
            tmp.flush()
×
UNCOV
445
            os.chmod(tmp.name, 0o700)
×
UNCOV
446
            argv = [which(terminal), tmp.name]
×
447
    # cmd.exe does not support WSL UNC paths as working directory
448
    # so it gets reset to %WINDIR% before starting wsl again.
449
    # Set the working directory correctly in WSL.
450
    elif terminal == 'cmd.exe':
1!
UNCOV
451
        argv[-1] = "cd '{}' && {}".format(os.getcwd(), argv[-1])
×
452

453
    log.debug("Launching a new terminal: %r" % argv)
1✔
454

455
    stdin = stdout = stderr = open(os.devnull, 'r+b')
1✔
456
    if terminal == 'tmux' or terminal in ('kitty', 'kitten'):
1!
UNCOV
457
        stdout = subprocess.PIPE
×
458

459
    p = subprocess.Popen(argv, stdin=stdin, stdout=stdout, stderr=stderr, preexec_fn=preexec_fn)
1✔
460

461
    if terminal == 'tmux':
1!
462
        out, _ = p.communicate()
×
463
        try:
×
464
            pid = int(out)
×
465
        except ValueError:
×
466
            pid = None
×
UNCOV
467
        if pid is None:
×
468
            log.error("Could not parse PID from tmux output (%r). Start tmux first.", out)
×
469
    elif terminal == 'qdbus':
1!
470
        with subprocess.Popen((qdbus, konsole_dbus_service, '/Sessions/{}'.format(last_konsole_session),
×
471
                               'org.kde.konsole.Session.processId'), stdout=subprocess.PIPE) as proc:
472
            pid = int(proc.communicate()[0].decode())
×
473
    elif terminal in ('kitty', 'kitten'):
1!
474
        pid = None
×
475
        out, _ = p.communicate()
×
476
        try:
×
477
            kittyid = int(out)
×
478
        except ValueError:
×
479
            kittyid = None
×
UNCOV
480
        if kittyid is None:
×
481
            log.error("Could not parse kitty window ID from output (%r)", out)
×
482
        else:
483
            lsout, _ = subprocess.Popen(["kitten", "@", "ls", "--match", "id:%d" % kittyid], stdin=stdin, stdout=stdout, stderr=stderr).communicate()
×
484
            try:
×
485
                lsj = json.loads(lsout)
×
486
                pid = int(lsj[0]["tabs"][0]["windows"][0]["pid"])
×
487
            except json.JSONDecodeError as e:
×
UNCOV
488
                pid = None
×
UNCOV
489
                log.error("Json decode failed while parsing 'kitten @ ls' output (%r) (error: %r)", lsout, e)
×
490
            
491
    elif terminal == 'cmd.exe':
1!
492
        # p.pid is cmd.exe's pid instead of the WSL process we want to start eventually.
493
        # I don't know how to trace the execution through Windows and back into the WSL2 VM.
494
        # Do a best guess by waiting for a new process matching the command to be run.
495
        # Otherwise it's better to return nothing instead of a know wrong pid.
496
        from pwnlib.util.proc import pid_by_name
×
497
        pid = None
×
498
        ran_program = command.split(' ')[0] if isinstance(command, str) else command[0]
×
499
        t = Timeout()
×
500
        with t.countdown(timeout=5):
×
501
            while t.timeout:
×
502
                new_pid = pid_by_name(ran_program)
×
503
                if new_pid and new_pid[0] > p.pid:
×
504
                    pid = new_pid[0]
×
UNCOV
505
                    break
×
UNCOV
506
                time.sleep(0.01)
×
507
    else:
508
        pid = p.pid
1✔
509

510
    if kill_at_exit and pid:
1!
511
        def kill():
1✔
512
            try:
×
UNCOV
513
                if terminal == 'qdbus':
×
514
                    os.kill(pid, signal.SIGHUP)
×
515
                else:
516
                    os.kill(pid, signal.SIGTERM)
×
UNCOV
517
            except OSError:
×
UNCOV
518
                pass
×
519

520
        atexit.register(kill)
1✔
521

522
    return pid
1✔
523

524
def parse_ldd_output(output):
1✔
525
    """Parses the output from a run of 'ldd' on a binary.
526
    Returns a dictionary of {path: address} for
527
    each library required by the specified binary.
528

529
    Arguments:
530
      output(str): The output to parse
531

532
    Example:
533

534
        >>> sorted(parse_ldd_output('''
535
        ...     linux-vdso.so.1 =>  (0x00007fffbf5fe000)
536
        ...     libtinfo.so.5 => /lib/x86_64-linux-gnu/libtinfo.so.5 (0x00007fe28117f000)
537
        ...     libdl.so.2 => /lib/x86_64-linux-gnu/libdl.so.2 (0x00007fe280f7b000)
538
        ...     libc.so.6 => /lib/x86_64-linux-gnu/libc.so.6 (0x00007fe280bb4000)
539
        ...     /lib64/ld-linux-x86-64.so.2 (0x00007fe2813dd000)
540
        ... ''').keys())
541
        ['/lib/x86_64-linux-gnu/libc.so.6', '/lib/x86_64-linux-gnu/libdl.so.2', '/lib/x86_64-linux-gnu/libtinfo.so.5', '/lib64/ld-linux-x86-64.so.2']
542
    """
543
    expr_linux   = re.compile(r'\s(?P<lib>\S?/\S+)\s+\((?P<addr>0x.+)\)')
1✔
544
    expr_openbsd = re.compile(r'^\s+(?P<addr>[0-9a-f]+)\s+[0-9a-f]+\s+\S+\s+[01]\s+[0-9]+\s+[0-9]+\s+(?P<lib>\S+)$')
1✔
545
    libs = {}
1✔
546

547
    for s in output.split('\n'):
1✔
548
        match = expr_linux.search(s) or expr_openbsd.search(s)
1✔
549
        if not match:
1✔
550
            continue
1✔
551
        lib, addr = match.group('lib'), match.group('addr')
1✔
552
        libs[lib] = int(addr, 16)
1✔
553

554
    return libs
1✔
555

556
def mkdir_p(path):
1✔
557
    """Emulates the behavior of ``mkdir -p``."""
558

559
    try:
1✔
560
        os.makedirs(path)
1✔
561
    except OSError as exc:
1✔
562
        if exc.errno == errno.EEXIST and os.path.isdir(path):
1!
563
            pass
1✔
564
        else:
UNCOV
565
            raise
×
566

567
def dealarm_shell(tube):
1✔
568
    """Given a tube which is a shell, dealarm it.
569
    """
570
    tube.clean()
×
571

572
    tube.sendline('which python || echo')
×
573
    if tube.recvline().startswith('/'):
×
UNCOV
574
        tube.sendline('''exec python -c "import signal, os; signal.alarm(0); os.execl('$SHELL','')"''')
×
575
        return tube
×
576

577
    tube.sendline('which perl || echo')
×
578
    if tube.recvline().startswith('/'):
×
UNCOV
579
        tube.sendline('''exec perl -e "alarm 0; exec '${SHELL:-/bin/sh}'"''')
×
580
        return tube
×
581

UNCOV
582
    return None
×
583

584
def register_sizes(regs, in_sizes):
1✔
585
    """Create dictionaries over register sizes and relations
586

587
    Given a list of lists of overlapping register names (e.g. ['eax','ax','al','ah']) and a list of input sizes,
588
    it returns the following:
589

590
    * all_regs    : list of all valid registers
591
    * sizes[reg]  : the size of reg in bits
592
    * bigger[reg] : list of overlapping registers bigger than reg
593
    * smaller[reg]: list of overlapping registers smaller than reg
594

595
    Used in i386/AMD64 shellcode, e.g. the mov-shellcode.
596

597
    Example:
598

599
        >>> regs = [['eax', 'ax', 'al', 'ah'],['ebx', 'bx', 'bl', 'bh'],
600
        ... ['ecx', 'cx', 'cl', 'ch'],
601
        ... ['edx', 'dx', 'dl', 'dh'],
602
        ... ['edi', 'di'],
603
        ... ['esi', 'si'],
604
        ... ['ebp', 'bp'],
605
        ... ['esp', 'sp'],
606
        ... ]
607
        >>> all_regs, sizes, bigger, smaller = register_sizes(regs, [32, 16, 8, 8])
608
        >>> all_regs
609
        ['eax', 'ax', 'al', 'ah', 'ebx', 'bx', 'bl', 'bh', 'ecx', 'cx', 'cl', 'ch', 'edx', 'dx', 'dl', 'dh', 'edi', 'di', 'esi', 'si', 'ebp', 'bp', 'esp', 'sp']
610
        >>> pprint(sizes)
611
        {'ah': 8,
612
         'al': 8,
613
         'ax': 16,
614
         'bh': 8,
615
         'bl': 8,
616
         'bp': 16,
617
         'bx': 16,
618
         'ch': 8,
619
         'cl': 8,
620
         'cx': 16,
621
         'dh': 8,
622
         'di': 16,
623
         'dl': 8,
624
         'dx': 16,
625
         'eax': 32,
626
         'ebp': 32,
627
         'ebx': 32,
628
         'ecx': 32,
629
         'edi': 32,
630
         'edx': 32,
631
         'esi': 32,
632
         'esp': 32,
633
         'si': 16,
634
         'sp': 16}
635
        >>> pprint(bigger)
636
        {'ah': ['eax', 'ax', 'ah'],
637
         'al': ['eax', 'ax', 'al'],
638
         'ax': ['eax', 'ax'],
639
         'bh': ['ebx', 'bx', 'bh'],
640
         'bl': ['ebx', 'bx', 'bl'],
641
         'bp': ['ebp', 'bp'],
642
         'bx': ['ebx', 'bx'],
643
         'ch': ['ecx', 'cx', 'ch'],
644
         'cl': ['ecx', 'cx', 'cl'],
645
         'cx': ['ecx', 'cx'],
646
         'dh': ['edx', 'dx', 'dh'],
647
         'di': ['edi', 'di'],
648
         'dl': ['edx', 'dx', 'dl'],
649
         'dx': ['edx', 'dx'],
650
         'eax': ['eax'],
651
         'ebp': ['ebp'],
652
         'ebx': ['ebx'],
653
         'ecx': ['ecx'],
654
         'edi': ['edi'],
655
         'edx': ['edx'],
656
         'esi': ['esi'],
657
         'esp': ['esp'],
658
         'si': ['esi', 'si'],
659
         'sp': ['esp', 'sp']}
660
        >>> pprint(smaller)
661
        {'ah': [],
662
         'al': [],
663
         'ax': ['al', 'ah'],
664
         'bh': [],
665
         'bl': [],
666
         'bp': [],
667
         'bx': ['bl', 'bh'],
668
         'ch': [],
669
         'cl': [],
670
         'cx': ['cl', 'ch'],
671
         'dh': [],
672
         'di': [],
673
         'dl': [],
674
         'dx': ['dl', 'dh'],
675
         'eax': ['ax', 'al', 'ah'],
676
         'ebp': ['bp'],
677
         'ebx': ['bx', 'bl', 'bh'],
678
         'ecx': ['cx', 'cl', 'ch'],
679
         'edi': ['di'],
680
         'edx': ['dx', 'dl', 'dh'],
681
         'esi': ['si'],
682
         'esp': ['sp'],
683
         'si': [],
684
         'sp': []}
685
    """
686
    sizes = {}
1✔
687
    bigger = {}
1✔
688
    smaller = {}
1✔
689

690
    for l in regs:
1✔
691
        for r, s in zip(l, in_sizes):
1✔
692
            sizes[r] = s
1✔
693

694
        for r in l:
1✔
695
            bigger[r] = [r_ for r_ in l if sizes[r_] > sizes[r] or r == r_]
1✔
696
            smaller[r] = [r_ for r_ in l if sizes[r_] < sizes[r]]
1✔
697

698
    return lists.concat(regs), sizes, bigger, smaller
1✔
699

700

701
def _create_execve_script(argv=None, executable=None, cwd=None, env=None, ignore_environ=None,
1✔
702
        stdin=0, stdout=1, stderr=2, preexec_fn=None, preexec_args=(), aslr=None, setuid=None,
703
        shell=False, log=log):
704
    """
705
    Creates a python wrapper script that triggers the syscall `execve` directly.
706

707
    Arguments:
708
        argv(list):
709
            List of arguments to pass into the process
710
        executable(str):
711
            Path to the executable to run.
712
            If :const:`None`, ``argv[0]`` is used.
713
        cwd(str):
714
            Working directory.  If :const:`None`, uses the working directory specified
715
            on :attr:`cwd` or set via :meth:`set_working_directory`.
716
        env(dict):
717
            Environment variables to add to the environment.
718
        ignore_environ(bool):
719
            Ignore default environment.  By default use default environment iff env not specified.
720
        stdin(int, str):
721
            If an integer, replace stdin with the numbered file descriptor.
722
            If a string, a open a file with the specified path and replace
723
            stdin with its file descriptor.  May also be one of ``sys.stdin``,
724
            ``sys.stdout``, ``sys.stderr``.  If :const:`None`, the file descriptor is closed.
725
        stdout(int, str):
726
            See ``stdin``.
727
        stderr(int, str):
728
            See ``stdin``.
729
        preexec_fn(callable):
730
            Function which is executed on the remote side before execve().
731
            This **MUST** be a self-contained function -- it must perform
732
            all of its own imports, and cannot refer to variables outside
733
            its scope.
734
        preexec_args(object):
735
            Argument passed to ``preexec_fn``.
736
            This **MUST** only consist of native Python objects.
737
        aslr(bool):
738
            See :class:`pwnlib.tubes.process.process` for more information.
739
        setuid(bool):
740
            See :class:`pwnlib.tubes.process.process` for more information.
741
        shell(bool):
742
            Pass the command-line arguments to the shell.
743

744
    Returns:
745
        A string containing the python script.
746
    """
747
    if not argv and not executable:
1!
UNCOV
748
        log.error("Must specify argv or executable")
×
749

750
    aslr      = aslr if aslr is not None else context.aslr
1✔
751

752
    if ignore_environ is None:
1!
753
        ignore_environ = env is not None  # compat
1✔
754

755
    argv, env = normalize_argv_env(argv, env, log)
1✔
756

757
    if shell:
1✔
758
        if len(argv) != 1:
1!
UNCOV
759
            log.error('Cannot provide more than 1 argument if shell=True')
×
760
        argv = [bytearray(b'/bin/sh'), bytearray(b'-c')] + argv
1✔
761

762
    executable = executable or argv[0]
1✔
763
    cwd        = cwd or '.'
1✔
764

765
    # Validate, since failures on the remote side will suck.
766
    if not isinstance(executable, (str, bytes, bytearray)):
1!
767
        log.error("executable / argv[0] must be a string: %r" % executable)
×
768
    executable = bytearray(packing._need_bytes(executable, min_wrong=0x80))
1✔
769

770
    # Allow passing in sys.stdin/stdout/stderr objects
771
    handles = {sys.stdin: 0, sys.stdout:1, sys.stderr:2}
1✔
772
    stdin  = handles.get(stdin, stdin)
1✔
773
    stdout = handles.get(stdout, stdout)
1✔
774
    stderr = handles.get(stderr, stderr)
1✔
775

776
    # Allow the user to provide a self-contained function to run
777
    def func(): pass
1!
778
    func      = preexec_fn or func
1✔
779
    func_args = preexec_args
1✔
780

781
    if not isinstance(func, types.FunctionType):
1✔
782
        log.error("preexec_fn must be a function")
1✔
783

784
    func_name = func.__name__
1✔
785
    if func_name == (lambda: 0).__name__:
1✔
786
        log.error("preexec_fn cannot be a lambda")
1✔
787

788
    func_src  = inspect.getsource(func).strip()
1✔
789
    setuid = True if setuid is None else bool(setuid)
1✔
790

791

792
    script = r"""
1✔
793
#!/usr/bin/env python
794
import os, sys, ctypes, resource, platform, stat
795
from collections import OrderedDict
796
try:
797
    integer_types = int, long
798
except NameError:
799
    integer_types = int,
800
exe   = bytes(%(executable)r)
801
argv  = [bytes(a) for a in %(argv)r]
802
env   = %(env)r
803

804
os.chdir(%(cwd)r)
805

806
if %(ignore_environ)r:
807
    os.environ.clear()
808

809
environ = getattr(os, 'environb', os.environ)
810

811
if env is not None:
812
    env = OrderedDict((bytes(k), bytes(v)) for k,v in env)
813
    environ.update(env)
814
else:
815
    env = environ
816

817
def is_exe(path):
818
    return os.path.isfile(path) and os.access(path, os.X_OK)
819

820
PATH = environ.get(b'PATH',b'').split(os.pathsep.encode())
821

822
if os.path.sep.encode() not in exe and not is_exe(exe):
823
    for path in PATH:
824
        test_path = os.path.join(path, exe)
825
        if is_exe(test_path):
826
            exe = test_path
827
            break
828

829
if not is_exe(exe):
830
    sys.stderr.write('3\n')
831
    sys.stderr.write("{!r} is not executable or does not exist in $PATH: {!r}".format(exe,PATH))
832
    sys.exit(-1)
833

834
if not %(setuid)r:
835
    PR_SET_NO_NEW_PRIVS = 38
836
    result = ctypes.CDLL('libc.so.6').prctl(PR_SET_NO_NEW_PRIVS, 1, 0, 0, 0)
837

838
    if result != 0:
839
        sys.stdout.write('3\n')
840
        sys.stdout.write("Could not disable setuid: prctl(PR_SET_NO_NEW_PRIVS) failed")
841
        sys.exit(-1)
842

843
try:
844
    PR_SET_PTRACER = 0x59616d61
845
    PR_SET_PTRACER_ANY = -1
846
    ctypes.CDLL('libc.so.6').prctl(PR_SET_PTRACER, PR_SET_PTRACER_ANY, 0, 0, 0)
847
except Exception:
848
    pass
849

850
# Determine what UID the process will execute as
851
# This is used for locating apport core dumps
852
suid = os.getuid()
853
sgid = os.getgid()
854
st = os.stat(exe)
855
if %(setuid)r:
856
    if (st.st_mode & stat.S_ISUID):
857
        suid = st.st_uid
858
    if (st.st_mode & stat.S_ISGID):
859
        sgid = st.st_gid
860

861
if sys.argv[-1] == 'check':
862
    sys.stdout.write("1\n")
863
    sys.stdout.write(str(os.getpid()) + "\n")
864
    sys.stdout.write(str(os.getuid()) + "\n")
865
    sys.stdout.write(str(os.getgid()) + "\n")
866
    sys.stdout.write(str(suid) + "\n")
867
    sys.stdout.write(str(sgid) + "\n")
868
    getattr(sys.stdout, 'buffer', sys.stdout).write(os.path.realpath(exe) + b'\x00')
869
    sys.stdout.flush()
870

871
for fd, newfd in {0: %(stdin)r, 1: %(stdout)r, 2:%(stderr)r}.items():
872
    if newfd is None:
873
        os.close(fd)
874
    elif isinstance(newfd, (str, bytes)):
875
        newfd = os.open(newfd, os.O_RDONLY if fd == 0 else (os.O_RDWR|os.O_CREAT))
876
        os.dup2(newfd, fd)
877
        os.close(newfd)
878
    elif isinstance(newfd, integer_types) and newfd != fd:
879
        os.dup2(fd, newfd)
880

881
if not %(aslr)r:
882
    if platform.system().lower() == 'linux' and %(setuid)r is not True:
883
        ADDR_NO_RANDOMIZE = 0x0040000
884
        ctypes.CDLL('libc.so.6').personality(ADDR_NO_RANDOMIZE)
885

886
    resource.setrlimit(resource.RLIMIT_STACK, (-1, -1))
887

888
# Attempt to dump ALL core file regions
889
try:
890
    with open('/proc/self/coredump_filter', 'w') as core_filter:
891
        core_filter.write('0x3f\n')
892
except Exception:
893
    pass
894

895
# Assume that the user would prefer to have core dumps.
896
try:
897
    resource.setrlimit(resource.RLIMIT_CORE, (-1, -1))
898
except Exception:
899
    pass
900

901
%(func_src)s
902
%(func_name)s(*%(func_args)r)
903

904
""" % locals()  
905

906
    if len(argv) > 0 and len(argv[0]) > 0:
1✔
907
        script += r"os.execve(exe, argv, env) " 
1✔
908

909
    # os.execve does not allow us to pass empty argv[0]
910
    # Therefore we use ctypes to call execve directly
911
    else:
912
        script += r"""
1✔
913
# Transform envp from dict to list
914
env_list = [key + b"=" + value for key, value in env.items()]
915

916
# ctypes helper to convert a python list to a NULL-terminated C array
917
def to_carray(py_list):
918
    py_list += [None] # NULL-terminated
919
    return (ctypes.c_char_p * len(py_list))(*py_list)
920

921
c_argv = to_carray(argv)
922
c_env = to_carray(env_list)
923

924
# Call execve
925
libc = ctypes.CDLL('libc.so.6')
926
libc.execve(exe, c_argv, c_env)
927

928
# We should never get here, since we sanitized argv and env,
929
# but just in case, indicate that something went wrong.
930
libc.perror(b"execve")
931
raise OSError("execve failed")
932
""" % locals()
933
    script = script.strip()
1✔
934

935
    return script
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc