• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Gallopsled / pwntools / 19618868396

23 Nov 2025 11:21PM UTC coverage: 73.914% (+0.06%) from 73.858%
19618868396

Pull #2641

github

web-flow
Merge 476cda4e9 into 886354662
Pull Request #2641: support preexec_args in process

3849 of 6484 branches covered (59.36%)

1 of 2 new or added lines in 1 file covered. (50.0%)

4 existing lines in 2 files now uncovered.

13479 of 18236 relevant lines covered (73.91%)

0.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.19
/pwnlib/tubes/process.py
1
# -*- coding: utf-8 -*-
2
from __future__ import absolute_import
1✔
3
from __future__ import division
1✔
4

5
import ctypes
1✔
6
import errno
1✔
7
import logging
1✔
8
import os
1✔
9
import select
1✔
10
import signal
1✔
11
import stat
1✔
12
import subprocess
1✔
13
import sys
1✔
14
import time
1✔
15
from collections import namedtuple
1✔
16

17
IS_WINDOWS = sys.platform.startswith('win')
1✔
18

19
if IS_WINDOWS:
1✔
20
    import queue
1✔
21
    import threading
1✔
22
else:
23
    import fcntl
1✔
24
    import pty
1✔
25
    import resource
1✔
26
    import tty
1✔
27

28
from pwnlib import qemu
1✔
29
from pwnlib.context import context
1✔
30
from pwnlib.log import getLogger
1✔
31
from pwnlib.timeout import Timeout
1✔
32
from pwnlib.tubes.tube import tube
1✔
33
from pwnlib.util.hashes import sha256file
1✔
34
from pwnlib.util.misc import parse_ldd_output
1✔
35
from pwnlib.util.misc import which
1✔
36
from pwnlib.util.misc import normalize_argv_env
1✔
37
from pwnlib.util.packing import _decode
1✔
38

39
log = getLogger(__name__)
1✔
40

41
class PTY(object):
    """Marker type; its single instance is the sentinel requesting a pty."""
    pass

# Replace the class by its only instance so callers can test ``x is PTY``.
PTY = PTY()

# Re-export the subprocess sentinels so callers do not need to import subprocess.
STDOUT = subprocess.STDOUT
PIPE = subprocess.PIPE

# Maps a negated signal number to its name, e.g. ``-signal.SIGINT`` -> 'SIGINT'
# (the keys are ``-v``).
#
# Names starting with ``SIG_`` (SIG_DFL, SIG_IGN, SIG_BLOCK, SIG_UNBLOCK,
# SIG_SETMASK) are handler / mask constants rather than signals.  Their values
# (0, 1, 2) collide with real signal numbers, so they are excluded to keep the
# table independent of dictionary ordering and free of a bogus ``0`` entry.
signal_names = {
    -number: name
    for name, number in signal.__dict__.items()
    if name.startswith('SIG') and not name.startswith('SIG_')
}
47

48
class process(tube):
1✔
49
    r"""
50
    Spawns a new process, and wraps it with a tube for communication.
51

52
    Arguments:
53

54
        argv(list):
55
            List of arguments to pass to the spawned process.
56
        shell(bool):
57
            Set to `True` to interpret `argv` as a string
58
            to pass to the shell for interpretation instead of as argv.
59
        executable(str):
60
            Path to the binary to execute.  If :const:`None`, uses ``argv[0]``.
61
            Cannot be used with ``shell``.
62
        cwd(str):
63
            Working directory.  Uses the current working directory by default.
64
        env(dict):
65
            Environment variables to add to the environment.
66
        ignore_environ(bool):
67
            Ignore Python's environment.  By default use Python's environment iff env not specified.
68
        stdin(int):
69
            File object or file descriptor number to use for ``stdin``.
70
            By default, a pipe is used.  A pty can be used instead by setting
71
            this to ``PTY``.  This will cause programs to behave in an
72
            interactive manner (e.g., ``python`` will show a ``>>>`` prompt).
73
            If the application reads from ``/dev/tty`` directly, use a pty.
74
        stdout(int):
75
            File object or file descriptor number to use for ``stdout``.
76
            By default, a pty is used so that any stdout buffering by libc
77
            routines is disabled.
78
            May also be ``PIPE`` to use a normal pipe.
79
        stderr(int):
80
            File object or file descriptor number to use for ``stderr``.
81
            By default, ``STDOUT`` is used.
82
            May also be ``PIPE`` to use a separate pipe,
83
            although the :class:`pwnlib.tubes.tube.tube` wrapper will not be able to read this data.
84
        close_fds(bool):
85
            Close all open file descriptors except stdin, stdout, stderr.
86
            By default, :const:`True` is used.
87
        preexec_fn(callable):
88
            Callable to invoke immediately before calling ``execve``.
89
        preexec_args(iterable):
90
            Arguments passed to ``preexec_fn``.
91
        raw(bool):
92
            Set the created pty to raw mode (i.e. disable echo and control
93
            characters).  :const:`True` by default.  If no pty is created, this
94
            has no effect.
95
        aslr(bool):
96
            If set to :const:`False`, disable ASLR via ``personality`` (``setarch -R``)
97
            and ``setrlimit`` (``ulimit -s unlimited``).
98

99
            This disables ASLR for the target process.  However, the ``setarch``
100
            changes are lost if a ``setuid`` binary is executed.
101

102
            The default value is inherited from ``context.aslr``.
103
            See ``setuid`` below for additional options and information.
104
        setuid(bool):
105
            Used to control `setuid` status of the target binary, and the
106
            corresponding actions taken.
107

108
            By default, this value is :const:`None`, so no assumptions are made.
109

110
            If :const:`True`, treat the target binary as ``setuid``.
111
            This modifies the mechanisms used to disable ASLR on the process if
112
            ``aslr=False``.
113
            This is useful for debugging locally, when the exploit is a
114
            ``setuid`` binary.
115

116
            If :const:`False`, prevent ``setuid`` bits from taking effect on the
117
            target binary.  This is only supported on Linux, with kernels v3.5
118
            or greater.
119
        where(str):
120
            Where the process is running, used for logging purposes.
121
        display(list):
122
            List of arguments to display, instead of the main executable name.
123
        alarm(int):
124
            Set a SIGALRM alarm timeout on the process.
125
        creationflags(int):
126
            Windows only.  Flags to pass to ``CreateProcess``.
127

128
    Examples:
129

130
        >>> p = process('python')
131
        >>> p.sendline(b"print('Hello world')")
132
        >>> p.sendline(b"print('Wow, such data')")
133
        >>> b'' == p.recv(timeout=0.01)
134
        True
135
        >>> p.shutdown('send')
136
        >>> p.proc.stdin.closed
137
        True
138
        >>> p.connected('send')
139
        False
140
        >>> p.recvline()
141
        b'Hello world\n'
142
        >>> p.recvuntil(b',')
143
        b'Wow,'
144
        >>> p.recvregex(b'.*data')
145
        b' such data'
146
        >>> p.recv()
147
        b'\n'
148
        >>> p.recv() # doctest: +ELLIPSIS
149
        Traceback (most recent call last):
150
        ...
151
        EOFError
152

153
        >>> p = process('cat')
154
        >>> d = open('/dev/urandom', 'rb').read(4096)
155
        >>> p.recv(timeout=0.1)
156
        b''
157
        >>> p.write(d)
158
        >>> p.recvrepeat(0.1) == d
159
        True
160
        >>> p.recv(timeout=0.1)
161
        b''
162
        >>> p.shutdown('send')
163
        >>> p.wait_for_close()
164
        >>> p.poll()
165
        0
166

167
        >>> p = process('cat /dev/zero | head -c8', shell=True, stderr=open('/dev/null', 'w+b'))
168
        >>> p.recv()
169
        b'\x00\x00\x00\x00\x00\x00\x00\x00'
170

171
        >>> p = process(['python','-c','import os; print(os.read(2,1024).decode())'],
172
        ...             preexec_fn = lambda: os.dup2(0,2))
173
        >>> p.sendline(b'hello')
174
        >>> p.recvline()
175
        b'hello\n'
176

177
        >>> stack_smashing = ['python','-c','open("/dev/tty","wb").write(b"stack smashing detected")']
178
        >>> process(stack_smashing).recvall()
179
        b'stack smashing detected'
180

181
        >>> process(stack_smashing, stdout=PIPE).recvall()
182
        b''
183

184
        >>> getpass = ['python','-c','import getpass; print(getpass.getpass("XXX"))']
185
        >>> p = process(getpass, stdin=PTY)
186
        >>> p.recv()
187
        b'XXX'
188
        >>> p.sendline(b'hunter2')
189
        >>> p.recvall()
190
        b'\nhunter2\n'
191

192
        >>> process('echo hello 1>&2', shell=True).recvall()
193
        b'hello\n'
194

195
        >>> process('echo hello 1>&2', shell=True, stderr=PIPE).recvall()
196
        b''
197

198
        >>> a = process(['cat', '/proc/self/maps']).recvall()
199
        >>> b = process(['cat', '/proc/self/maps'], aslr=False).recvall()
200
        >>> with context.local(aslr=False):
201
        ...    c = process(['cat', '/proc/self/maps']).recvall()
202
        >>> a == b
203
        False
204
        >>> b == c
205
        True
206

207
        >>> process(['sh','-c','ulimit -s'], aslr=0).recvline()
208
        b'unlimited\n'
209

210
        >>> io = process(['sh','-c','sleep 10; exit 7'], alarm=2)
211
        >>> io.poll(block=True) == -signal.SIGALRM
212
        True
213

214
        >>> binary = ELF.from_assembly('nop', arch='mips')
215
        >>> p = process(binary.path)
216
        >>> binary_dir, binary_name = os.path.split(binary.path)
217
        >>> p = process('./{}'.format(binary_name), cwd=binary_dir)
218
        >>> p = process(binary.path, cwd=binary_dir)
219
        >>> p = process('./{}'.format(binary_name), cwd=os.path.relpath(binary_dir))
220
        >>> p = process(binary.path, cwd=os.path.relpath(binary_dir))
221

222
        >>> def write(s):
223
        ...    import os
224
        ...    os.write(1, s)
225
        >>> print(process('false', preexec_fn=write, preexec_args=(b"Hello World!", )).recvline().strip().decode())
226
        Hello World!
227
    """
228

229
    STDOUT = STDOUT
1✔
230
    PIPE = PIPE
1✔
231
    PTY = PTY
1✔
232

233
    #: Have we seen the process stop?  If so, this is a unix timestamp.
234
    _stop_noticed = 0
1✔
235

236
    proc = None
1✔
237

238
    def __init__(self, argv = None,
                 shell = False,
                 executable = None,
                 cwd = None,
                 env = None,
                 ignore_environ = None,
                 stdin  = PIPE,
                 stdout = PTY if not IS_WINDOWS else PIPE,
                 stderr = STDOUT,
                 close_fds = True,
                 preexec_fn = lambda: None,
                 preexec_args = (),
                 raw = True,
                 aslr = None,
                 setuid = None,
                 where = 'local',
                 display = None,
                 alarm = None,
                 creationflags = 0,
                 *args,
                 **kwargs
                 ):
        """Validate the arguments, prepare the standard handles and spawn the child.

        The meaning of each argument is described in the class docstring.
        Extra positional and keyword arguments are forwarded to the parent
        class constructor.

        Raises:
            TypeError: ``argv`` is None and ``context.binary`` is not set.
            NotImplementedError: a ``PTY`` handle was requested on Windows.
            OSError: ``subprocess.Popen`` failed with anything other than a
                recoverable ``ENOEXEC``.
        """
        super(process, self).__init__(*args,**kwargs)

        # Permit using context.binary
        if argv is None:
            if context.binary:
                argv = [context.binary.path]
            else:
                raise TypeError('Must provide argv or set context.binary')

        if IS_WINDOWS and PTY in (stdin, stdout, stderr):
            raise NotImplementedError("ConPTY isn't implemented yet")

        #: :class:`subprocess.Popen` object that backs this process
        self.proc = None

        # We need to keep a copy of the un-_validated environment for printing
        original_env = env

        if shell:
            # In shell mode nothing is validated; argv and env are passed
            # through as given and only the shell binary gets a default.
            executable_val, argv_val, env_val = executable, argv, env
            if executable is None:
                if IS_WINDOWS:
                    executable_val = os.environ.get('ComSpec', 'cmd.exe')
                else:
                    executable_val = '/bin/sh'
        else:
            executable_val, argv_val, env_val = self._validate(cwd, executable, argv, env)

        if IS_WINDOWS:
            # No pty, ASLR or setuid handling on Windows, and no hook is run
            # in the child: internal_preexec_fn is None, so this method never
            # arranges for the user's preexec_fn to be invoked here.
            self.pty = None
            self.raw = False
            self.aslr = True
            self._setuid = False
            self.suid = self.uid = None
            self.sgid = self.gid = None
            internal_preexec_fn = None
        else:
            # Avoid the need to have to deal with the STDOUT magic value.
            if stderr is STDOUT:
                stderr = stdout

            # Determine which descriptors will be attached to a new PTY
            handles = (stdin, stdout, stderr)

            #: Which file descriptor is the controlling TTY
            self.pty          = handles.index(PTY) if PTY in handles else None

            #: Whether the controlling TTY is set to raw mode
            self.raw          = raw

            #: Whether ASLR should be left on
            self.aslr         = aslr if aslr is not None else context.aslr

            #: Whether setuid is permitted
            self._setuid      = setuid if setuid is None else bool(setuid)

            # Create the PTY if necessary
            stdin, stdout, stderr, master, slave = self._handles(*handles)

            internal_preexec_fn = self.__preexec_fn

        #: Arguments passed on argv
        self.argv = argv_val

        #: Full path to the executable
        self.executable = executable_val

        # When the caller did not say otherwise, passing an explicit env
        # means the inherited environment is dropped.
        if ignore_environ is None:
            ignore_environ = env is not None  # compat

        #: Environment passed on envp
        self.env = {} if ignore_environ else dict(getattr(os, "environb", os.environ))

        # Add environment variables as needed
        self.env.update(env_val or {})

        # Initial guess for the working directory; the ``cwd`` property
        # refreshes it from the live process when it can.
        self._cwd = os.path.realpath(cwd or os.path.curdir)

        #: Alarm timeout of the process
        self.alarm        = alarm

        # Stored on the instance because __preexec_fn reads them in the child.
        self.preexec_fn = preexec_fn
        self.preexec_args = preexec_args
        self.display    = display or self.program
        self._qemu      = False
        self._corefile  = None

        message = "Starting %s process %r" % (where, self.display)

        if self.isEnabledFor(logging.DEBUG):
            # NOTE(review): this compares the raw ``argv`` argument, not the
            # validated ``self.argv`` -- confirm that this is intended.
            if argv != [self.executable]: message += ' argv=%r ' % self.argv
            if original_env not in (os.environ, None):  message += ' env=%r ' % self.env

        with self.progress(message) as p:

            if not self.aslr:
                self.warn_once("ASLR is disabled!")

            # In the event the binary is a foreign architecture,
            # and binfmt is not installed (e.g. when running on
            # Travis CI), re-try with qemu-XXX if we get an
            # 'Exec format error'.
            #
            # ``prefixes`` is extended while it is being iterated: each
            # ENOEXEC appends another (argv prefix, executable) candidate.
            prefixes = [([], self.executable)]
            exception = None

            for prefix, executable in prefixes:
                try:
                    args = self.argv
                    if prefix:
                        args = prefix + args
                    self.proc = subprocess.Popen(args = args,
                                                 shell = shell,
                                                 executable = executable,
                                                 cwd = cwd,
                                                 env = self.env,
                                                 stdin = stdin,
                                                 stdout = stdout,
                                                 stderr = stderr,
                                                 close_fds = close_fds,
                                                 preexec_fn = internal_preexec_fn,
                                                 creationflags = creationflags)
                    break
                except OSError as exception:
                    # Only ENOEXEC on non-Windows platforms is recoverable.
                    if sys.platform == 'win32':
                        raise
                    if exception.errno != errno.ENOEXEC:
                        raise
                    prefixes.append(self.__on_enoexec(exception))

            p.success('pid %i' % self.pid)

        if IS_WINDOWS:
            # Reads are served from a queue that a daemon thread fills from
            # the child's stdout; nothing below applies on Windows.
            self._read_thread = None
            self._read_queue = queue.Queue()
            if self.proc.stdout:
                # Read from stdout in a thread
                self._read_thread = threading.Thread(target=_read_in_thread, args=(self._read_queue, self.proc.stdout))
                self._read_thread.daemon = True
                self._read_thread.start()
            return

        if self.pty is not None:
            # The child was handed the slave end.  For every stream that was
            # attached to the pty, give the Popen object its own duplicate of
            # the master end, then drop the original master/slave descriptors.
            if stdin is slave:
                self.proc.stdin = os.fdopen(os.dup(master), 'r+b', 0)
            if stdout is slave:
                self.proc.stdout = os.fdopen(os.dup(master), 'r+b', 0)
            if stderr is slave:
                self.proc.stderr = os.fdopen(os.dup(master), 'r+b', 0)

            os.close(master)
            os.close(slave)

        # Set in non-blocking mode so that a call to recv(1000) will
        # return as soon as the first byte is available
        if self.proc.stdout:
            fd = self.proc.stdout.fileno()
            fl = fcntl.fcntl(fd, fcntl.F_GETFL)
            fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

        # Save off information about whether the binary is setuid / setgid
        self.suid = self.uid = os.getuid()
        self.sgid = self.gid = os.getgid()
        st = os.stat(self.executable)
        if self._setuid:
            # Only when the caller declared the target setuid do the file's
            # owner/group replace our own ids.
            if (st.st_mode & stat.S_ISUID):
                self.suid = st.st_uid
            if (st.st_mode & stat.S_ISGID):
                self.sgid = st.st_gid
428

429
    def __preexec_fn(self):
        """
        Routine executed in the child process before invoking execve().

        Handles setting the controlling TTY as well as invoking the user-
        supplied preexec_fn.

        In between it applies, on a best-effort basis, the ASLR, core-dump,
        setuid and ptrace settings and arms the optional alarm.  Failures of
        those steps are logged or ignored; only the user callback, which runs
        last, can raise out of this method.
        """
        if self.pty is not None:
            self.__pty_make_controlling_tty(self.pty)

        if not self.aslr:
            try:
                # personality(ADDR_NO_RANDOMIZE) is skipped when the target
                # was declared setuid (self._setuid is True).
                if context.os == 'linux' and self._setuid is not True:
                    ADDR_NO_RANDOMIZE = 0x0040000
                    ctypes.CDLL('libc.so.6').personality(ADDR_NO_RANDOMIZE)

                # Soft and hard stack limits set to -1 (the class docstring
                # describes this as ``ulimit -s unlimited``).
                resource.setrlimit(resource.RLIMIT_STACK, (-1, -1))
            except Exception:
                self.exception("Could not disable ASLR")

        # Check that the user would prefer to have core dumps or not.
        try:
            if context.disable_corefiles:
                # Soft limit 0, hard limit left at -1.
                resource.setrlimit(resource.RLIMIT_CORE, (0, -1))
            else:
                resource.setrlimit(resource.RLIMIT_CORE, (-1, -1))
        except Exception:
            pass

        # Given that we want a core file, assume that we want the whole thing.
        try:
            with open('/proc/self/coredump_filter', 'w') as f:
                f.write('0xff')
        except Exception:
            pass

        if self._setuid is False:
            # Caller asked that setuid bits must not take effect.
            try:
                PR_SET_NO_NEW_PRIVS = 38
                ctypes.CDLL('libc.so.6').prctl(PR_SET_NO_NEW_PRIVS, 1, 0, 0, 0)
            except Exception:
                pass

        # Avoid issues with attaching to processes when yama-ptrace is set
        try:
            # 0x59616d61 is the ASCII string "Yama".
            PR_SET_PTRACER = 0x59616d61
            PR_SET_PTRACER_ANY = -1
            ctypes.CDLL('libc.so.6').prctl(PR_SET_PTRACER, PR_SET_PTRACER_ANY, 0, 0, 0)
        except Exception:
            pass


        if self.alarm is not None:
            signal.alarm(self.alarm)

        # The user hook runs last, after all of the setup above.
        self.preexec_fn(*self.preexec_args)
485

486
    def __on_enoexec(self, exception):
        """Work out a QEMU fallback after an 'exec format' error (ENOEXEC).

        ENOEXEC implies that the target is e.g. an ARM binary started on a
        non-ARM system without binfmt helpers for QEMU installed.

        Arguments:
            exception(OSError): The ENOEXEC error raised by the failed spawn.

        Returns:
            A two-element list ``[argv_prefix, qemu_path]`` for the retry.

        Raises:
            The original ``exception`` if no QEMU flavour is known for the
            architecture of the binary.
        """
        # Inspect the target executable to learn its architecture.
        with context.quiet:
            # XXX: Cyclic imports :(
            from pwnlib.elf import ELF
            target = ELF(self.executable)

        # Name of the qemu-user binary for that architecture, if any.
        emulator = qemu.user_path(arch=target.arch)
        if not emulator:
            raise exception

        emulator = which(emulator)
        if not emulator:
            # We couldn't run the binary directly, and there is no qemu
            # installed which can run it either.
            self.exception(exception)
            return None

        self._qemu = emulator

        # qemu-user takes the desired argv[0] through ``-0``.
        prefix = [emulator]
        if self.argv:
            prefix.extend(['-0', self.argv[0]])
        prefix.append('--')

        return [prefix, emulator]
524

525
    @property
    def program(self):
        """Older name for ``executable``, kept so existing callers keep working.

        Example:

            >>> p = process('/bin/true')
            >>> p.executable == '/bin/true'
            True
            >>> p.executable == p.program
            True

        """
        return self.executable
539

540
    @property
    def cwd(self):
        """Current working directory of the process.

        The value is looked up from the process on every access; when that
        lookup fails for any reason, the last known directory is returned.

        Example:

            >>> p = process('sh')
            >>> p.sendline(b'cd /tmp; echo AAA')
            >>> _ = p.recvuntil(b'AAA')
            >>> p.cwd == '/tmp'
            True
            >>> p.sendline(b'cd /proc; echo BBB;')
            >>> _ = p.recvuntil(b'BBB')
            >>> p.cwd
            '/proc'
        """
        try:
            # Imported lazily, under an alias so it does not read like a
            # recursive reference to this property.
            from pwnlib.util.proc import cwd as lookup_cwd
            self._cwd = lookup_cwd(self.pid)
        except Exception:
            # Keep the cached value when the lookup fails.
            pass

        return self._cwd
563

564

565
    def _validate(self, cwd, executable, argv, env):
        """
        Perform extended validation on the executable path, argv, and envp.

        Mostly to make Python happy, but also to prevent common pitfalls.

        Arguments:
            cwd(str): Working directory requested by the caller, or None.
            executable(str): Path or name of the binary; falls back to
                ``argv[0]`` when empty.
            argv(list): Arguments for the new process.
            env(dict): Environment supplied by the caller, or None.

        Returns:
            A tuple ``(executable, argv, env)``: the resolved executable path
            as ``str``, argv as a list of ``bytes``, and env as a dict
            (``str`` keys/values on win32, ``bytes`` elsewhere) or the falsy
            value it normalized to.

        Errors (missing argv/executable, non-existent file, not a regular
        file, not executable) are reported through ``self.error``.
        """

        orig_cwd = cwd
        cwd = cwd or os.path.curdir

        argv, env = normalize_argv_env(argv, env, self, 4)
        if env:
            if sys.platform == 'win32':
                # Windows requires that all environment variables be strings
                env = {_decode(k): _decode(v) for k, v in env}
            else:
                env = {bytes(k): bytes(v) for k, v in env}
        if argv:
            argv = list(map(bytes, argv))

        #
        # Validate executable
        #
        # - Must be an absolute or relative path to the target executable
        # - If not, attempt to resolve the name in $PATH
        #
        if not executable:
            if not argv:
                self.error("Must specify argv or executable")
            executable = argv[0]

        if not isinstance(executable, str):
            executable = executable.decode('utf-8')

        # Prefer a PATH supplied by the caller.  The env dict is keyed by
        # bytes on POSIX but by str on win32 (see above), so both spellings
        # are tried; otherwise a custom PATH would be ignored on Windows.
        path = None
        if env:
            path = env.get(b'PATH') or env.get('PATH')
        if isinstance(path, bytes):
            path = path.decode()
        if not path:
            path = os.environ.get('PATH')

        # Do not change absolute paths to binaries
        if executable.startswith(os.path.sep):
            pass

        # If there's no path component, it's in $PATH or relative to the
        # target directory.
        #
        # For example, 'sh', or 'bar' with cwd=='foo'
        elif os.path.sep not in executable:
            resolved = which(executable, path=path)
            if resolved:
                executable = resolved
            else:
                tmp = executable
                executable = os.path.join(cwd, executable)
                self.warn_once("Could not find executable %r in $PATH, using %r instead" % (tmp, executable))

        # There is a path component and user specified a working directory,
        # it must be relative to that directory. For example, 'bar/baz' with
        # cwd='foo' or './baz' with cwd='foo/bar'
        elif orig_cwd:
            executable = os.path.join(orig_cwd, executable)

        if not os.path.exists(executable):
            self.error("%r does not exist"  % executable)
        if not os.path.isfile(executable):
            self.error("%r is not a file" % executable)
        if not os.access(executable, os.X_OK):
            self.error("%r is not marked as executable (+x)" % executable)

        return executable, argv, env
636

637
    def _handles(self, stdin, stdout, stderr):
        """Swap every ``PTY`` placeholder for the slave end of a new pty.

        Returns:
            A tuple ``(stdin, stdout, stderr, master, slave)``.  ``master``
            and ``slave`` are None when no pty was requested (``self.pty`` is
            None); the handles are then returned unchanged.
        """
        if self.pty is None:
            return stdin, stdout, stderr, None, None

        # A plain PIPE makes printf() and similar libc routines buffer their
        # output instead of sending it directly.  With a PTY on STDOUT the
        # libc routines will not buffer any data.
        master, slave = pty.openpty()

        if self.raw:
            # With a controlling TTY, the OS would interpret terminal control
            # codes like backspace and Ctrl+C.  Raw mode turns that off.
            for fd in (master, slave):
                tty.setraw(fd)

        stdin, stdout, stderr = [
            slave if handle is PTY else handle
            for handle in (stdin, stdout, stderr)
        ]

        return stdin, stdout, stderr, master, slave
667

668
    def __getattr__(self, attr):
        """Fall back to the wrapped process object for public attributes
        such as ``pid`` and ``stdin``.

        Names starting with an underscore are never forwarded.
        """
        if attr.startswith('_') or not hasattr(self.proc, attr):
            raise AttributeError("'process' object has no attribute '%s'" % attr)
        return getattr(self.proc, attr)
675

676
    def kill(self):
        """kill()

        Kills the process; this simply delegates to :meth:`close`.
        """
        self.close()
682

683
    def terminate(self):
        """terminate()

        Asks the process to stop by sending it SIGTERM.

        Unlike :meth:`kill`, which sends SIGKILL, this gives the process
        the opportunity to clean up and exit on its own.

        Since the process is free to ignore the signal, proper cleanup
        is only done in :meth:`kill`/:meth:`close`.

        Examples:

            >>> p = process(['python', '-u', '-c', 'import signal;signal.signal(signal.SIGTERM, lambda signum,frame: (print("sigterm"),exit(0)));print("ready");input()'])
            >>> _ = p.recvline_contains(b'ready')
            >>> p.terminate()
            >>> p.recvuntil(b'sigterm') == b'sigterm'
            True
            >>> p.close()
        """
        if self.proc is not None:
            try:
                self.proc.terminate()
            except OSError:
                # The process might have exited already.
                pass

            # Record the exit, if the process is already gone.
            self.poll()
715

716
    def poll(self, block = False):
        """poll(block = False) -> int

        Arguments:
            block(bool): Wait for the process to exit

        Returns the exit code of the process, or None while it is still
        running.  The first time an exit is observed, it is logged and its
        time is remembered.
        """

        # Reading the property refreshes the cached working directory,
        # which makes retrieving core files easier later on.
        _ = self.cwd

        if block:
            self.wait_for_close()

        self.proc.poll()
        returncode = self.proc.returncode

        # Still running, or the exit has been reported before.
        if returncode is None or self._stop_noticed:
            return returncode

        self._stop_noticed = time.time()

        # Negative exit codes are looked up in the signal table.
        if returncode < 0:
            signame = ' (%s)' % (signal_names.get(returncode, 'SIG???'))
        else:
            signame = ''

        details = (self.display, returncode, signame, self.pid)
        self.info("Process %r stopped with exit code %d%s (pid %i)" % details)
        return returncode
747

748
    def communicate(self, stdin = None):
        """communicate(stdin = None) -> str

        Thin wrapper which forwards ``stdin`` to the ``communicate`` method
        of the underlying :class:`subprocess.Popen` object and returns its
        result.
        """
        result = self.proc.communicate(stdin)
        return result
755

756
    # Implementation of the methods required for tube
757
    def recv_raw(self, numb):
        """Read up to ``numb`` bytes from the process' stdout.

        Returns:
            The received ``bytes``; ``b''`` if nothing arrived within
            ``self.timeout``.

        Raises:
            EOFError: The receiving side is closed or reached end-of-file.
        """
        # This is a slight hack. We try to notice if the process is
        # dead, so we can write a message.
        self.poll()

        if not self.connected_raw('recv'):
            raise EOFError

        # Always hand back bytes, matching the Windows branch below.
        if not self.can_recv_raw(self.timeout):
            return b''

        if IS_WINDOWS:
            # Drain at most ``numb`` queued items without ever blocking.
            chunks = []
            while len(chunks) < numb and not self._read_queue.empty():
                chunks.append(self._read_queue.get(block=False))
            return b''.join(chunks)

        # This will only be reached if we either have data,
        # or we have reached an EOF. In either case, it
        # should be safe to read without expecting it to block.
        data = b''

        try:
            data = self.proc.stdout.read(numb)
        except IOError:
            # Handled like EOF below.
            pass

        if not data:
            self.shutdown("recv")
            raise EOFError

        return data
794

795
    def send_raw(self, data):
        """Write ``data`` to the process' stdin and flush it.

        Raises:
            EOFError: The sending side is closed, or the write failed
                with an ``IOError``.
        """
        # Polling first is a slight hack: it lets us notice, and report,
        # that the process has died.
        self.poll()

        if not self.connected_raw('send'):
            raise EOFError

        try:
            for action, arguments in ((self.proc.stdin.write, (data,)),
                                      (self.proc.stdin.flush, ())):
                action(*arguments)
        except IOError:
            raise EOFError
808

809
    def settimeout_raw(self, timeout):
        """Accept a timeout value and deliberately do nothing with it."""
        return None
1✔
811

812
    def can_recv_raw(self, timeout):
        """Report whether the process's stdout has data ready to be read.

        Arguments:
            timeout: Seconds to wait for data. On the non-Windows path,
                ``None`` makes ``select`` wait without a timeout.

        Returns:
            True if data is available. False if the receive side is not
            connected, if nothing arrived in time, or if ``select`` was
            interrupted with ``EINTR``.

        Raises:
            EOFError: On Windows, when the queue is empty and the reader
                thread is no longer alive; elsewhere, when ``select``
                raises ``ValueError`` (stdout already closed).
        """
        if not self.connected_raw('recv'):
            return False

        if IS_WINDOWS:
            # No select() on pipes here: poll the queue filled by the
            # reader thread until data shows up, the thread ends, or the
            # countdown expires.
            with self.countdown(timeout=timeout):
                while self.timeout and self._read_queue.empty() and self._read_thread.is_alive():
                    time.sleep(0.01)
                if not self._read_queue.empty():
                    return True
                if not self._read_thread.is_alive():
                    raise EOFError
                return False

        try:
            if timeout is None:
                # Omitting the timeout argument makes select() block.
                return select.select([self.proc.stdout], [], []) == ([self.proc.stdout], [], [])

            return select.select([self.proc.stdout], [], [], timeout) == ([self.proc.stdout], [], [])
        except ValueError:
            # Not sure why this isn't caught when testing self.proc.stdout.closed,
            # but it's not.
            #
            #   File "/home/user/pwntools/pwnlib/tubes/process.py", line 112, in can_recv_raw
            #     return select.select([self.proc.stdout], [], [], timeout) == ([self.proc.stdout], [], [])
            # ValueError: I/O operation on closed file
            raise EOFError
        except select.error as v:
            if v.args[0] == errno.EINTR:
                return False
            # NOTE(review): any other select.error is swallowed here and the
            # method falls off the end, implicitly returning None (falsy).
            # Confirm whether that is intended or whether it should re-raise.
×
842

843
    def connected_raw(self, direction):
        """Tell whether the tube is still usable in the given direction.

        Arguments:
            direction(str): ``'any'``, ``'send'`` or ``'recv'``.

        Returns:
            For ``'any'``: whether :meth:`poll` still returns ``None``.
            For ``'send'`` / ``'recv'``: the stdin / stdout object itself if
            it is falsy, otherwise whether it is not closed.
            For any other value: ``None``.
        """
        if direction == 'any':
            return self.poll() is None

        if direction == 'send':
            stream = self.proc.stdin
        elif direction == 'recv':
            stream = self.proc.stdout
        else:
            return None

        return stream and not stream.closed
1✔
850

851
    def close(self):
        """Kill the process if it is still running and close its streams.

        Does nothing when ``self.proc`` is ``None``. Otherwise the process
        is polled; if its termination has not been noticed yet it is killed
        and waited for (an :class:`OSError` during that is ignored). Finally
        stdin, stdout and stderr are closed, tolerating ``EPIPE`` and
        ``EINVAL`` errors.
        """
        if self.proc is None:
            return

        # Find out first whether the process is already dead.
        self.poll()

        child = self.proc

        if not self._stop_noticed:
            try:
                child.kill()
                child.wait()
                self._stop_noticed = time.time()
                self.info('Stopped process %r (pid %i)' % (self.program, self.pid))
            except OSError:
                pass

        # Close the standard streams; only these errnos are ignored.
        tolerated = (errno.EPIPE, errno.EINVAL)
        for stream in (child.stdin, child.stdout, child.stderr):
            if stream is None:
                continue
            try:
                stream.close()
            except IOError as err:
                if err.errno not in tolerated:
                    raise
×
875

876

877
    def fileno(self):
        """Return the file number of the process's stdout.

        Reports an error through ``self.error`` when the tube is no longer
        connected.
        """
        still_connected = self.connected()
        if not still_connected:
            self.error("A stopped process does not have a file number")

        out_stream = self.proc.stdout
        return out_stream.fileno()
×
882

883
    def shutdown_raw(self, direction):
        """Close one direction of the tube, then the whole tube if both are done.

        Arguments:
            direction(str): ``"send"`` closes the process's stdin and
                ``"recv"`` closes its stdout. Other values close nothing.

        Afterwards, if stdin and stdout are each either ``None`` or closed,
        :meth:`close` is called.
        """
        child = self.proc

        if direction == "send":
            child.stdin.close()
        elif direction == "recv":
            child.stdout.close()

        for stream in (child.stdin, child.stdout):
            if stream is not None and not stream.closed:
                # At least one side is still open; keep the process around.
                return

        self.close()
1✔
892

893
    def __pty_make_controlling_tty(self, tty_fd):
        '''Make the pseudo-terminal behind ``tty_fd`` the controlling tty.

        This should be more portable than the pty.fork() function.
        Specifically, this should work on Solaris.

        Arguments:
            tty_fd(int): File descriptor of the terminal; its device name
                is looked up with ``os.ttyname``.

        Raises:
            Exception: If ``/dev/tty`` can still be opened after
                ``os.setsid()``, or if one of the final verification opens
                returns a negative descriptor.
            OSError: Propagated from ``os.ttyname``, ``os.setsid`` and the
                final two ``os.open`` calls, which are not guarded.
        '''

        child_name = os.ttyname(tty_fd)

        # Disconnect from controlling tty. Harmless if not already connected.
        try:
            fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY)
            if fd >= 0:
                os.close(fd)
        # Which exception? Shouldn't we catch it explicitly?
        except OSError:
            # Already disconnected. This happens if running inside cron.
            pass

        # Start a new session.
        os.setsid()

        # Verify we are disconnected from controlling tty
        # by attempting to open it again.
        try:
            fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY)
            if fd >= 0:
                os.close(fd)
                raise Exception('Failed to disconnect from '
                    'controlling tty. It is still possible to open /dev/tty.')
        # Which exception? Shouldn't we catch it explicitly?
        except OSError:
            # Good! We are disconnected from a controlling tty.
            pass

        # Verify we can open child pty.
        # NOTE(review): this open omits O_NOCTTY; presumably that is what
        # makes the pty the controlling terminal of the new session -- confirm.
        fd = os.open(child_name, os.O_RDWR)
        if fd < 0:
            raise Exception("Could not open child pty, " + child_name)
        else:
            os.close(fd)

        # Verify we now have a controlling tty.
        fd = os.open("/dev/tty", os.O_WRONLY)
        if fd < 0:
            raise Exception("Could not open controlling tty, /dev/tty")
        else:
            os.close(fd)
×
938

939
    def maps(self):
        """maps() -> [mapping]

        Returns the process's memory mappings as a list of ``mapping``
        namedtuples.

        Every mapping exposes these fields:
            addr, address (addr alias), start (addr alias), end, size, perms, path, rss, pss, shared_clean, shared_dirty, private_clean, private_dirty, referenced, anonymous, swap

        ``perms`` is itself a ``permissions`` namedtuple with the fields:
            read, write, execute, private, shared, string

        Example:

            >>> p = process(['cat'])
            >>> p.sendline(b"meow")
            >>> p.recvline()
            b'meow\\n'
            >>> proc_maps = open("/proc/" + str(p.pid) + "/maps", "r").readlines()
            >>> pwn_maps = p.maps()
            >>> len(proc_maps) == len(pwn_maps)
            True
            >>> checker_arr = []
            >>> for proc, pwn in zip(proc_maps, pwn_maps):
            ...     proc = proc.split(' ')
            ...     p_addrs = proc[0].split('-')
            ...     checker_arr.append(int(p_addrs[0], 16) == pwn.addr == pwn.address == pwn.start)
            ...     checker_arr.append(int(p_addrs[1], 16) == pwn.end)
            ...     checker_arr.append(pwn.size == pwn.end - pwn.start)
            ...     checker_arr.append(pwn.perms.string == proc[1])
            ...     proc_path = proc[-1].strip()
            ...     checker_arr.append(pwn.path == proc_path or (pwn.path == '[anon]' and proc_path == ''))
            ...
            >>> checker_arr == [True] * len(proc_maps) * 5
            True

        Background on the underlying data is in the /proc/pid/maps section
        of https://man7.org/linux/man-pages/man5/proc.5.html

        The raw entries come from memory_maps(), which returns a list of
        pmmap_ext objects. The definition (from psutil/_pslinux.py) is:

        .. code-block:: python

            pmmap_grouped = namedtuple(
                'pmmap_grouped',
                ['path', 'rss', 'size', 'pss', 'shared_clean', 'shared_dirty',
                'private_clean', 'private_dirty', 'referenced', 'anonymous', 'swap'])
            pmmap_ext = namedtuple(
                'pmmap_ext', 'addr perms ' + ' '.join(pmmap_grouped._fields))


        Here is an example of a pmmap_ext entry:

        .. code-block:: python

            pmmap_ext(addr='15555551c000-155555520000', perms='r--p', path='[vvar]', rss=0, size=16384, pss=0, shared_clean=0, shared_dirty=0, private_clean=0, private_dirty=0, referenced=0, anonymous=0, swap=0)
        """

        perm_type = namedtuple("permissions", "read write execute private shared string")
        # addr, address and start all carry the same value (aliases).
        map_type = namedtuple("mapping",
            "addr address start end size perms path rss pss shared_clean shared_dirty private_clean private_dirty referenced anonymous swap")

        from pwnlib.util.proc import memory_maps

        def convert(raw):
            # Turn one raw entry into the richer ``mapping`` record.
            flags = raw.perms
            parsed_perms = perm_type(
                read='r' in flags,
                write='w' in flags,
                execute='x' in flags,
                private='p' in flags,
                shared='s' in flags,
                string=flags,
            )
            # raw.addr looks like '<start hex>-<end hex>'.
            bounds = raw.addr.split('-')
            base = int(bounds[0], 16)
            return map_type(
                addr=base,
                address=base,
                start=base,
                end=int(bounds[1], 16),
                size=raw.size,
                perms=parsed_perms,
                path=raw.path,
                rss=raw.rss,
                pss=raw.pss,
                shared_clean=raw.shared_clean,
                shared_dirty=raw.shared_dirty,
                private_clean=raw.private_clean,
                private_dirty=raw.private_dirty,
                referenced=raw.referenced,
                anonymous=raw.anonymous,
                swap=raw.swap,
            )

        return [convert(raw) for raw in memory_maps(self.pid)]
1✔
1016

1017
    def get_mapping(self, path_value, single=True):
        """get_mapping(path_value, single=True) -> mapping
        get_mapping(path_value, False) -> [mapping]

        Looks up the mapping(s) of the process whose path equals
        ``path_value``.

        Arguments:
            path_value(str): The exact path of the requested mapping,
                valid values are also [stack], [heap], etc..
            single(bool=True): Whether to only return the first
                mapping matched, or all of them.

        Returns the first match (or ``None`` if there is none) when
        ``single`` is true, otherwise a possibly empty list of all matches.

        Example:

            >>> p = process(['cat'])
            >>> mapping = p.get_mapping('[stack]')
            >>> mapping.path == '[stack]'
            True
            >>> mapping.perms.execute
            False
            >>>
            >>> mapping = p.get_mapping('does not exist')
            >>> print(mapping)
            None
            >>>
            >>> mappings = p.get_mapping(which('cat'), single=False)
            >>> len(mappings) > 1
            True

        """
        # self.maps() is evaluated right here; only the filtering is lazy.
        hits = (entry for entry in self.maps() if path_value == entry.path)

        if single:
            return next(hits, None)

        return list(hits)
1✔
1061

1062
    def stack_mapping(self, single=True):
        """stack_mapping(single=True) -> mapping
        stack_mapping(False) -> [mapping]

        Convenience wrapper around :meth:`.process.get_mapping` for the
        '[stack]' path.

        Arguments:
            single(bool=True): Whether to only return the first
                mapping matched, or all of them.

        Returns the result of :meth:`.process.get_mapping` called with
        '[stack]' and single.

        Example:

            >>> p = process(['cat'])
            >>> mapping = p.stack_mapping()
            >>> mapping.path
            '[stack]'
            >>> mapping.perms.execute
            False
            >>> mapping.perms.write
            True
            >>> hex(mapping.address) # doctest: +SKIP
            '0x7fffd99fe000'
            >>> mappings = p.stack_mapping(single=False)
            >>> len(mappings)
            1

        """
        stack_path = '[stack]'
        return self.get_mapping(stack_path, single)
1✔
1090
    
1091
    def heap_mapping(self, single=True):
        """heap_mapping(single=True) -> mapping
        heap_mapping(False) -> [mapping]

        Convenience wrapper around :meth:`.process.get_mapping` for the
        '[heap]' path.

        Arguments:
            single(bool=True): Whether to only return the first
                mapping matched, or all of them.

        Returns the result of :meth:`.process.get_mapping` called with
        '[heap]' and single.

        Example:

            >>> p = process(['cat'])
            >>> p.sendline(b'meow')
            >>> p.recvline()
            b'meow\\n'
            >>> mapping = p.heap_mapping()
            >>> mapping.path
            '[heap]'
            >>> mapping.perms.execute
            False
            >>> mapping.perms.write
            True
            >>> hex(mapping.address) # doctest: +SKIP
            '0x557650fae000'
            >>> mappings = p.heap_mapping(single=False)
            >>> len(mappings)
            1

        """
        heap_path = '[heap]'
        return self.get_mapping(heap_path, single)
1✔
1122
    
1123
    def vdso_mapping(self, single=True):
        """vdso_mapping(single=True) -> mapping
        vdso_mapping(False) -> [mapping]

        Convenience wrapper around :meth:`.process.get_mapping` for the
        '[vdso]' path.

        Arguments:
            single(bool=True): Whether to only return the first
                mapping matched, or all of them.

        Returns the result of :meth:`.process.get_mapping` called with
        '[vdso]' and single.

        Example:

            >>> p = process(['cat'])
            >>> mapping = p.vdso_mapping()
            >>> mapping.path
            '[vdso]'
            >>> mapping.perms.execute
            True
            >>> mapping.perms.write
            False
            >>> hex(mapping.address) # doctest: +SKIP
            '0x7ffcf13af000'
            >>> mappings = p.vdso_mapping(single=False)
            >>> len(mappings)
            1

        """
        vdso_path = '[vdso]'
        return self.get_mapping(vdso_path, single)
1✔
1151
    
1152
    def vvar_mapping(self, single=True):
        """vvar_mapping(single=True) -> mapping
        vvar_mapping(False) -> [mapping]

        Convenience wrapper around :meth:`.process.get_mapping` for the
        '[vvar]' path.

        Arguments:
            single(bool=True): Whether to only return the first
                mapping matched, or all of them.

        Returns the result of :meth:`.process.get_mapping` called with
        '[vvar]' and single.

        Example:

            >>> p = process(['cat'])
            >>> mapping = p.vvar_mapping()
            >>> mapping.path
            '[vvar]'
            >>> mapping.perms.execute
            False
            >>> mapping.perms.write
            False
            >>> hex(mapping.address) # doctest: +SKIP
            '0x7ffee5f60000'
            >>> mappings = p.vvar_mapping(single=False)
            >>> len(mappings)
            1

        """
        vvar_path = '[vvar]'
        return self.get_mapping(vvar_path, single)
1✔
1180
    
1181
    def libc_mapping(self, single=True):
        """libc_mapping(single=True) -> mapping
        libc_mapping(False) -> [mapping]

        Finds mappings whose file name looks like a libc: the basename
        contains 'libc.so', or contains both 'libc-' and '.so'.

        Arguments:
            single(bool=True): Whether to only return the first
                mapping matched, or all of them.

        Returns either the first libc mapping found in process memory
        (``None`` if there is none), or a list of all libc mappings,
        depending on "single".

        Example:

            >>> p = process(['cat'])
            >>> p.sendline(b'meow')
            >>> p.recvline()
            b'meow\\n'
            >>> mapping = p.libc_mapping()
            >>> mapping.path # doctest: +ELLIPSIS
            '...libc...'
            >>> mapping.perms.execute
            False
            >>> mapping.perms.write
            False
            >>> hex(mapping.address) # doctest: +SKIP
            '0x7fbde7fd7000'
            >>>
            >>> mappings = p.libc_mapping(single=False)
            >>> len(mappings) > 1
            True
            >>> hex(mappings[1].address) # doctest: +SKIP
            '0x7fbde7ffd000'
            >>> mappings[0].end == mappings[1].start
            True
            >>> mappings[1].perms.execute
            True

        """
        def looks_like_libc(entry):
            name = os.path.basename(entry.path)
            return 'libc.so' in name or ('libc-' in name and '.so' in name)

        hits = (entry for entry in self.maps() if looks_like_libc(entry))

        if single:
            return next(hits, None)

        return list(hits)
1✔
1234
    
1235
    def musl_mapping(self, single=True):
        """musl_mapping(single=True) -> mapping
        musl_mapping(False) -> [mapping]

        Finds mappings whose file name looks like musl: the basename
        contains 'musl.so', or contains both 'musl-' and '.so'.

        Arguments:
            single(bool=True): Whether to only return the first
                mapping matched, or all of them.

        Returns either the first musl mapping found in process memory
        (``None`` if there is none), or a list of all musl mappings,
        depending on "single".
        """
        def looks_like_musl(entry):
            name = os.path.basename(entry.path)
            return 'musl.so' in name or ('musl-' in name and '.so' in name)

        hits = (entry for entry in self.maps() if looks_like_musl(entry))

        if single:
            return next(hits, None)

        return list(hits)
×
1261
    
1262
    def elf_mapping(self, single=True):
        """elf_mapping(single=True) -> mapping
        elf_mapping(False) -> [mapping]

        Convenience wrapper around :meth:`.process.get_mapping` for the
        path of the process's own ELF.

        Arguments:
            single(bool=True): Whether to only return the first
                mapping matched, or all of them.

        Returns :meth:`.process.get_mapping` with the :meth:`.process.elf` path and single as arguments.

        Example:

            >>> p = process(['cat'])
            >>> p.sendline(b'meow')
            >>> p.recvline()
            b'meow\\n'
            >>> mapping = p.elf_mapping()
            >>> mapping.path # doctest: +ELLIPSIS
            '...cat...'
            >>> mapping.perms.execute
            False
            >>> mapping.perms.write
            False
            >>> hex(mapping.address) # doctest: +SKIP
            '0x55a2abba0000'
            >>> mappings = p.elf_mapping(single=False)
            >>> len(mappings) > 1
            True
            >>> hex(mappings[1].address) # doctest: +SKIP
            '0x55a2abba2000'
            >>> mappings[0].end == mappings[1].start
            True
            >>> mappings[1].perms.execute
            True

        """
        binary_path = self.elf.path
        return self.get_mapping(binary_path, single)
1✔
1299

1300
    def lib_size(self, path_value):
        """lib_size(path_value) -> int

        Adds up the sizes of every mapping whose path is ``path_value``.

        Arguments:
            path_value(str): The exact path of the shared library
            loaded by the process

        Returns the size of the shared library in process memory.
        If the library is not found, zero is returned. A warning is logged
        when consecutive mappings do not touch each other.

        Example:

            >>> from pwn import *
            >>> p = process(['cat'])
            >>> p.send(b'meow')
            >>> p.recvuntil(b'meow')
            b'meow'
            >>> libc_size = p.lib_size(p.libc.path)
            >>> hex(libc_size) # doctest: +SKIP
            '0x1d5000'
            >>> libc_mappings = p.libc_mapping(single=False)
            >>> libc_size == (libc_mappings[-1].end - libc_mappings[0].start)
            True

        """

        # The mappings are expected to arrive sorted by address.
        segments = self.get_mapping(path_value, single=False)

        if len(segments) == 0:
            return 0

        size_sum = sum(segment.size for segment in segments)

        # A gap exists when a mapping does not begin where the previous ended.
        has_gap = any(current.start != previous.end
                      for previous, current in zip(segments, segments[1:]))

        if has_gap:
            log.warn("lib_size(): %s mappings aren't contiguous" % path_value)

        return size_sum
1✔
1344

1345
    def address_mapping(self, address):
        """address_mapping(address) -> mapping

        Returns the first mapping whose range ``[addr, end)`` contains the
        specified address, or ``None`` when no mapping contains it.

        Example:

            >>> p = process(['cat'])
            >>> p.sendline(b'meow')
            >>> p.recvline()
            b'meow\\n'
            >>> libc = p.libc_mapping().address
            >>> heap = p.heap_mapping().address
            >>> elf = p.elf_mapping().address
            >>> p.address_mapping(libc).path # doctest: +ELLIPSIS
            '.../libc...'
            >>> p.address_mapping(heap + 0x123).path
            '[heap]'
            >>> p.address_mapping(elf + 0x1234).path # doctest: +ELLIPSIS
            '.../cat'
            >>> p.address_mapping(elf - 0x1234) == None
            True

        """

        containing = (entry for entry in self.maps()
                      if entry.addr <= address < entry.end)
        return next(containing, None)
1✔
1375

1376
    def libs(self):
        """libs() -> dict

        Return a dictionary mapping the path of each shared library loaded
        by the process to the address it is loaded at in the process' address
        space.

        Only mappings whose path contains a path separator are considered;
        paths are resolved with ``os.path.realpath`` and the address of the
        first mapping seen for each path wins. When the process is no
        longer running, or it has no mappings, the ``maps`` of an ELF
        loaded from ``self.executable`` is returned instead.
        """
        live_maps = self.maps() if self.poll() is None else False

        if not live_maps:
            import pwnlib.elf.elf

            with context.quiet:
                return pwnlib.elf.elf.ELF(self.executable).maps

        # Enumerate all of the libraries actually loaded right now.
        loaded = {}
        for entry in live_maps:
            if os.sep in entry.path:
                # setdefault keeps the address of the first mapping seen.
                loaded.setdefault(os.path.realpath(entry.path), entry.addr)

        return loaded
1✔
1401

1402
    @property
    def libc(self):
        """libc() -> ELF

        Returns an ELF for the libc for the current process.
        If possible, it is adjusted to the correct address
        automatically.

        The libc is the first entry of :meth:`libs` whose basename contains
        'libc.so', or both 'libc-' and '.so'. If nothing matches, ``None``
        is returned.

        Example:

        >>> p = process("/bin/cat")
        >>> p.send(b"meow")
        >>> p.recvuntil(b"meow")
        b'meow'
        >>> libc = p.libc
        >>> libc is not None
        True
        >>> libc # doctest: +SKIP
        ELF('/lib64/libc-...so')
        >>> p.close()
        """
        from pwnlib.elf import ELF

        for lib_path, base in self.libs().items():
            name = os.path.basename(lib_path)
            is_libc = 'libc.so' in name or ('libc-' in name and '.so' in name)
            if not is_libc:
                continue

            image = ELF(lib_path)
            image.address = base
            return image
1✔
1431

1432
    @property
    def elf(self):
        """elf() -> pwnlib.elf.elf.ELF

        Returns an ELF file for the executable that launched the process.

        A new ELF object is built from ``self.executable`` on every access.
        """
        import pwnlib.elf.elf
        elf_module = pwnlib.elf.elf
        return elf_module.ELF(self.executable)
1✔
1440

1441
    @property
    def corefile(self):
        """corefile() -> pwnlib.elf.elf.Core

        Returns a corefile for the process.

        If the process is alive, attempts to create a coredump with GDB.

        If the process is dead: returns None if context.disable_corefiles is enabled,
        otherwise attempts to locate the coredump created by the kernel.

        The loaded corefile is cached in ``self._corefile`` together with
        the SHA-256 of the core on disk; the cached object is returned again
        as long as that hash does not change.

        Raises:
            RuntimeError: In place of any AttributeError raised while
                obtaining the corefile.
        """
        # If the process is still alive, try using GDB
        import pwnlib.elf.corefile
        import pwnlib.gdb

        try:
            if self.poll() is None:
                corefile = pwnlib.gdb.corefile(self)
                if corefile is None:
                    self.error("Could not create corefile with GDB for %s", self.executable)
                # Corefiles of a live process are not cached.
                return corefile

            if context.disable_corefiles :
                self._corefile = None
                return self._corefile
            # Handle race condition against the kernel or QEMU to write the corefile
            # by waiting up to 5 seconds for it to be written.
            t = Timeout()
            finder = None
            with t.countdown(5):
                # Build a new finder on each pass until one has a core_path
                # or the countdown runs out.
                while t.timeout and (finder is None or not finder.core_path):
                    finder = pwnlib.elf.corefile.CorefileFinder(self)
                    time.sleep(0.5)

            if not finder.core_path:
                self.error("Could not find core file for pid %i" % self.pid)

            core_hash = sha256file(finder.core_path)

            # Same core as last time: reuse the already-parsed object.
            if self._corefile and self._corefile._hash == core_hash:
                return self._corefile

            self._corefile = pwnlib.elf.corefile.Corefile(finder.core_path)
        except AttributeError as e:
            raise RuntimeError(e) # AttributeError would route through __getattr__, losing original message
        # Remember which core this object was built from for the cache check.
        self._corefile._hash = core_hash

        return self._corefile
1✔
1489

1490
    def leak(self, address, count=1):
        r"""Leaks memory within the process at the specified address.

        The bytes are read from ``/proc/<pid>/mem``. If nothing could be
        read, ``None`` is returned instead of an empty result.

        Arguments:
            address(int): Address to leak memory at
            count(int): Number of bytes to leak at that address.

        Example:

            >>> e = ELF(which('bash-static'))
            >>> p = process(e.path)

            In order to make sure there's not a race condition against
            the process getting set up...

            >>> p.sendline(b'echo hello')
            >>> p.recvuntil(b'hello')
            b'hello'

            Now we can leak some data!

            >>> p.leak(e.address, 4)
            b'\x7fELF'
        """
        # If it's running under qemu-user, don't leak anything.
        exe_target = os.path.realpath('/proc/%i/exe' % self.pid)
        if 'qemu-' in exe_target:
            self.error("Cannot use leaker on binaries under QEMU.")

        mem_path = '/proc/%i/mem' % self.pid
        with open(mem_path, 'rb') as mem:
            mem.seek(address)
            leaked = mem.read(count)

        return leaked if leaked else None
1✔
1521

1522
    readmem = leak
1✔
1523

1524
    def writemem(self, address, data):
        r"""Writes memory within the process at the specified address.

        The bytes are written through ``/proc/<pid>/mem``; the value
        returned by the file's ``write`` call is passed back.

        Arguments:
            address(int): Address to write memory
            data(bytes): Data to write to the address

        Example:

            Let's write data to the beginning of the mapped memory of the ELF.

            >>> context.clear(arch='i386')
            >>> address = 0x100000
            >>> data = cyclic(32)
            >>> assembly = shellcraft.nop() * len(data)

            Wait for one byte of input, then write the data to stdout

            >>> assembly += shellcraft.write(1, address, 1)
            >>> assembly += shellcraft.read(0, 'esp', 1)
            >>> assembly += shellcraft.write(1, address, 32)
            >>> assembly += shellcraft.exit()
            >>> asm(assembly)[32:]
            b'j\x01[\xb9\xff\xff\xef\xff\xf7\xd1\x89\xdaj\x04X\xcd\x801\xdb\x89\xe1j\x01Zj\x03X\xcd\x80j\x01[\xb9\xff\xff\xef\xff\xf7\xd1j Zj\x04X\xcd\x801\xdbj\x01X\xcd\x80'

            Assemble the binary and test it

            >>> elf = ELF.from_assembly(assembly, vma=address)
            >>> io = elf.process()
            >>> _ = io.recvuntil(b'\x90')
            >>> _ = io.writemem(address, data)
            >>> io.send(b'X')
            >>> io.recvall()
            b'aaaabaaacaaadaaaeaaafaaagaaahaaa'
        """

        # Refuse to touch processes that run under qemu-user.
        # NOTE(review): the message still says "leaker" although this is the
        # write path; it is kept as-is to preserve the emitted text.
        exe_target = os.path.realpath('/proc/%i/exe' % self.pid)
        if 'qemu-' in exe_target:
            self.error("Cannot use leaker on binaries under QEMU.")

        mem_path = '/proc/%i/mem' % self.pid
        with open(mem_path, 'wb') as mem:
            mem.seek(address)
            written = mem.write(data)
            return written
1✔
1566

1567

1568
    @property
    def stdin(self):
        """The ``stdin`` attribute of the underlying ``self.proc`` object.

        See: :obj:`.process.proc`
        """
        child = self.proc
        return child.stdin
1✔
1575
    @property
    def stdout(self):
        """The ``stdout`` attribute of the underlying ``self.proc`` object.

        See: :obj:`.process.proc`
        """
        child = self.proc
        return child.stdout
1✔
1582
    @property
    def stderr(self):
        """The ``stderr`` attribute of the underlying ``self.proc`` object.

        See: :obj:`.process.proc`
        """
        child = self.proc
        return child.stderr
×
1589

1590
# Keep reading the process's output in a separate thread,
1591
# since there's no non-blocking read in python on Windows.
1592
def _read_in_thread(recv_queue, proc_stdout):
1✔
1593
    try:
×
1594
        while True:
×
1595
            b = proc_stdout.read(1)
×
1596
            if b:
×
1597
                recv_queue.put(b)
×
1598
            else:
1599
                break
×
1600
    except:
×
1601
        # Ignore any errors during Python shutdown
1602
        pass
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc