• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Gallopsled / pwntools / 7250413177

18 Dec 2023 03:44PM UTC coverage: 71.866% (-2.7%) from 74.55%
7250413177

Pull #2297

github

web-flow
Merge fbc1d8e0b into c7649c95e
Pull Request #2297: add "retguard" property and display it with checksec

4328 of 7244 branches covered (0.0%)

5 of 6 new or added lines in 1 file covered. (83.33%)

464 existing lines in 9 files now uncovered.

12381 of 17228 relevant lines covered (71.87%)

0.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

23.88
/pwnlib/term/term.py
1
from __future__ import absolute_import
1✔
2
from __future__ import division
1✔
3
from __future__ import unicode_literals
1✔
4

5
import atexit
1✔
6
import errno
1✔
7
import os
1✔
8
import re
1✔
9
import shutil
1✔
10
import signal
1✔
11
import struct
1✔
12
import sys
1✔
13
import threading
1✔
14
import traceback
1✔
15
import weakref
1✔
16

17
if sys.platform != 'win32':
1!
18
    import fcntl
1✔
19
    import termios
1✔
20

21
from ..context import ContextType
1✔
22
from . import termcap
1✔
23
from .. import py2compat
1✔
24

25
__all__ = ['output', 'init']
1✔
26

27
# we assume no terminal can display more lines than this
28
MAX_TERM_HEIGHT = 200
1✔
29

30
# list of callbacks triggered on SIGWINCH
31
on_winch = []
1✔
32

33
cached_pos = None
1✔
34
settings = None
1✔
35
setup_done = False
1✔
36
epoch = 0
1✔
37

38
fd = sys.stdout
1✔
39
winchretry = False
1✔
40
rlock = threading.RLock()
1✔
41

42
class WinchLock(object):
1✔
43
    def __init__(self):
1✔
44
        self.guard = threading.RLock()
1✔
45
        self.lock = threading.Lock()
1✔
46

47
    @property
1✔
48
    def acquire(self):
1✔
49
        return self.lock.acquire
×
50

51
    @property
1✔
52
    def release(self):
1✔
53
        return self.lock.release
×
54

55
    def __enter__(self):
1✔
UNCOV
56
        self.guard.acquire()
×
UNCOV
57
        return self.lock.__enter__()
×
58
    def __exit__(self, tp, val, tb):
1✔
UNCOV
59
        try:
×
UNCOV
60
            return self.lock.__exit__(tp, val, tb)
×
61
        finally:
UNCOV
62
            if winchretry:
×
63
                handler_sigwinch(signal.SIGWINCH, None)
×
UNCOV
64
            self.guard.release()
×
65

66
wlock = WinchLock()
1✔
67

68
def show_cursor():
1✔
UNCOV
69
    do('cnorm')
×
70

71
def hide_cursor():
1✔
UNCOV
72
    do('civis')
×
73

74
def update_geometry():
1✔
75
    global width, height
76
    width, height = shutil.get_terminal_size()
1✔
77

78
def handler_sigwinch(signum, stack):
1✔
79
    global cached_pos, winchretry
80
    with wlock.guard:
×
81
        while True:
×
82
            if not wlock.acquire(False):
×
83
                winchretry = True
×
84
                return
×
85

86
            winchretry = False
×
87
            update_geometry()
×
88
            for cb in on_winch:
×
89
                cb()
×
90
            wlock.release()
×
91
            if not winchretry: break
×
92

93

94
def handler_sigstop(signum, stack):
1✔
95
    resetterm()
×
96
    os.kill(0, signal.SIGSTOP)
×
97

98
def handler_sigcont(signum, stack):
1✔
99
    global epoch, cached_pos, setup_done
100
    epoch += 1
×
101
    cached_pos = None
×
102
    setup_done = False
×
103

104
def setupterm():
1✔
105
    global settings, setup_done
UNCOV
106
    if setup_done:
×
UNCOV
107
        return
×
UNCOV
108
    setup_done = True
×
UNCOV
109
    hide_cursor()
×
UNCOV
110
    update_geometry()
×
UNCOV
111
    do('smkx') # keypad mode
×
UNCOV
112
    mode = termios.tcgetattr(fd)
×
UNCOV
113
    IFLAG, OFLAG, CFLAG, LFLAG, ISPEED, OSPEED, CC = range(7)
×
UNCOV
114
    if not settings:
×
UNCOV
115
        settings = mode[:]
×
UNCOV
116
        settings[CC] = settings[CC][:]
×
UNCOV
117
    mode[LFLAG] &= ~(termios.ECHO | termios.ICANON | termios.IEXTEN)
×
UNCOV
118
    mode[CC][termios.VMIN] = 1
×
UNCOV
119
    mode[CC][termios.VTIME] = 0
×
UNCOV
120
    termios.tcsetattr(fd, termios.TCSADRAIN, mode)
×
UNCOV
121
    fd.flush()
×
122

123
def resetterm():
1✔
124
    global settings, setup_done
UNCOV
125
    if settings:
×
UNCOV
126
        termios.tcsetattr(fd, termios.TCSADRAIN, settings)
×
UNCOV
127
        settings = None
×
UNCOV
128
    if setup_done:
×
UNCOV
129
        setup_done = False
×
UNCOV
130
        show_cursor()
×
UNCOV
131
        do('rmkx')
×
UNCOV
132
        fd.flush()
×
133

134
def init():
1✔
UNCOV
135
    atexit.register(resetterm)
×
UNCOV
136
    signal.signal(signal.SIGWINCH, handler_sigwinch)
×
UNCOV
137
    signal.signal(signal.SIGTSTP, handler_sigstop)
×
UNCOV
138
    signal.signal(signal.SIGCONT, handler_sigcont)
×
UNCOV
139
    class Wrapper:
×
UNCOV
140
        def __init__(self, fd):
×
UNCOV
141
            self._fd = fd
×
UNCOV
142
        def write(self, s):
×
UNCOV
143
            return output(s, frozen=True)
×
UNCOV
144
        def __getattr__(self, k):
×
UNCOV
145
            return getattr(self._fd, k)
×
UNCOV
146
    if sys.stdout.isatty():
×
UNCOV
147
        sys.stdout = Wrapper(sys.stdout)
×
UNCOV
148
    if sys.stderr.isatty():
×
149
        sys.stderr = Wrapper(sys.stderr)
×
150

UNCOV
151
    console = ContextType.defaults['log_console']
×
UNCOV
152
    if console.isatty():
×
UNCOV
153
        ContextType.defaults['log_console'] = Wrapper(console)
×
154

155
    # freeze all cells if an exception is thrown
UNCOV
156
    orig_hook = sys.excepthook
×
UNCOV
157
    def hook(*args):
×
158
        sys.stderr = sys.__stderr__
×
159
        resetterm()
×
160
        cells.clear()
×
161
        if orig_hook:
×
162
            orig_hook(*args)
×
163
        else:
164
            traceback.print_exception(*args)
×
UNCOV
165
    sys.excepthook = hook
×
166

167
tmap = {c: '\\x{:02x}'.format(c) for c in set(range(0x20)) - {0x09, 0x0a, 0x0d, 0x1b} | {0x7f}}
1✔
168

169
def put(s):
1✔
170
    global cached_pos, epoch
UNCOV
171
    s = s.translate(tmap)
×
UNCOV
172
    if cached_pos:
×
UNCOV
173
        it = iter(s.replace('\n', '\r\n'))
×
UNCOV
174
        sanit_s = ''
×
UNCOV
175
        for c in it:
×
UNCOV
176
            if c == '\r':
×
UNCOV
177
                cached_pos[1] = 0
×
UNCOV
178
            elif c == '\n':
×
UNCOV
179
                cached_pos[0] += 1
×
UNCOV
180
            elif c == '\t':
×
181
                cached_pos[1] = (cached_pos[1] + 8) & -8
×
UNCOV
182
            elif c in '\x1b\u009b':  # ESC or CSI
×
UNCOV
183
                seq = c
×
UNCOV
184
                for c in it:
×
UNCOV
185
                    seq += c
×
UNCOV
186
                    if c not in '[0123456789;':
×
UNCOV
187
                        break
×
188
                else:
189
                    # unterminated ctrl seq, just print it visually
190
                    c = seq.replace('\x1b', r'\x1b').replace('\u009b', r'\u009b')
×
191
                    cached_pos[1] += len(c)
×
192

193
                # if '\e[123;123;123;123m' then nothing
UNCOV
194
                if c == 'm':
×
UNCOV
195
                    c = seq
×
196
                else:
197
                    # undefined ctrl seq, just print it visually
198
                    c = seq.replace('\x1b', r'\x1b').replace('\u009b', r'\u009b')
×
199
                    cached_pos[1] += len(c)
×
UNCOV
200
            elif c < ' ':
×
201
                assert False, 'impossible ctrl char'
×
202
            else:
203
                # normal character, nothing to see here
UNCOV
204
                cached_pos[1] += 1
×
UNCOV
205
            sanit_s += c
×
206
        else:
UNCOV
207
            s = sanit_s.replace('\r\n', '\n')
×
UNCOV
208
    return fd.write(s)
×
209

210
def do(c, *args):
1✔
UNCOV
211
    s = termcap.get(c, *args)
×
UNCOV
212
    if s:
×
UNCOV
213
        fd.write(s.decode('utf-8'))
×
214

215
def goto(rc):
1✔
216
    global cached_pos
UNCOV
217
    r, c = rc
×
UNCOV
218
    nowr, nowc = cached_pos or (None, None)
×
UNCOV
219
    cached_pos = [r, c]
×
220
    # common cases: we can just go up/down a couple rows
UNCOV
221
    if c == 0:
×
UNCOV
222
        if r == nowr + 1:
×
UNCOV
223
            fd.write('\n')
×
UNCOV
224
            return
×
UNCOV
225
        if c != nowc:
×
UNCOV
226
            fd.write('\r')
×
UNCOV
227
    elif c != nowc:
×
UNCOV
228
        do('hpa', c)
×
229

UNCOV
230
    if r == nowr - 1:
×
UNCOV
231
        do('cuu1')
×
UNCOV
232
    elif r < nowr:
×
UNCOV
233
        do('cuu', nowr - r)
×
UNCOV
234
    elif r > nowr:
×
UNCOV
235
        do('cud', r - nowr)
×
236

237

238
class Cell(object):
1✔
239
    def __init__(self, value, float):
1✔
UNCOV
240
        self.value = value
×
UNCOV
241
        self.float = float
×
242

243
    def draw(self):
1✔
UNCOV
244
        self.pos = get_position()
×
UNCOV
245
        self.born = epoch
×
UNCOV
246
        put(self.value)
×
UNCOV
247
        self.pos_after = get_position()
×
248

249
    def update(self, value):
1✔
UNCOV
250
        if isinstance(value, bytes):
×
UNCOV
251
            value = value.decode('utf-8', 'backslashreplace')
×
UNCOV
252
        with wlock:
×
UNCOV
253
            want_erase_line = False
×
UNCOV
254
            if '\n' in value:
×
255
                if len(value) < len(self.value):
×
256
                    want_erase_line = True
×
257
                elif '\n' not in self.value:  # not really supported
×
258
                    for cell in cells.iter_after(self):
×
259
                        if cell.value:
×
260
                            want_erase_line = True
×
261
                        break
×
UNCOV
262
            self.value = value
×
UNCOV
263
            self.update_locked(erase_line=want_erase_line)
×
UNCOV
264
            fd.flush()
×
265

266
    def prepare_redraw(self):
1✔
267
        global epoch
UNCOV
268
        if self.born != epoch:
×
269
            return None
×
UNCOV
270
        saved = get_position()
×
UNCOV
271
        if saved < self.pos or saved == (1, 1):
×
272
            epoch += 1
×
273
            return None
×
UNCOV
274
        goto(self.pos)
×
UNCOV
275
        return saved
×
276

277
    def update_locked(self, erase_line=False):
1✔
UNCOV
278
        prev_pos = self.prepare_redraw()
×
UNCOV
279
        if prev_pos is None:
×
280
            for cell in cells:
×
281
                cell.draw()
×
282
            return
×
UNCOV
283
        erased_line = None
×
UNCOV
284
        if erase_line:
×
285
            do('el')
×
286
            erased_line = self.pos[0]
×
UNCOV
287
        put(self.value)
×
UNCOV
288
        pos = get_position()
×
UNCOV
289
        if pos == self.pos_after:
×
UNCOV
290
            goto(prev_pos)
×
UNCOV
291
            return
×
292
        if pos < self.pos_after:
×
293
            do('el')
×
294
            erased_line = self.pos[0]
×
295
        old_after = self.pos_after
×
296
        self.pos_after = pos
×
297

298
        cell = self  # in case there are no more cells
×
299
        for cell in cells.iter_after(self):
×
300
            if old_after != cell.pos:
×
301
                # do not merge gaps
302
                break
×
303
            pos = get_position()
×
304
            if erased_line != pos[0]:
×
305
                if pos[0] < cell.pos[0]:
×
306
                    # the cell moved up, erase its line
307
                    do('el')
×
308
                    erased_line = pos[0]
×
309
                elif cell.pos == pos:
×
310
                    # cell got neither moved nor erased
311
                    break
×
312

313
            if pos[1] < cell.pos[1]:
×
314
                # the cell moved left, it must be same line as self; erase if not yet erased
315
                if not erase_line and erased_line != pos[0]:
×
316
                    do('el')
×
317
                    erased_line = pos[0]
×
318

319
            old_after = cell.pos_after
×
320
            cell.draw()
×
321
            if cell.pos_after == old_after and erased_line != old_after[0]:
×
322
                break
×
323
        else:
324
            if cell.float:
×
325
                # erase all screen after last float
326
                do('ed')
×
327
        if prev_pos > get_position():
×
328
            goto(prev_pos)
×
329

330
    def __repr__(self):
1✔
331
        return '{}({!r}, float={}, pos={})'.format(self.__class__.__name__, self.value, self.float, self.pos)
×
332

333

334
class WeakCellList(object):
1✔
335
    def __init__(self):
1✔
336
        self._cells = []
1✔
337
        self._floats = []
1✔
338
        self._lists = self._cells, self._floats
1✔
339

340
    @property
1✔
341
    def cells(self):
1✔
342
        return self.iter_field(self._cells)
×
343

344
    @property
1✔
345
    def floats(self):
1✔
UNCOV
346
        return self.iter_field(self._floats)
×
347

348
    def iter_field(self, *Ls):
1✔
UNCOV
349
        for L in Ls:
×
UNCOV
350
            for iref in L[:]:
×
UNCOV
351
                i = iref()
×
UNCOV
352
                if i is None:
×
353
                    L.remove(iref)
×
354
                else:
UNCOV
355
                    yield i
×
356

357
    def __iter__(self):
1✔
358
        return self.iter_field(*self._lists)
×
359

360
    def iter_after(self, v):
1✔
361
        it = iter(self)
×
362
        for cell in it:
×
363
            if cell == v:
×
364
                break
×
365
        return it
×
366

367
    def clear(self):
1✔
368
        for c in self:
×
369
            c.float = False
×
370
        for L in self._lists:
×
371
            del L[:]
×
372

373
    def insert(self, v, before):
1✔
374
        L = self._lists[v.float]
×
375
        for i, e in enumerate(self.iter_field(L)):
×
376
            if e == before:
×
377
                L.insert(i, weakref.ref(v))
×
378
                return
×
379
        raise IndexError('output before dead cell')
×
380

381
    def append(self, v):
1✔
UNCOV
382
        L = self._lists[v.float]
×
UNCOV
383
        L.append(weakref.ref(v))
×
384

385

386
cells = WeakCellList()
1✔
387

388

389
def get_position():
1✔
390
    global cached_pos
UNCOV
391
    if not cached_pos:
×
UNCOV
392
        cached_pos = [0, 0]
×
UNCOV
393
    return tuple(cached_pos)
×
394

395

396
def output(s='', float=False, priority=10, frozen=False, indent=0, before=None):
1✔
UNCOV
397
    with wlock:
×
UNCOV
398
        if before:
×
399
            float = before.float
×
400

UNCOV
401
        if isinstance(s, bytes):
×
UNCOV
402
            s = s.decode('utf-8', 'backslashreplace')
×
UNCOV
403
        if frozen:
×
UNCOV
404
            for f in cells.floats:
×
UNCOV
405
                f.prepare_redraw()
×
UNCOV
406
                do('ed')  # we could do it only when necessary
×
UNCOV
407
                break
×
UNCOV
408
            ret = put(s)
×
UNCOV
409
            for f in cells.floats:
×
UNCOV
410
                f.draw()
×
UNCOV
411
            for f in cells.floats:
×
UNCOV
412
                fd.flush()
×
UNCOV
413
                break
×
UNCOV
414
            return ret
×
415

UNCOV
416
        c = Cell(s, float)
×
UNCOV
417
        if before is None:
×
UNCOV
418
            cells.append(c)
×
UNCOV
419
            c.draw()
×
420
        else:
421
            before.prepare_redraw()
×
422
            cells.insert(c, before)
×
423
            c.draw()
×
424
            for f in cells.iter_after(c):
×
425
                f.draw()
×
UNCOV
426
        return c
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc