• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Gallopsled / pwntools / 6176798313

13 Sep 2023 07:09PM UTC coverage: 74.15% (-0.03%) from 74.176%
6176798313

push

github-actions

gogo2464
Install radare2 on cicd.

4602 of 7328 branches covered (0.0%)

12865 of 17350 relevant lines covered (74.15%)

0.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

49.39
/pwnlib/radare2.py
1
# -*- coding: utf-8 -*-
2
"""
1✔
3
Radare2 is a reverse engineering framwork. It is a reverse engineering tool. Contrary to gdb, r2 is specifically made and thank to reverse and exploit programs. His philosophy is not to be a programming tool made for IDE contrary to gdb, you do not have to trick his philosophy.
4

5
Pwntools makes this easy-to-do with a handful of helper routines, designed
6
to make your exploit-debug-update cycles much faster.
7

8
Useful Functions
9
----------------
10

11
- :func:`attach` - Attach to an existing process
12
- :func:`debug` - Start a new process under a debugger, stopped at the first instruction
13
- :func:`debug_shellcode` - Build a binary with the provided shellcode, and start it under a debugger. Works with emulation.
14

15
Debugging Tips
16
--------------
17

18
The :func:`attach` and :func:`debug` functions will likely all what you need for debugging.
19

20
Both allow you to provide a script to pass to Radare2 when it is started, so that
21
it can automatically set your breakpoints.
22

23
Attaching to Processes
24
~~~~~~~~~~~~~~~~~~~~~~
25

26
To attach to an existing process, just use :func:`attach`.  It is surprisingly
27
versatile, and can attach to a :class:`.process` for simple
28
binaries, or will automatically find the correct process to attach to for a
29
forking server, if given a :class:`.remote` object.
30

31
Spawning New Processes
32
~~~~~~~~~~~~~~~~~~~~~~
33

34
Attaching to processes with :func:`attach` is useful, but the state the process
35
is in may vary.  If you need to attach to a process very early, and debug it from
36
the very first instruction (or even the start of ``main``), you instead should use
37
:func:`debug`.
38

39
When you use :func:`debug`, the return value is a :class:`.tube` object
40
that you interact with exactly like normal.
41

42
Using Radare2 Python API
43
~~~~~~~~~~~~~~~~~~~~~~~~
44

45
Not tested yet.
46

47

48
Tips and Troubleshooting
49
------------------------
50

51
``NOPTRACE`` magic argument
52
~~~~~~~~~~~~~~~~~~~~~~~~~~~
53

54
It's quite cumbersom to comment and un-comment lines containing `attach`.
55

56
You can cause these lines to be a no-op by running your script with the
57
``NOPTRACE`` argument appended, or with ``PWNLIB_NOPTRACE=1`` in the environment.
58

59
::
60

61
    $ python exploit.py NOPTRACE
62
    [+] Starting local process '/bin/bash': Done
63
    [!] Skipping debug attach since context.noptrace==True
64
    ...
65

66
Kernel Yama ptrace_scope
67
~~~~~~~~~~~~~~~~~~~~~~~~
68

69
The Linux kernel v3.4 introduced a security mechanism called ``ptrace_scope``,
70
which is intended to prevent processes from debugging eachother unless there is
71
a direct parent-child relationship.
72

73
This causes some issues with the normal Pwntools workflow, since the process
74
hierarchy looks like this:
75

76
::
77

78
    python ---> target
79
           `--> gdb
80

81
Note that ``python`` is the parent of ``target``, not ``gdb``.
82

83
In order to avoid this being a problem, Pwntools uses the function
84
``prctl(PR_SET_PTRACER, PR_SET_PTRACER_ANY)``.  This disables Yama
85
for any processes launched by Pwntools via :class:`.process` or via
86
:meth:`.ssh.process`.
87

88
Older versions of Pwntools did not perform the ``prctl`` step, and
89
required that the Yama security feature was disabled systemwide, which
90
requires ``root`` access.
91

92
Member Documentation
93
===============================
94
"""
95

96
import six
1✔
97
import os
1✔
98
import tempfile
1✔
99
from pwnlib.context import LocalContext, context
1✔
100
from pwnlib.timeout import Timeout
1✔
101
from pwnlib.util import misc
1✔
102
from pwnlib.util import proc
1✔
103
from pwnlib.util import packing
1✔
104
from pwnlib.log import getLogger
1✔
105
from pwnlib import tubes
1✔
106

107

108
log = getLogger(__name__)
1✔
109

110

111

112
def binary():
1✔
113
    """binary() -> str
114
    Returns:
115
        str: Path to the appropriate ``radare2`` binary to use.
116
    Example:
117
        >>> radare2.binary() # doctest: +SKIP
118
        '/usr/local/bin/r2'
119
    """
120
    radare2 = misc.which('radare2')
1✔
121

122
    if not radare2:
1!
123
        log.error('radare2 is not installed\n'
×
124
                  '$ git clone https://github.com/radareorg/radare2 ; cd radare2 ; sh sys/install.sh ; cd .. ;')
125

126
    return radare2
1✔
127
    
128

129
@LocalContext
1✔
130
def attach(target, radare2_script = "", exe = None, radare2_args = None, ssh = None, sysroot = None, api = False):
1✔
131
    r"""
132
    Start Radare2 in a new terminal and attach to `target`.
133

134
    Arguments:
135
        target: The target to attach to.
136
        radare2_script(:obj:`str` or :obj:`file`): Radare2 script to run after attaching.
137
        exe(str): The path of the target binary.
138
        arch(str): Architechture of the target binary.  If `exe` known Radare2 will
139
          detect the architechture automatically (if it is supported).
140
        radare2_args(list): List of additional arguments to pass to Radare2.
141
        sysroot(str): Set an alternate system root. The system root is used to
142
            load absolute shared library symbol files. This is useful to instruct
143
            Radare2 to load a local version of binaries/libraries instead of downloading
144
            them from the gdbserver, which is faster
145
        api(bool): Enable access to Radare2 Python API.
146

147
    Returns:
148
        PID of the Radare2 process (or the window which it is running in).
149
        When ``api=True``, a (PID, :class:`Radare2`) tuple.
150

151
    Notes:
152

153
        The ``target`` argument is very robust, and can be any of the following:
154

155
        :obj:`int`
156
            PID of a process
157
        :obj:`str`
158
            Process name.  The youngest process is selected.
159
        :obj:`tuple`
160
            Host, port pair of a listening ``gdbserver``
161
        :class:`.process`
162
            Process to connect to
163
        :class:`.sock`
164
            Connected socket. The executable on the other end of the connection is attached to.
165
            Can be any socket type, including :class:`.listen` or :class:`.remote`.
166
        :class:`.ssh_channel`
167
            Remote process spawned via :meth:`.ssh.process`.
168
            This will use the Radare2 installed on the remote machine.
169
            If a password is required to connect, the ``sshpass`` program must be installed.
170

171
    Examples:
172

173
        Attach to a process by PID
174

175
        >>> pid = radare2.attach(1234) # doctest: +SKIP
176

177
        Attach to the youngest process by name
178

179
        >>> pid = radare2.attach('bash') # doctest: +SKIP
180

181
        Attach a debugger to a :class:`.process` tube and automate interaction
182
        
183
        >>> io = process('bash')
184
        >>> pid = radare2.attach(io, radare2_script='''
185
        ... pd 10 @ main
186
        ... q!!
187
        ... ''')
188
        >>> io = process(['echo', 'ABC'])
189
        >>> pid = radare2.attach(io, radare2_script='dc')
190
        >>> io.recvline()
191
        b'ABC\n'
192
        >>> io = process('bash')
193
        >>> pid = radare2.attach(io, radare2_script='dc')
194
        >>> io.sendline(b'echo Hello from bash && exit')
195
        >>> io.recvall()
196
        b'Hello from bash\n'
197

198
        Using Radare2 Python API:
199

200
        .. doctest
201
           :skipif: six.PY2
202

203
            >>> io = process('bash')
204

205
            Attach a debugger
206

207
            >>> pid, io_radare2 = radare2.attach(io, api=True)
208

209
            Force the program to write something it normally wouldn't
210

211
            >>> #io_radare2.execute('aeC puts("Hello from process debugger!")')
212

213
            Resume the program
214

215
            >>> io_radare2.continue_nowait()
216

217
            Observe the forced line
218

219
            >>> io.recvline()
220
            b'Hello from process debugger!\n'
221

222
            Interact with the program in a regular way
223

224
            >>> io.sendline(b'echo Hello from bash && exit')
225

226
            Observe the results
227

228
            >>> io.recvall()
229
            b'Hello from bash\n'
230

231
        Attach to the remote process from a :class:`.remote` or :class:`.listen` tube,
232
        as long as it is running on the same machine.
233

234
        >>> server = process(['socat', 'tcp-listen:12345,reuseaddr,fork', 'exec:"/bin/bash",nofork'])#server = process(['socat', 'tcp-listen:12345,reuseaddr,fork', 'exec:"/usr/bin/echo QWERTY",nofork'])
235
        >>> sleep(1) # Wait for socat to start
236
        >>> io = remote('127.0.0.1', 12345)
237
        >>> sleep(1) # Wait for process to fork
238
        >>> pid = radare2.attach(io, radare2_script="dc;")
239
        >>> io.sendline(b'echo QWERTY && exit')
240
        >>> io.recvline()
241
        b'QWERTY\n'
242
        
243
        >>> server = process(['socat', 'tcp-listen:12345,reuseaddr,fork', 'exec:"/bin/bash",nofork'])
244
        >>> sleep(1) # Wait for socat to start
245
        >>> io = remote('127.0.0.1', 12345)
246
        >>> sleep(1) # Wait for process to fork
247
        >>> io.sendline(b'echo Hello from bash && exit')
248
        >>> io.recvall()
249
        b'Hello from bash\n'
250

251
        Attach to processes running on a remote machine via an SSH :class:`.ssh` process
252

253
        >>> shell = ssh('travis', 'example.pwnme', password='demopass')
254
        >>> io = shell.process(['echo', 'Hello_world!'])
255
        >>> pid = radare2.attach(io, radare2_script='''
256
        ... dc
257
        ... q!!
258
        ... ''')
259
        
260
        >>> io.recvline(timeout=5)  # doctest: +SKIP
261
        b'Hello_world!\n'
262
        >>> io = shell.process(['bash'])
263
        >>> io.sendline(b'This will be echoed back')
264
        >>> pid = radare2.attach(io, radare2_script='''
265
        ... dc
266
        ... q!!
267
        ... ''')
268

269
        >>> #io.recvline()
270
        
271
        b'This will be echoed back\n'
272
        
273
        >>> #io.close()
274
    """
275
    if context.noptrace:
1!
276
        log.warn_once("Skipping debug attach since context.noptrace==True")
×
277
        return
×
278

279
    # enable radare2.attach(p, 'continue')
280
    if radare2_script and not radare2_script.endswith('\n'):
1✔
281
        radare2_script += '\n'
1✔
282

283
    # radare2 script to run before `radare2_script`
284

285
    # let's see if we can find a pid to attach to
286
    pid = None
1✔
287
    if   isinstance(target, six.integer_types):
1!
288
        # target is a pid, easy peasy
289
        pid = target
×
290
    elif isinstance(target, str):
1!
291
        # pidof picks the youngest process
292
        pidof = proc.pidof
×
293

294
        if context.os == 'android':
×
295
            pidof = adb.pidof
×
296

297
        pids = list(pidof(target))
×
298
        if not pids:
×
299
            log.error('No such process: %s', target)
×
300
        pid = pids[0]
×
301
        log.info('Attaching to youngest process "%s" (PID = %d)' %
×
302
                 (target, pid))
303
    elif isinstance(target, tubes.ssh.ssh_channel):
1✔
304
        if not target.pid:
1!
305
            log.error("PID unknown for channel")
×
306

307
        shell = target.parent
1✔
308

309
        tmpfile = shell.mktemp()
1✔
310
        radare2_script = b'shell rm %s\n%s' % (tmpfile, packing._need_bytes(radare2_script, 2, 0x80))
1✔
311
        shell.upload_data(radare2_script or b'', tmpfile)
1✔
312

313
        cmd = ['ssh', '-C', '-t', '-p', str(shell.port), '-l', shell.user, shell.host]
1✔
314
        if shell.password:
1!
315
            if not misc.which('sshpass'):
1!
316
                log.error("sshpass must be installed to debug ssh processes")
×
317
            cmd = ['sshpass', '-p', shell.password] + cmd
1✔
318
        if shell.keyfile:
1!
319
            cmd += ['-i', shell.keyfile]
×
320
        cmd += ['radare2', '-e', 'dbg.exe.path=', target.executable, '-i', tmpfile, '-d', str(target.pid) ]
1✔
321

322
        misc.run_in_new_terminal(cmd)
1✔
323
        return
1✔
324
    elif isinstance(target, tubes.sock.sock):
1✔
325
        pids = proc.pidof(target)
1✔
326
        
327
        if not pids:
1!
328
            log.error('Could not find remote process (%s:%d) on this machine' %
×
329
                      target.sock.getpeername())
330
        pid = pids[0]
1✔
331

332
        # Specifically check for socat, since it has an intermediary process
333
        # if you do not specify "nofork" to the EXEC: argument
334
        # python(2640)───socat(2642)───socat(2643)───bash(2644)
335
        if proc.exe(pid).endswith('/socat') and time.sleep(0.1) and proc.children(pid):
1!
336
            pid = proc.children(pid)[0]
×
337

338
        # We may attach to the remote process after the fork but before it performs an exec.  
339
        # If an exe is provided, wait until the process is actually running the expected exe
340
        # before we attach the debugger.
341
        t = Timeout()
1✔
342
        with t.countdown(2):
1✔
343
            while exe and os.path.realpath(proc.exe(pid)) != os.path.realpath(exe) and t.timeout:
1!
344
                time.sleep(0.1)
×
345

346
    elif isinstance(target, tubes.process.process):
1!
347
        pid = proc.pidof(target)[0]
1✔
348
        exe = exe or target.executable
1✔
349
    elif isinstance(target, tuple) and len(target) == 2:
×
350
        host, port = target
×
351

352
        if context.os != 'android':
×
353
            pre += 'target remote %s:%d\n' % (host, port)
×
354
        else:
355
            # Android debugging is done over gdbserver, which can't follow
356
            # new inferiors (tldr; follow-fork-mode child) unless it is run
357
            # in extended-remote mode.
358
            pre += 'target extended-remote %s:%d\n' % (host, port)
×
359
            pre += 'set detach-on-fork off\n'
×
360

361
        def findexe():
×
362
            for spid in proc.pidof(target):
×
363
                sexe = proc.exe(spid)
×
364
                name = os.path.basename(sexe)
×
365
                # XXX: parse cmdline
366
                if name.startswith('qemu-') or name.startswith('gdbserver'):
×
367
                    exe = proc.cmdline(spid)[-1]
×
368
                    return os.path.join(proc.cwd(spid), exe)
×
369

370
        exe = exe or findexe()
×
371
    elif isinstance(target, elf.corefile.Corefile):
×
372
        pre += 'target core "%s"\n' % target.path
×
373
    else:
374
        log.error("don't know how to attach to target: %r", target)
×
375

376
    # if we have a pid but no exe, just look it up in /proc/
377
    if pid and not exe:
1✔
378
        exe_fn = proc.exe
1✔
379
        if context.os == 'android':
1!
380
            exe_fn = adb.proc_exe
×
381
        exe = exe_fn(pid)
1✔
382

383
    if not pid and not exe and not ssh:
1!
384
        log.error('could not find target process')
×
385

386
    
387
    cmd = []
1✔
388
    pre = ""
1✔
389

390
    if context.os == 'android' and pid:
1!
391
        runner  = _get_runner()
×
392
        which   = _get_which()
×
393
        gdb_cmd = _gdbserver_args(pid=pid, which=which)
×
394
        gdbserver = runner(gdb_cmd)
×
395
        port    = _gdbserver_port(gdbserver, None)
×
396
        host    = context.adb_host
×
397
        pre    += 'target extended-remote %s:%i\n' % (context.adb_host, port)
×
398

399
        # gdbserver on Android sets 'detach-on-fork on' which breaks things
400
        # when you're trying to debug anything that forks.
401
        pre += 'set detach-on-fork off\n'
×
402

403
    if api:
1!
404
        # create a UNIX socket for talking to GDB
405
        socket_dir = tempfile.mkdtemp()
×
406
        socket_path = os.path.join(socket_dir, 'socket')
×
407
        bridge = os.path.join(os.path.dirname(__file__), 'gdb_api_bridge.py')
×
408

409
        # inject the socket path and the GDB Python API bridge
410
        pre = 'python socket_path = ' + repr(socket_path) + '\n' + \
×
411
              'source ' + bridge + '\n' + \
412
              pre
413

414
    radare2_script = pre + (radare2_script or '')
1✔
415
    
416
    radare2_binary = binary()
1✔
417
    
418
    cmd = [radare2_binary, "-e", "dbg.exe.path=%s" % exe]
1✔
419

420
    if radare2_script:
1!
421
        tmp = tempfile.NamedTemporaryFile(prefix = 'pwn', suffix = '.r2',
1✔
422
                                          delete = False, mode = 'w+')
423
        log.debug('Wrote radare2 script to %r\n%s', tmp.name, radare2_script)
1✔
424
        radare2_script = '!rm %s\n%s' % (tmp.name, radare2_script)
1✔
425

426
        tmp.write(radare2_script)
1✔
427
        tmp.close()
1✔
428
        cmd = cmd + ["-i", tmp.name]
1✔
429

430

431
    if exe and context.native and not pid:
1!
432
        if not ssh and not os.path.isfile(exe):
×
433
            log.error('No such file: %s', exe)
×
434
        cmd += [exe]
×
435
        
436
    if pid and not context.os == 'android':
1!
437
        cmd += ["-d", str(pid)]
1✔
438
        
439
    if radare2_args:
1!
440
        cmd += radare2_args
×
441
    log.info('running in new terminal with arguments: %s', radare2_args)
1✔
442
        
443
    log.info('running in new terminal: %s', cmd)
1✔
444

445
    if api:
1!
446
        # prevent gdb_faketerminal.py from messing up api doctests
447
        def preexec_fn():
×
448
            os.environ['GDB_FAKETERMINAL'] = '0'
×
449
    else:
450
        preexec_fn = None
1✔
451
    gdb_pid = misc.run_in_new_terminal(cmd, preexec_fn = preexec_fn)
1✔
452

453
    if pid and context.native:
1!
454
        proc.wait_for_debugger(pid, gdb_pid)
1✔
455

456
    if not api:
1!
457
        return gdb_pid
1✔
458

459
    # connect to the GDB Python API bridge
460
    from rpyc import BgServingThread
×
461
    from rpyc.utils.factory import unix_connect
×
462
    if six.PY2:
×
463
        retriable = socket.error
×
464
    else:
465
        retriable = ConnectionRefusedError, FileNotFoundError
×
466

467
    t = Timeout()
×
468
    with t.countdown(10):
×
469
        while t.timeout:
×
470
            try:
×
471
                conn = unix_connect(socket_path)
×
472
                break
×
473
            except retriable:
×
474
                time.sleep(0.1)
×
475
        else:
476
            # Check to see if RPyC is installed at all in GDB
477
            rpyc_check = [gdb_binary, '--nx', '-batch', '-ex',
×
478
                          'python import rpyc; import sys; sys.exit(123)']
479

480
            if 123 != tubes.process.process(rpyc_check).poll(block=True):
×
481
                log.error('Failed to connect to GDB: rpyc is not installed')
×
482

483
            # Check to see if the socket ever got created
484
            if not os.path.exists(socket_path):
×
485
                log.error('Failed to connect to GDB: Unix socket %s was never created', socket_path)
×
486

487
            # Check to see if the remote RPyC client is a compatible version
488
            version_check = [gdb_binary, '--nx', '-batch', '-ex',
×
489
                            'python import platform; print(platform.python_version())']
490
            gdb_python_version = tubes.process.process(version_check).recvall().strip()
×
491
            python_version = str(platform.python_version())
×
492

493
            if gdb_python_version != python_version:
×
494
                log.error('Failed to connect to GDB: Version mismatch (%s vs %s)',
×
495
                           gdb_python_version,
496
                           python_version)
497

498
            # Don't know what happened
499
            log.error('Failed to connect to GDB: Unknown error')
×
500

501
    # now that connection is up, remove the socket from the filesystem
502
    os.unlink(socket_path)
×
503
    os.rmdir(socket_dir)
×
504

505
    # create a thread for receiving breakpoint notifications
506
    BgServingThread(conn, callback=lambda: None)
×
507

508
    return gdb_pid, Gdb(conn)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc